From 1544f90960275dc9211bde30329c3258e0e1bf38 Mon Sep 17 00:00:00 2001 From: rsc Date: Sat, 25 Dec 2004 21:56:33 +0000 Subject: New thread library --- src/libthread/channel.c | 500 ------------------------------------------------ 1 file changed, 500 deletions(-) delete mode 100644 src/libthread/channel.c (limited to 'src/libthread/channel.c') diff --git a/src/libthread/channel.c b/src/libthread/channel.c deleted file mode 100644 index b488dc97..00000000 --- a/src/libthread/channel.c +++ /dev/null @@ -1,500 +0,0 @@ -#include "threadimpl.h" - -static Lock chanlock; /* central channel access lock */ - -static void enqueue(Alt*, Thread*); -static void dequeue(Alt*); -static int altexec(Alt*); - -int _threadhighnentry; -int _threadnalt; - -static void -setuserpc(ulong pc) -{ - Thread *t; - - t = _threadgetproc()->thread; - if(t) - t->userpc = pc; -} - -static int -canexec(Alt *a) -{ - int i, otherop; - Channel *c; - - c = a->c; - /* are there senders or receivers blocked? */ - otherop = (CHANSND+CHANRCV) - a->op; - for(i=0; inentry; i++) - if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil){ - _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c); - return 1; - } - - /* is there room in the channel? */ - if((a->op==CHANSND && c->n < c->s) - || (a->op==CHANRCV && c->n > 0)){ - _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c); - return 1; - } - - return 0; -} - -static void -_chanfree(Channel *c) -{ - int i, inuse; - - inuse = 0; - for(i = 0; i < c->nentry; i++) - if(c->qentry[i]) - inuse = 1; - if(inuse) - c->freed = 1; - else{ - if(c->qentry) - free(c->qentry); - free(c); - } -} - -void -chanfree(Channel *c) -{ - lock(&chanlock); - _chanfree(c); - unlock(&chanlock); -} - -int -chaninit(Channel *c, int elemsize, int elemcnt) -{ - if(elemcnt < 0 || elemsize <= 0 || c == nil) - return -1; - memset(c, 0, sizeof *c); - c->e = elemsize; - c->s = elemcnt; - _threaddebug(DBGCHAN, "chaninit %p", c); - return 1; -} - -Channel* -chancreate(int elemsize, int elemcnt) -{ - Channel *c; - - if(elemcnt < 0 || elemsize <= 0) - return nil; - c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1); - c->e = elemsize; - c->s = elemcnt; - _threaddebug(DBGCHAN, "chancreate %p", c); - return c; -} - -static int -_alt(Alt *alts) -{ - Alt *a, *xa; - Channel *c; - int n; - Thread *t; - - /* - * The point of going splhi here is that note handlers - * might reasonably want to use channel operations, - * but that will hang if the note comes while we hold the - * chanlock. Instead, we delay the note until we've dropped - * the lock. - */ - - /* - * T might be nil here -- the scheduler sends on threadwaitchan - * directly (in non-blocking mode, of course!). - */ - t = _threadgetproc()->thread; - if((t && t->moribund) || _threadexitsallstatus) - yield(); /* won't return */ - lock(&chanlock); - - /* test whether any channels can proceed */ - n = 0; - a = nil; - - for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){ - xa->entryno = -1; - if(xa->op == CHANNOP) - continue; - - c = xa->c; - if(c==nil){ - unlock(&chanlock); - return -1; - } - if(canexec(xa)) - if(nrand(++n) == 0) - a = xa; - } - - if(a==nil){ - /* nothing can proceed */ - if(xa->op == CHANNOBLK){ - unlock(&chanlock); - _threadnalt++; - return xa - alts; - } - - /* enqueue on all channels. */ - t->altc = nil; - for(xa=alts; xa->op!=CHANEND; xa++){ - if(xa->op==CHANNOP) - continue; - enqueue(xa, t); - } - - /* - * wait for successful rendezvous. - * we can't just give up if the rendezvous - * is interrupted -- someone else might come - * along and try to rendezvous with us, so - * we need to be here. - * - * actually, now we're assuming no interrupts. - */ - /*Again:*/ - t->alt = alts; - t->chan = Chanalt; - t->altrend.l = &chanlock; - _threadsleep(&t->altrend); - - /* dequeue from channels, find selected one */ - a = nil; - c = t->altc; - for(xa=alts; xa->op!=CHANEND; xa++){ - if(xa->op==CHANNOP) - continue; - if(xa->c == c) - a = xa; - dequeue(xa); - } - unlock(&chanlock); - if(a == nil){ /* we were interrupted */ - assert(c==(Channel*)~0); - return -1; - } - }else{ - altexec(a); /* unlocks chanlock, does splx */ - } - if(t) - t->chan = Channone; - return a - alts; -} - -int -alt(Alt *alts) -{ - setuserpc(getcallerpc(&alts)); - return _alt(alts); -} - -static int -runop(int op, Channel *c, void *v, int nb) -{ - int r; - Alt a[2]; - - /* - * we could do this without calling alt, - * but the only reason would be performance, - * and i'm not convinced it matters. - */ - a[0].op = op; - a[0].c = c; - a[0].v = v; - a[1].op = CHANEND; - if(nb) - a[1].op = CHANNOBLK; - switch(r=_alt(a)){ - case -1: /* interrupted */ - return -1; - case 1: /* nonblocking, didn't accomplish anything */ - assert(nb); - return 0; - case 0: - return 1; - default: - fprint(2, "ERROR: channel alt returned %d\n", r); - abort(); - return -1; - } -} - -int -recv(Channel *c, void *v) -{ - setuserpc(getcallerpc(&c)); - return runop(CHANRCV, c, v, 0); -} - -int -nbrecv(Channel *c, void *v) -{ - setuserpc(getcallerpc(&c)); - return runop(CHANRCV, c, v, 1); -} - -int -send(Channel *c, void *v) -{ - setuserpc(getcallerpc(&c)); - return runop(CHANSND, c, v, 0); -} - -int -nbsend(Channel *c, void *v) -{ - setuserpc(getcallerpc(&c)); - return runop(CHANSND, c, v, 1); -} - -static void -channelsize(Channel *c, int sz) -{ - if(c->e != sz){ - fprint(2, "expected channel with elements of size %d, got size %d\n", - sz, c->e); - abort(); - } -} - -int -sendul(Channel *c, ulong v) -{ - setuserpc(getcallerpc(&c)); - channelsize(c, sizeof(ulong)); - return send(c, &v); -} - -ulong -recvul(Channel *c) -{ - ulong v; - - setuserpc(getcallerpc(&c)); - channelsize(c, sizeof(ulong)); - if(runop(CHANRCV, c, &v, 0) < 0) - return ~0; - return v; -} - -int -sendp(Channel *c, void *v) -{ - setuserpc(getcallerpc(&c)); - channelsize(c, sizeof(void*)); - return runop(CHANSND, c, &v, 0); -} - -void* -recvp(Channel *c) -{ - void *v; - - setuserpc(getcallerpc(&c)); - channelsize(c, sizeof(void*)); - if(runop(CHANRCV, c, &v, 0) < 0) - return nil; - return v; -} - -int -nbsendul(Channel *c, ulong v) -{ - setuserpc(getcallerpc(&c)); - channelsize(c, sizeof(ulong)); - return runop(CHANSND, c, &v, 1); -} - -ulong -nbrecvul(Channel *c) -{ - ulong v; - - setuserpc(getcallerpc(&c)); - channelsize(c, sizeof(ulong)); - if(runop(CHANRCV, c, &v, 1) == 0) - return 0; - return v; -} - -int -nbsendp(Channel *c, void *v) -{ - setuserpc(getcallerpc(&c)); - channelsize(c, sizeof(void*)); - return runop(CHANSND, c, &v, 1); -} - -void* -nbrecvp(Channel *c) -{ - void *v; - - setuserpc(getcallerpc(&c)); - channelsize(c, sizeof(void*)); - if(runop(CHANRCV, c, &v, 1) == 0) - return nil; - return v; -} - -static int -emptyentry(Channel *c) -{ - int i, extra; - - assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry)); - - for(i=0; inentry; i++) - if(c->qentry[i]==nil) - return i; - - extra = 16; - c->nentry += extra; -if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry; - c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0])); - if(c->qentry == nil) - sysfatal("realloc channel entries: %r"); - _threadmemset(&c->qentry[i], 0, extra*sizeof(c->qentry[0])); - return i; -} - -static void -enqueue(Alt *a, Thread *t) -{ - int i; - - _threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c); - a->thread = t; - i = emptyentry(a->c); - a->c->qentry[i] = a; -} - -static void -dequeue(Alt *a) -{ - int i; - Channel *c; - - c = a->c; - for(i=0; inentry; i++) - if(c->qentry[i]==a){ - _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c); - c->qentry[i] = nil; - if(c->freed) - _chanfree(c); - return; - } -} - -static void* -altexecbuffered(Alt *a, int willreplace) -{ - uchar *v; - Channel *c; - - c = a->c; - /* use buffered channel queue */ - if(a->op==CHANRCV && c->n > 0){ - _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c); - v = c->v + c->e*(c->f%c->s); - if(!willreplace) - c->n--; - c->f++; - return v; - } - if(a->op==CHANSND && c->n < c->s){ - _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c); - v = c->v + c->e*((c->f+c->n)%c->s); - if(!willreplace) - c->n++; - return v; - } - abort(); - return nil; -} - -static void -altcopy(void *dst, void *src, int sz) -{ - if(dst){ - if(src) - memmove(dst, src, sz); - else - _threadmemset(dst, 0, sz); - } -} - -static int -altexec(Alt *a) -{ - volatile Alt *b; - int i, n, otherop; - Channel *c; - void *me, *waiter, *buf; - - c = a->c; - - /* rendezvous with others */ - otherop = (CHANSND+CHANRCV) - a->op; - n = 0; - b = nil; - me = a->v; - for(i=0; inentry; i++) - if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread->altc==nil) - if(nrand(++n) == 0) - b = c->qentry[i]; - if(b != nil){ - _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b); - waiter = b->v; - if(c->s && c->n){ - /* - * if buffer is full and there are waiters - * and we're meeting a waiter, - * we must be receiving. - * - * we use the value in the channel buffer, - * copy the waiter's value into the channel buffer - * on behalf of the waiter, and then wake the waiter. - */ - if(a->op!=CHANRCV) - abort(); - buf = altexecbuffered(a, 1); - altcopy(me, buf, c->e); - altcopy(buf, waiter, c->e); - }else{ - if(a->op==CHANRCV) - altcopy(me, waiter, c->e); - else - altcopy(waiter, me, c->e); - } - b->thread->altc = c; - _threadwakeup(&b->thread->altrend); - _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock); - _threaddebug(DBGCHAN, "unlocking the chanlock"); - unlock(&chanlock); - return 1; - } - - buf = altexecbuffered(a, 0); - if(a->op==CHANRCV) - altcopy(me, buf, c->e); - else - altcopy(buf, me, c->e); - - unlock(&chanlock); - return 1; -} -- cgit v1.2.3