aboutsummaryrefslogtreecommitdiff
path: root/src/cmd/venti/srv/venti.c
diff options
context:
space:
mode:
authorrsc <devnull@localhost>2005-07-12 15:23:36 +0000
committerrsc <devnull@localhost>2005-07-12 15:23:36 +0000
commita0d146edd7a7de6236a0d60baafeeb59f8452aae (patch)
treeb55baa526d9f5adfc73246e6ee2fadf455e0b7a2 /src/cmd/venti/srv/venti.c
parent88bb285e3d87ec2508840af33f7e0af53ec3c13c (diff)
downloadplan9port-a0d146edd7a7de6236a0d60baafeeb59f8452aae.tar.gz
plan9port-a0d146edd7a7de6236a0d60baafeeb59f8452aae.tar.bz2
plan9port-a0d146edd7a7de6236a0d60baafeeb59f8452aae.zip
return of venti
Diffstat (limited to 'src/cmd/venti/srv/venti.c')
-rw-r--r--src/cmd/venti/srv/venti.c266
1 files changed, 266 insertions, 0 deletions
diff --git a/src/cmd/venti/srv/venti.c b/src/cmd/venti/srv/venti.c
new file mode 100644
index 00000000..91ded95c
--- /dev/null
+++ b/src/cmd/venti/srv/venti.c
@@ -0,0 +1,266 @@
+#ifdef PLAN9PORT
+#include <u.h>
+#include <signal.h>
+#endif
+#include "stdinc.h"
+#include "dat.h"
+#include "fns.h"
+
+#include "whack.h"
+
+int debug;
+int nofork;
+int mainstacksize = 256*1024;
+VtSrv *ventisrv;
+
+static void ventiserver(void*);
+
+void
+usage(void)
+{
+ fprint(2, "usage: venti [-dw] [-a ventiaddress] [-h httpaddress] [-c config] [-C cachesize] [-I icachesize] [-B blockcachesize]\n");
+ threadexitsall("usage");
+}
+void
+threadmain(int argc, char *argv[])
+{
+ char *configfile, *haddr, *vaddr, *webroot;
+ u32int mem, icmem, bcmem, minbcmem;
+ Config config;
+
+ traceinit();
+ threadsetname("main");
+ vaddr = nil;
+ haddr = nil;
+ configfile = nil;
+ webroot = nil;
+ mem = 0;
+ icmem = 0;
+ bcmem = 0;
+ ARGBEGIN{
+ case 'a':
+ vaddr = EARGF(usage());
+ break;
+ case 'B':
+ bcmem = unittoull(EARGF(usage()));
+ break;
+ case 'c':
+ configfile = EARGF(usage());
+ break;
+ case 'C':
+ mem = unittoull(EARGF(usage()));
+ break;
+ case 'D':
+ settrace(EARGF(usage()));
+ break;
+ case 'd':
+ debug = 1;
+ nofork = 1;
+ break;
+ case 'h':
+ haddr = EARGF(usage());
+ break;
+ case 'I':
+ icmem = unittoull(EARGF(usage()));
+ break;
+ case 'L':
+ ventilogging = 1;
+ break;
+ case 's':
+ nofork = 1;
+ break;
+ case 'W':
+ webroot = EARGF(usage());
+ break;
+ default:
+ usage();
+ }ARGEND
+
+ if(argc)
+ usage();
+
+ if(!nofork)
+ rfork(RFNOTEG);
+
+#ifdef PLAN9PORT
+ {
+ /* sigh - needed to avoid signals when writing to hungup networks */
+ struct sigaction sa;
+ memset(&sa, 0, sizeof sa);
+ sa.sa_handler = SIG_IGN;
+ sigaction(SIGPIPE, &sa, nil);
+ }
+#endif
+
+ trace(TraceQuiet, "venti started");
+ fprint(2, "venti: ");
+
+ ventifmtinstall();
+ if(configfile == nil)
+ configfile = "venti.conf";
+
+ if(initarenasum() < 0)
+ fprint(2, "warning: can't initialize arena summing process: %r");
+
+ fprint(2, "conf...");
+ if(initventi(configfile, &config) < 0)
+ sysfatal("can't init server: %r");
+
+ if(mem == 0)
+ mem = config.mem;
+ if(bcmem == 0)
+ bcmem = config.bcmem;
+ if(icmem == 0)
+ icmem = config.icmem;
+ if(haddr == nil)
+ haddr = config.haddr;
+ if(vaddr == nil)
+ vaddr = config.vaddr;
+ if(vaddr == nil)
+ vaddr = "tcp!*!venti";
+ if(webroot == nil)
+ webroot = config.webroot;
+ if(queuewrites == 0)
+ queuewrites = config.queuewrites;
+
+ if(haddr){
+ fprint(2, "httpd %s...", haddr);
+ if(httpdinit(haddr, webroot) < 0)
+ fprint(2, "warning: can't start http server: %r");
+ }
+
+ fprint(2, "init...");
+
+ if(mem == 0xffffffffUL)
+ mem = 1 * 1024 * 1024;
+ if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n",
+ mem, mem / (8 * 1024));
+ initlumpcache(mem, mem / (8 * 1024));
+
+ icmem = u64log2(icmem / (sizeof(IEntry)+sizeof(IEntry*)) / ICacheDepth);
+ if(icmem < 4)
+ icmem = 4;
+ if(0) fprint(2, "initialize %d bytes of index cache for %d index entries\n",
+ (sizeof(IEntry)+sizeof(IEntry*)) * (1 << icmem) * ICacheDepth,
+ (1 << icmem) * ICacheDepth);
+ initicache(icmem, ICacheDepth);
+ initicachewrite();
+
+ /*
+ * need a block for every arena and every process
+ */
+ minbcmem = maxblocksize *
+ (mainindex->narenas + mainindex->nsects*4 + 16);
+ if(bcmem < minbcmem)
+ bcmem = minbcmem;
+
+ if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
+ initdcache(bcmem);
+
+ if(mainindex->bloom)
+ startbloomproc(mainindex->bloom);
+
+ fprint(2, "sync...");
+ if(syncindex(mainindex, 1, 0, 0) < 0)
+ sysfatal("can't sync server: %r");
+
+ if(queuewrites){
+ fprint(2, "queue...");
+ if(initlumpqueues(mainindex->nsects) < 0){
+ fprint(2, "can't initialize lump queues,"
+ " disabling write queueing: %r");
+ queuewrites = 0;
+ }
+ }
+
+ fprint(2, "announce %s...", vaddr);
+ ventisrv = vtlisten(vaddr);
+ if(ventisrv == nil)
+ sysfatal("can't announce %s: %r", vaddr);
+
+ fprint(2, "serving.\n");
+ if(nofork)
+ ventiserver(nil);
+ else
+ vtproc(ventiserver, nil);
+}
+
+static void
+vtrerror(VtReq *r, char *error)
+{
+ r->rx.msgtype = VtRerror;
+ r->rx.error = estrdup(error);
+}
+
+static void
+ventiserver(void *v)
+{
+ Packet *p;
+ VtReq *r;
+ char err[ERRMAX];
+ uint ms;
+ int cached, ok;
+
+ USED(v);
+ threadsetname("ventiserver");
+ trace(TraceWork, "start");
+ while((r = vtgetreq(ventisrv)) != nil){
+ trace(TraceWork, "finish");
+ trace(TraceWork, "start request %F", &r->tx);
+ trace(TraceRpc, "<- %F", &r->tx);
+ r->rx.msgtype = r->tx.msgtype+1;
+ addstat(StatRpcTotal, 1);
+ // print("req (arenas[0]=%p sects[0]=%p) %F\n",
+ // mainindex->arenas[0], mainindex->sects[0], &r->tx);
+ switch(r->tx.msgtype){
+ default:
+ vtrerror(r, "unknown request");
+ break;
+ case VtTread:
+ ms = msec();
+ r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached);
+ ms = msec() - ms;
+ addstat2(StatRpcRead, 1, StatRpcReadTime, ms);
+ if(r->rx.data == nil){
+ addstat(StatRpcReadFail, 1);
+ rerrstr(err, sizeof err);
+ vtrerror(r, err);
+ }else{
+ addstat(StatRpcReadBytes, packetsize(r->rx.data));
+ addstat(StatRpcReadOk, 1);
+ if(cached)
+ addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms);
+ else
+ addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms);
+ }
+ break;
+ case VtTwrite:
+ p = r->tx.data;
+ r->tx.data = nil;
+ addstat(StatRpcWriteBytes, packetsize(p));
+ ms = msec();
+ ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms);
+ ms = msec() - ms;
+ addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms);
+
+ if(ok < 0){
+ addstat(StatRpcWriteFail, 1);
+ rerrstr(err, sizeof err);
+ vtrerror(r, err);
+ }
+ break;
+ case VtTsync:
+ flushqueue();
+ flushdcache();
+ break;
+ }
+ trace(TraceRpc, "-> %F", &r->rx);
+ vtrespond(r);
+ trace(TraceWork, "start");
+ }
+ flushdcache();
+ flushicache();
+ threadexitsall(0);
+}
+
+