/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ /* See COPYRIGHT */ /* * Generic RPC packet multiplexor. Inspired by but not derived from * Plan 9 kernel. Originally developed as part of Tra, later used in * libnventi, and then finally split out into a generic library. */ #include <u.h> #include <libc.h> #include <mux.h> static int gettag(Mux*, Muxrpc*); static void puttag(Mux*, Muxrpc*); static void enqueue(Mux*, Muxrpc*); static void dequeue(Mux*, Muxrpc*); void muxinit(Mux *mux) { memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk)); mux->tagrend.l = &mux->lk; mux->rpcfork.l = &mux->lk; mux->sleep.next = &mux->sleep; mux->sleep.prev = &mux->sleep; } void* muxrpc(Mux *mux, void *tx) { int tag; Muxrpc *r, *r2; void *p; /* must malloc because stack could be private */ r = mallocz(sizeof(Muxrpc), 1); if(r == nil){ werrstr("mallocz: %r"); return nil; } r->r.l = &mux->lk; /* assign the tag, add selves to response queue */ qlock(&mux->lk); tag = gettag(mux, r); //print("gettag %p %d\n", r, tag); enqueue(mux, r); qunlock(&mux->lk); /* actually send the packet */ if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){ werrstr("settag/send tag %d: %r", tag); fprint(2, "%r\n"); qlock(&mux->lk); dequeue(mux, r); puttag(mux, r); qunlock(&mux->lk); return nil; } qlock(&mux->lk); /* wait for our packet */ while(mux->muxer && !r->p){ rsleep(&r->r); } /* if not done, there's no muxer: start muxing */ if(!r->p){ if(mux->muxer) abort(); mux->muxer = 1; while(!r->p){ qunlock(&mux->lk); p = _muxrecv(mux); if(p) tag = mux->gettag(mux, p) - mux->mintag; else tag = ~0; //print("mux tag %d\n", tag); qlock(&mux->lk); if(p == nil){ /* eof -- just give up and pass the buck */ dequeue(mux, r); break; } /* hand packet to correct sleeper */ if(tag < 0 || tag >= mux->mwait){ fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); /* must leak packet! don't know how to free it! */ continue; } r2 = mux->wait[tag]; if(r2 == nil || r2->prev == nil){ fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag); /* must leak packet! don't know how to free it! */ continue; } r2->p = p; dequeue(mux, r2); rwakeup(&r2->r); } mux->muxer = 0; /* if there is anyone else sleeping, wake them to mux */ if(mux->sleep.next != &mux->sleep) rwakeup(&mux->sleep.next->r); } //print("finished %p\n", r); p = r->p; puttag(mux, r); qunlock(&mux->lk); if(p == nil) werrstr("unexpected eof"); return p; } static void enqueue(Mux *mux, Muxrpc *r) { r->next = mux->sleep.next; r->prev = &mux->sleep; r->next->prev = r; r->prev->next = r; } static void dequeue(Mux *mux, Muxrpc *r) { r->next->prev = r->prev; r->prev->next = r->next; r->prev = nil; r->next = nil; } static int gettag(Mux *mux, Muxrpc *r) { int i, mw; Muxrpc **w; for(;;){ /* wait for a free tag */ while(mux->nwait == mux->mwait){ if(mux->mwait < mux->maxtag-mux->mintag){ mw = mux->mwait; if(mw == 0) mw = 1; else mw <<= 1; w = realloc(mux->wait, mw*sizeof(w[0])); if(w == nil) return -1; memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0])); mux->wait = w; mux->freetag = mux->mwait; mux->mwait = mw; break; } rsleep(&mux->tagrend); } i=mux->freetag; if(mux->wait[i] == 0) goto Found; for(; i<mux->mwait; i++) if(mux->wait[i] == 0) goto Found; for(i=0; i<mux->freetag; i++) if(mux->wait[i] == 0) goto Found; /* should not fall out of while without free tag */ fprint(2, "libfs: nwait botch\n"); abort(); } Found: mux->nwait++; mux->wait[i] = r; r->tag = i+mux->mintag; return r->tag; } static void puttag(Mux *mux, Muxrpc *r) { int i; i = r->tag - mux->mintag; assert(mux->wait[i] == r); mux->wait[i] = nil; mux->nwait--; mux->freetag = i; rwakeup(&mux->tagrend); free(r); }