aboutsummaryrefslogtreecommitdiff
path: root/src/libthread
diff options
context:
space:
mode:
Diffstat (limited to 'src/libthread')
-rw-r--r--src/libthread/channel.c412
-rw-r--r--src/libthread/ioproc.c130
-rw-r--r--src/libthread/ioproc.h14
-rw-r--r--src/libthread/mkfile39
-rw-r--r--src/libthread/pthread.c108
-rw-r--r--src/libthread/ref.c27
-rw-r--r--src/libthread/threadimpl.h70
7 files changed, 800 insertions, 0 deletions
diff --git a/src/libthread/channel.c b/src/libthread/channel.c
new file mode 100644
index 00000000..3c9614e6
--- /dev/null
+++ b/src/libthread/channel.c
@@ -0,0 +1,412 @@
+#include "u.h"
+#include "libc.h"
+#include "thread.h"
+#include "threadimpl.h"
+
+/*
+ * One can go through a lot of effort to avoid this global lock.
+ * You have to put locks in all the channels and all the Alt
+ * structures. At the beginning of an alt you have to lock all
+ * the channels, but then to try to actually exec an op you
+ * have to lock the other guy's alt structure, so that other
+ * people aren't trying to use him in some other op at the
+ * same time.
+ *
+ * For Plan 9 apps, it's just not worth the extra effort.
+ */
+static QLock chanlock;
+
+Channel*
+chancreate(int elemsize, int bufsize)
+{
+ Channel *c;
+
+ c = malloc(sizeof *c+bufsize*elemsize);
+ memset(c, 0, sizeof *c);
+ c->elemsize = elemsize;
+ c->bufsize = bufsize;
+ c->nbuf = 0;
+ c->buf = (uchar*)(c+1);
+ return c;
+}
+
+void
+chansetname(Channel *c, char *fmt, ...)
+{
+ char *name;
+ va_list arg;
+
+ va_start(arg, fmt);
+ name = vsmprint(fmt, arg);
+ va_end(arg);
+ free(c->name);
+ c->name = name;
+}
+
+/* bug - work out races */
+void
+chanfree(Channel *c)
+{
+ if(c == nil)
+ return;
+ free(c->name);
+ free(c->arecv.a);
+ free(c->asend.a);
+ free(c);
+}
+
+static void
+addarray(_Altarray *a, Alt *alt)
+{
+ if(a->n == a->m){
+ a->m += 16;
+ a->a = realloc(a->a, a->m*sizeof a->a[0]);
+ }
+ a->a[a->n++] = alt;
+}
+
+static void
+delarray(_Altarray *a, int i)
+{
+ --a->n;
+ a->a[i] = a->a[a->n];
+}
+
+/*
+ * doesn't really work for things other than CHANSND and CHANRCV
+ * but is only used as arg to chanarray, which can handle it
+ */
+#define otherop(op) (CHANSND+CHANRCV-(op))
+
+static _Altarray*
+chanarray(Channel *c, uint op)
+{
+ switch(op){
+ default:
+ return nil;
+ case CHANSND:
+ return &c->asend;
+ case CHANRCV:
+ return &c->arecv;
+ }
+}
+
+static int
+altcanexec(Alt *a)
+{
+ _Altarray *ar;
+ Channel *c;
+
+ if(a->op == CHANNOP)
+ return 0;
+ c = a->c;
+ if(c->bufsize == 0){
+ ar = chanarray(c, otherop(a->op));
+ return ar && ar->n;
+ }else{
+ switch(a->op){
+ default:
+ return 0;
+ case CHANSND:
+ return c->nbuf < c->bufsize;
+ case CHANRCV:
+ return c->nbuf > 0;
+ }
+ }
+}
+
+static void
+altqueue(Alt *a)
+{
+ _Altarray *ar;
+
+ ar = chanarray(a->c, a->op);
+ addarray(ar, a);
+}
+
+static void
+altdequeue(Alt *a)
+{
+ int i;
+ _Altarray *ar;
+
+ ar = chanarray(a->c, a->op);
+ if(ar == nil){
+ fprint(2, "bad use of altdequeue op=%d\n", a->op);
+ abort();
+ }
+
+ for(i=0; i<ar->n; i++)
+ if(ar->a[i] == a){
+ delarray(ar, i);
+ return;
+ }
+ fprint(2, "cannot find self in altdq\n");
+ abort();
+}
+
+static void
+altalldequeue(Alt *a)
+{
+ int i;
+
+ for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++)
+ if(a[i].op != CHANNOP)
+ altdequeue(&a[i]);
+}
+
+static void
+amove(void *dst, void *src, uint n)
+{
+ if(dst){
+ if(src == nil)
+ memset(dst, 0, n);
+ else
+ memmove(dst, src, n);
+ }
+}
+
+/*
+ * Actually move the data around. There are up to three
+ * players: the sender, the receiver, and the channel itself.
+ * If the channel is unbuffered or the buffer is empty,
+ * data goes from sender to receiver. If the channel is full,
+ * the receiver removes some from the channel and the sender
+ * gets to put some in.
+ */
+static void
+altcopy(Alt *s, Alt *r)
+{
+ Alt *t;
+ Channel *c;
+ uchar *cp;
+
+ /*
+ * Work out who is sender and who is receiver
+ */
+ if(s == nil && r == nil)
+ return;
+ assert(s != nil);
+ c = s->c;
+ if(s->op == CHANRCV){
+ t = s;
+ s = r;
+ r = t;
+ }
+ assert(s==nil || s->op == CHANSND);
+ assert(r==nil || r->op == CHANRCV);
+
+ /*
+ * Channel is empty (or unbuffered) - copy directly.
+ */
+ if(s && r && c->nbuf == 0){
+ amove(r->v, s->v, c->elemsize);
+ return;
+ }
+
+ /*
+ * Otherwise it's always okay to receive and then send.
+ */
+ if(r){
+ cp = c->buf + c->off*c->elemsize;
+ amove(r->v, cp, c->elemsize);
+ --c->nbuf;
+ if(++c->off == c->bufsize)
+ c->off = 0;
+ }
+ if(s){
+ cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize;
+ amove(cp, s->v, c->elemsize);
+ ++c->nbuf;
+ }
+}
+
+static void
+altexec(Alt *a)
+{
+ int i;
+ _Altarray *ar;
+ Alt *other;
+ Channel *c;
+
+ c = a->c;
+ ar = chanarray(c, otherop(a->op));
+ if(ar && ar->n){
+ i = rand()%ar->n;
+ other = ar->a[i];
+ altcopy(a, other);
+ altalldequeue(other->xalt);
+ other->xalt[0].xalt = other;
+ _threadready(other->thread);
+ }else
+ altcopy(a, nil);
+}
+
+#define dbgalt 0
+int
+chanalt(Alt *a)
+{
+ int i, j, ncan, n, canblock;
+ Channel *c;
+ _Thread *t;
+
+ for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++)
+ ;
+ n = i;
+ canblock = a[i].op == CHANEND;
+
+ t = proc()->thread;
+ for(i=0; i<n; i++){
+ a[i].thread = t;
+ a[i].xalt = a;
+ }
+ qlock(&chanlock);
+if(dbgalt) print("alt ");
+ ncan = 0;
+ for(i=0; i<n; i++){
+ c = a[i].c;
+if(dbgalt) print(" %c:", "esrnb"[a[i].op]);
+if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c);
+ if(altcanexec(&a[i])){
+if(dbgalt) print("*");
+ ncan++;
+ }
+ }
+ if(ncan){
+ j = rand()%ncan;
+ for(i=0; i<n; i++){
+ if(altcanexec(&a[i])){
+ if(j-- == 0){
+if(dbgalt){
+c = a[i].c;
+print(" => %c:", "esrnb"[a[i].op]);
+if(c->name) print("%s", c->name); else print("%p", c);
+print("\n");
+}
+ altexec(&a[i]);
+ qunlock(&chanlock);
+ return i;
+ }
+ }
+ }
+ }
+if(dbgalt)print("\n");
+
+ if(!canblock){
+ qunlock(&chanlock);
+ return -1;
+ }
+
+ for(i=0; i<n; i++){
+ if(a[i].op != CHANNOP)
+ altqueue(&a[i]);
+ }
+ qunlock(&chanlock);
+
+ _threadswitch();
+
+ /*
+ * the guy who ran the op took care of dequeueing us
+ * and then set a[0].alt to the one that was executed.
+ */
+ return a[0].xalt - a;
+}
+
+static int
+_chanop(Channel *c, int op, void *p, int canblock)
+{
+ Alt a[2];
+
+ a[0].c = c;
+ a[0].op = op;
+ a[0].v = p;
+ a[1].op = canblock ? CHANEND : CHANNOBLK;
+ if(chanalt(a) < 0)
+ return -1;
+ return 1;
+}
+
+int
+chansend(Channel *c, void *v)
+{
+ return _chanop(c, CHANSND, v, 1);
+}
+
+int
+channbsend(Channel *c, void *v)
+{
+ return _chanop(c, CHANSND, v, 0);
+}
+
+int
+chanrecv(Channel *c, void *v)
+{
+ return _chanop(c, CHANRCV, v, 1);
+}
+
+int
+channbrecv(Channel *c, void *v)
+{
+ return _chanop(c, CHANRCV, v, 0);
+}
+
+int
+chansendp(Channel *c, void *v)
+{
+ return _chanop(c, CHANSND, (void*)&v, 1);
+}
+
+void*
+chanrecvp(Channel *c)
+{
+ void *v;
+
+ _chanop(c, CHANRCV, (void*)&v, 1);
+ return v;
+}
+
+int
+channbsendp(Channel *c, void *v)
+{
+ return _chanop(c, CHANSND, (void*)&v, 0);
+}
+
+void*
+channbrecvp(Channel *c)
+{
+ void *v;
+
+ _chanop(c, CHANRCV, (void*)&v, 0);
+ return v;
+}
+
+int
+chansendul(Channel *c, ulong val)
+{
+ return _chanop(c, CHANSND, &val, 1);
+}
+
+ulong
+chanrecvul(Channel *c)
+{
+ ulong val;
+
+ _chanop(c, CHANRCV, &val, 1);
+ return val;
+}
+
+int
+channbsendul(Channel *c, ulong val)
+{
+ return _chanop(c, CHANSND, &val, 0);
+}
+
+ulong
+channbrecvul(Channel *c)
+{
+ ulong val;
+
+ _chanop(c, CHANRCV, &val, 0);
+ return val;
+}
+
diff --git a/src/libthread/ioproc.c b/src/libthread/ioproc.c
new file mode 100644
index 00000000..2296690f
--- /dev/null
+++ b/src/libthread/ioproc.c
@@ -0,0 +1,130 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include "ioproc.h"
+
+enum
+{
+ STACK = 32768,
+};
+
+void
+iointerrupt(Ioproc *io)
+{
+ if(!io->inuse)
+ return;
+ fprint(2, "bug: cannot iointerrupt yet\n");
+}
+
+static void
+xioproc(void *a)
+{
+ Ioproc *io, *x;
+ io = a;
+ /*
+ * first recvp acquires the ioproc.
+ * second tells us that the data is ready.
+ */
+ for(;;){
+ while(recv(io->c, &x) == -1)
+ ;
+ if(x == 0) /* our cue to leave */
+ break;
+ assert(x == io);
+
+ /* caller is now committed -- even if interrupted he'll return */
+ while(recv(io->creply, &x) == -1)
+ ;
+ if(x == 0) /* caller backed out */
+ continue;
+ assert(x == io);
+
+ io->ret = io->op(&io->arg);
+ if(io->ret < 0)
+ rerrstr(io->err, sizeof io->err);
+ while(send(io->creply, &io) == -1)
+ ;
+ while(recv(io->creply, &x) == -1)
+ ;
+ }
+}
+
+Ioproc*
+ioproc(void)
+{
+ Ioproc *io;
+
+ io = mallocz(sizeof(*io), 1);
+ if(io == nil)
+ sysfatal("ioproc malloc: %r");
+ io->c = chancreate(sizeof(void*), 0);
+ chansetname(io->c, "ioc%p", io->c);
+ io->creply = chancreate(sizeof(void*), 0);
+ chansetname(io->creply, "ior%p", io->c);
+ io->tid = proccreate(xioproc, io, STACK);
+ return io;
+}
+
+void
+closeioproc(Ioproc *io)
+{
+ if(io == nil)
+ return;
+ iointerrupt(io);
+ while(send(io->c, 0) == -1)
+ ;
+ chanfree(io->c);
+ chanfree(io->creply);
+ free(io);
+}
+
+long
+iocall(Ioproc *io, long (*op)(va_list*), ...)
+{
+ char e[ERRMAX];
+ int ret, inted;
+ Ioproc *msg;
+
+ if(send(io->c, &io) == -1){
+ werrstr("interrupted");
+ return -1;
+ }
+ assert(!io->inuse);
+ io->inuse = 1;
+ io->op = op;
+ va_start(io->arg, op);
+ msg = io;
+ inted = 0;
+ while(send(io->creply, &msg) == -1){
+ msg = nil;
+ inted = 1;
+ }
+ if(inted){
+ werrstr("interrupted");
+ return -1;
+ }
+
+ /*
+ * If we get interrupted, we have stick around so that
+ * the IO proc has someone to talk to. Send it an interrupt
+ * and try again.
+ */
+ inted = 0;
+ while(recv(io->creply, nil) == -1){
+ inted = 1;
+ iointerrupt(io);
+ }
+ USED(inted);
+ va_end(io->arg);
+ ret = io->ret;
+ if(ret < 0)
+ strecpy(e, e+sizeof e, io->err);
+ io->inuse = 0;
+
+ /* release resources */
+ while(send(io->creply, &io) == -1)
+ ;
+ if(ret < 0)
+ errstr(e, sizeof e);
+ return ret;
+}
diff --git a/src/libthread/ioproc.h b/src/libthread/ioproc.h
new file mode 100644
index 00000000..f3a488d3
--- /dev/null
+++ b/src/libthread/ioproc.h
@@ -0,0 +1,14 @@
+#define ioproc_arg(io, type) (va_arg((io)->arg, type))
+
+struct Ioproc
+{
+ int tid;
+ Channel *c, *creply;
+ int inuse;
+ long (*op)(va_list*);
+ va_list arg;
+ long ret;
+ char err[ERRMAX];
+ Ioproc *next;
+};
+
diff --git a/src/libthread/mkfile b/src/libthread/mkfile
new file mode 100644
index 00000000..98093788
--- /dev/null
+++ b/src/libthread/mkfile
@@ -0,0 +1,39 @@
+<$PLAN9/src/mkhdr
+
+LIB=libthread.a
+OFILES=\
+ channel.$O\
+ exec.$O\
+ ioproc.$O\
+ iorw.$O\
+ pthread.$O\
+ qlock.$O\
+ ref.$O\
+ thread.$O\
+
+<$PLAN9/src/mksyslib
+
+HFILES=thread.h threadimpl.h
+
+tprimes: tprimes.$O
+ 9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
+tspawn: tspawn.$O
+ 9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
+tspawnloop: tspawnloop.$O
+ 9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
+
+%.$O: %.c
+ 9c -I. $stem.c
+
+test:V: tprimes tspawn
+ primes 1 10007 >p1.txt
+ $PLAN9/bin/time ./tprimes 10000 >tp1.txt
+ cmp p1.txt tp1.txt
+ primes 1 1009 >p2.txt
+ $PLAN9/bin/time ./tprimes 1000 >tp2.txt
+ cmp p2.txt tp2.txt
+ echo tspawn should take 3 seconds, not 6
+ $PLAN9/bin/time ./tspawn sleep 3 >/dev/null
+
+CLEANFILES=p1.txt p2.txt tp1.txt tp2.txt
+
diff --git a/src/libthread/pthread.c b/src/libthread/pthread.c
new file mode 100644
index 00000000..77f97a4f
--- /dev/null
+++ b/src/libthread/pthread.c
@@ -0,0 +1,108 @@
+#include "u.h"
+#include <errno.h>
+#include "libc.h"
+#include "thread.h"
+#include "threadimpl.h"
+
+static pthread_mutex_t initmutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void
+lockinit(Lock *lk)
+{
+ pthread_mutexattr_t attr;
+
+ pthread_mutex_lock(&initmutex);
+ if(lk->init == 0){
+ pthread_mutexattr_init(&attr);
+ pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
+ pthread_mutex_init(&lk->mutex, &attr);
+ pthread_mutexattr_destroy(&attr);
+ lk->init = 1;
+ }
+ pthread_mutex_unlock(&initmutex);
+}
+
+int
+_threadlock(Lock *lk, int block, ulong pc)
+{
+ int r;
+
+ if(!lk->init)
+ lockinit(lk);
+ if(block){
+ if(pthread_mutex_lock(&lk->mutex) != 0)
+ abort();
+ return 1;
+ }else{
+ r = pthread_mutex_trylock(&lk->mutex);
+ if(r == 0)
+ return 1;
+ if(r == EBUSY)
+ return 0;
+ abort();
+ return 0;
+ }
+}
+
+void
+_threadunlock(Lock *lk, ulong pc)
+{
+ if(pthread_mutex_unlock(&lk->mutex) != 0)
+ abort();
+}
+
+void
+_procsleep(_Procrendez *r)
+{
+ /* r is protected by r->l, which we hold */
+ pthread_cond_init(&r->cond, 0);
+ r->asleep = 1;
+ pthread_cond_wait(&r->cond, &r->l->mutex);
+ pthread_cond_destroy(&r->cond);
+ r->asleep = 0;
+}
+
+void
+_procwakeup(_Procrendez *r)
+{
+ if(r->asleep){
+ r->asleep = 0;
+ pthread_cond_signal(&r->cond);
+ }
+}
+
+void
+_procstart(Proc *p, void (*fn)(void*))
+{
+//print("pc\n");
+ if(pthread_create(&p->tid, nil, (void*(*)(void*))fn, p) < 0){
+//print("pc1\n");
+ fprint(2, "pthread_create: %r\n");
+ abort();
+ }
+//print("pc2\n");
+}
+
+static pthread_key_t prockey;
+
+Proc*
+_threadproc(void)
+{
+ Proc *p;
+
+ p = pthread_getspecific(prockey);
+ return p;
+}
+
+void
+_threadsetproc(Proc *p)
+{
+ pthread_setspecific(prockey, p);
+}
+
+void
+pthreadinit(void)
+{
+ pthread_key_create(&prockey, 0);
+}
+
diff --git a/src/libthread/ref.c b/src/libthread/ref.c
new file mode 100644
index 00000000..30c932ed
--- /dev/null
+++ b/src/libthread/ref.c
@@ -0,0 +1,27 @@
+#include "u.h"
+#include "libc.h"
+#include "thread.h"
+
+static long
+refadd(Ref *r, long a)
+{
+ long ref;
+
+ lock(&r->lock);
+ r->ref += a;
+ ref = r->ref;
+ unlock(&r->lock);
+ return ref;
+}
+
+long
+incref(Ref *r)
+{
+ return refadd(r, 1);
+}
+
+long
+decref(Ref *r)
+{
+ return refadd(r, -1);
+}
diff --git a/src/libthread/threadimpl.h b/src/libthread/threadimpl.h
new file mode 100644
index 00000000..9f70b0e0
--- /dev/null
+++ b/src/libthread/threadimpl.h
@@ -0,0 +1,70 @@
+#include <ucontext.h>
+
+typedef struct Context Context;
+typedef struct Proc Proc;
+typedef struct _Procrendez _Procrendez;
+
+enum
+{
+ STACK = 8192
+};
+
+struct Context
+{
+ ucontext_t uc;
+};
+
+struct _Thread
+{
+ _Thread *next;
+ _Thread *prev;
+ _Thread *allnext;
+ _Thread *allprev;
+ Context context;
+ uint id;
+ uchar *stk;
+ uint stksize;
+ int exiting;
+ void (*startfn)(void*);
+ void *startarg;
+ Proc *proc;
+ char name[256];
+ char state[256];
+};
+
+struct _Procrendez
+{
+ Lock *l;
+ int asleep;
+ pthread_cond_t cond;
+};
+
+extern void _procsleep(_Procrendez*);
+extern void _procwakeup(_Procrendez*);
+
+struct Proc
+{
+ pthread_t tid;
+ Lock lock;
+ _Thread *thread;
+ _Threadlist runqueue;
+ _Threadlist allthreads;
+ uint nthread;
+ uint sysproc;
+ _Procrendez runrend;
+ Context schedcontext;
+ void *udata;
+};
+
+extern Proc *xxx;
+#define proc() _threadproc()
+#define setproc(p) _threadsetproc(p)
+
+extern void _procstart(Proc*, void (*fn)(void*));
+extern _Thread *_threadcreate(Proc*, void(*fn)(void*), void*, uint);
+extern void _threadexit(void);
+extern Proc *_threadproc(void);
+extern void _threadsetproc(Proc*);
+extern int _threadlock(Lock*, int, ulong);
+extern void _threadunlock(Lock*, ulong);
+