aboutsummaryrefslogtreecommitdiff
path: root/src/cmd/venti/srv/lumpqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cmd/venti/srv/lumpqueue.c')
-rw-r--r--src/cmd/venti/srv/lumpqueue.c187
1 files changed, 187 insertions, 0 deletions
diff --git a/src/cmd/venti/srv/lumpqueue.c b/src/cmd/venti/srv/lumpqueue.c
new file mode 100644
index 00000000..1b03f41c
--- /dev/null
+++ b/src/cmd/venti/srv/lumpqueue.c
@@ -0,0 +1,187 @@
+#include "stdinc.h"
+#include "dat.h"
+#include "fns.h"
+
+typedef struct LumpQueue LumpQueue;
+typedef struct WLump WLump;
+
+enum
+{
+ MaxLumpQ = 1 << 3 /* max. lumps on a single write queue, must be pow 2 */
+};
+
+struct WLump
+{
+ Lump *u;
+ Packet *p;
+ int creator;
+ int gen;
+ uint ms;
+};
+
+struct LumpQueue
+{
+ QLock lock;
+ Rendez flush;
+ Rendez full;
+ Rendez empty;
+ WLump q[MaxLumpQ];
+ int w;
+ int r;
+};
+
+static LumpQueue *lumpqs;
+static int nqs;
+
+static QLock glk;
+static int gen;
+
+static void queueproc(void *vq);
+
+int
+initlumpqueues(int nq)
+{
+ LumpQueue *q;
+
+ int i;
+ nqs = nq;
+
+ lumpqs = MKNZ(LumpQueue, nq);
+
+ for(i = 0; i < nq; i++){
+ q = &lumpqs[i];
+ q->full.l = &q->lock;
+ q->empty.l = &q->lock;
+ q->flush.l = &q->lock;
+
+ if(vtproc(queueproc, q) < 0){
+ seterr(EOk, "can't start write queue slave: %r");
+ return -1;
+ }
+ if(vtproc(queueproc, q) < 0){
+ seterr(EOk, "can't start write queue slave: %r");
+ return -1;
+ }
+ if(vtproc(queueproc, q) < 0){
+ seterr(EOk, "can't start write queue slave: %r");
+ return -1;
+ }
+ if(vtproc(queueproc, q) < 0){
+ seterr(EOk, "can't start write queue slave: %r");
+ return -1;
+ }
+ if(vtproc(queueproc, q) < 0){
+ seterr(EOk, "can't start write queue slave: %r");
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * queue a lump & it's packet data for writing
+ */
+int
+queuewrite(Lump *u, Packet *p, int creator, uint ms)
+{
+ LumpQueue *q;
+ int i;
+
+ trace(TraceProc, "queuewrite");
+ i = indexsect(mainindex, u->score);
+ if(i < 0 || i >= nqs){
+ seterr(EBug, "internal error: illegal index section in queuewrite");
+ return -1;
+ }
+
+ q = &lumpqs[i];
+
+ qlock(&q->lock);
+ while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
+ trace(TraceProc, "queuewrite sleep");
+ rsleep(&q->full);
+ }
+
+ q->q[q->w].u = u;
+ q->q[q->w].p = p;
+ q->q[q->w].creator = creator;
+ q->q[q->w].ms = ms;
+ q->q[q->w].gen = gen;
+ q->w = (q->w + 1) & (MaxLumpQ - 1);
+
+ trace(TraceProc, "queuewrite wakeup");
+ rwakeup(&q->empty);
+
+ qunlock(&q->lock);
+
+ return 0;
+}
+
+void
+flushqueue(void)
+{
+ int i;
+ LumpQueue *q;
+
+ if(!lumpqs)
+ return;
+
+ trace(TraceProc, "flushqueue");
+
+ qlock(&glk);
+ gen++;
+ qunlock(&glk);
+
+ for(i=0; i<mainindex->nsects; i++){
+ q = &lumpqs[i];
+ qlock(&q->lock);
+ while(q->w != q->r && gen - q->q[q->r].gen > 0){
+ trace(TraceProc, "flushqueue sleep q%d", i);
+ rsleep(&q->flush);
+ }
+ qunlock(&q->lock);
+ }
+}
+
+static void
+queueproc(void *vq)
+{
+ LumpQueue *q;
+ Lump *u;
+ Packet *p;
+ int creator;
+ uint ms;
+
+ threadsetname("queueproc");
+
+ q = vq;
+ for(;;){
+ qlock(&q->lock);
+ while(q->w == q->r){
+ trace(TraceProc, "queueproc sleep empty");
+ rsleep(&q->empty);
+ }
+
+ u = q->q[q->r].u;
+ p = q->q[q->r].p;
+ creator = q->q[q->r].creator;
+ ms = q->q[q->r].ms;
+
+ q->r = (q->r + 1) & (MaxLumpQ - 1);
+ trace(TraceProc, "queueproc wakeup flush");
+ rwakeupall(&q->flush);
+
+ trace(TraceProc, "queueproc wakeup full");
+ rwakeup(&q->full);
+
+ qunlock(&q->lock);
+
+ trace(TraceProc, "queueproc writelump %V", u->score);
+ if(writeqlump(u, p, creator, ms) < 0)
+ fprint(2, "failed to write lump for %V: %r", u->score);
+ trace(TraceProc, "queueproc wrotelump %V", u->score);
+
+ putlump(u);
+ }
+}