From 619085f0b4a85104ef6c7496f9ce1f46e9b17c82 Mon Sep 17 00:00:00 2001 From: rsc Date: Sat, 25 Dec 2004 21:57:50 +0000 Subject: more new libthread --- src/libthread/channel.c | 412 +++++++++++++++++++++++++++++++++++++++++++++ src/libthread/ioproc.c | 130 ++++++++++++++ src/libthread/ioproc.h | 14 ++ src/libthread/mkfile | 39 +++++ src/libthread/pthread.c | 108 ++++++++++++ src/libthread/ref.c | 27 +++ src/libthread/threadimpl.h | 70 ++++++++ 7 files changed, 800 insertions(+) create mode 100644 src/libthread/channel.c create mode 100644 src/libthread/ioproc.c create mode 100644 src/libthread/ioproc.h create mode 100644 src/libthread/mkfile create mode 100644 src/libthread/pthread.c create mode 100644 src/libthread/ref.c create mode 100644 src/libthread/threadimpl.h diff --git a/src/libthread/channel.c b/src/libthread/channel.c new file mode 100644 index 00000000..3c9614e6 --- /dev/null +++ b/src/libthread/channel.c @@ -0,0 +1,412 @@ +#include "u.h" +#include "libc.h" +#include "thread.h" +#include "threadimpl.h" + +/* + * One can go through a lot of effort to avoid this global lock. + * You have to put locks in all the channels and all the Alt + * structures. At the beginning of an alt you have to lock all + * the channels, but then to try to actually exec an op you + * have to lock the other guy's alt structure, so that other + * people aren't trying to use him in some other op at the + * same time. + * + * For Plan 9 apps, it's just not worth the extra effort. + */ +static QLock chanlock; + +Channel* +chancreate(int elemsize, int bufsize) +{ + Channel *c; + + c = malloc(sizeof *c+bufsize*elemsize); + memset(c, 0, sizeof *c); + c->elemsize = elemsize; + c->bufsize = bufsize; + c->nbuf = 0; + c->buf = (uchar*)(c+1); + return c; +} + +void +chansetname(Channel *c, char *fmt, ...) +{ + char *name; + va_list arg; + + va_start(arg, fmt); + name = vsmprint(fmt, arg); + va_end(arg); + free(c->name); + c->name = name; +} + +/* bug - work out races */ +void +chanfree(Channel *c) +{ + if(c == nil) + return; + free(c->name); + free(c->arecv.a); + free(c->asend.a); + free(c); +} + +static void +addarray(_Altarray *a, Alt *alt) +{ + if(a->n == a->m){ + a->m += 16; + a->a = realloc(a->a, a->m*sizeof a->a[0]); + } + a->a[a->n++] = alt; +} + +static void +delarray(_Altarray *a, int i) +{ + --a->n; + a->a[i] = a->a[a->n]; +} + +/* + * doesn't really work for things other than CHANSND and CHANRCV + * but is only used as arg to chanarray, which can handle it + */ +#define otherop(op) (CHANSND+CHANRCV-(op)) + +static _Altarray* +chanarray(Channel *c, uint op) +{ + switch(op){ + default: + return nil; + case CHANSND: + return &c->asend; + case CHANRCV: + return &c->arecv; + } +} + +static int +altcanexec(Alt *a) +{ + _Altarray *ar; + Channel *c; + + if(a->op == CHANNOP) + return 0; + c = a->c; + if(c->bufsize == 0){ + ar = chanarray(c, otherop(a->op)); + return ar && ar->n; + }else{ + switch(a->op){ + default: + return 0; + case CHANSND: + return c->nbuf < c->bufsize; + case CHANRCV: + return c->nbuf > 0; + } + } +} + +static void +altqueue(Alt *a) +{ + _Altarray *ar; + + ar = chanarray(a->c, a->op); + addarray(ar, a); +} + +static void +altdequeue(Alt *a) +{ + int i; + _Altarray *ar; + + ar = chanarray(a->c, a->op); + if(ar == nil){ + fprint(2, "bad use of altdequeue op=%d\n", a->op); + abort(); + } + + for(i=0; in; i++) + if(ar->a[i] == a){ + delarray(ar, i); + return; + } + fprint(2, "cannot find self in altdq\n"); + abort(); +} + +static void +altalldequeue(Alt *a) +{ + int i; + + for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++) + if(a[i].op != CHANNOP) + altdequeue(&a[i]); +} + +static void +amove(void *dst, void *src, uint n) +{ + if(dst){ + if(src == nil) + memset(dst, 0, n); + else + memmove(dst, src, n); + } +} + +/* + * Actually move the data around. There are up to three + * players: the sender, the receiver, and the channel itself. + * If the channel is unbuffered or the buffer is empty, + * data goes from sender to receiver. If the channel is full, + * the receiver removes some from the channel and the sender + * gets to put some in. + */ +static void +altcopy(Alt *s, Alt *r) +{ + Alt *t; + Channel *c; + uchar *cp; + + /* + * Work out who is sender and who is receiver + */ + if(s == nil && r == nil) + return; + assert(s != nil); + c = s->c; + if(s->op == CHANRCV){ + t = s; + s = r; + r = t; + } + assert(s==nil || s->op == CHANSND); + assert(r==nil || r->op == CHANRCV); + + /* + * Channel is empty (or unbuffered) - copy directly. + */ + if(s && r && c->nbuf == 0){ + amove(r->v, s->v, c->elemsize); + return; + } + + /* + * Otherwise it's always okay to receive and then send. + */ + if(r){ + cp = c->buf + c->off*c->elemsize; + amove(r->v, cp, c->elemsize); + --c->nbuf; + if(++c->off == c->bufsize) + c->off = 0; + } + if(s){ + cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize; + amove(cp, s->v, c->elemsize); + ++c->nbuf; + } +} + +static void +altexec(Alt *a) +{ + int i; + _Altarray *ar; + Alt *other; + Channel *c; + + c = a->c; + ar = chanarray(c, otherop(a->op)); + if(ar && ar->n){ + i = rand()%ar->n; + other = ar->a[i]; + altcopy(a, other); + altalldequeue(other->xalt); + other->xalt[0].xalt = other; + _threadready(other->thread); + }else + altcopy(a, nil); +} + +#define dbgalt 0 +int +chanalt(Alt *a) +{ + int i, j, ncan, n, canblock; + Channel *c; + _Thread *t; + + for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++) + ; + n = i; + canblock = a[i].op == CHANEND; + + t = proc()->thread; + for(i=0; iname) print("%s", c->name); else print("%p", c); + if(altcanexec(&a[i])){ +if(dbgalt) print("*"); + ncan++; + } + } + if(ncan){ + j = rand()%ncan; + for(i=0; i %c:", "esrnb"[a[i].op]); +if(c->name) print("%s", c->name); else print("%p", c); +print("\n"); +} + altexec(&a[i]); + qunlock(&chanlock); + return i; + } + } + } + } +if(dbgalt)print("\n"); + + if(!canblock){ + qunlock(&chanlock); + return -1; + } + + for(i=0; i +#include +#include +#include "ioproc.h" + +enum +{ + STACK = 32768, +}; + +void +iointerrupt(Ioproc *io) +{ + if(!io->inuse) + return; + fprint(2, "bug: cannot iointerrupt yet\n"); +} + +static void +xioproc(void *a) +{ + Ioproc *io, *x; + io = a; + /* + * first recvp acquires the ioproc. + * second tells us that the data is ready. + */ + for(;;){ + while(recv(io->c, &x) == -1) + ; + if(x == 0) /* our cue to leave */ + break; + assert(x == io); + + /* caller is now committed -- even if interrupted he'll return */ + while(recv(io->creply, &x) == -1) + ; + if(x == 0) /* caller backed out */ + continue; + assert(x == io); + + io->ret = io->op(&io->arg); + if(io->ret < 0) + rerrstr(io->err, sizeof io->err); + while(send(io->creply, &io) == -1) + ; + while(recv(io->creply, &x) == -1) + ; + } +} + +Ioproc* +ioproc(void) +{ + Ioproc *io; + + io = mallocz(sizeof(*io), 1); + if(io == nil) + sysfatal("ioproc malloc: %r"); + io->c = chancreate(sizeof(void*), 0); + chansetname(io->c, "ioc%p", io->c); + io->creply = chancreate(sizeof(void*), 0); + chansetname(io->creply, "ior%p", io->c); + io->tid = proccreate(xioproc, io, STACK); + return io; +} + +void +closeioproc(Ioproc *io) +{ + if(io == nil) + return; + iointerrupt(io); + while(send(io->c, 0) == -1) + ; + chanfree(io->c); + chanfree(io->creply); + free(io); +} + +long +iocall(Ioproc *io, long (*op)(va_list*), ...) +{ + char e[ERRMAX]; + int ret, inted; + Ioproc *msg; + + if(send(io->c, &io) == -1){ + werrstr("interrupted"); + return -1; + } + assert(!io->inuse); + io->inuse = 1; + io->op = op; + va_start(io->arg, op); + msg = io; + inted = 0; + while(send(io->creply, &msg) == -1){ + msg = nil; + inted = 1; + } + if(inted){ + werrstr("interrupted"); + return -1; + } + + /* + * If we get interrupted, we have stick around so that + * the IO proc has someone to talk to. Send it an interrupt + * and try again. + */ + inted = 0; + while(recv(io->creply, nil) == -1){ + inted = 1; + iointerrupt(io); + } + USED(inted); + va_end(io->arg); + ret = io->ret; + if(ret < 0) + strecpy(e, e+sizeof e, io->err); + io->inuse = 0; + + /* release resources */ + while(send(io->creply, &io) == -1) + ; + if(ret < 0) + errstr(e, sizeof e); + return ret; +} diff --git a/src/libthread/ioproc.h b/src/libthread/ioproc.h new file mode 100644 index 00000000..f3a488d3 --- /dev/null +++ b/src/libthread/ioproc.h @@ -0,0 +1,14 @@ +#define ioproc_arg(io, type) (va_arg((io)->arg, type)) + +struct Ioproc +{ + int tid; + Channel *c, *creply; + int inuse; + long (*op)(va_list*); + va_list arg; + long ret; + char err[ERRMAX]; + Ioproc *next; +}; + diff --git a/src/libthread/mkfile b/src/libthread/mkfile new file mode 100644 index 00000000..98093788 --- /dev/null +++ b/src/libthread/mkfile @@ -0,0 +1,39 @@ +<$PLAN9/src/mkhdr + +LIB=libthread.a +OFILES=\ + channel.$O\ + exec.$O\ + ioproc.$O\ + iorw.$O\ + pthread.$O\ + qlock.$O\ + ref.$O\ + thread.$O\ + +<$PLAN9/src/mksyslib + +HFILES=thread.h threadimpl.h + +tprimes: tprimes.$O + 9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread +tspawn: tspawn.$O + 9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread +tspawnloop: tspawnloop.$O + 9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread + +%.$O: %.c + 9c -I. $stem.c + +test:V: tprimes tspawn + primes 1 10007 >p1.txt + $PLAN9/bin/time ./tprimes 10000 >tp1.txt + cmp p1.txt tp1.txt + primes 1 1009 >p2.txt + $PLAN9/bin/time ./tprimes 1000 >tp2.txt + cmp p2.txt tp2.txt + echo tspawn should take 3 seconds, not 6 + $PLAN9/bin/time ./tspawn sleep 3 >/dev/null + +CLEANFILES=p1.txt p2.txt tp1.txt tp2.txt + diff --git a/src/libthread/pthread.c b/src/libthread/pthread.c new file mode 100644 index 00000000..77f97a4f --- /dev/null +++ b/src/libthread/pthread.c @@ -0,0 +1,108 @@ +#include "u.h" +#include +#include "libc.h" +#include "thread.h" +#include "threadimpl.h" + +static pthread_mutex_t initmutex = PTHREAD_MUTEX_INITIALIZER; + +static void +lockinit(Lock *lk) +{ + pthread_mutexattr_t attr; + + pthread_mutex_lock(&initmutex); + if(lk->init == 0){ + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); + pthread_mutex_init(&lk->mutex, &attr); + pthread_mutexattr_destroy(&attr); + lk->init = 1; + } + pthread_mutex_unlock(&initmutex); +} + +int +_threadlock(Lock *lk, int block, ulong pc) +{ + int r; + + if(!lk->init) + lockinit(lk); + if(block){ + if(pthread_mutex_lock(&lk->mutex) != 0) + abort(); + return 1; + }else{ + r = pthread_mutex_trylock(&lk->mutex); + if(r == 0) + return 1; + if(r == EBUSY) + return 0; + abort(); + return 0; + } +} + +void +_threadunlock(Lock *lk, ulong pc) +{ + if(pthread_mutex_unlock(&lk->mutex) != 0) + abort(); +} + +void +_procsleep(_Procrendez *r) +{ + /* r is protected by r->l, which we hold */ + pthread_cond_init(&r->cond, 0); + r->asleep = 1; + pthread_cond_wait(&r->cond, &r->l->mutex); + pthread_cond_destroy(&r->cond); + r->asleep = 0; +} + +void +_procwakeup(_Procrendez *r) +{ + if(r->asleep){ + r->asleep = 0; + pthread_cond_signal(&r->cond); + } +} + +void +_procstart(Proc *p, void (*fn)(void*)) +{ +//print("pc\n"); + if(pthread_create(&p->tid, nil, (void*(*)(void*))fn, p) < 0){ +//print("pc1\n"); + fprint(2, "pthread_create: %r\n"); + abort(); + } +//print("pc2\n"); +} + +static pthread_key_t prockey; + +Proc* +_threadproc(void) +{ + Proc *p; + + p = pthread_getspecific(prockey); + return p; +} + +void +_threadsetproc(Proc *p) +{ + pthread_setspecific(prockey, p); +} + +void +pthreadinit(void) +{ + pthread_key_create(&prockey, 0); +} + diff --git a/src/libthread/ref.c b/src/libthread/ref.c new file mode 100644 index 00000000..30c932ed --- /dev/null +++ b/src/libthread/ref.c @@ -0,0 +1,27 @@ +#include "u.h" +#include "libc.h" +#include "thread.h" + +static long +refadd(Ref *r, long a) +{ + long ref; + + lock(&r->lock); + r->ref += a; + ref = r->ref; + unlock(&r->lock); + return ref; +} + +long +incref(Ref *r) +{ + return refadd(r, 1); +} + +long +decref(Ref *r) +{ + return refadd(r, -1); +} diff --git a/src/libthread/threadimpl.h b/src/libthread/threadimpl.h new file mode 100644 index 00000000..9f70b0e0 --- /dev/null +++ b/src/libthread/threadimpl.h @@ -0,0 +1,70 @@ +#include + +typedef struct Context Context; +typedef struct Proc Proc; +typedef struct _Procrendez _Procrendez; + +enum +{ + STACK = 8192 +}; + +struct Context +{ + ucontext_t uc; +}; + +struct _Thread +{ + _Thread *next; + _Thread *prev; + _Thread *allnext; + _Thread *allprev; + Context context; + uint id; + uchar *stk; + uint stksize; + int exiting; + void (*startfn)(void*); + void *startarg; + Proc *proc; + char name[256]; + char state[256]; +}; + +struct _Procrendez +{ + Lock *l; + int asleep; + pthread_cond_t cond; +}; + +extern void _procsleep(_Procrendez*); +extern void _procwakeup(_Procrendez*); + +struct Proc +{ + pthread_t tid; + Lock lock; + _Thread *thread; + _Threadlist runqueue; + _Threadlist allthreads; + uint nthread; + uint sysproc; + _Procrendez runrend; + Context schedcontext; + void *udata; +}; + +extern Proc *xxx; +#define proc() _threadproc() +#define setproc(p) _threadsetproc(p) + +extern void _procstart(Proc*, void (*fn)(void*)); +extern _Thread *_threadcreate(Proc*, void(*fn)(void*), void*, uint); +extern void _threadexit(void); +extern Proc *_threadproc(void); +extern void _threadsetproc(Proc*); +extern int _threadlock(Lock*, int, ulong); +extern void _threadunlock(Lock*, ulong); + -- cgit v1.2.3