diff options
Diffstat (limited to 'src/cmd/venti/srv/lumpqueue.c')
-rw-r--r-- | src/cmd/venti/srv/lumpqueue.c | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/src/cmd/venti/srv/lumpqueue.c b/src/cmd/venti/srv/lumpqueue.c new file mode 100644 index 00000000..1b03f41c --- /dev/null +++ b/src/cmd/venti/srv/lumpqueue.c @@ -0,0 +1,187 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +typedef struct LumpQueue LumpQueue; +typedef struct WLump WLump; + +enum +{ + MaxLumpQ = 1 << 3 /* max. lumps on a single write queue, must be pow 2 */ +}; + +struct WLump +{ + Lump *u; + Packet *p; + int creator; + int gen; + uint ms; +}; + +struct LumpQueue +{ + QLock lock; + Rendez flush; + Rendez full; + Rendez empty; + WLump q[MaxLumpQ]; + int w; + int r; +}; + +static LumpQueue *lumpqs; +static int nqs; + +static QLock glk; +static int gen; + +static void queueproc(void *vq); + +int +initlumpqueues(int nq) +{ + LumpQueue *q; + + int i; + nqs = nq; + + lumpqs = MKNZ(LumpQueue, nq); + + for(i = 0; i < nq; i++){ + q = &lumpqs[i]; + q->full.l = &q->lock; + q->empty.l = &q->lock; + q->flush.l = &q->lock; + + if(vtproc(queueproc, q) < 0){ + seterr(EOk, "can't start write queue slave: %r"); + return -1; + } + if(vtproc(queueproc, q) < 0){ + seterr(EOk, "can't start write queue slave: %r"); + return -1; + } + if(vtproc(queueproc, q) < 0){ + seterr(EOk, "can't start write queue slave: %r"); + return -1; + } + if(vtproc(queueproc, q) < 0){ + seterr(EOk, "can't start write queue slave: %r"); + return -1; + } + if(vtproc(queueproc, q) < 0){ + seterr(EOk, "can't start write queue slave: %r"); + return -1; + } + } + + return 0; +} + +/* + * queue a lump & it's packet data for writing + */ +int +queuewrite(Lump *u, Packet *p, int creator, uint ms) +{ + LumpQueue *q; + int i; + + trace(TraceProc, "queuewrite"); + i = indexsect(mainindex, u->score); + if(i < 0 || i >= nqs){ + seterr(EBug, "internal error: illegal index section in queuewrite"); + return -1; + } + + q = &lumpqs[i]; + + qlock(&q->lock); + while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){ + trace(TraceProc, "queuewrite sleep"); + rsleep(&q->full); + } + + q->q[q->w].u = u; + q->q[q->w].p = p; + q->q[q->w].creator = creator; + q->q[q->w].ms = ms; + q->q[q->w].gen = gen; + q->w = (q->w + 1) & (MaxLumpQ - 1); + + trace(TraceProc, "queuewrite wakeup"); + rwakeup(&q->empty); + + qunlock(&q->lock); + + return 0; +} + +void +flushqueue(void) +{ + int i; + LumpQueue *q; + + if(!lumpqs) + return; + + trace(TraceProc, "flushqueue"); + + qlock(&glk); + gen++; + qunlock(&glk); + + for(i=0; i<mainindex->nsects; i++){ + q = &lumpqs[i]; + qlock(&q->lock); + while(q->w != q->r && gen - q->q[q->r].gen > 0){ + trace(TraceProc, "flushqueue sleep q%d", i); + rsleep(&q->flush); + } + qunlock(&q->lock); + } +} + +static void +queueproc(void *vq) +{ + LumpQueue *q; + Lump *u; + Packet *p; + int creator; + uint ms; + + threadsetname("queueproc"); + + q = vq; + for(;;){ + qlock(&q->lock); + while(q->w == q->r){ + trace(TraceProc, "queueproc sleep empty"); + rsleep(&q->empty); + } + + u = q->q[q->r].u; + p = q->q[q->r].p; + creator = q->q[q->r].creator; + ms = q->q[q->r].ms; + + q->r = (q->r + 1) & (MaxLumpQ - 1); + trace(TraceProc, "queueproc wakeup flush"); + rwakeupall(&q->flush); + + trace(TraceProc, "queueproc wakeup full"); + rwakeup(&q->full); + + qunlock(&q->lock); + + trace(TraceProc, "queueproc writelump %V", u->score); + if(writeqlump(u, p, creator, ms) < 0) + fprint(2, "failed to write lump for %V: %r", u->score); + trace(TraceProc, "queueproc wrotelump %V", u->score); + + putlump(u); + } +} |