From 619085f0b4a85104ef6c7496f9ce1f46e9b17c82 Mon Sep 17 00:00:00 2001 From: rsc Date: Sat, 25 Dec 2004 21:57:50 +0000 Subject: more new libthread --- src/libthread/channel.c | 412 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 412 insertions(+) create mode 100644 src/libthread/channel.c (limited to 'src/libthread/channel.c') diff --git a/src/libthread/channel.c b/src/libthread/channel.c new file mode 100644 index 00000000..3c9614e6 --- /dev/null +++ b/src/libthread/channel.c @@ -0,0 +1,412 @@ +#include "u.h" +#include "libc.h" +#include "thread.h" +#include "threadimpl.h" + +/* + * One can go through a lot of effort to avoid this global lock. + * You have to put locks in all the channels and all the Alt + * structures. At the beginning of an alt you have to lock all + * the channels, but then to try to actually exec an op you + * have to lock the other guy's alt structure, so that other + * people aren't trying to use him in some other op at the + * same time. + * + * For Plan 9 apps, it's just not worth the extra effort. + */ +static QLock chanlock; + +Channel* +chancreate(int elemsize, int bufsize) +{ + Channel *c; + + c = malloc(sizeof *c+bufsize*elemsize); + memset(c, 0, sizeof *c); + c->elemsize = elemsize; + c->bufsize = bufsize; + c->nbuf = 0; + c->buf = (uchar*)(c+1); + return c; +} + +void +chansetname(Channel *c, char *fmt, ...) +{ + char *name; + va_list arg; + + va_start(arg, fmt); + name = vsmprint(fmt, arg); + va_end(arg); + free(c->name); + c->name = name; +} + +/* bug - work out races */ +void +chanfree(Channel *c) +{ + if(c == nil) + return; + free(c->name); + free(c->arecv.a); + free(c->asend.a); + free(c); +} + +static void +addarray(_Altarray *a, Alt *alt) +{ + if(a->n == a->m){ + a->m += 16; + a->a = realloc(a->a, a->m*sizeof a->a[0]); + } + a->a[a->n++] = alt; +} + +static void +delarray(_Altarray *a, int i) +{ + --a->n; + a->a[i] = a->a[a->n]; +} + +/* + * doesn't really work for things other than CHANSND and CHANRCV + * but is only used as arg to chanarray, which can handle it + */ +#define otherop(op) (CHANSND+CHANRCV-(op)) + +static _Altarray* +chanarray(Channel *c, uint op) +{ + switch(op){ + default: + return nil; + case CHANSND: + return &c->asend; + case CHANRCV: + return &c->arecv; + } +} + +static int +altcanexec(Alt *a) +{ + _Altarray *ar; + Channel *c; + + if(a->op == CHANNOP) + return 0; + c = a->c; + if(c->bufsize == 0){ + ar = chanarray(c, otherop(a->op)); + return ar && ar->n; + }else{ + switch(a->op){ + default: + return 0; + case CHANSND: + return c->nbuf < c->bufsize; + case CHANRCV: + return c->nbuf > 0; + } + } +} + +static void +altqueue(Alt *a) +{ + _Altarray *ar; + + ar = chanarray(a->c, a->op); + addarray(ar, a); +} + +static void +altdequeue(Alt *a) +{ + int i; + _Altarray *ar; + + ar = chanarray(a->c, a->op); + if(ar == nil){ + fprint(2, "bad use of altdequeue op=%d\n", a->op); + abort(); + } + + for(i=0; in; i++) + if(ar->a[i] == a){ + delarray(ar, i); + return; + } + fprint(2, "cannot find self in altdq\n"); + abort(); +} + +static void +altalldequeue(Alt *a) +{ + int i; + + for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++) + if(a[i].op != CHANNOP) + altdequeue(&a[i]); +} + +static void +amove(void *dst, void *src, uint n) +{ + if(dst){ + if(src == nil) + memset(dst, 0, n); + else + memmove(dst, src, n); + } +} + +/* + * Actually move the data around. There are up to three + * players: the sender, the receiver, and the channel itself. + * If the channel is unbuffered or the buffer is empty, + * data goes from sender to receiver. If the channel is full, + * the receiver removes some from the channel and the sender + * gets to put some in. + */ +static void +altcopy(Alt *s, Alt *r) +{ + Alt *t; + Channel *c; + uchar *cp; + + /* + * Work out who is sender and who is receiver + */ + if(s == nil && r == nil) + return; + assert(s != nil); + c = s->c; + if(s->op == CHANRCV){ + t = s; + s = r; + r = t; + } + assert(s==nil || s->op == CHANSND); + assert(r==nil || r->op == CHANRCV); + + /* + * Channel is empty (or unbuffered) - copy directly. + */ + if(s && r && c->nbuf == 0){ + amove(r->v, s->v, c->elemsize); + return; + } + + /* + * Otherwise it's always okay to receive and then send. + */ + if(r){ + cp = c->buf + c->off*c->elemsize; + amove(r->v, cp, c->elemsize); + --c->nbuf; + if(++c->off == c->bufsize) + c->off = 0; + } + if(s){ + cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize; + amove(cp, s->v, c->elemsize); + ++c->nbuf; + } +} + +static void +altexec(Alt *a) +{ + int i; + _Altarray *ar; + Alt *other; + Channel *c; + + c = a->c; + ar = chanarray(c, otherop(a->op)); + if(ar && ar->n){ + i = rand()%ar->n; + other = ar->a[i]; + altcopy(a, other); + altalldequeue(other->xalt); + other->xalt[0].xalt = other; + _threadready(other->thread); + }else + altcopy(a, nil); +} + +#define dbgalt 0 +int +chanalt(Alt *a) +{ + int i, j, ncan, n, canblock; + Channel *c; + _Thread *t; + + for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++) + ; + n = i; + canblock = a[i].op == CHANEND; + + t = proc()->thread; + for(i=0; iname) print("%s", c->name); else print("%p", c); + if(altcanexec(&a[i])){ +if(dbgalt) print("*"); + ncan++; + } + } + if(ncan){ + j = rand()%ncan; + for(i=0; i %c:", "esrnb"[a[i].op]); +if(c->name) print("%s", c->name); else print("%p", c); +print("\n"); +} + altexec(&a[i]); + qunlock(&chanlock); + return i; + } + } + } + } +if(dbgalt)print("\n"); + + if(!canblock){ + qunlock(&chanlock); + return -1; + } + + for(i=0; i