aboutsummaryrefslogtreecommitdiff
path: root/src/libthread/channel.c
diff options
context:
space:
mode:
authorrsc <devnull@localhost>2004-12-25 21:56:33 +0000
committerrsc <devnull@localhost>2004-12-25 21:56:33 +0000
commit1544f90960275dc9211bde30329c3258e0e1bf38 (patch)
treef55e7a73c03aaa24daa7cc2ad02822b921c477f9 /src/libthread/channel.c
parent7788fd54094693384ef5c92c475656dba8819feb (diff)
downloadplan9port-1544f90960275dc9211bde30329c3258e0e1bf38.tar.gz
plan9port-1544f90960275dc9211bde30329c3258e0e1bf38.tar.bz2
plan9port-1544f90960275dc9211bde30329c3258e0e1bf38.zip
New thread library
Diffstat (limited to 'src/libthread/channel.c')
-rw-r--r--src/libthread/channel.c500
1 files changed, 0 insertions, 500 deletions
diff --git a/src/libthread/channel.c b/src/libthread/channel.c
deleted file mode 100644
index b488dc97..00000000
--- a/src/libthread/channel.c
+++ /dev/null
@@ -1,500 +0,0 @@
-#include "threadimpl.h"
-
-static Lock chanlock; /* central channel access lock */
-
-static void enqueue(Alt*, Thread*);
-static void dequeue(Alt*);
-static int altexec(Alt*);
-
-int _threadhighnentry;
-int _threadnalt;
-
-static void
-setuserpc(ulong pc)
-{
- Thread *t;
-
- t = _threadgetproc()->thread;
- if(t)
- t->userpc = pc;
-}
-
-static int
-canexec(Alt *a)
-{
- int i, otherop;
- Channel *c;
-
- c = a->c;
- /* are there senders or receivers blocked? */
- otherop = (CHANSND+CHANRCV) - a->op;
- for(i=0; i<c->nentry; i++)
- if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil){
- _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
- return 1;
- }
-
- /* is there room in the channel? */
- if((a->op==CHANSND && c->n < c->s)
- || (a->op==CHANRCV && c->n > 0)){
- _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
- return 1;
- }
-
- return 0;
-}
-
-static void
-_chanfree(Channel *c)
-{
- int i, inuse;
-
- inuse = 0;
- for(i = 0; i < c->nentry; i++)
- if(c->qentry[i])
- inuse = 1;
- if(inuse)
- c->freed = 1;
- else{
- if(c->qentry)
- free(c->qentry);
- free(c);
- }
-}
-
-void
-chanfree(Channel *c)
-{
- lock(&chanlock);
- _chanfree(c);
- unlock(&chanlock);
-}
-
-int
-chaninit(Channel *c, int elemsize, int elemcnt)
-{
- if(elemcnt < 0 || elemsize <= 0 || c == nil)
- return -1;
- memset(c, 0, sizeof *c);
- c->e = elemsize;
- c->s = elemcnt;
- _threaddebug(DBGCHAN, "chaninit %p", c);
- return 1;
-}
-
-Channel*
-chancreate(int elemsize, int elemcnt)
-{
- Channel *c;
-
- if(elemcnt < 0 || elemsize <= 0)
- return nil;
- c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
- c->e = elemsize;
- c->s = elemcnt;
- _threaddebug(DBGCHAN, "chancreate %p", c);
- return c;
-}
-
-static int
-_alt(Alt *alts)
-{
- Alt *a, *xa;
- Channel *c;
- int n;
- Thread *t;
-
- /*
- * The point of going splhi here is that note handlers
- * might reasonably want to use channel operations,
- * but that will hang if the note comes while we hold the
- * chanlock. Instead, we delay the note until we've dropped
- * the lock.
- */
-
- /*
- * T might be nil here -- the scheduler sends on threadwaitchan
- * directly (in non-blocking mode, of course!).
- */
- t = _threadgetproc()->thread;
- if((t && t->moribund) || _threadexitsallstatus)
- yield(); /* won't return */
- lock(&chanlock);
-
- /* test whether any channels can proceed */
- n = 0;
- a = nil;
-
- for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
- xa->entryno = -1;
- if(xa->op == CHANNOP)
- continue;
-
- c = xa->c;
- if(c==nil){
- unlock(&chanlock);
- return -1;
- }
- if(canexec(xa))
- if(nrand(++n) == 0)
- a = xa;
- }
-
- if(a==nil){
- /* nothing can proceed */
- if(xa->op == CHANNOBLK){
- unlock(&chanlock);
- _threadnalt++;
- return xa - alts;
- }
-
- /* enqueue on all channels. */
- t->altc = nil;
- for(xa=alts; xa->op!=CHANEND; xa++){
- if(xa->op==CHANNOP)
- continue;
- enqueue(xa, t);
- }
-
- /*
- * wait for successful rendezvous.
- * we can't just give up if the rendezvous
- * is interrupted -- someone else might come
- * along and try to rendezvous with us, so
- * we need to be here.
- *
- * actually, now we're assuming no interrupts.
- */
- /*Again:*/
- t->alt = alts;
- t->chan = Chanalt;
- t->altrend.l = &chanlock;
- _threadsleep(&t->altrend);
-
- /* dequeue from channels, find selected one */
- a = nil;
- c = t->altc;
- for(xa=alts; xa->op!=CHANEND; xa++){
- if(xa->op==CHANNOP)
- continue;
- if(xa->c == c)
- a = xa;
- dequeue(xa);
- }
- unlock(&chanlock);
- if(a == nil){ /* we were interrupted */
- assert(c==(Channel*)~0);
- return -1;
- }
- }else{
- altexec(a); /* unlocks chanlock, does splx */
- }
- if(t)
- t->chan = Channone;
- return a - alts;
-}
-
-int
-alt(Alt *alts)
-{
- setuserpc(getcallerpc(&alts));
- return _alt(alts);
-}
-
-static int
-runop(int op, Channel *c, void *v, int nb)
-{
- int r;
- Alt a[2];
-
- /*
- * we could do this without calling alt,
- * but the only reason would be performance,
- * and i'm not convinced it matters.
- */
- a[0].op = op;
- a[0].c = c;
- a[0].v = v;
- a[1].op = CHANEND;
- if(nb)
- a[1].op = CHANNOBLK;
- switch(r=_alt(a)){
- case -1: /* interrupted */
- return -1;
- case 1: /* nonblocking, didn't accomplish anything */
- assert(nb);
- return 0;
- case 0:
- return 1;
- default:
- fprint(2, "ERROR: channel alt returned %d\n", r);
- abort();
- return -1;
- }
-}
-
-int
-recv(Channel *c, void *v)
-{
- setuserpc(getcallerpc(&c));
- return runop(CHANRCV, c, v, 0);
-}
-
-int
-nbrecv(Channel *c, void *v)
-{
- setuserpc(getcallerpc(&c));
- return runop(CHANRCV, c, v, 1);
-}
-
-int
-send(Channel *c, void *v)
-{
- setuserpc(getcallerpc(&c));
- return runop(CHANSND, c, v, 0);
-}
-
-int
-nbsend(Channel *c, void *v)
-{
- setuserpc(getcallerpc(&c));
- return runop(CHANSND, c, v, 1);
-}
-
-static void
-channelsize(Channel *c, int sz)
-{
- if(c->e != sz){
- fprint(2, "expected channel with elements of size %d, got size %d\n",
- sz, c->e);
- abort();
- }
-}
-
-int
-sendul(Channel *c, ulong v)
-{
- setuserpc(getcallerpc(&c));
- channelsize(c, sizeof(ulong));
- return send(c, &v);
-}
-
-ulong
-recvul(Channel *c)
-{
- ulong v;
-
- setuserpc(getcallerpc(&c));
- channelsize(c, sizeof(ulong));
- if(runop(CHANRCV, c, &v, 0) < 0)
- return ~0;
- return v;
-}
-
-int
-sendp(Channel *c, void *v)
-{
- setuserpc(getcallerpc(&c));
- channelsize(c, sizeof(void*));
- return runop(CHANSND, c, &v, 0);
-}
-
-void*
-recvp(Channel *c)
-{
- void *v;
-
- setuserpc(getcallerpc(&c));
- channelsize(c, sizeof(void*));
- if(runop(CHANRCV, c, &v, 0) < 0)
- return nil;
- return v;
-}
-
-int
-nbsendul(Channel *c, ulong v)
-{
- setuserpc(getcallerpc(&c));
- channelsize(c, sizeof(ulong));
- return runop(CHANSND, c, &v, 1);
-}
-
-ulong
-nbrecvul(Channel *c)
-{
- ulong v;
-
- setuserpc(getcallerpc(&c));
- channelsize(c, sizeof(ulong));
- if(runop(CHANRCV, c, &v, 1) == 0)
- return 0;
- return v;
-}
-
-int
-nbsendp(Channel *c, void *v)
-{
- setuserpc(getcallerpc(&c));
- channelsize(c, sizeof(void*));
- return runop(CHANSND, c, &v, 1);
-}
-
-void*
-nbrecvp(Channel *c)
-{
- void *v;
-
- setuserpc(getcallerpc(&c));
- channelsize(c, sizeof(void*));
- if(runop(CHANRCV, c, &v, 1) == 0)
- return nil;
- return v;
-}
-
-static int
-emptyentry(Channel *c)
-{
- int i, extra;
-
- assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
-
- for(i=0; i<c->nentry; i++)
- if(c->qentry[i]==nil)
- return i;
-
- extra = 16;
- c->nentry += extra;
-if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
- c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
- if(c->qentry == nil)
- sysfatal("realloc channel entries: %r");
- _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
- return i;
-}
-
-static void
-enqueue(Alt *a, Thread *t)
-{
- int i;
-
- _threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c);
- a->thread = t;
- i = emptyentry(a->c);
- a->c->qentry[i] = a;
-}
-
-static void
-dequeue(Alt *a)
-{
- int i;
- Channel *c;
-
- c = a->c;
- for(i=0; i<c->nentry; i++)
- if(c->qentry[i]==a){
- _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
- c->qentry[i] = nil;
- if(c->freed)
- _chanfree(c);
- return;
- }
-}
-
-static void*
-altexecbuffered(Alt *a, int willreplace)
-{
- uchar *v;
- Channel *c;
-
- c = a->c;
- /* use buffered channel queue */
- if(a->op==CHANRCV && c->n > 0){
- _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
- v = c->v + c->e*(c->f%c->s);
- if(!willreplace)
- c->n--;
- c->f++;
- return v;
- }
- if(a->op==CHANSND && c->n < c->s){
- _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
- v = c->v + c->e*((c->f+c->n)%c->s);
- if(!willreplace)
- c->n++;
- return v;
- }
- abort();
- return nil;
-}
-
-static void
-altcopy(void *dst, void *src, int sz)
-{
- if(dst){
- if(src)
- memmove(dst, src, sz);
- else
- _threadmemset(dst, 0, sz);
- }
-}
-
-static int
-altexec(Alt *a)
-{
- volatile Alt *b;
- int i, n, otherop;
- Channel *c;
- void *me, *waiter, *buf;
-
- c = a->c;
-
- /* rendezvous with others */
- otherop = (CHANSND+CHANRCV) - a->op;
- n = 0;
- b = nil;
- me = a->v;
- for(i=0; i<c->nentry; i++)
- if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil)
- if(nrand(++n) == 0)
- b = c->qentry[i];
- if(b != nil){
- _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
- waiter = b->v;
- if(c->s && c->n){
- /*
- * if buffer is full and there are waiters
- * and we're meeting a waiter,
- * we must be receiving.
- *
- * we use the value in the channel buffer,
- * copy the waiter's value into the channel buffer
- * on behalf of the waiter, and then wake the waiter.
- */
- if(a->op!=CHANRCV)
- abort();
- buf = altexecbuffered(a, 1);
- altcopy(me, buf, c->e);
- altcopy(buf, waiter, c->e);
- }else{
- if(a->op==CHANRCV)
- altcopy(me, waiter, c->e);
- else
- altcopy(waiter, me, c->e);
- }
- b->thread->altc = c;
- _threadwakeup(&b->thread->altrend);
- _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
- _threaddebug(DBGCHAN, "unlocking the chanlock");
- unlock(&chanlock);
- return 1;
- }
-
- buf = altexecbuffered(a, 0);
- if(a->op==CHANRCV)
- altcopy(me, buf, c->e);
- else
- altcopy(buf, me, c->e);
-
- unlock(&chanlock);
- return 1;
-}