From 195645536743aeb99eb336726823c38716cec02d Mon Sep 17 00:00:00 2001 From: rsc Date: Mon, 8 Nov 2004 16:03:20 +0000 Subject: more thread work --- src/libthread/channel.c | 18 +-- src/libthread/create.c | 2 +- src/libthread/exec-unix.c | 18 +-- src/libthread/exit.c | 2 +- src/libthread/fdwait.c | 159 ++------------------------- src/libthread/main.c | 69 ++++-------- src/libthread/mkfile | 12 +- src/libthread/pthread.c | 268 ++++++++++++++++++++++++++++++--------------- src/libthread/sched.c | 37 ++++++- src/libthread/sleep.c | 1 + src/libthread/thread.sh | 8 +- src/libthread/threadimpl.h | 34 ++++-- 12 files changed, 291 insertions(+), 337 deletions(-) (limited to 'src/libthread') diff --git a/src/libthread/channel.c b/src/libthread/channel.c index 14791d05..b488dc97 100644 --- a/src/libthread/channel.c +++ b/src/libthread/channel.c @@ -4,7 +4,7 @@ static Lock chanlock; /* central channel access lock */ static void enqueue(Alt*, Thread*); static void dequeue(Alt*); -static int altexec(Alt*, int); +static int altexec(Alt*); int _threadhighnentry; int _threadnalt; @@ -101,7 +101,7 @@ _alt(Alt *alts) { Alt *a, *xa; Channel *c; - int n, s; + int n; Thread *t; /* @@ -119,7 +119,6 @@ _alt(Alt *alts) t = _threadgetproc()->thread; if((t && t->moribund) || _threadexitsallstatus) yield(); /* won't return */ - s = _procsplhi(); lock(&chanlock); /* test whether any channels can proceed */ @@ -134,7 +133,6 @@ _alt(Alt *alts) c = xa->c; if(c==nil){ unlock(&chanlock); - _procsplx(s); return -1; } if(canexec(xa)) @@ -146,8 +144,7 @@ _alt(Alt *alts) /* nothing can proceed */ if(xa->op == CHANNOBLK){ unlock(&chanlock); - _procsplx(s); -_threadnalt++; + _threadnalt++; return xa - alts; } @@ -172,9 +169,7 @@ _threadnalt++; t->alt = alts; t->chan = Chanalt; t->altrend.l = &chanlock; - _procsplx(s); _threadsleep(&t->altrend); - s = _procsplhi(); /* dequeue from channels, find selected one */ a = nil; @@ -187,13 +182,12 @@ _threadnalt++; dequeue(xa); } unlock(&chanlock); - _procsplx(s); if(a == nil){ /* we were interrupted */ assert(c==(Channel*)~0); return -1; } }else{ - altexec(a, s); /* unlocks chanlock, does splx */ + altexec(a); /* unlocks chanlock, does splx */ } if(t) t->chan = Channone; @@ -445,7 +439,7 @@ altcopy(void *dst, void *src, int sz) } static int -altexec(Alt *a, int spl) +altexec(Alt *a) { volatile Alt *b; int i, n, otherop; @@ -492,7 +486,6 @@ altexec(Alt *a, int spl) _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock); _threaddebug(DBGCHAN, "unlocking the chanlock"); unlock(&chanlock); - _procsplx(spl); return 1; } @@ -503,6 +496,5 @@ altexec(Alt *a, int spl) altcopy(buf, me, c->e); unlock(&chanlock); - _procsplx(spl); return 1; } diff --git a/src/libthread/create.c b/src/libthread/create.c index d2f4be25..4e6bc6de 100644 --- a/src/libthread/create.c +++ b/src/libthread/create.c @@ -138,7 +138,7 @@ proccreate(void (*f)(void*), void *arg, uint stacksize) p = _threadgetproc(); np = _newproc(); p->newproc = np; - p->schedfn = _threadstartproc; + p->schedfn = _kthreadstartproc; id = _newthread(np, f, arg, stacksize, nil, p->thread->grp); _sched(); /* call into scheduler to create proc XXX */ return id; diff --git a/src/libthread/exec-unix.c b/src/libthread/exec-unix.c index f69fedca..e5848eaa 100644 --- a/src/libthread/exec-unix.c +++ b/src/libthread/exec-unix.c @@ -10,20 +10,9 @@ _threadexec(Channel *pidc, int fd[3], char *prog, char *args[], int freeargs) int pfd[2]; int n, pid; char exitstr[ERRMAX]; - static int firstexec = 1; - static Lock lk; _threaddebug(DBGEXEC, "threadexec %s", prog); - if(firstexec){ - lock(&lk); - if(firstexec){ - firstexec = 0; - _threadfirstexec(); - } - unlock(&lk); - } - /* * We want threadexec to behave like exec; if exec succeeds, * never return, and if it fails, return with errstr set. @@ -53,7 +42,6 @@ _threadexec(Channel *pidc, int fd[3], char *prog, char *args[], int freeargs) _threaddebug(DBGSCHED, "exit after efork"); _exit(0); default: - _threadafterexec(); if(freeargs) free(args); break; @@ -88,14 +76,14 @@ Bad: void threadexec(Channel *pidc, int fd[3], char *prog, char *args[]) { - if(_callthreadexec(pidc, fd, prog, args, 0) >= 0) + if(_kthreadexec(pidc, fd, prog, args, 0) >= 0) threadexits(nil); } int threadspawn(int fd[3], char *prog, char *args[]) { - return _callthreadexec(nil, fd, prog, args, 0); + return _kthreadexec(nil, fd, prog, args, 0); } /* @@ -128,7 +116,7 @@ threadexecl(Channel *pidc, int fd[3], char *f, ...) args[n] = 0; va_end(arg); - if(_callthreadexec(pidc, fd, f, args, 1) >= 0) + if(_kthreadexec(pidc, fd, f, args, 1) >= 0) threadexits(nil); } diff --git a/src/libthread/exit.c b/src/libthread/exit.c index 79aa7c7f..69f3f738 100644 --- a/src/libthread/exit.c +++ b/src/libthread/exit.c @@ -32,7 +32,7 @@ threadexitsall(char *exitstr) _threadexitsallstatus = exitstr; _threaddebug(DBGSCHED, "_threadexitsallstatus set to %p", _threadexitsallstatus); /* leave */ - _threadexitallproc(exitstr); + _kthreadexitallproc(exitstr); } Channel* diff --git a/src/libthread/fdwait.c b/src/libthread/fdwait.c index a46ea456..d3983419 100644 --- a/src/libthread/fdwait.c +++ b/src/libthread/fdwait.c @@ -7,164 +7,25 @@ #include #include -#define debugpoll 0 - -#ifdef __APPLE__ -#include -enum { POLLIN=1, POLLOUT=2, POLLERR=4 }; -struct pollfd -{ - int fd; - int events; - int revents; -}; - -int -poll(struct pollfd *p, int np, int ms) +void +fdwait() { - int i, maxfd, n; - struct timeval tv, *tvp; fd_set rfd, wfd, efd; - - maxfd = -1; + FD_ZERO(&rfd); FD_ZERO(&wfd); FD_ZERO(&efd); - for(i=0; i maxfd) - maxfd = p[i].fd; - if(p[i].events & POLLIN) - FD_SET(p[i].fd, &rfd); - if(p[i].events & POLLOUT) - FD_SET(p[i].fd, &wfd); - FD_SET(p[i].fd, &efd); - } - - if(ms != -1){ - tv.tv_usec = (ms%1000)*1000; - tv.tv_sec = ms/1000; - tvp = &tv; - }else - tvp = nil; - - if(debugpoll){ - fprint(2, "select %d:", maxfd+1); - for(i=0; i<=maxfd; i++){ - if(FD_ISSET(i, &rfd)) - fprint(2, " r%d", i); - if(FD_ISSET(i, &wfd)) - fprint(2, " w%d", i); - if(FD_ISSET(i, &efd)) - fprint(2, " e%d", i); - } - fprint(2, "; tp=%p, t=%d.%d\n", tvp, tv.tv_sec, tv.tv_usec); - } - - n = select(maxfd+1, &rfd, &wfd, &efd, tvp); - - if(n <= 0) - return n; - - for(i=0; i -#endif - -/* - * Poll file descriptors in an idle loop. - */ - -typedef struct Poll Poll; - -struct Poll -{ - Channel *c; /* for sending back */ -}; - -static Channel *sleepchan[64]; -static int sleeptime[64]; -static int nsleep; - -static struct pollfd pfd[64]; -static struct Poll polls[64]; -static int npoll; - -static void -pollidle(void *v) -{ - int i, n, t; - uint now; - - for(;; yield()){ - if(debugpoll) fprint(2, "poll %d:", npoll); - for(i=0; iargc, a->argv); threadexits("threadmain"); } -static void -passer(void *x, char *msg) -{ - USED(x); - Waitmsg *w; - - if(strcmp(msg, "sys: usr2") == 0) - _exit(0); /* daemonize */ - else if(strcmp(msg, "sys: child") == 0){ - w = wait(); - if(w == nil) - _exit(1); - _exit(atoi(w->msg)); - }else - postnote(PNPROC, passtomainpid, msg); -} - int main(int argc, char **argv) { - int pid; Mainarg a; Proc *p; - sigset_t mask; /* - * Do daemonize hack here. + * In pthreads, threadmain is actually run in a subprocess, + * so that the main process can exit (if threaddaemonize is called). + * The main process relays notes to the subprocess. + * _Threadbackgroundsetup will return only in the subprocess. */ - if(_callsthreaddaemonize){ - passtomainpid = getpid(); - switch(pid = fork()){ - case -1: - sysfatal("fork: %r"); - - case 0: - /* continue executing */ - _threadmainpid = getppid(); - break; - - default: - /* wait for signal USR2 */ - notify(passer); - for(;;) - pause(); - _exit(0); - } - } + _threadbackgroundinit(); /* * Instruct QLock et al. to use our scheduling functions @@ -85,16 +49,17 @@ main(int argc, char **argv) /* * Install our own _threadsysfatal which takes down - * the whole conglomeration of procs. + * the whole confederation of procs. */ _sysfatal = _threadsysfatal; /* - * XXX Install our own jump handler. + * Install our own jump handler. */ + _notejmpbuf = _threadgetjmp; /* - * Install our own signal handlers. + * Install our own signal handler. */ notify(_threadnote); @@ -119,3 +84,15 @@ void _threadlinkmain(void) { } + +Jmp* +_threadgetjmp(void) +{ + static Jmp j; + Proc *p; + + p = _threadgetproc(); + if(p == nil) + return &j; + return &p->sigjmp; +} diff --git a/src/libthread/mkfile b/src/libthread/mkfile index e51d195f..6492bb93 100644 --- a/src/libthread/mkfile +++ b/src/libthread/mkfile @@ -1,15 +1,12 @@ <$PLAN9/src/mkhdr LIB=libthread.a -THREAD=`sh ./thread.sh` +SYSOFILES=`sh ./sysofiles.sh` OFILES=\ - $OBJTYPE.$O\ - $THREAD.$O\ - asm-$SYSNAME-$OBJTYPE.$O\ + $SYSOFILES\ channel.$O\ chanprint.$O\ create.$O\ - daemon.$O\ debug.$O\ exec-unix.$O\ exit.$O\ @@ -29,10 +26,10 @@ OFILES=\ main.$O\ memset.$O\ memsetd.$O\ - note.$O\ read9pmsg.$O\ ref.$O\ sched.$O\ + setproc.$O\ sleep.$O\ HFILES=\ @@ -57,6 +54,9 @@ tspawn: tspawn.$O $PLAN9/lib/$LIB trend: trend.$O $PLAN9/lib/$LIB $LD -o trend trend.$O $LDFLAGS -lthread -l9 +tsignal: tsignal.$O $PLAN9/lib/$LIB + $LD -o tsignal tsignal.$O $LDFLAGS -lthread -l9 + CLEANFILES=$CLEANFILES tprimes texec asm-Linux-ppc.$O: asm-Linux-386.s diff --git a/src/libthread/pthread.c b/src/libthread/pthread.c index 8d661c16..83d8b023 100644 --- a/src/libthread/pthread.c +++ b/src/libthread/pthread.c @@ -2,53 +2,36 @@ #include #include "threadimpl.h" -static int multi; -static Proc *theproc; -static pthread_key_t key; - /* - * Called before we go multiprocess. + * Basic kernel thread management. */ +static pthread_key_t key; + void -_threadmultiproc(void) +_kthreadinit(void) { - if(multi == 0){ - multi = 1; - pthread_key_create(&key, 0); - _threadsetproc(theproc); - } + pthread_key_create(&key, 0); } -/* - * Set the proc for the current pthread. - */ void -_threadsetproc(Proc *p) +_kthreadsetproc(Proc *p) { - if(!multi){ - theproc = p; - return; - } + sigset_t all; + + p->pthreadid = pthread_self(); + sigfillset(&all); + pthread_sigmask(SIG_SETMASK, &all, nil); pthread_setspecific(key, p); } -/* - * Get the proc for the current pthread. - */ Proc* -_threadgetproc(void) +_kthreadgetproc(void) { - if(!multi) - return theproc; - return pthread_getspecific(key); } -/* - * Called to start a new proc. - */ void -_threadstartproc(Proc *p) +_kthreadstartproc(Proc *p) { Proc *np; pthread_t tid; @@ -63,69 +46,43 @@ _threadstartproc(Proc *p) np->pthreadid = tid; } -/* - * Called to associate p with the current pthread. - */ void -_threadinitproc(Proc *p) -{ - p->pthreadid = pthread_self(); - _threadsetproc(p); -} - -/* - * Called to exit the current pthread. - */ -void -_threadexitproc(char *exitstr) +_kthreadexitproc(char *exitstr) { _threaddebug(DBGSCHED, "_pthreadexit"); pthread_exit(nil); } -/* - * Called to exit all pthreads. - */ void -_threadexitallproc(char *exitstr) +_kthreadexitallproc(char *exitstr) { _threaddebug(DBGSCHED, "_threadexitallproc"); exits(exitstr); } /* - * Called to poll for any kids of this pthread. - * Wait messages aren't restricted to a particular - * pthread, so we have a separate proc responsible - * for them. So this is a no-op. + * Exec. Pthreads does the hard work of making it possible + * for any thread to do the waiting, so this is pretty easy. + * We create a separate proc whose job is to wait for children + * and deliver wait messages. */ -void -_threadwaitkids(Proc *p) -{ -} +static Channel *_threadexecwaitchan; -/* - * Separate process to wait for child messages. - * Also runs signal handlers. - */ -static Channel *_threadexecchan; static void _threadwaitproc(void *v) { Channel *c; Waitmsg *w; - sigset_t none; - sigemptyset(&none); - pthread_sigmask(SIG_SETMASK, &none, 0); + _threadinternalproc(); USED(v); for(;;){ w = wait(); if(w == nil){ - if(errno == ECHILD) - recvul(_threadexecchan); + if(errno == ECHILD) /* wait for more */ + recvul(_threadexecwaitchan); continue; } if((c = _threadwaitchan) != nil) @@ -133,43 +90,182 @@ _threadwaitproc(void *v) else free(w); } -fprint(2, "_threadwaitproc exits\n"); + fprint(2, "_threadwaitproc exits\n"); /* not reached */ } + /* - * Called before the first exec. + * Call _threadexec in the right conditions. */ -void -_threadfirstexec(void) +int +_kthreadexec(Channel *c, int fd[3], char *prog, char *args[], int freeargs) { + static Lock lk; + int rv; + + if(!_threadexecwaitchan){ + lock(&lk); + if(!_threadexecwaitchan){ + _threadexecwaitchan = chancreate(sizeof(ulong), 1); + proccreate(_threadwaitproc, nil, 32*1024); + } + unlock(&lk); + } + rv = _threadexec(c, fd, prog, args, freeargs); + nbsendul(_threadexecwaitchan, 1); + return rv; } /* - * Called from mainlauncher before threadmain. + * Some threaded applications want to run in the background. + * Calling fork() and exiting in the parent will result in a child + * with a single pthread (if we are using pthreads), and will screw + * up our internal process info if we are using clone/rfork. + * Instead, apps should call threadbackground(), which takes + * care of this. + * + * _threadbackgroundinit is called from main. */ + +static int mainpid, passerpid; + +static void +passer(void *x, char *msg) +{ + Waitmsg *w; + + USED(x); + if(strcmp(msg, "sys: usr2") == 0) + _exit(0); /* daemonize */ + else if(strcmp(msg, "sys: child") == 0){ + /* child exited => so should we */ + w = wait(); + if(w == nil) + _exit(1); + _exit(atoi(w->msg)); + }else + postnote(PNGROUP, mainpid, msg); +} + void -_threadmaininit(void) +_threadbackgroundinit(void) { - _threadexecchan = chancreate(sizeof(ulong), 1); - proccreate(_threadwaitproc, nil, 32*1024); + int pid; + sigset_t mask; + + sigfillset(&mask); + pthread_sigmask(SIG_BLOCK, &mask, 0); + +return; + + passerpid = getpid(); + switch(pid = fork()){ + case -1: + sysfatal("fork: %r"); + + case 0: + rfork(RFNOTEG); + return; + + default: + break; + } + + mainpid = pid; + notify(passer); + notifyon("sys: child"); + notifyon("sys: usr2"); /* should already be on */ + for(;;) + pause(); + _exit(0); +} - /* - * Sleazy: decrement threadnprocs so that - * the existence of the _threadwaitproc proc - * doesn't keep us from exiting. - */ - lock(&_threadpq.lock); - --_threadnprocs; - /* print("change %d -> %d\n", _threadnprocs+1, _threadnprocs); */ - unlock(&_threadpq.lock); +void +threadbackground(void) +{ + if(passerpid <= 1) + return; + postnote(PNPROC, passerpid, "sys: usr2"); } /* - * Called after forking the exec child. + * Notes. */ +Channel *_threadnotechan; +static ulong sigs; +static Lock _threadnotelk; +static void _threadnoteproc(void*); +extern int _p9strsig(char*); +extern char *_p9sigstr(int); + +Channel* +threadnotechan(void) +{ + if(_threadnotechan == nil){ + lock(&_threadnotelk); + if(_threadnotechan == nil){ + _threadnotechan = chancreate(sizeof(char*), 1); + proccreate(_threadnoteproc, nil, 32*1024); + } + unlock(&_threadnotelk); + } + return _threadnotechan; +} + +void +_threadnote(void *x, char *msg) +{ + USED(x); + + if(_threadexitsallstatus) + _kthreadexitproc(_threadexitsallstatus); + + if(strcmp(msg, "sys: usr2") == 0) + noted(NCONT); + + if(_threadnotechan == nil) + noted(NDFLT); + + sigs |= 1<<_p9strsig(msg); + noted(NCONT); +} + +void +_threadnoteproc(void *x) +{ + int i; + sigset_t none; + Channel *c; + + _threadinternalproc(); + sigemptyset(&none); + pthread_sigmask(SIG_SETMASK, &none, 0); + + c = _threadnotechan; + for(;;){ + if(sigs == 0) + pause(); + for(i=0; i<32; i++){ + if((sigs&(1<nthreads==0 || (p->nthreads==1 && p->idle)) return nil; + _threadschednote(); lock(&p->readylock); q = &p->ready; if(q->head == nil){ @@ -180,7 +180,10 @@ runthread(Proc *p) */ q->asleep = 1; p->rend.l = &p->readylock; - _procsleep(&p->rend); + while(q->asleep){ + _procsleep(&p->rend); + _threadschednote(); + } /* * Maybe we were awakened to exit? @@ -284,6 +287,25 @@ _threadsetidle(int id) unlock(&p->readylock); } +/* + * Mark proc as internal so that if all but internal procs exit, we exit. + */ +void +_threadinternalproc(void) +{ + Proc *p; + + p = _threadgetproc(); + if(p->internal) + return; + lock(&_threadpq.lock); + if(p->internal == 0){ + p->internal = 1; + --_threadnprocs; + } + unlock(&_threadpq.lock); +} + static void schedexit(Proc *p) { @@ -301,7 +323,10 @@ schedexit(Proc *p) break; } } - n = --_threadnprocs; + if(p->internal) + n = _threadnprocs; + else + n = --_threadnprocs; unlock(&_threadpq.lock); strncpy(ex, p->exitstr, sizeof ex); @@ -309,10 +334,10 @@ schedexit(Proc *p) free(p); if(n == 0){ _threaddebug(DBGSCHED, "procexit; no more procs"); - _threadexitallproc(ex); + _kthreadexitallproc(ex); }else{ _threaddebug(DBGSCHED, "procexit"); - _threadexitproc(ex); + _kthreadexitproc(ex); } } diff --git a/src/libthread/sleep.c b/src/libthread/sleep.c index d6c4dac4..4b0d82a1 100644 --- a/src/libthread/sleep.c +++ b/src/libthread/sleep.c @@ -36,3 +36,4 @@ _threadwakeup(_Procrend *r) _threadready(t); unlock(&t->proc->lock); } + diff --git a/src/libthread/thread.sh b/src/libthread/thread.sh index e3984b79..818b509f 100644 --- a/src/libthread/thread.sh +++ b/src/libthread/thread.sh @@ -2,13 +2,13 @@ if [ `uname` = Linux ] then - case "`uname | awk '{print $3}'`" in - *) - echo Linux-clone - ;; + case `uname -r` in 2.[6789]*) echo pthread ;; + *) + echo Linux-clone + ;; esac else echo pthread diff --git a/src/libthread/threadimpl.h b/src/libthread/threadimpl.h index 8d7a7a75..0ad4568e 100644 --- a/src/libthread/threadimpl.h +++ b/src/libthread/threadimpl.h @@ -28,6 +28,14 @@ typedef struct Proc Proc; typedef struct Tqueue Tqueue; typedef struct Pqueue Pqueue; typedef struct Execargs Execargs; +typedef struct Jmp Jmp; + +/* sync with ../lib9/notify.c */ +struct Jmp +{ + p9jmp_buf b; +}; + typedef enum { @@ -126,6 +134,7 @@ struct Proc Proc *newproc; /* fork argument */ char exitstr[ERRMAX]; /* exit status */ + int internal; int rforkflag; int nthreads; Tqueue threads; /* All threads of this proc */ @@ -138,6 +147,7 @@ struct Proc uint nextID; /* ID of most recently created thread */ Proc *next; /* linked list of Procs */ + Jmp sigjmp; /* for notify implementation */ void (*schedfn)(Proc*); /* function to call in scheduler */ @@ -161,8 +171,6 @@ struct Proc void _swaplabel(Label*, Label*); Proc* _newproc(void); int _newthread(Proc*, void(*)(void*), void*, uint, char*, int); -int _procsplhi(void); -void _procsplx(int); int _sched(void); int _schedexec(Execargs*); void _schedexecwait(void); @@ -178,14 +186,14 @@ void _threadexitsall(char*); Proc* _threadgetproc(void); extern void _threadmultiproc(void); Proc* _threaddelproc(void); -void _threadinitproc(Proc*); -void _threadwaitkids(Proc*); +void _kthreadinitproc(Proc*); void _threadsetproc(Proc*); void _threadinitstack(Thread*, void(*)(void*), void*); void _threadlinkmain(void); void* _threadmalloc(long, int); void _threadnote(void*, char*); void _threadready(Thread*); +void _threadschednote(void); void _threadsetidle(int); void _threadsleep(_Procrend*); void _threadwakeup(_Procrend*); @@ -195,12 +203,18 @@ long _xdec(long*); void _xinc(long*); void _threadremove(Proc*, Thread*); void threadstatus(void); -void _threadstartproc(Proc*); -void _threadexitproc(char*); -void _threadexitallproc(char*); void _threadefork(int[3], int[2], char*, char**); +Jmp* _threadgetjmp(void); +void _kthreadinit(void); +void _kthreadsetproc(Proc*); +Proc* _kthreadgetproc(void); +void _kthreadstartproc(Proc*); +void _kthreadexitproc(char*); +void _kthreadexitallproc(char*); +void _threadinternalproc(void); +void _threadbackgroundinit(void); +void _kmaininit(void); -extern int _threadmainpid; extern int _threadnprocs; extern int _threaddebuglevel; extern char* _threadexitsallstatus; @@ -222,8 +236,6 @@ extern void _threadstacklimit(void*, void*); extern void _procdelthread(Proc*, Thread*); extern void _procaddthread(Proc*, Thread*); -extern void _threadafterexec(void); extern void _threadmaininit(void); -extern void _threadfirstexec(void); extern int _threadexec(Channel*, int[3], char*, char*[], int); -extern int _callthreadexec(Channel*, int[3], char*, char*[], int); +extern int _kthreadexec(Channel*, int[3], char*, char*[], int); -- cgit v1.2.3