diff options
Diffstat (limited to 'src/cmd/upas/q')
-rw-r--r-- | src/cmd/upas/q/mkfile | 22 | ||||
-rw-r--r-- | src/cmd/upas/q/qer.c | 193 | ||||
-rw-r--r-- | src/cmd/upas/q/runq.c | 766 |
3 files changed, 981 insertions, 0 deletions
diff --git a/src/cmd/upas/q/mkfile b/src/cmd/upas/q/mkfile new file mode 100644 index 00000000..0aa5e52c --- /dev/null +++ b/src/cmd/upas/q/mkfile @@ -0,0 +1,22 @@ +<$PLAN9/src/mkhdr + +TARG = qer\ + runq\ + +OFILES= + +HFILES=../common/common.h\ + ../common/sys.h\ + +LIB=../common/libcommon.a\ + +BIN=$PLAN9/bin/upas + +UPDATE=\ + mkfile\ + $HFILES\ + ${OFILES:%.$O=%.c}\ + ${TARG:%=%.c}\ + +<$PLAN9/src/mkmany +CFLAGS=$CFLAGS -I../common diff --git a/src/cmd/upas/q/qer.c b/src/cmd/upas/q/qer.c new file mode 100644 index 00000000..10882896 --- /dev/null +++ b/src/cmd/upas/q/qer.c @@ -0,0 +1,193 @@ +#include "common.h" + +typedef struct Qfile Qfile; +struct Qfile +{ + Qfile *next; + char *name; + char *tname; +} *files; + +char *user; +int isnone; + +int copy(Qfile*); + +void +usage(void) +{ + fprint(2, "usage: qer [-f file] [-q dir] q-root description reply-to arg-list\n"); + exits("usage"); +} + +void +error(char *f, char *a) +{ + char err[Errlen+1]; + char buf[256]; + + rerrstr(err, sizeof(err)); + snprint(buf, sizeof(buf), f, a); + fprint(2, "qer: %s: %s\n", buf, err); + exits(buf); +} + +void +main(int argc, char**argv) +{ + Dir *dir; + String *f, *c; + int fd; + char file[1024]; + char buf[1024]; + long n; + char *cp, *qdir; + int i; + Qfile *q, **l; + + l = &files; + qdir = 0; + + ARGBEGIN { + case 'f': + q = malloc(sizeof(Qfile)); + q->name = ARGF(); + q->next = *l; + *l = q; + break; + case 'q': + qdir = ARGF(); + if(qdir == 0) + usage(); + break; + default: + usage(); + } ARGEND; + + if(argc < 3) + usage(); + user = getuser(); + isnone = (qdir != 0) || (strcmp(user, "none") == 0); + + if(qdir == 0) { + qdir = user; + if(qdir == 0) + error("unknown user", 0); + } + snprint(file, sizeof(file), "%s/%s", argv[0], qdir); + + /* + * data file name + */ + f = s_copy(file); + s_append(f, "/D.XXXXXX"); + mktemp(s_to_c(f)); + cp = utfrrune(s_to_c(f), '/'); + cp++; + + /* + * create directory and data file. once the data file + * exists, runq won't remove the directory + */ + fd = -1; + for(i = 0; i < 10; i++){ + int perm; + + dir = dirstat(file); + if(dir == nil){ + perm = isnone?0777:0775; + if(sysmkdir(file, perm) < 0) + continue; + } else { + if((dir->qid.type&QTDIR)==0) + error("not a directory %s", file); + } + perm = isnone?0664:0660; + fd = create(s_to_c(f), OWRITE, perm); + if(fd >= 0) + break; + sleep(250); + } + if(fd < 0) + error("creating data file %s", s_to_c(f)); + + /* + * copy over associated files + */ + if(files){ + *cp = 'F'; + for(q = files; q; q = q->next){ + q->tname = strdup(s_to_c(f)); + if(copy(q) < 0) + error("copying %s to queue", q->name); + (*cp)++; + } + } + + /* + * copy in the data file + */ + i = 0; + while((n = read(0, buf, sizeof(buf)-1)) > 0){ + if(i++ == 0 && strncmp(buf, "From", 4) != 0){ + buf[n] = 0; + syslog(0, "smtp", "qer usys data starts with %-40.40s\n", buf); + } + if(write(fd, buf, n) != n) + error("writing data file %s", s_to_c(f)); + } +/* if(n < 0) + error("reading input"); */ + close(fd); + + /* + * create control file + */ + *cp = 'C'; + fd = syscreatelocked(s_to_c(f), OWRITE, 0664); + if(fd < 0) + error("creating control file %s", s_to_c(f)); + c = s_new(); + for(i = 1; i < argc; i++){ + s_append(c, argv[i]); + s_append(c, " "); + } + for(q = files; q; q = q->next){ + s_append(c, q->tname); + s_append(c, " "); + } + s_append(c, "\n"); + if(write(fd, s_to_c(c), strlen(s_to_c(c))) < 0) { + sysunlockfile(fd); + error("writing control file %s", s_to_c(f)); + } + sysunlockfile(fd); + exits(0); +} + +int +copy(Qfile *q) +{ + int from, to, n; + char buf[4096]; + + from = open(q->name, OREAD); + if(from < 0) + return -1; + to = create(q->tname, OWRITE, 0660); + if(to < 0){ + close(from); + return -1; + } + for(;;){ + n = read(from, buf, sizeof(buf)); + if(n <= 0) + break; + n = write(to, buf, n); + if(n < 0) + break; + } + close(to); + close(from); + return n; +} diff --git a/src/cmd/upas/q/runq.c b/src/cmd/upas/q/runq.c new file mode 100644 index 00000000..7ab055ed --- /dev/null +++ b/src/cmd/upas/q/runq.c @@ -0,0 +1,766 @@ +#include "common.h" +#include <ctype.h> + +void doalldirs(void); +void dodir(char*); +void dofile(Dir*); +void rundir(char*); +char* file(char*, char); +void warning(char*, void*); +void error(char*, void*); +int returnmail(char**, char*, char*); +void logit(char*, char*, char**); +void doload(int); + +#define HUNK 32 +char *cmd; +char *root; +int debug; +int giveup = 2*24*60*60; +int load; +int limit; + +/* the current directory */ +Dir *dirbuf; +long ndirbuf = 0; +int nfiles; +char *curdir; + +char *runqlog = "runq"; + +int *pidlist; +char **badsys; /* array of recalcitrant systems */ +int nbad; +int npid = 50; +int sflag; /* single thread per directory */ +int aflag; /* all directories */ +int Eflag; /* ignore E.xxxxxx dates */ +int Rflag; /* no giving up, ever */ + +void +usage(void) +{ + fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); + exits(""); +} + +void +main(int argc, char **argv) +{ + char *qdir, *x; + + qdir = 0; + + ARGBEGIN{ + case 'l': + x = ARGF(); + if(x == 0) + usage(); + load = atoi(x); + if(load < 0) + load = 0; + break; + case 'E': + Eflag++; + break; + case 'R': /* no giving up -- just leave stuff in the queue */ + Rflag++; + break; + case 'a': + aflag++; + break; + case 'd': + debug++; + break; + case 'r': + limit = atoi(ARGF()); + break; + case 's': + sflag++; + break; + case 't': + giveup = 60*60*atoi(ARGF()); + break; + case 'q': + qdir = ARGF(); + if(qdir == 0) + usage(); + break; + case 'n': + npid = atoi(ARGF()); + if(npid == 0) + usage(); + break; + }ARGEND; + + if(argc != 2) + usage(); + + pidlist = malloc(npid*sizeof(*pidlist)); + if(pidlist == 0) + error("can't malloc", 0); + + if(aflag == 0 && qdir == 0) { + qdir = getuser(); + if(qdir == 0) + error("unknown user", 0); + } + root = argv[0]; + cmd = argv[1]; + + if(chdir(root) < 0) + error("can't cd to %s", root); + + doload(1); + if(aflag) + doalldirs(); + else + dodir(qdir); + doload(0); + exits(0); +} + +int +emptydir(char *name) +{ + int fd; + long n; + char buf[2048]; + + fd = open(name, OREAD); + if(fd < 0) + return 1; + n = read(fd, buf, sizeof(buf)); + close(fd); + if(n <= 0) { + if(debug) + fprint(2, "removing directory %s\n", name); + syslog(0, runqlog, "rmdir %s", name); + sysremove(name); + return 1; + } + return 0; +} + +int +forkltd(void) +{ + int i; + int pid; + + for(i = 0; i < npid; i++){ + if(pidlist[i] <= 0) + break; + } + + while(i >= npid){ + pid = waitpid(); + if(pid < 0){ + syslog(0, runqlog, "forkltd confused"); + exits(0); + } + + for(i = 0; i < npid; i++) + if(pidlist[i] == pid) + break; + } + pidlist[i] = fork(); + return pidlist[i]; +} + +/* + * run all user directories, must be bootes (or root on unix) to do this + */ +void +doalldirs(void) +{ + Dir *db; + int fd; + long i, n; + + + fd = open(".", OREAD); + if(fd == -1){ + warning("reading %s", root); + return; + } + n = sysdirreadall(fd, &db); + if(n > 0){ + for(i=0; i<n; i++){ + if(db[i].qid.type & QTDIR){ + if(emptydir(db[i].name)) + continue; + switch(forkltd()){ + case -1: + syslog(0, runqlog, "out of procs"); + doload(0); + exits(0); + case 0: + if(sysdetach() < 0) + error("%r", 0); + dodir(db[i].name); + exits(0); + default: + break; + } + } + } + free(db); + } + close(fd); +} + +/* + * cd to a user directory and run it + */ +void +dodir(char *name) +{ + curdir = name; + + if(chdir(name) < 0){ + warning("cd to %s", name); + return; + } + if(debug) + fprint(2, "running %s\n", name); + rundir(name); + chdir(".."); +} + +/* + * run the current directory + */ +void +rundir(char *name) +{ + int fd; + long i; + + if(aflag && sflag) + fd = sysopenlocked(".", OREAD); + else + fd = open(".", OREAD); + if(fd == -1){ + warning("reading %s", name); + return; + } + nfiles = sysdirreadall(fd, &dirbuf); + if(nfiles > 0){ + for(i=0; i<nfiles; i++){ + if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.') + continue; + dofile(&dirbuf[i]); + } + free(dirbuf); + } + if(aflag && sflag) + sysunlockfile(fd); + else + close(fd); +} + +/* + * free files matching name in the current directory + */ +void +remmatch(char *name) +{ + long i; + + syslog(0, runqlog, "removing %s/%s", curdir, name); + + for(i=0; i<nfiles; i++){ + if(strcmp(&dirbuf[i].name[1], &name[1]) == 0) + sysremove(dirbuf[i].name); + } + + /* error file (may have) appeared after we read the directory */ + /* stomp on data file in case of phase error */ + sysremove(file(name, 'D')); + sysremove(file(name, 'E')); +} + +/* + * like trylock, but we've already got the lock on fd, + * and don't want an L. lock file. + */ +static Mlock * +keeplockalive(char *path, int fd) +{ + char buf[1]; + Mlock *l; + + l = malloc(sizeof(Mlock)); + if(l == 0) + return 0; + l->fd = fd; + l->name = s_new(); + s_append(l->name, path); + + /* fork process to keep lock alive until sysunlock(l) */ + switch(l->pid = rfork(RFPROC)){ + default: + break; + case 0: + fd = l->fd; + for(;;){ + sleep(1000*60); + if(pread(fd, buf, 1, 0) < 0) + break; + } + _exits(0); + } + return l; +} + +/* + * try a message + */ +void +dofile(Dir *dp) +{ + Dir *d; + int dfd, ac, dtime, efd, pid, i, etime; + char *buf, *cp, **av; + Waitmsg *wm; + Biobuf *b; + Mlock *l = nil; + + if(debug) + fprint(2, "dofile %s\n", dp->name); + /* + * if no data file or empty control or data file, just clean up + * the empty control file must be 15 minutes old, to minimize the + * chance of a race. + */ + d = dirstat(file(dp->name, 'D')); + if(d == nil){ + syslog(0, runqlog, "no data file for %s", dp->name); + remmatch(dp->name); + return; + } + if(dp->length == 0){ + if(time(0)-dp->mtime > 15*60){ + syslog(0, runqlog, "empty ctl file for %s", dp->name); + remmatch(dp->name); + } + return; + } + dtime = d->mtime; + free(d); + + /* + * retry times depend on the age of the errors file + */ + if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){ + etime = d->mtime; + free(d); + if(etime - dtime < 60*60){ + /* up to the first hour, try every 15 minutes */ + if(time(0) - etime < 15*60) + return; + } else { + /* after the first hour, try once an hour */ + if(time(0) - etime < 60*60) + return; + } + + } + + /* + * open control and data + */ + b = sysopen(file(dp->name, 'C'), "rl", 0660); + if(b == 0) { + if(debug) + fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); + return; + } + dfd = open(file(dp->name, 'D'), OREAD); + if(dfd < 0){ + if(debug) + fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); + Bterm(b); + sysunlockfile(Bfildes(b)); + return; + } + + /* + * make arg list + * - read args into (malloc'd) buffer + * - malloc a vector and copy pointers to args into it + */ + buf = malloc(dp->length+1); + if(buf == 0){ + warning("buffer allocation", 0); + Bterm(b); + sysunlockfile(Bfildes(b)); + close(dfd); + return; + } + if(Bread(b, buf, dp->length) != dp->length){ + warning("reading control file %s\n", dp->name); + Bterm(b); + sysunlockfile(Bfildes(b)); + close(dfd); + free(buf); + return; + } + buf[dp->length] = 0; + av = malloc(2*sizeof(char*)); + if(av == 0){ + warning("argv allocation", 0); + close(dfd); + free(buf); + Bterm(b); + sysunlockfile(Bfildes(b)); + return; + } + for(ac = 1, cp = buf; *cp; ac++){ + while(isspace(*cp)) + *cp++ = 0; + if(*cp == 0) + break; + + av = realloc(av, (ac+2)*sizeof(char*)); + if(av == 0){ + warning("argv allocation", 0); + close(dfd); + free(buf); + Bterm(b); + sysunlockfile(Bfildes(b)); + return; + } + av[ac] = cp; + while(*cp && !isspace(*cp)){ + if(*cp++ == '"'){ + while(*cp && *cp != '"') + cp++; + if(*cp) + cp++; + } + } + } + av[0] = cmd; + av[ac] = 0; + + if(!Eflag &&time(0) - dtime > giveup){ + if(returnmail(av, dp->name, "Giveup") != 0) + logit("returnmail failed", dp->name, av); + remmatch(dp->name); + goto done; + } + + for(i = 0; i < nbad; i++){ + if(strcmp(av[3], badsys[i]) == 0) + goto done; + } + + /* + * Ken's fs, for example, gives us 5 minutes of inactivity before + * the lock goes stale, so we have to keep reading it. + */ + l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); + + /* + * transfer + */ + pid = fork(); + switch(pid){ + case -1: + sysunlock(l); + sysunlockfile(Bfildes(b)); + syslog(0, runqlog, "out of procs"); + exits(0); + case 0: + if(debug) { + fprint(2, "Starting %s", cmd); + for(ac = 0; av[ac]; ac++) + fprint(2, " %s", av[ac]); + fprint(2, "\n"); + } + logit("execing", dp->name, av); + close(0); + dup(dfd, 0); + close(dfd); + close(2); + efd = open(file(dp->name, 'E'), OWRITE); + if(efd < 0){ + if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); + efd = create(file(dp->name, 'E'), OWRITE, 0666); + if(efd < 0){ + if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); + exits("could not open error file - Retry"); + } + } + seek(efd, 0, 2); + exec(cmd, av); + error("can't exec %s", cmd); + break; + default: + for(;;){ + wm = wait(); + if(wm == nil) + error("wait failed: %r", ""); + if(wm->pid == pid) + break; + free(wm); + } + if(debug) + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); + + if(wm->msg[0]){ + if(debug) + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); + if(!Rflag && strstr(wm->msg, "Retry")==0){ + /* return the message and remove it */ + if(returnmail(av, dp->name, wm->msg) != 0) + logit("returnmail failed", dp->name, av); + remmatch(dp->name); + } else { + /* add sys to bad list and try again later */ + nbad++; + badsys = realloc(badsys, nbad*sizeof(char*)); + badsys[nbad-1] = strdup(av[3]); + } + } else { + /* it worked remove the message */ + remmatch(dp->name); + } + free(wm); + + } +done: + if (l) + sysunlock(l); + Bterm(b); + sysunlockfile(Bfildes(b)); + free(buf); + free(av); + close(dfd); +} + + +/* + * return a name starting with the given character + */ +char* +file(char *name, char type) +{ + static char nname[Elemlen+1]; + + strncpy(nname, name, Elemlen); + nname[Elemlen] = 0; + nname[0] = type; + return nname; +} + +/* + * send back the mail with an error message + * + * return 0 if successful + */ +int +returnmail(char **av, char *name, char *msg) +{ + int pfd[2]; + Waitmsg *wm; + int fd; + char buf[256]; + char attachment[256]; + int i; + long n; + String *s; + char *sender; + + if(av[1] == 0 || av[2] == 0){ + logit("runq - dumping bad file", name, av); + return 0; + } + + s = unescapespecial(s_copy(av[2])); + sender = s_to_c(s); + + if(!returnable(sender) || strcmp(sender, "postmaster") == 0) { + logit("runq - dumping p to p mail", name, av); + return 0; + } + + if(pipe(pfd) < 0){ + logit("runq - pipe failed", name, av); + return -1; + } + + switch(rfork(RFFDG|RFPROC|RFENVG)){ + case -1: + logit("runq - fork failed", name, av); + return -1; + case 0: + logit("returning", name, av); + close(pfd[1]); + close(0); + dup(pfd[0], 0); + close(pfd[0]); + putenv("upasname", "/dev/null"); + snprint(buf, sizeof(buf), "%s/marshal", UPASBIN); + snprint(attachment, sizeof(attachment), "%s", file(name, 'D')); + execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil); + error("can't exec", 0); + break; + default: + break; + } + + close(pfd[0]); + fprint(pfd[1], "\n"); /* get out of headers */ + if(av[1]){ + fprint(pfd[1], "Your request ``%.20s ", av[1]); + for(n = 3; av[n]; n++) + fprint(pfd[1], "%s ", av[n]); + } + fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg); + fd = open(file(name, 'E'), OREAD); + if(fd >= 0){ + for(;;){ + n = read(fd, buf, sizeof(buf)); + if(n <= 0) + break; + if(write(pfd[1], buf, n) != n){ + close(fd); + goto out; + } + } + close(fd); + } + close(pfd[1]); +out: + wm = wait(); + if(wm == nil){ + syslog(0, "runq", "wait: %r"); + logit("wait failed", name, av); + return -1; + } + i = 0; + if(wm->msg[0]){ + i = -1; + syslog(0, "runq", "returnmail child: %s", wm->msg); + logit("returnmail child failed", name, av); + } + free(wm); + return i; +} + +/* + * print a warning and continue + */ +void +warning(char *f, void *a) +{ + char err[65]; + char buf[256]; + + rerrstr(err, sizeof(err)); + snprint(buf, sizeof(buf), f, a); + fprint(2, "runq: %s: %s\n", buf, err); +} + +/* + * print an error and die + */ +void +error(char *f, void *a) +{ + char err[Errlen]; + char buf[256]; + + rerrstr(err, sizeof(err)); + snprint(buf, sizeof(buf), f, a); + fprint(2, "runq: %s: %s\n", buf, err); + exits(buf); +} + +void +logit(char *msg, char *file, char **av) +{ + int n, m; + char buf[256]; + + n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg); + for(; *av; av++){ + m = strlen(*av); + if(n + m + 4 > sizeof(buf)) + break; + sprint(buf + n, " '%s'", *av); + n += m + 3; + } + syslog(0, runqlog, "%s", buf); +} + +char *loadfile = ".runqload"; + +/* + * load balancing + */ +void +doload(int start) +{ + int fd; + char buf[32]; + int i, n; + Mlock *l; + Dir *d; + + if(load <= 0) + return; + + if(chdir(root) < 0){ + load = 0; + return; + } + + l = syslock(loadfile); + fd = open(loadfile, ORDWR); + if(fd < 0){ + fd = create(loadfile, 0666, ORDWR); + if(fd < 0){ + load = 0; + sysunlock(l); + return; + } + } + + /* get current load */ + i = 0; + n = read(fd, buf, sizeof(buf)-1); + if(n >= 0){ + buf[n] = 0; + i = atoi(buf); + } + if(i < 0) + i = 0; + + /* ignore load if file hasn't been changed in 30 minutes */ + d = dirfstat(fd); + if(d != nil){ + if(d->mtime + 30*60 < time(0)) + i = 0; + free(d); + } + + /* if load already too high, give up */ + if(start && i >= load){ + sysunlock(l); + exits(0); + } + + /* increment/decrement load */ + if(start) + i++; + else + i--; + seek(fd, 0, 0); + fprint(fd, "%d\n", i); + sysunlock(l); + close(fd); +} |