From 5a8e63b2f016735364d17866d5e2bcb35d20c78b Mon Sep 17 00:00:00 2001 From: rsc Date: Sun, 29 Feb 2004 22:10:26 +0000 Subject: Fighting the good fight. Move libfmt, libutf into subdirectories of lib9. Add poll-based socket i/o to libthread, so that we can avoid using multiple procs when possible, thus removing dependence on crappy pthreads implementations. Convert samterm, acme to the single-proc libthread. Bring libcomplete, acme up-to-date w.r.t. Plan 9 distribution. --- src/libthread/channel.c | 18 +++--- src/libthread/create.c | 4 ++ src/libthread/exec-unix.c | 139 +++++++++++++++------------------------------ src/libthread/main.c | 7 +++ src/libthread/mkfile | 2 + src/libthread/proctab.c | 19 +++++++ src/libthread/sched.c | 34 +++++++++++ src/libthread/threadimpl.h | 3 +- 8 files changed, 124 insertions(+), 102 deletions(-) (limited to 'src/libthread') diff --git a/src/libthread/channel.c b/src/libthread/channel.c index d1ec2985..4f642c84 100644 --- a/src/libthread/channel.c +++ b/src/libthread/channel.c @@ -65,9 +65,7 @@ chaninit(Channel *c, int elemsize, int elemcnt) { if(elemcnt < 0 || elemsize <= 0 || c == nil) return -1; - c->f = 0; - c->n = 0; - c->freed = 0; + memset(c, 0, sizeof *c); c->e = elemsize; c->s = elemcnt; _threaddebug(DBGCHAN, "chaninit %p", c); @@ -104,13 +102,16 @@ alt(Alt *alts) * chanlock. Instead, we delay the note until we've dropped * the lock. */ + + /* + * T might be nil here -- the scheduler sends on threadwaitchan + * directly (in non-blocking mode, of course!). + */ t = _threadgetproc()->thread; - if(t->moribund || _threadexitsallstatus) + if((t && t->moribund) || _threadexitsallstatus) yield(); /* won't return */ s = _procsplhi(); lock(&chanlock); - t->alt = alts; - t->chan = Chanalt; /* test whether any channels can proceed */ n = 0; @@ -125,7 +126,6 @@ alt(Alt *alts) if(c==nil){ unlock(&chanlock); _procsplx(s); - t->chan = Channone; return -1; } if(canexec(xa)) @@ -138,7 +138,6 @@ alt(Alt *alts) if(xa->op == CHANNOBLK){ unlock(&chanlock); _procsplx(s); - t->chan = Channone; _threadnalt++; return xa - alts; } @@ -159,6 +158,9 @@ _threadnalt++; * we need to be here. */ Again: + t->alt = alts; + t->chan = Chanalt; + unlock(&chanlock); _procsplx(s); r = _threadrendezvous((ulong)&c, 0); diff --git a/src/libthread/create.c b/src/libthread/create.c index d487e195..518eaae6 100644 --- a/src/libthread/create.c +++ b/src/libthread/create.c @@ -86,6 +86,7 @@ proccreate(void (*f)(void*), void *arg, uint stacksize) p = _threadgetproc(); if(p->idle){ + fprint(2, "cannot create procs once there is an idle thread\n"); werrstr("cannot create procs once there is an idle thread"); return -1; } @@ -124,6 +125,7 @@ threadcreateidle(void (*f)(void *arg), void *arg, uint stacksize) int id; if(_threadprocs!=1){ + fprint(2, "cannot have idle thread in multi-proc program\n"); werrstr("cannot have idle thread in multi-proc program"); return -1; } @@ -153,6 +155,8 @@ _newproc(void (*f)(void *arg), void *arg, uint stacksize, char *name, int grp, i else *_threadpq.tail = p; _threadpq.tail = &p->next; + if(_threadprocs == 1) + _threadmultiproc(); _threadprocs++; unlock(&_threadpq.lock); return p; diff --git a/src/libthread/exec-unix.c b/src/libthread/exec-unix.c index c04d414c..bfffa14d 100644 --- a/src/libthread/exec-unix.c +++ b/src/libthread/exec-unix.c @@ -2,28 +2,18 @@ #include #include "threadimpl.h" +static void efork(int[3], int[2], char*, char**); void -procexec(Channel *pidc, int fd[3], char *prog, char *args[]) +threadexec(Channel *pidc, int fd[3], char *prog, char *args[]) { - int n; - Proc *p; - Thread *t; - - _threaddebug(DBGEXEC, "procexec %s", prog); - /* must be only thread in proc */ - p = _threadgetproc(); - t = p->thread; - if(p->threads.head != t || p->threads.head->nextt != nil){ - werrstr("not only thread in proc"); - Bad: - _threaddebug(DBGEXEC, "procexec bad %r"); - if(pidc) - sendul(pidc, ~0); - return; - } + int pfd[2]; + int n, pid; + char exitstr[ERRMAX]; + _threaddebug(DBGEXEC, "threadexec %s", prog); + /* - * We want procexec to behave like exec; if exec succeeds, + * We want threadexec to behave like exec; if exec succeeds, * never return, and if it fails, return with errstr set. * Unfortunately, the exec happens in another proc since * we have to wait for the exec'ed process to finish. @@ -34,114 +24,77 @@ procexec(Channel *pidc, int fd[3], char *prog, char *args[]) * then the proc doing the exec sends the errstr down the * pipe to us. */ - if(pipe(p->exec.fd) < 0) + if(pipe(pfd) < 0) goto Bad; - if(fcntl(p->exec.fd[0], F_SETFD, 1) < 0) + if(fcntl(pfd[0], F_SETFD, 1) < 0) goto Bad; - if(fcntl(p->exec.fd[1], F_SETFD, 1) < 0) + if(fcntl(pfd[1], F_SETFD, 1) < 0) goto Bad; - /* exec in parallel via the scheduler */ - assert(p->needexec==0); - p->exec.prog = prog; - p->exec.args = args; - p->exec.stdfd = fd; - p->needexec = 1; - _sched(); + switch(pid = fork()){ + case -1: + close(pfd[0]); + close(pfd[1]); + goto Bad; + case 0: + efork(fd, pfd, prog, args); + _exit(0); + default: + break; + } - close(p->exec.fd[1]); - if((n = read(p->exec.fd[0], p->exitstr, ERRMAX-1)) > 0){ /* exec failed */ - p->exitstr[n] = '\0'; - errstr(p->exitstr, ERRMAX); - close(p->exec.fd[0]); + close(pfd[1]); + if((n = read(pfd[0], exitstr, ERRMAX-1)) > 0){ /* exec failed */ + exitstr[n] = '\0'; + errstr(exitstr, ERRMAX); + close(pfd[0]); goto Bad; } - close(p->exec.fd[0]); + close(pfd[0]); close(fd[0]); if(fd[1] != fd[0]) close(fd[1]); if(fd[2] != fd[1] && fd[2] != fd[0]) close(fd[2]); if(pidc) - sendul(pidc, t->ret); + sendul(pidc, pid); - _threaddebug(DBGEXEC, "procexec schedexecwait"); - /* wait for exec'ed program, then exit */ - _schedexecwait(); -} + _threaddebug(DBGEXEC, "threadexec schedexecwait"); + threadexits(0); -void -procexecl(Channel *pidc, int fd[3], char *f, ...) -{ - procexec(pidc, fd, f, &f+1); +Bad: + _threaddebug(DBGEXEC, "threadexec bad %r"); + if(pidc) + sendul(pidc, ~0); } void -_schedexecwait(void) +threadexecl(Channel *pidc, int fd[3], char *f, ...) { - int pid; - Channel *c; - Proc *p; - Thread *t; - Waitmsg *w; - - p = _threadgetproc(); - t = p->thread; - pid = t->ret; - _threaddebug(DBGEXEC, "_schedexecwait %d", t->ret); - - for(;;){ - w = wait(); - if(w == nil) - break; - if(w->pid == pid) - break; - free(w); - } - if(w != nil){ - if((c = _threadwaitchan) != nil) - sendp(c, w); - else - free(w); - } - threadexits("procexec"); + threadexec(pidc, fd, f, &f+1); } static void -efork(void *ve) +efork(int stdfd[3], int fd[2], char *prog, char **args) { char buf[ERRMAX]; - Execargs *e; int i; - e = ve; - _threaddebug(DBGEXEC, "_schedexec %s -- calling execv", e->prog); - dup(e->stdfd[0], 0); - dup(e->stdfd[1], 1); - dup(e->stdfd[2], 2); + _threaddebug(DBGEXEC, "_schedexec %s -- calling execv", prog); + dup(stdfd[0], 0); + dup(stdfd[1], 1); + dup(stdfd[2], 2); for(i=3; i<40; i++) - if(i != e->fd[1]) + if(i != fd[1]) close(i); rfork(RFNOTEG); - execvp(e->prog, e->args); + execvp(prog, args); _threaddebug(DBGEXEC, "_schedexec failed: %r"); rerrstr(buf, sizeof buf); if(buf[0]=='\0') strcpy(buf, "exec failed"); - write(e->fd[1], buf, strlen(buf)); - close(e->fd[1]); + write(fd[1], buf, strlen(buf)); + close(fd[1]); _exits(buf); } -int -_schedexec(Execargs *e) -{ - int pid; - - pid = fork(); - if(pid == 0){ - efork(e); - _exit(1); - } - return pid; -} diff --git a/src/libthread/main.c b/src/libthread/main.c index 97a6154a..53061471 100644 --- a/src/libthread/main.c +++ b/src/libthread/main.c @@ -24,6 +24,12 @@ _threaddie(int x) exit(_threadexitsallstatus[0] ? 1 : 0); } +static void +_nop(int x) +{ + USED(x); +} + int main(int argc, char **argv) { @@ -31,6 +37,7 @@ main(int argc, char **argv) Proc *p; signal(SIGTERM, _threaddie); + signal(SIGCHLD, _nop); // rfork(RFREND); //_threaddebuglevel = (DBGSCHED|DBGCHAN|DBGREND)^~0; diff --git a/src/libthread/mkfile b/src/libthread/mkfile index 8854cde7..58a757b3 100644 --- a/src/libthread/mkfile +++ b/src/libthread/mkfile @@ -12,6 +12,7 @@ OFILES=\ debug.$O\ exec-unix.$O\ exit.$O\ + fdwait.$O\ getpid.$O\ id.$O\ iocall.$O\ @@ -30,6 +31,7 @@ OFILES=\ memsetd.$O\ note.$O\ proctab.$O\ + read9pmsg.$O\ ref.$O\ rendez.$O\ sched.$O\ diff --git a/src/libthread/proctab.c b/src/libthread/proctab.c index 5e5dcb2b..ec28d676 100644 --- a/src/libthread/proctab.c +++ b/src/libthread/proctab.c @@ -6,6 +6,18 @@ enum PTABHASH = 257, }; +static int multi; +static Proc *theproc; + +void +_threadmultiproc(void) +{ + if(multi == 0){ + multi = 1; + _threadsetproc(theproc); + } +} + static Lock ptablock; Proc *ptab[PTABHASH]; @@ -14,6 +26,10 @@ _threadsetproc(Proc *p) { int h; + if(!multi){ + theproc = p; + return; + } lock(&ptablock); h = ((unsigned)p->pid)%PTABHASH; p->link = ptab[h]; @@ -27,6 +43,9 @@ __threadgetproc(int rm) Proc **l, *p; int h, pid; + if(!multi) + return theproc; + pid = _threadgetpid(); lock(&ptablock); diff --git a/src/libthread/sched.c b/src/libthread/sched.c index d85a76e2..755fc280 100644 --- a/src/libthread/sched.c +++ b/src/libthread/sched.c @@ -1,4 +1,5 @@ #include +#include #include "threadimpl.h" //static Thread *runthread(Proc*); @@ -67,10 +68,12 @@ _schedinit(void *arg) t = nil; _sched(); } +/* if(p->needexec){ t->ret = _schedexec(&p->exec); p->needexec = 0; } +*/ if(p->newproc){ t->ret = _schedfork(p->newproc); if(t->ret < 0){ @@ -90,14 +93,45 @@ _schedinit(void *arg) static Thread* runthread(Proc *p) { + Channel *c; Thread *t; Tqueue *q; + Waitmsg *w; + int e, sent; if(p->nthreads==0 || (p->nthreads==1 && p->idle)) return nil; q = &p->ready; +relock: lock(&p->readylock); if(q->head == nil){ + e = errno; + if((c = _threadwaitchan) != nil){ + if(c->n <= c->s){ + sent = 0; + for(;;){ + if((w = p->waitmsg) != nil) + p->waitmsg = nil; + else + w = waitnohang(); + if(w == nil) + break; + if(sent == 0){ + unlock(&p->readylock); + sent = 1; + } + if(nbsendp(c, w) != 1) + break; + } + p->waitmsg = w; + if(sent) + goto relock; + } + }else{ + while((w = waitnohang()) != nil) + free(w); + } + errno = e; if(p->idle){ if(p->idle->state != Ready){ fprint(2, "everyone is asleep\n"); diff --git a/src/libthread/threadimpl.h b/src/libthread/threadimpl.h index 0dd1e870..590342ab 100644 --- a/src/libthread/threadimpl.h +++ b/src/libthread/threadimpl.h @@ -139,6 +139,7 @@ struct Proc void *arg; /* passed between shared and unshared stk */ char str[ERRMAX]; /* used by threadexits to avoid malloc */ char errbuf[ERRMAX]; /* errstr */ + Waitmsg *waitmsg; void* udata; /* User per-proc data pointer */ }; @@ -181,6 +182,7 @@ void __threaddebug(ulong, char*, ...); void _threadexitsall(char*); void _threadflagrendez(Thread*); Proc* _threadgetproc(void); +extern void _threadmultiproc(void); Proc* _threaddelproc(void); void _threadsetproc(Proc*); void _threadinitstack(Thread*, void(*)(void*), void*); @@ -195,7 +197,6 @@ long _xdec(long*); void _xinc(long*); void _threadremove(Proc*, Thread*); -extern int _threadmultiproc; extern int _threaddebuglevel; extern char* _threadexitsallstatus; extern Pqueue _threadpq; -- cgit v1.2.3