diff options
author | rsc <devnull@localhost> | 2004-09-17 00:38:29 +0000 |
---|---|---|
committer | rsc <devnull@localhost> | 2004-09-17 00:38:29 +0000 |
commit | 06bb4ed20d855b60e39c1125d8d715ba8892265b (patch) | |
tree | 8294b537f5b81809671985903e31c4835c41cd04 /src | |
parent | 984e353160593b20d1e2944e1f2e9ce2117c8490 (diff) | |
download | plan9port-06bb4ed20d855b60e39c1125d8d715ba8892265b.tar.gz plan9port-06bb4ed20d855b60e39c1125d8d715ba8892265b.tar.bz2 plan9port-06bb4ed20d855b60e39c1125d8d715ba8892265b.zip |
Rewrite to remove dependence on rendezvous and its bizarre
data structures. Makes it easier to use pthreads too.
Still need to add code for non-pthreads systems.
Just a checkpoint to switch work to another machine.
Diffstat (limited to 'src')
-rw-r--r-- | src/lib9/ffork-Linux.c | 4 | ||||
-rw-r--r-- | src/lib9/lock.c | 70 | ||||
-rw-r--r-- | src/lib9/qlock.c | 87 | ||||
-rw-r--r-- | src/libthread/channel.c | 43 | ||||
-rw-r--r-- | src/libthread/create.c | 2 | ||||
-rw-r--r-- | src/libthread/main.c | 7 | ||||
-rw-r--r-- | src/libthread/mkfile | 7 | ||||
-rw-r--r-- | src/libthread/rendez.c | 104 | ||||
-rw-r--r-- | src/libthread/sched.c | 11 | ||||
-rw-r--r-- | src/libthread/threadimpl.h | 13 | ||||
-rw-r--r-- | src/libthread/tprimes.c | 1 |
11 files changed, 151 insertions, 198 deletions
diff --git a/src/lib9/ffork-Linux.c b/src/lib9/ffork-Linux.c index f4704c60..4f976359 100644 --- a/src/lib9/ffork-Linux.c +++ b/src/lib9/ffork-Linux.c @@ -1,3 +1,6 @@ +#include "ffork-pthread.c" + +#ifdef OLD /* * Is nothing simple? * @@ -191,3 +194,4 @@ getfforkid(void) return getpid(); } +#endif diff --git a/src/lib9/lock.c b/src/lib9/lock.c index 80f65b33..eb3ea21d 100644 --- a/src/lib9/lock.c +++ b/src/lib9/lock.c @@ -2,52 +2,54 @@ #include <unistd.h> #include <sys/time.h> #include <sched.h> +#include <errno.h> #include <libc.h> -int _ntas; -static int -_xtas(void *v) +static pthread_mutex_t initmutex = PTHREAD_MUTEX_INITIALIZER; + +static void +lockinit(Lock *lk) { - int x; + pthread_mutexattr_t attr; - _ntas++; - x = _tas(v); - return x; + pthread_mutex_lock(&initmutex); + if(lk->init == 0){ + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); + pthread_mutex_init(&lk->mutex, &attr); + pthread_mutexattr_destroy(&attr); + lk->init = 1; + } + pthread_mutex_unlock(&initmutex); } -int -canlock(Lock *l) +void +lock(Lock *lk) { - return !_xtas(&l->val); + if(!lk->init) + lockinit(lk); + if(pthread_mutex_lock(&lk->mutex) != 0) + abort(); } -void -unlock(Lock *l) +int +canlock(Lock *lk) { - l->val = 0; + int r; + + if(!lk->init) + lockinit(lk); + r = pthread_mutex_trylock(&lk->mutex); + if(r == 0) + return 1; + if(r == EBUSY) + return 0; + abort(); } void -lock(Lock *lk) +unlock(Lock *lk) { - int i; - - /* once fast */ - if(!_xtas(&lk->val)) - return; - /* a thousand times pretty fast */ - for(i=0; i<1000; i++){ - if(!_xtas(&lk->val)) - return; - sched_yield(); - } - /* now nice and slow */ - for(i=0; i<1000; i++){ - if(!_xtas(&lk->val)) - return; - usleep(100*1000); - } - /* take your time */ - while(_xtas(&lk->val)) - usleep(1000*1000); + if(pthread_mutex_unlock(&lk->mutex) != 0) + abort(); } diff --git a/src/lib9/qlock.c b/src/lib9/qlock.c index d83a4b7f..798b08f3 100644 --- a/src/lib9/qlock.c +++ b/src/lib9/qlock.c @@ -16,13 +16,15 @@ enum Waking, }; -static ulong (*_rendezvousp)(ulong, ulong) = rendezvous; +static void (*procsleep)(_Procrend*) = _procsleep; +static void (*procwakeup)(_Procrend*) = _procwakeup; /* this gets called by the thread library ONLY to get us to use its rendezvous */ void -_qlockinit(ulong (*r)(ulong, ulong)) +_qlockinit(void (*sleep)(_Procrend*), void (*wakeup)(_Procrend*)) { - _rendezvousp = r; + procsleep = sleep; + procwakeup = wakeup; } /* find a free shared memory location to queue ourselves in */ @@ -39,7 +41,7 @@ getqlp(void) fprint(2, "qlock: out of qlp\n"); abort(); } - if(_tas(&(p->inuse)) == 0){ + if(canlock(&p->inuse)){ ql.p = p; p->next = nil; break; @@ -70,13 +72,11 @@ qlock(QLock *q) p->next = mp; q->tail = mp; mp->state = Queuing; + mp->rend.l = &q->lock; + _procsleep(&mp->rend); unlock(&q->lock); - - /* wait */ - while((*_rendezvousp)((ulong)mp, 1) == ~0) - ; assert(mp->state == Waking); - mp->inuse = 0; + unlock(&mp->inuse); } void @@ -91,10 +91,9 @@ qunlock(QLock *q) q->head = p->next; if(q->head == nil) q->tail = nil; - unlock(&q->lock); p->state = Waking; - while((*_rendezvousp)((ulong)p, 0x12345) == ~0) - ; + _procwakeup(&p->rend); + unlock(&q->lock); return; } q->locked = 0; @@ -137,13 +136,11 @@ rlock(RWLock *q) q->tail = mp; mp->next = nil; mp->state = QueuingR; + mp->rend.l = &q->lock; + _procsleep(&mp->rend); unlock(&q->lock); - - /* wait in kernel */ - while((*_rendezvousp)((ulong)mp, 1) == ~0) - ; assert(mp->state == Waking); - mp->inuse = 0; + unlock(&mp->inuse); } int @@ -181,12 +178,11 @@ runlock(RWLock *q) if(q->head == 0) q->tail = 0; q->writer = 1; - unlock(&q->lock); /* wakeup waiter */ p->state = Waking; - while((*_rendezvousp)((ulong)p, 0) == ~0) - ; + _procwakeup(&p->rend); + unlock(&q->lock); } void @@ -212,13 +208,13 @@ wlock(RWLock *q) q->tail = mp; mp->next = nil; mp->state = QueuingW; - unlock(&q->lock); /* wait in kernel */ - while((*_rendezvousp)((ulong)mp, 1) == ~0) - ; + mp->rend.l = &q->lock; + _procsleep(&mp->rend); + unlock(&q->lock); assert(mp->state == Waking); - mp->inuse = 0; + unlock(&mp->inuse); } int @@ -256,10 +252,9 @@ wunlock(RWLock *q) q->head = p->next; if(q->head == nil) q->tail = nil; - unlock(&q->lock); p->state = Waking; - while((*_rendezvousp)((ulong)p, 0) == ~0) - ; + _procwakeup(&p->rend); + unlock(&q->lock); return; } @@ -274,8 +269,7 @@ wunlock(RWLock *q) q->head = p->next; q->readers++; p->state = Waking; - while((*_rendezvousp)((ulong)p, 0) == ~0) - ; + _procwakeup(&p->rend); } if(q->head == nil) q->tail = nil; @@ -315,20 +309,17 @@ rsleep(Rendez *r) r->l->head = t->next; if(r->l->head == nil) r->l->tail = nil; - unlock(&r->l->lock); t->state = Waking; - while((*_rendezvousp)((ulong)t, 0x12345) == ~0) - ; - }else{ + _procwakeup(&t->rend); + }else r->l->locked = 0; - unlock(&r->l->lock); - } /* wait for a wakeup */ - while((*_rendezvousp)((ulong)me, 0x23456) == ~0) - ; + me->rend.l = &r->l->lock; + _procsleep(&me->rend); + assert(me->state == Waking); - me->inuse = 0; + unlock(&me->inuse); if(!r->l->locked){ fprint(2, "rsleep: not locked after wakeup\n"); abort(); @@ -384,3 +375,23 @@ rwakeupall(Rendez *r) ; return i; } + +void +_procsleep(_Procrend *rend) +{ +//print("sleep %p %d\n", rend, getpid()); + pthread_cond_init(&rend->cond, 0); + rend->asleep = 1; + while(rend->asleep) + pthread_cond_wait(&rend->cond, &rend->l->mutex); + pthread_cond_destroy(&rend->cond); +} + +void +_procwakeup(_Procrend *rend) +{ +//print("wakeup %p\n", rend); + rend->asleep = 0; + pthread_cond_signal(&rend->cond); +} + diff --git a/src/libthread/channel.c b/src/libthread/channel.c index 68eaa95e..cdb8376e 100644 --- a/src/libthread/channel.c +++ b/src/libthread/channel.c @@ -2,7 +2,7 @@ static Lock chanlock; /* central channel access lock */ -static void enqueue(Alt*, Channel**); +static void enqueue(Alt*, Thread*); static void dequeue(Alt*); static int altexec(Alt*, int); @@ -29,7 +29,7 @@ canexec(Alt *a) /* are there senders or receivers blocked? */ otherop = (CHANSND+CHANRCV) - a->op; for(i=0; i<c->nentry; i++) - if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){ + if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread==nil){ _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c); return 1; } @@ -100,9 +100,8 @@ static int _alt(Alt *alts) { Alt *a, *xa; - Channel *volatile c; + Channel *c; int n, s; - ulong r; Thread *t; /* @@ -131,7 +130,7 @@ _alt(Alt *alts) xa->entryno = -1; if(xa->op == CHANNOP) continue; - + c = xa->c; if(c==nil){ unlock(&chanlock); @@ -153,11 +152,11 @@ _threadnalt++; } /* enqueue on all channels. */ - c = nil; + t->altc = nil; for(xa=alts; xa->op!=CHANEND; xa++){ if(xa->op==CHANNOP) continue; - enqueue(xa, (Channel**)&c); + enqueue(xa, t); } /* @@ -166,25 +165,20 @@ _threadnalt++; * is interrupted -- someone else might come * along and try to rendezvous with us, so * we need to be here. + * + * actually, now we're assuming no interrupts. */ - Again: + /*Again:*/ t->alt = alts; t->chan = Chanalt; - - unlock(&chanlock); + t->altrend.l = &chanlock; _procsplx(s); - r = _threadrendezvous((ulong)&c, 0); + _threadsleep(&t->altrend); s = _procsplhi(); - lock(&chanlock); - - if(r==~0){ /* interrupted */ - if(c!=nil) /* someone will meet us; go back */ - goto Again; - c = (Channel*)~0; /* so no one tries to meet us */ - } /* dequeue from channels, find selected one */ a = nil; + c = t->altc; for(xa=alts; xa->op!=CHANEND; xa++){ if(xa->op==CHANNOP) continue; @@ -385,12 +379,12 @@ if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry; } static void -enqueue(Alt *a, Channel **c) +enqueue(Alt *a, Thread *t) { int i; _threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c); - a->tag = c; + a->thread = t; i = emptyentry(a->c); a->c->qentry[i] = a; } @@ -466,7 +460,7 @@ altexec(Alt *a, int spl) b = nil; me = a->v; for(i=0; i<c->nentry; i++) - if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil) + if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread==nil) if(nrand(++n) == 0) b = c->qentry[i]; if(b != nil){ @@ -493,13 +487,12 @@ altexec(Alt *a, int spl) else altcopy(waiter, me, c->e); } - *b->tag = c; /* commits us to rendezvous */ + b->thread->altc = c; + _procwakeup(&b->thread->altrend); + _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock); _threaddebug(DBGCHAN, "unlocking the chanlock"); unlock(&chanlock); _procsplx(spl); - _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock); - while(_threadrendezvous((ulong)b->tag, 0) == ~0) - ; return 1; } diff --git a/src/libthread/create.c b/src/libthread/create.c index f5f0d6c0..5dee4c48 100644 --- a/src/libthread/create.c +++ b/src/libthread/create.c @@ -2,6 +2,7 @@ Pqueue _threadpq; int _threadprocs; +int __pthread_nonstandard_stacks; static int nextID(void); @@ -21,6 +22,7 @@ newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize, char *name, Thread *t; char *s; + __pthread_nonstandard_stacks = 1; if(stacksize < 32) sysfatal("bad stacksize %d", stacksize); t = _threadmalloc(sizeof(Thread), 1); diff --git a/src/libthread/main.c b/src/libthread/main.c index 0a2fccb7..6129f3e0 100644 --- a/src/libthread/main.c +++ b/src/libthread/main.c @@ -40,7 +40,7 @@ main(int argc, char **argv) //_threaddebuglevel = (DBGSCHED|DBGCHAN|DBGREND)^~0; _systhreadinit(); - _qlockinit(_threadrendezvous); + _qlockinit(_threadsleep, _threadwakeup); _sysfatal = _threadsysfatal; notify(_threadnote); if(mainstacksize == 0) @@ -49,8 +49,9 @@ main(int argc, char **argv) a = _threadmalloc(sizeof *a, 1); a->argc = argc; a->argv = argv; - +malloc(10); p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0); +malloc(10); _schedinit(p); abort(); /* not reached */ return 0; @@ -61,7 +62,9 @@ mainlauncher(void *arg) { Mainarg *a; +malloc(10); a = arg; +malloc(10); threadmain(a->argc, a->argv); threadexits("threadmain"); } diff --git a/src/libthread/mkfile b/src/libthread/mkfile index 221cb818..d1a095e7 100644 --- a/src/libthread/mkfile +++ b/src/libthread/mkfile @@ -23,7 +23,6 @@ OFILES=\ ioreadn.$O\ iosleep.$O\ iowrite.$O\ - kill.$O\ lib.$O\ main.$O\ memset.$O\ @@ -43,13 +42,13 @@ HFILES=\ <$PLAN9/src/mksyslib tprimes: tprimes.$O $PLAN9/lib/$LIB - $LD -o tprimes tprimes.$O $LDFLAGS -lthread -l9 -lfmt -lutf + $LD -o tprimes tprimes.$O $LDFLAGS -lthread -l9 texec: texec.$O $PLAN9/lib/$LIB - $LD -o texec texec.$O $LDFLAGS -lthread -l9 -lfmt -lutf + $LD -o texec texec.$O $LDFLAGS -lthread -l9 trend: trend.$O $PLAN9/lib/$LIB - $LD -o trend trend.$O $LDFLAGS -lthread -l9 -lfmt -lutf + $LD -o trend trend.$O $LDFLAGS -lthread -l9 CLEANFILES=$CLEANFILES tprimes texec diff --git a/src/libthread/rendez.c b/src/libthread/rendez.c index 70eb0ae8..4451fa4d 100644 --- a/src/libthread/rendez.c +++ b/src/libthread/rendez.c @@ -1,104 +1,38 @@ #include "threadimpl.h" -Rgrp _threadrgrp; -static int isdirty; int _threadhighnrendez; int _threadnrendez; -static int nrendez; -static ulong -finish(Thread *t, ulong val) -{ - ulong ret; - - ret = t->rendval; - t->rendval = val; - while(t->state == Running) - sleep(0); - lock(&t->proc->lock); - if(t->state == Rendezvous){ /* not always true: might be Dead */ - t->state = Ready; - _threadready(t); - } - unlock(&t->proc->lock); - return ret; -} - -ulong -_threadrendezvous(ulong tag, ulong val) +void +_threadsleep(_Procrend *r) { - ulong ret; - Thread *t, **l; - - lock(&_threadrgrp.lock); -_threadnrendez++; - l = &_threadrgrp.hash[tag%nelem(_threadrgrp.hash)]; - for(t=*l; t; l=&t->rendhash, t=*l){ - if(t->rendtag==tag){ - _threaddebug(DBGREND, "Rendezvous with thread %d.%d", t->proc->pid, t->id); - *l = t->rendhash; - ret = finish(t, val); - --nrendez; - unlock(&_threadrgrp.lock); - return ret; - } - } + Thread *t; - /* Going to sleep here. */ t = _threadgetproc()->thread; - t->rendbreak = 0; - t->inrendez = 1; - t->rendtag = tag; - t->rendval = val; - t->rendhash = *l; - *l = t; - ++nrendez; - if(nrendez > _threadhighnrendez) - _threadhighnrendez = nrendez; - _threaddebug(DBGREND, "Rendezvous for tag %lud (m=%d)", t->rendtag, t->moribund); - unlock(&_threadrgrp.lock); + r->arg = t; t->nextstate = Rendezvous; + t->inrendez = 1; + unlock(r->l); _sched(); t->inrendez = 0; - _threaddebug(DBGREND, "Woke after rendezvous; val is %lud", t->rendval); - return t->rendval; -} - -/* - * This is called while holding _threadpq.lock and p->lock, - * so we can't lock _threadrgrp.lock. Instead our caller has - * to call _threadbreakrendez after dropping those locks. - */ -void -_threadflagrendez(Thread *t) -{ - t->rendbreak = 1; - isdirty = 1; + lock(r->l); } void -_threadbreakrendez(void) +_threadwakeup(_Procrend *r) { - int i; - Thread *t, **l; + Thread *t; - if(isdirty == 0) - return; - lock(&_threadrgrp.lock); - if(isdirty == 0){ - unlock(&_threadrgrp.lock); + t = r->arg; + while(t->state == Running) + sleep(0); + lock(&t->proc->lock); + if(t->state == Dead){ + unlock(&t->proc->lock); return; } - isdirty = 0; - for(i=0; i<nelem(_threadrgrp.hash); i++){ - l = &_threadrgrp.hash[i]; - for(t=*l; t; t=*l){ - if(t->rendbreak){ - *l = t->rendhash; - finish(t, ~0); - }else - l=&t->rendhash; - } - } - unlock(&_threadrgrp.lock); + assert(t->state == Rendezvous && t->inrendez); + t->state = Ready; + _threadready(t); + unlock(&t->proc->lock); } diff --git a/src/libthread/sched.c b/src/libthread/sched.c index df8014b6..f9e680fd 100644 --- a/src/libthread/sched.c +++ b/src/libthread/sched.c @@ -36,6 +36,7 @@ _schedinit(void *arg) unlock(&p->lock); while(_setlabel(&p->sched)) ; +malloc(10); _threaddebug(DBGSCHED, "top of schedinit, _threadexitsallstatus=%p", _threadexitsallstatus); if(_threadexitsallstatus) _exits(_threadexitsallstatus); @@ -57,8 +58,9 @@ _schedinit(void *arg) p->threads.tail = t->prevt; unlock(&p->lock); if(t->inrendez){ - _threadflagrendez(t); - _threadbreakrendez(); + abort(); + // _threadflagrendez(t); + // _threadbreakrendez(); } _stackfree(t->stk); free(t->cmdname); @@ -183,15 +185,18 @@ _sched(void) Resched: p = _threadgetproc(); //fprint(2, "p %p\n", p); +malloc(10); if((t = p->thread) != nil){ needstack(512); // _threaddebug(DBGSCHED, "pausing, state=%s set %p goto %p", // psstate(t->state), &t->sched, &p->sched); +print("swap\n"); if(_setlabel(&t->sched)==0) _gotolabel(&p->sched); _threadstacklimit(t->stk, t->stk+t->stksize); return p->nsched++; }else{ +malloc(10); t = runthread(p); if(t == nil){ _threaddebug(DBGSCHED, "all threads gone; exiting"); @@ -206,6 +211,8 @@ Resched: } t->state = Running; t->nextstate = Ready; +malloc(10); +print("gotolabel\n"); _gotolabel(&t->sched); for(;;); } diff --git a/src/libthread/threadimpl.h b/src/libthread/threadimpl.h index 7e44e646..851186e3 100644 --- a/src/libthread/threadimpl.h +++ b/src/libthread/threadimpl.h @@ -88,11 +88,9 @@ struct Thread char *cmdname; /* ptr to name of thread */ - int inrendez; - Thread *rendhash; /* Trgrp linked list */ - ulong rendtag; /* rendezvous tag */ - ulong rendval; /* rendezvous value */ - int rendbreak; /* rendezvous has been taken */ + int inrendez; + Channel *altc; + _Procrend altrend; Chanstate chan; /* which channel operation is current */ Alt *alt; /* pointer to current alt structure (debugging) */ @@ -179,11 +177,9 @@ int _schedfork(Proc*); void _schedinit(void*); void _systhreadinit(void); void _threadassert(char*); -void _threadbreakrendez(void); void __threaddebug(ulong, char*, ...); #define _threaddebug if(!_threaddebuglevel){}else __threaddebug void _threadexitsall(char*); -void _threadflagrendez(Thread*); Proc* _threadgetproc(void); extern void _threadmultiproc(void); Proc* _threaddelproc(void); @@ -193,7 +189,8 @@ void* _threadmalloc(long, int); void _threadnote(void*, char*); void _threadready(Thread*); void _threadidle(void); -ulong _threadrendezvous(ulong, ulong); +void _threadsleep(_Procrend*); +void _threadwakeup(_Procrend*); void _threadsignal(void); void _threadsysfatal(char*, va_list); long _xdec(long*); diff --git a/src/libthread/tprimes.c b/src/libthread/tprimes.c index 89d30c03..0e719451 100644 --- a/src/libthread/tprimes.c +++ b/src/libthread/tprimes.c @@ -41,6 +41,7 @@ threadmain(int argc, char **argv) int i; Channel *c; +malloc(10); ARGBEGIN{ case 'D': _threaddebuglevel = atoi(ARGF()); |