aboutsummaryrefslogtreecommitdiff
path: root/src/libventi/server.c
diff options
context:
space:
mode:
authorrsc <devnull@localhost>2003-11-23 18:19:58 +0000
committerrsc <devnull@localhost>2003-11-23 18:19:58 +0000
commit056fe1ba7fa0b70f871dfb9005b24eb8e4cc230b (patch)
tree9ad42f31c3bc124cf6617cf9eb41dd525eccce83 /src/libventi/server.c
parent9df487d720a59bf8cb0dc4ccffc30ad8eb48256a (diff)
downloadplan9port-056fe1ba7fa0b70f871dfb9005b24eb8e4cc230b.tar.gz
plan9port-056fe1ba7fa0b70f871dfb9005b24eb8e4cc230b.tar.bz2
plan9port-056fe1ba7fa0b70f871dfb9005b24eb8e4cc230b.zip
new venti library.
Diffstat (limited to 'src/libventi/server.c')
-rw-r--r--src/libventi/server.c172
1 files changed, 172 insertions, 0 deletions
diff --git a/src/libventi/server.c b/src/libventi/server.c
new file mode 100644
index 00000000..60d253df
--- /dev/null
+++ b/src/libventi/server.c
@@ -0,0 +1,172 @@
+#include <u.h>
+#include <libc.h>
+#include <venti.h>
+#include <thread.h>
+#include "queue.h"
+
+enum
+{
+ STACK = 8192,
+};
+
+typedef struct VtSconn VtSconn;
+struct VtSconn
+{
+ int ctl;
+ char dir[NETPATHLEN];
+ VtSrv *srv;
+ VtConn *c;
+};
+
+struct VtSrv
+{
+ int afd;
+ int dead;
+ char adir[NETPATHLEN];
+ Queue *q; /* Queue(VtReq*) */
+};
+
+static void listenproc(void*);
+static void connproc(void*);
+
+VtSrv*
+vtlisten(char *addr)
+{
+ VtSrv *s;
+
+ s = vtmallocz(sizeof(VtSrv));
+ s->afd = announce(addr, s->adir);
+ if(s->afd < 0){
+ free(s);
+ return nil;
+ }
+ s->q = _vtqalloc();
+ proccreate(listenproc, s, STACK);
+ return s;
+}
+
+static void
+listenproc(void *v)
+{
+ int ctl;
+ char dir[NETPATHLEN];
+ VtSrv *srv;
+ VtSconn *sc;
+
+ srv = v;
+ for(;;){
+ ctl = listen(srv->adir, dir);
+ if(ctl < 0){
+ srv->dead = 1;
+ break;
+ }
+ sc = vtmallocz(sizeof(VtSconn));
+ sc->ctl = ctl;
+ sc->srv = srv;
+ strcpy(sc->dir, dir);
+ proccreate(connproc, sc, STACK);
+ }
+
+ // hangup
+}
+
+static void
+connproc(void *v)
+{
+ VtSconn *sc;
+ VtConn *c;
+ Packet *p;
+ VtReq *r;
+ int fd;
+
+ r = nil;
+ c = nil;
+ sc = v;
+ fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);
+ fd = accept(sc->ctl, sc->dir);
+ close(sc->ctl);
+ if(fd < 0){
+ fprint(2, "accept %s: %r\n", sc->dir);
+ goto out;
+ }
+
+ c = vtconn(fd, fd);
+ sc->c = c;
+ if(vtversion(c) < 0){
+ fprint(2, "vtversion %s: %r\n", sc->dir);
+ goto out;
+ }
+ if(vtsrvhello(c) < 0){
+ fprint(2, "vtsrvhello %s: %r\n", sc->dir);
+ goto out;
+ }
+
+ fprint(2, "new proc %s\n", sc->dir);
+ proccreate(vtsendproc, c, STACK);
+ qlock(&c->lk);
+ while(!c->writeq)
+ rsleep(&c->rpcfork);
+ qunlock(&c->lk);
+
+ while((p = vtrecv(c)) != nil){
+ r = vtmallocz(sizeof(VtReq));
+ if(vtfcallunpack(&r->tx, p) < 0){
+ packetfree(p);
+ fprint(2, "bad packet on %s: %r\n", sc->dir);
+ continue;
+ }
+ packetfree(p);
+ if(r->tx.type == VtTgoodbye)
+ break;
+ r->rx.tag = r->tx.tag;
+ r->sc = sc;
+ if(_vtqsend(sc->srv->q, r) < 0){
+ fprint(2, "hungup queue\n");
+ break;
+ }
+ r = nil;
+ }
+
+ fprint(2, "eof on %s\n", sc->dir);
+
+out:
+ if(r){
+ vtfcallclear(&r->tx);
+ vtfree(r);
+ }
+ if(c)
+ vtfreeconn(c);
+ fprint(2, "freed %s\n", sc->dir);
+ vtfree(sc);
+ return;
+}
+
+VtReq*
+vtgetreq(VtSrv *srv)
+{
+ return _vtqrecv(srv->q);
+}
+
+void
+vtrespond(VtReq *r)
+{
+ Packet *p;
+ VtSconn *sc;
+
+ sc = r->sc;
+ if(r->rx.tag != r->tx.tag)
+ abort();
+ if(r->rx.type != r->tx.type+1 && r->rx.type != VtRerror)
+ abort();
+ if((p = vtfcallpack(&r->rx)) == nil){
+ fprint(2, "fcallpack on %s: %r\n", sc->dir);
+ packetfree(p);
+ vtfcallclear(&r->rx);
+ return;
+ }
+ vtsend(sc->c, p);
+ vtfcallclear(&r->tx);
+ vtfcallclear(&r->rx);
+ vtfree(r);
+}
+