Diffstat (limited to 'src/cmd/venti/srv/buildindex.c')
-rw-r--r--	src/cmd/venti/srv/buildindex.c	1018
1 file changed, 895 insertions(+), 123 deletions(-)
diff --git a/src/cmd/venti/srv/buildindex.c b/src/cmd/venti/srv/buildindex.c
index e70a830d..b6866daf 100644
--- a/src/cmd/venti/srv/buildindex.c
+++ b/src/cmd/venti/srv/buildindex.c
@@ -1,164 +1,936 @@
/*
- * Rebuild the Venti index from scratch.
+ * Rebuild the index from scratch, in place.
*/
-
#include "stdinc.h"
#include "dat.h"
#include "fns.h"
-/*
- * Write a single bucket. Could profit from a big buffer here
- * so that we can absorb sporadic runs of blocks into one write,
- * avoiding disk seeks.
- */
-static int
-writebucket(Index *ix, u32int buck, IBucket *ib, ZBlock *b)
+enum
{
- ISect *is;
+ MinBufSize = 64*1024,
+ MaxBufSize = 4*1024*1024,
+};
- is = ix->sects[indexsect0(ix, buck)];
- if(buck < is->start || buck >= is->stop){
- seterr(EAdmin, "cannot find index section for bucket %lud\n", (ulong)buck);
- return -1;
- }
- buck -= is->start;
+int dumb;
+int errors;
+char **isect;
+int nisect;
+int bloom;
+int zero;
-/*
- qlock(&stats.lock);
- stats.indexwrites++;
- qunlock(&stats.lock);
-*/
- packibucket(ib, b->data, is->bucketmagic);
- return writepart(is->part, is->blockbase + ((u64int)buck << is->blocklog), b->data, is->blocksize);
-}
+u32int isectmem;
+u64int totalbuckets;
+u64int totalclumps;
+Channel *arenadonechan;
+Channel *isectdonechan;
+Index *ix;
-static int
-buildindex(Index *ix, Part *part, u64int off, u64int clumps, int zero)
-{
- IEStream *ies;
- IBucket ib, zib;
- ZBlock *z, *b;
- u32int next, buck;
- int ok;
- uint nbuck;
- u64int found = 0;
-
-/*ZZZ make buffer size configurable */
- b = alloczblock(ix->blocksize, 0, ix->blocksize);
- z = alloczblock(ix->blocksize, 1, ix->blocksize);
- ies = initiestream(part, off, clumps, 64*1024);
- if(b == nil || z == nil || ies == nil){
- ok = 0;
- goto breakout;
- return -1;
- }
- ok = 0;
- next = 0;
- memset(&ib, 0, sizeof ib);
- ib.data = b->data + IBucketSize;
- zib.data = z->data + IBucketSize;
- zib.n = 0;
- nbuck = 0;
- for(;;){
- buck = buildbucket(ix, ies, &ib, ix->blocksize-IBucketSize);
- found += ib.n;
- if(zero){
- for(; next != buck; next++){
- if(next == ix->buckets){
- if(buck != TWID32){
- fprint(2, "bucket out of range\n");
- ok = -1;
- }
- goto breakout;
- }
- if(writebucket(ix, next, &zib, z) < 0){
- fprint(2, "can't write zero bucket to buck=%d: %r", next);
- ok = -1;
- }
- }
- }
- if(buck >= ix->buckets){
- if(buck == TWID32)
- break;
- fprint(2, "bucket out of range\n");
- ok = -1;
- goto breakout;
- }
- if(writebucket(ix, buck, &ib, b) < 0){
- fprint(2, "bad bucket found=%lld: %r\n", found);
- ok = -1;
- }
- next = buck + 1;
- if(++nbuck%10000 == 0)
- fprint(2, "\t%,d buckets written...\n", nbuck);
- }
-breakout:;
- fprint(2, "wrote index with %lld entries\n", found);
- freeiestream(ies);
- freezblock(z);
- freezblock(b);
- return ok;
-}
+u64int arenaentries;
+u64int skipentries;
+u64int indexentries;
+
+static int shouldprocess(ISect*);
+static void isectproc(void*);
+static void arenapartproc(void*);
void
usage(void)
{
- fprint(2, "usage: buildindex [-Z] [-B blockcachesize] config tmppart\n");
- threadexitsall(0);
+ fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n");
+ threadexitsall("usage");
}
-Config conf;
-
void
threadmain(int argc, char *argv[])
{
- Part *part;
- u64int clumps, base;
- u32int bcmem;
- int zero;
-
- zero = 1;
- bcmem = 0;
+ int fd, i, napart;
+ u32int bcmem, imem;
+ Config conf;
+ Part *p;
+
ventifmtinstall();
+ imem = 256*1024*1024;
ARGBEGIN{
- case 'B':
- bcmem = unittoull(ARGF());
+ case 'b':
+ bloom = 1;
+ break;
+ case 'i':
+ isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
+ isect[nisect++] = EARGF(usage());
break;
- case 'Z':
- zero = 0;
+ case 'd': /* debugging - make sure to run all 3 passes */
+ dumb = 1;
+ break;
+ case 'M':
+ imem = unittoull(EARGF(usage()));
break;
default:
usage();
break;
}ARGEND
-
- if(argc != 2)
+
+ if(argc != 1)
usage();
if(initventi(argv[0], &conf) < 0)
sysfatal("can't init venti: %r");
+ ix = mainindex;
+ if(nisect == 0 && ix->bloom)
+ bloom = 1;
+ if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
+ sysfatal("loadbloom: %r");
+ if(bloom && !ix->bloom)
+ sysfatal("-b specified but no bloom filter");
+ if(!bloom)
+ ix->bloom = nil;
+ isectmem = imem/ix->nsects;
- if(bcmem < maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16))
- bcmem = maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16);
+ /*
+ * safety first - only need read access to arenas
+ */
+ p = nil;
+ for(i=0; i<ix->narenas; i++){
+ if(ix->arenas[i]->part != p){
+ p = ix->arenas[i]->part;
+ if((fd = open(p->filename, OREAD)) < 0)
+ sysfatal("cannot reopen %s: %r", p->filename);
+ dup(fd, p->fd);
+ close(fd);
+ }
+ }
+
+ /*
+ * need a block for every arena
+ */
+ bcmem = maxblocksize * (mainindex->narenas + 16);
if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
initdcache(bcmem);
+
+ totalclumps = 0;
+ for(i=0; i<ix->narenas; i++)
+ totalclumps += ix->arenas[i]->diskstats.clumps;
+
+ totalbuckets = 0;
+ for(i=0; i<ix->nsects; i++)
+ totalbuckets += ix->sects[i]->blocks;
+ fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);
+
+ /* start index procs */
+ fprint(2, "%T read index\n");
+ isectdonechan = chancreate(sizeof(void*), 0);
+ for(i=0; i<ix->nsects; i++){
+ if(shouldprocess(ix->sects[i]))
+ ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
+ vtproc(isectproc, ix->sects[i]);
+ }
+
+ for(i=0; i<nisect; i++)
+ if(isect[i])
+ fprint(2, "warning: did not find index section %s\n", isect[i]);
+
+ /* start arena procs */
+ p = nil;
+ napart = 0;
+ arenadonechan = chancreate(sizeof(void*), 0);
+ for(i=0; i<ix->narenas; i++){
+ if(ix->arenas[i]->part != p){
+ p = ix->arenas[i]->part;
+ vtproc(arenapartproc, p);
+ napart++;
+ }
+ }
+
+ /* wait for arena procs to finish */
+ for(i=0; i<napart; i++)
+ recvp(arenadonechan);
+
+ /* tell index procs to finish */
+ for(i=0; i<ix->nsects; i++)
+ if(ix->sects[i]->writechan)
+ send(ix->sects[i]->writechan, nil);
+
+ /* wait for index procs to finish */
+ for(i=0; i<ix->nsects; i++)
+ if(ix->sects[i]->writechan)
+ recvp(isectdonechan);
+
+ if(ix->bloom && writebloom(ix->bloom) < 0)
+ fprint(2, "writing bloom filter: %r\n");
+
+ fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
+ arenaentries, indexentries, skipentries);
+ threadexitsall(nil);
+}
+
+static int
+shouldprocess(ISect *is)
+{
+ int i;
+
+ if(nisect == 0)
+ return 1;
+
+ for(i=0; i<nisect; i++)
+ if(isect[i] && strcmp(isect[i], is->name) == 0){
+ isect[i] = nil;
+ return 1;
+ }
+ return 0;
+}
+
+static void
+add(u64int *a, u64int n)
+{
+ static Lock l;
+
+ lock(&l);
+ *a += n;
+ unlock(&l);
+}
+
+/*
+ * Read through an arena partition and send each of its IEntries
+ * to the appropriate index section. When finished, send on
+ * arenadonechan.
+ */
+enum
+{
+ ClumpChunks = 32*1024,
+};
+static void
+arenapartproc(void *v)
+{
+ int i, j, n, nskip, x;
+ u32int clump;
+ u64int addr, tot;
+ Arena *a;
+ ClumpInfo *ci, *cis;
+ IEntry ie;
+ Part *p;
+
+ p = v;
+ threadsetname("arenaproc %s", p->name);
+
+ nskip = 0;
+ tot = 0;
+ cis = MKN(ClumpInfo, ClumpChunks);
+ for(i=0; i<ix->narenas; i++){
+ a = ix->arenas[i];
+ if(a->part != p)
+ continue;
+ if(a->memstats.clumps)
+ fprint(2, "%T arena %s: %d entries\n",
+ a->name, a->memstats.clumps);
+ addr = ix->amap[i].start;
+ for(clump=0; clump<a->memstats.clumps; clump+=n){
+ n = ClumpChunks;
+ if(n > a->memstats.clumps - clump)
+ n = a->memstats.clumps - clump;
+ if(readclumpinfos(a, clump, cis, n) != n){
+ fprint(2, "%T arena %s: directory read: %r\n", a->name);
+ errors = 1;
+ break;
+ }
+ for(j=0; j<n; j++){
+ ci = &cis[j];
+ ie.ia.type = ci->type;
+ ie.ia.size = ci->uncsize;
+ ie.ia.addr = addr;
+ addr += ci->size + ClumpSize;
+ ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
+ scorecp(ie.score, ci->score);
+ if(ci->type == VtCorruptType)
+ nskip++;
+ else{
+ tot++;
+ x = indexsect(ix, ie.score);
+ assert(0 <= x && x < ix->nsects);
+ if(ix->sects[x]->writechan)
+ send(ix->sects[x]->writechan, &ie);
+ if(ix->bloom)
+ markbloomfilter(ix->bloom, ie.score);
+ }
+ }
+ }
+ }
+ add(&arenaentries, tot);
+ add(&skipentries, nskip);
+ sendp(arenadonechan, p);
+}
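
An aside on the blocks computation above: it is a ceiling division of the clump's on-disk footprint by the arena block size. For example, assuming 512-byte arena blocks (ABlockLog = 9, an assumption), a clump whose size plus ClumpSize header totals 1038 bytes occupies (1038 + 511) >> 9 = 3 blocks.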
+
+/*
+ * Convert score into relative bucket number in isect.
+ * Can pass a packed ientry instead of score - score is first.
+ */
+static u32int
+score2bucket(ISect *is, uchar *score)
+{
+ u32int b;
+
+ b = hashbits(score, 32)/ix->div;
+ assert(is->start <= b && b < is->stop);
+ return b - is->start;
+}
+
+/*
+ * Convert offset in index section to bucket number.
+ */
+static u32int
+offset2bucket(ISect *is, u64int offset)
+{
+ u32int b;
+
+ assert(is->blockbase <= offset);
+ offset -= is->blockbase;
+ b = offset/is->blocksize;
+ assert(b < is->stop-is->start);
+ return b;
+}
+
+/*
+ * Convert bucket number to offset.
+ */
+static u64int
+bucket2offset(ISect *is, u32int b)
+{
+ assert(b <= is->stop-is->start);
+ return is->blockbase + (u64int)b*is->blocksize;
+}
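
A quick consistency sketch of the two conversions above (checkbucketmap is a hypothetical helper, not part of the commit): because buckets are laid out contiguously from blockbase, score2bucket's companions are exact inverses.

static void
checkbucketmap(ISect *is)
{
	u32int b;

	/* round-trip every relative bucket number through both maps */
	for(b=0; b<is->stop-is->start; b++)
		assert(offset2bucket(is, bucket2offset(is, b)) == b);
}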
+
+/*
+ * IEntry buffers to hold initial round of spraying.
+ */
+typedef struct Buf Buf;
+struct Buf
+{
+ Part *part; /* partition being written */
+ uchar *bp; /* current block */
+ uchar *ep; /* end of block */
+ uchar *wp; /* write position in block */
+ u64int boffset; /* start offset */
+ u64int woffset; /* next write offset */
+ u64int eoffset; /* end offset */
+ u32int nentry; /* number of entries written */
+};
+
+static void
+bflush(Buf *buf)
+{
+ u32int bufsize;
+
+ if(buf->woffset >= buf->eoffset)
+		sysfatal("buf index chunk overflow - need bigger index");
+ bufsize = buf->ep - buf->bp;
+ if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
+ fprint(2, "write %s: %r\n", buf->part->name);
+ errors = 1;
+ }
+ buf->woffset += bufsize;
+ memset(buf->bp, 0, bufsize);
+ buf->wp = buf->bp;
+}
+
+static void
+bwrite(Buf *buf, IEntry *ie)
+{
+ if(buf->wp+IEntrySize > buf->ep)
+ bflush(buf);
+ assert(buf->bp <= buf->wp && buf->wp < buf->ep);
+ packientry(ie, buf->wp);
+ buf->wp += IEntrySize;
+ assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
+ buf->nentry++;
+}
+
+/*
+ * Minibuffer. In-memory data structure holds our place
+ * in the buffer but has no block data. We are writing and
+ * reading the minibuffers at the same time. (Careful!)
+ */
+typedef struct Minibuf Minibuf;
+struct Minibuf
+{
+ u64int boffset; /* start offset */
+ u64int roffset; /* read offset */
+ u64int woffset; /* write offset */
+ u64int eoffset; /* end offset */
+ u32int nentry; /* # entries left to read */
+ u32int nwentry; /* # entries written */
+};
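
A sketch of the safety invariant that makes simultaneous reading and writing work (minibufok is a hypothetical helper): while unread entries remain, the write offset must trail the read offset, or rewriting the region would clobber entries not yet loaded. ipoolloadblock and ipoolflush0 below maintain exactly this ordering.

static void
minibufok(Minibuf *mb)
{
	/* writes start at the beginning of the region... */
	assert(mb->boffset <= mb->woffset);
	/* ...and never pass reads while entries remain unread */
	assert(mb->nentry == 0 || mb->woffset <= mb->roffset);
}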
+
+/*
+ * Index entry pool. Used when trying to shuffle around
+ * the entries in a big buffer into the corresponding M minibuffers.
+ * Sized to hold M*EntriesPerBlock entries, so that there will always
+ * either be room in the pool for another block worth of entries
+ * or there will be an entire block worth of sorted entries to
+ * write out.
+ */
+typedef struct IEntryLink IEntryLink;
+typedef struct IPool IPool;
+
+struct IEntryLink
+{
+ uchar ie[IEntrySize]; /* raw IEntry */
+ IEntryLink *next; /* next in chain */
+};
+
+struct IPool
+{
+ ISect *isect;
+ u32int buck0; /* first bucket in pool */
+ u32int mbufbuckets; /* buckets per minibuf */
+ IEntryLink *entry; /* all IEntryLinks */
+ u32int nentry; /* # of IEntryLinks */
+ IEntryLink *free; /* free list */
+ u32int nfree; /* # on free list */
+ Minibuf *mbuf; /* all minibufs */
+ u32int nmbuf; /* # of minibufs */
+ IEntryLink **mlist; /* lists for each minibuf */
+ u32int *mcount; /* # on each mlist[i] */
+ u32int bufsize; /* block buffer size */
+ uchar *rbuf; /* read buffer */
+ uchar *wbuf; /* write buffer */
+ u32int epbuf; /* entries per block buffer */
+};
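
Why (nmbuf+1) blocks' worth of links suffice (mkipool below allocates (nmbuf+1)*bufsize/IEntrySize of them): if fewer than one block's worth is free, then more than nmbuf blocks' worth sit on the nmbuf minibuffer lists, so by pigeonhole some list holds at least a full block and ipoolflush1 can always make progress. A worked instance, taking 64K blocks and IEntrySize = 40 (1638 entries per block) with nmbuf = 8: capacity is 9*1638 = 14742 links; nfree < 1638 leaves more than 8*1638 = 13104 queued, so some list holds at least 1638.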
+
+/*
+static int
+countsokay(IPool *p)
+{
+ int i;
+ u64int n;
+
+ n = 0;
+ for(i=0; i<p->nmbuf; i++)
+ n += p->mcount[i];
+ n += p->nfree;
+ if(n != p->nentry){
+ print("free %ud:", p->nfree);
+ for(i=0; i<p->nmbuf; i++)
+ print(" %ud", p->mcount[i]);
+ print(" = %lld nentry: %ud\n", n, p->nentry);
+ }
+ return n == p->nentry;
+}
+*/
- fprint(2, "building a new index %s using %s for temporary storage\n", mainindex->name, argv[1]);
+static IPool*
+mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
+ u32int mbufbuckets, u32int bufsize)
+{
+ u32int i, nentry;
+ uchar *data;
+ IPool *p;
+ IEntryLink *l;
+
+ nentry = (nmbuf+1)*bufsize / IEntrySize;
+ p = ezmalloc(sizeof(IPool)
+ +nentry*sizeof(IEntry)
+ +nmbuf*sizeof(IEntryLink*)
+ +nmbuf*sizeof(u32int)
+ +3*bufsize);
+
+ p->isect = isect;
+ p->mbufbuckets = mbufbuckets;
+ p->bufsize = bufsize;
+ p->entry = (IEntryLink*)(p+1);
+ p->nentry = nentry;
+ p->mlist = (IEntryLink**)(p->entry+nentry);
+ p->mcount = (u32int*)(p->mlist+nmbuf);
+ p->nmbuf = nmbuf;
+ p->mbuf = mbuf;
+ data = (uchar*)(p->mcount+nmbuf);
+ data += bufsize - (u32int)data%bufsize;
+ p->rbuf = data;
+ p->wbuf = data+bufsize;
+ p->epbuf = bufsize/IEntrySize;
- part = initpart(argv[1], ORDWR|ODIRECT);
- if(part == nil)
- sysfatal("can't initialize temporary partition: %r");
+ for(i=0; i<p->nentry; i++){
+ l = &p->entry[i];
+ l->next = p->free;
+ p->free = l;
+ p->nfree++;
+ }
+ return p;
+}
- clumps = sortrawientries(mainindex, part, &base, mainindex->bloom);
- if(clumps == TWID64)
- sysfatal("can't build sorted index: %r");
- fprint(2, "found and sorted index entries for clumps=%lld at %lld\n", clumps, base);
+/*
+ * Add the index entry ie to the pool p.
+ * Caller must know there is room.
+ */
+static void
+ipoolinsert(IPool *p, uchar *ie)
+{
+ u32int buck, x;
+ IEntryLink *l;
+
+ assert(p->free != nil);
+
+ buck = score2bucket(p->isect, ie);
+ x = (buck-p->buck0) / p->mbufbuckets;
+ if(x >= p->nmbuf){
+ fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
+ buck, p->mbufbuckets, x);
+ }
+ assert(x < p->nmbuf);
- if(buildindex(mainindex, part, base, clumps, zero) < 0)
- sysfatal("can't build new index: %r");
+ l = p->free;
+ p->free = l->next;
+ p->nfree--;
+ memmove(l->ie, ie, IEntrySize);
+ l->next = p->mlist[x];
+ p->mlist[x] = l;
+ p->mcount[x]++;
+}
+
+/*
+ * Pull out a block containing as many
+ * entries as possible for minibuffer x.
+ */
+static u32int
+ipoolgetbuf(IPool *p, u32int x)
+{
+ uchar *bp, *ep, *wp;
+ IEntryLink *l;
+ u32int n;
+
+ bp = p->wbuf;
+ ep = p->wbuf + p->bufsize;
+ n = 0;
+ assert(x < p->nmbuf);
+ for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
+ l = p->mlist[x];
+ p->mlist[x] = l->next;
+ p->mcount[x]--;
+ memmove(wp, l->ie, IEntrySize);
+ l->next = p->free;
+ p->free = l;
+ p->nfree++;
+ n++;
+ }
+ memset(wp, 0, ep-wp);
+ return n;
+}
+
+/*
+ * Read a block worth of entries from the minibuf
+ * into the pool. Caller must know there is room.
+ */
+static void
+ipoolloadblock(IPool *p, Minibuf *mb)
+{
+ u32int i, n;
- if(mainindex->bloom)
- writebloom(mainindex->bloom);
+ assert(mb->nentry > 0);
+ assert(mb->roffset >= mb->woffset);
+ assert(mb->roffset < mb->eoffset);
- threadexitsall(0);
+ n = p->bufsize/IEntrySize;
+ if(n > mb->nentry)
+ n = mb->nentry;
+ if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
+ fprint(2, "readpart %s: %r\n", p->isect->part->name);
+ else{
+ for(i=0; i<n; i++)
+ ipoolinsert(p, p->rbuf+i*IEntrySize);
+ }
+ mb->nentry -= n;
+ mb->roffset += p->bufsize;
}
+
+/*
+ * Write out a block worth of entries to minibuffer x.
+ * If necessary, pick up the data there before overwriting it.
+ */
+static void
+ipoolflush0(IPool *pool, u32int x)
+{
+ u32int bufsize;
+ Minibuf *mb;
+
+ mb = pool->mbuf+x;
+ bufsize = pool->bufsize;
+ mb->nwentry += ipoolgetbuf(pool, x);
+ if(mb->nentry > 0 && mb->roffset == mb->woffset){
+ assert(pool->nfree >= pool->bufsize/IEntrySize);
+ /*
+ * There will be room in the pool -- we just
+ * removed a block worth.
+ */
+ ipoolloadblock(pool, mb);
+ }
+ if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
+ fprint(2, "writepart %s: %r\n", pool->isect->part->name);
+ mb->woffset += bufsize;
+}
+
+/*
+ * Write out some full block of entries.
+ * (There must be one -- the pool is almost full!)
+ */
+static void
+ipoolflush1(IPool *pool)
+{
+ u32int i;
+
+ assert(pool->nfree <= pool->epbuf);
+
+ for(i=0; i<pool->nmbuf; i++){
+ if(pool->mcount[i] >= pool->epbuf){
+ ipoolflush0(pool, i);
+ return;
+ }
+ }
+ /* can't be reached - someone must be full */
+ sysfatal("ipoolflush1");
+}
+
+/*
+ * Flush all the entries in the pool out to disk.
+ * Nothing more to read from disk.
+ */
+static void
+ipoolflush(IPool *pool)
+{
+ u32int i;
+
+ for(i=0; i<pool->nmbuf; i++)
+ while(pool->mlist[i])
+ ipoolflush0(pool, i);
+ assert(pool->nfree == pool->nentry);
+}
+
+/*
+ * Third pass. Pick up each minibuffer from disk into
+ * memory and then write out the buckets.
+ */
+
+/*
+ * Compare two packed index entries.
+ * Usual ordering, except break ties by putting higher
+ * index addresses first (assuming any duplicates are
+ * due to corruption at the lower addresses).
+ */
+static int
+ientrycmpaddr(const void *va, const void *vb)
+{
+ int i;
+ uchar *a, *b;
+
+ a = (uchar*)va;
+ b = (uchar*)vb;
+ i = ientrycmp(a, b);
+ if(i)
+ return i;
+ return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
+}
+
+static void
+zerorange(Part *p, u64int o, u64int e)
+{
+ static uchar zero[MaxIoSize];
+ u32int n;
+
+ for(; o<e; o+=n){
+ n = sizeof zero;
+ if(o+n > e)
+ n = e-o;
+ if(writepart(p, o, zero, n) < 0)
+ fprint(2, "writepart %s: %r\n", p->name);
+ }
+}
+
+/*
+ * Load a minibuffer into memory and write out the
+ * corresponding buckets.
+ */
+static void
+sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
+{
+ uchar *buckdata, *p, *q, *ep;
+ u32int b, lastb, memsize, n;
+ u64int o;
+ IBucket ib;
+ Part *part;
+
+	if(mb->nwentry == 0)
+		return;
+
+	part = is->part;
+	buckdata = emalloc(is->blocksize);
+
+ /*
+ * read entire buffer.
+ */
+ assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
+ assert(mb->woffset-mb->boffset <= nbuf);
+	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
+		fprint(2, "readpart %s: %r\n", part->name);
+		errors = 1;
+		free(buckdata);
+		return;
+	}
+ assert(*(uint*)buf != 0xa5a5a5a5);
+
+ /*
+ * remove fragmentation due to IEntrySize
+ * not evenly dividing Bufsize
+ */
+ memsize = (bufsize/IEntrySize)*IEntrySize;
+ for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
+ memmove(p, q, memsize);
+ p += memsize;
+ q += bufsize;
+ }
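	/*
	 * Worked instance of the fragmentation being removed: with
	 * bufsize = 64K and IEntrySize = 40, each block holds
	 * 64K/40 = 1638 entries filling 65520 bytes, so the copy
	 * above squeezes 16 dead bytes out of every block.
	 */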
+ ep = buf + mb->nwentry*IEntrySize;
+ assert(ep <= buf+nbuf);
+
+ /*
+ * sort entries
+ */
+ qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);
+
+ /*
+ * write buckets out
+ */
+ n = 0;
+ lastb = offset2bucket(is, mb->boffset);
+ for(p=buf; p<ep; p=q){
+ b = score2bucket(is, p);
+ for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
+ ;
+ if(lastb+1 < b && zero)
+ zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
+ if(IBucketSize+(q-p) > is->blocksize)
+ sysfatal("bucket overflow - make index bigger");
+ memmove(buckdata+IBucketSize, p, q-p);
+ ib.n = (q-p)/IEntrySize;
+ n += ib.n;
+ packibucket(&ib, buckdata, is->bucketmagic);
+ if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
+ fprint(2, "write %s: %r\n", part->name);
+ lastb = b;
+ }
+ if(lastb+1 < is->stop-is->start && zero)
+ zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));
+
+ if(n != mb->nwentry)
+ fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize);
+
+ free(buckdata);
+}
+
+static void
+isectproc(void *v)
+{
+ u32int buck, bufbuckets, bufsize, epbuf, i, j;
+ u32int mbufbuckets, n, nbucket, nn, space;
+ u32int nbuf, nminibuf, xminiclump, prod;
+ u64int blocksize, offset, xclump;
+ uchar *data, *p;
+ Buf *buf;
+ IEntry ie;
+ IPool *ipool;
+ ISect *is;
+ Minibuf *mbuf, *mb;
+
+ is = v;
+ blocksize = is->blocksize;
+ nbucket = is->stop - is->start;
+
+ /*
+ * Three passes:
+ * pass 1 - write index entries from arenas into
+ * large sequential sections on index disk.
+ * requires nbuf * bufsize memory.
+ *
+ * pass 2 - split each section into minibufs.
+ * requires nminibuf * bufsize memory.
+ *
+ * pass 3 - read each minibuf into memory and
+ * write buckets out.
+ * requires entries/minibuf * IEntrySize memory.
+ *
+ * The larger we set bufsize the less seeking hurts us.
+ *
+ * The fewer sections and minibufs we have, the less
+ * seeking hurts us.
+ *
+ * The fewer sections and minibufs we have, the
+ * more entries we end up with in each minibuf
+ * at the end.
+ *
+ * Shoot for using half our memory to hold each
+ * minibuf. The chance of a random distribution
+ * getting off by 2x is quite low.
+ *
+ * Once that is decided, figure out the smallest
+ * nminibuf and nsection/biggest bufsize we can use
+ * and still fit in the memory constraints.
+ */
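	/*
	 * A worked instance under assumed round numbers (isectmem =
	 * 256M, IEntrySize = 40, a section expecting xclump = 100M
	 * entries):
	 *	xminiclump = 256M/2/40 = ~3.3M entries
	 *	prod = 100M/3.3M = ~30 minibufs
	 * Since 30*MinBufSize = ~2M < 256M, the second pass is
	 * skipped: nbuf = 30, nminibuf = 1, and bufsize doubles from
	 * 64K up to the 4M MaxBufSize cap (2*4M*30 = 240M <= 256M).
	 */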
+
+ /* expected number of clump index entries we'll see */
+ xclump = nbucket * (double)totalclumps/totalbuckets;
+
+ /* number of clumps we want to see in a minibuf */
+ xminiclump = isectmem/2/IEntrySize;
+
+ /* total number of minibufs we need */
+ prod = xclump / xminiclump;
+
+ /* if possible, skip second pass */
+ if(!dumb && prod*MinBufSize < isectmem){
+ nbuf = prod;
+ nminibuf = 1;
+ }else{
+ /* otherwise use nsection = sqrt(nmini) */
+ for(nbuf=1; nbuf*nbuf<prod; nbuf++)
+ ;
+ if(nbuf*MinBufSize > isectmem)
+ sysfatal("not enough memory");
+ nminibuf = nbuf;
+ }
+ /* size buffer to use extra memory */
+ bufsize = MinBufSize;
+ while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
+ bufsize *= 2;
+ data = emalloc(nbuf*bufsize);
+ epbuf = bufsize/IEntrySize;
+
+ fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
+ is->part->name, nbucket, nbuf, nminibuf, bufsize);
+ /*
+ * Accept index entries from arena procs.
+ */
+ buf = MKNZ(Buf, nbuf);
+ p = data;
+ offset = is->blockbase;
+ bufbuckets = (nbucket+nbuf-1)/nbuf;
+ for(i=0; i<nbuf; i++){
+ buf[i].part = is->part;
+ buf[i].bp = p;
+ buf[i].wp = p;
+ p += bufsize;
+ buf[i].ep = p;
+ buf[i].boffset = offset;
+ buf[i].woffset = offset;
+ if(i < nbuf-1){
+ offset += bufbuckets*blocksize;
+ buf[i].eoffset = offset;
+ }else{
+ offset = is->blockbase + nbucket*blocksize;
+ buf[i].eoffset = offset;
+ }
+ }
+ assert(p == data+nbuf*bufsize);
+
+ n = 0;
+ while(recv(is->writechan, &ie) == 1){
+ if(ie.ia.addr == 0)
+ break;
+ buck = score2bucket(is, ie.score);
+ i = buck/bufbuckets;
+ assert(i < nbuf);
+ bwrite(&buf[i], &ie);
+ n++;
+ }
+ add(&indexentries, n);
+
+ nn = 0;
+ for(i=0; i<nbuf; i++){
+ bflush(&buf[i]);
+ buf[i].bp = nil;
+ buf[i].ep = nil;
+ buf[i].wp = nil;
+ nn += buf[i].nentry;
+ }
+ if(n != nn)
+ fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);
+
+ free(data);
+
+ fprint(2, "%T %s: reordering\n", is->part->name);
+
+ /*
+ * Rearrange entries into minibuffers and then
+ * split each minibuffer into buckets.
+ */
+ mbuf = MKN(Minibuf, nminibuf);
+ mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
+ for(i=0; i<nbuf; i++){
+ /*
+ * Set up descriptors.
+ */
+ n = buf[i].nentry;
+ nn = 0;
+ offset = buf[i].boffset;
+ memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
+ for(j=0; j<nminibuf; j++){
+ mb = &mbuf[j];
+ mb->boffset = offset;
+ if(j < nminibuf-1){
+ offset += mbufbuckets*blocksize;
+ mb->eoffset = offset;
+ }else
+ mb->eoffset = buf[i].eoffset;
+ mb->roffset = mb->boffset;
+ mb->woffset = mb->boffset;
+ mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
+ if(mb->nentry > buf[i].nentry)
+ mb->nentry = buf[i].nentry;
+ buf[i].nentry -= mb->nentry;
+ nn += mb->nentry;
+ }
+ if(n != nn)
+			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);
+ /*
+ * Rearrange.
+ */
+ if(!dumb && nminibuf == 1){
+ mbuf[0].nwentry = mbuf[0].nentry;
+ mbuf[0].woffset = buf[i].woffset;
+ }else{
+ ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
+ ipool->buck0 = bufbuckets*i;
+ for(j=0; j<nminibuf; j++){
+ mb = &mbuf[j];
+ while(mb->nentry > 0){
+ if(ipool->nfree < epbuf){
+ ipoolflush1(ipool);
+ /* ipoolflush1 might change mb->nentry */
+ continue;
+ }
+ assert(ipool->nfree >= epbuf);
+ ipoolloadblock(ipool, mb);
+ }
+ }
+ ipoolflush(ipool);
+ nn = 0;
+ for(j=0; j<nminibuf; j++)
+ nn += mbuf[j].nwentry;
+ if(n != nn)
+ fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
+ free(ipool);
+ }
+
+ /*
+ * Make buckets.
+ */
+ space = 0;
+ for(j=0; j<nminibuf; j++)
+ if(space < mbuf[j].woffset - mbuf[j].boffset)
+ space = mbuf[j].woffset - mbuf[j].boffset;
+
+ data = emalloc(space);
+ for(j=0; j<nminibuf; j++){
+ mb = &mbuf[j];
+ sortminibuffer(is, mb, data, space, bufsize);
+ }
+ free(data);
+ }
+
+ sendp(isectdonechan, is);
+}
+