diff options
-rw-r--r-- | src/cmd/9pserve.c | 469 | ||||
-rw-r--r-- | src/libfs/fs.c | 13 | ||||
-rw-r--r-- | src/libfs/read.c | 3 | ||||
-rw-r--r-- | src/libfs/write.c | 28 | ||||
-rw-r--r-- | src/libmux/mux.c | 86 |
5 files changed, 492 insertions, 107 deletions
diff --git a/src/cmd/9pserve.c b/src/cmd/9pserve.c index 40db17e5..ad77236e 100644 --- a/src/cmd/9pserve.c +++ b/src/cmd/9pserve.c @@ -70,13 +70,14 @@ char adir[40]; int isunix; Queue *outq; Queue *inq; +int verbose; void *gethash(Hash**, uint); int puthash(Hash**, uint, void*); int delhash(Hash**, uint, void*); -Msg *mread9p(int); -int mwrite9p(int, Msg*); -uchar *read9ppkt(int); +Msg *mread9p(Ioproc*, int); +int mwrite9p(Ioproc*, int, uchar*); +uchar *read9ppkt(Ioproc*, int); int write9ppkt(int, uchar*); Msg *msgnew(void); void msgput(Msg*); @@ -85,29 +86,50 @@ Fid *fidnew(int); void fidput(Fid*); void *emalloc(int); void *erealloc(void*, int); +Queue *qalloc(void); int sendq(Queue*, void*); void *recvq(Queue*); void selectthread(void*); void connthread(void*); +void connoutthread(void*); void listenthread(void*); +void outputthread(void*); +void inputthread(void*); void rewritehdr(Fcall*, uchar*); int tlisten(char*, char*); int taccept(int, char*); +int iolisten(Ioproc*, char*, char*); +int ioaccept(Ioproc*, int, char*); void usage(void) { - fprint(2, "usage: 9pserve [-u] address\n"); + fprint(2, "usage: 9pserve [-s service] [-u] address\n"); fprint(2, "\treads/writes 9P messages on stdin/stdout\n"); exits("usage"); } +uchar vbuf[128]; + void threadmain(int argc, char **argv) { + char *file; + int n; + Fcall f; + ARGBEGIN{ default: usage(); + case 'v': + verbose++; + break; + case 's': + close(0); + if(open(file=EARGF(usage()), ORDWR) != 0) + sysfatal("open %s: %r", file); + dup(0, 1); + break; case 'u': isunix = 1; break; @@ -115,45 +137,68 @@ threadmain(int argc, char **argv) if(argc != 1) usage(); + addr = argv[0]; if((afd = announce(addr, adir)) < 0) sysfatal("announce %s: %r", addr); - threadcreateidle(selectthread, nil, STACK); + fmtinstall('D', dirfmt); + fmtinstall('M', dirmodefmt); + fmtinstall('F', fcallfmt); + fmtinstall('H', encodefmt); + + outq = qalloc(); + inq = qalloc(); + +// threadcreateidle(selectthread, nil, STACK); + threadcreate(inputthread, nil, STACK); + threadcreate(outputthread, nil, STACK); + + f.type = Tversion; + f.version = "9P2000"; + f.msize = 8192; + f.tag = NOTAG; + n = convS2M(&f, vbuf, sizeof vbuf); + if(verbose > 1) fprint(2, "* <- %F\n", &f); + write(1, vbuf, n); + n = read9pmsg(0, vbuf, sizeof vbuf); + if(convM2S(vbuf, n, &f) != n) + sysfatal("convM2S failure"); + if(verbose > 1) fprint(2, "* -> %F\n", &f); + threadcreate(listenthread, nil, STACK); } void listenthread(void *arg) { Conn *c; + Ioproc *io; + io = ioproc(); USED(arg); for(;;){ - c = malloc(sizeof(Conn)); - if(c == nil){ - fprint(2, "out of memory\n"); - sleep(60*1000); - continue; - } - c->fd = tlisten(adir, c->dir); + c = emalloc(sizeof(Conn)); + c->inc = chancreate(sizeof(void*), 0); + c->internal = chancreate(sizeof(void*), 0); + c->inq = qalloc(); + c->outq = qalloc(); + c->fd = iolisten(io, adir, c->dir); if(c->fd < 0){ - fprint(2, "listen: %r\n"); + if(verbose) fprint(2, "listen: %r\n"); close(afd); free(c); return; } + if(verbose) fprint(2, "incoming call on %s\n", c->dir); threadcreate(connthread, c, STACK); } } void -err(Msg *m, char *ename) +sendmsg(Msg *m) { int n, nn; - m->rx.type = Rerror; - m->rx.ename = ename; - m->rx.tag = m->ctag; n = sizeS2M(&m->rx); m->rpkt = emalloc(n); nn = convS2M(&m->rx, m->rpkt, n); @@ -163,6 +208,28 @@ err(Msg *m, char *ename) } void +sendomsg(Msg *m) +{ + int n, nn; + + n = sizeS2M(&m->tx); + m->tpkt = emalloc(n); + nn = convS2M(&m->tx, m->tpkt, n); + if(nn != n) + sysfatal("sizeS2M + convS2M disagree"); + sendq(outq, m); +} + +void +err(Msg *m, char *ename) +{ + m->rx.type = Rerror; + m->rx.ename = ename; + m->rx.tag = m->tx.tag; + sendmsg(m); +} + +void connthread(void *arg) { int i, fd; @@ -170,31 +237,54 @@ connthread(void *arg) Hash *h; Msg *m, *om; Fid *f; + Ioproc *io; c = arg; - fd = taccept(c->fd, c->dir); + io = ioproc(); + fd = ioaccept(io, c->fd, c->dir); if(fd < 0){ - fprint(2, "accept %s: %r\n", c->dir); + if(verbose) fprint(2, "accept %s: %r\n", c->dir); goto out; } close(c->fd); c->fd = fd; - while((m = mread9p(c->fd)) != nil){ + threadcreate(connoutthread, c, STACK); + while((m = mread9p(io, c->fd)) != nil){ + if(verbose > 1) fprint(2, "%s -> %F\n", c->dir, &m->tx); m->c = c; + m->ctag = m->tx.tag; c->nmsg++; if(puthash(c->tag, m->tx.tag, m) < 0){ err(m, "duplicate tag"); continue; } + m->ref++; switch(m->tx.type){ + case Tversion: + m->rx.tag = m->tx.tag; + m->rx.msize = m->tx.msize; + if(m->rx.msize > 8192) + m->rx.msize = 8192; + m->rx.version = "9P2000"; + m->rx.type = Rversion; + sendmsg(m); + continue; case Tflush: if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){ - m->rx.tag = Rflush; - sendq(c->outq, m); + m->rx.tag = m->tx.tag; + m->rx.type = Rflush; + sendmsg(m); continue; } + m->oldm->ref++; break; case Tattach: + m->afid = nil; + if(m->tx.afid != NOFID + && (m->afid = gethash(c->fid, m->tx.afid)) == nil){ + err(m, "unknown fid"); + continue; + } m->fid = fidnew(m->tx.fid); if(puthash(c->fid, m->tx.fid, m->fid) < 0){ err(m, "duplicate fid"); @@ -207,6 +297,7 @@ connthread(void *arg) err(m, "unknown fid"); continue; } + m->fid->ref++; if(m->tx.newfid == m->tx.fid){ m->fid->ref++; m->newfid = m->fid; @@ -220,21 +311,19 @@ connthread(void *arg) } break; case Tauth: - if((m->afid = gethash(c->fid, m->tx.afid)) == nil){ - err(m, "unknown fid"); - continue; - } - m->fid = fidnew(m->tx.fid); - if(puthash(c->fid, m->tx.fid, m->fid) < 0){ + m->afid = fidnew(m->tx.afid); + if(puthash(c->fid, m->tx.afid, m->afid) < 0){ err(m, "duplicate fid"); continue; } - m->fid->ref++; + m->afid->ref++; break; + case Tcreate: case Topen: case Tclunk: case Tread: case Twrite: + case Tremove: case Tstat: case Twstat: if((m->fid = gethash(c->fid, m->tx.fid)) == nil){ @@ -257,7 +346,7 @@ connthread(void *arg) m->tx.afid = m->afid->fid; if(m->oldm) m->tx.oldtag = m->oldm->tag; - rewritehdr(&m->tx, m->tpkt); + /* reference passes to outq */ sendq(outq, m); while(c->nmsg >= MAXMSG){ c->inputstalled = 1; @@ -265,6 +354,8 @@ connthread(void *arg) } } + if(verbose) fprint(2, "%s eof\n", c->dir); + /* flush all outstanding messages */ for(i=0; i<NHASH; i++){ for(h=c->tag[i]; h; h=h->next){ @@ -277,8 +368,10 @@ connthread(void *arg) m->tx.oldtag = om->tag; m->oldm = om; om->ref++; - sendq(outq, m); + m->ref++; /* for outq */ + sendomsg(m); recvp(c->internal); + msgput(m); } } @@ -294,8 +387,10 @@ connthread(void *arg) m->tx.fid = f->fid; m->fid = f; f->ref++; - sendq(outq, m); + m->ref++; + sendomsg(m); recvp(c->internal); + msgput(m); } } @@ -312,57 +407,72 @@ connoutthread(void *arg) int err; Conn *c; Msg *m, *om; + Ioproc *io; c = arg; + io = ioproc(); while((m = recvq(c->outq)) != nil){ err = m->tx.type+1 != m->rx.type; switch(m->tx.type){ case Tflush: om = m->oldm; - if(delhash(om->c->tag, om->ctag, om) == 0) - msgput(om); + if(om) + if(delhash(om->c->tag, om->ctag, om) == 0) + msgput(om); break; case Tclunk: - if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) - fidput(m->fid); + case Tremove: + if(m->fid) + if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) + fidput(m->fid); break; case Tauth: - if(err) - if(delhash(m->c->fid, m->afid->cfid, m->fid) == 0) + if(err && m->afid){ + fprint(2, "auth error\n"); + if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0) fidput(m->fid); + } + break; case Tattach: - if(err) + if(err && m->fid) if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) fidput(m->fid); break; case Twalk: - if(err && m->tx.fid != m->tx.newfid) + if(err && m->tx.fid != m->tx.newfid && m->newfid) if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0) fidput(m->newfid); break; } - if(mwrite9p(c->fd, m) < 0) - fprint(2, "write error: %r\n"); if(delhash(m->c->tag, m->ctag, m) == 0) msgput(m); + if(verbose > 1) fprint(2, "%s <- %F\n", c->dir, &m->rx); + rewritehdr(&m->rx, m->rpkt); + if(mwrite9p(io, c->fd, m->rpkt) < 0) + if(verbose) fprint(2, "write error: %r\n"); msgput(m); if(c->inputstalled && c->nmsg < MAXMSG) nbsendp(c->inc, 0); } + closeioproc(io); } void outputthread(void *arg) { Msg *m; + Ioproc *io; USED(arg); - + io = ioproc(); while((m = recvq(outq)) != nil){ - if(mwrite9p(1, m) < 0) + if(verbose > 1) fprint(2, "* <- %F\n", &m->tx); + rewritehdr(&m->tx, m->tpkt); + if(mwrite9p(io, 1, m->tpkt) < 0) sysfatal("output error: %r"); msgput(m); } + closeioproc(io); } void @@ -371,19 +481,22 @@ inputthread(void *arg) uchar *pkt; int n, nn, tag; Msg *m; + Ioproc *io; - while((pkt = read9ppkt(0)) != nil){ + io = ioproc(); + USED(arg); + while((pkt = read9ppkt(io, 0)) != nil){ n = GBIT32(pkt); if(n < 7){ - fprint(2, "short 9P packet\n"); + fprint(2, "short 9P packet from server\n"); free(pkt); continue; } + if(verbose > 2) fprint(2, "read %.*H\n", n, pkt); tag = GBIT16(pkt+5); if((m = msgget(tag)) == nil){ fprint(2, "unexpected 9P response tag %d\n", tag); free(pkt); - msgput(m); continue; } if((nn = convM2S(pkt, n, &m->rx)) != n){ @@ -392,11 +505,15 @@ inputthread(void *arg) msgput(m); continue; } + if(verbose > 1) fprint(2, "* -> %F\n", &m->rx); m->rpkt = pkt; m->rx.tag = m->ctag; - rewritehdr(&m->rx, m->rpkt); - sendq(m->c->outq, m); + if(m->internal) + sendp(m->c->internal, 0); + else + sendq(m->c->outq, m); } + closeioproc(io); } void* @@ -417,8 +534,10 @@ delhash(Hash **ht, uint n, void *v) for(l=&ht[n%NHASH]; h=*l; l=&h->next) if(h->n == n){ - if(h->v != v) - fprint(2, "hash error\n"); + if(h->v != v){ + if(verbose) fprint(2, "delhash %d got %p want %p\n", n, h->v, v); + return -1; + } *l = h->next; free(h); return 0; @@ -451,13 +570,14 @@ fidnew(int cfid) Fid *f; if(freefid == nil){ - fidtab = erealloc(fidtab, nfidtab*sizeof(fidtab[0])); + fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0])); fidtab[nfidtab] = emalloc(sizeof(Fid)); - freefid = fidtab[nfidtab++]; + freefid = fidtab[nfidtab]; + freefid->fid = nfidtab++; } f = freefid; freefid = f->next; - f->cfid = f->cfid; + f->cfid = cfid; f->ref = 1; return f; } @@ -465,6 +585,8 @@ fidnew(int cfid) void fidput(Fid *f) { + if(f == nil) + return; assert(f->ref > 0); if(--f->ref > 0) return; @@ -483,9 +605,10 @@ msgnew(void) Msg *m; if(freemsg == nil){ - msgtab = erealloc(msgtab, nmsgtab*sizeof(msgtab[0])); + msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0])); msgtab[nmsgtab] = emalloc(sizeof(Msg)); - freemsg = msgtab[nmsgtab++]; + freemsg = msgtab[nmsgtab]; + freemsg->tag = nmsgtab++; } m = freemsg; freemsg = m->next; @@ -496,21 +619,51 @@ msgnew(void) void msgput(Msg *m) { + if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref); assert(m->ref > 0); if(--m->ref > 0) return; + m->c->nmsg--; + m->c = nil; + fidput(m->fid); + fidput(m->afid); + fidput(m->newfid); + free(m->tpkt); + free(m->rpkt); + m->fid = nil; + m->afid = nil; + m->newfid = nil; + m->tpkt = nil; + m->rpkt = nil; m->next = freemsg; freemsg = m; } +Msg* +msgget(int n) +{ + Msg *m; + + if(n < 0 || n >= nmsgtab) + return nil; + m = msgtab[n]; + if(m->ref == 0) + return nil; + m->ref++; + return m; +} + + void* emalloc(int n) { void *v; v = mallocz(n, 1); - if(v == nil) - sysfatal("out of memory"); + if(v == nil){ + abort(); + sysfatal("out of memory allocating %d", n); + } return v; } @@ -518,8 +671,10 @@ void* erealloc(void *v, int n) { v = realloc(v, n); - if(v == nil) - sysfatal("out of memory"); + if(v == nil){ + abort(); + sysfatal("out of memory reallocating %d", n); + } return v; } @@ -595,3 +750,197 @@ recvq(Queue *q) free(e); return p; } + +uchar* +read9ppkt(Ioproc *io, int fd) +{ + uchar buf[4], *pkt; + int n, nn; + + n = ioreadn(io, fd, buf, 4); + if(n != 4) + return nil; + n = GBIT32(buf); + pkt = emalloc(n); + PBIT32(pkt, n); + nn = ioreadn(io, fd, pkt+4, n-4); + if(nn != n-4){ + free(pkt); + return nil; + } + return pkt; +} + +Msg* +mread9p(Ioproc *io, int fd) +{ + int n, nn; + uchar *pkt; + Msg *m; + + if((pkt = read9ppkt(io, fd)) == nil) + return nil; + + m = msgnew(); + m->tpkt = pkt; + n = GBIT32(pkt); + nn = convM2S(pkt, n, &m->tx); + if(nn != n){ + fprint(2, "read bad packet from %d\n", fd); + return nil; + } + return m; +} + +int +mwrite9p(Ioproc *io, int fd, uchar *pkt) +{ + int n; + + n = GBIT32(pkt); + if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt); + if(iowrite(io, fd, pkt, n) != n){ + fprint(2, "write error: %r\n"); + return -1; + } + return 0; +} + +void +restring(uchar *pkt, int pn, char *s) +{ + int n; + + if(s < (char*)pkt || s >= (char*)pkt+pn) + return; + + n = strlen(s); + memmove(s+1, s, n); + PBIT16((uchar*)s-1, n); +} + +void +rewritehdr(Fcall *f, uchar *pkt) +{ + int i, n; + + n = GBIT32(pkt); + PBIT16(pkt+5, f->tag); + switch(f->type){ + case Tversion: + case Rversion: + restring(pkt, n, f->version); + break; + case Tauth: + PBIT32(pkt+7, f->afid); + restring(pkt, n, f->uname); + restring(pkt, n, f->aname); + break; + case Tflush: + PBIT16(pkt+7, f->oldtag); + break; + case Tattach: + restring(pkt, n, f->uname); + restring(pkt, n, f->aname); + PBIT32(pkt+7, f->fid); + PBIT32(pkt+11, f->afid); + break; + case Twalk: + PBIT32(pkt+7, f->fid); + PBIT32(pkt+11, f->newfid); + for(i=0; i<f->nwname; i++) + restring(pkt, n, f->wname[i]); + break; + case Tcreate: + restring(pkt, n, f->name); + /* fall through */ + case Topen: + case Tread: + case Twrite: + case Tclunk: + case Tremove: + case Tstat: + case Twstat: + PBIT32(pkt+7, f->fid); + break; + case Rerror: + restring(pkt, n, f->ename); + break; + } +} + +#ifdef _LIB9_H_ +/* unix select-based polling */ +Ioproc* +ioproc(void) +{ + return nil; +} + +long +ioread(Ioproc *io, int fd, void *v, long n) +{ + USED(io); + + xxx; +} + +long +iowrite(Ioproc *io, int fd, void *v, long n) +{ + USED(io); + + xxx; +} + +int +iolisten(Ioproc *io, char *a, char *b) +{ + USED(io); + + xxx; +} + +int +ioaccept(Ioproc *io, int fd, char *dir) +{ + USED(io); + + xxx; +} + +#else +/* real plan 9 io procs */ +static long +_iolisten(va_list *arg) +{ + char *a, *b; + + a = va_arg(*arg, char*); + b = va_arg(*arg, char*); + return listen(a, b); +} + +int +iolisten(Ioproc *io, char *a, char *b) +{ + return iocall(io, _iolisten, a, b); +} + +static long +_ioaccept(va_list *arg) +{ + int fd; + char *dir; + + fd = va_arg(*arg, int); + dir = va_arg(*arg, char*); + return accept(fd, dir); +} + +int +ioaccept(Ioproc *io, int fd, char *dir) +{ + return iocall(io, _ioaccept, fd, dir); +} +#endif diff --git a/src/libfs/fs.c b/src/libfs/fs.c index 985071c1..c06e19d8 100644 --- a/src/libfs/fs.c +++ b/src/libfs/fs.c @@ -22,6 +22,10 @@ fsinit(int fd) { Fsys *fs; + fmtinstall('F', fcallfmt); + fmtinstall('D', dirfmt); + fmtinstall('M', dirmodefmt); + fs = mallocz(sizeof(Fsys), 1); if(fs == nil) return nil; @@ -141,8 +145,12 @@ fsrpc(Fsys *fs, Fcall *tx, Fcall *rx, void **freep) n = sizeS2M(tx); tpkt = malloc(n); +fprint(2, "tpkt %p\n", tpkt); + if(freep) + *freep = nil; if(tpkt == nil) return -1; + fprint(2, "<- %F\n", tx); nn = convS2M(tx, tpkt, n); if(nn != n){ free(tpkt); @@ -151,7 +159,9 @@ fsrpc(Fsys *fs, Fcall *tx, Fcall *rx, void **freep) return -1; } rpkt = muxrpc(&fs->mux, tpkt); +fprint(2, "tpkt %p\n", tpkt); free(tpkt); +fprint(2, "tpkt freed\n"); if(rpkt == nil) return -1; n = GBIT32((uchar*)rpkt); @@ -162,6 +172,7 @@ fsrpc(Fsys *fs, Fcall *tx, Fcall *rx, void **freep) fprint(2, "%r\n"); return -1; } + fprint(2, "-> %F\n", rx); if(rx->type == Rerror){ werrstr("%s", rx->ename); free(rpkt); @@ -261,7 +272,7 @@ _fsrecv(Mux *mux) fprint(2, "libfs out of memory reading 9p packet; here comes trouble\n"); return nil; } - PBIT32(buf, n); + PBIT32(pkt, n); if(readn(fs->fd, pkt+4, n-4) != n-4){ free(pkt); return nil; diff --git a/src/libfs/read.c b/src/libfs/read.c index ca6c628c..1ef2cb3a 100644 --- a/src/libfs/read.c +++ b/src/libfs/read.c @@ -14,6 +14,7 @@ fspread(Fid *fid, void *buf, long n, vlong offset) void *freep; tx.type = Tread; + tx.fid = fid->fid; if(offset == -1){ qlock(&fid->lk); tx.offset = fid->offset; @@ -32,7 +33,7 @@ fspread(Fid *fid, void *buf, long n, vlong offset) memmove(buf, rx.data, rx.count); if(offset == -1){ qlock(&fid->lk); - tx.offset += n; + fid->offset += rx.count; qunlock(&fid->lk); } } diff --git a/src/libfs/write.c b/src/libfs/write.c index 96ecfa86..5652b491 100644 --- a/src/libfs/write.c +++ b/src/libfs/write.c @@ -8,39 +8,39 @@ #include "fsimpl.h" long -fspwrite(Fid *fd, void *buf, long n, vlong offset) +fspwrite(Fid *fid, void *buf, long n, vlong offset) { Fcall tx, rx; void *freep; - tx.type = Tread; + tx.type = Twrite; + tx.fid = fid->fid; if(offset == -1){ - qlock(&fd->lk); - tx.offset = fd->offset; - fd->offset += n; - qunlock(&fd->lk); + qlock(&fid->lk); + tx.offset = fid->offset; + qunlock(&fid->lk); }else tx.offset = offset; tx.count = n; tx.data = buf; - fsrpc(fd->fs, &tx, &rx, &freep); + fsrpc(fid->fs, &tx, &rx, &freep); if(rx.type == Rerror){ - if(offset == -1){ - qlock(&fd->lk); - fd->offset -= n; - qunlock(&fd->lk); - } werrstr("%s", rx.ename); free(freep); return -1; } + if(offset == -1 && rx.count){ + qlock(&fid->lk); + fid->offset += rx.count; + qunlock(&fid->lk); + } free(freep); return rx.count; } long -fswrite(Fid *fd, void *buf, long n) +fswrite(Fid *fid, void *buf, long n) { - return fspwrite(fd, buf, n, -1); + return fspwrite(fid, buf, n, -1); } diff --git a/src/libmux/mux.c b/src/libmux/mux.c index 0d33498e..7a21b444 100644 --- a/src/libmux/mux.c +++ b/src/libmux/mux.c @@ -27,7 +27,7 @@ muxinit(Mux *mux) void* muxrpc(Mux *mux, void *tx) { - uint tag; + int tag; Muxrpc *r, *r2; void *p; @@ -38,17 +38,13 @@ muxrpc(Mux *mux, void *tx) r->r.l = &mux->lk; /* assign the tag */ + qlock(&mux->lk); tag = gettag(mux, r); - if(mux->settag(mux, tx, tag) < 0){ - puttag(mux, r); - free(r); - return nil; - } - - /* send the packet */ - if(_muxsend(mux, tx) < 0){ + qunlock(&mux->lk); + if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){ + qlock(&mux->lk); puttag(mux, r); - free(r); + qunlock(&mux->lk); return nil; } @@ -95,7 +91,6 @@ muxrpc(Mux *mux, void *tx) } p = r->p; puttag(mux, r); - free(r); qunlock(&mux->lk); return p; } @@ -121,32 +116,61 @@ dequeue(Mux *mux, Muxrpc *r) static int gettag(Mux *mux, Muxrpc *r) { - int i; - -Again: - while(mux->nwait == mux->mwait) - rsleep(&mux->tagrend); - i=mux->freetag; - if(mux->wait[i] == 0) - goto Found; - for(i=0; i<mux->mwait; i++) - if(mux->wait[i] == 0){ - Found: - mux->nwait++; - mux->wait[i] = r; - r->tag = i; - return i; + int i, mw; + Muxrpc **w; + + for(;;){ + /* wait for a free tag */ + while(mux->nwait == mux->mwait){ + if(mux->mwait < mux->maxtag-mux->mintag){ + mw = mux->mwait; + if(mw == 0) + mw = 1; + else + mw <<= 1; + w = realloc(mux->wait, mw*sizeof(w[0])); + if(w == nil) + return -1; + mux->wait = w; + mux->freetag = mux->mwait; + mux->mwait = mw; + break; + } + rsleep(&mux->tagrend); } - fprint(2, "libfs: nwait botch\n"); - goto Again; + + i=mux->freetag; + if(mux->wait[i] == 0) + goto Found; + for(; i<mux->mwait; i++) + if(mux->wait[i] == 0) + goto Found; + for(i=0; i<mux->freetag; i++) + if(mux->wait[i] == 0) + goto Found; + /* should not fall out of while without free tag */ + fprint(2, "libfs: nwait botch\n"); + abort(); + } + +Found: + mux->nwait++; + mux->wait[i] = r; + r->tag = i+mux->mintag; + return i; } static void puttag(Mux *mux, Muxrpc *r) { - assert(mux->wait[r->tag] == r); - mux->wait[r->tag] = nil; + int i; + + i = r->tag - mux->mintag; + assert(mux->wait[i] == r); + mux->wait[i] = nil; mux->nwait--; - mux->freetag = r->tag; + mux->freetag = i; rwakeup(&mux->tagrend); +fprint(2, "free %p\n", r); + free(r); } |