diff options
Diffstat (limited to 'src/libmux')
-rw-r--r-- | src/libmux/io.c | 17 | ||||
-rw-r--r-- | src/libmux/mux.c | 158 |
2 files changed, 132 insertions, 43 deletions
diff --git a/src/libmux/io.c b/src/libmux/io.c index 3d932b1a..4a89ca22 100644 --- a/src/libmux/io.c +++ b/src/libmux/io.c @@ -74,7 +74,7 @@ _muxsendproc(void *v) } void* -_muxrecv(Mux *mux) +_muxrecv(Mux *mux, int canblock) { void *p; @@ -88,15 +88,24 @@ _muxrecv(Mux *mux) */ if(mux->readq){ qunlock(&mux->lk); - return _muxqrecv(mux->readq); + if(canblock) + return _muxqrecv(mux->readq); + return _muxnbqrecv(mux->readq); } qlock(&mux->inlk); qunlock(&mux->lk); - p = mux->recv(mux); + if(canblock) + p = mux->recv(mux); + else{ + if(mux->nbrecv) + p = mux->nbrecv(mux); + else + p = nil; + } qunlock(&mux->inlk); /* - if(!p) + if(!p && canblock) vthangup(mux); */ return p; diff --git a/src/libmux/mux.c b/src/libmux/mux.c index e6438b01..bfabb238 100644 --- a/src/libmux/mux.c +++ b/src/libmux/mux.c @@ -26,21 +26,31 @@ muxinit(Mux *mux) mux->sleep.prev = &mux->sleep; } -void* -muxrpc(Mux *mux, void *tx) +static Muxrpc* +allocmuxrpc(Mux *mux) { - int tag; - Muxrpc *r, *r2; - void *p; - + Muxrpc *r; + /* must malloc because stack could be private */ r = mallocz(sizeof(Muxrpc), 1); if(r == nil){ werrstr("mallocz: %r"); return nil; } + r->mux = mux; r->r.l = &mux->lk; + r->waiting = 1; + + return r; +} +static int +tagmuxrpc(Muxrpc *r, void *tx) +{ + int tag; + Mux *mux; + + mux = r->mux; /* assign the tag, add selves to response queue */ qlock(&mux->lk); tag = gettag(mux, r); @@ -56,54 +66,83 @@ muxrpc(Mux *mux, void *tx) dequeue(mux, r); puttag(mux, r); qunlock(&mux->lk); - return nil; + return -1; } + return 0; +} + +void +muxmsgandqlock(Mux *mux, void *p) +{ + int tag; + Muxrpc *r2; + + tag = mux->gettag(mux, p) - mux->mintag; +/*print("mux tag %d\n", tag); */ + qlock(&mux->lk); + /* hand packet to correct sleeper */ + if(tag < 0 || tag >= mux->mwait){ + fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); + /* must leak packet! don't know how to free it! */ + return; + } + r2 = mux->wait[tag]; + if(r2 == nil || r2->prev == nil){ + fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag); + /* must leak packet! don't know how to free it! */ + return; + } + r2->p = p; + dequeue(mux, r2); + rwakeup(&r2->r); +} + +void +electmuxer(Mux *mux) +{ + /* if there is anyone else sleeping, wake them to mux */ + if(mux->sleep.next != &mux->sleep){ + mux->muxer = mux->sleep.next; + rwakeup(&mux->muxer->r); + }else + mux->muxer = nil; +} + +void* +muxrpc(Mux *mux, void *tx) +{ + int tag; + Muxrpc *r; + void *p; + + if((r = allocmuxrpc(mux)) == nil) + return nil; + + if((tag = tagmuxrpc(r, tx)) < 0) + return nil; qlock(&mux->lk); /* wait for our packet */ - while(mux->muxer && !r->p){ + while(mux->muxer && mux->muxer != r && !r->p) rsleep(&r->r); - } /* if not done, there's no muxer: start muxing */ if(!r->p){ - if(mux->muxer) + if(mux->muxer != nil && mux->muxer != r) abort(); - mux->muxer = 1; + mux->muxer = r; while(!r->p){ qunlock(&mux->lk); - p = _muxrecv(mux); - if(p) - tag = mux->gettag(mux, p) - mux->mintag; - else - tag = ~0; -/*print("mux tag %d\n", tag); */ - qlock(&mux->lk); - if(p == nil){ /* eof -- just give up and pass the buck */ + p = _muxrecv(mux, 1); + if(p == nil){ + /* eof -- just give up and pass the buck */ + qlock(&mux->lk); dequeue(mux, r); break; } - /* hand packet to correct sleeper */ - if(tag < 0 || tag >= mux->mwait){ - fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); - /* must leak packet! don't know how to free it! */ - continue; - } - r2 = mux->wait[tag]; - if(r2 == nil || r2->prev == nil){ - fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag); - /* must leak packet! don't know how to free it! */ - continue; - } - r2->p = p; - dequeue(mux, r2); - rwakeup(&r2->r); + muxmsgandqlock(mux, p); } - mux->muxer = 0; - - /* if there is anyone else sleeping, wake them to mux */ - if(mux->sleep.next != &mux->sleep) - rwakeup(&mux->sleep.next->r); + electmuxer(mux); } /*print("finished %p\n", r); */ p = r->p; @@ -114,6 +153,47 @@ muxrpc(Mux *mux, void *tx) return p; } +Muxrpc* +muxrpcstart(Mux *mux, void *tx) +{ + int tag; + Muxrpc *r; + + if((r = allocmuxrpc(mux)) == nil) + return nil; + if((tag = tagmuxrpc(r, tx)) < 0) + return nil; + return r; +} + +void* +muxrpccanfinish(Muxrpc *r) +{ + char *p; + Mux *mux; + + mux = r->mux; + qlock(&mux->lk); + if(!r->p && !mux->muxer){ + mux->muxer = r; + while(!r->p){ + qunlock(&mux->lk); + p = _muxrecv(mux, 0); + if(p == nil){ + qlock(&mux->lk); + break; + } + muxmsgandqlock(mux, p); + } + electmuxer(mux); + } + p = r->p; + if(p) + puttag(mux, r); + qunlock(&mux->lk); + return p; +} + static void enqueue(Mux *mux, Muxrpc *r) { |