aboutsummaryrefslogtreecommitdiff
path: root/src/cmd/9pserve.c
diff options
context:
space:
mode:
authorrsc <devnull@localhost>2003-12-11 17:48:38 +0000
committerrsc <devnull@localhost>2003-12-11 17:48:38 +0000
commit32f69c36e0eec1227934bbd34854bfebd88686f2 (patch)
tree1587e9de84816b77168afa81c1594cc686809910 /src/cmd/9pserve.c
parentac244f8d287a6119155ea672c8fd13c487c5e4c7 (diff)
downloadplan9port-32f69c36e0eec1227934bbd34854bfebd88686f2.tar.gz
plan9port-32f69c36e0eec1227934bbd34854bfebd88686f2.tar.bz2
plan9port-32f69c36e0eec1227934bbd34854bfebd88686f2.zip
Add support for user-level 9P servers/clients and various bug fixes to go with them.
Diffstat (limited to 'src/cmd/9pserve.c')
-rw-r--r--src/cmd/9pserve.c457
1 files changed, 425 insertions, 32 deletions
diff --git a/src/cmd/9pserve.c b/src/cmd/9pserve.c
index ad77236e..c33beb14 100644
--- a/src/cmd/9pserve.c
+++ b/src/cmd/9pserve.c
@@ -2,6 +2,8 @@
#include <libc.h>
#include <fcall.h>
#include <thread.h>
+#include <poll.h>
+#include <errno.h>
enum
{
@@ -38,6 +40,7 @@ struct Msg
int ref;
int ctag;
int tag;
+ int isopenfd;
Fcall tx;
Fcall rx;
Fid *fid;
@@ -52,6 +55,8 @@ struct Msg
struct Conn
{
int fd;
+ int fdmode;
+ Fid *fdfid;
int nmsg;
int nfid;
Channel *inc;
@@ -89,7 +94,7 @@ void *erealloc(void*, int);
Queue *qalloc(void);
int sendq(Queue*, void*);
void *recvq(Queue*);
-void selectthread(void*);
+void pollthread(void*);
void connthread(void*);
void connoutthread(void*);
void listenthread(void*);
@@ -100,6 +105,10 @@ int tlisten(char*, char*);
int taccept(int, char*);
int iolisten(Ioproc*, char*, char*);
int ioaccept(Ioproc*, int, char*);
+int iorecvfd(Ioproc*, int);
+int iosendfd(Ioproc*, int, int);
+void mainproc(void*);
+int ignorepipe(void*, char*);
void
usage(void)
@@ -110,14 +119,13 @@ usage(void)
}
uchar vbuf[128];
-
+extern int _threaddebuglevel;
void
threadmain(int argc, char **argv)
{
char *file;
- int n;
- Fcall f;
+ if(verbose) fprint(2, "9pserve running\n");
ARGBEGIN{
default:
usage();
@@ -142,6 +150,20 @@ threadmain(int argc, char **argv)
if((afd = announce(addr, adir)) < 0)
sysfatal("announce %s: %r", addr);
+ proccreate(mainproc, nil, STACK);
+ threadexits(0);
+}
+
+void
+mainproc(void *v)
+{
+ int n;
+ Fcall f;
+ USED(v);
+
+ yield(); /* let threadmain exit */
+
+ atnotify(ignorepipe, 1);
fmtinstall('D', dirfmt);
fmtinstall('M', dirmodefmt);
fmtinstall('F', fcallfmt);
@@ -150,10 +172,6 @@ threadmain(int argc, char **argv)
outq = qalloc();
inq = qalloc();
-// threadcreateidle(selectthread, nil, STACK);
- threadcreate(inputthread, nil, STACK);
- threadcreate(outputthread, nil, STACK);
-
f.type = Tversion;
f.version = "9P2000";
f.msize = 8192;
@@ -165,7 +183,22 @@ threadmain(int argc, char **argv)
if(convM2S(vbuf, n, &f) != n)
sysfatal("convM2S failure");
if(verbose > 1) fprint(2, "* -> %F\n", &f);
+
+ threadcreate(inputthread, nil, STACK);
+ threadcreate(outputthread, nil, STACK);
threadcreate(listenthread, nil, STACK);
+ threadcreateidle(pollthread, nil, STACK);
+ threadexits(0);
+}
+
+int
+ignorepipe(void *v, char *s)
+{
+ USED(v);
+ if(strcmp(s, "sys: write on closed pipe") == 0)
+ return 1;
+ fprint(2, "msg: %s\n", s);
+ return 0;
}
void
@@ -178,10 +211,6 @@ listenthread(void *arg)
USED(arg);
for(;;){
c = emalloc(sizeof(Conn));
- c->inc = chancreate(sizeof(void*), 0);
- c->internal = chancreate(sizeof(void*), 0);
- c->inq = qalloc();
- c->outq = qalloc();
c->fd = iolisten(io, adir, c->dir);
if(c->fd < 0){
if(verbose) fprint(2, "listen: %r\n");
@@ -189,13 +218,17 @@ listenthread(void *arg)
free(c);
return;
}
+ c->inc = chancreate(sizeof(void*), 0);
+ c->internal = chancreate(sizeof(void*), 0);
+ c->inq = qalloc();
+ c->outq = qalloc();
if(verbose) fprint(2, "incoming call on %s\n", c->dir);
threadcreate(connthread, c, STACK);
}
}
void
-sendmsg(Msg *m)
+send9pmsg(Msg *m)
{
int n, nn;
@@ -226,7 +259,7 @@ err(Msg *m, char *ename)
m->rx.type = Rerror;
m->rx.ename = ename;
m->rx.tag = m->tx.tag;
- sendmsg(m);
+ send9pmsg(m);
}
void
@@ -250,7 +283,7 @@ connthread(void *arg)
c->fd = fd;
threadcreate(connoutthread, c, STACK);
while((m = mread9p(io, c->fd)) != nil){
- if(verbose > 1) fprint(2, "%s -> %F\n", c->dir, &m->tx);
+ if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx);
m->c = c;
m->ctag = m->tx.tag;
c->nmsg++;
@@ -267,13 +300,13 @@ connthread(void *arg)
m->rx.msize = 8192;
m->rx.version = "9P2000";
m->rx.type = Rversion;
- sendmsg(m);
+ send9pmsg(m);
continue;
case Tflush:
if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
m->rx.tag = m->tx.tag;
m->rx.type = Rflush;
- sendmsg(m);
+ send9pmsg(m);
continue;
}
m->oldm->ref++;
@@ -318,6 +351,15 @@ connthread(void *arg)
}
m->afid->ref++;
break;
+ case Topenfd:
+ if(m->tx.mode != OREAD && (m->tx.mode&~OTRUNC) != OWRITE){
+ err(m, "openfd mode must be OREAD or OWRITE");
+ continue;
+ }
+ m->isopenfd = 1;
+ m->tx.type = Topen;
+ m->tpkt[4] = Topen;
+ /* fall through */
case Tcreate:
case Topen:
case Tclunk:
@@ -363,6 +405,7 @@ connthread(void *arg)
m = msgnew();
m->internal = 1;
m->c = c;
+ c->nmsg++;
m->tx.type = Tflush;
m->tx.tag = m->tag;
m->tx.oldtag = om->tag;
@@ -371,7 +414,9 @@ connthread(void *arg)
m->ref++; /* for outq */
sendomsg(m);
recvp(c->internal);
- msgput(m);
+ msgput(m); /* got from recvp */
+ msgput(m); /* got from msgnew */
+ msgput(om); /* got from hash table */
}
}
@@ -382,6 +427,7 @@ connthread(void *arg)
m = msgnew();
m->internal = 1;
m->c = c;
+ c->nmsg++;
m->tx.type = Tclunk;
m->tx.tag = m->tag;
m->tx.fid = f->fid;
@@ -390,7 +436,9 @@ connthread(void *arg)
m->ref++;
sendomsg(m);
recvp(c->internal);
- msgput(m);
+ msgput(m); /* got from recvp */
+ msgput(m); /* got from msgnew */
+ fidput(f); /* got from hash table */
}
}
@@ -398,7 +446,155 @@ out:
assert(c->nmsg == 0);
assert(c->nfid == 0);
close(c->fd);
+ chanfree(c->internal);
+ c->internal = 0;
+ chanfree(c->inc);
+ c->inc = 0;
+ free(c->inq);
+ c->inq = 0;
+ free(c->outq);
+ c->outq = 0;
+ free(c);
+}
+
+static void
+openfdthread(void *v)
+{
+ Conn *c;
+ Fid *fid;
+ Msg *m;
+ int n;
+ vlong tot;
+ Ioproc *io;
+ char buf[1024];
+
+ c = v;
+ fid = c->fdfid;
+ io = ioproc();
+
+ tot = 0;
+ if(c->fdmode == OREAD){
+ for(;;){
+ if(verbose) fprint(2, "tread...");
+ m = msgnew();
+ m->internal = 1;
+ m->c = c;
+ m->tx.type = Tread;
+ m->tx.count = 8192;
+ m->tx.fid = fid->fid;
+ m->tx.tag = m->tag;
+ m->tx.offset = tot;
+ m->fid = fid;
+ fid->ref++;
+ m->ref++;
+ sendomsg(m);
+ recvp(c->internal);
+ if(m->rx.type == Rerror)
+ break;
+ if(m->rx.count == 0)
+ break;
+ tot += m->rx.count;
+ if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count)
+ break;
+ msgput(m);
+ msgput(m);
+ }
+ }else{
+ for(;;){
+ if(verbose) fprint(2, "twrite...");
+ if((n=ioread(io, c->fd, buf, sizeof buf)) <= 0){
+ m = nil;
+ break;
+ }
+ m = msgnew();
+ m->internal = 1;
+ m->c = c;
+ m->tx.type = Twrite;
+ m->tx.fid = fid->fid;
+ m->tx.data = buf;
+ m->tx.count = n;
+ m->tx.tag = m->tag;
+ m->tx.offset = tot;
+ m->fid = fid;
+ fid->ref++;
+ m->ref++;
+ sendomsg(m);
+ recvp(c->internal);
+ if(m->rx.type == Rerror)
+ break;
+ tot = n;
+ msgput(m);
+ msgput(m);
+ }
+ }
+ if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
+ close(c->fd);
+ closeioproc(io);
+ if(m){
+ msgput(m);
+ msgput(m);
+ }
+ m = msgnew();
+ m->internal = 1;
+ m->c = c;
+ m->tx.type = Tclunk;
+ m->tx.fid = fid->fid;
+ m->fid = fid;
+ fid->ref++;
+ m->ref++;
+ sendomsg(m);
+ recvp(c->internal);
+ msgput(m);
+ msgput(m);
+ fidput(fid);
+ c->fdfid = nil;
+ chanfree(c->internal);
+ c->internal = 0;
free(c);
+}
+
+int
+xopenfd(Msg *m)
+{
+ char errs[ERRMAX];
+ int n, p[2];
+ Conn *nc;
+
+ if(pipe(p) < 0){
+ rerrstr(errs, sizeof errs);
+ err(m, errs);
+ }
+ if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]);
+
+ /* now we're committed. */
+
+ /* a new connection for this fid */
+ nc = emalloc(sizeof(Conn));
+ nc->internal = chancreate(sizeof(void*), 0);
+
+ /* a ref for us */
+ nc->fdfid = m->fid;
+ m->fid->ref++;
+ nc->fdmode = m->tx.mode;
+ nc->fd = p[0];
+
+ /* clunk fid from other connection */
+ if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
+ fidput(m->fid);
+
+ /* a thread to tend the pipe */
+ threadcreate(openfdthread, nc, STACK);
+
+ /* rewrite as Ropenfd */
+ m->rx.type = Ropenfd;
+ n = GBIT32(m->rpkt);
+ m->rpkt = erealloc(m->rpkt, n+4);
+ PBIT32(m->rpkt+n, p[1]);
+ n += 4;
+ PBIT32(m->rpkt, n);
+ m->rpkt[4] = Ropenfd;
+ m->rx.unixfd = p[1];
+ return 0;
}
void
@@ -413,6 +609,9 @@ connoutthread(void *arg)
io = ioproc();
while((m = recvq(c->outq)) != nil){
err = m->tx.type+1 != m->rx.type;
+ if(!err && m->isopenfd)
+ if(xopenfd(m) < 0)
+ continue;
switch(m->tx.type){
case Tflush:
om = m->oldm;
@@ -446,7 +645,7 @@ connoutthread(void *arg)
}
if(delhash(m->c->tag, m->ctag, m) == 0)
msgput(m);
- if(verbose > 1) fprint(2, "%s <- %F\n", c->dir, &m->rx);
+ if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx);
rewritehdr(&m->rx, m->rpkt);
if(mwrite9p(io, c->fd, m->rpkt) < 0)
if(verbose) fprint(2, "write error: %r\n");
@@ -473,6 +672,8 @@ outputthread(void *arg)
msgput(m);
}
closeioproc(io);
+ fprint(2, "output eof\n");
+ threadexitsall(0);
}
void
@@ -483,6 +684,7 @@ inputthread(void *arg)
Msg *m;
Ioproc *io;
+ if(verbose) fprint(2, "input thread\n");
io = ioproc();
USED(arg);
while((pkt = read9ppkt(io, 0)) != nil){
@@ -514,6 +716,8 @@ inputthread(void *arg)
sendq(m->c->outq, m);
}
closeioproc(io);
+ fprint(2, "input eof\n");
+ threadexitsall(0);
}
void*
@@ -626,15 +830,20 @@ msgput(Msg *m)
m->c->nmsg--;
m->c = nil;
fidput(m->fid);
- fidput(m->afid);
- fidput(m->newfid);
- free(m->tpkt);
- free(m->rpkt);
m->fid = nil;
+ fidput(m->afid);
m->afid = nil;
+ fidput(m->newfid);
m->newfid = nil;
+ free(m->tpkt);
m->tpkt = nil;
+ free(m->rpkt);
m->rpkt = nil;
+ if(m->rx.type == Ropenfd)
+ close(m->rx.unixfd);
+ m->rx.unixfd = -1;
+ m->isopenfd = 0;
+ m->internal = 0;
m->next = freemsg;
freemsg = m;
}
@@ -649,6 +858,7 @@ msgget(int n)
m = msgtab[n];
if(m->ref == 0)
return nil;
+ if(verbose) fprint(2, "msgget %d = %p\n", n, m);
m->ref++;
return m;
}
@@ -768,6 +978,12 @@ read9ppkt(Ioproc *io, int fd)
free(pkt);
return nil;
}
+/* would do this if we ever got one of these, but we only generate them
+ if(pkt[4] == Ropenfd){
+ newfd = iorecvfd(io, fd);
+ PBIT32(pkt+n-4, newfd);
+ }
+*/
return pkt;
}
@@ -795,7 +1011,7 @@ mread9p(Ioproc *io, int fd)
int
mwrite9p(Ioproc *io, int fd, uchar *pkt)
{
- int n;
+ int n, nfd;
n = GBIT32(pkt);
if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt);
@@ -803,6 +1019,13 @@ mwrite9p(Ioproc *io, int fd, uchar *pkt)
fprint(2, "write error: %r\n");
return -1;
}
+ if(pkt[4] == Ropenfd){
+ nfd = GBIT32(pkt+n-4);
+ if(iosendfd(io, fd, nfd) < 0){
+ fprint(2, "send fd error: %r\n");
+ return -1;
+ }
+ }
return 0;
}
@@ -871,42 +1094,212 @@ rewritehdr(Fcall *f, uchar *pkt)
#ifdef _LIB9_H_
/* unix select-based polling */
+struct Ioproc
+{
+ Channel *c;
+ Ioproc *next;
+ int index;
+};
+
+static struct Ioproc **pio;
+static struct pollfd *pfd;
+static int npfd;
+static struct Ioproc *iofree;
+
Ioproc*
ioproc(void)
{
- return nil;
+ Ioproc *io;
+
+ if(iofree == nil){
+ pfd = erealloc(pfd, (npfd+1)*sizeof(pfd[0]));
+ pfd[npfd].events = 0;
+ pfd[npfd].fd = -1;
+ iofree = emalloc(sizeof(Ioproc));
+ iofree->index = npfd;
+ iofree->c = chancreate(sizeof(ulong), 1);
+ pio = erealloc(pio, (npfd+1)*sizeof(pio[0]));
+ pio[npfd] = iofree;
+ npfd++;
+ }
+ io = iofree;
+ iofree = io->next;
+ return io;
+}
+
+void
+closeioproc(Ioproc *io)
+{
+ io->next = iofree;
+ iofree = io;
+}
+
+void
+pollthread(void *v)
+{
+ int i, n;
+
+ for(;;){
+ yield();
+ for(i=0; i<npfd; i++)
+ pfd[i].revents = 0;
+ if(verbose){
+ fprint(2, "poll:");
+ for(i=0; i<npfd; i++)
+ if(pfd[i].events)
+ fprint(2, " %d%c", pfd[i].fd, pfd[i].events==POLLIN ? 'r' : pfd[i].events==POLLOUT ? 'w' : '?');
+ fprint(2, "\n");
+ }
+ n = poll(pfd, npfd, -1);
+ if(n <= 0)
+ continue;
+ for(i=0; i<npfd; i++)
+ if(pfd[i].fd != -1 && pfd[i].revents){
+ pfd[i].fd = -1;
+ pfd[i].events = 0;
+ pfd[i].revents = 0;
+ nbsendul(pio[i]->c, 1);
+ }
+ }
+}
+
+static void
+noblock(int fd)
+{
+ fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0)|O_NONBLOCK);
+}
+
+static void
+xwait(Ioproc *io, int fd, int e)
+{
+ if(verbose) fprint(2, "wait for %d%c\n", fd, e==POLLIN ? 'r' : 'w');
+ pfd[io->index].fd = fd;
+ pfd[io->index].events = e;
+ recvul(io->c);
+ if(verbose) fprint(2, "got %d\n", fd);
+}
+
+static void
+rwait(Ioproc *io, int fd)
+{
+ xwait(io, fd, POLLIN);
+}
+
+static void
+wwait(Ioproc *io, int fd)
+{
+ xwait(io, fd, POLLOUT);
}
long
ioread(Ioproc *io, int fd, void *v, long n)
{
+ long r;
USED(io);
- xxx;
+ noblock(fd);
+ while((r=read(fd, v, n)) < 0 && errno == EWOULDBLOCK)
+ rwait(io, fd);
+ return r;
}
long
-iowrite(Ioproc *io, int fd, void *v, long n)
+ioreadn(Ioproc *io, int fd, void *v, long n)
+{
+ long tot, m;
+ uchar *u;
+
+ u = v;
+ for(tot=0; tot<n; tot+=m){
+ m = ioread(io, fd, u+tot, n-tot);
+ if(m <= 0){
+ if(tot)
+ break;
+ return m;
+ }
+ }
+ return tot;
+}
+
+int
+iorecvfd(Ioproc *io, int fd)
{
+ int r;
+
+ noblock(fd);
+ while((r=recvfd(fd)) < 0 && errno == EWOULDBLOCK)
+ rwait(io, fd);
+ return r;
+}
+
+int
+iosendfd(Ioproc *io, int s, int fd)
+{
+ int r;
+
+ noblock(s);
+ while((r=sendfd(s, fd)) < 0 && errno == EWOULDBLOCK)
+ wwait(io, s);
+if(r < 0) fprint(2, "sent %d, %d\n", s, fd);
+ return r;
+}
+
+static long
+_iowrite(Ioproc *io, int fd, void *v, long n)
+{
+ long r;
USED(io);
- xxx;
+ noblock(fd);
+ while((r=write(fd, v, n)) < 0 && errno == EWOULDBLOCK)
+ wwait(io, fd);
+ return r;
+}
+
+long
+iowrite(Ioproc *io, int fd, void *v, long n)
+{
+ long tot, m;
+ uchar *u;
+
+ u = v;
+ for(tot=0; tot<n; tot+=m){
+ m = _iowrite(io, fd, u+tot, n-tot);
+ if(m <= 0){
+ if(tot)
+ break;
+ return m;
+ }
+ }
+ return tot;
}
int
-iolisten(Ioproc *io, char *a, char *b)
+iolisten(Ioproc *io, char *dir, char *ndir)
{
+ int fd;
+ int r;
+ extern int _p9netfd(char*);
USED(io);
- xxx;
+ if((fd = _p9netfd(dir)) < 0)
+ return -1;
+ noblock(fd);
+ while((r=listen(dir, ndir)) < 0 && errno == EWOULDBLOCK)
+ rwait(io, fd);
+ return r;
}
int
ioaccept(Ioproc *io, int fd, char *dir)
{
+ int r;
USED(io);
- xxx;
+ noblock(fd);
+ while((r=accept(fd, dir)) < 0 && errno == EWOULDBLOCK)
+ rwait(io, fd);
+ return r;
}
#else