From 7966faa931bfa9cf4ca53dd2d5b6e1eb0f174411 Mon Sep 17 00:00:00 2001 From: rsc Date: Thu, 23 Sep 2004 03:01:36 +0000 Subject: Continue fighting pthreads. Clean up thread library a bit too. --- src/libthread/chanprint.c | 4 +- src/libthread/create.c | 227 +++++++++++++++-------------- src/libthread/debug.c | 4 +- src/libthread/exec-unix.c | 14 +- src/libthread/exec.c | 82 ----------- src/libthread/exit.c | 31 +--- src/libthread/id.c | 8 +- src/libthread/iocall.c | 5 +- src/libthread/ioproc.c | 5 +- src/libthread/main.c | 152 ++++++------------- src/libthread/mkfile | 4 +- src/libthread/note.c | 6 +- src/libthread/proctab.c | 2 +- src/libthread/ref.c | 6 + src/libthread/rendez.c | 38 ----- src/libthread/sched.c | 355 +++++++++++++++++++++++---------------------- src/libthread/sleep.c | 38 +++++ src/libthread/texec.c | 11 +- src/libthread/threadimpl.h | 110 +++++++------- src/libthread/ucontext.c | 13 +- 20 files changed, 500 insertions(+), 615 deletions(-) delete mode 100644 src/libthread/exec.c delete mode 100644 src/libthread/rendez.c create mode 100644 src/libthread/sleep.c (limited to 'src') diff --git a/src/libthread/chanprint.c b/src/libthread/chanprint.c index af9e8103..781c6f6f 100644 --- a/src/libthread/chanprint.c +++ b/src/libthread/chanprint.c @@ -1,4 +1,6 @@ -#include "threadimpl.h" +#include +#include +#include int chanprint(Channel *c, char *fmt, ...) diff --git a/src/libthread/create.c b/src/libthread/create.c index 5dee4c48..290c52a8 100644 --- a/src/libthread/create.c +++ b/src/libthread/create.c @@ -1,45 +1,46 @@ #include "threadimpl.h" -Pqueue _threadpq; -int _threadprocs; -int __pthread_nonstandard_stacks; +Pqueue _threadpq; /* list of all procs */ +int _threadnprocs; /* count of procs */ -static int nextID(void); +static int newthreadid(void); +static int newprocid(void); /* * Create and initialize a new Thread structure attached to a given proc. */ -void -_stackfree(void *v) -{ - free(v); -} - -static int -newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize, char *name, int grp) +int +_newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize, + char *name, int grp) { int id; Thread *t; - char *s; - __pthread_nonstandard_stacks = 1; - if(stacksize < 32) - sysfatal("bad stacksize %d", stacksize); t = _threadmalloc(sizeof(Thread), 1); - t->lastfd = -1; - s = _threadmalloc(stacksize, 0); - t->stk = (uchar*)s; - t->stksize = stacksize; - _threaddebugmemset(s, 0xFE, stacksize); - _threadinitstack(t, f, arg); t->proc = p; t->grp = grp; + t->id = id = newthreadid(); if(name) - t->cmdname = strdup(name); - t->id = nextID(); - id = t->id; - t->next = (Thread*)~0; - _threaddebug(DBGSCHED, "create thread %d.%d name %s", p->pid, t->id, name); + t->name = strdup(name); + _threaddebug(DBGSCHED, "create thread %d.%d name %s", p->id, id, name); + + /* + * Allocate and clear stack. + */ + if(stacksize < 1024) + sysfatal("bad stacksize %d", stacksize); + t->stk = _threadmalloc(stacksize, 0); + t->stksize = stacksize; + _threaddebugmemset(t->stk, 0xFE, stacksize); + + /* + * Set up t->context to call f(arg). + */ + _threadinitstack(t, f, arg); + + /* + * Add thread to proc. + */ lock(&p->lock); p->nthreads++; if(p->threads.head == nil) @@ -49,120 +50,134 @@ newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize, char *name, t->prevt->nextt = t; } p->threads.tail = t; + t->next = (Thread*)~0; + + /* + * Mark thread as ready to run. + */ t->state = Ready; _threadready(t); unlock(&p->lock); + return id; } -static int -nextID(void) -{ - static Lock l; - static int id; - int i; - - lock(&l); - i = ++id; - unlock(&l); - return i; -} - -int -procrfork(void (*f)(void *), void *arg, uint stacksize, int rforkflag) +/* + * Free a Thread structure. + */ +void +_threadfree(Thread *t) { - Proc *p; - int id; - - p = _threadgetproc(); - assert(p->newproc == nil); - p->newproc = _newproc(f, arg, stacksize, nil, p->thread->grp, rforkflag); - id = p->newproc->threads.head->id; - _sched(); - return id; + free(t->stk); + free(t->name); + free(t); } -int -proccreate(void (*f)(void*), void *arg, uint stacksize) +/* + * Create and initialize a new Proc structure with a single Thread + * running inside it. Add the Proc to the global process list. + */ +Proc* +_newproc(void) { Proc *p; - p = _threadgetproc(); - if(p->idle){ - fprint(2, "cannot create procs once there is an idle thread\n"); - werrstr("cannot create procs once there is an idle thread"); - return -1; - } - return procrfork(f, arg, stacksize, 0); + /* + * Allocate. + */ + p = _threadmalloc(sizeof *p, 1); + p->id = newprocid(); + + /* + * Add to list. Record if we're now multiprocess. + */ + lock(&_threadpq.lock); + if(_threadpq.head == nil) + _threadpq.head = p; + else + *_threadpq.tail = p; + _threadpq.tail = &p->next; + if(_threadnprocs == 1) + _threadmultiproc(); + _threadnprocs++; + unlock(&_threadpq.lock); + + return p; } -void -_freeproc(Proc *p) +/* + * Allocate a new thread running f(arg) on a stack of size stacksize. + * Return the thread id. The thread group inherits from the current thread. + */ +int +threadcreate(void (*f)(void*), void *arg, uint stacksize) { - Thread *t, *nextt; - - for(t = p->threads.head; t; t = nextt){ - if(t->cmdname) - free(t->cmdname); - assert(t->stk != nil); - _stackfree(t->stk); - nextt = t->nextt; - free(t); - } - free(p); + return _newthread(_threadgetproc(), f, arg, stacksize, nil, threadgetgrp()); } /* - * Create a new thread and schedule it to run. - * The thread grp is inherited from the currently running thread. + * Allocate a new idle thread. Only allowed in a single-proc program. */ int -threadcreate(void (*f)(void *arg), void *arg, uint stacksize) +threadcreateidle(void (*f)(void *arg), void *arg, uint stacksize) { - return newthread(_threadgetproc(), f, arg, stacksize, nil, threadgetgrp()); + int id; + + assert(_threadnprocs == 1); + + id = threadcreate(f, arg, stacksize); + _threaddebug(DBGSCHED, "idle is %d", id); + _threadsetidle(id); + return id; } +/* + * Threadcreate, but do it inside a fresh proc. + */ int -threadcreateidle(void (*f)(void *arg), void *arg, uint stacksize) +proccreate(void (*f)(void*), void *arg, uint stacksize) { int id; + Proc *p, *np; - if(_threadprocs!=1){ - fprint(2, "cannot have idle thread in multi-proc program\n"); - werrstr("cannot have idle thread in multi-proc program"); - return -1; - } - id = newthread(_threadgetproc(), f, arg, stacksize, nil, threadgetgrp()); - _threaddebug(DBGSCHED, "idle is %d", id); - _threadidle(); + p = _threadgetproc(); + np = _newproc(); + p->newproc = np; + p->schedfn = _threadstartproc; + id = _newthread(np, f, arg, stacksize, nil, p->thread->grp); + _sched(); /* call into scheduler to create proc XXX */ return id; } /* - * Create and initialize a new Proc structure with a single Thread - * running inside it. Add the Proc to the global process list. + * Allocate a new thread id. */ -Proc* -_newproc(void (*f)(void *arg), void *arg, uint stacksize, char *name, int grp, int rforkflag) +static int +newthreadid(void) { - Proc *p; + static Lock l; + static int id; + int i; - p = _threadmalloc(sizeof *p, 1); - p->pid = -1; - p->rforkflag = rforkflag; - newthread(p, f, arg, stacksize, name, grp); + lock(&l); + i = ++id; + unlock(&l); + return i; +} - lock(&_threadpq.lock); - if(_threadpq.head == nil) - _threadpq.head = p; - else - *_threadpq.tail = p; - _threadpq.tail = &p->next; +/* + * Allocate a new proc id. + */ +static int +newprocid(void) +{ + static Lock l; + static int id; + int i; - if(_threadprocs == 1) - _threadmultiproc(); - _threadprocs++; - unlock(&_threadpq.lock); - return p; + lock(&l); + i = ++id; + unlock(&l); + return i; } diff --git a/src/libthread/debug.c b/src/libthread/debug.c index 63e2e1b5..14b47c00 100644 --- a/src/libthread/debug.c +++ b/src/libthread/debug.c @@ -19,9 +19,9 @@ __threaddebug(ulong flag, char *fmt, ...) if(p==nil) fmtprint(&f, "noproc "); else if(p->thread) - fmtprint(&f, "%d.%d ", p->pid, p->thread->id); + fmtprint(&f, "%d.%d ", p->id, p->thread->id); else - fmtprint(&f, "%d._ ", p->pid); + fmtprint(&f, "%d._ ", p->id); va_start(arg, fmt); fmtvprint(&f, fmt, arg); diff --git a/src/libthread/exec-unix.c b/src/libthread/exec-unix.c index 3d4dfcac..91639bbb 100644 --- a/src/libthread/exec-unix.c +++ b/src/libthread/exec-unix.c @@ -10,9 +10,20 @@ _threadexec(Channel *pidc, int fd[3], char *prog, char *args[], int freeargs) int pfd[2]; int n, pid; char exitstr[ERRMAX]; + static int firstexec = 1; + static Lock lk; _threaddebug(DBGEXEC, "threadexec %s", prog); - + + if(firstexec){ + lock(&lk); + if(firstexec){ + firstexec = 0; + _threadfirstexec(); + } + unlock(&lk); + } + /* * We want threadexec to behave like exec; if exec succeeds, * never return, and if it fails, return with errstr set. @@ -41,6 +52,7 @@ _threadexec(Channel *pidc, int fd[3], char *prog, char *args[], int freeargs) efork(fd, pfd, prog, args); _exit(0); default: + _threadafterexec(); if(freeargs) free(args); break; diff --git a/src/libthread/exec.c b/src/libthread/exec.c deleted file mode 100644 index 0fb68111..00000000 --- a/src/libthread/exec.c +++ /dev/null @@ -1,82 +0,0 @@ -#include "threadimpl.h" - -#define PIPEMNT "/mnt/temp" - -void -procexec(Channel *pidc, int fd[3], char *prog, char *args[]) -{ - int n; - Proc *p; - Thread *t; - - _threaddebug(DBGEXEC, "procexec %s", prog); - /* must be only thread in proc */ - p = _threadgetproc(); - t = p->thread; - if(p->threads.head != t || p->threads.head->nextt != nil){ - werrstr("not only thread in proc"); - Bad: - if(pidc) - sendul(pidc, ~0); - return; - } - - /* - * We want procexec to behave like exec; if exec succeeds, - * never return, and if it fails, return with errstr set. - * Unfortunately, the exec happens in another proc since - * we have to wait for the exec'ed process to finish. - * To provide the semantics, we open a pipe with the - * write end close-on-exec and hand it to the proc that - * is doing the exec. If the exec succeeds, the pipe will - * close so that our read below fails. If the exec fails, - * then the proc doing the exec sends the errstr down the - * pipe to us. - */ - if(bind("#|", PIPEMNT, MREPL) < 0) - goto Bad; - if((p->exec.fd[0] = open(PIPEMNT "/data", OREAD)) < 0){ - unmount(nil, PIPEMNT); - goto Bad; - } - if((p->exec.fd[1] = open(PIPEMNT "/data1", OWRITE|OCEXEC)) < 0){ - close(p->exec.fd[0]); - unmount(nil, PIPEMNT); - goto Bad; - } - unmount(nil, PIPEMNT); - - /* exec in parallel via the scheduler */ - assert(p->needexec==0); - p->exec.prog = prog; - p->exec.args = args; - p->exec.stdfd = fd; - p->needexec = 1; - _sched(); - - close(p->exec.fd[1]); - if((n = read(p->exec.fd[0], p->exitstr, ERRMAX-1)) > 0){ /* exec failed */ - p->exitstr[n] = '\0'; - errstr(p->exitstr, ERRMAX); - close(p->exec.fd[0]); - goto Bad; - } - close(p->exec.fd[0]); - close(fd[0]); - if(fd[1] != fd[0]) - close(fd[1]); - if(fd[2] != fd[1] && fd[2] != fd[0]) - close(fd[2]); - if(pidc) - sendul(pidc, t->ret); - - /* wait for exec'ed program, then exit */ - _schedexecwait(); -} - -void -procexecl(Channel *pidc, int fd[3], char *f, ...) -{ - procexec(pidc, fd, f, &f+1); -} - diff --git a/src/libthread/exit.c b/src/libthread/exit.c index f3d4bb8e..79aa7c7f 100644 --- a/src/libthread/exit.c +++ b/src/libthread/exit.c @@ -26,42 +26,13 @@ threadexits(char *exitstr) void threadexitsall(char *exitstr) { - Proc *p; - int *pid; - int i, npid, mypid; - _threaddebug(DBGSCHED, "threadexitsall %s", exitstr); if(exitstr == nil) exitstr = ""; _threadexitsallstatus = exitstr; _threaddebug(DBGSCHED, "_threadexitsallstatus set to %p", _threadexitsallstatus); - mypid = _threadgetpid(); - - /* - * signal others. - * copying all the pids first avoids other thread's - * teardown procedures getting in the way. - */ - lock(&_threadpq.lock); - npid = 0; - for(p=_threadpq.head; p; p=p->next) - npid++; - pid = _threadmalloc(npid*sizeof(pid[0]), 0); - npid = 0; - for(p = _threadpq.head; p; p=p->next) - pid[npid++] = p->pid; - unlock(&_threadpq.lock); - for(i=0; ithread; - if (t->cmdname) - free(t->cmdname); + if(t->name) + free(t->name); va_start(arg, fmt); - t->cmdname = vsmprint(fmt, arg); + t->name = vsmprint(fmt, arg); va_end(arg); /* Plan 9 only @@ -85,7 +85,7 @@ threadsetname(char *fmt, ...) char* threadgetname(void) { - return _threadgetproc()->thread->cmdname; + return _threadgetproc()->thread->name; } void** diff --git a/src/libthread/iocall.c b/src/libthread/iocall.c index 0577f8a0..e359c4d5 100644 --- a/src/libthread/iocall.c +++ b/src/libthread/iocall.c @@ -1,4 +1,7 @@ -#include "threadimpl.h" +#include +#include +#include +#include "ioproc.h" long iocall(Ioproc *io, long (*op)(va_list*), ...) diff --git a/src/libthread/ioproc.c b/src/libthread/ioproc.c index 2b2a6025..4e4c32a8 100644 --- a/src/libthread/ioproc.c +++ b/src/libthread/ioproc.c @@ -1,4 +1,7 @@ -#include "threadimpl.h" +#include +#include +#include +#include "ioproc.h" enum { diff --git a/src/libthread/main.c b/src/libthread/main.c index 8cdd8ca3..83ee177c 100644 --- a/src/libthread/main.c +++ b/src/libthread/main.c @@ -1,135 +1,79 @@ -#include -#include +/* + * Thread library. + */ + #include "threadimpl.h" typedef struct Mainarg Mainarg; struct Mainarg { - int argc; - char **argv; + int argc; + char **argv; }; -int mainstacksize; -int _threadnotefd; -int _threadpasserpid; -static void mainlauncher(void*); +int mainstacksize; extern void (*_sysfatal)(char*, va_list); -void -_threadstatus(int x) -{ - USED(x); - threadstatus(); -} - -void -_threaddie(int x) -{ - extern char *_threadexitsallstatus; - USED(x); - - if(_threadexitsallstatus) - _exits(_threadexitsallstatus); -} - -int -main(int argc, char **argv) -{ - Mainarg *a; - Proc *p; - -//_threaddebuglevel = (DBGSCHED|DBGCHAN|DBGREND)^~0; - _systhreadinit(); - _qlockinit(_threadsleep, _threadwakeup); - _sysfatal = _threadsysfatal; - notify(_threadnote); - if(mainstacksize == 0) - mainstacksize = 32*1024; - - a = _threadmalloc(sizeof *a, 1); - a->argc = argc; - a->argv = argv; - p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0); - _scheduler(p); - abort(); /* not reached */ - return 0; -} - static void mainlauncher(void *arg) { Mainarg *a; a = arg; + _threadmaininit(); threadmain(a->argc, a->argv); threadexits("threadmain"); } -void -_threadsignal(void) -{ -} - -void -_threadsignalpasser(void) -{ -} - int -_schedfork(Proc *p) +main(int argc, char **argv) { - int pid; - lock(&p->lock); - pid = ffork(RFMEM|RFNOWAIT, _scheduler, p); - p->pid = pid; - unlock(&p->lock); - return pid; - -} + Mainarg a; + Proc *p; -void -_schedexit(Proc *p) -{ - char ex[ERRMAX]; - Proc **l; + /* + * XXX Do daemonize hack here. + */ + + /* + * Instruct QLock et al. to use our scheduling functions + * so that they can operate at the thread level. + */ + _qlockinit(_threadsleep, _threadwakeup); - lock(&_threadpq.lock); - for(l=&_threadpq.head; *l; l=&(*l)->next){ - if(*l == p){ - *l = p->next; - if(*l == nil) - _threadpq.tail = l; - break; - } - } - _threadprocs--; - unlock(&_threadpq.lock); + /* + * Install our own _threadsysfatal which takes down + * the whole conglomeration of procs. + */ + _sysfatal = _threadsysfatal; - strncpy(ex, p->exitstr, sizeof ex); - ex[sizeof ex-1] = '\0'; - free(p); - _exits(ex); -} + /* + * XXX Install our own jump handler. + */ -int -nrand(int n) -{ - return random()%n; -} + /* + * Install our own signal handlers. + */ + notify(_threadnote); -void -_systhreadinit(void) -{ + /* + * Construct the initial proc running mainlauncher(&a). + */ + if(mainstacksize == 0) + mainstacksize = 32*1024; + a.argc = argc; + a.argv = argv; + p = _newproc(); + _newthread(p, mainlauncher, &a, mainstacksize, "threadmain", 0); + _threadscheduler(p); + abort(); /* not reached */ + return 0; } +/* + * No-op function here so that sched.o drags in main.o. + */ void -threadstats(void) +_threadlinkmain(void) { - extern int _threadnrendez, _threadhighnrendez, - _threadnalt, _threadhighnentry; - fprint(2, "*** THREAD LIBRARY STATS ***\n"); - fprint(2, "nrendez %d high simultaneous %d\n", - _threadnrendez, _threadhighnrendez); - fprint(2, "nalt %d high simultaneous entry %d\n", - _threadnalt, _threadhighnentry); } diff --git a/src/libthread/mkfile b/src/libthread/mkfile index abff1629..ede391da 100644 --- a/src/libthread/mkfile +++ b/src/libthread/mkfile @@ -28,11 +28,11 @@ OFILES=\ memset.$O\ memsetd.$O\ note.$O\ - proctab.$O\ + pthread.$O\ read9pmsg.$O\ ref.$O\ - rendez.$O\ sched.$O\ + sleep.$O\ HFILES=\ $PLAN9/include/thread.h\ diff --git a/src/libthread/note.c b/src/libthread/note.c index 241c16ff..60742aad 100644 --- a/src/libthread/note.c +++ b/src/libthread/note.c @@ -65,6 +65,8 @@ delayednotes(Proc *p, void *v) } if(i==NFN){ _threaddebug(DBGNOTE, "Unhandled note %s, proc %p\n", n->s, p); + if(strcmp(n->s, "sys: child") == 0) + noted(NCONT); fprint(2, "unhandled note %s, pid %d\n", n->s, p->pid); if(v != nil) noted(NDFLT); @@ -85,7 +87,9 @@ _threadnote(void *v, char *s) Note *n; _threaddebug(DBGNOTE, "Got note %s", s); - if(strncmp(s, "sys:", 4) == 0 && strcmp(s, "sys: write on closed pipe") != 0) + if(strncmp(s, "sys:", 4) == 0 + && strcmp(s, "sys: write on closed pipe") != 0 + && strcmp(s, "sys: child") != 0) noted(NDFLT); // if(_threadexitsallstatus){ diff --git a/src/libthread/proctab.c b/src/libthread/proctab.c index ec28d676..b8dd2097 100644 --- a/src/libthread/proctab.c +++ b/src/libthread/proctab.c @@ -46,7 +46,7 @@ __threadgetproc(int rm) if(!multi) return theproc; - pid = _threadgetpid(); + pid = getpid(); lock(&ptablock); h = ((unsigned)pid)%PTABHASH; diff --git a/src/libthread/ref.c b/src/libthread/ref.c index 8f50fd5f..a3b2cbae 100644 --- a/src/libthread/ref.c +++ b/src/libthread/ref.c @@ -1,3 +1,9 @@ +/* + * Atomic reference counts - used by applications. + * + * We use locks to avoid the assembly of the Plan 9 versions. + */ + #include "threadimpl.h" void diff --git a/src/libthread/rendez.c b/src/libthread/rendez.c deleted file mode 100644 index 4451fa4d..00000000 --- a/src/libthread/rendez.c +++ /dev/null @@ -1,38 +0,0 @@ -#include "threadimpl.h" - -int _threadhighnrendez; -int _threadnrendez; - -void -_threadsleep(_Procrend *r) -{ - Thread *t; - - t = _threadgetproc()->thread; - r->arg = t; - t->nextstate = Rendezvous; - t->inrendez = 1; - unlock(r->l); - _sched(); - t->inrendez = 0; - lock(r->l); -} - -void -_threadwakeup(_Procrend *r) -{ - Thread *t; - - t = r->arg; - while(t->state == Running) - sleep(0); - lock(&t->proc->lock); - if(t->state == Dead){ - unlock(&t->proc->lock); - return; - } - assert(t->state == Rendezvous && t->inrendez); - t->state = Ready; - _threadready(t); - unlock(&t->proc->lock); -} diff --git a/src/libthread/sched.c b/src/libthread/sched.c index 7f7b6daa..3fb2ff20 100644 --- a/src/libthread/sched.c +++ b/src/libthread/sched.c @@ -1,76 +1,59 @@ -#include -#include -#include +/* + * Thread scheduler. + */ #include "threadimpl.h" -static Thread *runthread(Proc*); - -static char *_psstate[] = { - "Dead", - "Running", - "Ready", - "Rendezvous", -}; - -static char* -psstate(int s) -{ - if(s < 0 || s >= nelem(_psstate)) - return "unknown"; - return _psstate[s]; -} - -void -needstack(int howmuch) -{ - Proc *p; - Thread *t; - - p = _threadgetproc(); - if(p == nil || (t=p->thread) == nil) - return; - if((ulong)&howmuch < (ulong)t->stk+howmuch){ /* stack overflow waiting to happen */ - fprint(2, "stack overflow: stack at 0x%lux, limit at 0x%lux, need 0x%lux\n", (ulong)&p, (ulong)t->stk, howmuch); - abort(); - } -} +static Thread *runthread(Proc*); +static void schedexit(Proc*); +/* + * Main scheduling loop. + */ void -_scheduler(void *arg) +_threadscheduler(void *arg) { Proc *p; Thread *t; p = arg; - lock(&p->lock); - p->pid = _threadgetpid(); - _threadsetproc(p); + + _threadlinkmain(); + _threadinitproc(p); for(;;){ + /* + * Clean up zombie children. + */ + _threadwaitkids(p); + + /* + * Find next thread to run. + */ + _threaddebug(DBGSCHED, "runthread"); t = runthread(p); - if(t == nil){ - _threaddebug(DBGSCHED, "all threads gone; exiting"); - _threaddelproc(); - _schedexit(p); - } - _threaddebug(DBGSCHED, "running %d.%d", t->proc->pid, t->id); - p->thread = t; - if(t->moribund){ - _threaddebug(DBGSCHED, "%d.%d marked to die"); - goto Moribund; + if(t == nil) + schedexit(p); + + /* + * If it's ready, run it (might instead be marked to die). + */ + lock(&p->lock); + if(t->state == Ready){ + _threaddebug(DBGSCHED, "running %d.%d", p->id, t->id); + t->state = Running; + t->nextstate = Ready; + p->thread = t; + unlock(&p->lock); + _swaplabel(&p->context, &t->context); + lock(&p->lock); + p->thread = nil; } - t->state = Running; - t->nextstate = Ready; - unlock(&p->lock); - _swaplabel(&p->sched, &t->sched); - - lock(&p->lock); - p->thread = nil; + /* + * If thread needs to die, kill it. + */ if(t->moribund){ - Moribund: - if(t->moribund != 1) - fprint(2, "moribund %d\n", t->moribund); + _threaddebug(DBGSCHED, "moribund %d.%d", p->id, t->id); assert(t->moribund == 1); t->state = Dead; if(t->prevt) @@ -82,40 +65,37 @@ _scheduler(void *arg) else p->threads.tail = t->prevt; unlock(&p->lock); - if(t->inrendez){ - abort(); - // _threadflagrendez(t); - // _threadbreakrendez(); - } - _stackfree(t->stk); - free(t->cmdname); - free(t); /* XXX how do we know there are no references? */ + _threadfree(t); p->nthreads--; t = nil; - lock(&p->lock); continue; } -/* - if(p->needexec){ - t->ret = _schedexec(&p->exec); - p->needexec = 0; - } -*/ - if(p->newproc){ - t->ret = _schedfork(p->newproc); - if(t->ret < 0){ -//fprint(2, "_schedfork: %r\n"); - abort(); - } - p->newproc = nil; + unlock(&p->lock); + + /* + * If there is a request to run a function on the + * scheduling stack, do so. + */ + if(p->schedfn){ + _threaddebug(DBGSCHED, "schedfn"); + p->schedfn(p); + p->schedfn = nil; + _threaddebug(DBGSCHED, "schedfn ended"); } + + /* + * Move the thread along. + */ t->state = t->nextstate; + _threaddebug(DBGSCHED, "moveon %d.%d", p->id, t->id); if(t->state == Ready) _threadready(t); - unlock(&p->lock); } } +/* + * Called by thread to give up control of processor to scheduler. + */ int _sched(void) { @@ -125,166 +105,195 @@ _sched(void) p = _threadgetproc(); t = p->thread; assert(t != nil); - _swaplabel(&t->sched, &p->sched); + _swaplabel(&t->context, &p->context); return p->nsched++; } +/* + * Called by thread to yield the processor to other threads. + * Returns number of other threads run between call and return. + */ +int +yield(void) +{ + Proc *p; + int nsched; + + p = _threadgetproc(); + nsched = p->nsched; + return _sched() - nsched; +} + +/* + * Choose the next thread to run. + */ static Thread* runthread(Proc *p) { - Channel *c; Thread *t; Tqueue *q; - Waitmsg *w; - int e, sent; + /* + * No threads left? + */ if(p->nthreads==0 || (p->nthreads==1 && p->idle)) return nil; - q = &p->ready; -relock: + lock(&p->readylock); - if(p->nsched%128 == 0){ - /* clean up children */ - e = errno; - if((c = _threadwaitchan) != nil){ - if(c->n <= c->s){ - sent = 0; - for(;;){ - if((w = p->waitmsg) != nil) - p->waitmsg = nil; - else - w = waitnohang(); - if(w == nil) - break; - if(sent == 0){ - unlock(&p->readylock); - sent = 1; - } - if(nbsendp(c, w) != 1) - break; - } - p->waitmsg = w; - if(sent) - goto relock; - } - }else{ - while((w = waitnohang()) != nil) - free(w); - } - errno = e; - } + q = &p->ready; if(q->head == nil){ + /* + * Is this a single-process program with an idle thread? + */ if(p->idle){ - if(p->idle->state != Ready){ - fprint(2, "everyone is asleep\n"); - exits("everyone is asleep"); - } + /* + * The idle thread had better be ready! + */ + if(p->idle->state != Ready) + sysfatal("all threads are asleep"); + + /* + * Run the idle thread. + */ unlock(&p->readylock); _threaddebug(DBGSCHED, "running idle thread", p->nthreads); return p->idle; } - _threaddebug(DBGSCHED, "sleeping for more work (%d threads)", p->nthreads); + /* + * Wait until one of our threads is readied (by another proc!). + */ q->asleep = 1; p->rend.l = &p->readylock; _procsleep(&p->rend); + + /* + * Maybe we were awakened to exit? + */ if(_threadexitsallstatus) _exits(_threadexitsallstatus); + + assert(q->head != nil); } + t = q->head; q->head = t->next; unlock(&p->readylock); - return t; -} -long -threadstack(void) -{ - Proc *p; - Thread *t; - - p = _threadgetproc(); - t = p->thread; - return (ulong)&p - (ulong)t->stk; + return t; } +/* + * Add a newly-ready thread to its proc's run queue. + */ void _threadready(Thread *t) { Tqueue *q; + /* + * The idle thread does not go on the run queue. + */ if(t == t->proc->idle){ _threaddebug(DBGSCHED, "idle thread is ready"); return; } assert(t->state == Ready); - _threaddebug(DBGSCHED, "readying %d.%d", t->proc->pid, t->id); + _threaddebug(DBGSCHED, "readying %d.%d", t->proc->id, t->id); + + /* + * Add thread to run queue. + */ q = &t->proc->ready; lock(&t->proc->readylock); + t->next = nil; - if(q->head==nil) + if(q->head == nil) q->head = t; else q->tail->next = t; q->tail = t; + + /* + * Wake proc scheduler if it is sleeping. + */ if(q->asleep){ assert(q->asleep == 1); q->asleep = 0; - /* lock passes to runthread */ _procwakeup(&t->proc->rend); } unlock(&t->proc->readylock); - if(_threadexitsallstatus) - _exits(_threadexitsallstatus); } +/* + * Mark the given thread as the idle thread. + * Since the idle thread was just created, it is sitting + * somewhere on the ready queue. + */ void -_threadidle(void) +_threadsetidle(int id) { Tqueue *q; - Thread *t, *idle; + Thread *t, **l, *last; Proc *p; p = _threadgetproc(); - q = &p->ready; + lock(&p->readylock); - assert(q->tail); - idle = q->tail; - if(q->head == idle){ - q->head = nil; - q->tail = nil; - }else{ - for(t=q->head; t->next!=q->tail; t=t->next) - ; - t->next = nil; - q->tail = t; - } - p->idle = idle; - _threaddebug(DBGSCHED, "p->idle is %d\n", idle->id); - unlock(&p->readylock); -} -int -yield(void) -{ - Proc *p; - int nsched; + /* + * Find thread on ready queue. + */ + q = &p->ready; + for(l=&q->head, last=nil; (t=*l) != nil; l=&t->next, last=t) + if(t->id == id) + break; + assert(t != nil); - p = _threadgetproc(); - nsched = p->nsched; - return _sched() - nsched; + /* + * Remove it from ready queue. + */ + *l = t->next; + if(t == q->head) + q->head = t->next; + if(t->next == nil) + q->tail = last; + + /* + * Set as idle thread. + */ + p->idle = t; + _threaddebug(DBGSCHED, "p->idle is %d\n", t->id); + unlock(&p->readylock); } -void -threadstatus(void) +static void +schedexit(Proc *p) { - Proc *p; - Thread *t; + char ex[ERRMAX]; + int n; + Proc **l; - p = _threadgetproc(); - for(t=p->threads.head; t; t=t->nextt) - fprint(2, "[%3d] %s userpc=%lux\n", - t->id, psstate(t->state), t->userpc); + _threaddebug(DBGSCHED, "exiting proc %d", p->id); + lock(&_threadpq.lock); + for(l=&_threadpq.head; *l; l=&(*l)->next){ + if(*l == p){ + *l = p->next; + if(*l == nil) + _threadpq.tail = l; + break; + } + } + n = --_threadnprocs; + unlock(&_threadpq.lock); + + strncpy(ex, p->exitstr, sizeof ex); + ex[sizeof ex-1] = '\0'; + free(p); + if(n == 0) + _threadexitallproc(ex); + else + _threadexitproc(ex); } - + diff --git a/src/libthread/sleep.c b/src/libthread/sleep.c new file mode 100644 index 00000000..d6c4dac4 --- /dev/null +++ b/src/libthread/sleep.c @@ -0,0 +1,38 @@ +#include "threadimpl.h" + +int _threadhighnrendez; +int _threadnrendez; + +void +_threadsleep(_Procrend *r) +{ + Thread *t; + + t = _threadgetproc()->thread; + r->arg = t; + t->nextstate = Rendezvous; + t->asleep = 1; + unlock(r->l); + _sched(); + t->asleep = 0; + lock(r->l); +} + +void +_threadwakeup(_Procrend *r) +{ + Thread *t; + + t = r->arg; + while(t->state == Running) + sleep(0); + lock(&t->proc->lock); + if(t->state == Dead){ + unlock(&t->proc->lock); + return; + } + assert(t->state == Rendezvous && t->asleep); + t->state = Ready; + _threadready(t); + unlock(&t->proc->lock); +} diff --git a/src/libthread/texec.c b/src/libthread/texec.c index 87c3f77b..bcfabee6 100644 --- a/src/libthread/texec.c +++ b/src/libthread/texec.c @@ -5,11 +5,16 @@ extern int _threaddebuglevel; void doexec(void *v) { + int fd[3]; char **argv = v; -print("doexec\n"); - procexec(nil, argv[0], argv); + fd[0] = dup(0, -1); + fd[1] = dup(1, -1); + fd[2] = dup(2, -1); + threadexec(nil, fd, argv[0], argv); + print("exec failed: %r\n"); sendp(threadwaitchan(), nil); + threadexits(nil); } void @@ -28,7 +33,7 @@ threadmain(int argc, char **argv) proccreate(doexec, argv, 8192); w = recvp(c); if(w == nil) - print("exec failed: %r\n"); + print("exec/recvp failed: %r\n"); else print("%d %lud %lud %lud %s\n", w->pid, w->time[0], w->time[1], w->time[2], w->msg); threadexits(nil); diff --git a/src/libthread/threadimpl.h b/src/libthread/threadimpl.h index fea309d9..7c9a66bb 100644 --- a/src/libthread/threadimpl.h +++ b/src/libthread/threadimpl.h @@ -7,31 +7,28 @@ * _threadgetproc()->thread is always a live pointer. * p->threads, p->ready, and _threadrgrp also contain * live thread pointers. These may only be consulted - * while holding p->lock or _threadrgrp.lock; in procs - * other than p, the pointers are only guaranteed to be live - * while the lock is still being held. + * while holding p->lock; in procs other than p, the + * pointers are only guaranteed to be live while the lock + * is still being held. * * Thread structures can only be freed by the proc * they belong to. Threads marked with t->inrendez * need to be extracted from the _threadrgrp before * being freed. - * - * _threadrgrp.lock cannot be acquired while holding p->lock. */ +#include #include -#include +#include #include #include "label.h" typedef struct Thread Thread; -typedef struct Proc Proc; +typedef struct Proc Proc; typedef struct Tqueue Tqueue; typedef struct Pqueue Pqueue; -typedef struct Rgrp Rgrp; typedef struct Execargs Execargs; -/* must match list in sched.c */ typedef enum { Dead, @@ -50,17 +47,9 @@ typedef enum enum { - RENDHASH = 10009, - Printsize = 2048, NPRIV = 8, }; -struct Rgrp -{ - Lock lock; - Thread *hash[RENDHASH]; -}; - struct Tqueue /* Thread queue */ { int asleep; @@ -68,27 +57,38 @@ struct Tqueue /* Thread queue */ Thread *tail; }; +struct Pqueue { /* Proc queue */ + Lock lock; + Proc *head; + Proc **tail; +}; + struct Thread { Lock lock; /* protects thread data structure */ - Label sched; /* for context switches */ - int id; /* thread id */ + + int asleep; /* thread is in _threadsleep */ + Label context; /* for context switches */ int grp; /* thread group */ + int id; /* thread id */ int moribund; /* thread needs to die */ - State state; /* run state */ - State nextstate; /* next run state */ - uchar *stk; /* top of stack (lowest address of stack) */ - uint stksize; /* stack size */ + char *name; /* name of thread */ Thread *next; /* next on ready queue */ - - Proc *proc; /* proc of this thread */ Thread *nextt; /* next on list of threads in this proc */ + State nextstate; /* next run state */ + Proc *proc; /* proc of this thread */ Thread *prevt; /* prev on list of threads in this proc */ int ret; /* return value for Exec, Fork */ + State state; /* run state */ + uchar *stk; /* top of stack (lowest address of stack) */ + uint stksize; /* stack size */ + void* udata[NPRIV]; /* User per-thread data pointer */ - char *cmdname; /* ptr to name of thread */ + /* + * for debugging only + * (could go away without impacting correct behavior): + */ - int inrendez; Channel *altc; _Procrend altrend; @@ -96,10 +96,7 @@ struct Thread Alt *alt; /* pointer to current alt structure (debugging) */ ulong userpc; Channel *c; - pthread_cond_t cond; - void* udata[NPRIV]; /* User per-thread data pointer */ - int lastfd; }; struct Execargs @@ -113,12 +110,13 @@ struct Execargs struct Proc { Lock lock; - Label sched; /* for context switches */ - Proc *link; /* in proctab */ - int pid; /* process id */ + + Label context; /* for context switches */ + Proc *link; /* in ptab */ int splhi; /* delay notes */ Thread *thread; /* running thread */ Thread *idle; /* idle thread */ + int id; int needexec; Execargs exec; /* exec argument */ @@ -131,13 +129,15 @@ struct Proc Tqueue ready; /* Runnable threads */ Lock readylock; - char printbuf[Printsize]; int blocked; /* In a rendezvous */ int pending; /* delayed note pending */ int nonotes; /* delay notes */ uint nextID; /* ID of most recently created thread */ Proc *next; /* linked list of Procs */ + + void (*schedfn)(Proc*); /* function to call in scheduler */ + _Procrend rend; /* sleep here for more ready threads */ void *arg; /* passed between shared and unshared stk */ @@ -147,29 +147,17 @@ struct Proc void* udata; /* User per-proc data pointer */ int nsched; -}; - -struct Pqueue { /* Proc queue */ - Lock lock; - Proc *head; - Proc **tail; -}; -struct Ioproc -{ - int tid; - Channel *c, *creply; - int inuse; - long (*op)(va_list*); - va_list arg; - long ret; - char err[ERRMAX]; - Ioproc *next; + /* + * for debugging only + */ + int pid; /* process id */ + int pthreadid; /* pthread id */ }; void _swaplabel(Label*, Label*); -void _freeproc(Proc*); -Proc* _newproc(void(*)(void*), void*, uint, char*, int, int); +Proc* _newproc(void); +int _newthread(Proc*, void(*)(void*), void*, uint, char*, int); int _procsplhi(void); void _procsplx(int); int _sched(void); @@ -177,7 +165,8 @@ int _schedexec(Execargs*); void _schedexecwait(void); void _schedexit(Proc*); int _schedfork(Proc*); -void _scheduler(void*); +void _threadfree(Thread*); +void _threadscheduler(void*); void _systhreadinit(void); void _threadassert(char*); void __threaddebug(ulong, char*, ...); @@ -186,12 +175,15 @@ void _threadexitsall(char*); Proc* _threadgetproc(void); extern void _threadmultiproc(void); Proc* _threaddelproc(void); +void _threadinitproc(Proc*); +void _threadwaitkids(Proc*); void _threadsetproc(Proc*); void _threadinitstack(Thread*, void(*)(void*), void*); +void _threadlinkmain(void); void* _threadmalloc(long, int); void _threadnote(void*, char*); void _threadready(Thread*); -void _threadidle(void); +void _threadsetidle(int); void _threadsleep(_Procrend*); void _threadwakeup(_Procrend*); void _threadsignal(void); @@ -200,13 +192,15 @@ long _xdec(long*); void _xinc(long*); void _threadremove(Proc*, Thread*); void threadstatus(void); +void _threadstartproc(Proc*); +void _threadexitproc(char*); +void _threadexitallproc(char*); +extern int _threadnprocs; extern int _threaddebuglevel; extern char* _threadexitsallstatus; extern Pqueue _threadpq; extern Channel* _threadwaitchan; -extern Rgrp _threadrgrp; -extern void _stackfree(void*); #define DBGAPPL (1 << 0) #define DBGSCHED (1 << 16) @@ -216,8 +210,6 @@ extern void _stackfree(void*); #define DBGNOTE (1 << 20) #define DBGEXEC (1 << 21) -#define ioproc_arg(io, type) (va_arg((io)->arg, type)) -extern int _threadgetpid(void); extern void _threadmemset(void*, int, int); extern void _threaddebugmemset(void*, int, int); extern int _threadprocs; diff --git a/src/libthread/ucontext.c b/src/libthread/ucontext.c index 60f803d3..b1c5ef53 100644 --- a/src/libthread/ucontext.c +++ b/src/libthread/ucontext.c @@ -6,17 +6,18 @@ _threadinitstack(Thread *t, void (*f)(void*), void *arg) sigset_t zero; /* do a reasonable initialization */ - memset(&t->sched.uc, 0, sizeof t->sched.uc); + memset(&t->context.uc, 0, sizeof t->context.uc); sigemptyset(&zero); - sigprocmask(SIG_BLOCK, &zero, &t->sched.uc.uc_sigmask); + sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask); /* call getcontext, because on Linux makecontext neglects floating point */ - getcontext(&t->sched.uc); + getcontext(&t->context.uc); /* call makecontext to do the real work. */ - t->sched.uc.uc_stack.ss_sp = t->stk; - t->sched.uc.uc_stack.ss_size = t->stksize; - makecontext(&t->sched.uc, (void(*)())f, 1, arg); + /* leave a few words open on both ends */ + t->context.uc.uc_stack.ss_sp = t->stk+8; + t->context.uc.uc_stack.ss_size = t->stksize-16; + makecontext(&t->context.uc, (void(*)())f, 1, arg); } void -- cgit v1.2.3