diff options
author | rsc <devnull@localhost> | 2006-07-18 15:26:33 +0000 |
---|---|---|
committer | rsc <devnull@localhost> | 2006-07-18 15:26:33 +0000 |
commit | 28b49df3542a635cca788f3de213385f3fcb6334 (patch) | |
tree | a3a30774249929e66988bf77e76df9459acb50bc /src/cmd/venti/srv/buildindex.c | |
parent | 686bd37d9d8db5e3b969a3aa2d5b455e0976b262 (diff) | |
download | plan9port-28b49df3542a635cca788f3de213385f3fcb6334.tar.gz plan9port-28b49df3542a635cca788f3de213385f3fcb6334.tar.bz2 plan9port-28b49df3542a635cca788f3de213385f3fcb6334.zip |
assorted changes from Plan 9
Diffstat (limited to 'src/cmd/venti/srv/buildindex.c')
-rw-r--r-- | src/cmd/venti/srv/buildindex.c | 1018 |
1 files changed, 895 insertions, 123 deletions
diff --git a/src/cmd/venti/srv/buildindex.c b/src/cmd/venti/srv/buildindex.c index e70a830d..b6866daf 100644 --- a/src/cmd/venti/srv/buildindex.c +++ b/src/cmd/venti/srv/buildindex.c @@ -1,164 +1,936 @@ /* - * Rebuild the Venti index from scratch. + * Rebuild the index from scratch, in place. */ - #include "stdinc.h" #include "dat.h" #include "fns.h" -/* - * Write a single bucket. Could profit from a big buffer here - * so that we can absorb sporadic runs of blocks into one write, - * avoiding disk seeks. - */ -static int -writebucket(Index *ix, u32int buck, IBucket *ib, ZBlock *b) +enum { - ISect *is; + MinBufSize = 64*1024, + MaxBufSize = 4*1024*1024, +}; - is = ix->sects[indexsect0(ix, buck)]; - if(buck < is->start || buck >= is->stop){ - seterr(EAdmin, "cannot find index section for bucket %lud\n", (ulong)buck); - return -1; - } - buck -= is->start; +int dumb; +int errors; +char **isect; +int nisect; +int bloom; +int zero; -/* - qlock(&stats.lock); - stats.indexwrites++; - qunlock(&stats.lock); -*/ - packibucket(ib, b->data, is->bucketmagic); - return writepart(is->part, is->blockbase + ((u64int)buck << is->blocklog), b->data, is->blocksize); -} +u32int isectmem; +u64int totalbuckets; +u64int totalclumps; +Channel *arenadonechan; +Channel *isectdonechan; +Index *ix; -static int -buildindex(Index *ix, Part *part, u64int off, u64int clumps, int zero) -{ - IEStream *ies; - IBucket ib, zib; - ZBlock *z, *b; - u32int next, buck; - int ok; - uint nbuck; - u64int found = 0; - -/*ZZZ make buffer size configurable */ - b = alloczblock(ix->blocksize, 0, ix->blocksize); - z = alloczblock(ix->blocksize, 1, ix->blocksize); - ies = initiestream(part, off, clumps, 64*1024); - if(b == nil || z == nil || ies == nil){ - ok = 0; - goto breakout; - return -1; - } - ok = 0; - next = 0; - memset(&ib, 0, sizeof ib); - ib.data = b->data + IBucketSize; - zib.data = z->data + IBucketSize; - zib.n = 0; - nbuck = 0; - for(;;){ - buck = buildbucket(ix, ies, &ib, ix->blocksize-IBucketSize); - found += ib.n; - if(zero){ - for(; next != buck; next++){ - if(next == ix->buckets){ - if(buck != TWID32){ - fprint(2, "bucket out of range\n"); - ok = -1; - } - goto breakout; - } - if(writebucket(ix, next, &zib, z) < 0){ - fprint(2, "can't write zero bucket to buck=%d: %r", next); - ok = -1; - } - } - } - if(buck >= ix->buckets){ - if(buck == TWID32) - break; - fprint(2, "bucket out of range\n"); - ok = -1; - goto breakout; - } - if(writebucket(ix, buck, &ib, b) < 0){ - fprint(2, "bad bucket found=%lld: %r\n", found); - ok = -1; - } - next = buck + 1; - if(++nbuck%10000 == 0) - fprint(2, "\t%,d buckets written...\n", nbuck); - } -breakout:; - fprint(2, "wrote index with %lld entries\n", found); - freeiestream(ies); - freezblock(z); - freezblock(b); - return ok; -} +u64int arenaentries; +u64int skipentries; +u64int indexentries; + +static int shouldprocess(ISect*); +static void isectproc(void*); +static void arenapartproc(void*); void usage(void) { - fprint(2, "usage: buildindex [-Z] [-B blockcachesize] config tmppart\n"); - threadexitsall(0); + fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n"); + threadexitsall("usage"); } -Config conf; - void threadmain(int argc, char *argv[]) { - Part *part; - u64int clumps, base; - u32int bcmem; - int zero; - - zero = 1; - bcmem = 0; + int fd, i, napart; + u32int bcmem, imem; + Config conf; + Part *p; + ventifmtinstall(); + imem = 256*1024*1024; ARGBEGIN{ - case 'B': - bcmem = unittoull(ARGF()); + case 'b': + bloom = 1; + break; + case 'i': + isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0])); + isect[nisect++] = EARGF(usage()); break; - case 'Z': - zero = 0; + case 'd': /* debugging - make sure to run all 3 passes */ + dumb = 1; + break; + case 'M': + imem = unittoull(EARGF(usage())); break; default: usage(); break; }ARGEND - - if(argc != 2) + + if(argc != 1) usage(); if(initventi(argv[0], &conf) < 0) sysfatal("can't init venti: %r"); + ix = mainindex; + if(nisect == 0 && ix->bloom) + bloom = 1; + if(bloom && ix->bloom && resetbloom(ix->bloom) < 0) + sysfatal("loadbloom: %r"); + if(bloom && !ix->bloom) + sysfatal("-b specified but no bloom filter"); + if(!bloom) + ix->bloom = nil; + isectmem = imem/ix->nsects; - if(bcmem < maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16)) - bcmem = maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16); + /* + * safety first - only need read access to arenas + */ + p = nil; + for(i=0; i<ix->narenas; i++){ + if(ix->arenas[i]->part != p){ + p = ix->arenas[i]->part; + if((fd = open(p->filename, OREAD)) < 0) + sysfatal("cannot reopen %s: %r", p->filename); + dup(fd, p->fd); + close(fd); + } + } + + /* + * need a block for every arena + */ + bcmem = maxblocksize * (mainindex->narenas + 16); if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem); initdcache(bcmem); + + totalclumps = 0; + for(i=0; i<ix->narenas; i++) + totalclumps += ix->arenas[i]->diskstats.clumps; + + totalbuckets = 0; + for(i=0; i<ix->nsects; i++) + totalbuckets += ix->sects[i]->blocks; + fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets); + + /* start index procs */ + fprint(2, "%T read index\n"); + isectdonechan = chancreate(sizeof(void*), 0); + for(i=0; i<ix->nsects; i++){ + if(shouldprocess(ix->sects[i])) + ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0); + vtproc(isectproc, ix->sects[i]); + } + + for(i=0; i<nisect; i++) + if(isect[i]) + fprint(2, "warning: did not find index section %s\n", isect[i]); + + /* start arena procs */ + p = nil; + napart = 0; + arenadonechan = chancreate(sizeof(void*), 0); + for(i=0; i<ix->narenas; i++){ + if(ix->arenas[i]->part != p){ + p = ix->arenas[i]->part; + vtproc(arenapartproc, p); + napart++; + } + } + + /* wait for arena procs to finish */ + for(i=0; i<napart; i++) + recvp(arenadonechan); + + /* tell index procs to finish */ + for(i=0; i<ix->nsects; i++) + if(ix->sects[i]->writechan) + send(ix->sects[i]->writechan, nil); + + /* wait for index procs to finish */ + for(i=0; i<ix->nsects; i++) + if(ix->sects[i]->writechan) + recvp(isectdonechan); + + if(ix->bloom && writebloom(ix->bloom) < 0) + fprint(2, "writing bloom filter: %r\n"); + + fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n", + arenaentries, indexentries, skipentries); + threadexitsall(nil); +} + +static int +shouldprocess(ISect *is) +{ + int i; + + if(nisect == 0) + return 1; + + for(i=0; i<nisect; i++) + if(isect[i] && strcmp(isect[i], is->name) == 0){ + isect[i] = nil; + return 1; + } + return 0; +} + +static void +add(u64int *a, u64int n) +{ + static Lock l; + + lock(&l); + *a += n; + unlock(&l); +} + +/* + * Read through an arena partition and send each of its IEntries + * to the appropriate index section. When finished, send on + * arenadonechan. + */ +enum +{ + ClumpChunks = 32*1024, +}; +static void +arenapartproc(void *v) +{ + int i, j, n, nskip, x; + u32int clump; + u64int addr, tot; + Arena *a; + ClumpInfo *ci, *cis; + IEntry ie; + Part *p; + + p = v; + threadsetname("arenaproc %s", p->name); + + nskip = 0; + tot = 0; + cis = MKN(ClumpInfo, ClumpChunks); + for(i=0; i<ix->narenas; i++){ + a = ix->arenas[i]; + if(a->part != p) + continue; + if(a->memstats.clumps) + fprint(2, "%T arena %s: %d entries\n", + a->name, a->memstats.clumps); + addr = ix->amap[i].start; + for(clump=0; clump<a->memstats.clumps; clump+=n){ + n = ClumpChunks; + if(n > a->memstats.clumps - clump) + n = a->memstats.clumps - clump; + if(readclumpinfos(a, clump, cis, n) != n){ + fprint(2, "%T arena %s: directory read: %r\n", a->name); + errors = 1; + break; + } + for(j=0; j<n; j++){ + ci = &cis[j]; + ie.ia.type = ci->type; + ie.ia.size = ci->uncsize; + ie.ia.addr = addr; + addr += ci->size + ClumpSize; + ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog; + scorecp(ie.score, ci->score); + if(ci->type == VtCorruptType) + nskip++; + else{ + tot++; + x = indexsect(ix, ie.score); + assert(0 <= x && x < ix->nsects); + if(ix->sects[x]->writechan) + send(ix->sects[x]->writechan, &ie); + if(ix->bloom) + markbloomfilter(ix->bloom, ie.score); + } + } + } + } + add(&arenaentries, tot); + add(&skipentries, nskip); + sendp(arenadonechan, p); +} + +/* + * Convert score into relative bucket number in isect. + * Can pass a packed ientry instead of score - score is first. + */ +static u32int +score2bucket(ISect *is, uchar *score) +{ + u32int b; + + b = hashbits(score, 32)/ix->div; + assert(is->start <= b && b < is->stop); + return b - is->start; +} + +/* + * Convert offset in index section to bucket number. + */ +static u32int +offset2bucket(ISect *is, u64int offset) +{ + u32int b; + + assert(is->blockbase <= offset); + offset -= is->blockbase; + b = offset/is->blocksize; + assert(b < is->stop-is->start); + return b; +} + +/* + * Convert bucket number to offset. + */ +static u64int +bucket2offset(ISect *is, u32int b) +{ + assert(b <= is->stop-is->start); + return is->blockbase + (u64int)b*is->blocksize; +} + +/* + * IEntry buffers to hold initial round of spraying. + */ +typedef struct Buf Buf; +struct Buf +{ + Part *part; /* partition being written */ + uchar *bp; /* current block */ + uchar *ep; /* end of block */ + uchar *wp; /* write position in block */ + u64int boffset; /* start offset */ + u64int woffset; /* next write offset */ + u64int eoffset; /* end offset */ + u32int nentry; /* number of entries written */ +}; + +static void +bflush(Buf *buf) +{ + u32int bufsize; + + if(buf->woffset >= buf->eoffset) + sysfatal("buf index chunk overflow - need bufger index"); + bufsize = buf->ep - buf->bp; + if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){ + fprint(2, "write %s: %r\n", buf->part->name); + errors = 1; + } + buf->woffset += bufsize; + memset(buf->bp, 0, bufsize); + buf->wp = buf->bp; +} + +static void +bwrite(Buf *buf, IEntry *ie) +{ + if(buf->wp+IEntrySize > buf->ep) + bflush(buf); + assert(buf->bp <= buf->wp && buf->wp < buf->ep); + packientry(ie, buf->wp); + buf->wp += IEntrySize; + assert(buf->bp <= buf->wp && buf->wp <= buf->ep); + buf->nentry++; +} + +/* + * Minibuffer. In-memory data structure holds our place + * in the buffer but has no block data. We are writing and + * reading the minibuffers at the same time. (Careful!) + */ +typedef struct Minibuf Minibuf; +struct Minibuf +{ + u64int boffset; /* start offset */ + u64int roffset; /* read offset */ + u64int woffset; /* write offset */ + u64int eoffset; /* end offset */ + u32int nentry; /* # entries left to read */ + u32int nwentry; /* # entries written */ +}; + +/* + * Index entry pool. Used when trying to shuffle around + * the entries in a big buffer into the corresponding M minibuffers. + * Sized to hold M*EntriesPerBlock entries, so that there will always + * either be room in the pool for another block worth of entries + * or there will be an entire block worth of sorted entries to + * write out. + */ +typedef struct IEntryLink IEntryLink; +typedef struct IPool IPool; + +struct IEntryLink +{ + uchar ie[IEntrySize]; /* raw IEntry */ + IEntryLink *next; /* next in chain */ +}; + +struct IPool +{ + ISect *isect; + u32int buck0; /* first bucket in pool */ + u32int mbufbuckets; /* buckets per minibuf */ + IEntryLink *entry; /* all IEntryLinks */ + u32int nentry; /* # of IEntryLinks */ + IEntryLink *free; /* free list */ + u32int nfree; /* # on free list */ + Minibuf *mbuf; /* all minibufs */ + u32int nmbuf; /* # of minibufs */ + IEntryLink **mlist; /* lists for each minibuf */ + u32int *mcount; /* # on each mlist[i] */ + u32int bufsize; /* block buffer size */ + uchar *rbuf; /* read buffer */ + uchar *wbuf; /* write buffer */ + u32int epbuf; /* entries per block buffer */ +}; + +/* +static int +countsokay(IPool *p) +{ + int i; + u64int n; + + n = 0; + for(i=0; i<p->nmbuf; i++) + n += p->mcount[i]; + n += p->nfree; + if(n != p->nentry){ + print("free %ud:", p->nfree); + for(i=0; i<p->nmbuf; i++) + print(" %ud", p->mcount[i]); + print(" = %lld nentry: %ud\n", n, p->nentry); + } + return n == p->nentry; +} +*/ - fprint(2, "building a new index %s using %s for temporary storage\n", mainindex->name, argv[1]); +static IPool* +mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf, + u32int mbufbuckets, u32int bufsize) +{ + u32int i, nentry; + uchar *data; + IPool *p; + IEntryLink *l; + + nentry = (nmbuf+1)*bufsize / IEntrySize; + p = ezmalloc(sizeof(IPool) + +nentry*sizeof(IEntry) + +nmbuf*sizeof(IEntryLink*) + +nmbuf*sizeof(u32int) + +3*bufsize); + + p->isect = isect; + p->mbufbuckets = mbufbuckets; + p->bufsize = bufsize; + p->entry = (IEntryLink*)(p+1); + p->nentry = nentry; + p->mlist = (IEntryLink**)(p->entry+nentry); + p->mcount = (u32int*)(p->mlist+nmbuf); + p->nmbuf = nmbuf; + p->mbuf = mbuf; + data = (uchar*)(p->mcount+nmbuf); + data += bufsize - (u32int)data%bufsize; + p->rbuf = data; + p->wbuf = data+bufsize; + p->epbuf = bufsize/IEntrySize; - part = initpart(argv[1], ORDWR|ODIRECT); - if(part == nil) - sysfatal("can't initialize temporary partition: %r"); + for(i=0; i<p->nentry; i++){ + l = &p->entry[i]; + l->next = p->free; + p->free = l; + p->nfree++; + } + return p; +} - clumps = sortrawientries(mainindex, part, &base, mainindex->bloom); - if(clumps == TWID64) - sysfatal("can't build sorted index: %r"); - fprint(2, "found and sorted index entries for clumps=%lld at %lld\n", clumps, base); +/* + * Add the index entry ie to the pool p. + * Caller must know there is room. + */ +static void +ipoolinsert(IPool *p, uchar *ie) +{ + u32int buck, x; + IEntryLink *l; + + assert(p->free != nil); + + buck = score2bucket(p->isect, ie); + x = (buck-p->buck0) / p->mbufbuckets; + if(x >= p->nmbuf){ + fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n", + buck, p->mbufbuckets, x); + } + assert(x < p->nmbuf); - if(buildindex(mainindex, part, base, clumps, zero) < 0) - sysfatal("can't build new index: %r"); + l = p->free; + p->free = l->next; + p->nfree--; + memmove(l->ie, ie, IEntrySize); + l->next = p->mlist[x]; + p->mlist[x] = l; + p->mcount[x]++; +} + +/* + * Pull out a block containing as many + * entries as possible for minibuffer x. + */ +static u32int +ipoolgetbuf(IPool *p, u32int x) +{ + uchar *bp, *ep, *wp; + IEntryLink *l; + u32int n; + + bp = p->wbuf; + ep = p->wbuf + p->bufsize; + n = 0; + assert(x < p->nmbuf); + for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){ + l = p->mlist[x]; + p->mlist[x] = l->next; + p->mcount[x]--; + memmove(wp, l->ie, IEntrySize); + l->next = p->free; + p->free = l; + p->nfree++; + n++; + } + memset(wp, 0, ep-wp); + return n; +} + +/* + * Read a block worth of entries from the minibuf + * into the pool. Caller must know there is room. + */ +static void +ipoolloadblock(IPool *p, Minibuf *mb) +{ + u32int i, n; - if(mainindex->bloom) - writebloom(mainindex->bloom); + assert(mb->nentry > 0); + assert(mb->roffset >= mb->woffset); + assert(mb->roffset < mb->eoffset); - threadexitsall(0); + n = p->bufsize/IEntrySize; + if(n > mb->nentry) + n = mb->nentry; + if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0) + fprint(2, "readpart %s: %r\n", p->isect->part->name); + else{ + for(i=0; i<n; i++) + ipoolinsert(p, p->rbuf+i*IEntrySize); + } + mb->nentry -= n; + mb->roffset += p->bufsize; } + +/* + * Write out a block worth of entries to minibuffer x. + * If necessary, pick up the data there before overwriting it. + */ +static void +ipoolflush0(IPool *pool, u32int x) +{ + u32int bufsize; + Minibuf *mb; + + mb = pool->mbuf+x; + bufsize = pool->bufsize; + mb->nwentry += ipoolgetbuf(pool, x); + if(mb->nentry > 0 && mb->roffset == mb->woffset){ + assert(pool->nfree >= pool->bufsize/IEntrySize); + /* + * There will be room in the pool -- we just + * removed a block worth. + */ + ipoolloadblock(pool, mb); + } + if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0) + fprint(2, "writepart %s: %r\n", pool->isect->part->name); + mb->woffset += bufsize; +} + +/* + * Write out some full block of entries. + * (There must be one -- the pool is almost full!) + */ +static void +ipoolflush1(IPool *pool) +{ + u32int i; + + assert(pool->nfree <= pool->epbuf); + + for(i=0; i<pool->nmbuf; i++){ + if(pool->mcount[i] >= pool->epbuf){ + ipoolflush0(pool, i); + return; + } + } + /* can't be reached - someone must be full */ + sysfatal("ipoolflush1"); +} + +/* + * Flush all the entries in the pool out to disk. + * Nothing more to read from disk. + */ +static void +ipoolflush(IPool *pool) +{ + u32int i; + + for(i=0; i<pool->nmbuf; i++) + while(pool->mlist[i]) + ipoolflush0(pool, i); + assert(pool->nfree == pool->nentry); +} + +/* + * Third pass. Pick up each minibuffer from disk into + * memory and then write out the buckets. + */ + +/* + * Compare two packed index entries. + * Usual ordering except break ties by putting higher + * index addresses first (assumes have duplicates + * due to corruption in the lower addresses). + */ +static int +ientrycmpaddr(const void *va, const void *vb) +{ + int i; + uchar *a, *b; + + a = (uchar*)va; + b = (uchar*)vb; + i = ientrycmp(a, b); + if(i) + return i; + return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8); +} + +static void +zerorange(Part *p, u64int o, u64int e) +{ + static uchar zero[MaxIoSize]; + u32int n; + + for(; o<e; o+=n){ + n = sizeof zero; + if(o+n > e) + n = e-o; + if(writepart(p, o, zero, n) < 0) + fprint(2, "writepart %s: %r\n", p->name); + } +} + +/* + * Load a minibuffer into memory and write out the + * corresponding buckets. + */ +static void +sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize) +{ + uchar *buckdata, *p, *q, *ep; + u32int b, lastb, memsize, n; + u64int o; + IBucket ib; + Part *part; + + part = is->part; + buckdata = emalloc(is->blocksize); + + if(mb->nwentry == 0) + return; + + /* + * read entire buffer. + */ + assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset); + assert(mb->woffset-mb->boffset <= nbuf); + if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){ + fprint(2, "readpart %s: %r\n", part->name); + errors = 1; + return; + } + assert(*(uint*)buf != 0xa5a5a5a5); + + /* + * remove fragmentation due to IEntrySize + * not evenly dividing Bufsize + */ + memsize = (bufsize/IEntrySize)*IEntrySize; + for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){ + memmove(p, q, memsize); + p += memsize; + q += bufsize; + } + ep = buf + mb->nwentry*IEntrySize; + assert(ep <= buf+nbuf); + + /* + * sort entries + */ + qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr); + + /* + * write buckets out + */ + n = 0; + lastb = offset2bucket(is, mb->boffset); + for(p=buf; p<ep; p=q){ + b = score2bucket(is, p); + for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize) + ; + if(lastb+1 < b && zero) + zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b)); + if(IBucketSize+(q-p) > is->blocksize) + sysfatal("bucket overflow - make index bigger"); + memmove(buckdata+IBucketSize, p, q-p); + ib.n = (q-p)/IEntrySize; + n += ib.n; + packibucket(&ib, buckdata, is->bucketmagic); + if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0) + fprint(2, "write %s: %r\n", part->name); + lastb = b; + } + if(lastb+1 < is->stop-is->start && zero) + zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start)); + + if(n != mb->nwentry) + fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize); + + free(buckdata); +} + +static void +isectproc(void *v) +{ + u32int buck, bufbuckets, bufsize, epbuf, i, j; + u32int mbufbuckets, n, nbucket, nn, space; + u32int nbuf, nminibuf, xminiclump, prod; + u64int blocksize, offset, xclump; + uchar *data, *p; + Buf *buf; + IEntry ie; + IPool *ipool; + ISect *is; + Minibuf *mbuf, *mb; + + is = v; + blocksize = is->blocksize; + nbucket = is->stop - is->start; + + /* + * Three passes: + * pass 1 - write index entries from arenas into + * large sequential sections on index disk. + * requires nbuf * bufsize memory. + * + * pass 2 - split each section into minibufs. + * requires nminibuf * bufsize memory. + * + * pass 3 - read each minibuf into memory and + * write buckets out. + * requires entries/minibuf * IEntrySize memory. + * + * The larger we set bufsize the less seeking hurts us. + * + * The fewer sections and minibufs we have, the less + * seeking hurts us. + * + * The fewer sections and minibufs we have, the + * more entries we end up with in each minibuf + * at the end. + * + * Shoot for using half our memory to hold each + * minibuf. The chance of a random distribution + * getting off by 2x is quite low. + * + * Once that is decided, figure out the smallest + * nminibuf and nsection/biggest bufsize we can use + * and still fit in the memory constraints. + */ + + /* expected number of clump index entries we'll see */ + xclump = nbucket * (double)totalclumps/totalbuckets; + + /* number of clumps we want to see in a minibuf */ + xminiclump = isectmem/2/IEntrySize; + + /* total number of minibufs we need */ + prod = xclump / xminiclump; + + /* if possible, skip second pass */ + if(!dumb && prod*MinBufSize < isectmem){ + nbuf = prod; + nminibuf = 1; + }else{ + /* otherwise use nsection = sqrt(nmini) */ + for(nbuf=1; nbuf*nbuf<prod; nbuf++) + ; + if(nbuf*MinBufSize > isectmem) + sysfatal("not enough memory"); + nminibuf = nbuf; + } + /* size buffer to use extra memory */ + bufsize = MinBufSize; + while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize) + bufsize *= 2; + data = emalloc(nbuf*bufsize); + epbuf = bufsize/IEntrySize; + + fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n", + is->part->name, nbucket, nbuf, nminibuf, bufsize); + /* + * Accept index entries from arena procs. + */ + buf = MKNZ(Buf, nbuf); + p = data; + offset = is->blockbase; + bufbuckets = (nbucket+nbuf-1)/nbuf; + for(i=0; i<nbuf; i++){ + buf[i].part = is->part; + buf[i].bp = p; + buf[i].wp = p; + p += bufsize; + buf[i].ep = p; + buf[i].boffset = offset; + buf[i].woffset = offset; + if(i < nbuf-1){ + offset += bufbuckets*blocksize; + buf[i].eoffset = offset; + }else{ + offset = is->blockbase + nbucket*blocksize; + buf[i].eoffset = offset; + } + } + assert(p == data+nbuf*bufsize); + + n = 0; + while(recv(is->writechan, &ie) == 1){ + if(ie.ia.addr == 0) + break; + buck = score2bucket(is, ie.score); + i = buck/bufbuckets; + assert(i < nbuf); + bwrite(&buf[i], &ie); + n++; + } + add(&indexentries, n); + + nn = 0; + for(i=0; i<nbuf; i++){ + bflush(&buf[i]); + buf[i].bp = nil; + buf[i].ep = nil; + buf[i].wp = nil; + nn += buf[i].nentry; + } + if(n != nn) + fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn); + + free(data); + + fprint(2, "%T %s: reordering\n", is->part->name); + + /* + * Rearrange entries into minibuffers and then + * split each minibuffer into buckets. + */ + mbuf = MKN(Minibuf, nminibuf); + mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf; + for(i=0; i<nbuf; i++){ + /* + * Set up descriptors. + */ + n = buf[i].nentry; + nn = 0; + offset = buf[i].boffset; + memset(mbuf, 0, nminibuf*sizeof(mbuf[0])); + for(j=0; j<nminibuf; j++){ + mb = &mbuf[j]; + mb->boffset = offset; + if(j < nminibuf-1){ + offset += mbufbuckets*blocksize; + mb->eoffset = offset; + }else + mb->eoffset = buf[i].eoffset; + mb->roffset = mb->boffset; + mb->woffset = mb->boffset; + mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize; + if(mb->nentry > buf[i].nentry) + mb->nentry = buf[i].nentry; + buf[i].nentry -= mb->nentry; + nn += mb->nentry; + } + if(n != nn) + fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);; + /* + * Rearrange. + */ + if(!dumb && nminibuf == 1){ + mbuf[0].nwentry = mbuf[0].nentry; + mbuf[0].woffset = buf[i].woffset; + }else{ + ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize); + ipool->buck0 = bufbuckets*i; + for(j=0; j<nminibuf; j++){ + mb = &mbuf[j]; + while(mb->nentry > 0){ + if(ipool->nfree < epbuf){ + ipoolflush1(ipool); + /* ipoolflush1 might change mb->nentry */ + continue; + } + assert(ipool->nfree >= epbuf); + ipoolloadblock(ipool, mb); + } + } + ipoolflush(ipool); + nn = 0; + for(j=0; j<nminibuf; j++) + nn += mbuf[j].nwentry; + if(n != nn) + fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i); + free(ipool); + } + + /* + * Make buckets. + */ + space = 0; + for(j=0; j<nminibuf; j++) + if(space < mbuf[j].woffset - mbuf[j].boffset) + space = mbuf[j].woffset - mbuf[j].boffset; + + data = emalloc(space); + for(j=0; j<nminibuf; j++){ + mb = &mbuf[j]; + sortminibuffer(is, mb, data, space, bufsize); + } + free(data); + } + + sendp(isectdonechan, is); +} + + + |