From ceb04770830a7610a5a9a21aa96a4ba4cece2a5d Mon Sep 17 00:00:00 2001 From: rsc Date: Tue, 9 Dec 2003 06:06:07 +0000 Subject: check everything in so i can move to linux and valgrind. --- src/cmd/9pserve.c | 469 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 409 insertions(+), 60 deletions(-) (limited to 'src/cmd/9pserve.c') diff --git a/src/cmd/9pserve.c b/src/cmd/9pserve.c index 40db17e5..ad77236e 100644 --- a/src/cmd/9pserve.c +++ b/src/cmd/9pserve.c @@ -70,13 +70,14 @@ char adir[40]; int isunix; Queue *outq; Queue *inq; +int verbose; void *gethash(Hash**, uint); int puthash(Hash**, uint, void*); int delhash(Hash**, uint, void*); -Msg *mread9p(int); -int mwrite9p(int, Msg*); -uchar *read9ppkt(int); +Msg *mread9p(Ioproc*, int); +int mwrite9p(Ioproc*, int, uchar*); +uchar *read9ppkt(Ioproc*, int); int write9ppkt(int, uchar*); Msg *msgnew(void); void msgput(Msg*); @@ -85,29 +86,50 @@ Fid *fidnew(int); void fidput(Fid*); void *emalloc(int); void *erealloc(void*, int); +Queue *qalloc(void); int sendq(Queue*, void*); void *recvq(Queue*); void selectthread(void*); void connthread(void*); +void connoutthread(void*); void listenthread(void*); +void outputthread(void*); +void inputthread(void*); void rewritehdr(Fcall*, uchar*); int tlisten(char*, char*); int taccept(int, char*); +int iolisten(Ioproc*, char*, char*); +int ioaccept(Ioproc*, int, char*); void usage(void) { - fprint(2, "usage: 9pserve [-u] address\n"); + fprint(2, "usage: 9pserve [-s service] [-u] address\n"); fprint(2, "\treads/writes 9P messages on stdin/stdout\n"); exits("usage"); } +uchar vbuf[128]; + void threadmain(int argc, char **argv) { + char *file; + int n; + Fcall f; + ARGBEGIN{ default: usage(); + case 'v': + verbose++; + break; + case 's': + close(0); + if(open(file=EARGF(usage()), ORDWR) != 0) + sysfatal("open %s: %r", file); + dup(0, 1); + break; case 'u': isunix = 1; break; @@ -115,45 +137,68 @@ threadmain(int argc, char **argv) if(argc != 1) usage(); + addr = argv[0]; if((afd = announce(addr, adir)) < 0) sysfatal("announce %s: %r", addr); - threadcreateidle(selectthread, nil, STACK); + fmtinstall('D', dirfmt); + fmtinstall('M', dirmodefmt); + fmtinstall('F', fcallfmt); + fmtinstall('H', encodefmt); + + outq = qalloc(); + inq = qalloc(); + +// threadcreateidle(selectthread, nil, STACK); + threadcreate(inputthread, nil, STACK); + threadcreate(outputthread, nil, STACK); + + f.type = Tversion; + f.version = "9P2000"; + f.msize = 8192; + f.tag = NOTAG; + n = convS2M(&f, vbuf, sizeof vbuf); + if(verbose > 1) fprint(2, "* <- %F\n", &f); + write(1, vbuf, n); + n = read9pmsg(0, vbuf, sizeof vbuf); + if(convM2S(vbuf, n, &f) != n) + sysfatal("convM2S failure"); + if(verbose > 1) fprint(2, "* -> %F\n", &f); + threadcreate(listenthread, nil, STACK); } void listenthread(void *arg) { Conn *c; + Ioproc *io; + io = ioproc(); USED(arg); for(;;){ - c = malloc(sizeof(Conn)); - if(c == nil){ - fprint(2, "out of memory\n"); - sleep(60*1000); - continue; - } - c->fd = tlisten(adir, c->dir); + c = emalloc(sizeof(Conn)); + c->inc = chancreate(sizeof(void*), 0); + c->internal = chancreate(sizeof(void*), 0); + c->inq = qalloc(); + c->outq = qalloc(); + c->fd = iolisten(io, adir, c->dir); if(c->fd < 0){ - fprint(2, "listen: %r\n"); + if(verbose) fprint(2, "listen: %r\n"); close(afd); free(c); return; } + if(verbose) fprint(2, "incoming call on %s\n", c->dir); threadcreate(connthread, c, STACK); } } void -err(Msg *m, char *ename) +sendmsg(Msg *m) { int n, nn; - m->rx.type = Rerror; - m->rx.ename = ename; - m->rx.tag = m->ctag; n = sizeS2M(&m->rx); m->rpkt = emalloc(n); nn = convS2M(&m->rx, m->rpkt, n); @@ -162,6 +207,28 @@ err(Msg *m, char *ename) sendq(m->c->outq, m); } +void +sendomsg(Msg *m) +{ + int n, nn; + + n = sizeS2M(&m->tx); + m->tpkt = emalloc(n); + nn = convS2M(&m->tx, m->tpkt, n); + if(nn != n) + sysfatal("sizeS2M + convS2M disagree"); + sendq(outq, m); +} + +void +err(Msg *m, char *ename) +{ + m->rx.type = Rerror; + m->rx.ename = ename; + m->rx.tag = m->tx.tag; + sendmsg(m); +} + void connthread(void *arg) { @@ -170,31 +237,54 @@ connthread(void *arg) Hash *h; Msg *m, *om; Fid *f; + Ioproc *io; c = arg; - fd = taccept(c->fd, c->dir); + io = ioproc(); + fd = ioaccept(io, c->fd, c->dir); if(fd < 0){ - fprint(2, "accept %s: %r\n", c->dir); + if(verbose) fprint(2, "accept %s: %r\n", c->dir); goto out; } close(c->fd); c->fd = fd; - while((m = mread9p(c->fd)) != nil){ + threadcreate(connoutthread, c, STACK); + while((m = mread9p(io, c->fd)) != nil){ + if(verbose > 1) fprint(2, "%s -> %F\n", c->dir, &m->tx); m->c = c; + m->ctag = m->tx.tag; c->nmsg++; if(puthash(c->tag, m->tx.tag, m) < 0){ err(m, "duplicate tag"); continue; } + m->ref++; switch(m->tx.type){ + case Tversion: + m->rx.tag = m->tx.tag; + m->rx.msize = m->tx.msize; + if(m->rx.msize > 8192) + m->rx.msize = 8192; + m->rx.version = "9P2000"; + m->rx.type = Rversion; + sendmsg(m); + continue; case Tflush: if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){ - m->rx.tag = Rflush; - sendq(c->outq, m); + m->rx.tag = m->tx.tag; + m->rx.type = Rflush; + sendmsg(m); continue; } + m->oldm->ref++; break; case Tattach: + m->afid = nil; + if(m->tx.afid != NOFID + && (m->afid = gethash(c->fid, m->tx.afid)) == nil){ + err(m, "unknown fid"); + continue; + } m->fid = fidnew(m->tx.fid); if(puthash(c->fid, m->tx.fid, m->fid) < 0){ err(m, "duplicate fid"); @@ -207,6 +297,7 @@ connthread(void *arg) err(m, "unknown fid"); continue; } + m->fid->ref++; if(m->tx.newfid == m->tx.fid){ m->fid->ref++; m->newfid = m->fid; @@ -220,21 +311,19 @@ connthread(void *arg) } break; case Tauth: - if((m->afid = gethash(c->fid, m->tx.afid)) == nil){ - err(m, "unknown fid"); - continue; - } - m->fid = fidnew(m->tx.fid); - if(puthash(c->fid, m->tx.fid, m->fid) < 0){ + m->afid = fidnew(m->tx.afid); + if(puthash(c->fid, m->tx.afid, m->afid) < 0){ err(m, "duplicate fid"); continue; } - m->fid->ref++; + m->afid->ref++; break; + case Tcreate: case Topen: case Tclunk: case Tread: case Twrite: + case Tremove: case Tstat: case Twstat: if((m->fid = gethash(c->fid, m->tx.fid)) == nil){ @@ -257,7 +346,7 @@ connthread(void *arg) m->tx.afid = m->afid->fid; if(m->oldm) m->tx.oldtag = m->oldm->tag; - rewritehdr(&m->tx, m->tpkt); + /* reference passes to outq */ sendq(outq, m); while(c->nmsg >= MAXMSG){ c->inputstalled = 1; @@ -265,6 +354,8 @@ connthread(void *arg) } } + if(verbose) fprint(2, "%s eof\n", c->dir); + /* flush all outstanding messages */ for(i=0; itag[i]; h; h=h->next){ @@ -277,8 +368,10 @@ connthread(void *arg) m->tx.oldtag = om->tag; m->oldm = om; om->ref++; - sendq(outq, m); + m->ref++; /* for outq */ + sendomsg(m); recvp(c->internal); + msgput(m); } } @@ -294,8 +387,10 @@ connthread(void *arg) m->tx.fid = f->fid; m->fid = f; f->ref++; - sendq(outq, m); + m->ref++; + sendomsg(m); recvp(c->internal); + msgput(m); } } @@ -312,57 +407,72 @@ connoutthread(void *arg) int err; Conn *c; Msg *m, *om; + Ioproc *io; c = arg; + io = ioproc(); while((m = recvq(c->outq)) != nil){ err = m->tx.type+1 != m->rx.type; switch(m->tx.type){ case Tflush: om = m->oldm; - if(delhash(om->c->tag, om->ctag, om) == 0) - msgput(om); + if(om) + if(delhash(om->c->tag, om->ctag, om) == 0) + msgput(om); break; case Tclunk: - if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) - fidput(m->fid); + case Tremove: + if(m->fid) + if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) + fidput(m->fid); break; case Tauth: - if(err) - if(delhash(m->c->fid, m->afid->cfid, m->fid) == 0) + if(err && m->afid){ + fprint(2, "auth error\n"); + if(delhash(m->c->fid, m->afid->cfid, m->afid) == 0) fidput(m->fid); + } + break; case Tattach: - if(err) + if(err && m->fid) if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) fidput(m->fid); break; case Twalk: - if(err && m->tx.fid != m->tx.newfid) + if(err && m->tx.fid != m->tx.newfid && m->newfid) if(delhash(m->c->fid, m->newfid->cfid, m->newfid) == 0) fidput(m->newfid); break; } - if(mwrite9p(c->fd, m) < 0) - fprint(2, "write error: %r\n"); if(delhash(m->c->tag, m->ctag, m) == 0) msgput(m); + if(verbose > 1) fprint(2, "%s <- %F\n", c->dir, &m->rx); + rewritehdr(&m->rx, m->rpkt); + if(mwrite9p(io, c->fd, m->rpkt) < 0) + if(verbose) fprint(2, "write error: %r\n"); msgput(m); if(c->inputstalled && c->nmsg < MAXMSG) nbsendp(c->inc, 0); } + closeioproc(io); } void outputthread(void *arg) { Msg *m; + Ioproc *io; USED(arg); - + io = ioproc(); while((m = recvq(outq)) != nil){ - if(mwrite9p(1, m) < 0) + if(verbose > 1) fprint(2, "* <- %F\n", &m->tx); + rewritehdr(&m->tx, m->tpkt); + if(mwrite9p(io, 1, m->tpkt) < 0) sysfatal("output error: %r"); msgput(m); } + closeioproc(io); } void @@ -371,19 +481,22 @@ inputthread(void *arg) uchar *pkt; int n, nn, tag; Msg *m; + Ioproc *io; - while((pkt = read9ppkt(0)) != nil){ + io = ioproc(); + USED(arg); + while((pkt = read9ppkt(io, 0)) != nil){ n = GBIT32(pkt); if(n < 7){ - fprint(2, "short 9P packet\n"); + fprint(2, "short 9P packet from server\n"); free(pkt); continue; } + if(verbose > 2) fprint(2, "read %.*H\n", n, pkt); tag = GBIT16(pkt+5); if((m = msgget(tag)) == nil){ fprint(2, "unexpected 9P response tag %d\n", tag); free(pkt); - msgput(m); continue; } if((nn = convM2S(pkt, n, &m->rx)) != n){ @@ -392,11 +505,15 @@ inputthread(void *arg) msgput(m); continue; } + if(verbose > 1) fprint(2, "* -> %F\n", &m->rx); m->rpkt = pkt; m->rx.tag = m->ctag; - rewritehdr(&m->rx, m->rpkt); - sendq(m->c->outq, m); + if(m->internal) + sendp(m->c->internal, 0); + else + sendq(m->c->outq, m); } + closeioproc(io); } void* @@ -417,8 +534,10 @@ delhash(Hash **ht, uint n, void *v) for(l=&ht[n%NHASH]; h=*l; l=&h->next) if(h->n == n){ - if(h->v != v) - fprint(2, "hash error\n"); + if(h->v != v){ + if(verbose) fprint(2, "delhash %d got %p want %p\n", n, h->v, v); + return -1; + } *l = h->next; free(h); return 0; @@ -451,13 +570,14 @@ fidnew(int cfid) Fid *f; if(freefid == nil){ - fidtab = erealloc(fidtab, nfidtab*sizeof(fidtab[0])); + fidtab = erealloc(fidtab, (nfidtab+1)*sizeof(fidtab[0])); fidtab[nfidtab] = emalloc(sizeof(Fid)); - freefid = fidtab[nfidtab++]; + freefid = fidtab[nfidtab]; + freefid->fid = nfidtab++; } f = freefid; freefid = f->next; - f->cfid = f->cfid; + f->cfid = cfid; f->ref = 1; return f; } @@ -465,6 +585,8 @@ fidnew(int cfid) void fidput(Fid *f) { + if(f == nil) + return; assert(f->ref > 0); if(--f->ref > 0) return; @@ -483,9 +605,10 @@ msgnew(void) Msg *m; if(freemsg == nil){ - msgtab = erealloc(msgtab, nmsgtab*sizeof(msgtab[0])); + msgtab = erealloc(msgtab, (nmsgtab+1)*sizeof(msgtab[0])); msgtab[nmsgtab] = emalloc(sizeof(Msg)); - freemsg = msgtab[nmsgtab++]; + freemsg = msgtab[nmsgtab]; + freemsg->tag = nmsgtab++; } m = freemsg; freemsg = m->next; @@ -496,21 +619,51 @@ msgnew(void) void msgput(Msg *m) { + if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref); assert(m->ref > 0); if(--m->ref > 0) return; + m->c->nmsg--; + m->c = nil; + fidput(m->fid); + fidput(m->afid); + fidput(m->newfid); + free(m->tpkt); + free(m->rpkt); + m->fid = nil; + m->afid = nil; + m->newfid = nil; + m->tpkt = nil; + m->rpkt = nil; m->next = freemsg; freemsg = m; } +Msg* +msgget(int n) +{ + Msg *m; + + if(n < 0 || n >= nmsgtab) + return nil; + m = msgtab[n]; + if(m->ref == 0) + return nil; + m->ref++; + return m; +} + + void* emalloc(int n) { void *v; v = mallocz(n, 1); - if(v == nil) - sysfatal("out of memory"); + if(v == nil){ + abort(); + sysfatal("out of memory allocating %d", n); + } return v; } @@ -518,8 +671,10 @@ void* erealloc(void *v, int n) { v = realloc(v, n); - if(v == nil) - sysfatal("out of memory"); + if(v == nil){ + abort(); + sysfatal("out of memory reallocating %d", n); + } return v; } @@ -595,3 +750,197 @@ recvq(Queue *q) free(e); return p; } + +uchar* +read9ppkt(Ioproc *io, int fd) +{ + uchar buf[4], *pkt; + int n, nn; + + n = ioreadn(io, fd, buf, 4); + if(n != 4) + return nil; + n = GBIT32(buf); + pkt = emalloc(n); + PBIT32(pkt, n); + nn = ioreadn(io, fd, pkt+4, n-4); + if(nn != n-4){ + free(pkt); + return nil; + } + return pkt; +} + +Msg* +mread9p(Ioproc *io, int fd) +{ + int n, nn; + uchar *pkt; + Msg *m; + + if((pkt = read9ppkt(io, fd)) == nil) + return nil; + + m = msgnew(); + m->tpkt = pkt; + n = GBIT32(pkt); + nn = convM2S(pkt, n, &m->tx); + if(nn != n){ + fprint(2, "read bad packet from %d\n", fd); + return nil; + } + return m; +} + +int +mwrite9p(Ioproc *io, int fd, uchar *pkt) +{ + int n; + + n = GBIT32(pkt); + if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt); + if(iowrite(io, fd, pkt, n) != n){ + fprint(2, "write error: %r\n"); + return -1; + } + return 0; +} + +void +restring(uchar *pkt, int pn, char *s) +{ + int n; + + if(s < (char*)pkt || s >= (char*)pkt+pn) + return; + + n = strlen(s); + memmove(s+1, s, n); + PBIT16((uchar*)s-1, n); +} + +void +rewritehdr(Fcall *f, uchar *pkt) +{ + int i, n; + + n = GBIT32(pkt); + PBIT16(pkt+5, f->tag); + switch(f->type){ + case Tversion: + case Rversion: + restring(pkt, n, f->version); + break; + case Tauth: + PBIT32(pkt+7, f->afid); + restring(pkt, n, f->uname); + restring(pkt, n, f->aname); + break; + case Tflush: + PBIT16(pkt+7, f->oldtag); + break; + case Tattach: + restring(pkt, n, f->uname); + restring(pkt, n, f->aname); + PBIT32(pkt+7, f->fid); + PBIT32(pkt+11, f->afid); + break; + case Twalk: + PBIT32(pkt+7, f->fid); + PBIT32(pkt+11, f->newfid); + for(i=0; inwname; i++) + restring(pkt, n, f->wname[i]); + break; + case Tcreate: + restring(pkt, n, f->name); + /* fall through */ + case Topen: + case Tread: + case Twrite: + case Tclunk: + case Tremove: + case Tstat: + case Twstat: + PBIT32(pkt+7, f->fid); + break; + case Rerror: + restring(pkt, n, f->ename); + break; + } +} + +#ifdef _LIB9_H_ +/* unix select-based polling */ +Ioproc* +ioproc(void) +{ + return nil; +} + +long +ioread(Ioproc *io, int fd, void *v, long n) +{ + USED(io); + + xxx; +} + +long +iowrite(Ioproc *io, int fd, void *v, long n) +{ + USED(io); + + xxx; +} + +int +iolisten(Ioproc *io, char *a, char *b) +{ + USED(io); + + xxx; +} + +int +ioaccept(Ioproc *io, int fd, char *dir) +{ + USED(io); + + xxx; +} + +#else +/* real plan 9 io procs */ +static long +_iolisten(va_list *arg) +{ + char *a, *b; + + a = va_arg(*arg, char*); + b = va_arg(*arg, char*); + return listen(a, b); +} + +int +iolisten(Ioproc *io, char *a, char *b) +{ + return iocall(io, _iolisten, a, b); +} + +static long +_ioaccept(va_list *arg) +{ + int fd; + char *dir; + + fd = va_arg(*arg, int); + dir = va_arg(*arg, char*); + return accept(fd, dir); +} + +int +ioaccept(Ioproc *io, int fd, char *dir) +{ + return iocall(io, _ioaccept, fd, dir); +} +#endif -- cgit v1.2.3