aboutsummaryrefslogtreecommitdiff
path: root/src/libthread/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libthread/channel.c')
-rw-r--r--src/libthread/channel.c412
1 files changed, 412 insertions, 0 deletions
diff --git a/src/libthread/channel.c b/src/libthread/channel.c
new file mode 100644
index 00000000..3c9614e6
--- /dev/null
+++ b/src/libthread/channel.c
@@ -0,0 +1,412 @@
+#include "u.h"
+#include "libc.h"
+#include "thread.h"
+#include "threadimpl.h"
+
+/*
+ * One can go through a lot of effort to avoid this global lock.
+ * You have to put locks in all the channels and all the Alt
+ * structures. At the beginning of an alt you have to lock all
+ * the channels, but then to try to actually exec an op you
+ * have to lock the other guy's alt structure, so that other
+ * people aren't trying to use him in some other op at the
+ * same time.
+ *
+ * For Plan 9 apps, it's just not worth the extra effort.
+ */
+static QLock chanlock;
+
+Channel*
+chancreate(int elemsize, int bufsize)
+{
+ Channel *c;
+
+ c = malloc(sizeof *c+bufsize*elemsize);
+ memset(c, 0, sizeof *c);
+ c->elemsize = elemsize;
+ c->bufsize = bufsize;
+ c->nbuf = 0;
+ c->buf = (uchar*)(c+1);
+ return c;
+}
+
+void
+chansetname(Channel *c, char *fmt, ...)
+{
+ char *name;
+ va_list arg;
+
+ va_start(arg, fmt);
+ name = vsmprint(fmt, arg);
+ va_end(arg);
+ free(c->name);
+ c->name = name;
+}
+
+/* bug - work out races */
+void
+chanfree(Channel *c)
+{
+ if(c == nil)
+ return;
+ free(c->name);
+ free(c->arecv.a);
+ free(c->asend.a);
+ free(c);
+}
+
+static void
+addarray(_Altarray *a, Alt *alt)
+{
+ if(a->n == a->m){
+ a->m += 16;
+ a->a = realloc(a->a, a->m*sizeof a->a[0]);
+ }
+ a->a[a->n++] = alt;
+}
+
+static void
+delarray(_Altarray *a, int i)
+{
+ --a->n;
+ a->a[i] = a->a[a->n];
+}
+
+/*
+ * doesn't really work for things other than CHANSND and CHANRCV
+ * but is only used as arg to chanarray, which can handle it
+ */
+#define otherop(op) (CHANSND+CHANRCV-(op))
+
+static _Altarray*
+chanarray(Channel *c, uint op)
+{
+ switch(op){
+ default:
+ return nil;
+ case CHANSND:
+ return &c->asend;
+ case CHANRCV:
+ return &c->arecv;
+ }
+}
+
+static int
+altcanexec(Alt *a)
+{
+ _Altarray *ar;
+ Channel *c;
+
+ if(a->op == CHANNOP)
+ return 0;
+ c = a->c;
+ if(c->bufsize == 0){
+ ar = chanarray(c, otherop(a->op));
+ return ar && ar->n;
+ }else{
+ switch(a->op){
+ default:
+ return 0;
+ case CHANSND:
+ return c->nbuf < c->bufsize;
+ case CHANRCV:
+ return c->nbuf > 0;
+ }
+ }
+}
+
+static void
+altqueue(Alt *a)
+{
+ _Altarray *ar;
+
+ ar = chanarray(a->c, a->op);
+ addarray(ar, a);
+}
+
+static void
+altdequeue(Alt *a)
+{
+ int i;
+ _Altarray *ar;
+
+ ar = chanarray(a->c, a->op);
+ if(ar == nil){
+ fprint(2, "bad use of altdequeue op=%d\n", a->op);
+ abort();
+ }
+
+ for(i=0; i<ar->n; i++)
+ if(ar->a[i] == a){
+ delarray(ar, i);
+ return;
+ }
+ fprint(2, "cannot find self in altdq\n");
+ abort();
+}
+
+static void
+altalldequeue(Alt *a)
+{
+ int i;
+
+ for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++)
+ if(a[i].op != CHANNOP)
+ altdequeue(&a[i]);
+}
+
+static void
+amove(void *dst, void *src, uint n)
+{
+ if(dst){
+ if(src == nil)
+ memset(dst, 0, n);
+ else
+ memmove(dst, src, n);
+ }
+}
+
+/*
+ * Actually move the data around. There are up to three
+ * players: the sender, the receiver, and the channel itself.
+ * If the channel is unbuffered or the buffer is empty,
+ * data goes from sender to receiver. If the channel is full,
+ * the receiver removes some from the channel and the sender
+ * gets to put some in.
+ */
+static void
+altcopy(Alt *s, Alt *r)
+{
+ Alt *t;
+ Channel *c;
+ uchar *cp;
+
+ /*
+ * Work out who is sender and who is receiver
+ */
+ if(s == nil && r == nil)
+ return;
+ assert(s != nil);
+ c = s->c;
+ if(s->op == CHANRCV){
+ t = s;
+ s = r;
+ r = t;
+ }
+ assert(s==nil || s->op == CHANSND);
+ assert(r==nil || r->op == CHANRCV);
+
+ /*
+ * Channel is empty (or unbuffered) - copy directly.
+ */
+ if(s && r && c->nbuf == 0){
+ amove(r->v, s->v, c->elemsize);
+ return;
+ }
+
+ /*
+ * Otherwise it's always okay to receive and then send.
+ */
+ if(r){
+ cp = c->buf + c->off*c->elemsize;
+ amove(r->v, cp, c->elemsize);
+ --c->nbuf;
+ if(++c->off == c->bufsize)
+ c->off = 0;
+ }
+ if(s){
+ cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize;
+ amove(cp, s->v, c->elemsize);
+ ++c->nbuf;
+ }
+}
+
+static void
+altexec(Alt *a)
+{
+ int i;
+ _Altarray *ar;
+ Alt *other;
+ Channel *c;
+
+ c = a->c;
+ ar = chanarray(c, otherop(a->op));
+ if(ar && ar->n){
+ i = rand()%ar->n;
+ other = ar->a[i];
+ altcopy(a, other);
+ altalldequeue(other->xalt);
+ other->xalt[0].xalt = other;
+ _threadready(other->thread);
+ }else
+ altcopy(a, nil);
+}
+
+#define dbgalt 0
+int
+chanalt(Alt *a)
+{
+ int i, j, ncan, n, canblock;
+ Channel *c;
+ _Thread *t;
+
+ for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++)
+ ;
+ n = i;
+ canblock = a[i].op == CHANEND;
+
+ t = proc()->thread;
+ for(i=0; i<n; i++){
+ a[i].thread = t;
+ a[i].xalt = a;
+ }
+ qlock(&chanlock);
+if(dbgalt) print("alt ");
+ ncan = 0;
+ for(i=0; i<n; i++){
+ c = a[i].c;
+if(dbgalt) print(" %c:", "esrnb"[a[i].op]);
+if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c);
+ if(altcanexec(&a[i])){
+if(dbgalt) print("*");
+ ncan++;
+ }
+ }
+ if(ncan){
+ j = rand()%ncan;
+ for(i=0; i<n; i++){
+ if(altcanexec(&a[i])){
+ if(j-- == 0){
+if(dbgalt){
+c = a[i].c;
+print(" => %c:", "esrnb"[a[i].op]);
+if(c->name) print("%s", c->name); else print("%p", c);
+print("\n");
+}
+ altexec(&a[i]);
+ qunlock(&chanlock);
+ return i;
+ }
+ }
+ }
+ }
+if(dbgalt)print("\n");
+
+ if(!canblock){
+ qunlock(&chanlock);
+ return -1;
+ }
+
+ for(i=0; i<n; i++){
+ if(a[i].op != CHANNOP)
+ altqueue(&a[i]);
+ }
+ qunlock(&chanlock);
+
+ _threadswitch();
+
+ /*
+ * the guy who ran the op took care of dequeueing us
+ * and then set a[0].alt to the one that was executed.
+ */
+ return a[0].xalt - a;
+}
+
+static int
+_chanop(Channel *c, int op, void *p, int canblock)
+{
+ Alt a[2];
+
+ a[0].c = c;
+ a[0].op = op;
+ a[0].v = p;
+ a[1].op = canblock ? CHANEND : CHANNOBLK;
+ if(chanalt(a) < 0)
+ return -1;
+ return 1;
+}
+
+int
+chansend(Channel *c, void *v)
+{
+ return _chanop(c, CHANSND, v, 1);
+}
+
+int
+channbsend(Channel *c, void *v)
+{
+ return _chanop(c, CHANSND, v, 0);
+}
+
+int
+chanrecv(Channel *c, void *v)
+{
+ return _chanop(c, CHANRCV, v, 1);
+}
+
+int
+channbrecv(Channel *c, void *v)
+{
+ return _chanop(c, CHANRCV, v, 0);
+}
+
+int
+chansendp(Channel *c, void *v)
+{
+ return _chanop(c, CHANSND, (void*)&v, 1);
+}
+
+void*
+chanrecvp(Channel *c)
+{
+ void *v;
+
+ _chanop(c, CHANRCV, (void*)&v, 1);
+ return v;
+}
+
+int
+channbsendp(Channel *c, void *v)
+{
+ return _chanop(c, CHANSND, (void*)&v, 0);
+}
+
+void*
+channbrecvp(Channel *c)
+{
+ void *v;
+
+ _chanop(c, CHANRCV, (void*)&v, 0);
+ return v;
+}
+
+int
+chansendul(Channel *c, ulong val)
+{
+ return _chanop(c, CHANSND, &val, 1);
+}
+
+ulong
+chanrecvul(Channel *c)
+{
+ ulong val;
+
+ _chanop(c, CHANRCV, &val, 1);
+ return val;
+}
+
+int
+channbsendul(Channel *c, ulong val)
+{
+ return _chanop(c, CHANSND, &val, 0);
+}
+
+ulong
+channbrecvul(Channel *c)
+{
+ ulong val;
+
+ _chanop(c, CHANRCV, &val, 0);
+ return val;
+}
+