aboutsummaryrefslogtreecommitdiff
path: root/src/libsunrpc/client.c
diff options
context:
space:
mode:
authorrsc <devnull@localhost>2004-04-21 03:04:30 +0000
committerrsc <devnull@localhost>2004-04-21 03:04:30 +0000
commit551445b92c1f11d4f543e96790ff29762ab1ad10 (patch)
tree5129ea4e5cc91ff823623b080540fedcfaa72786 /src/libsunrpc/client.c
parentfa256eecfaf035cd6c46335452357856dc0bd9e9 (diff)
downloadplan9port-551445b92c1f11d4f543e96790ff29762ab1ad10.tar.gz
plan9port-551445b92c1f11d4f543e96790ff29762ab1ad10.tar.bz2
plan9port-551445b92c1f11d4f543e96790ff29762ab1ad10.zip
Add sunrpc.
Diffstat (limited to 'src/libsunrpc/client.c')
-rw-r--r--src/libsunrpc/client.c490
1 files changed, 490 insertions, 0 deletions
diff --git a/src/libsunrpc/client.c b/src/libsunrpc/client.c
new file mode 100644
index 00000000..f20208a2
--- /dev/null
+++ b/src/libsunrpc/client.c
@@ -0,0 +1,490 @@
+/*
+ * Sun RPC client.
+ */
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <sunrpc.h>
+
+typedef struct Out Out;
+struct Out
+{
+ char err[ERRMAX]; /* error string */
+ Channel *creply; /* send to finish rpc */
+ uchar *p; /* pending request packet */
+ int n; /* size of request */
+ ulong tag; /* flush tag of pending request */
+ ulong xid; /* xid of pending request */
+ ulong st; /* first send time */
+ ulong t; /* resend time */
+ int nresend; /* number of resends */
+ SunRpc rpc; /* response rpc */
+};
+
+static void
+udpThread(void *v)
+{
+ uchar *p, *buf;
+ Ioproc *io;
+ int n;
+ SunClient *cli;
+ enum { BufSize = 65536 };
+
+ cli = v;
+ buf = emalloc(BufSize);
+ io = ioproc();
+ p = nil;
+ for(;;){
+ n = ioread(io, cli->fd, buf, BufSize);
+ if(n <= 0)
+ break;
+ p = emalloc(4+n);
+ memmove(p+4, buf, n);
+ p[0] = n>>24;
+ p[1] = n>>16;
+ p[2] = n>>8;
+ p[3] = n;
+ if(sendp(cli->readchan, p) == 0)
+ break;
+ p = nil;
+ }
+ free(p);
+ closeioproc(io);
+ while(send(cli->dying, nil) == -1)
+ ;
+}
+
+static void
+netThread(void *v)
+{
+ uchar *p, buf[4];
+ Ioproc *io;
+ uint n, tot;
+ int done;
+ SunClient *cli;
+
+ cli = v;
+ io = ioproc();
+ tot = 0;
+ p = nil;
+ for(;;){
+ n = ioreadn(io, cli->fd, buf, 4);
+ if(n != 4)
+ break;
+ n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
+ if(cli->chatty)
+ fprint(2, "%.8ux...", n);
+ done = n&0x80000000;
+ n &= ~0x80000000;
+ if(tot == 0){
+ p = emalloc(4+n);
+ tot = 4;
+ }else
+ p = erealloc(p, tot+n);
+ if(ioreadn(io, cli->fd, p+tot, n) != n)
+ break;
+ tot += n;
+ if(done){
+ p[0] = tot>>24;
+ p[1] = tot>>16;
+ p[2] = tot>>8;
+ p[3] = tot;
+ if(sendp(cli->readchan, p) == 0)
+ break;
+ p = nil;
+ tot = 0;
+ }
+ }
+ free(p);
+ closeioproc(io);
+ while(send(cli->dying, 0) == -1)
+ ;
+}
+
+static void
+timerThread(void *v)
+{
+ Ioproc *io;
+ SunClient *cli;
+
+ cli = v;
+ io = ioproc();
+ for(;;){
+ if(iosleep(io, 200) < 0)
+ break;
+ if(sendul(cli->timerchan, 0) == 0)
+ break;
+ }
+ closeioproc(io);
+ while(send(cli->dying, 0) == -1)
+ ;
+}
+
+static ulong
+msec(void)
+{
+ return nsec()/1000000;
+}
+
+static ulong
+twait(ulong rtt, int nresend)
+{
+ ulong t;
+
+ t = rtt;
+ if(nresend <= 1)
+ {}
+ else if(nresend <= 3)
+ t *= 2;
+ else if(nresend <= 18)
+ t <<= nresend-2;
+ else
+ t = 60*1000;
+ if(t > 60*1000)
+ t = 60*1000;
+
+ return t;
+}
+
+static void
+rpcMuxThread(void *v)
+{
+ uchar *buf, *p, *ep;
+ int i, n, nout, mout;
+ ulong t, xidgen, tag;
+ Alt a[5];
+ Out *o, **out;
+ SunRpc rpc;
+ SunClient *cli;
+
+ cli = v;
+ mout = 16;
+ nout = 0;
+ out = emalloc(mout*sizeof(out[0]));
+ xidgen = truerand();
+
+ a[0].op = CHANRCV;
+ a[0].c = cli->rpcchan;
+ a[0].v = &o;
+ a[1].op = CHANNOP;
+ a[1].c = cli->timerchan;
+ a[1].v = nil;
+ a[2].op = CHANRCV;
+ a[2].c = cli->flushchan;
+ a[2].v = &tag;
+ a[3].op = CHANRCV;
+ a[3].c = cli->readchan;
+ a[3].v = &buf;
+ a[4].op = CHANEND;
+
+ for(;;){
+ switch(alt(a)){
+ case 0: /* o = <-rpcchan */
+ if(o == nil)
+ goto Done;
+ cli->nsend++;
+ /* set xid */
+ o->xid = ++xidgen;
+ if(cli->needcount)
+ p = o->p+4;
+ else
+ p = o->p;
+ p[0] = xidgen>>24;
+ p[1] = xidgen>>16;
+ p[2] = xidgen>>8;
+ p[3] = xidgen;
+ if(write(cli->fd, o->p, o->n) != o->n){
+ free(o->p);
+ o->p = nil;
+ snprint(o->err, sizeof o->err, "write: %r");
+ sendp(o->creply, 0);
+ break;
+ }
+ if(nout >= mout){
+ mout *= 2;
+ out = erealloc(out, mout*sizeof(out[0]));
+ }
+ o->st = msec();
+ o->nresend = 0;
+ o->t = o->st + twait(cli->rtt.avg, 0);
+if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
+ out[nout++] = o;
+ a[1].op = CHANRCV;
+ break;
+
+ case 1: /* <-timerchan */
+ t = msec();
+ for(i=0; i<nout; i++){
+ o = out[i];
+ if((int)(t - o->t) > 0){
+if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
+ if(cli->maxwait && t - o->st >= cli->maxwait){
+ free(o->p);
+ o->p = nil;
+ strcpy(o->err, "timeout");
+ sendp(o->creply, 0);
+ out[i--] = out[--nout];
+ continue;
+ }
+ cli->nresend++;
+ o->nresend++;
+ o->t = t + twait(cli->rtt.avg, o->nresend);
+ if(write(cli->fd, o->p, o->n) != o->n){
+ free(o->p);
+ o->p = nil;
+ snprint(o->err, sizeof o->err, "rewrite: %r");
+ sendp(o->creply, 0);
+ out[i--] = out[--nout];
+ continue;
+ }
+ }
+ }
+ /* stop ticking if no work; rpcchan will turn it back on */
+ if(nout == 0)
+ a[1].op = CHANNOP;
+ break;
+
+ case 2: /* tag = <-flushchan */
+ for(i=0; i<nout; i++){
+ o = out[i];
+ if(o->tag == tag){
+ out[i--] = out[--nout];
+ strcpy(o->err, "flushed");
+ free(o->p);
+ o->p = nil;
+ sendp(o->creply, 0);
+ }
+ }
+ break;
+
+ case 3: /* buf = <-readchan */
+ p = buf;
+ n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
+ p += 4;
+ ep = p+n;
+ if(sunrpcunpack(p, ep, &p, &rpc) < 0){
+ fprint(2, "%s: in: %.*H unpack failed\n", argv0, n, buf+4);
+ free(buf);
+ break;
+ }
+ if(cli->chatty)
+ fprint(2, "in: %B\n", &rpc);
+ if(rpc.iscall){
+ fprint(2, "did not get reply\n");
+ free(buf);
+ break;
+ }
+ o = nil;
+ for(i=0; i<nout; i++){
+ o = out[i];
+ if(o->xid == rpc.xid)
+ break;
+ }
+ if(i==nout){
+ if(cli->chatty) fprint(2, "did not find waiting request\n");
+ free(buf);
+ break;
+ }
+ out[i] = out[--nout];
+ free(o->p);
+ o->p = nil;
+ if(rpc.status == SunSuccess){
+ o->p = buf;
+ o->rpc = rpc;
+ }else{
+ o->p = nil;
+ free(buf);
+ sunerrstr(rpc.status);
+ rerrstr(o->err, sizeof o->err);
+ }
+ sendp(o->creply, 0);
+ break;
+ }
+ }
+Done:
+ free(out);
+ sendp(cli->dying, 0);
+}
+
+SunClient*
+sundial(char *address)
+{
+ int fd;
+ SunClient *cli;
+
+ if((fd = dial(address, 0, 0, 0)) < 0)
+ return nil;
+
+ cli = emalloc(sizeof(SunClient));
+ cli->fd = fd;
+ cli->maxwait = 15000;
+ cli->rtt.avg = 1000;
+ cli->dying = chancreate(sizeof(void*), 0);
+ cli->rpcchan = chancreate(sizeof(Out*), 0);
+ cli->timerchan = chancreate(sizeof(ulong), 0);
+ cli->flushchan = chancreate(sizeof(ulong), 0);
+ cli->readchan = chancreate(sizeof(uchar*), 0);
+ if(strstr(address, "udp!")){
+ cli->needcount = 0;
+ cli->nettid = threadcreate(udpThread, cli, SunStackSize);
+ cli->timertid = threadcreate(timerThread, cli, SunStackSize);
+ }else{
+ cli->needcount = 1;
+ cli->nettid = threadcreate(netThread, cli, SunStackSize);
+ /* assume reliable: don't need timer */
+ /* BUG: netThread should know how to redial */
+ }
+ threadcreate(rpcMuxThread, cli, SunStackSize);
+
+ return cli;
+}
+
+void
+sunclientclose(SunClient *cli)
+{
+ int n;
+
+ /*
+ * Threadints get you out of any stuck system calls
+ * or thread rendezvouses, but do nothing if the thread
+ * is in the ready state. Keep interrupting until it takes.
+ */
+ n = 0;
+ if(!cli->timertid)
+ n++;
+ while(n < 2){
+ threadint(cli->nettid);
+ if(cli->timertid)
+ threadint(cli->timertid);
+ yield();
+ while(nbrecv(cli->dying, nil) == 1)
+ n++;
+ }
+
+ sendp(cli->rpcchan, 0);
+ recvp(cli->dying);
+
+ /* everyone's gone: clean up */
+ close(cli->fd);
+ chanfree(cli->flushchan);
+ chanfree(cli->readchan);
+ chanfree(cli->timerchan);
+ free(cli);
+}
+
+void
+sunclientflushrpc(SunClient *cli, ulong tag)
+{
+ sendul(cli->flushchan, tag);
+}
+
+void
+sunclientprog(SunClient *cli, SunProg *p)
+{
+ if(cli->nprog%16 == 0)
+ cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
+ cli->prog[cli->nprog++] = p;
+}
+
+int
+sunclientrpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
+{
+ uchar *bp, *p, *ep;
+ int i, n1, n2, n, nn;
+ Out o;
+ SunProg *prog;
+ SunStatus ok;
+
+ for(i=0; i<cli->nprog; i++)
+ if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
+ break;
+ if(i==cli->nprog){
+ werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
+ return -1;
+ }
+ prog = cli->prog[i];
+
+ if(cli->chatty){
+ fprint(2, "out: %B\n", &tx->rpc);
+ fprint(2, "\t%C\n", tx);
+ }
+
+ n1 = sunrpcsize(&tx->rpc);
+ n2 = suncallsize(prog, tx);
+
+ n = n1+n2;
+ if(cli->needcount)
+ n += 4;
+
+ /*
+ * The dance with 100 is to leave some padding in case
+ * suncallsize is slightly underestimating. If this happens,
+ * the pack will succeed and then we can give a good size
+ * mismatch error below. Otherwise the pack fails with
+ * garbage args, which is less helpful.
+ */
+ bp = emalloc(n+100);
+ ep = bp+n+100;
+ p = bp;
+ if(cli->needcount){
+ nn = n-4;
+ p[0] = (nn>>24)|0x80;
+ p[1] = nn>>16;
+ p[2] = nn>>8;
+ p[3] = nn;
+ p += 4;
+ }
+ if((ok = sunrpcpack(p, ep, &p, &tx->rpc)) != SunSuccess
+ || (ok = suncallpack(prog, p, ep, &p, tx)) != SunSuccess){
+ sunerrstr(ok);
+ free(bp);
+ return -1;
+ }
+ ep -= 100;
+ if(p != ep){
+ werrstr("rpc: packet size mismatch %d %ld %ld", n, ep-bp, p-bp);
+ free(bp);
+ return -1;
+ }
+
+ memset(&o, 0, sizeof o);
+ o.creply = chancreate(sizeof(void*), 0);
+ o.tag = tag;
+ o.p = bp;
+ o.n = n;
+
+ sendp(cli->rpcchan, &o);
+ recvp(o.creply);
+ chanfree(o.creply);
+
+ if(o.p == nil){
+ werrstr("%s", o.err);
+ return -1;
+ }
+
+ p = o.rpc.data;
+ ep = p+o.rpc.ndata;
+ rx->rpc = o.rpc;
+ rx->rpc.proc = tx->rpc.proc;
+ rx->rpc.prog = tx->rpc.prog;
+ rx->rpc.vers = tx->rpc.vers;
+ rx->type = (rx->rpc.proc<<1)|1;
+ if((ok = suncallunpack(prog, p, ep, &p, rx)) != SunSuccess){
+ sunerrstr(ok);
+ werrstr("unpack: %r");
+ free(o.p);
+ return -1;
+ }
+
+ if(cli->chatty){
+ fprint(2, "in: %B\n", &rx->rpc);
+ fprint(2, "in:\t%C\n", rx);
+ }
+
+ if(tofree)
+ *tofree = o.p;
+ else
+ free(o.p);
+
+ return 0;
+}