diff options
-rw-r--r-- | src/cmd/9pserve.c | 62 |
1 files changed, 36 insertions, 26 deletions
diff --git a/src/cmd/9pserve.c b/src/cmd/9pserve.c index 04e98061..f7b76bf9 100644 --- a/src/cmd/9pserve.c +++ b/src/cmd/9pserve.c @@ -43,6 +43,7 @@ struct Msg { Conn *c; int internal; + int sync; int ref; int ctag; int tag; @@ -73,6 +74,7 @@ struct Conn Hash *fid[NHASH]; Queue *outq; Queue *inq; + Channel *outqdead; int dotu; }; @@ -288,6 +290,7 @@ listenthread(void *arg) c->internal = chancreate(sizeof(void*), 0); c->inq = qalloc(); c->outq = qalloc(); + c->outqdead = chancreate(sizeof(void*), 0); if(verbose) fprint(2, "%T incoming call on %s\n", c->dir); threadcreate(connthread, c, STACK); } @@ -348,7 +351,7 @@ connthread(void *arg) int i, fd; Conn *c; Hash *h, *hnext; - Msg *m, *om, *mm; + Msg *m, *om, *mm, sync; Fid *f; Ioproc *io; @@ -519,15 +522,11 @@ connthread(void *arg) if(verbose) fprint(2, "%T fd#%d eof; flushing conn\n", c->fd); - /* flush the output queue */ - sendq(c->outq, nil); - while(c->outq != nil) - yield(); - /* flush all outstanding messages */ for(i=0; i<NHASH; i++){ - for(h=c->tag[i]; h; h=hnext){ + while((h = c->tag[i]) != nil){ om = h->v; + msgincref(om); /* for us */ m = msgnew(0); m->internal = 1; m->c = c; @@ -543,12 +542,31 @@ connthread(void *arg) assert(mm == m); msgput(m); /* got from recvp */ msgput(m); /* got from msgnew */ - msgput(om); /* got from hash table */ - hnext = h->next; - free(h); + if(delhash(c->tag, om->tag, om) == 0) + msgput(om); /* got from hash table */ + msgput(om); /* got from msgincref */ } } + /* + * outputthread has written all its messages + * to the remote connection (because we've gotten all the replies!), + * but it might not have gotten a chance to msgput + * the very last one. sync up to make sure. + */ + memset(&sync, 0, sizeof sync); + sync.sync = 1; + sync.c = c; + sendq(outq, &sync); + recvp(c->outqdead); + + /* should be no messages left anywhere. */ + assert(c->nmsg == 0); + + /* everything is quiet; can close the local output queue. */ + sendq(c->outq, nil); + recvp(c->outqdead); + /* clunk all outstanding fids */ for(i=0; i<NHASH; i++){ for(h=c->fid[i]; h; h=hnext){ @@ -765,15 +783,13 @@ connoutthread(void *arg) char *ename; int err; Conn *c; - Queue *outq; Msg *m, *om; Ioproc *io; c = arg; - outq = c->outq; io = ioproc(); threadsetname("connout %s", c->dir); - while((m = recvq(outq)) != nil){ + while((m = recvq(c->outq)) != nil){ err = m->tx.type+1 != m->rx.type; if(!err && m->isopenfd) if(xopenfd(m) < 0) @@ -843,8 +859,9 @@ connoutthread(void *arg) nbsendp(c->inc, 0); } closeioproc(io); - free(outq); + free(c->outq); c->outq = nil; + sendp(c->outqdead, nil); } void @@ -857,6 +874,10 @@ outputthread(void *arg) io = ioproc(); threadsetname("output"); while((m = recvq(outq)) != nil){ + if(m->sync){ + sendp(m->c->outqdead, nil); + continue; + } if(verbose > 1) fprint(2, "%T * <- %F\n", &m->tx); rewritehdr(&m->tx, m->tpkt); if(mwrite9p(io, 1, m->tpkt) < 0) @@ -1148,7 +1169,6 @@ struct Qel struct Queue { - int hungup; QLock lk; Rendez r; Qel *head; @@ -1174,12 +1194,6 @@ sendq(Queue *q, void *p) e = emalloc(sizeof(Qel)); qlock(&q->lk); - if(q->hungup){ - free(e); - werrstr("hungup queue"); - qunlock(&q->lk); - return -1; - } e->p = p; e->next = nil; if(q->head == nil) @@ -1199,12 +1213,8 @@ recvq(Queue *q) Qel *e; qlock(&q->lk); - while(q->head == nil && !q->hungup) + while(q->head == nil) rsleep(&q->r); - if(q->hungup){ - qunlock(&q->lk); - return nil; - } e = q->head; q->head = e->next; qunlock(&q->lk); |