aboutsummaryrefslogtreecommitdiff
path: root/src/libmux
diff options
context:
space:
mode:
Diffstat (limited to 'src/libmux')
-rw-r--r--src/libmux/COPYRIGHT27
-rw-r--r--src/libmux/io.c136
-rw-r--r--src/libmux/mkfile16
-rw-r--r--src/libmux/mux.c152
-rw-r--r--src/libmux/queue.c109
-rw-r--r--src/libmux/thread.c27
6 files changed, 467 insertions, 0 deletions
diff --git a/src/libmux/COPYRIGHT b/src/libmux/COPYRIGHT
new file mode 100644
index 00000000..d9679c3a
--- /dev/null
+++ b/src/libmux/COPYRIGHT
@@ -0,0 +1,27 @@
+
+This software was developed as part of a project at MIT:
+ /sys/src/libmux/*
+ /sys/include/mux.h
+
+Copyright (c) 2003 Russ Cox,
+ Massachusetts Institute of Technology
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
diff --git a/src/libmux/io.c b/src/libmux/io.c
new file mode 100644
index 00000000..3d932b1a
--- /dev/null
+++ b/src/libmux/io.c
@@ -0,0 +1,136 @@
+/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
+/* See COPYRIGHT */
+
+#include <u.h>
+#include <libc.h>
+#include <mux.h>
+
+/*
+ * If you fork off two procs running muxrecvproc and muxsendproc,
+ * then muxrecv/muxsend (and thus muxrpc) will never block except on
+ * rendevouses, which is nice when it's running in one thread of many.
+ */
+void
+_muxrecvproc(void *v)
+{
+ void *p;
+ Mux *mux;
+ Muxqueue *q;
+
+ mux = v;
+ q = _muxqalloc();
+
+ qlock(&mux->lk);
+ mux->readq = q;
+ qlock(&mux->inlk);
+ rwakeup(&mux->rpcfork);
+ qunlock(&mux->lk);
+
+ while((p = mux->recv(mux)) != nil)
+ if(_muxqsend(q, p) < 0){
+ free(p);
+ break;
+ }
+ qunlock(&mux->inlk);
+ qlock(&mux->lk);
+ _muxqhangup(q);
+ while((p = _muxnbqrecv(q)) != nil)
+ free(p);
+ free(q);
+ mux->readq = nil;
+ rwakeup(&mux->rpcfork);
+ qunlock(&mux->lk);
+}
+
+void
+_muxsendproc(void *v)
+{
+ Muxqueue *q;
+ void *p;
+ Mux *mux;
+
+ mux = v;
+ q = _muxqalloc();
+
+ qlock(&mux->lk);
+ mux->writeq = q;
+ qlock(&mux->outlk);
+ rwakeup(&mux->rpcfork);
+ qunlock(&mux->lk);
+
+ while((p = _muxqrecv(q)) != nil)
+ if(mux->send(mux, p) < 0)
+ break;
+ qunlock(&mux->outlk);
+ qlock(&mux->lk);
+ _muxqhangup(q);
+ while((p = _muxnbqrecv(q)) != nil)
+ free(p);
+ free(q);
+ mux->writeq = nil;
+ rwakeup(&mux->rpcfork);
+ qunlock(&mux->lk);
+ return;
+}
+
+void*
+_muxrecv(Mux *mux)
+{
+ void *p;
+
+ qlock(&mux->lk);
+/*
+ if(mux->state != VtStateConnected){
+ werrstr("not connected");
+ qunlock(&mux->lk);
+ return nil;
+ }
+*/
+ if(mux->readq){
+ qunlock(&mux->lk);
+ return _muxqrecv(mux->readq);
+ }
+
+ qlock(&mux->inlk);
+ qunlock(&mux->lk);
+ p = mux->recv(mux);
+ qunlock(&mux->inlk);
+/*
+ if(!p)
+ vthangup(mux);
+*/
+ return p;
+}
+
+int
+_muxsend(Mux *mux, void *p)
+{
+ qlock(&mux->lk);
+/*
+ if(mux->state != VtStateConnected){
+ packetfree(p);
+ werrstr("not connected");
+ qunlock(&mux->lk);
+ return -1;
+ }
+*/
+ if(mux->writeq){
+ qunlock(&mux->lk);
+ if(_muxqsend(mux->writeq, p) < 0){
+ free(p);
+ return -1;
+ }
+ return 0;
+ }
+
+ qlock(&mux->outlk);
+ qunlock(&mux->lk);
+ if(mux->send(mux, p) < 0){
+ qunlock(&mux->outlk);
+ /* vthangup(mux); */
+ return -1;
+ }
+ qunlock(&mux->outlk);
+ return 0;
+}
+
diff --git a/src/libmux/mkfile b/src/libmux/mkfile
new file mode 100644
index 00000000..71d62a43
--- /dev/null
+++ b/src/libmux/mkfile
@@ -0,0 +1,16 @@
+PLAN9=../..
+<$PLAN9/src/mkhdr
+
+LIB=libmux.a
+
+OFILES=\
+ io.$O\
+ mux.$O\
+ queue.$O\
+ thread.$O\
+
+HFILES=\
+ $PLAN9/include/mux.h\
+
+<$PLAN9/src/mksyslib
+
diff --git a/src/libmux/mux.c b/src/libmux/mux.c
new file mode 100644
index 00000000..0d33498e
--- /dev/null
+++ b/src/libmux/mux.c
@@ -0,0 +1,152 @@
+/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
+/* See COPYRIGHT */
+
+/*
+ * Generic RPC packet multiplexor. Inspired by but not derived from
+ * Plan 9 kernel. Originally developed as part of Tra, later used in
+ * libnventi, and then finally split out into a generic library.
+ */
+
+#include <u.h>
+#include <libc.h>
+#include <mux.h>
+
+static int gettag(Mux*, Muxrpc*);
+static void puttag(Mux*, Muxrpc*);
+static void enqueue(Mux*, Muxrpc*);
+static void dequeue(Mux*, Muxrpc*);
+
+void
+muxinit(Mux *mux)
+{
+ mux->tagrend.l = &mux->lk;
+ mux->sleep.next = &mux->sleep;
+ mux->sleep.prev = &mux->sleep;
+}
+
+void*
+muxrpc(Mux *mux, void *tx)
+{
+ uint tag;
+ Muxrpc *r, *r2;
+ void *p;
+
+ /* must malloc because stack could be private */
+ r = mallocz(sizeof(Muxrpc), 1);
+ if(r == nil)
+ return nil;
+ r->r.l = &mux->lk;
+
+ /* assign the tag */
+ tag = gettag(mux, r);
+ if(mux->settag(mux, tx, tag) < 0){
+ puttag(mux, r);
+ free(r);
+ return nil;
+ }
+
+ /* send the packet */
+ if(_muxsend(mux, tx) < 0){
+ puttag(mux, r);
+ free(r);
+ return nil;
+ }
+
+ /* add ourselves to sleep queue */
+ qlock(&mux->lk);
+ enqueue(mux, r);
+
+ /* wait for our packet */
+ while(mux->muxer && !r->p)
+ rsleep(&r->r);
+
+ /* if not done, there's no muxer: start muxing */
+ if(!r->p){
+ if(mux->muxer)
+ abort();
+ mux->muxer = 1;
+ while(!r->p){
+ qunlock(&mux->lk);
+ p = _muxrecv(mux);
+ if(p)
+ tag = mux->gettag(mux, p);
+ else
+ tag = ~0;
+ qlock(&mux->lk);
+ if(p == nil){ /* eof -- just give up and pass the buck */
+ dequeue(mux, r);
+ break;
+ }
+ /* hand packet to correct sleeper */
+ if(tag < 0 || tag >= mux->mwait){
+ fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
+ /* must leak packet! don't know how to free it! */
+ continue;
+ }
+ r2 = mux->wait[tag];
+ r2->p = p;
+ rwakeup(&r2->r);
+ }
+ mux->muxer = 0;
+
+ /* if there is anyone else sleeping, wake them to mux */
+ if(mux->sleep.next != &mux->sleep)
+ rwakeup(&mux->sleep.next->r);
+ }
+ p = r->p;
+ puttag(mux, r);
+ free(r);
+ qunlock(&mux->lk);
+ return p;
+}
+
+static void
+enqueue(Mux *mux, Muxrpc *r)
+{
+ r->next = mux->sleep.next;
+ r->prev = &mux->sleep;
+ r->next->prev = r;
+ r->prev->next = r;
+}
+
+static void
+dequeue(Mux *mux, Muxrpc *r)
+{
+ r->next->prev = r->prev;
+ r->prev->next = r->next;
+ r->prev = nil;
+ r->next = nil;
+}
+
+static int
+gettag(Mux *mux, Muxrpc *r)
+{
+ int i;
+
+Again:
+ while(mux->nwait == mux->mwait)
+ rsleep(&mux->tagrend);
+ i=mux->freetag;
+ if(mux->wait[i] == 0)
+ goto Found;
+ for(i=0; i<mux->mwait; i++)
+ if(mux->wait[i] == 0){
+ Found:
+ mux->nwait++;
+ mux->wait[i] = r;
+ r->tag = i;
+ return i;
+ }
+ fprint(2, "libfs: nwait botch\n");
+ goto Again;
+}
+
+static void
+puttag(Mux *mux, Muxrpc *r)
+{
+ assert(mux->wait[r->tag] == r);
+ mux->wait[r->tag] = nil;
+ mux->nwait--;
+ mux->freetag = r->tag;
+ rwakeup(&mux->tagrend);
+}
diff --git a/src/libmux/queue.c b/src/libmux/queue.c
new file mode 100644
index 00000000..072f1860
--- /dev/null
+++ b/src/libmux/queue.c
@@ -0,0 +1,109 @@
+/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
+/* See COPYRIGHT */
+
+#include <u.h>
+#include <libc.h>
+#include <mux.h>
+
+typedef struct Qel Qel;
+struct Qel
+{
+ Qel *next;
+ void *p;
+};
+
+struct Muxqueue
+{
+ int hungup;
+ QLock lk;
+ Rendez r;
+ Qel *head;
+ Qel *tail;
+};
+
+Muxqueue*
+_muxqalloc(void)
+{
+ Muxqueue *q;
+
+ q = mallocz(sizeof(Muxqueue), 1);
+ if(q == nil)
+ return nil;
+ q->r.l = &q->lk;
+ return q;
+}
+
+int
+_muxqsend(Muxqueue *q, void *p)
+{
+ Qel *e;
+
+ e = malloc(sizeof(Qel));
+ if(e == nil)
+ return -1;
+ qlock(&q->lk);
+ if(q->hungup){
+ werrstr("hungup queue");
+ qunlock(&q->lk);
+ return -1;
+ }
+ e->p = p;
+ e->next = nil;
+ if(q->head == nil)
+ q->head = e;
+ else
+ q->tail->next = e;
+ q->tail = e;
+ rwakeup(&q->r);
+ qunlock(&q->lk);
+ return 0;
+}
+
+void*
+_muxqrecv(Muxqueue *q)
+{
+ void *p;
+ Qel *e;
+
+ qlock(&q->lk);
+ while(q->head == nil && !q->hungup)
+ rsleep(&q->r);
+ if(q->hungup){
+ qunlock(&q->lk);
+ return nil;
+ }
+ e = q->head;
+ q->head = e->next;
+ qunlock(&q->lk);
+ p = e->p;
+ free(e);
+ return p;
+}
+
+void*
+_muxnbqrecv(Muxqueue *q)
+{
+ void *p;
+ Qel *e;
+
+ qlock(&q->lk);
+ if(q->head == nil){
+ qunlock(&q->lk);
+ return nil;
+ }
+ e = q->head;
+ q->head = e->next;
+ qunlock(&q->lk);
+ p = e->p;
+ free(e);
+ return p;
+}
+
+void
+_muxqhangup(Muxqueue *q)
+{
+ qlock(&q->lk);
+ q->hungup = 1;
+ rwakeupall(&q->r);
+ qunlock(&q->lk);
+}
diff --git a/src/libmux/thread.c b/src/libmux/thread.c
new file mode 100644
index 00000000..1c643e06
--- /dev/null
+++ b/src/libmux/thread.c
@@ -0,0 +1,27 @@
+/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
+/* See COPYRIGHT */
+
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <mux.h>
+
+enum
+{
+ STACK = 32768
+};
+
+void
+muxthreads(Mux *mux)
+{
+ proccreate(_muxrecvproc, mux, STACK);
+ qlock(&mux->lk);
+ while(!mux->writeq)
+ rsleep(&mux->rpcfork);
+ qunlock(&mux->lk);
+ proccreate(_muxsendproc, mux, STACK);
+ qlock(&mux->lk);
+ while(!mux->writeq)
+ rsleep(&mux->rpcfork);
+ qunlock(&mux->lk);
+}