diff options
Diffstat (limited to 'src/libmux')
-rw-r--r-- | src/libmux/COPYRIGHT | 27 | ||||
-rw-r--r-- | src/libmux/io.c | 136 | ||||
-rw-r--r-- | src/libmux/mkfile | 16 | ||||
-rw-r--r-- | src/libmux/mux.c | 152 | ||||
-rw-r--r-- | src/libmux/queue.c | 109 | ||||
-rw-r--r-- | src/libmux/thread.c | 27 |
6 files changed, 467 insertions, 0 deletions
diff --git a/src/libmux/COPYRIGHT b/src/libmux/COPYRIGHT new file mode 100644 index 00000000..d9679c3a --- /dev/null +++ b/src/libmux/COPYRIGHT @@ -0,0 +1,27 @@ + +This software was developed as part of a project at MIT: + /sys/src/libmux/* + /sys/include/mux.h + +Copyright (c) 2003 Russ Cox, + Massachusetts Institute of Technology + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + diff --git a/src/libmux/io.c b/src/libmux/io.c new file mode 100644 index 00000000..3d932b1a --- /dev/null +++ b/src/libmux/io.c @@ -0,0 +1,136 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +#include <u.h> +#include <libc.h> +#include <mux.h> + +/* + * If you fork off two procs running muxrecvproc and muxsendproc, + * then muxrecv/muxsend (and thus muxrpc) will never block except on + * rendevouses, which is nice when it's running in one thread of many. + */ +void +_muxrecvproc(void *v) +{ + void *p; + Mux *mux; + Muxqueue *q; + + mux = v; + q = _muxqalloc(); + + qlock(&mux->lk); + mux->readq = q; + qlock(&mux->inlk); + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + + while((p = mux->recv(mux)) != nil) + if(_muxqsend(q, p) < 0){ + free(p); + break; + } + qunlock(&mux->inlk); + qlock(&mux->lk); + _muxqhangup(q); + while((p = _muxnbqrecv(q)) != nil) + free(p); + free(q); + mux->readq = nil; + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); +} + +void +_muxsendproc(void *v) +{ + Muxqueue *q; + void *p; + Mux *mux; + + mux = v; + q = _muxqalloc(); + + qlock(&mux->lk); + mux->writeq = q; + qlock(&mux->outlk); + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + + while((p = _muxqrecv(q)) != nil) + if(mux->send(mux, p) < 0) + break; + qunlock(&mux->outlk); + qlock(&mux->lk); + _muxqhangup(q); + while((p = _muxnbqrecv(q)) != nil) + free(p); + free(q); + mux->writeq = nil; + rwakeup(&mux->rpcfork); + qunlock(&mux->lk); + return; +} + +void* +_muxrecv(Mux *mux) +{ + void *p; + + qlock(&mux->lk); +/* + if(mux->state != VtStateConnected){ + werrstr("not connected"); + qunlock(&mux->lk); + return nil; + } +*/ + if(mux->readq){ + qunlock(&mux->lk); + return _muxqrecv(mux->readq); + } + + qlock(&mux->inlk); + qunlock(&mux->lk); + p = mux->recv(mux); + qunlock(&mux->inlk); +/* + if(!p) + vthangup(mux); +*/ + return p; +} + +int +_muxsend(Mux *mux, void *p) +{ + qlock(&mux->lk); +/* + if(mux->state != VtStateConnected){ + packetfree(p); + werrstr("not connected"); + qunlock(&mux->lk); + return -1; + } +*/ + if(mux->writeq){ + qunlock(&mux->lk); + if(_muxqsend(mux->writeq, p) < 0){ + free(p); + return -1; + } + return 0; + } + + qlock(&mux->outlk); + qunlock(&mux->lk); + if(mux->send(mux, p) < 0){ + qunlock(&mux->outlk); + /* vthangup(mux); */ + return -1; + } + qunlock(&mux->outlk); + return 0; +} + diff --git a/src/libmux/mkfile b/src/libmux/mkfile new file mode 100644 index 00000000..71d62a43 --- /dev/null +++ b/src/libmux/mkfile @@ -0,0 +1,16 @@ +PLAN9=../.. +<$PLAN9/src/mkhdr + +LIB=libmux.a + +OFILES=\ + io.$O\ + mux.$O\ + queue.$O\ + thread.$O\ + +HFILES=\ + $PLAN9/include/mux.h\ + +<$PLAN9/src/mksyslib + diff --git a/src/libmux/mux.c b/src/libmux/mux.c new file mode 100644 index 00000000..0d33498e --- /dev/null +++ b/src/libmux/mux.c @@ -0,0 +1,152 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +/* + * Generic RPC packet multiplexor. Inspired by but not derived from + * Plan 9 kernel. Originally developed as part of Tra, later used in + * libnventi, and then finally split out into a generic library. + */ + +#include <u.h> +#include <libc.h> +#include <mux.h> + +static int gettag(Mux*, Muxrpc*); +static void puttag(Mux*, Muxrpc*); +static void enqueue(Mux*, Muxrpc*); +static void dequeue(Mux*, Muxrpc*); + +void +muxinit(Mux *mux) +{ + mux->tagrend.l = &mux->lk; + mux->sleep.next = &mux->sleep; + mux->sleep.prev = &mux->sleep; +} + +void* +muxrpc(Mux *mux, void *tx) +{ + uint tag; + Muxrpc *r, *r2; + void *p; + + /* must malloc because stack could be private */ + r = mallocz(sizeof(Muxrpc), 1); + if(r == nil) + return nil; + r->r.l = &mux->lk; + + /* assign the tag */ + tag = gettag(mux, r); + if(mux->settag(mux, tx, tag) < 0){ + puttag(mux, r); + free(r); + return nil; + } + + /* send the packet */ + if(_muxsend(mux, tx) < 0){ + puttag(mux, r); + free(r); + return nil; + } + + /* add ourselves to sleep queue */ + qlock(&mux->lk); + enqueue(mux, r); + + /* wait for our packet */ + while(mux->muxer && !r->p) + rsleep(&r->r); + + /* if not done, there's no muxer: start muxing */ + if(!r->p){ + if(mux->muxer) + abort(); + mux->muxer = 1; + while(!r->p){ + qunlock(&mux->lk); + p = _muxrecv(mux); + if(p) + tag = mux->gettag(mux, p); + else + tag = ~0; + qlock(&mux->lk); + if(p == nil){ /* eof -- just give up and pass the buck */ + dequeue(mux, r); + break; + } + /* hand packet to correct sleeper */ + if(tag < 0 || tag >= mux->mwait){ + fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); + /* must leak packet! don't know how to free it! */ + continue; + } + r2 = mux->wait[tag]; + r2->p = p; + rwakeup(&r2->r); + } + mux->muxer = 0; + + /* if there is anyone else sleeping, wake them to mux */ + if(mux->sleep.next != &mux->sleep) + rwakeup(&mux->sleep.next->r); + } + p = r->p; + puttag(mux, r); + free(r); + qunlock(&mux->lk); + return p; +} + +static void +enqueue(Mux *mux, Muxrpc *r) +{ + r->next = mux->sleep.next; + r->prev = &mux->sleep; + r->next->prev = r; + r->prev->next = r; +} + +static void +dequeue(Mux *mux, Muxrpc *r) +{ + r->next->prev = r->prev; + r->prev->next = r->next; + r->prev = nil; + r->next = nil; +} + +static int +gettag(Mux *mux, Muxrpc *r) +{ + int i; + +Again: + while(mux->nwait == mux->mwait) + rsleep(&mux->tagrend); + i=mux->freetag; + if(mux->wait[i] == 0) + goto Found; + for(i=0; i<mux->mwait; i++) + if(mux->wait[i] == 0){ + Found: + mux->nwait++; + mux->wait[i] = r; + r->tag = i; + return i; + } + fprint(2, "libfs: nwait botch\n"); + goto Again; +} + +static void +puttag(Mux *mux, Muxrpc *r) +{ + assert(mux->wait[r->tag] == r); + mux->wait[r->tag] = nil; + mux->nwait--; + mux->freetag = r->tag; + rwakeup(&mux->tagrend); +} diff --git a/src/libmux/queue.c b/src/libmux/queue.c new file mode 100644 index 00000000..072f1860 --- /dev/null +++ b/src/libmux/queue.c @@ -0,0 +1,109 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +#include <u.h> +#include <libc.h> +#include <mux.h> + +typedef struct Qel Qel; +struct Qel +{ + Qel *next; + void *p; +}; + +struct Muxqueue +{ + int hungup; + QLock lk; + Rendez r; + Qel *head; + Qel *tail; +}; + +Muxqueue* +_muxqalloc(void) +{ + Muxqueue *q; + + q = mallocz(sizeof(Muxqueue), 1); + if(q == nil) + return nil; + q->r.l = &q->lk; + return q; +} + +int +_muxqsend(Muxqueue *q, void *p) +{ + Qel *e; + + e = malloc(sizeof(Qel)); + if(e == nil) + return -1; + qlock(&q->lk); + if(q->hungup){ + werrstr("hungup queue"); + qunlock(&q->lk); + return -1; + } + e->p = p; + e->next = nil; + if(q->head == nil) + q->head = e; + else + q->tail->next = e; + q->tail = e; + rwakeup(&q->r); + qunlock(&q->lk); + return 0; +} + +void* +_muxqrecv(Muxqueue *q) +{ + void *p; + Qel *e; + + qlock(&q->lk); + while(q->head == nil && !q->hungup) + rsleep(&q->r); + if(q->hungup){ + qunlock(&q->lk); + return nil; + } + e = q->head; + q->head = e->next; + qunlock(&q->lk); + p = e->p; + free(e); + return p; +} + +void* +_muxnbqrecv(Muxqueue *q) +{ + void *p; + Qel *e; + + qlock(&q->lk); + if(q->head == nil){ + qunlock(&q->lk); + return nil; + } + e = q->head; + q->head = e->next; + qunlock(&q->lk); + p = e->p; + free(e); + return p; +} + +void +_muxqhangup(Muxqueue *q) +{ + qlock(&q->lk); + q->hungup = 1; + rwakeupall(&q->r); + qunlock(&q->lk); +} diff --git a/src/libmux/thread.c b/src/libmux/thread.c new file mode 100644 index 00000000..1c643e06 --- /dev/null +++ b/src/libmux/thread.c @@ -0,0 +1,27 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +#include <u.h> +#include <libc.h> +#include <thread.h> +#include <mux.h> + +enum +{ + STACK = 32768 +}; + +void +muxthreads(Mux *mux) +{ + proccreate(_muxrecvproc, mux, STACK); + qlock(&mux->lk); + while(!mux->writeq) + rsleep(&mux->rpcfork); + qunlock(&mux->lk); + proccreate(_muxsendproc, mux, STACK); + qlock(&mux->lk); + while(!mux->writeq) + rsleep(&mux->rpcfork); + qunlock(&mux->lk); +} |