aboutsummaryrefslogtreecommitdiff
path: root/src/libmux
diff options
context:
space:
mode:
Diffstat (limited to 'src/libmux')
-rw-r--r--src/libmux/io.c43
-rw-r--r--src/libmux/mux.c38
-rw-r--r--src/libmux/queue.c10
3 files changed, 50 insertions, 41 deletions
diff --git a/src/libmux/io.c b/src/libmux/io.c
index 4a89ca22..d9d9d8a6 100644
--- a/src/libmux/io.c
+++ b/src/libmux/io.c
@@ -34,7 +34,7 @@ _muxrecvproc(void *v)
qunlock(&mux->inlk);
qlock(&mux->lk);
_muxqhangup(q);
- while((p = _muxnbqrecv(q)) != nil)
+ while(_muxnbqrecv(q, &p))
free(p);
free(q);
mux->readq = nil;
@@ -64,7 +64,7 @@ _muxsendproc(void *v)
qunlock(&mux->outlk);
qlock(&mux->lk);
_muxqhangup(q);
- while((p = _muxnbqrecv(q)) != nil)
+ while(_muxnbqrecv(q, &p))
free(p);
free(q);
mux->writeq = nil;
@@ -73,42 +73,39 @@ _muxsendproc(void *v)
return;
}
-void*
-_muxrecv(Mux *mux, int canblock)
+int
+_muxrecv(Mux *mux, int canblock, void **vp)
{
void *p;
+ int ret;
qlock(&mux->lk);
-/*
- if(mux->state != VtStateConnected){
- werrstr("not connected");
- qunlock(&mux->lk);
- return nil;
- }
-*/
if(mux->readq){
qunlock(&mux->lk);
- if(canblock)
- return _muxqrecv(mux->readq);
- return _muxnbqrecv(mux->readq);
+ if(canblock){
+ *vp = _muxqrecv(mux->readq);
+ return 1;
+ }
+ return _muxnbqrecv(mux->readq, vp);
}
qlock(&mux->inlk);
qunlock(&mux->lk);
- if(canblock)
+ if(canblock){
p = mux->recv(mux);
- else{
+ ret = 1;
+ }else{
if(mux->nbrecv)
- p = mux->nbrecv(mux);
- else
+ ret = mux->nbrecv(mux, &p);
+ else{
+ /* send eof, not "no packet ready" */
p = nil;
+ ret = 1;
+ }
}
qunlock(&mux->inlk);
-/*
- if(!p && canblock)
- vthangup(mux);
-*/
- return p;
+ *vp = p;
+ return ret;
}
int
diff --git a/src/libmux/mux.c b/src/libmux/mux.c
index bfabb238..8257fb0e 100644
--- a/src/libmux/mux.c
+++ b/src/libmux/mux.c
@@ -1,4 +1,4 @@
-/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
+/* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */
/*
@@ -100,12 +100,17 @@ muxmsgandqlock(Mux *mux, void *p)
void
electmuxer(Mux *mux)
{
+ Muxrpc *rpc;
+
/* if there is anyone else sleeping, wake them to mux */
- if(mux->sleep.next != &mux->sleep){
- mux->muxer = mux->sleep.next;
- rwakeup(&mux->muxer->r);
- }else
- mux->muxer = nil;
+ for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
+ if(!rpc->async){
+ mux->muxer = rpc;
+ rwakeup(&rpc->r);
+ return;
+ }
+ }
+ mux->muxer = nil;
}
void*
@@ -133,7 +138,7 @@ muxrpc(Mux *mux, void *tx)
mux->muxer = r;
while(!r->p){
qunlock(&mux->lk);
- p = _muxrecv(mux, 1);
+ _muxrecv(mux, 1, &p);
if(p == nil){
/* eof -- just give up and pass the buck */
qlock(&mux->lk);
@@ -144,7 +149,6 @@ muxrpc(Mux *mux, void *tx)
}
electmuxer(mux);
}
-/*print("finished %p\n", r); */
p = r->p;
puttag(mux, r);
qunlock(&mux->lk);
@@ -161,24 +165,29 @@ muxrpcstart(Mux *mux, void *tx)
if((r = allocmuxrpc(mux)) == nil)
return nil;
+ r->async = 1;
if((tag = tagmuxrpc(r, tx)) < 0)
return nil;
return r;
}
-void*
-muxrpccanfinish(Muxrpc *r)
+int
+muxrpccanfinish(Muxrpc *r, void **vp)
{
- char *p;
+ void *p;
Mux *mux;
-
+ int ret;
+
mux = r->mux;
qlock(&mux->lk);
+ ret = 1;
if(!r->p && !mux->muxer){
mux->muxer = r;
while(!r->p){
qunlock(&mux->lk);
- p = _muxrecv(mux, 0);
+ p = nil;
+ if(!_muxrecv(mux, 0, &p))
+ ret = 0;
if(p == nil){
qlock(&mux->lk);
break;
@@ -191,7 +200,8 @@ muxrpccanfinish(Muxrpc *r)
if(p)
puttag(mux, r);
qunlock(&mux->lk);
- return p;
+ *vp = p;
+ return ret;
}
static void
diff --git a/src/libmux/queue.c b/src/libmux/queue.c
index 1cadbe6c..2151c252 100644
--- a/src/libmux/queue.c
+++ b/src/libmux/queue.c
@@ -81,8 +81,8 @@ _muxqrecv(Muxqueue *q)
return p;
}
-void*
-_muxnbqrecv(Muxqueue *q)
+int
+_muxnbqrecv(Muxqueue *q, void **vp)
{
void *p;
Qel *e;
@@ -90,14 +90,16 @@ _muxnbqrecv(Muxqueue *q)
qlock(&q->lk);
if(q->head == nil){
qunlock(&q->lk);
- return nil;
+ *vp = nil;
+ return q->hungup;
}
e = q->head;
q->head = e->next;
qunlock(&q->lk);
p = e->p;
free(e);
- return p;
+ *vp = p;
+ return 1;
}
void