aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrsc <devnull@localhost>2004-03-02 19:21:48 +0000
committerrsc <devnull@localhost>2004-03-02 19:21:48 +0000
commit05b7f431f01dad68d31b4681a5583a0c3de2921a (patch)
tree0ed87977a8a259582d024e4f7ffda0efb887cc41
parentc4991217e1c34d6bc14c7a5d2371a74342581539 (diff)
downloadplan9port-05b7f431f01dad68d31b4681a5583a0c3de2921a.tar.gz
plan9port-05b7f431f01dad68d31b4681a5583a0c3de2921a.tar.bz2
plan9port-05b7f431f01dad68d31b4681a5583a0c3de2921a.zip
Long-standing stability bugs fixed in 9pserve.
Update win to use acme interface directly instead of via pipes. Add comment to pipe about lack of message boundaries.
-rw-r--r--src/cmd/9pserve.c61
-rw-r--r--src/cmd/win.c161
-rw-r--r--src/lib9/pipe.c1
3 files changed, 154 insertions, 69 deletions
diff --git a/src/cmd/9pserve.c b/src/cmd/9pserve.c
index 3d72437f..44965a13 100644
--- a/src/cmd/9pserve.c
+++ b/src/cmd/9pserve.c
@@ -74,7 +74,7 @@ char adir[40];
int isunix;
Queue *outq;
Queue *inq;
-int verbose;
+int verbose = 0;
int msize = 8192;
void *gethash(Hash**, uint);
@@ -276,8 +276,8 @@ connthread(void *arg)
{
int i, fd;
Conn *c;
- Hash *h;
- Msg *m, *om;
+ Hash *h, *hnext;
+ Msg *m, *om, *mm;
Fid *f;
Ioproc *io;
@@ -405,11 +405,16 @@ connthread(void *arg)
}
}
- if(verbose) fprint(2, "%s eof\n", c->dir);
+ if(verbose) fprint(2, "fd#%d eof; flushing conn\n", c->fd);
+
+ /* flush the output queue */
+ sendq(c->outq, nil);
+ while(c->outq != nil)
+ yield();
/* flush all outstanding messages */
for(i=0; i<NHASH; i++){
- for(h=c->tag[i]; h; h=h->next){
+ for(h=c->tag[i]; h; h=hnext){
om = h->v;
m = msgnew();
m->internal = 1;
@@ -419,19 +424,22 @@ connthread(void *arg)
m->tx.tag = m->tag;
m->tx.oldtag = om->tag;
m->oldm = om;
- om->ref++;
+ om->ref++; /* for m->oldm */
m->ref++; /* for outq */
sendomsg(m);
- recvp(c->internal);
+ mm = recvp(c->internal);
+ assert(mm == m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
msgput(om); /* got from hash table */
+ hnext = h->next;
+ free(h);
}
}
/* clunk all outstanding fids */
for(i=0; i<NHASH; i++){
- for(h=c->fid[i]; h; h=h->next){
+ for(h=c->fid[i]; h; h=hnext){
f = h->v;
m = msgnew();
m->internal = 1;
@@ -444,10 +452,13 @@ connthread(void *arg)
f->ref++;
m->ref++;
sendomsg(m);
- recvp(c->internal);
+ mm = recvp(c->internal);
+ assert(mm == m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
fidput(f); /* got from hash table */
+ hnext = h->next;
+ free(h);
}
}
@@ -461,8 +472,6 @@ out:
c->inc = 0;
free(c->inq);
c->inq = 0;
- free(c->outq);
- c->outq = 0;
free(c);
}
@@ -482,6 +491,7 @@ openfdthread(void *v)
io = ioproc();
tot = 0;
+ m = nil;
if(c->fdmode == OREAD){
for(;;){
if(verbose) fprint(2, "tread...");
@@ -506,11 +516,12 @@ openfdthread(void *v)
break;
tot += m->rx.count;
if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
- fprint(2, "pipe write error: %r\n");
+ // fprint(2, "pipe write error: %r\n");
break;
}
msgput(m);
msgput(m);
+ m = nil;
}
}else{
for(;;){
@@ -521,7 +532,6 @@ openfdthread(void *v)
if((n=ioread(io, c->fd, buf, n)) <= 0){
if(n < 0)
fprint(2, "pipe read error: %r\n");
- m = nil;
break;
}
m = msgnew();
@@ -540,11 +550,11 @@ openfdthread(void *v)
recvp(c->internal);
if(m->rx.type == Rerror){
// fprint(2, "write error: %s\n", m->rx.ename);
- continue;
}
- tot = n;
+ tot += n;
msgput(m);
msgput(m);
+ m = nil;
}
}
if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
@@ -559,6 +569,7 @@ openfdthread(void *v)
m->internal = 1;
m->c = c;
m->tx.type = Tclunk;
+ m->tx.tag = m->tag;
m->tx.fid = fid->fid;
m->fid = fid;
fid->ref++;
@@ -635,12 +646,14 @@ connoutthread(void *arg)
{
int err;
Conn *c;
+ Queue *outq;
Msg *m, *om;
Ioproc *io;
c = arg;
+ outq = c->outq;
io = ioproc();
- while((m = recvq(c->outq)) != nil){
+ while((m = recvq(outq)) != nil){
err = m->tx.type+1 != m->rx.type;
if(!err && m->isopenfd)
if(xopenfd(m) < 0)
@@ -687,6 +700,8 @@ connoutthread(void *arg)
nbsendp(c->inc, 0);
}
closeioproc(io);
+ free(outq);
+ c->outq = nil;
}
void
@@ -740,13 +755,16 @@ inputthread(void *arg)
msgput(m);
continue;
}
- if(verbose > 1) fprint(2, "* -> %F\n", &m->rx);
+ if(verbose > 1) fprint(2, "* -> %F%s\n", &m->rx,
+ m->internal ? " (internal)" : "");
m->rpkt = pkt;
m->rx.tag = m->ctag;
if(m->internal)
- sendp(m->c->internal, 0);
- else
+ sendp(m->c->internal, m);
+ else if(m->c->outq)
sendq(m->c->outq, m);
+ else
+ msgput(m);
}
closeioproc(io);
//fprint(2, "input eof\n");
@@ -856,12 +874,17 @@ msgnew(void)
void
msgput(Msg *m)
{
+ if(m == nil)
+ return;
+
if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);
assert(m->ref > 0);
if(--m->ref > 0)
return;
m->c->nmsg--;
m->c = nil;
+ msgput(m->oldm);
+ m->oldm = nil;
fidput(m->fid);
m->fid = nil;
fidput(m->afid);
diff --git a/src/cmd/win.c b/src/cmd/win.c
index ef8b7b61..1f6f5350 100644
--- a/src/cmd/win.c
+++ b/src/cmd/win.c
@@ -40,33 +40,49 @@ struct Q
Q q;
-int eventfd;
-int addrfd;
-int datafd;
-int ctlfd;
-int bodyfd;
+Fid *eventfd;
+Fid *addrfd;
+Fid *datafd;
+Fid *ctlfd;
+// int bodyfd;
char *typing;
int ntypeb;
int ntyper;
int ntypebreak;
int debug;
+char *name;
char **prog;
int p[2];
Channel *cpid;
+Channel *cwait;
int pid = -1;
+int label(char*, int);
void error(char*);
void stdinproc(void*);
void stdoutproc(void*);
-void type(Event*, int, int, int);
-void sende(Event*, int, int, int, int, int);
+void type(Event*, int, Fid*, Fid*);
+void sende(Event*, int, Fid*, Fid*, Fid*, int);
char *onestring(int, char**);
int delete(Event*);
void deltype(uint, uint);
void runproc(void*);
+int
+fsfidprint(Fid *fid, char *fmt, ...)
+{
+ char buf[256];
+ va_list arg;
+ int n;
+
+ va_start(arg, fmt);
+ n = vsnprint(buf, sizeof buf, fmt, arg);
+ va_end(arg);
+ return fswrite(fid, buf, n);
+}
+
void
usage(void)
{
@@ -84,12 +100,18 @@ nopipes(void *v, char *msg)
}
void
+waitthread(void *v)
+{
+ recvp(cwait);
+ threadexitsall(nil);
+}
+
+void
threadmain(int argc, char **argv)
{
int fd, id;
char buf[256];
char buf1[128];
- char *name;
Fsys *fs;
ARGBEGIN{
@@ -110,8 +132,8 @@ threadmain(int argc, char **argv)
threadnotify(nopipes, 1);
if((fs = nsmount("acme", "")) < 0)
sysfatal("nsmount acme: %r");
- ctlfd = fsopenfd(fs, "new/ctl", ORDWR|OCEXEC);
- if(ctlfd < 0 || read(ctlfd, buf, 12) != 12)
+ ctlfd = fsopen(fs, "new/ctl", ORDWR|OCEXEC);
+ if(ctlfd < 0 || fsread(ctlfd, buf, 12) != 12)
sysfatal("ctl: %r");
id = atoi(buf);
sprint(buf, "%d/tag", id);
@@ -119,21 +141,27 @@ threadmain(int argc, char **argv)
write(fd, " Send Delete", 12);
close(fd);
sprint(buf, "%d/event", id);
- eventfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
+ eventfd = fsopen(fs, buf, ORDWR|OCEXEC);
sprint(buf, "%d/addr", id);
- addrfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
+ addrfd = fsopen(fs, buf, ORDWR|OCEXEC);
sprint(buf, "%d/data", id);
- datafd = fsopenfd(fs, buf, ORDWR|OCEXEC);
+ datafd = fsopen(fs, buf, ORDWR|OCEXEC);
sprint(buf, "%d/body", id);
- bodyfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
+/* bodyfd = fsopenfd(fs, buf, ORDWR|OCEXEC); */
+ if(eventfd==nil || addrfd==nil || datafd==nil)
+ sysfatal("data files: %r");
+/*
if(eventfd<0 || addrfd<0 || datafd<0 || bodyfd<0)
sysfatal("data files: %r");
+*/
fsunmount(fs);
if(pipe(p) < 0)
sysfatal("pipe: %r");
cpid = chancreate(sizeof(ulong), 1);
+ cwait = threadwaitchan();
+ threadcreate(waitthread, nil, STACK);
threadcreate(runproc, nil, STACK);
pid = recvul(cpid);
if(pid == -1)
@@ -141,13 +169,13 @@ threadmain(int argc, char **argv)
getwd(buf1, sizeof buf1);
sprint(buf, "name %s/-%s\n0\n", buf1, name);
- write(ctlfd, buf, strlen(buf));
+ fswrite(ctlfd, buf, strlen(buf));
sprint(buf, "dumpdir %s/\n", buf1);
- write(ctlfd, buf, strlen(buf));
+ fswrite(ctlfd, buf, strlen(buf));
sprint(buf, "dump %s\n", onestring(argc, argv));
- write(ctlfd, buf, strlen(buf));
+ fswrite(ctlfd, buf, strlen(buf));
-// proccreate(stdoutproc, nil, STACK);
+ threadcreate(stdoutproc, nil, STACK);
stdinproc(nil);
}
@@ -161,10 +189,10 @@ runproc(void *v)
USED(v);
fd[0] = p[1];
- fd[1] = bodyfd;
- fd[2] = bodyfd;
-// fd[1] = p[1];
-// fd[2] = p[1];
+// fd[1] = bodyfd;
+// fd[2] = bodyfd;
+ fd[1] = p[1];
+ fd[2] = p[1];
if(prog[0] == nil){
prog = shell;
@@ -210,14 +238,14 @@ onestring(int argc, char **argv)
}
int
-getec(int efd)
+getec(Fid *efd)
{
static char buf[8192];
static char *bufp;
static int nbuf;
if(nbuf == 0){
- nbuf = read(efd, buf, sizeof buf);
+ nbuf = fsread(efd, buf, sizeof buf);
if(nbuf <= 0)
error(nil);
bufp = buf;
@@ -227,7 +255,7 @@ getec(int efd)
}
int
-geten(int efd)
+geten(Fid *efd)
{
int n, c;
@@ -240,7 +268,7 @@ geten(int efd)
}
int
-geter(int efd, char *buf, int *nb)
+geter(Fid *efd, char *buf, int *nb)
{
Rune r;
int n;
@@ -259,7 +287,7 @@ geter(int efd, char *buf, int *nb)
}
void
-gete(int efd, Event *e)
+gete(Fid *efd, Event *e)
{
int i, nb;
@@ -297,10 +325,10 @@ nrunes(char *s, int nb)
void
stdinproc(void *v)
{
- int cfd = ctlfd;
- int efd = eventfd;
- int dfd = datafd;
- int afd = addrfd;
+ Fid *cfd = ctlfd;
+ Fid *efd = eventfd;
+ Fid *dfd = datafd;
+ Fid *afd = addrfd;
int fd0 = p[0];
Event e, e2, e3, e4;
@@ -358,7 +386,7 @@ stdinproc(void *v)
}
if(e.flag&1 || (e.c2=='x' && e.nr==0 && e2.nr==0)){
/* send it straight back */
- fprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
+ fsfidprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
break;
}
if(e.q0==e.q1 && (e.flag&2)){
@@ -380,7 +408,7 @@ stdinproc(void *v)
/* just send it back */
if(e.flag & 2)
gete(efd, &e2);
- fprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
+ fsfidprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
break;
case 'd':
@@ -399,8 +427,8 @@ void
stdoutproc(void *v)
{
int fd1 = p[0];
- int afd = addrfd;
- int dfd = datafd;
+ Fid *afd = addrfd;
+ Fid *dfd = datafd;
int n, m, w, npart;
char *buf, *s, *t;
Rune r;
@@ -411,7 +439,7 @@ stdoutproc(void *v)
buf = malloc(8192+UTFmax+1);
npart = 0;
for(;;){
- n = read(fd1, buf+npart, 8192);
+ n = threadread(fd1, buf+npart, 8192);
if(n < 0)
error(nil);
if(n == 0)
@@ -445,11 +473,13 @@ stdoutproc(void *v)
if(n > 0){
memmove(hold, buf+n, npart);
buf[n] = 0;
+ n = label(buf, n);
+ buf[n] = 0;
qlock(&q.lk);
m = sprint(x, "#%d", q.p);
- if(write(afd, x, m) != m)
+ if(fswrite(afd, x, m) != m)
error("stdout writing address");
- if(write(dfd, buf, n) != n)
+ if(fswrite(dfd, buf, n) != n)
error("stdout writing body");
q.p += nrunes(buf, n);
qunlock(&q.lk);
@@ -458,6 +488,37 @@ stdoutproc(void *v)
}
}
+char wdir[256];
+int
+label(char *sr, int n)
+{
+ char *sl, *el, *er, *r;
+
+ er = sr+n;
+ for(r=er-1; r>=sr; r--)
+ if(*r == '\007')
+ break;
+ if(r < sr)
+ return n;
+
+ el = r+1;
+ if(el-sr > sizeof wdir)
+ sr = el - sizeof wdir;
+ for(sl=el-3; sl>=sr; sl--)
+ if(sl[0]=='\033' && sl[1]==']' && sl[2]==';')
+ break;
+ if(sl < sr)
+ return n;
+
+ *r = 0;
+ snprint(wdir, sizeof wdir, "name %s/-%s\n0\n", sl+3, name);
+ fswrite(ctlfd, wdir, strlen(wdir));
+
+ memmove(sl, el, er-el);
+ n -= (el-sl);
+ return n;
+}
+
int
delete(Event *e)
{
@@ -584,7 +645,7 @@ deltype(uint p0, uint p1)
}
void
-type(Event *e, int fd0, int afd, int dfd)
+type(Event *e, int fd0, Fid *afd, Fid *dfd)
{
int m, n, nr;
char buf[128];
@@ -595,8 +656,8 @@ type(Event *e, int fd0, int afd, int dfd)
m = e->q0;
while(m < e->q1){
n = sprint(buf, "#%d", m);
- write(afd, buf, n);
- n = read(dfd, buf, sizeof buf);
+ fswrite(afd, buf, n);
+ n = fsread(dfd, buf, sizeof buf);
nr = nrunes(buf, n);
while(m+nr > e->q1){
do; while(n>0 && (buf[--n]&0xC0)==0x80);
@@ -612,16 +673,16 @@ type(Event *e, int fd0, int afd, int dfd)
}
void
-sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
+sende(Event *e, int fd0, Fid *cfd, Fid *afd, Fid *dfd, int donl)
{
int l, m, n, nr, lastc, end;
char abuf[16], buf[128];
end = q.p+ntyper;
l = sprint(abuf, "#%d", end);
- write(afd, abuf, l);
+ fswrite(afd, abuf, l);
if(e->nr > 0){
- write(dfd, e->b, e->nb);
+ fswrite(dfd, e->b, e->nb);
addtype(e->c1, ntyper, e->b, e->nb, e->nr);
lastc = e->r[e->nr-1];
}else{
@@ -629,8 +690,8 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
lastc = 0;
while(m < e->q1){
n = sprint(buf, "#%d", m);
- write(afd, buf, n);
- n = read(dfd, buf, sizeof buf);
+ fswrite(afd, buf, n);
+ n = fsread(dfd, buf, sizeof buf);
nr = nrunes(buf, n);
while(m+nr > e->q1){
do; while(n>0 && (buf[--n]&0xC0)==0x80);
@@ -639,8 +700,8 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
if(n == 0)
break;
l = sprint(abuf, "#%d", end);
- write(afd, abuf, l);
- write(dfd, buf, n);
+ fswrite(afd, abuf, l);
+ fswrite(dfd, buf, n);
addtype(e->c1, ntyper, buf, n, nr);
lastc = buf[n-1];
m += nr;
@@ -648,9 +709,9 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
}
}
if(donl && lastc!='\n'){
- write(dfd, "\n", 1);
+ fswrite(dfd, "\n", 1);
addtype(e->c1, ntyper, "\n", 1, 1);
}
- write(cfd, "dot=addr", 8);
+ fswrite(cfd, "dot=addr", 8);
sendtype(fd0);
}
diff --git a/src/lib9/pipe.c b/src/lib9/pipe.c
index f9fe2420..4caeb6c1 100644
--- a/src/lib9/pipe.c
+++ b/src/lib9/pipe.c
@@ -3,6 +3,7 @@
#include <libc.h>
#include <sys/socket.h>
+/* BUG: would like to preserve delimiters on systems that can */
int
p9pipe(int fd[2])
{