aboutsummaryrefslogtreecommitdiff
path: root/src/libmux
diff options
context:
space:
mode:
Diffstat (limited to 'src/libmux')
-rw-r--r--src/libmux/io.c17
-rw-r--r--src/libmux/mux.c158
2 files changed, 132 insertions, 43 deletions
diff --git a/src/libmux/io.c b/src/libmux/io.c
index 3d932b1a..4a89ca22 100644
--- a/src/libmux/io.c
+++ b/src/libmux/io.c
@@ -74,7 +74,7 @@ _muxsendproc(void *v)
}
void*
-_muxrecv(Mux *mux)
+_muxrecv(Mux *mux, int canblock)
{
void *p;
@@ -88,15 +88,24 @@ _muxrecv(Mux *mux)
*/
if(mux->readq){
qunlock(&mux->lk);
- return _muxqrecv(mux->readq);
+ if(canblock)
+ return _muxqrecv(mux->readq);
+ return _muxnbqrecv(mux->readq);
}
qlock(&mux->inlk);
qunlock(&mux->lk);
- p = mux->recv(mux);
+ if(canblock)
+ p = mux->recv(mux);
+ else{
+ if(mux->nbrecv)
+ p = mux->nbrecv(mux);
+ else
+ p = nil;
+ }
qunlock(&mux->inlk);
/*
- if(!p)
+ if(!p && canblock)
vthangup(mux);
*/
return p;
diff --git a/src/libmux/mux.c b/src/libmux/mux.c
index e6438b01..bfabb238 100644
--- a/src/libmux/mux.c
+++ b/src/libmux/mux.c
@@ -26,21 +26,31 @@ muxinit(Mux *mux)
mux->sleep.prev = &mux->sleep;
}
-void*
-muxrpc(Mux *mux, void *tx)
+static Muxrpc*
+allocmuxrpc(Mux *mux)
{
- int tag;
- Muxrpc *r, *r2;
- void *p;
-
+ Muxrpc *r;
+
/* must malloc because stack could be private */
r = mallocz(sizeof(Muxrpc), 1);
if(r == nil){
werrstr("mallocz: %r");
return nil;
}
+ r->mux = mux;
r->r.l = &mux->lk;
+ r->waiting = 1;
+
+ return r;
+}
+static int
+tagmuxrpc(Muxrpc *r, void *tx)
+{
+ int tag;
+ Mux *mux;
+
+ mux = r->mux;
/* assign the tag, add selves to response queue */
qlock(&mux->lk);
tag = gettag(mux, r);
@@ -56,54 +66,83 @@ muxrpc(Mux *mux, void *tx)
dequeue(mux, r);
puttag(mux, r);
qunlock(&mux->lk);
- return nil;
+ return -1;
}
+ return 0;
+}
+
+void
+muxmsgandqlock(Mux *mux, void *p)
+{
+ int tag;
+ Muxrpc *r2;
+
+ tag = mux->gettag(mux, p) - mux->mintag;
+/*print("mux tag %d\n", tag); */
+ qlock(&mux->lk);
+ /* hand packet to correct sleeper */
+ if(tag < 0 || tag >= mux->mwait){
+ fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
+ /* must leak packet! don't know how to free it! */
+ return;
+ }
+ r2 = mux->wait[tag];
+ if(r2 == nil || r2->prev == nil){
+ fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
+ /* must leak packet! don't know how to free it! */
+ return;
+ }
+ r2->p = p;
+ dequeue(mux, r2);
+ rwakeup(&r2->r);
+}
+
+void
+electmuxer(Mux *mux)
+{
+ /* if there is anyone else sleeping, wake them to mux */
+ if(mux->sleep.next != &mux->sleep){
+ mux->muxer = mux->sleep.next;
+ rwakeup(&mux->muxer->r);
+ }else
+ mux->muxer = nil;
+}
+
+void*
+muxrpc(Mux *mux, void *tx)
+{
+ int tag;
+ Muxrpc *r;
+ void *p;
+
+ if((r = allocmuxrpc(mux)) == nil)
+ return nil;
+
+ if((tag = tagmuxrpc(r, tx)) < 0)
+ return nil;
qlock(&mux->lk);
/* wait for our packet */
- while(mux->muxer && !r->p){
+ while(mux->muxer && mux->muxer != r && !r->p)
rsleep(&r->r);
- }
/* if not done, there's no muxer: start muxing */
if(!r->p){
- if(mux->muxer)
+ if(mux->muxer != nil && mux->muxer != r)
abort();
- mux->muxer = 1;
+ mux->muxer = r;
while(!r->p){
qunlock(&mux->lk);
- p = _muxrecv(mux);
- if(p)
- tag = mux->gettag(mux, p) - mux->mintag;
- else
- tag = ~0;
-/*print("mux tag %d\n", tag); */
- qlock(&mux->lk);
- if(p == nil){ /* eof -- just give up and pass the buck */
+ p = _muxrecv(mux, 1);
+ if(p == nil){
+ /* eof -- just give up and pass the buck */
+ qlock(&mux->lk);
dequeue(mux, r);
break;
}
- /* hand packet to correct sleeper */
- if(tag < 0 || tag >= mux->mwait){
- fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
- /* must leak packet! don't know how to free it! */
- continue;
- }
- r2 = mux->wait[tag];
- if(r2 == nil || r2->prev == nil){
- fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
- /* must leak packet! don't know how to free it! */
- continue;
- }
- r2->p = p;
- dequeue(mux, r2);
- rwakeup(&r2->r);
+ muxmsgandqlock(mux, p);
}
- mux->muxer = 0;
-
- /* if there is anyone else sleeping, wake them to mux */
- if(mux->sleep.next != &mux->sleep)
- rwakeup(&mux->sleep.next->r);
+ electmuxer(mux);
}
/*print("finished %p\n", r); */
p = r->p;
@@ -114,6 +153,47 @@ muxrpc(Mux *mux, void *tx)
return p;
}
+Muxrpc*
+muxrpcstart(Mux *mux, void *tx)
+{
+ int tag;
+ Muxrpc *r;
+
+ if((r = allocmuxrpc(mux)) == nil)
+ return nil;
+ if((tag = tagmuxrpc(r, tx)) < 0)
+ return nil;
+ return r;
+}
+
+void*
+muxrpccanfinish(Muxrpc *r)
+{
+ char *p;
+ Mux *mux;
+
+ mux = r->mux;
+ qlock(&mux->lk);
+ if(!r->p && !mux->muxer){
+ mux->muxer = r;
+ while(!r->p){
+ qunlock(&mux->lk);
+ p = _muxrecv(mux, 0);
+ if(p == nil){
+ qlock(&mux->lk);
+ break;
+ }
+ muxmsgandqlock(mux, p);
+ }
+ electmuxer(mux);
+ }
+ p = r->p;
+ if(p)
+ puttag(mux, r);
+ qunlock(&mux->lk);
+ return p;
+}
+
static void
enqueue(Mux *mux, Muxrpc *r)
{