From 32f69c36e0eec1227934bbd34854bfebd88686f2 Mon Sep 17 00:00:00 2001 From: rsc Date: Thu, 11 Dec 2003 17:48:38 +0000 Subject: Add support for user-level 9P servers/clients and various bug fixes to go with them. --- src/cmd/9pserve.c | 457 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 425 insertions(+), 32 deletions(-) (limited to 'src/cmd/9pserve.c') diff --git a/src/cmd/9pserve.c b/src/cmd/9pserve.c index ad77236e..c33beb14 100644 --- a/src/cmd/9pserve.c +++ b/src/cmd/9pserve.c @@ -2,6 +2,8 @@ #include #include #include +#include +#include enum { @@ -38,6 +40,7 @@ struct Msg int ref; int ctag; int tag; + int isopenfd; Fcall tx; Fcall rx; Fid *fid; @@ -52,6 +55,8 @@ struct Msg struct Conn { int fd; + int fdmode; + Fid *fdfid; int nmsg; int nfid; Channel *inc; @@ -89,7 +94,7 @@ void *erealloc(void*, int); Queue *qalloc(void); int sendq(Queue*, void*); void *recvq(Queue*); -void selectthread(void*); +void pollthread(void*); void connthread(void*); void connoutthread(void*); void listenthread(void*); @@ -100,6 +105,10 @@ int tlisten(char*, char*); int taccept(int, char*); int iolisten(Ioproc*, char*, char*); int ioaccept(Ioproc*, int, char*); +int iorecvfd(Ioproc*, int); +int iosendfd(Ioproc*, int, int); +void mainproc(void*); +int ignorepipe(void*, char*); void usage(void) @@ -110,14 +119,13 @@ usage(void) } uchar vbuf[128]; - +extern int _threaddebuglevel; void threadmain(int argc, char **argv) { char *file; - int n; - Fcall f; + if(verbose) fprint(2, "9pserve running\n"); ARGBEGIN{ default: usage(); @@ -142,6 +150,20 @@ threadmain(int argc, char **argv) if((afd = announce(addr, adir)) < 0) sysfatal("announce %s: %r", addr); + proccreate(mainproc, nil, STACK); + threadexits(0); +} + +void +mainproc(void *v) +{ + int n; + Fcall f; + USED(v); + + yield(); /* let threadmain exit */ + + atnotify(ignorepipe, 1); fmtinstall('D', dirfmt); fmtinstall('M', dirmodefmt); fmtinstall('F', fcallfmt); @@ -150,10 +172,6 @@ threadmain(int argc, char **argv) outq = qalloc(); inq = qalloc(); -// threadcreateidle(selectthread, nil, STACK); - threadcreate(inputthread, nil, STACK); - threadcreate(outputthread, nil, STACK); - f.type = Tversion; f.version = "9P2000"; f.msize = 8192; @@ -165,7 +183,22 @@ threadmain(int argc, char **argv) if(convM2S(vbuf, n, &f) != n) sysfatal("convM2S failure"); if(verbose > 1) fprint(2, "* -> %F\n", &f); + + threadcreate(inputthread, nil, STACK); + threadcreate(outputthread, nil, STACK); threadcreate(listenthread, nil, STACK); + threadcreateidle(pollthread, nil, STACK); + threadexits(0); +} + +int +ignorepipe(void *v, char *s) +{ + USED(v); + if(strcmp(s, "sys: write on closed pipe") == 0) + return 1; + fprint(2, "msg: %s\n", s); + return 0; } void @@ -178,10 +211,6 @@ listenthread(void *arg) USED(arg); for(;;){ c = emalloc(sizeof(Conn)); - c->inc = chancreate(sizeof(void*), 0); - c->internal = chancreate(sizeof(void*), 0); - c->inq = qalloc(); - c->outq = qalloc(); c->fd = iolisten(io, adir, c->dir); if(c->fd < 0){ if(verbose) fprint(2, "listen: %r\n"); @@ -189,13 +218,17 @@ listenthread(void *arg) free(c); return; } + c->inc = chancreate(sizeof(void*), 0); + c->internal = chancreate(sizeof(void*), 0); + c->inq = qalloc(); + c->outq = qalloc(); if(verbose) fprint(2, "incoming call on %s\n", c->dir); threadcreate(connthread, c, STACK); } } void -sendmsg(Msg *m) +send9pmsg(Msg *m) { int n, nn; @@ -226,7 +259,7 @@ err(Msg *m, char *ename) m->rx.type = Rerror; m->rx.ename = ename; m->rx.tag = m->tx.tag; - sendmsg(m); + send9pmsg(m); } void @@ -250,7 +283,7 @@ connthread(void *arg) c->fd = fd; threadcreate(connoutthread, c, STACK); while((m = mread9p(io, c->fd)) != nil){ - if(verbose > 1) fprint(2, "%s -> %F\n", c->dir, &m->tx); + if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx); m->c = c; m->ctag = m->tx.tag; c->nmsg++; @@ -267,13 +300,13 @@ connthread(void *arg) m->rx.msize = 8192; m->rx.version = "9P2000"; m->rx.type = Rversion; - sendmsg(m); + send9pmsg(m); continue; case Tflush: if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){ m->rx.tag = m->tx.tag; m->rx.type = Rflush; - sendmsg(m); + send9pmsg(m); continue; } m->oldm->ref++; @@ -318,6 +351,15 @@ connthread(void *arg) } m->afid->ref++; break; + case Topenfd: + if(m->tx.mode != OREAD && (m->tx.mode&~OTRUNC) != OWRITE){ + err(m, "openfd mode must be OREAD or OWRITE"); + continue; + } + m->isopenfd = 1; + m->tx.type = Topen; + m->tpkt[4] = Topen; + /* fall through */ case Tcreate: case Topen: case Tclunk: @@ -363,6 +405,7 @@ connthread(void *arg) m = msgnew(); m->internal = 1; m->c = c; + c->nmsg++; m->tx.type = Tflush; m->tx.tag = m->tag; m->tx.oldtag = om->tag; @@ -371,7 +414,9 @@ connthread(void *arg) m->ref++; /* for outq */ sendomsg(m); recvp(c->internal); - msgput(m); + msgput(m); /* got from recvp */ + msgput(m); /* got from msgnew */ + msgput(om); /* got from hash table */ } } @@ -382,6 +427,7 @@ connthread(void *arg) m = msgnew(); m->internal = 1; m->c = c; + c->nmsg++; m->tx.type = Tclunk; m->tx.tag = m->tag; m->tx.fid = f->fid; @@ -390,7 +436,9 @@ connthread(void *arg) m->ref++; sendomsg(m); recvp(c->internal); - msgput(m); + msgput(m); /* got from recvp */ + msgput(m); /* got from msgnew */ + fidput(f); /* got from hash table */ } } @@ -398,7 +446,155 @@ out: assert(c->nmsg == 0); assert(c->nfid == 0); close(c->fd); + chanfree(c->internal); + c->internal = 0; + chanfree(c->inc); + c->inc = 0; + free(c->inq); + c->inq = 0; + free(c->outq); + c->outq = 0; + free(c); +} + +static void +openfdthread(void *v) +{ + Conn *c; + Fid *fid; + Msg *m; + int n; + vlong tot; + Ioproc *io; + char buf[1024]; + + c = v; + fid = c->fdfid; + io = ioproc(); + + tot = 0; + if(c->fdmode == OREAD){ + for(;;){ + if(verbose) fprint(2, "tread..."); + m = msgnew(); + m->internal = 1; + m->c = c; + m->tx.type = Tread; + m->tx.count = 8192; + m->tx.fid = fid->fid; + m->tx.tag = m->tag; + m->tx.offset = tot; + m->fid = fid; + fid->ref++; + m->ref++; + sendomsg(m); + recvp(c->internal); + if(m->rx.type == Rerror) + break; + if(m->rx.count == 0) + break; + tot += m->rx.count; + if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count) + break; + msgput(m); + msgput(m); + } + }else{ + for(;;){ + if(verbose) fprint(2, "twrite..."); + if((n=ioread(io, c->fd, buf, sizeof buf)) <= 0){ + m = nil; + break; + } + m = msgnew(); + m->internal = 1; + m->c = c; + m->tx.type = Twrite; + m->tx.fid = fid->fid; + m->tx.data = buf; + m->tx.count = n; + m->tx.tag = m->tag; + m->tx.offset = tot; + m->fid = fid; + fid->ref++; + m->ref++; + sendomsg(m); + recvp(c->internal); + if(m->rx.type == Rerror) + break; + tot = n; + msgput(m); + msgput(m); + } + } + if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid); + close(c->fd); + closeioproc(io); + if(m){ + msgput(m); + msgput(m); + } + m = msgnew(); + m->internal = 1; + m->c = c; + m->tx.type = Tclunk; + m->tx.fid = fid->fid; + m->fid = fid; + fid->ref++; + m->ref++; + sendomsg(m); + recvp(c->internal); + msgput(m); + msgput(m); + fidput(fid); + c->fdfid = nil; + chanfree(c->internal); + c->internal = 0; free(c); +} + +int +xopenfd(Msg *m) +{ + char errs[ERRMAX]; + int n, p[2]; + Conn *nc; + + if(pipe(p) < 0){ + rerrstr(errs, sizeof errs); + err(m, errs); + } + if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]); + + /* now we're committed. */ + + /* a new connection for this fid */ + nc = emalloc(sizeof(Conn)); + nc->internal = chancreate(sizeof(void*), 0); + + /* a ref for us */ + nc->fdfid = m->fid; + m->fid->ref++; + nc->fdmode = m->tx.mode; + nc->fd = p[0]; + + /* clunk fid from other connection */ + if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0) + fidput(m->fid); + + /* a thread to tend the pipe */ + threadcreate(openfdthread, nc, STACK); + + /* rewrite as Ropenfd */ + m->rx.type = Ropenfd; + n = GBIT32(m->rpkt); + m->rpkt = erealloc(m->rpkt, n+4); + PBIT32(m->rpkt+n, p[1]); + n += 4; + PBIT32(m->rpkt, n); + m->rpkt[4] = Ropenfd; + m->rx.unixfd = p[1]; + return 0; } void @@ -413,6 +609,9 @@ connoutthread(void *arg) io = ioproc(); while((m = recvq(c->outq)) != nil){ err = m->tx.type+1 != m->rx.type; + if(!err && m->isopenfd) + if(xopenfd(m) < 0) + continue; switch(m->tx.type){ case Tflush: om = m->oldm; @@ -446,7 +645,7 @@ connoutthread(void *arg) } if(delhash(m->c->tag, m->ctag, m) == 0) msgput(m); - if(verbose > 1) fprint(2, "%s <- %F\n", c->dir, &m->rx); + if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx); rewritehdr(&m->rx, m->rpkt); if(mwrite9p(io, c->fd, m->rpkt) < 0) if(verbose) fprint(2, "write error: %r\n"); @@ -473,6 +672,8 @@ outputthread(void *arg) msgput(m); } closeioproc(io); + fprint(2, "output eof\n"); + threadexitsall(0); } void @@ -483,6 +684,7 @@ inputthread(void *arg) Msg *m; Ioproc *io; + if(verbose) fprint(2, "input thread\n"); io = ioproc(); USED(arg); while((pkt = read9ppkt(io, 0)) != nil){ @@ -514,6 +716,8 @@ inputthread(void *arg) sendq(m->c->outq, m); } closeioproc(io); + fprint(2, "input eof\n"); + threadexitsall(0); } void* @@ -626,15 +830,20 @@ msgput(Msg *m) m->c->nmsg--; m->c = nil; fidput(m->fid); - fidput(m->afid); - fidput(m->newfid); - free(m->tpkt); - free(m->rpkt); m->fid = nil; + fidput(m->afid); m->afid = nil; + fidput(m->newfid); m->newfid = nil; + free(m->tpkt); m->tpkt = nil; + free(m->rpkt); m->rpkt = nil; + if(m->rx.type == Ropenfd) + close(m->rx.unixfd); + m->rx.unixfd = -1; + m->isopenfd = 0; + m->internal = 0; m->next = freemsg; freemsg = m; } @@ -649,6 +858,7 @@ msgget(int n) m = msgtab[n]; if(m->ref == 0) return nil; + if(verbose) fprint(2, "msgget %d = %p\n", n, m); m->ref++; return m; } @@ -768,6 +978,12 @@ read9ppkt(Ioproc *io, int fd) free(pkt); return nil; } +/* would do this if we ever got one of these, but we only generate them + if(pkt[4] == Ropenfd){ + newfd = iorecvfd(io, fd); + PBIT32(pkt+n-4, newfd); + } +*/ return pkt; } @@ -795,7 +1011,7 @@ mread9p(Ioproc *io, int fd) int mwrite9p(Ioproc *io, int fd, uchar *pkt) { - int n; + int n, nfd; n = GBIT32(pkt); if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt); @@ -803,6 +1019,13 @@ mwrite9p(Ioproc *io, int fd, uchar *pkt) fprint(2, "write error: %r\n"); return -1; } + if(pkt[4] == Ropenfd){ + nfd = GBIT32(pkt+n-4); + if(iosendfd(io, fd, nfd) < 0){ + fprint(2, "send fd error: %r\n"); + return -1; + } + } return 0; } @@ -871,42 +1094,212 @@ rewritehdr(Fcall *f, uchar *pkt) #ifdef _LIB9_H_ /* unix select-based polling */ +struct Ioproc +{ + Channel *c; + Ioproc *next; + int index; +}; + +static struct Ioproc **pio; +static struct pollfd *pfd; +static int npfd; +static struct Ioproc *iofree; + Ioproc* ioproc(void) { - return nil; + Ioproc *io; + + if(iofree == nil){ + pfd = erealloc(pfd, (npfd+1)*sizeof(pfd[0])); + pfd[npfd].events = 0; + pfd[npfd].fd = -1; + iofree = emalloc(sizeof(Ioproc)); + iofree->index = npfd; + iofree->c = chancreate(sizeof(ulong), 1); + pio = erealloc(pio, (npfd+1)*sizeof(pio[0])); + pio[npfd] = iofree; + npfd++; + } + io = iofree; + iofree = io->next; + return io; +} + +void +closeioproc(Ioproc *io) +{ + io->next = iofree; + iofree = io; +} + +void +pollthread(void *v) +{ + int i, n; + + for(;;){ + yield(); + for(i=0; ic, 1); + } + } +} + +static void +noblock(int fd) +{ + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0)|O_NONBLOCK); +} + +static void +xwait(Ioproc *io, int fd, int e) +{ + if(verbose) fprint(2, "wait for %d%c\n", fd, e==POLLIN ? 'r' : 'w'); + pfd[io->index].fd = fd; + pfd[io->index].events = e; + recvul(io->c); + if(verbose) fprint(2, "got %d\n", fd); +} + +static void +rwait(Ioproc *io, int fd) +{ + xwait(io, fd, POLLIN); +} + +static void +wwait(Ioproc *io, int fd) +{ + xwait(io, fd, POLLOUT); } long ioread(Ioproc *io, int fd, void *v, long n) { + long r; USED(io); - xxx; + noblock(fd); + while((r=read(fd, v, n)) < 0 && errno == EWOULDBLOCK) + rwait(io, fd); + return r; } long -iowrite(Ioproc *io, int fd, void *v, long n) +ioreadn(Ioproc *io, int fd, void *v, long n) +{ + long tot, m; + uchar *u; + + u = v; + for(tot=0; tot