From 056fe1ba7fa0b70f871dfb9005b24eb8e4cc230b Mon Sep 17 00:00:00 2001 From: rsc Date: Sun, 23 Nov 2003 18:19:58 +0000 Subject: new venti library. --- src/libventi/send.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 src/libventi/send.c (limited to 'src/libventi/send.c') diff --git a/src/libventi/send.c b/src/libventi/send.c new file mode 100644 index 00000000..a72a6c23 --- /dev/null +++ b/src/libventi/send.c @@ -0,0 +1,212 @@ +#include +#include +#include +#include "queue.h" + +static int +_vtsend(VtConn *z, Packet *p) +{ + IOchunk ioc; + int n; + uchar buf[2]; + + if(z->state != VtStateConnected) { + werrstr("session not connected"); + return -1; + } + + /* add framing */ + n = packetsize(p); + if(n >= (1<<16)) { + werrstr("packet too large"); + packetfree(p); + return -1; + } + buf[0] = n>>8; + buf[1] = n; + packetprefix(p, buf, 2); + + for(;;){ + n = packetfragments(p, &ioc, 1, 0); + if(n == 0) + break; + if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){ + packetfree(p); + return 0; + } + packetconsume(p, nil, ioc.len); + } + packetfree(p); + return 1; +} + +static Packet* +_vtrecv(VtConn *z) +{ + uchar buf[10], *b; + int n; + Packet *p; + int size, len; + + if(z->state != VtStateConnected) { + werrstr("session not connected"); + return nil; + } + + p = z->part; + /* get enough for head size */ + size = packetsize(p); + while(size < 2) { + b = packettrailer(p, MaxFragSize); + assert(b != nil); + n = read(z->infd, b, MaxFragSize); + if(n <= 0) + goto Err; + size += n; + packettrim(p, 0, size); + } + + if(packetconsume(p, buf, 2) < 0) + goto Err; + len = (buf[0] << 8) | buf[1]; + size -= 2; + + while(size < len) { + n = len - size; + if(n > MaxFragSize) + n = MaxFragSize; + b = packettrailer(p, n); + if(readn(z->infd, b, n) != n) + goto Err; + size += n; + } + p = packetsplit(p, len); + return p; +Err: + return nil; +} + +/* + * If you fork off two procs running vtrecvproc and vtsendproc, + * then vtrecv/vtsend (and thus vtrpc) will never block except on + * rendevouses, which is nice when it's running in one thread of many. + */ +void +vtrecvproc(void *v) +{ + Packet *p; + VtConn *z; + Queue *q; + + z = v; + q = _vtqalloc(); + + qlock(&z->lk); + z->readq = q; + qlock(&z->inlk); + rwakeup(&z->rpcfork); + qunlock(&z->lk); + + while((p = _vtrecv(z)) != nil) + if(_vtqsend(q, p) < 0){ + packetfree(p); + break; + } + qunlock(&z->inlk); + qlock(&z->lk); + _vtqhangup(q); + while((p = _vtnbqrecv(q)) != nil) + packetfree(p); + vtfree(q); + z->readq = nil; + rwakeup(&z->rpcfork); + qunlock(&z->lk); + vthangup(z); +} + +void +vtsendproc(void *v) +{ + Queue *q; + Packet *p; + VtConn *z; + + z = v; + q = _vtqalloc(); + + qlock(&z->lk); + z->writeq = q; + qlock(&z->outlk); + rwakeup(&z->rpcfork); + qunlock(&z->lk); + + while((p = _vtqrecv(q)) != nil) + if(_vtsend(z, p) < 0) + break; + qunlock(&z->outlk); + qlock(&z->lk); + _vtqhangup(q); + while((p = _vtnbqrecv(q)) != nil) + packetfree(p); + vtfree(q); + z->writeq = nil; + rwakeup(&z->rpcfork); + qunlock(&z->lk); + return; +} + +Packet* +vtrecv(VtConn *z) +{ + Packet *p; + + qlock(&z->lk); + if(z->state != VtStateConnected){ + werrstr("not connected"); + qunlock(&z->lk); + return nil; + } + if(z->readq){ + qunlock(&z->lk); + return _vtqrecv(z->readq); + } + + qlock(&z->inlk); + qunlock(&z->lk); + p = _vtrecv(z); + qunlock(&z->inlk); + if(!p) + vthangup(z); + return p; +} + +int +vtsend(VtConn *z, Packet *p) +{ + qlock(&z->lk); + if(z->state != VtStateConnected){ + packetfree(p); + werrstr("not connected"); + qunlock(&z->lk); + return -1; + } + if(z->writeq){ + qunlock(&z->lk); + if(_vtqsend(z->writeq, p) < 0){ + packetfree(p); + return -1; + } + return 0; + } + + qlock(&z->outlk); + qunlock(&z->lk); + if(_vtsend(z, p) < 0){ + qunlock(&z->outlk); + vthangup(z); + return -1; + } + qunlock(&z->outlk); + return 0; +} + -- cgit v1.2.3