From d3df308747ee4d1fcc063a348dcf1146b390bda7 Mon Sep 17 00:00:00 2001 From: rsc Date: Sat, 6 Dec 2003 18:08:52 +0000 Subject: File system stuff. --- src/libmux/mux.c | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 src/libmux/mux.c (limited to 'src/libmux/mux.c') diff --git a/src/libmux/mux.c b/src/libmux/mux.c new file mode 100644 index 00000000..0d33498e --- /dev/null +++ b/src/libmux/mux.c @@ -0,0 +1,152 @@ +/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ +/* See COPYRIGHT */ + +/* + * Generic RPC packet multiplexor. Inspired by but not derived from + * Plan 9 kernel. Originally developed as part of Tra, later used in + * libnventi, and then finally split out into a generic library. + */ + +#include +#include +#include + +static int gettag(Mux*, Muxrpc*); +static void puttag(Mux*, Muxrpc*); +static void enqueue(Mux*, Muxrpc*); +static void dequeue(Mux*, Muxrpc*); + +void +muxinit(Mux *mux) +{ + mux->tagrend.l = &mux->lk; + mux->sleep.next = &mux->sleep; + mux->sleep.prev = &mux->sleep; +} + +void* +muxrpc(Mux *mux, void *tx) +{ + uint tag; + Muxrpc *r, *r2; + void *p; + + /* must malloc because stack could be private */ + r = mallocz(sizeof(Muxrpc), 1); + if(r == nil) + return nil; + r->r.l = &mux->lk; + + /* assign the tag */ + tag = gettag(mux, r); + if(mux->settag(mux, tx, tag) < 0){ + puttag(mux, r); + free(r); + return nil; + } + + /* send the packet */ + if(_muxsend(mux, tx) < 0){ + puttag(mux, r); + free(r); + return nil; + } + + /* add ourselves to sleep queue */ + qlock(&mux->lk); + enqueue(mux, r); + + /* wait for our packet */ + while(mux->muxer && !r->p) + rsleep(&r->r); + + /* if not done, there's no muxer: start muxing */ + if(!r->p){ + if(mux->muxer) + abort(); + mux->muxer = 1; + while(!r->p){ + qunlock(&mux->lk); + p = _muxrecv(mux); + if(p) + tag = mux->gettag(mux, p); + else + tag = ~0; + qlock(&mux->lk); + if(p == nil){ /* eof -- just give up and pass the buck */ + dequeue(mux, r); + break; + } + /* hand packet to correct sleeper */ + if(tag < 0 || tag >= mux->mwait){ + fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); + /* must leak packet! don't know how to free it! */ + continue; + } + r2 = mux->wait[tag]; + r2->p = p; + rwakeup(&r2->r); + } + mux->muxer = 0; + + /* if there is anyone else sleeping, wake them to mux */ + if(mux->sleep.next != &mux->sleep) + rwakeup(&mux->sleep.next->r); + } + p = r->p; + puttag(mux, r); + free(r); + qunlock(&mux->lk); + return p; +} + +static void +enqueue(Mux *mux, Muxrpc *r) +{ + r->next = mux->sleep.next; + r->prev = &mux->sleep; + r->next->prev = r; + r->prev->next = r; +} + +static void +dequeue(Mux *mux, Muxrpc *r) +{ + r->next->prev = r->prev; + r->prev->next = r->next; + r->prev = nil; + r->next = nil; +} + +static int +gettag(Mux *mux, Muxrpc *r) +{ + int i; + +Again: + while(mux->nwait == mux->mwait) + rsleep(&mux->tagrend); + i=mux->freetag; + if(mux->wait[i] == 0) + goto Found; + for(i=0; imwait; i++) + if(mux->wait[i] == 0){ + Found: + mux->nwait++; + mux->wait[i] = r; + r->tag = i; + return i; + } + fprint(2, "libfs: nwait botch\n"); + goto Again; +} + +static void +puttag(Mux *mux, Muxrpc *r) +{ + assert(mux->wait[r->tag] == r); + mux->wait[r->tag] = nil; + mux->nwait--; + mux->freetag = r->tag; + rwakeup(&mux->tagrend); +} -- cgit v1.2.3