diff options
author | rsc <devnull@localhost> | 2003-11-23 17:54:58 +0000 |
---|---|---|
committer | rsc <devnull@localhost> | 2003-11-23 17:54:58 +0000 |
commit | 7a4ee46d253e291044bba2d0c54b818b67ac013c (patch) | |
tree | 7bdcaf69a15ecd24c057a697936b67bbde93e00b /src/cmd/venti | |
parent | 4fbfdd7acd4bf4fc71b1329230e05fc761907566 (diff) | |
download | plan9port-7a4ee46d253e291044bba2d0c54b818b67ac013c.tar.gz plan9port-7a4ee46d253e291044bba2d0c54b818b67ac013c.tar.bz2 plan9port-7a4ee46d253e291044bba2d0c54b818b67ac013c.zip |
Initial stab at Venti.
Diffstat (limited to 'src/cmd/venti')
52 files changed, 9527 insertions, 0 deletions
diff --git a/src/cmd/venti/arena.c b/src/cmd/venti/arena.c new file mode 100644 index 00000000..4a59b278 --- /dev/null +++ b/src/cmd/venti/arena.c @@ -0,0 +1,641 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +typedef struct ASum ASum; + +struct ASum +{ + Arena *arena; + ASum *next; +}; + +static void sealarena(Arena *arena); +static int okarena(Arena *arena); +static int loadarena(Arena *arena); +static CIBlock *getcib(Arena *arena, int clump, int writing, CIBlock *rock); +static void putcib(Arena *arena, CIBlock *cib); +static void sumproc(void *); + +static QLock sumlock; +static Rendez sumwait; +static ASum *sumq; + +int +initarenasum(void) +{ + sumwait.l = &sumlock; + + if(vtproc(sumproc, nil) < 0){ + seterr(EOk, "can't start arena checksum slave: %r"); + return -1; + } + return 0; +} + +/* + * make an Arena, and initialize it based upon the disk header and trailer. + */ +Arena* +initarena(Part *part, u64int base, u64int size, u32int blocksize) +{ + Arena *arena; + + arena = MKZ(Arena); + arena->part = part; + arena->blocksize = blocksize; + arena->clumpmax = arena->blocksize / ClumpInfoSize; + arena->base = base + blocksize; + arena->size = size - 2 * blocksize; + + if(loadarena(arena) < 0){ + seterr(ECorrupt, "arena header or trailer corrupted"); + freearena(arena); + return nil; + } + if(okarena(arena) < 0){ + freearena(arena); + return nil; + } + + if(arena->sealed && scorecmp(zeroscore, arena->score)==0) + backsumarena(arena); + + return arena; +} + +void +freearena(Arena *arena) +{ + if(arena == nil) + return; + if(arena->cib.data != nil){ + putdblock(arena->cib.data); + arena->cib.data = nil; + } + free(arena); +} + +Arena* +newarena(Part *part, char *name, u64int base, u64int size, u32int blocksize) +{ + Arena *arena; + + if(nameok(name) < 0){ + seterr(EOk, "illegal arena name", name); + return nil; + } + arena = MKZ(Arena); + arena->part = part; + arena->version = ArenaVersion; + arena->blocksize = blocksize; + arena->clumpmax = 
arena->blocksize / ClumpInfoSize; + arena->base = base + blocksize; + arena->size = size - 2 * blocksize; + + namecp(arena->name, name); + + if(wbarena(arena)<0 || wbarenahead(arena)<0){ + freearena(arena); + return nil; + } + + return arena; +} + +int +readclumpinfo(Arena *arena, int clump, ClumpInfo *ci) +{ + CIBlock *cib, r; + + cib = getcib(arena, clump, 0, &r); + if(cib == nil) + return -1; + unpackclumpinfo(ci, &cib->data->data[cib->offset]); + putcib(arena, cib); + return 0; +} + +int +readclumpinfos(Arena *arena, int clump, ClumpInfo *cis, int n) +{ + CIBlock *cib, r; + int i; + + for(i = 0; i < n; i++){ + cib = getcib(arena, clump + i, 0, &r); + if(cib == nil) + break; + unpackclumpinfo(&cis[i], &cib->data->data[cib->offset]); + putcib(arena, cib); + } + return i; +} + +/* + * write directory information for one clump + * must be called the arena locked + */ +int +writeclumpinfo(Arena *arena, int clump, ClumpInfo *ci) +{ + CIBlock *cib, r; + + cib = getcib(arena, clump, 1, &r); + if(cib == nil) + return -1; + packclumpinfo(ci, &cib->data->data[cib->offset]); + putcib(arena, cib); + return 0; +} + +u64int +arenadirsize(Arena *arena, u32int clumps) +{ + return ((clumps / arena->clumpmax) + 1) * arena->blocksize; +} + +/* + * read a clump of data + * n is a hint of the size of the data, not including the header + * make sure it won't run off the end, then return the number of bytes actually read + */ +u32int +readarena(Arena *arena, u64int aa, u8int *buf, long n) +{ + DBlock *b; + u64int a; + u32int blocksize, off, m; + long nn; + + if(n == 0) + return -1; + + qlock(&arena->lock); + a = arena->size - arenadirsize(arena, arena->clumps); + qunlock(&arena->lock); + if(aa >= a){ + seterr(EOk, "reading beyond arena clump storage: clumps=%d aa=%lld a=%lld -1 clumps=%lld\n", arena->clumps, aa, a, arena->size - arenadirsize(arena, arena->clumps - 1)); + return -1; + } + if(aa + n > a) + n = a - aa; + + blocksize = arena->blocksize; + a = arena->base + aa; + off = a & 
(blocksize - 1); + a -= off; + nn = 0; + for(;;){ + b = getdblock(arena->part, a, 1); + if(b == nil) + return -1; + m = blocksize - off; + if(m > n - nn) + m = n - nn; + memmove(&buf[nn], &b->data[off], m); + putdblock(b); + nn += m; + if(nn == n) + break; + off = 0; + a += blocksize; + } + return n; +} + +/* + * write some data to the clump section at a given offset + * used to fix up corrupted arenas. + */ +u32int +writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n) +{ + DBlock *b; + u64int a; + u32int blocksize, off, m; + long nn; + int ok; + + if(n == 0) + return -1; + + qlock(&arena->lock); + a = arena->size - arenadirsize(arena, arena->clumps); + if(aa >= a || aa + n > a){ + qunlock(&arena->lock); + seterr(EOk, "writing beyond arena clump storage"); + return -1; + } + + blocksize = arena->blocksize; + a = arena->base + aa; + off = a & (blocksize - 1); + a -= off; + nn = 0; + for(;;){ + b = getdblock(arena->part, a, off != 0 || off + n < blocksize); + if(b == nil){ + qunlock(&arena->lock); + return -1; + } + m = blocksize - off; + if(m > n - nn) + m = n - nn; + memmove(&b->data[off], &clbuf[nn], m); + ok = writepart(arena->part, a, b->data, blocksize); + putdblock(b); + if(ok < 0){ + qunlock(&arena->lock); + return -1; + } + nn += m; + if(nn == n) + break; + off = 0; + a += blocksize; + } + qunlock(&arena->lock); + return n; +} + +/* + * allocate space for the clump and write it, + * updating the arena directory +ZZZ question: should this distinguish between an arena +filling up and real errors writing the clump? 
+ */ +u64int +writeaclump(Arena *arena, Clump *c, u8int *clbuf) +{ + DBlock *b; + u64int a, aa; + u32int clump, n, nn, m, off, blocksize; + int ok; + + n = c->info.size + ClumpSize; + qlock(&arena->lock); + aa = arena->used; + if(arena->sealed + || aa + n + U32Size + arenadirsize(arena, arena->clumps + 1) > arena->size){ + if(!arena->sealed) + sealarena(arena); + qunlock(&arena->lock); + return TWID64; + } + if(packclump(c, &clbuf[0]) < 0){ + qunlock(&arena->lock); + return TWID64; + } + + /* + * write the data out one block at a time + */ + blocksize = arena->blocksize; + a = arena->base + aa; + off = a & (blocksize - 1); + a -= off; + nn = 0; + for(;;){ + b = getdblock(arena->part, a, off != 0); + if(b == nil){ + qunlock(&arena->lock); + return TWID64; + } + m = blocksize - off; + if(m > n - nn) + m = n - nn; + memmove(&b->data[off], &clbuf[nn], m); +print("writing\n"); + ok = writepart(arena->part, a, b->data, blocksize); + putdblock(b); + if(ok < 0){ + qunlock(&arena->lock); + return TWID64; + } + nn += m; + if(nn == n) + break; + off = 0; + a += blocksize; + } + + arena->used += c->info.size + ClumpSize; + arena->uncsize += c->info.uncsize; + if(c->info.size < c->info.uncsize) + arena->cclumps++; + + clump = arena->clumps++; + if(arena->clumps == 0) + sysfatal("clumps wrapped\n"); + arena->wtime = now(); + if(arena->ctime == 0) + arena->ctime = arena->wtime; + + writeclumpinfo(arena, clump, &c->info); +//ZZZ make this an enum param + if((clump & 0x1ff) == 0x1ff){ + flushciblocks(arena); + wbarena(arena); + } + + qunlock(&arena->lock); + return aa; +} + +/* + * once sealed, an arena never has any data added to it. + * it should only be changed to fix errors. + * this also syncs the clump directory. 
+ */ +static void +sealarena(Arena *arena) +{ + flushciblocks(arena); + arena->sealed = 1; + wbarena(arena); + backsumarena(arena); +} + +void +backsumarena(Arena *arena) +{ + ASum *as; + + as = MK(ASum); + if(as == nil) + return; + qlock(&sumlock); + as->arena = arena; + as->next = sumq; + sumq = as; + rwakeup(&sumwait); + qunlock(&sumlock); +} + +static void +sumproc(void *unused) +{ + ASum *as; + Arena *arena; + + USED(unused); + + for(;;){ + qlock(&sumlock); + while(sumq == nil) + rsleep(&sumwait); + as = sumq; + sumq = as->next; + qunlock(&sumlock); + arena = as->arena; + free(as); + + sumarena(arena); + } +} + +void +sumarena(Arena *arena) +{ + ZBlock *b; + DigestState s; + u64int a, e; + u32int bs; + u8int score[VtScoreSize]; + + bs = MaxIoSize; + if(bs < arena->blocksize) + bs = arena->blocksize; + + /* + * read & sum all blocks except the last one + */ + memset(&s, 0, sizeof s); + b = alloczblock(bs, 0); + e = arena->base + arena->size; + for(a = arena->base - arena->blocksize; a + arena->blocksize <= e; a += bs){ + if(a + bs > e) + bs = arena->blocksize; + if(readpart(arena->part, a, b->data, bs) < 0) + goto ReadErr; + sha1(b->data, bs, nil, &s); + } + + /* + * the last one is special, since it may already have the checksum included + */ + bs = arena->blocksize; + if(readpart(arena->part, e, b->data, bs) < 0){ +ReadErr: + logerr(EOk, "sumarena can't sum %s, read at %lld failed: %r", arena->name, a); + freezblock(b); + return; + } + + sha1(b->data, bs-VtScoreSize, nil, &s); + sha1(zeroscore, VtScoreSize, nil, &s); + sha1(nil, 0, score, &s); + + /* + * check for no checksum or the same + */ + if(scorecmp(score, &b->data[bs - VtScoreSize]) != 0){ + if(scorecmp(zeroscore, &b->data[bs - VtScoreSize]) != 0) + logerr(EOk, "overwriting mismatched checksums for arena=%s, found=%V calculated=%V", + arena->name, &b->data[bs - VtScoreSize], score); + scorecp(&b->data[bs - VtScoreSize], score); + if(writepart(arena->part, e, b->data, bs) < 0) + logerr(EOk, "sumarena 
can't write sum for %s: %r", arena->name); + } + freezblock(b); + + qlock(&arena->lock); + scorecp(arena->score, score); + qunlock(&arena->lock); +} + +/* + * write the arena trailer block to the partition + */ +int +wbarena(Arena *arena) +{ + ZBlock *b; + int bad; + + b = alloczblock(arena->blocksize, 1); + if(b == nil){ + logerr(EAdmin, "can't write arena trailer: %r"); +///ZZZ add error message? + return -1; + } + bad = okarena(arena)<0 || packarena(arena, b->data)<0 || + writepart(arena->part, arena->base + arena->size, b->data, arena->blocksize)<0; + freezblock(b); + if(bad) + return -1; + return 0; +} + +int +wbarenahead(Arena *arena) +{ + ZBlock *b; + ArenaHead head; + int bad; + + namecp(head.name, arena->name); + head.version = arena->version; + head.size = arena->size + 2 * arena->blocksize; + head.blocksize = arena->blocksize; + b = alloczblock(arena->blocksize, 1); + if(b == nil){ + logerr(EAdmin, "can't write arena header: %r"); +///ZZZ add error message? + return -1; + } + bad = packarenahead(&head, b->data)<0 || + writepart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize)<0; + freezblock(b); + if(bad) + return -1; + return 0; +} + +/* + * read the arena header and trailer blocks from disk + */ +static int +loadarena(Arena *arena) +{ + ArenaHead head; + ZBlock *b; + + b = alloczblock(arena->blocksize, 0); + if(b == nil) + return -1; + if(readpart(arena->part, arena->base + arena->size, b->data, arena->blocksize) < 0){ + freezblock(b); + return -1; + } + if(unpackarena(arena, b->data) < 0){ + freezblock(b); + return -1; + } + if(arena->version != ArenaVersion){ + seterr(EAdmin, "unknown arena version %d", arena->version); + freezblock(b); + return -1; + } + scorecp(arena->score, &b->data[arena->blocksize - VtScoreSize]); + + if(readpart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize) < 0){ + logerr(EAdmin, "can't read arena header: %r"); + freezblock(b); + return 0; + } + if(unpackarenahead(&head, b->data) 
< 0) + logerr(ECorrupt, "corrupted arena header: %r"); + else if(namecmp(arena->name, head.name)!=0 + || arena->version != head.version + || arena->blocksize != head.blocksize + || arena->size + 2 * arena->blocksize != head.size) + logerr(ECorrupt, "arena header inconsistent with arena data"); + freezblock(b); + + return 0; +} + +static int +okarena(Arena *arena) +{ + u64int dsize; + int ok; + + ok = 0; + dsize = arenadirsize(arena, arena->clumps); + if(arena->used + dsize > arena->size){ + seterr(ECorrupt, "arena used > size"); + ok = -1; + } + + if(arena->cclumps > arena->clumps) + logerr(ECorrupt, "arena has more compressed clumps than total clumps"); + + if(arena->uncsize + arena->clumps * ClumpSize + arena->blocksize < arena->used) + logerr(ECorrupt, "arena uncompressed size inconsistent with used space %lld %d %lld", arena->uncsize, arena->clumps, arena->used); + + if(arena->ctime > arena->wtime) + logerr(ECorrupt, "arena creation time after last write time"); + + return ok; +} + +static CIBlock* +getcib(Arena *arena, int clump, int writing, CIBlock *rock) +{ + CIBlock *cib; + u32int block, off; + + if(clump >= arena->clumps){ + seterr(EOk, "clump directory access out of range"); + return nil; + } + block = clump / arena->clumpmax; + off = (clump - block * arena->clumpmax) * ClumpInfoSize; + + if(arena->cib.block == block + && arena->cib.data != nil){ + arena->cib.offset = off; + return &arena->cib; + } + + if(writing){ + flushciblocks(arena); + cib = &arena->cib; + }else + cib = rock; + + qlock(&stats.lock); + stats.cireads++; + qunlock(&stats.lock); + + cib->block = block; + cib->offset = off; + cib->data = getdblock(arena->part, arena->base + arena->size - (block + 1) * arena->blocksize, arena->blocksize); + if(cib->data == nil) + return nil; + return cib; +} + +static void +putcib(Arena *arena, CIBlock *cib) +{ + if(cib != &arena->cib){ + putdblock(cib->data); + cib->data = nil; + } +} + +/* + * must be called with arena locked + */ +int 
+flushciblocks(Arena *arena) +{ + int ok; + + if(arena->cib.data == nil) + return 0; + qlock(&stats.lock); + stats.ciwrites++; + qunlock(&stats.lock); + ok = writepart(arena->part, arena->base + arena->size - (arena->cib.block + 1) * arena->blocksize, arena->cib.data->data, arena->blocksize); + + if(ok < 0) + seterr(EAdmin, "failed writing arena directory block"); + putdblock(arena->cib.data); + arena->cib.data = nil; + return ok; +} diff --git a/src/cmd/venti/arenas.c b/src/cmd/venti/arenas.c new file mode 100644 index 00000000..5275b938 --- /dev/null +++ b/src/cmd/venti/arenas.c @@ -0,0 +1,404 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +typedef struct AHash AHash; + +/* + * hash table for finding arena's based on their names. + */ +struct AHash +{ + AHash *next; + Arena *arena; +}; + +enum +{ + AHashSize = 512 +}; + +static AHash *ahash[AHashSize]; + +static u32int +hashstr(char *s) +{ + u32int h; + int c; + + h = 0; + for(; c = *s; s++){ + c ^= c << 6; + h += (c << 11) ^ (c >> 1); + c = *s; + h ^= (c << 14) + (c << 7) + (c << 4) + c; + } + return h; +} + +int +addarena(Arena *arena) +{ + AHash *a; + u32int h; + + h = hashstr(arena->name) & (AHashSize - 1); + a = MK(AHash); + if(a == nil) + return -1; + a->arena = arena; + a->next = ahash[h]; + ahash[h] = a; + return 0; +} + +Arena* +findarena(char *name) +{ + AHash *a; + u32int h; + + h = hashstr(name) & (AHashSize - 1); + for(a = ahash[h]; a != nil; a = a->next) + if(strcmp(a->arena->name, name) == 0) + return a->arena; + return nil; +} + +int +delarena(Arena *arena) +{ + AHash *a, *last; + u32int h; + + h = hashstr(arena->name) & (AHashSize - 1); + last = nil; + for(a = ahash[h]; a != nil; a = a->next){ + if(a->arena == arena){ + if(last != nil) + last->next = a->next; + else + ahash[h] = a->next; + free(a); + return 0; + } + last = a; + } + return -1; +} + +ArenaPart* +initarenapart(Part *part) +{ + AMapN amn; + ArenaPart *ap; + ZBlock *b; + u32int i; + int ok; + + b = alloczblock(HeadSize, 
0); + if(b == nil || readpart(part, PartBlank, b->data, HeadSize) < 0){ + seterr(EAdmin, "can't read arena partition header: %r"); + return nil; + } + + ap = MKZ(ArenaPart); + if(ap == nil){ + freezblock(b); + return nil; + } + ap->part = part; + ok = unpackarenapart(ap, b->data); + freezblock(b); + if(ok < 0){ + seterr(ECorrupt, "corrupted arena partition header: %r"); + freearenapart(ap, 0); + return nil; + } + + ap->tabbase = (PartBlank + HeadSize + ap->blocksize - 1) & ~(ap->blocksize - 1); + if(ap->version != ArenaPartVersion){ + seterr(ECorrupt, "unknown arena partition version %d", ap->version); + freearenapart(ap, 0); + return nil; + } + if(ap->blocksize & (ap->blocksize - 1)){ + seterr(ECorrupt, "illegal non-power-of-2 block size %d\n", ap->blocksize); + freearenapart(ap, 0); + return nil; + } + if(ap->tabbase >= ap->arenabase){ + seterr(ECorrupt, "arena partition table overlaps with arena storage"); + freearenapart(ap, 0); + return nil; + } + ap->tabsize = ap->arenabase - ap->tabbase; + partblocksize(part, ap->blocksize); + ap->size = ap->part->size & ~(u64int)(ap->blocksize - 1); + + if(readarenamap(&amn, part, ap->tabbase, ap->tabsize) < 0){ + freearenapart(ap, 0); + return nil; + } + ap->narenas = amn.n; + ap->map = amn.map; + if(okamap(ap->map, ap->narenas, ap->arenabase, ap->size, "arena table") < 0){ + freearenapart(ap, 0); + return nil; + } + + ap->arenas = MKNZ(Arena*, ap->narenas); + for(i = 0; i < ap->narenas; i++){ + ap->arenas[i] = initarena(part, ap->map[i].start, ap->map[i].stop - ap->map[i].start, ap->blocksize); + if(ap->arenas[i] == nil){ + freearenapart(ap, 1); + return nil; + } + if(namecmp(ap->map[i].name, ap->arenas[i]->name) != 0){ + seterr(ECorrupt, "arena name mismatches with expected name: %s vs. 
%s", + ap->map[i].name, ap->arenas[i]->name); + freearenapart(ap, 1); + return nil; + } + if(findarena(ap->arenas[i]->name)){ + seterr(ECorrupt, "duplicate arena name %s in %s", + ap->map[i].name, ap->part->name); + freearenapart(ap, 1); + return nil; + } + } + + for(i = 0; i < ap->narenas; i++) + addarena(ap->arenas[i]); + + return ap; +} + +ArenaPart* +newarenapart(Part *part, u32int blocksize, u32int tabsize) +{ + ArenaPart *ap; + + if(blocksize & (blocksize - 1)){ + seterr(ECorrupt, "illegal non-power-of-2 block size %d\n", blocksize); + return nil; + } + ap = MKZ(ArenaPart); + if(ap == nil) + return nil; + + ap->version = ArenaPartVersion; + ap->part = part; + ap->blocksize = blocksize; + partblocksize(part, blocksize); + ap->size = part->size & ~(u64int)(blocksize - 1); + ap->tabbase = (PartBlank + HeadSize + blocksize - 1) & ~(blocksize - 1); + ap->arenabase = (ap->tabbase + tabsize + blocksize - 1) & ~(blocksize - 1); + ap->tabsize = ap->arenabase - ap->tabbase; + ap->narenas = 0; + + if(wbarenapart(ap) < 0){ + freearenapart(ap, 0); + return nil; + } + + return ap; +} + +int +wbarenapart(ArenaPart *ap) +{ + ZBlock *b; + + if(okamap(ap->map, ap->narenas, ap->arenabase, ap->size, "arena table") < 0) + return -1; + b = alloczblock(HeadSize, 1); + if(b == nil) +//ZZZ set error message? 
+ return -1; + + if(packarenapart(ap, b->data) < 0){ + seterr(ECorrupt, "can't make arena partition header: %r"); + freezblock(b); + return -1; + } + if(writepart(ap->part, PartBlank, b->data, HeadSize) < 0){ + seterr(EAdmin, "can't write arena partition header: %r"); + freezblock(b); + return -1; + } + freezblock(b); + + return wbarenamap(ap->map, ap->narenas, ap->part, ap->tabbase, ap->tabsize); +} + +void +freearenapart(ArenaPart *ap, int freearenas) +{ + int i; + + if(ap == nil) + return; + if(freearenas){ + for(i = 0; i < ap->narenas; i++){ + if(ap->arenas[i] == nil) + continue; + delarena(ap->arenas[i]); + freearena(ap->arenas[i]); + } + } + free(ap->map); + free(ap->arenas); + free(ap); +} + +int +okamap(AMap *am, int n, u64int start, u64int stop, char *what) +{ + u64int last; + u32int i; + + last = start; + for(i = 0; i < n; i++){ + if(am[i].start < last){ + if(i == 0) + seterr(ECorrupt, "invalid start address in %s", what); + else + seterr(ECorrupt, "overlapping ranges in %s", what); + return -1; + } + if(am[i].stop < am[i].start){ + seterr(ECorrupt, "invalid range in %s", what); + return -1; + } + last = am[i].stop; + } + if(last > stop){ + seterr(ECorrupt, "invalid ending address in %s", what); + return -1; + } + return 0; +} + +int +maparenas(AMap *am, Arena **arenas, int n, char *what) +{ + u32int i; + + for(i = 0; i < n; i++){ + arenas[i] = findarena(am[i].name); + if(arenas[i] == nil){ + seterr(EAdmin, "can't find arena '%s' for '%s'\n", am[i].name, what); + return -1; + } + } + return 0; +} + +int +readarenamap(AMapN *amn, Part *part, u64int base, u32int size) +{ + IFile f; + u32int ok; + + if(partifile(&f, part, base, size) < 0) + return -1; + ok = parseamap(&f, amn); + freeifile(&f); + return ok; +} + +int +wbarenamap(AMap *am, int n, Part *part, u64int base, u64int size) +{ + Fmt f; + ZBlock *b; + + b = alloczblock(size, 1); + if(b == nil) + return -1; + + fmtzbinit(&f, b); + + if(outputamap(&f, am, n) < 0){ + seterr(ECorrupt, "arena set size too 
small"); + freezblock(b); + return -1; + } + if(writepart(part, base, b->data, size) < 0){ + seterr(EAdmin, "can't write arena set: %r"); + freezblock(b); + return -1; + } + freezblock(b); + return 0; +} + +/* + * amap: n '\n' amapelem * n + * n: u32int + * amapelem: name '\t' astart '\t' asize '\n' + * astart, asize: u64int + */ +int +parseamap(IFile *f, AMapN *amn) +{ + AMap *am; + u64int v64; + u32int v; + char *s, *flds[4]; + int i, n; + + /* + * arenas + */ + if(ifileu32int(f, &v) < 0){ + seterr(ECorrupt, "syntax error: bad number of elements in %s", f->name); + return -1; + } + n = v; + if(n > MaxAMap){ + seterr(ECorrupt, "illegal number of elements in %s", f->name); + return -1; + } + am = MKNZ(AMap, n); + if(am == nil) + return -1; + for(i = 0; i < n; i++){ + s = ifileline(f); + if(s == nil || getfields(s, flds, 4, 0, "\t") != 3) + return -1; + if(nameok(flds[0]) < 0) + return -1; + namecp(am[i].name, flds[0]); + if(stru64int(flds[1], &v64) < 0){ + seterr(ECorrupt, "syntax error: bad arena base address in %s", f->name); + free(am); + return -1; + } + am[i].start = v64; + if(stru64int(flds[2], &v64) < 0){ + seterr(ECorrupt, "syntax error: bad arena size in %s", f->name); + free(am); + return -1; + } + am[i].stop = v64; + } + + amn->map = am; + amn->n = n; + return 0; +} + +int +outputamap(Fmt *f, AMap *am, int n) +{ + int i; + + if(fmtprint(f, "%ud\n", n) < 0) + return -1; + for(i = 0; i < n; i++) + if(fmtprint(f, "%s\t%llud\t%llud\n", am[i].name, am[i].start, am[i].stop) < 0) + return -1; + return 0; +} diff --git a/src/cmd/venti/buildbuck.c b/src/cmd/venti/buildbuck.c new file mode 100644 index 00000000..4232bb47 --- /dev/null +++ b/src/cmd/venti/buildbuck.c @@ -0,0 +1,112 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +struct IEStream +{ + Part *part; + u64int off; /* read position within part */ + u64int n; /* number of valid ientries left to read */ + u32int size; /* allocated space in buffer */ + u8int *buf; + u8int *pos; /* current 
place in buffer */ + u8int *epos; /* end of valid buffer contents */ +}; + +IEStream* +initiestream(Part *part, u64int off, u64int clumps, u32int size) +{ + IEStream *ies; + +//ZZZ out of memory? + ies = MKZ(IEStream); + ies->buf = MKN(u8int, size); + ies->epos = ies->buf; + ies->pos = ies->epos; + ies->off = off; + ies->n = clumps; + ies->size = size; + ies->part = part; + return ies; +} + +void +freeiestream(IEStream *ies) +{ + if(ies == nil) + return; + free(ies->buf); + free(ies); +} + +static u8int* +peekientry(IEStream *ies) +{ + u32int n, nn; + + n = ies->epos - ies->pos; + if(n < IEntrySize){ + memmove(ies->buf, ies->pos, n); + ies->epos = &ies->buf[n]; + ies->pos = ies->buf; + nn = ies->size; + if(nn > ies->n * IEntrySize) + nn = ies->n * IEntrySize; + nn -= n; + if(nn == 0) + return nil; + if(readpart(ies->part, ies->off, ies->epos, nn) < 0){ + seterr(EOk, "can't read sorted index entries: %r"); + return nil; + } + ies->epos += nn; + ies->off += nn; + } + return ies->pos; +} + +static u32int +iebuck(Index *ix, u8int *b) +{ + return hashbits(b, 32) / ix->div; +} + +u32int +buildbucket(Index *ix, IEStream *ies, IBucket *ib) +{ + IEntry ie1, ie2; + u8int *b; + u32int buck; + + buck = TWID32; + ib->n = 0; + ib->next = 0; + while(ies->n){ + b = peekientry(ies); + if(b == nil) + return TWID32; +//fprint(2, "b=%p ies->n=%lld ib.n=%d buck=%d score=%V\n", b, ies->n, ib->n, iebuck(ix, b), b); + if(ib->n == 0) + buck = iebuck(ix, b); + else{ + if(buck != iebuck(ix, b)) + break; + if(ientrycmp(&ib->data[(ib->n - 1)* IEntrySize], b) == 0){ + /* + * guess that the larger address is the correct one to use + */ + unpackientry(&ie1, &ib->data[(ib->n - 1)* IEntrySize]); + unpackientry(&ie2, b); + seterr(EOk, "duplicate index entry for score=%V type=%d\n", ie1.score, ie1.ia.type); + ib->n--; + if(ie1.ia.addr > ie2.ia.addr) + memmove(b, &ib->data[ib->n * IEntrySize], IEntrySize); + } + } + memmove(&ib->data[ib->n * IEntrySize], b, IEntrySize); + ib->n++; + ies->n--; + 
ies->pos += IEntrySize; + } + return buck; +} diff --git a/src/cmd/venti/buildindex.c b/src/cmd/venti/buildindex.c new file mode 100644 index 00000000..952e75dd --- /dev/null +++ b/src/cmd/venti/buildindex.c @@ -0,0 +1,145 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +static int +writebucket(Index *ix, u32int buck, IBucket *ib, ZBlock *b) +{ + ISect *is; + + is = findisect(ix, buck); + if(is == nil){ + seterr(EAdmin, "bad math in writebucket"); + return -1; + } + if(buck < is->start || buck >= is->stop) + seterr(EAdmin, "index write out of bounds: %d not in [%d,%d)\n", + buck, is->start, is->stop); + buck -= is->start; + qlock(&stats.lock); + stats.indexwrites++; + qunlock(&stats.lock); + packibucket(ib, b->data); + return writepart(is->part, is->blockbase + ((u64int)buck << is->blocklog), b->data, is->blocksize); +} + +static int +buildindex(Index *ix, Part *part, u64int off, u64int clumps, int zero) +{ + IEStream *ies; + IBucket ib, zib; + ZBlock *z, *b; + u32int next, buck; + int ok; + u64int found = 0; + +//ZZZ make buffer size configurable + b = alloczblock(ix->blocksize, 0); + z = alloczblock(ix->blocksize, 1); + ies = initiestream(part, off, clumps, 64*1024); + if(b == nil || z == nil || ies == nil){ + ok = 0; + goto breakout; + return -1; + } + ok = 0; + next = 0; + ib.data = b->data + IBucketSize; + zib.data = z->data + IBucketSize; + zib.n = 0; + zib.next = 0; + for(;;){ + buck = buildbucket(ix, ies, &ib); + found += ib.n; + if(zero){ + for(; next != buck; next++){ + if(next == ix->buckets){ + if(buck != TWID32){ + fprint(2, "bucket out of range\n"); + ok = -1; + } + goto breakout; + } + if(writebucket(ix, next, &zib, z) < 0){ + fprint(2, "can't write zero bucket to buck=%d: %r", next); + ok = -1; + } + } + } + if(buck >= ix->buckets){ + if(buck == TWID32) + break; + fprint(2, "bucket out of range\n"); + ok = -1; + goto breakout; + } + if(writebucket(ix, buck, &ib, b) < 0){ + fprint(2, "bad bucket found=%lld: %r\n", found); + ok = -1; + } 
+ next = buck + 1; + } +breakout:; + fprint(2, "constructed index with %lld entries\n", found); + freeiestream(ies); + freezblock(z); + freezblock(b); + return ok; +} + +void +usage(void) +{ + fprint(2, "usage: buildindex [-Z] [-B blockcachesize] config tmppart\n"); + threadexitsall(0); +} + +void +threadmain(int argc, char *argv[]) +{ + Part *part; + u64int clumps, base; + u32int bcmem; + int zero; + + zero = 1; + bcmem = 0; + ARGBEGIN{ + case 'B': + bcmem = unittoull(ARGF()); + break; + case 'Z': + zero = 0; + break; + default: + usage(); + break; + }ARGEND + + if(argc != 2) + usage(); + + if(initventi(argv[0]) < 0) + sysfatal("can't init venti: %r"); + + if(bcmem < maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16)) + bcmem = maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16); + fprint(2, "initialize %d bytes of disk block cache\n", bcmem); + initdcache(bcmem); + + fprint(2, "building a new index %s using %s for temporary storage\n", mainindex->name, argv[1]); + + part = initpart(argv[1], 1); + if(part == nil) + sysfatal("can't initialize temporary partition: %r"); + + clumps = sortrawientries(mainindex, part, &base); + if(clumps == TWID64) + sysfatal("can't build sorted index: %r"); + fprint(2, "found and sorted index entries for clumps=%lld at %lld\n", clumps, base); + + if(buildindex(mainindex, part, base, clumps, zero) < 0) + sysfatal("can't build new index: %r"); + + threadexitsall(0); +} diff --git a/src/cmd/venti/checkarenas.c b/src/cmd/venti/checkarenas.c new file mode 100644 index 00000000..ae7c641a --- /dev/null +++ b/src/cmd/venti/checkarenas.c @@ -0,0 +1,120 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +static int verbose; + +static void +checkarena(Arena *arena, int scan, int fix) +{ + Arena old; + int err, e; + + if(verbose && arena->clumps) + printarena(2, arena); + + old = *arena; + + if(scan){ + arena->used = 0; + arena->clumps = 0; + arena->cclumps = 0; + arena->uncsize = 0; + } + + err = 0; + 
for(;;){ + e = syncarena(arena, 1000, 0, fix); + err |= e; + if(!(e & SyncHeader)) + break; + if(verbose && arena->clumps) + fprint(2, "."); + } + if(verbose && arena->clumps) + fprint(2, "\n"); + + err &= ~SyncHeader; + if(arena->used != old.used + || arena->clumps != old.clumps + || arena->cclumps != old.cclumps + || arena->uncsize != old.uncsize){ + fprint(2, "incorrect arena header fields\n"); + printarena(2, arena); + err |= SyncHeader; + } + + if(!err || !fix) + return; + + fprint(2, "writing fixed arena header fields\n"); + if(wbarena(arena) < 0) + fprint(2, "arena header write failed: %r\n"); +} + +void +usage(void) +{ + fprint(2, "usage: checkarenas [-afv] file\n"); + threadexitsall(0); +} + +void +threadmain(int argc, char *argv[]) +{ + ArenaPart *ap; + Part *part; + char *file; + int i, fix, scan; + + fmtinstall('V', vtscorefmt); + + statsinit(); + + fix = 0; + scan = 0; + ARGBEGIN{ + case 'f': + fix++; + break; + case 'a': + scan = 1; + break; + case 'v': + verbose++; + break; + default: + usage(); + break; + }ARGEND + + if(!fix) + readonly = 1; + + if(argc != 1) + usage(); + + file = argv[0]; + + part = initpart(file, 0); + if(part == nil) + sysfatal("can't open partition %s: %r", file); + + ap = initarenapart(part); + if(ap == nil) + sysfatal("can't initialize arena partition in %s: %r", file); + + if(verbose > 1){ + printarenapart(2, ap); + fprint(2, "\n"); + } + + initdcache(8 * MaxDiskBlock); + + for(i = 0; i < ap->narenas; i++) + checkarena(ap->arenas[i], scan, fix); + + if(verbose > 1) + printstats(); + threadexitsall(0); +} diff --git a/src/cmd/venti/checkindex.c b/src/cmd/venti/checkindex.c new file mode 100644 index 00000000..fa6f5efc --- /dev/null +++ b/src/cmd/venti/checkindex.c @@ -0,0 +1,188 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +static int +checkbucket(Index *ix, u32int buck, IBucket *ib) +{ + ISect *is; + DBlock *eb; + IBucket eib; + IEntry ie, eie; + int i, ei, ok, c; + + is = findisect(ix, buck); + if(is == 
nil){ + seterr(EAdmin, "bad math in checkbuckets"); + return -1; + } + buck -= is->start; + eb = getdblock(is->part, is->blockbase + ((u64int)buck << is->blocklog), 1); + if(eb == nil) + return -1; + unpackibucket(&eib, eb->data); + + ok = 0; + ei = 0; + for(i = 0; i < ib->n; i++){ + while(ei < eib.n){ + c = ientrycmp(&ib->data[i * IEntrySize], &eib.data[ei * IEntrySize]); + if(c == 0){ + unpackientry(&ie, &ib->data[i * IEntrySize]); + unpackientry(&eie, &eib.data[ei * IEntrySize]); + if(iaddrcmp(&ie.ia, &eie.ia) != 0){ + fprint(2, "bad entry in index for score=%V\n", &ib->data[i * IEntrySize]); + fprint(2, "\taddr=%lld type=%d size=%d blocks=%d\n", + ie.ia.addr, ie.ia.type, ie.ia.size, ie.ia.blocks); + fprint(2, "\taddr=%lld type=%d size=%d blocks=%d\n", + eie.ia.addr, eie.ia.type, eie.ia.size, eie.ia.blocks); + } + ei++; + goto cont; + } + if(c < 0) + break; +if(1) + fprint(2, "spurious entry in index for score=%V type=%d\n", + &eib.data[ei * IEntrySize], eib.data[ei * IEntrySize + IEntryTypeOff]); + ei++; + ok = -1; + } + fprint(2, "missing entry in index for score=%V type=%d\n", + &ib->data[i * IEntrySize], ib->data[i * IEntrySize + IEntryTypeOff]); + ok = -1; + cont:; + } + for(; ei < eib.n; ei++){ +if(1) fprint(2, "spurious entry in index for score=%V; found %d entries expected %d\n", + &eib.data[ei * IEntrySize], eib.n, ib->n); + ok = -1; + break; + } + putdblock(eb); + return ok; +} + +int +checkindex(Index *ix, Part *part, u64int off, u64int clumps, int zero) +{ + IEStream *ies; + IBucket ib, zib; + ZBlock *z, *b; + u32int next, buck; + int ok, bok; +u64int found = 0; + +//ZZZ make buffer size configurable + b = alloczblock(ix->blocksize, 0); + z = alloczblock(ix->blocksize, 1); + ies = initiestream(part, off, clumps, 64*1024); + if(b == nil || z == nil || ies == nil){ + ok = -1; + goto breakout; + return -1; + } + ok = 0; + next = 0; + ib.data = b->data; + zib.data = z->data; + zib.n = 0; + zib.next = 0; + for(;;){ + buck = buildbucket(ix, ies, &ib); + 
found += ib.n; + if(zero){ + for(; next != buck; next++){ + if(next == ix->buckets){ + if(buck != TWID32) + fprint(2, "bucket out of range\n"); + goto breakout; + } + bok = checkbucket(ix, next, &zib); + if(bok < 0){ + fprint(2, "bad bucket=%d found: %r\n", next); + ok = -1; + } + } + } + if(buck >= ix->buckets){ + if(buck == TWID32) + break; + fprint(2, "bucket out of range\n"); + ok = -1; + goto breakout; + } + bok = checkbucket(ix, buck, &ib); + if(bok < 0){ + fprint(2, "bad bucket found=%lld: %r\n", found); + ok = -1; + } + next = buck + 1; + } +breakout:; +fprint(2, "found %lld entries in sorted list\n", found); + freeiestream(ies); + freezblock(z); + freezblock(b); + return ok; +} + +void +usage(void) +{ + fprint(2, "usage: checkindex [-f] [-B blockcachesize] config tmp\n"); + threadexitsall(0); +} + +void +threadmain(int argc, char *argv[]) +{ + Part *part; + u64int clumps, base; + u32int bcmem; + int fix, skipz; + + fix = 0; + bcmem = 0; + skipz = 0; + ARGBEGIN{ + case 'B': + bcmem = unittoull(ARGF()); + break; + case 'f': + fix++; + break; + case 'Z': + skipz = 1; + break; + default: + usage(); + break; + }ARGEND + + if(!fix) + readonly = 1; + + if(argc != 2) + usage(); + + if(initventi(argv[0]) < 0) + sysfatal("can't init venti: %r"); + + if(bcmem < maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16)) + bcmem = maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16); + fprint(2, "initialize %d bytes of disk block cache\n", bcmem); + initdcache(bcmem); + + part = initpart(argv[1], 1); + if(part == nil) + sysfatal("can't initialize temporary partition: %r"); + + clumps = sortrawientries(mainindex, part, &base); + if(clumps == TWID64) + sysfatal("can't build sorted index: %r"); + fprint(2, "found and sorted index entries for clumps=%lld at %lld\n", clumps, base); + checkindex(mainindex, part, base, clumps, !skipz); + + threadexitsall(0); +} diff --git a/src/cmd/venti/clump.c b/src/cmd/venti/clump.c new file mode 100644 index 
00000000..272d7aec --- /dev/null +++ b/src/cmd/venti/clump.c @@ -0,0 +1,200 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" +#include "whack.h" + +/* + * writes a lump to disk + * returns the address in amap of the clump + */ +int +storeclump(Index *ix, ZBlock *zb, u8int *sc, int type, u32int creator, IAddr *ia) +{ + ZBlock *cb; + Clump cl; + u64int a; + u8int bh[VtScoreSize]; + int size, dsize; + +if(0)print("storeclump %08x %p\n", mainindex->arenas[0], &cl); + size = zb->len; + if(size > VtMaxLumpSize){ + seterr(EStrange, "lump too large"); + return -1; + } + if(vttypevalid(type) < 0){ + seterr(EStrange, "invalid lump type"); + return -1; + } + + if(1){ + scoremem(bh, zb->data, size); + if(scorecmp(sc, bh) != 0){ + seterr(ECorrupt, "storing clump: corrupted; expected=%V got=%V, size=%d", sc, bh, size); + return -1; + } + } + + cb = alloczblock(size + ClumpSize, 0); + if(cb == nil) + return -1; + + cl.info.type = type; + cl.info.uncsize = size; + cl.creator = creator; + cl.time = now(); + scorecp(cl.info.score, sc); + +if(0)print("whackblock %08x %p\n", mainindex->arenas[0], &cl); + dsize = whackblock(&cb->data[ClumpSize], zb->data, size); +if(0)print("whackedblock %08x %p\n", mainindex->arenas[0], &cl); + if(dsize > 0 && dsize < size){ + cl.encoding = ClumpECompress; + }else{ + cl.encoding = ClumpENone; + dsize = size; + memmove(&cb->data[ClumpSize], zb->data, size); + } + cl.info.size = dsize; + + a = writeiclump(ix, &cl, cb->data); + + freezblock(cb); + if(a == 0) + return -1; + + qlock(&stats.lock); + stats.clumpwrites++; + stats.clumpbwrites += size; + stats.clumpbcomp += dsize; + qunlock(&stats.lock); + + ia->addr = a; + ia->type = type; + ia->size = size; + ia->blocks = (dsize + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog; + + return 0; +} + +u32int +clumpmagic(Arena *arena, u64int aa) +{ + u8int buf[U32Size]; + + if(readarena(arena, aa, buf, U32Size) < 0) + return TWID32; + return unpackmagic(buf); +} + +/* + * fetch a block based at addr. 
+ * score is filled in with the block's score. + * blocks is roughly the length of the clump on disk; + * if zero, the length is unknown. + */ +ZBlock* +loadclump(Arena *arena, u64int aa, int blocks, Clump *cl, u8int *score, int verify) +{ + Unwhack uw; + ZBlock *zb, *cb; + u8int bh[VtScoreSize], *buf; + u32int n; + int nunc; + + qlock(&stats.lock); + stats.clumpreads++; + qunlock(&stats.lock); + + if(blocks <= 0) + blocks = 1; + + cb = alloczblock(blocks << ABlockLog, 0); + if(cb == nil) + return nil; + n = readarena(arena, aa, cb->data, blocks << ABlockLog); + if(n < ClumpSize){ + if(n != 0) + seterr(ECorrupt, "loadclump read less than a header"); + freezblock(cb); + return nil; + } + if(unpackclump(cl, cb->data) < 0){ + freezblock(cb); + return nil; + } + n -= ClumpSize; + if(n < cl->info.size){ + freezblock(cb); + n = cl->info.size; + cb = alloczblock(n, 0); + if(cb == nil) + return nil; + if(readarena(arena, aa + ClumpSize, cb->data, n) != n){ + seterr(ECorrupt, "loadclump read too little data"); + freezblock(cb); + return nil; + } + buf = cb->data; + }else + buf = cb->data + ClumpSize; + + scorecp(score, cl->info.score); + + zb = alloczblock(cl->info.uncsize, 0); + if(zb == nil){ + freezblock(cb); + return nil; + } + switch(cl->encoding){ + case ClumpECompress: + unwhackinit(&uw); + nunc = unwhack(&uw, zb->data, cl->info.uncsize, buf, cl->info.size); + if(nunc != cl->info.uncsize){ + if(nunc < 0) + seterr(ECorrupt, "decompression failed: %s", uw.err); + else + seterr(ECorrupt, "decompression gave partial block: %d/%d\n", nunc, cl->info.uncsize); + freezblock(cb); + freezblock(zb); + return nil; + } + break; + case ClumpENone: + if(cl->info.size != cl->info.uncsize){ + seterr(ECorrupt, "loading clump: bad uncompressed size for uncompressed block"); + freezblock(cb); + freezblock(zb); + return nil; + } + memmove(zb->data, buf, cl->info.uncsize); + break; + default: + seterr(ECorrupt, "unknown encoding in loadlump"); + freezblock(cb); + freezblock(zb); + return 
nil; + } + freezblock(cb); + + if(verify){ + scoremem(bh, zb->data, cl->info.uncsize); + if(scorecmp(cl->info.score, bh) != 0){ + seterr(ECorrupt, "loading clump: corrupted; expected=%V got=%V", cl->info.score, bh); + freezblock(zb); + return nil; + } + if(vttypevalid(cl->info.type) < 0){ + seterr(ECorrupt, "loading lump: invalid lump type %d", cl->info.type); + freezblock(zb); + return nil; + } + } + + qlock(&stats.lock); + stats.clumpbreads += cl->info.size; + stats.clumpbuncomp += cl->info.uncsize; + qunlock(&stats.lock); + + return zb; +} diff --git a/src/cmd/venti/clumpstats.c b/src/cmd/venti/clumpstats.c new file mode 100644 index 00000000..5a448b67 --- /dev/null +++ b/src/cmd/venti/clumpstats.c @@ -0,0 +1,128 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +int count[VtMaxLumpSize][VtMaxType]; + +enum +{ + ClumpChunks = 32*1024 +}; + +static int +readarenainfo(Arena *arena) +{ + ClumpInfo *ci, *cis; + u32int clump; + int i, n, ok; + + if(arena->clumps) + fprint(2, "reading directory for arena=%s with %d entries\n", arena->name, arena->clumps); + + cis = MKN(ClumpInfo, ClumpChunks); + ok = 0; + for(clump = 0; clump < arena->clumps; clump += n){ + n = ClumpChunks; + + if(n > arena->clumps - clump) + n = arena->clumps - clump; + + if((i=readclumpinfos(arena, clump, cis, n)) != n){ + seterr(EOk, "arena directory read failed %d not %d: %r", i, n); + ok = -1; + break; + } + + for(i = 0; i < n; i++){ + ci = &cis[i]; + if(ci->type >= VtMaxType || ci->uncsize >= VtMaxLumpSize) { + fprint(2, "bad clump: %d: type = %d: size = %d\n", clump+i, ci->type, ci->uncsize); + continue; + } +if(ci->uncsize == 422) +print("%s: %d: %V\n", arena->name, clump+i, ci->score); + count[ci->uncsize][ci->type]++; + } + } + free(cis); + if(ok < 0) + return TWID32; + return clump; +} + +static void +clumpstats(Index *ix) +{ + int ok; + ulong clumps, n; + int i, j, t; + + ok = 0; + clumps = 0; + for(i = 0; i < ix->narenas; i++){ + n = readarenainfo(ix->arenas[i]); + if(n == 
TWID32){ + ok = -1; + break; + } + clumps += n; + } + + if(ok < 0) + return; + + print("clumps = %ld\n", clumps); + for(i=0; i<VtMaxLumpSize; i++) { + t = 0; + for(j=0; j<VtMaxType; j++) + t += count[i][j]; + if(t == 0) + continue; + print("%d\t%d", i, t); + for(j=0; j<VtMaxType; j++) + print("\t%d", count[i][j]); + print("\n"); + } +} + + +void +usage(void) +{ + fprint(2, "usage: clumpstats [-B blockcachesize] config\n"); + threadexitsall(0); +} + +void +threadmain(int argc, char *argv[]) +{ + u32int bcmem; + + bcmem = 0; + + ARGBEGIN{ + case 'B': + bcmem = unittoull(ARGF()); + break; + default: + usage(); + break; + }ARGEND + + readonly = 1; + + if(argc != 1) + usage(); + + if(initventi(argv[0]) < 0) + sysfatal("can't init venti: %r"); + + if(bcmem < maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16)) + bcmem = maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16); + fprint(2, "initialize %d bytes of disk block cache\n", bcmem); + initdcache(bcmem); + + clumpstats(mainindex); + + threadexitsall(0); +} diff --git a/src/cmd/venti/config.c b/src/cmd/venti/config.c new file mode 100644 index 00000000..cdcaad10 --- /dev/null +++ b/src/cmd/venti/config.c @@ -0,0 +1,144 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +Index *mainindex; +int paranoid = 1; /* should verify hashes on disk read */ + +static ArenaPart *configarenas(char *file); +static ISect *configisect(char *file); + +int +initventi(char *file) +{ + Config conf; + + fmtinstall('V', vtscorefmt); + + statsinit(); + + if(file == nil){ + seterr(EOk, "no configuration file"); + return -1; + } + if(runconfig(file, &conf) < 0){ + seterr(EOk, "can't initialize venti: %r"); + return -1; + } + mainindex = initindex(conf.index, conf.sects, conf.nsects); + if(mainindex == nil) + return -1; + return 0; +} + +/* + * configs : + * | configs config + * config : "isect" filename + * | "arenas" filename + * | "index" name + * + * '#' and \n are comments + */ +enum +{ + MaxArgs = 2 +}; 
+int +runconfig(char *file, Config *config) +{ + ArenaPart **av; + ISect **sv; + IFile f; + char *s, *line, *flds[MaxArgs + 1]; + int i, ok; + + if(readifile(&f, file) < 0) + return -1; + config->index = nil; + config->naparts = 0; + config->aparts = nil; + config->nsects = 0; + config->sects = nil; + ok = -1; + line = nil; + for(;;){ + s = ifileline(&f); + if(s == nil){ + ok = 0; + break; + } + line = estrdup(s); + i = getfields(s, flds, MaxArgs + 1, 1, " \t\r"); + if(i == 2 && strcmp(flds[0], "isect") == 0){ + sv = MKN(ISect*, config->nsects + 1); + for(i = 0; i < config->nsects; i++) + sv[i] = config->sects[i]; + free(config->sects); + config->sects = sv; + config->sects[config->nsects] = configisect(flds[1]); + if(config->sects[config->nsects] == nil) + break; + config->nsects++; + }else if(i == 2 && strcmp(flds[0], "arenas") == 0){ + av = MKN(ArenaPart*, config->naparts + 1); + for(i = 0; i < config->naparts; i++) + av[i] = config->aparts[i]; + free(config->aparts); + config->aparts = av; + config->aparts[config->naparts] = configarenas(flds[1]); + if(config->aparts[config->naparts] == nil) + break; + config->naparts++; + }else if(i == 2 && strcmp(flds[0], "index") == 0){ + if(nameok(flds[1]) < 0){ + seterr(EAdmin, "illegal index name %s in config file %s", flds[1], config); + break; + } + if(config->index != nil){ + seterr(EAdmin, "duplicate indices in config file %s", config); + break; + } + config->index = estrdup(flds[1]); + }else{ + seterr(EAdmin, "illegal line '%s' in configuration file %s", line, config); + break; + } + free(line); + line = nil; + } + free(line); + freeifile(&f); + if(ok < 0){ + free(config->sects); + config->sects = nil; + free(config->aparts); + config->aparts = nil; + } + return ok; +} + +static ISect* +configisect(char *file) +{ + Part *part; + + fprint(2, "configure index section in %s\n", file); + + part = initpart(file, 0); + if(part == nil) + return nil; + return initisect(part); +} + +static ArenaPart* +configarenas(char *file) 
/*
 * disk structure conversion routines
 *
 * the GET macros read a big-endian integer from the bytes at p;
 * the PUT macros store v at p in big-endian order.
 * U64PUT needs a scratch u32int lvalue, t32.
 *
 * U32GET widens each byte to u32int before shifting so that a top
 * byte >= 0x80 is never shifted into the sign bit of an int.
 * the multi-statement PUT macros are wrapped in do{}while(0) so that
 * each one is a single statement, safe in an unbraced if/else.
 */
#define	U8GET(p)	((p)[0])
#define	U16GET(p)	(((p)[0]<<8)|(p)[1])
#define	U32GET(p)	(((u32int)(p)[0]<<24)|((u32int)(p)[1]<<16)|((u32int)(p)[2]<<8)|(u32int)(p)[3])
#define	U64GET(p)	(((u64int)U32GET(p)<<32)|(u64int)U32GET((p)+4))

#define	U8PUT(p,v)	((p)[0]=(v)&0xFF)
#define	U16PUT(p,v)	do{(p)[0]=((v)>>8)&0xFF;(p)[1]=(v)&0xFF;}while(0)
#define	U32PUT(p,v)	do{(p)[0]=((v)>>24)&0xFF;(p)[1]=((v)>>16)&0xFF;(p)[2]=((v)>>8)&0xFF;(p)[3]=(v)&0xFF;}while(0)
#define	U64PUT(p,v,t32)	do{t32=(v)>>32;U32PUT(p,t32);t32=(v);U32PUT((p)+4,t32);}while(0)
*ap, u8int *buf) +{ + u8int *p; + + p = buf; + + U32PUT(p, ArenaPartMagic); + p += U32Size; + U32PUT(p, ap->version); + p += U32Size; + U32PUT(p, ap->blocksize); + p += U32Size; + U32PUT(p, ap->arenabase); + p += U32Size; + + if(buf + ArenaPartSize != p) + sysfatal("packarenapart packed wrong amount"); + + return 0; +} + +int +unpackarena(Arena *arena, u8int *buf) +{ + u8int *p; + u32int m; + char fbuf[20]; + + p = buf; + + m = U32GET(p); + if(m != ArenaMagic){ + seterr(ECorrupt, "arena has wrong magic number: %s expected ArenaMagic (%lux)", fmtmagic(fbuf, m), m, ArenaMagic); + return -1; + } + p += U32Size; + arena->version = U32GET(p); + p += U32Size; + namecp(arena->name, (char*)p); + p += ANameSize; + arena->clumps = U32GET(p); + p += U32Size; + arena->cclumps = U32GET(p); + p += U32Size; + arena->ctime = U32GET(p); + p += U32Size; + arena->wtime = U32GET(p); + p += U32Size; + arena->used = U64GET(p); + p += U64Size; + arena->uncsize = U64GET(p); + p += U64Size; + arena->sealed = U8GET(p); + p += U8Size; + + if(buf + ArenaSize != p) + sysfatal("unpackarena unpacked wrong amount"); + + return 0; +} + +int +packarena(Arena *arena, u8int *buf) +{ + u8int *p; + u32int t32; + + p = buf; + + U32PUT(p, ArenaMagic); + p += U32Size; + U32PUT(p, arena->version); + p += U32Size; + namecp((char*)p, arena->name); + p += ANameSize; + U32PUT(p, arena->clumps); + p += U32Size; + U32PUT(p, arena->cclumps); + p += U32Size; + U32PUT(p, arena->ctime); + p += U32Size; + U32PUT(p, arena->wtime); + p += U32Size; + U64PUT(p, arena->used, t32); + p += U64Size; + U64PUT(p, arena->uncsize, t32); + p += U64Size; + U8PUT(p, arena->sealed); + p += U8Size; + + if(buf + ArenaSize != p) + sysfatal("packarena packed wrong amount"); + + return 0; +} + +int +unpackarenahead(ArenaHead *head, u8int *buf) +{ + u8int *p; + u32int m; + char fbuf[20]; + + p = buf; + + m = U32GET(p); + if(m != ArenaHeadMagic){ + seterr(ECorrupt, "arena has wrong magic number: %s expected ArenaHeadMagic (%lux)", + 
fmtmagic(fbuf, m), ArenaHeadMagic); + return -1; + } + p += U32Size; + head->version = U32GET(p); + p += U32Size; + namecp(head->name, (char*)p); + p += ANameSize; + head->blocksize = U32GET(p); + p += U32Size; + head->size = U64GET(p); + p += U64Size; + + if(buf + ArenaHeadSize != p) + sysfatal("unpackarenahead unpacked wrong amount"); + + return 0; +} + +int +packarenahead(ArenaHead *head, u8int *buf) +{ + u8int *p; + u32int t32; + + p = buf; + + U32PUT(p, ArenaHeadMagic); + p += U32Size; + U32PUT(p, head->version); + p += U32Size; + namecp((char*)p, head->name); + p += ANameSize; + U32PUT(p, head->blocksize); + p += U32Size; + U64PUT(p, head->size, t32); + p += U64Size; + + if(buf + ArenaHeadSize != p) + sysfatal("packarenahead packed wrong amount"); + + return 0; +} + +static int +checkclump(Clump *w) +{ + if(w->encoding == ClumpENone){ + if(w->info.size != w->info.uncsize){ + seterr(ECorrupt, "uncompressed wad size mismatch"); + return -1; + } + }else if(w->encoding == ClumpECompress){ + if(w->info.size >= w->info.uncsize){ + seterr(ECorrupt, "compressed lump has inconsistent block sizes %d %d", w->info.size, w->info.uncsize); + return -1; + } + }else{ + seterr(ECorrupt, "clump has illegal encoding"); + return -1; + } + + return 0; +} + +int +unpackclump(Clump *c, u8int *buf) +{ + u8int *p; + u32int magic; + + p = buf; + magic = U32GET(p); + if(magic != ClumpMagic){ + seterr(ECorrupt, "clump has bad magic number=%#8.8ux", magic); + return -1; + } + p += U32Size; + + c->info.type = vtfromdisktype(U8GET(p)); + p += U8Size; + c->info.size = U16GET(p); + p += U16Size; + c->info.uncsize = U16GET(p); + p += U16Size; + scorecp(c->info.score, p); + p += VtScoreSize; + + c->encoding = U8GET(p); + p += U8Size; + c->creator = U32GET(p); + p += U32Size; + c->time = U32GET(p); + p += U32Size; + + if(buf + ClumpSize != p) + sysfatal("unpackclump unpacked wrong amount"); + + return checkclump(c); +} + +int +packclump(Clump *c, u8int *buf) +{ + u8int *p; + + p = buf; + 
U32PUT(p, ClumpMagic); + p += U32Size; + + U8PUT(p, vttodisktype(c->info.type)); + p += U8Size; + U16PUT(p, c->info.size); + p += U16Size; + U16PUT(p, c->info.uncsize); + p += U16Size; + scorecp(p, c->info.score); + p += VtScoreSize; + + U8PUT(p, c->encoding); + p += U8Size; + U32PUT(p, c->creator); + p += U32Size; + U32PUT(p, c->time); + p += U32Size; + + if(buf + ClumpSize != p) + sysfatal("packclump packed wrong amount"); + + return checkclump(c); +} + +void +unpackclumpinfo(ClumpInfo *ci, u8int *buf) +{ + u8int *p; + + p = buf; + ci->type = vtfromdisktype(U8GET(p)); + p += U8Size; + ci->size = U16GET(p); + p += U16Size; + ci->uncsize = U16GET(p); + p += U16Size; + scorecp(ci->score, p); + p += VtScoreSize; + + if(buf + ClumpInfoSize != p) + sysfatal("unpackclumpinfo unpacked wrong amount"); +} + +void +packclumpinfo(ClumpInfo *ci, u8int *buf) +{ + u8int *p; + + p = buf; + U8PUT(p, vttodisktype(ci->type)); + p += U8Size; + U16PUT(p, ci->size); + p += U16Size; + U16PUT(p, ci->uncsize); + p += U16Size; + scorecp(p, ci->score); + p += VtScoreSize; + + if(buf + ClumpInfoSize != p) + sysfatal("packclumpinfo packed wrong amount"); +} + +int +unpackisect(ISect *is, u8int *buf) +{ + u8int *p; + u32int m; + char fbuf[20]; + + p = buf; + + + m = U32GET(p); + if(m != ISectMagic){ + seterr(ECorrupt, "index section has wrong magic number: %s expected ISectMagic (%lux)", + fmtmagic(fbuf, m), ISectMagic); + return -1; + } + p += U32Size; + is->version = U32GET(p); + p += U32Size; + namecp(is->name, (char*)p); + p += ANameSize; + namecp(is->index, (char*)p); + p += ANameSize; + is->blocksize = U32GET(p); + p += U32Size; + is->blockbase = U32GET(p); + p += U32Size; + is->blocks = U32GET(p); + p += U32Size; + is->start = U32GET(p); + p += U32Size; + is->stop = U32GET(p); + p += U32Size; + + if(buf + ISectSize != p) + sysfatal("unpackisect unpacked wrong amount"); + + return 0; +} + +int +packisect(ISect *is, u8int *buf) +{ + u8int *p; + + p = buf; + + U32PUT(p, ISectMagic); + p 
+= U32Size; + U32PUT(p, is->version); + p += U32Size; + namecp((char*)p, is->name); + p += ANameSize; + namecp((char*)p, is->index); + p += ANameSize; + U32PUT(p, is->blocksize); + p += U32Size; + U32PUT(p, is->blockbase); + p += U32Size; + U32PUT(p, is->blocks); + p += U32Size; + U32PUT(p, is->start); + p += U32Size; + U32PUT(p, is->stop); + p += U32Size; + + if(buf + ISectSize != p) + sysfatal("packisect packed wrong amount"); + + return 0; +} + +void +unpackientry(IEntry *ie, u8int *buf) +{ + u8int *p; + + p = buf; + + scorecp(ie->score, p); + p += VtScoreSize; + ie->wtime = U32GET(p); + p += U32Size; + ie->train = U16GET(p); + p += U16Size; + ie->ia.addr = U64GET(p); +if(ie->ia.addr>>56) print("%.8H => %llux\n", p, ie->ia.addr); + p += U64Size; + ie->ia.size = U16GET(p); + p += U16Size; + if(p - buf != IEntryTypeOff) + sysfatal("unpackientry bad IEntryTypeOff amount"); + ie->ia.type = vtfromdisktype(U8GET(p)); + p += U8Size; + ie->ia.blocks = U8GET(p); + p += U8Size; + + if(p - buf != IEntrySize) + sysfatal("unpackientry unpacked wrong amount"); +} + +void +packientry(IEntry *ie, u8int *buf) +{ + u32int t32; + u8int *p; + + p = buf; + + scorecp(p, ie->score); + p += VtScoreSize; + U32PUT(p, ie->wtime); + p += U32Size; + U16PUT(p, ie->train); + p += U16Size; + U64PUT(p, ie->ia.addr, t32); + p += U64Size; + U16PUT(p, ie->ia.size); + p += U16Size; + U8PUT(p, vttodisktype(ie->ia.type)); + p += U8Size; + U8PUT(p, ie->ia.blocks); + p += U8Size; + + if(p - buf != IEntrySize) + sysfatal("packientry packed wrong amount"); +} + +void +unpackibucket(IBucket *b, u8int *buf) +{ + b->n = U16GET(buf); + b->next = U32GET(&buf[U16Size]); + b->data = buf + IBucketSize; +} + +void +packibucket(IBucket *b, u8int *buf) +{ + U16PUT(buf, b->n); + U32PUT(&buf[U16Size], b->next); +} diff --git a/src/cmd/venti/copy.c b/src/cmd/venti/copy.c new file mode 100644 index 00000000..8b5bb84d --- /dev/null +++ b/src/cmd/venti/copy.c @@ -0,0 +1,169 @@ +#include "stdinc.h" +#include "dat.h" 
+#include "fns.h" + +int fast; + +VtConn *zsrc, *zdst; + +void +usage(void) +{ + fprint(2, "usage: copy src-host dst-host score [type]\n"); + threadexitsall("usage"); +} + +int +parsescore(uchar *score, char *buf, int n) +{ + int i, c; + + memset(score, 0, VtScoreSize); + + if(n < VtScoreSize*2) + return -1; + for(i=0; i<VtScoreSize*2; i++) { + if(buf[i] >= '0' && buf[i] <= '9') + c = buf[i] - '0'; + else if(buf[i] >= 'a' && buf[i] <= 'f') + c = buf[i] - 'a' + 10; + else if(buf[i] >= 'A' && buf[i] <= 'F') + c = buf[i] - 'A' + 10; + else { + return -1; + } + + if((i & 1) == 0) + c <<= 4; + + score[i>>1] |= c; + } + return 0; +} + +void +walk(uchar score[VtScoreSize], uint type, int base) +{ + int i, n, sub; + uchar *buf; + VtEntry e; + VtRoot root; + + if(memcmp(score, vtzeroscore, VtScoreSize) == 0) + return; + + buf = vtmallocz(VtMaxLumpSize); + if(fast && vtread(zdst, score, type, buf, VtMaxLumpSize) >= 0){ + fprint(2, "skip %V\n", score); + free(buf); + return; + } + + n = vtread(zsrc, score, type, buf, VtMaxLumpSize); + if(n < 0){ + fprint(2, "warning: could not read block %V %d: %r", score, type); + return; + } + + switch(type){ + case VtRootType: + if(vtrootunpack(&root, buf) < 0){ + fprint(2, "warning: could not unpack root in %V %d\n", score, type); + break; + } + walk(root.score, VtDirType, 0); + walk(root.prev, VtRootType, 0); + break; + + case VtDirType: + for(i=0; i<n/VtEntrySize; i++){ + if(vtentryunpack(&e, buf, i) < 0){ + fprint(2, "warning: could not unpack entry #%d in %V %d\n", i, score, type); + continue; + } + if(!(e.flags & VtEntryActive)) + continue; + if(e.flags&VtEntryDir) + base = VtDirType; + else + base = VtDataType; + sub = base | ((e.flags&VtEntryDepthMask)>>VtEntryDepthShift); + walk(e.score, sub, base); + } + break; + + case VtDataType: + break; + + default: /* pointers */ + for(i=0; i<n; i+=VtScoreSize) + if(memcmp(buf+i, vtzeroscore, VtScoreSize) != 0) + walk(buf+i, type-1, base); + break; + } + + if(vtwrite(zdst, score, type, buf, 
n) < 0) + fprint(2, "warning: could not write block %V %d: %r", score, type); + free(buf); +} + +void +threadmain(int argc, char *argv[]) +{ + int type, n; + uchar score[VtScoreSize]; + uchar *buf; + + ARGBEGIN{ + case 'f': + fast = 1; + break; + default: + usage(); + break; + }ARGEND + + if(argc != 3 && argc != 4) + usage(); + + fmtinstall('V', vtscorefmt); + + if(parsescore(score, argv[2], strlen(argv[2]) < 0)) + sysfatal("could not parse score: %r"); + + buf = vtmallocz(VtMaxLumpSize); + + zsrc = vtdial(argv[0]); + if(zsrc == nil) + sysfatal("could not dial src server: %r"); + if(vtconnect(zsrc) < 0) + sysfatal("vtconnect src: %r"); + + zdst = vtdial(argv[1]); + if(zdst == nil) + sysfatal("could not dial dst server: %r"); + if(vtconnect(zdst) < 0) + sysfatal("vtconnect dst: %r"); + + if(argc == 4){ + type = atoi(argv[3]); + n = vtread(zsrc, score, type, buf, VtMaxLumpSize); + if(n < 0) + sysfatal("could not read block: %r"); + }else{ + for(type=0; type<VtMaxType; type++){ + n = vtread(zsrc, score, type, buf, VtMaxLumpSize); + if(n >= 0) + break; + } + if(type == VtMaxType) + sysfatal("could not find block %V of any type", score); + } + + walk(score, type, VtDirType); + + if(vtsync(zdst) < 0) + sysfatal("could not sync dst server: %r"); + + threadexitsall(0); +} diff --git a/src/cmd/venti/dat.h b/src/cmd/venti/dat.h new file mode 100644 index 00000000..b9f395c8 --- /dev/null +++ b/src/cmd/venti/dat.h @@ -0,0 +1,497 @@ +typedef struct Config Config; +typedef struct AMap AMap; +typedef struct AMapN AMapN; +typedef struct Arena Arena; +typedef struct ArenaHead ArenaHead; +typedef struct ArenaPart ArenaPart; +typedef struct CIBlock CIBlock; +typedef struct Clump Clump; +typedef struct ClumpInfo ClumpInfo; +typedef struct IAddr IAddr; +typedef struct IBucket IBucket; +typedef struct ICache ICache; +typedef struct IEStream IEStream; +typedef struct IEntry IEntry; +typedef struct IFile IFile; +typedef struct ISect ISect; +typedef struct Index Index; +typedef struct Lump 
Lump; +typedef struct DBlock DBlock; +typedef struct Part Part; +typedef struct Stats Stats; +typedef struct ZBlock ZBlock; + +#define TWID32 ((u32int)~(u32int)0) +#define TWID64 ((u64int)~(u64int)0) +#define TWID8 ((u8int)~(u8int)0) + +enum +{ + ABlockLog = 9, /* log2(512), the quantum for reading arenas */ + ANameSize = 64, + MaxDiskBlock = 64*1024, /* max. allowed size for a disk block */ + MaxIoSize = 64*1024, /* max. allowed size for a disk io operation */ + PartBlank = 256*1024, /* untouched section at beginning of partition */ + HeadSize = 512, /* size of a header after PartBlank */ + MinArenaSize = 1*1024*1024, /* smallest reasonable arena size */ + IndexBase = 1024*1024, /* initial address to use in an index */ + MaxIo = 64*1024, /* max size of a single read or write operation */ + ICacheBits = 16, /* default bits for indexing icache */ + ICacheDepth = 4, /* default depth of an icache hash chain */ + MaxAMap = 2*1024, /* max. allowed arenas in an address mapping; must be < 32*1024 */ + + /* + * return codes from syncarena + */ + SyncDataErr = 1 << 0, /* problem reading the clump data */ + SyncCIErr = 1 << 1, /* found erroneous clump directory entries */ + SyncCIZero = 1 << 2, /* found unwritten clump directory entries */ + SyncFixErr = 1 << 3, /* error writing fixed data */ + SyncHeader = 1 << 4, /* altered header fields */ + + /* + * error severity + */ + EOk = 0, /* error expected in normal operation */ + EStrange, /* strange error that should be logged */ + ECorrupt, /* corrupted data found in arenas */ + EICorrupt, /* corrupted data found in index */ + EAdmin, /* should be brought to administrators' attention */ + ECrash, /* really bad internal error */ + EBug, /* a limitation which should be fixed */ + EInconsist, /* inconsistencies between index and arena */ + EMax, + + /* + * internal disk formats for the venti archival storage system + */ + /* + * magic numbers on disk + */ + ClumpMagic = 0xd15cb10c, /* clump header */ + ClumpFreeMagic = 0, /* free 
/*
 * a Part is the low level interface to files or disks.
 * there are two main types of partitions:
 *	arena partitions, which are made up of some number of arenas, each in a sub-partition.
 *	index partitions, which only have one subpartition.
 */
struct Part
{
	int		fd;			/* rock for accessing the disk */
	u64int		size;			/* size of the partition */
	u32int		blocksize;		/* block size for reads and writes */
	char		*name;
};
/*
 * an Arena is a log of Clumps, preceded by an ArenaHead,
 * and followed by an Arena trailer, each in one disk block.
 * the struct on disk is not always up to date, but should be self-consistent.
 * to sync after reboot, follow clumps starting at used until ClumpFreeMagic is found.
 * NOTE(review): the <struct> markup below looks machine-readable; it is left untouched.
 * <struct name="Arena" type="Arena *">
 *	<field name="name" val="s->name" type="AName"/>
 *	<field name="version" val="s->version" type="U32int"/>
 *	<field name="partition" val="s->part->name" type="AName"/>
 *	<field name="blocksize" val="s->blocksize" type="U32int"/>
 *	<field name="start" val="s->base" type="U64int"/>
 *	<field name="stop" val="s->base+2*s->blocksize" type="U64int"/>
 *	<field name="created" val="s->ctime" type="U32int"/>
 *	<field name="modified" val="s->wtime" type="U32int"/>
 *	<field name="sealed" val="s->sealed" type="Sealed"/>
 *	<field name="score" val="s->score" type="Score"/>
 *	<field name="clumps" val="s->clumps" type="U32int"/>
 *	<field name="compressedclumps" val="s->cclumps" type="U32int"/>
 *	<field name="data" val="s->uncsize" type="U64int"/>
 *	<field name="compresseddata" val="s->used - s->clumps * ClumpSize" type="U64int"/>
 *	<field name="storage" val="s->used + s->clumps * ClumpInfoSize" type="U64int"/>
 * </struct>
 */
struct Arena
{
	QLock		lock;			/* lock for arena fields, writing to disk */
	Part		*part;			/* partition in which arena lives */
	int		blocksize;		/* size of block to read or write */
	u64int		base;			/* base address on disk */
	u64int		size;			/* total space in the arena */
	u64int		limit;			/* storage limit for clumps */
	u8int		score[VtScoreSize];	/* score of the entire sealed & summed arena */

	int		clumpmax;		/* ClumpInfos per block */
	CIBlock		cib;			/* dirty clump directory block */

	/*
	 * fields stored on disk
	 */
	u32int		version;
	char		name[ANameSize];	/* text label */
	u32int		clumps;			/* number of allocated clumps */
	u32int		cclumps;		/* clumps which are compressed; informational only */
	u32int		ctime;			/* first time a block was written */
	u32int		wtime;			/* last time a block was written */
	u64int		used;			/* number of bytes currently used */
	u64int		uncsize;		/* total of all clumps' uncsize; informational only */
	u8int		sealed;			/* arena all filled up? */
};
+ * stored in each clump's header and in the Arena's directory, + * stored in reverse order just prior to the arena trailer + */ +struct ClumpInfo +{ + u8int type; + u16int size; /* size of disk data, not including header */ + u16int uncsize; /* size of uncompressed data */ + u8int score[VtScoreSize]; /* score of the uncompressed data only */ +}; + +/* + * header for an immutable clump of data + */ +struct Clump +{ + ClumpInfo info; + u8int encoding; + u32int creator; /* initial client which wrote the block */ + u32int time; /* creation at gmt seconds since 1/1/1970 */ +}; + +/* + * index of all clumps according to their score + * this is just a wrapper to tie together the index sections + * <struct name="Index" type="Index *"> + * <field name="name" val="s->name" type="AName"/> + * <field name="version" val="s->version" type="U32int"/> + * <field name="blocksize" val="s->blocksize" type="U32int"/> + * <field name="tabsize" val="s->tabsize" type="U32int"/> + * <field name="buckets" val="s->buckets" type="U32int"/> + * <field name="buckdiv" val="s->div" type="U32int"/> + * <array name="sect" val="&s->smap[i]" elems="s->nsects" type="Amap"/> + * <array name="amap" val="&s->amap[i]" elems="s->narenas" type="Amap"/> + * <array name="arena" val="s->arenas[i]" elems="s->narenas" type="Arena"/> + * </struct> + * <struct name="Amap" type="AMap *"> + * <field name="name" val="s->name" type="AName"/> + * <field name="start" val="s->start" type="U64int"/> + * <field name="stop" val="s->stop" type="U64int"/> + * </struct> + */ +struct Index +{ + u32int div; /* divisor for mapping score to bucket */ + u32int buckets; /* last bucket used in disk hash table */ + u32int blocksize; + u32int tabsize; /* max. 
bytes in index config */ + int mapalloc; /* first arena to check when adding a lump */ + Arena **arenas; /* arenas in the mapping */ + ISect **sects; /* sections which hold the buckets */ + + /* + * fields stored in config file + */ + u32int version; + char name[ANameSize]; /* text label */ + int nsects; + AMap *smap; /* mapping of buckets to index sections */ + int narenas; + AMap *amap; /* mapping from index addresses to arenas */ +}; + +/* + * one part of the bucket storage for an index. + * the index blocks are sequentially allocated + * across all of the sections. + */ +struct ISect +{ + Part *part; + int blocklog; /* log2(blocksize) */ + int buckmax; /* max. entries in an index bucket */ + u32int tabbase; /* base address of index config table on disk */ + u32int tabsize; /* max. bytes in index config */ + + /* + * fields stored on disk + */ + u32int version; + char name[ANameSize]; /* text label */ + char index[ANameSize]; /* index owning the section */ + u32int blocksize; /* size of hash buckets in index */ + u32int blockbase; /* address of start of on disk index table */ + u32int blocks; /* total blocks on disk; some may be unused */ + u32int start; /* first bucket in this section */ + u32int stop; /* limit of buckets in this section */ +}; + +/* + * externally interesting part of an IEntry + */ +struct IAddr +{ + u64int addr; + u16int size; /* uncompressed size */ + u8int type; /* type of block */ + u8int blocks; /* arena io quanta for Clump + data */ +}; + +/* + * entries in the index + * kept in IBuckets in the disk index table, + * cached in the memory ICache.
+ */ +struct IEntry +{ + u8int score[VtScoreSize]; + IEntry *next; /* next in hash chain */ + u32int wtime; /* last write time */ + u16int train; /* relative train containing the most recent ref; 0 if no ref, 1 if in same car */ + u8int rac; /* read ahead count */ + IAddr ia; +}; + +/* + * buckets in the on disk index table + */ +struct IBucket +{ + u16int n; /* number of active indices */ + u32int next; /* overflow bucket */ + u8int *data; +}; + +/* + * temporary buffers used by individual threads + */ +struct ZBlock +{ + u32int len; + u8int *data; +}; + +/* + * simple input buffer for a '\0' terminated text file + */ +struct IFile +{ + char *name; /* name of the file */ + ZBlock *b; /* entire contents of file */ + u32int pos; /* current position in the file */ +}; + +/* + * statistics about the operation of the server + * mainly for performance monitoring and profiling. + */ +struct Stats +{ + QLock lock; + long lumpwrites; /* protocol block writes */ + long lumpreads; /* protocol block reads */ + long lumphit; /* lump cache hit */ + long lumpmiss; /* lump cache miss */ + long clumpwrites; /* clumps to disk */ + vlong clumpbwrites; /* clump data bytes to disk */ + vlong clumpbcomp; /* clump bytes compressed */ + long clumpreads; /* clumps from disk */ + vlong clumpbreads; /* clump data bytes from disk */ + vlong clumpbuncomp; /* clump bytes uncompressed */ + long ciwrites; /* clump directory to disk */ + long cireads; /* clump directory from disk */ + long indexwrites; /* index to disk */ + long indexreads; /* index from disk */ + long indexwreads; /* for writing a new entry */ + long indexareads; /* for allocating an overflow block */ + long diskwrites; /* total disk writes */ + long diskreads; /* total disk reads */ + vlong diskbwrites; /* total disk bytes written */ + vlong diskbreads; /* total disk bytes read */ + long pchit; /* partition cache hit */ + long pcmiss; /* partition cache miss */ + long pcreads; /* partition cache reads from disk */ + vlong 
pcbreads; /* partition cache bytes read */ + long icinserts; /* stores into index cache */ + long iclookups; /* index cache lookups */ + long ichits; /* hits in the cache */ + long icfills; /* successful fills from index */ +}; + +extern Index *mainindex; +extern u32int maxblocksize; /* max. block size used by any partition */ +extern int paranoid; /* should verify hashes on disk read */ +extern int queuewrites; /* put all lump writes on a queue and finish later */ +extern int readonly; /* only allowed to read the disk data */ +extern Stats stats; +extern u8int zeroscore[VtScoreSize]; diff --git a/src/cmd/venti/dcache.c b/src/cmd/venti/dcache.c new file mode 100644 index 00000000..64b56847 --- /dev/null +++ b/src/cmd/venti/dcache.c @@ -0,0 +1,372 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +typedef struct DCache DCache; + +enum +{ + HashLog = 9, + HashSize = 1<<HashLog, + HashMask = HashSize - 1, +}; + +struct DCache +{ + QLock lock; + Rendez full; + DBlock *free; /* list of available lumps */ + u32int now; /* ticks for usage timestamps */ + int size; /* max. size of any block; allocated to each block */ + DBlock **heads; /* hash table for finding address */ + int nheap; /* number of available victims */ + DBlock **heap; /* heap for locating victims */ + int nblocks; /* number of blocks allocated */ + DBlock *blocks; /* array of block descriptors */ + u8int *mem; /* memory for all block descriptors */ +}; + +static DCache dcache; + +static int downheap(int i, DBlock *b); +static int upheap(int i, DBlock *b); +static DBlock *bumpdblock(void); +static void delheap(DBlock *db); +static void fixheap(int i, DBlock *b); + +void +initdcache(u32int mem) +{ + DBlock *b, *last; + u32int nblocks, blocksize; + int i; + + if(mem < maxblocksize * 2) + sysfatal("need at least %d bytes for the disk cache", maxblocksize * 2); + if(maxblocksize == 0) + sysfatal("no max. 
block size given for disk cache"); + blocksize = maxblocksize; + nblocks = mem / blocksize; + if(0) + fprint(2, "initialize disk cache with %d blocks of %d bytes\n", nblocks, blocksize); + dcache.full.l = &dcache.lock; + dcache.nblocks = nblocks; + dcache.size = blocksize; + dcache.heads = MKNZ(DBlock*, HashSize); + dcache.heap = MKNZ(DBlock*, nblocks); + dcache.blocks = MKNZ(DBlock, nblocks); + dcache.mem = MKNZ(u8int, nblocks * blocksize); + + /* + * give each descriptor its slice of dcache.mem and chain all of them + * onto the free list; heap == TWID32 marks a block that is not on the heap + */ + last = nil; + for(i = 0; i < nblocks; i++){ + b = &dcache.blocks[i]; + b->data = &dcache.mem[i * blocksize]; + b->heap = TWID32; + b->next = last; + last = b; + } + dcache.free = last; + dcache.nheap = 0; +} + +/* + * map a disk address to an index into dcache.heads: + * fold the 64-bit address to 32 bits, multiply, and keep HashLog bits + */ +static u32int +pbhash(u64int addr) +{ + u32int h; + +#define hashit(c) ((((c) * 0x6b43a9b5) >> (32 - HashLog)) & HashMask) + h = (addr >> 32) ^ addr; + return hashit(h); +} + +/* + * find or allocate the cache block for address addr of part. + * the block is returned with its lock held and a reference taken; + * the caller releases it with putdblock. + * bytes of the block that are not yet valid are read from the partition + * if read is set, and zeroed otherwise. + * returns nil if the partition's block size is larger than the cache's + * block size or if the disk read fails. + */ +DBlock* +getdblock(Part *part, u64int addr, int read) +{ + DBlock *b; + u32int h, size; + + size = part->blocksize; + if(size > dcache.size){ + seterr(EAdmin, "block size %d too big for cache", size); + return nil; + } + h = pbhash(addr); + + /* + * look for the block in the cache + */ +//checkdcache(); + qlock(&dcache.lock); +again: + for(b = dcache.heads[h]; b != nil; b = b->next){ + if(b->part == part && b->addr == addr){ + qlock(&stats.lock); + stats.pchit++; + qunlock(&stats.lock); + goto found; + } + } + qlock(&stats.lock); + stats.pcmiss++; + qunlock(&stats.lock); + + /* + * missed: locate the block with the oldest second to last use. + * remove it from the heap, and fix up the heap.
+ */ + b = bumpdblock(); + if(b == nil){ + /* + * nothing can be evicted: wait until putdblock wakes us, then search again + */ + logerr(EAdmin, "all disk cache blocks in use"); + rsleep(&dcache.full); + goto again; + } + + /* + * the new block has no last use, so assume it happens sometime in the middle +ZZZ this is not reasonable + */ + b->used = (b->used2 + dcache.now) / 2; + + /* + * rechain the block on the correct hash chain + */ + b->next = dcache.heads[h]; + dcache.heads[h] = b; + if(b->next != nil) + b->next->prev = b; + b->prev = nil; + + b->addr = addr; + b->part = part; + b->size = 0; + +found: + /* + * take a reference, shift the usage timestamps, + * and restore heap order if the block is currently on the heap + */ + b->ref++; + b->used2 = b->used; + b->used = dcache.now++; + if(b->heap != TWID32) + fixheap(b->heap, b); + + qunlock(&dcache.lock); +//checkdcache(); + + /* + * with the block locked, fill in whatever part of it is not yet valid + */ + qlock(&b->lock); + if(b->size != size){ + if(b->size < size){ + if(!read) + memset(&b->data[b->size], 0, size - b->size); + else{ + if(readpart(part, addr + b->size, &b->data[b->size], size - b->size) < 0){ + putdblock(b); + return nil; + } + qlock(&stats.lock); + stats.pcreads++; + stats.pcbreads += size - b->size; + qunlock(&stats.lock); + } + } + b->size = size; + } + + return b; +} + +/* + * release a block returned by getdblock; a nil block is ignored. + * drops the block's lock and one reference. when the last reference + * goes away the block is added to the heap, unless it is already there, + * and threads sleeping on dcache.full are woken. + */ +void +putdblock(DBlock *b) +{ + if(b == nil) + return; + + qunlock(&b->lock); +//checkdcache(); + qlock(&dcache.lock); + if(--b->ref == 0){ + if(b->heap == TWID32) + upheap(dcache.nheap++, b); + rwakeup(&dcache.full); + } + + qunlock(&dcache.lock); +//checkdcache(); +} + +/* + * remove some block from use and update the free list and counters + */ +static DBlock* +bumpdblock(void) +{ + DBlock *b; + ulong h; + + b = dcache.free; + if(b != nil){ + dcache.free = b->next; + return b; + } + + /* + * remove blocks until we find one that is unused + * referenced blocks are left in the heap even though + * they can't be scavenged; this is simply a speed optimization + */ + for(;;){ + if(dcache.nheap == 0) + return nil; + b = dcache.heap[0]; + delheap(b); + if(!b->ref) + break; + } + + /* + * unchain the block + */ + if(b->prev == nil){ + h = pbhash(b->addr); + if(dcache.heads[h] != b) + sysfatal("bad hash chains in
disk cache"); + dcache.heads[h] = b->next; + }else + b->prev->next = b->next; + if(b->next != nil) + b->next->prev = b->prev; + + return b; +} + +/* + * delete an arbitrary block from the heap: + * the last heap element is moved into db's slot and re-sifted, + * and db is marked as no longer on the heap + */ +static void +delheap(DBlock *db) +{ + fixheap(db->heap, dcache.heap[--dcache.nheap]); + db->heap = TWID32; +} + +/* + * push an element up or down to its correct new location + */ +static void +fixheap(int i, DBlock *b) +{ + if(upheap(i, b) == i) + downheap(i, b); +} + +/* + * sift b up from slot i: every ancestor whose key (used2 - now) + * is larger than b's is moved down one level. + * stores b in its final slot and returns that slot's index. + */ +static int +upheap(int i, DBlock *b) +{ + DBlock *bb; + u32int now; + int p; + + now = dcache.now; + for(; i != 0; i = p){ + p = (i - 1) >> 1; + bb = dcache.heap[p]; + if(b->used2 - now >= bb->used2 - now) + break; + dcache.heap[i] = bb; + bb->heap = i; + } + + dcache.heap[i] = b; + b->heap = i; + return i; +} + +/* + * sift b down from slot i: while the child with the smaller key + * (used2 - now) is smaller than b's key, that child is moved up one level. + * stores b in its final slot and returns that slot's index. + */ +static int +downheap(int i, DBlock *b) +{ + DBlock *bb; + u32int now; + int k; + + now = dcache.now; + for(; ; i = k){ + k = (i << 1) + 1; + if(k >= dcache.nheap) + break; + if(k + 1 < dcache.nheap && dcache.heap[k]->used2 - now > dcache.heap[k + 1]->used2 - now) + k++; + bb = dcache.heap[k]; + if(b->used2 - now <= bb->used2 - now) + break; + dcache.heap[i] = bb; + bb->heap = i; + } + + dcache.heap[i] = b; + b->heap = i; + return i; +} + +/* + * consistency check used by checkdcache: walk the hash chain for bb's + * address, verifying each prev link, and abort if a link is wrong + * or bb is not on the chain + */ +static void +findblock(DBlock *bb) +{ + DBlock *b, *last; + int h; + + last = nil; + h = pbhash(bb->addr); + for(b = dcache.heads[h]; b != nil; b = b->next){ + if(last != b->prev) + sysfatal("bad prev link"); + if(b == bb) + return; + last = b; + } + sysfatal("block missing from hash table"); +} + +void +checkdcache(void) +{ + DBlock *b; + u32int size, now; + int i, k, refed, nfree; + + qlock(&dcache.lock); + size = dcache.size; + now = dcache.now; + for(i = 0; i < dcache.nheap; i++){ + if(dcache.heap[i]->heap != i) + sysfatal("dc: mis-heaped at %d: %d", i, dcache.heap[i]->heap); + if(i > 0 && dcache.heap[(i - 1) >> 1]->used2 - now > dcache.heap[i]->used2 - now) + sysfatal("dc: bad heap ordering"); + k = (i << 1) + 1; + if(k < dcache.nheap &&
dcache.heap[i]->used2 - now > dcache.heap[k]->used2 - now) + sysfatal("dc: bad heap ordering"); + k++; + if(k < dcache.nheap && dcache.heap[i]->used2 - now > dcache.heap[k]->used2 - now) + sysfatal("dc: bad heap ordering"); + } + + refed = 0; + for(i = 0; i < dcache.nblocks; i++){ + b = &dcache.blocks[i]; + if(b->data != &dcache.mem[i * size]) + sysfatal("dc: mis-blocked at %d", i); + if(b->ref && b->heap == TWID32) + refed++; + if(b->addr) + findblock(b); + if(b->heap != TWID32 + && dcache.heap[b->heap] != b) + sysfatal("dc: spurious heap value"); + } + + nfree = 0; + for(b = dcache.free; b != nil; b = b->next){ + if(b->addr != 0 || b->heap != TWID32) + sysfatal("dc: bad free list"); + nfree++; + } + + if(dcache.nheap + nfree + refed != dcache.nblocks) + sysfatal("dc: missing blocks: %d %d %d", dcache.nheap, refed, dcache.nblocks); + qunlock(&dcache.lock); +} diff --git a/src/cmd/venti/dump.c b/src/cmd/venti/dump.c new file mode 100644 index 00000000..12fcddda --- /dev/null +++ b/src/cmd/venti/dump.c @@ -0,0 +1,47 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +void +printindex(int fd, Index *ix) +{ + int i; + + fprint(fd, "index=%s version=%d blocksize=%d tabsize=%d\n", + ix->name, ix->version, ix->blocksize, ix->tabsize); + fprint(fd, "\tbuckets=%d div=%d\n", ix->buckets, ix->div); + for(i = 0; i < ix->nsects; i++) + fprint(fd, "\tsect=%s for buckets [%lld,%lld)\n", ix->smap[i].name, ix->smap[i].start, ix->smap[i].stop); + for(i = 0; i < ix->narenas; i++) + fprint(fd, "\tarena=%s at [%lld,%lld)\n", ix->amap[i].name, ix->amap[i].start, ix->amap[i].stop); +} + +void +printarenapart(int fd, ArenaPart *ap) +{ + int i; + + fprint(fd, "arena partition=%s\n\tversion=%d blocksize=%d arenas=%d\n\tsetbase=%d setsize=%d\n", + ap->part->name, ap->version, ap->blocksize, ap->narenas, ap->tabbase, ap->tabsize); + for(i = 0; i < ap->narenas; i++) + fprint(fd, "\tarena=%s at [%lld,%lld)\n", ap->map[i].name, ap->map[i].start, ap->map[i].stop); +} + +void 
+printarena(int fd, Arena *arena) +{ + fprint(fd, "arena='%s' [%lld,%lld)\n\tversion=%d created=%d modified=%d", + arena->name, arena->base, arena->base + arena->size + 2 * arena->blocksize, + arena->version, arena->ctime, arena->wtime); + if(arena->sealed) + fprint(2, " sealed\n"); + else + fprint(2, "\n"); + if(scorecmp(zeroscore, arena->score) != 0) + fprint(2, "\tscore=%V\n", arena->score); + + fprint(fd, "\tclumps=%,d compressed clumps=%,d data=%,lld compressed data=%,lld disk storage=%,lld\n", + arena->clumps, arena->cclumps, arena->uncsize, + arena->used - arena->clumps * ClumpSize, + arena->used + arena->clumps * ClumpInfoSize); +} diff --git a/src/cmd/venti/findscore.c b/src/cmd/venti/findscore.c new file mode 100644 index 00000000..6a69dc53 --- /dev/null +++ b/src/cmd/venti/findscore.c @@ -0,0 +1,132 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +enum +{ + ClumpChunks = 32*1024 +}; + +static int verbose; + +int +clumpinfoeq(ClumpInfo *c, ClumpInfo *d) +{ + return c->type == d->type + && c->size == d->size + && c->uncsize == d->uncsize + && scorecmp(c->score, d->score)==0; +} + +/* + * synchronize the clump info directory with + * with the clumps actually stored in the arena. + * the directory should be at least as up to date + * as the arena's trailer. + * + * checks/updates at most n clumps. + * + * returns 1 if ok, -1 if an error occured, 0 if blocks were updated + */ +int +findscore(Arena *arena, uchar *score) +{ + IEntry ie; + ClumpInfo *ci, *cis; + u64int a; + u32int clump; + int i, n, found; + +//ZZZ remove fprint? 
+ if(arena->clumps) + fprint(2, "reading directory for arena=%s with %d entries\n", arena->name, arena->clumps); + + cis = MKN(ClumpInfo, ClumpChunks); + found = 0; + a = 0; + memset(&ie, 0, sizeof(IEntry)); + for(clump = 0; clump < arena->clumps; clump += n){ + n = ClumpChunks; + if(n > arena->clumps - clump) + n = arena->clumps - clump; + if(readclumpinfos(arena, clump, cis, n) != n){ + seterr(EOk, "arena directory read failed: %r"); + break; + } + + for(i = 0; i < n; i++){ + ci = &cis[i]; + if(scorecmp(score, ci->score)==0){ + fprint(2, "found at clump=%d with type=%d size=%d csize=%d position=%lld\n", + clump + i, ci->type, ci->uncsize, ci->size, a); + found++; + } + a += ci->size + ClumpSize; + } + } + free(cis); + return found; +} + +/* + * print the usage message and exit with a failure status + */ +void +usage(void) +{ + fprint(2, "usage: findscore [-v] arenafile score\n"); + threadexitsall("usage"); +} + +void +threadmain(int argc, char *argv[]) +{ + ArenaPart *ap; + Part *part; + char *file; + u8int score[VtScoreSize]; + int i, found; + + fmtinstall('V', vtscorefmt); + statsinit(); + + ARGBEGIN{ + case 'v': + verbose++; + break; + default: + usage(); + break; + }ARGEND + + readonly = 1; + + if(argc != 2) + usage(); + + file = argv[0]; + if(strscore(argv[1], score) < 0) + sysfatal("bad score %s\n", argv[1]); + + part = initpart(file, 0); + if(part == nil) + sysfatal("can't open partition %s: %r", file); + + ap = initarenapart(part); + if(ap == nil) + sysfatal("can't initialize arena partition in %s: %r", file); + + if(verbose > 1){ + printarenapart(2, ap); + fprint(2, "\n"); + } + + initdcache(8 * MaxDiskBlock); + + found = 0; + for(i = 0; i < ap->narenas; i++) + found += findscore(ap->arenas[i], score); + + print("found %d occurances of %V\n", found, score); + + if(verbose > 1) + printstats(); + threadexitsall(0); +} diff --git a/src/cmd/venti/fmtarenas.c b/src/cmd/venti/fmtarenas.c new file mode 100644 index 00000000..8e52c56b --- /dev/null +++ b/src/cmd/venti/fmtarenas.c @@ -0,0 +1,114 @@ +#include "stdinc.h" +#include
"dat.h" +#include "fns.h" + +/* + * print the usage message and exit with a failure status + */ +void +usage(void) +{ + fprint(2, "usage: fmtarenas [-Z] [-b blocksize] [-a arenasize] name file\n"); + threadexitsall("usage"); +} + +/* + * format the partition file as a set of arenas named name0, name1, ... + * and write back the arena partition header. + */ +void +threadmain(int argc, char *argv[]) +{ + ArenaPart *ap; + Part *part; + Arena *arena; + u64int addr, limit, asize, apsize; + char *file, *name, aname[ANameSize]; + int i, n, blocksize, tabsize, zero; + + fmtinstall('V', vtscorefmt); + + statsinit(); + + blocksize = 8 * 1024; + asize = 512 * 1024 *1024; + tabsize = 64 * 1024; /* BUG: should be determined from number of arenas */ + zero = 1; + ARGBEGIN{ + case 'a': + asize = unittoull(ARGF()); + /* a zero arena size would divide by zero when counting arenas below */ + if(asize == TWID64 || asize == 0) + usage(); + break; + case 'b': + blocksize = unittoull(ARGF()); + if(blocksize == ~0) + usage(); + if(blocksize > MaxDiskBlock){ + fprint(2, "block size too large, max %d\n", MaxDiskBlock); + threadexitsall("usage"); + } + break; + case 'Z': + zero = 0; + break; + default: + usage(); + break; + }ARGEND + + if(argc != 2) + usage(); + + name = argv[0]; + file = argv[1]; + + if(nameok(name) < 0) + sysfatal("illegal name template %s", name); + + part = initpart(file, 1); + if(part == nil) + sysfatal("can't open partition %s: %r", file); + + if(zero) + zeropart(part, blocksize); + + ap = newarenapart(part, blocksize, tabsize); + if(ap == nil) + sysfatal("can't initialize arena: %r"); + + /* + * one arena per asize bytes, plus a final short arena + * if the leftover space is at least MinArenaSize + */ + apsize = ap->size - ap->arenabase; + n = apsize / asize; + if(apsize - (n * asize) >= MinArenaSize) + n++; + + fprint(2, "configuring %s with arenas=%d for a total storage of bytes=%lld and directory bytes=%d\n", + file, n, apsize, ap->tabsize); + + ap->narenas = n; + ap->map = MKNZ(AMap, n); + ap->arenas = MKNZ(Arena*, n); + + addr = ap->arenabase; + for(i = 0; i < n; i++){ + limit = addr + asize; + if(limit >= ap->size || ap->size - limit < MinArenaSize){ + limit = ap->size; + if(limit - addr < MinArenaSize) + sysfatal("bad arena set math: runt arena at %lld,%lld %lld\n", addr, limit, ap->size); + } + + snprint(aname, ANameSize, "%s%d", name, i); + + fprint(2,
"adding arena %s at [%lld,%lld)\n", aname, addr, limit); + + arena = newarena(part, aname, addr, limit - addr, blocksize); + if(!arena) + fprint(2, "can't make new arena %s: %r\n", aname); + freearena(arena); + + ap->map[i].start = addr; + ap->map[i].stop = limit; + namecp(ap->map[i].name, aname); + + addr = limit; + } + + if(wbarenapart(ap) < 0) + fprint(2, "can't write back arena partition header for %s: %r\n", file); + + threadexitsall(0); +} diff --git a/src/cmd/venti/fmtindex.c b/src/cmd/venti/fmtindex.c new file mode 100644 index 00000000..19daa8e1 --- /dev/null +++ b/src/cmd/venti/fmtindex.c @@ -0,0 +1,117 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +/* + * print the usage message and exit with a failure status + */ +void +usage(void) +{ + fprint(2, "usage: fmtindex [-a] config\n"); + threadexitsall("usage"); +} + +void +threadmain(int argc, char *argv[]) +{ + Config conf; + Index *ix; + ArenaPart *ap; + Arena **arenas; + AMap *amap; + u64int addr; + char *file; + u32int i, j, n, narenas; + int add; + + fmtinstall('V', vtscorefmt); + statsinit(); + + add = 0; + ARGBEGIN{ + case 'a': + add = 1; + break; + default: + usage(); + break; + }ARGEND + + if(argc != 1) + usage(); + + file = argv[0]; + + if(runconfig(file, &conf) < 0) + sysfatal("can't initialize config %s: %r", file); + if(conf.index == nil) + sysfatal("no index specified in %s", file); + if(nameok(conf.index) < 0) + sysfatal("illegal index name %s", conf.index); + + narenas = 0; + for(i = 0; i < conf.naparts; i++){ + ap = conf.aparts[i]; + narenas += ap->narenas; + } + + if(add){ + ix = initindex(conf.index, conf.sects, conf.nsects); + if(ix == nil) + sysfatal("can't initialize index %s: %r", conf.index); + }else{ + ix = newindex(conf.index, conf.sects, conf.nsects); + if(ix == nil) + sysfatal("can't create new index %s: %r", conf.index); + + n = 0; + for(i = 0; i < ix->nsects; i++) + n += ix->sects[i]->blocks; + + if(ix->div < 100) + sysfatal("index divisor too coarse: use bigger block size"); + + fprint(2, "using %ud buckets of %ud; div=%d\n", ix->buckets, n,
ix->div); + } + amap = MKNZ(AMap, narenas); + arenas = MKNZ(Arena*, narenas); + + /* + * lay the arenas out one after another in the index address space, + * starting at IndexBase. arenas already in the index must keep their + * slot and address; the rest are appended. + */ + addr = IndexBase; + n = 0; + for(i = 0; i < conf.naparts; i++){ + ap = conf.aparts[i]; + for(j = 0; j < ap->narenas; j++){ + if(n >= narenas) + sysfatal("too few slots in index's arena set"); + + arenas[n] = ap->arenas[j]; + if(n < ix->narenas){ + if(arenas[n] != ix->arenas[n]) + sysfatal("mismatched arenas %s and %s at slot %d\n", + arenas[n]->name, ix->arenas[n]->name, n); + amap[n] = ix->amap[n]; + if(amap[n].start != addr) + sysfatal("mis-located arena %s in index %s\n", arenas[n]->name, ix->name); + addr = amap[n].stop; + }else{ + amap[n].start = addr; + addr += ap->arenas[j]->size; + amap[n].stop = addr; + namecp(amap[n].name, ap->arenas[j]->name); + fprint(2, "add arena %s at [%lld,%lld)\n", + amap[n].name, amap[n].start, amap[n].stop); + } + + n++; + } + } + fprint(2, "configured index=%s with arenas=%d and storage=%lld\n", + ix->name, n, addr - IndexBase); + + ix->amap = amap; + ix->arenas = arenas; + ix->narenas = narenas; + + /* it is the index, not an arena partition, that is written here */ + if(wbindex(ix) < 0) + fprint(2, "can't write back index for %s: %r\n", file); + + threadexitsall(0); +} diff --git a/src/cmd/venti/fmtisect.c b/src/cmd/venti/fmtisect.c new file mode 100644 index 00000000..77c92967 --- /dev/null +++ b/src/cmd/venti/fmtisect.c @@ -0,0 +1,69 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +/* + * print the usage message and exit with a failure status + */ +void +usage(void) +{ + fprint(2, "usage: fmtisect [-Z] [-b blocksize] name file\n"); + threadexitsall("usage"); +} + +void +threadmain(int argc, char *argv[]) +{ + ISect *is; + Part *part; + char *file, *name; + int blocksize, setsize, zero; + + fmtinstall('V', vtscorefmt); + statsinit(); + + blocksize = 8 * 1024; + setsize = 64 * 1024; + zero = 1; + ARGBEGIN{ + case 'b': + blocksize = unittoull(ARGF()); + if(blocksize == ~0) + usage(); + if(blocksize > MaxDiskBlock){ + fprint(2, "block size too large, max %d\n", MaxDiskBlock); + threadexitsall("usage"); + } + break; + case 'Z': + zero = 0; + break; + default:
+ usage(); + break; + }ARGEND + + if(argc != 2) + usage(); + + name = argv[0]; + file = argv[1]; + + if(nameok(name) < 0) + sysfatal("illegal name %s", name); + + /* + * the partition is zeroed and rewritten below, so ask for it writable, + * as fmtarenas does + */ + part = initpart(file, 1); + if(part == nil) + sysfatal("can't open partition %s: %r", file); + + if(zero) + zeropart(part, blocksize); + + fprint(2, "configuring index section %s with space for index config bytes=%d\n", name, setsize); + is = newisect(part, name, blocksize, setsize); + if(is == nil) + sysfatal("can't initialize new index: %r"); + + if(wbisect(is) < 0) + fprint(2, "can't write back index section header for %s: %r\n", file); + + threadexitsall(0); +} diff --git a/src/cmd/venti/fns.h b/src/cmd/venti/fns.h new file mode 100644 index 00000000..4503d539 --- /dev/null +++ b/src/cmd/venti/fns.h @@ -0,0 +1,161 @@ +/* + * sorted by 4,/^$/|sort -bd +1 + */ +int addarena(Arena *name); +ZBlock *alloczblock(u32int size, int zeroed); +Arena *amapitoa(Index *index, u64int a, u64int *aa); +u64int arenadirsize(Arena *arena, u32int clumps); +void arenaupdate(Arena *arena, u32int size, u8int *score); +void backsumarena(Arena *arena); +u32int buildbucket(Index *ix, IEStream *ies, IBucket *ib); +void checkdcache(void); +void checklumpcache(void); +int clumpinfoeq(ClumpInfo *c, ClumpInfo *d); +u32int clumpmagic(Arena *arena, u64int aa); +int delarena(Arena *arena); +void *emalloc(ulong); +void *erealloc(void *, ulong); +char *estrdup(char*); +void *ezmalloc(ulong); +Arena *findarena(char *name); +ISect *findisect(Index *ix, u32int buck); +int flushciblocks(Arena *arena); +void fmtzbinit(Fmt *f, ZBlock *b); +void freearena(Arena *arena); +void freearenapart(ArenaPart *ap, int freearenas); +void freeiestream(IEStream *ies); +void freeifile(IFile *f); +void freeisect(ISect *is); +void freeindex(Index *index); +void freepart(Part *part); +void freezblock(ZBlock *b); +DBlock *getdblock(Part *part, u64int addr, int read); +u32int hashbits(u8int *score, int nbits); +int
httpdinit(char *address); +int iaddrcmp(IAddr *ia1, IAddr *ia2); +int ientrycmp(const void *vie1, const void *vie2); +char *ifileline(IFile *f); +int ifilename(IFile *f, char *dst); +int ifileu32int(IFile *f, u32int *r); +int indexsect(Index *ix, u8int *score); +Arena *initarena(Part *part, u64int base, u64int size, u32int blocksize); +ArenaPart *initarenapart(Part *part); +int initarenasum(void); +void initdcache(u32int mem); +void initicache(int bits, int depth); +IEStream *initiestream(Part *part, u64int off, u64int clumps, u32int size); +ISect *initisect(Part *part); +Index *initindex(char *name, ISect **sects, int n); +void initlumpcache(u32int size, u32int nblocks); +int initlumpqueues(int nq); +Part* initpart(char *name, int writable); +int initventi(char *config); +void insertlump(Lump *lump, Packet *p); +int insertscore(u8int *score, IAddr *ia, int write); +ZBlock *loadclump(Arena *arena, u64int aa, int blocks, Clump *cl, u8int *score, int verify); +int loadientry(Index *index, u8int *score, int type, IEntry *ie); +void logerr(int severity, char *fmt, ...); +Lump *lookuplump(u8int *score, int type); +int lookupscore(u8int *score, int type, IAddr *ia, int *rac); +int maparenas(AMap *am, Arena **arenas, int n, char *what); +int namecmp(char *s, char *t); +void namecp(char *dst, char *src); +int nameok(char *name); +Arena *newarena(Part *part, char *name, u64int base, u64int size, u32int blocksize); +ArenaPart *newarenapart(Part *part, u32int blocksize, u32int tabsize); +ISect *newisect(Part *part, char *name, u32int blocksize, u32int tabsize); +Index *newindex(char *name, ISect **sects, int n); +u32int now(void); +int okamap(AMap *am, int n, u64int start, u64int stop, char *what); +int outputamap(Fmt *f, AMap *am, int n); +int outputindex(Fmt *f, Index *ix); +int packarena(Arena *arena, u8int *buf); +int packarenahead(ArenaHead *head, u8int *buf); +int packarenapart(ArenaPart *as, u8int *buf); +int packclump(Clump *c, u8int *buf); +void 
packclumpinfo(ClumpInfo *ci, u8int *buf); +void packibucket(IBucket *b, u8int *buf); +void packientry(IEntry *i, u8int *buf); +int packisect(ISect *is, u8int *buf); +void packmagic(u32int magic, u8int *buf); +ZBlock *packet2zblock(Packet *p, u32int size); +int parseamap(IFile *f, AMapN *amn); +int parseindex(IFile *f, Index *ix); +void partblocksize(Part *part, u32int blocksize); +int partifile(IFile *f, Part *part, u64int start, u32int size); +void printarenapart(int fd, ArenaPart *ap); +void printarena(int fd, Arena *arena); +void printindex(int fd, Index *ix); +void printstats(void); +void putdblock(DBlock *b); +void putlump(Lump *b); +void queueflush(void); +int queuewrite(Lump *b, Packet *p, int creator); +u32int readarena(Arena *arena, u64int aa, u8int *buf, long n); +int readarenamap(AMapN *amn, Part *part, u64int base, u32int size); +int readclumpinfo(Arena *arena, int clump, ClumpInfo *ci); +int readclumpinfos(Arena *arena, int clump, ClumpInfo *cis, int n); +ZBlock *readfile(char *name); +int readifile(IFile *f, char *name); +Packet *readlump(u8int *score, int type, u32int size); +int readpart(Part *part, u64int addr, u8int *buf, u32int n); +int runconfig(char *config, Config*); +int scorecmp(u8int *, u8int *); +void scoremem(u8int *score, u8int *buf, int size); +void seterr(int severity, char *fmt, ...); +u64int sortrawientries(Index *ix, Part *tmp, u64int *tmpoff); +void statsinit(void); +int storeclump(Index *index, ZBlock *b, u8int *score, int type, u32int creator, IAddr *ia); +int storeientry(Index *index, IEntry *m); +int strscore(char *s, u8int *score); +int stru32int(char *s, u32int *r); +int stru64int(char *s, u64int *r); +void sumarena(Arena *arena); +int syncarena(Arena *arena, u32int n, int zok, int fix); +int syncarenaindex(Index *ix, Arena *arena, u32int clump, u64int a, int fix); +int syncindex(Index *ix, int fix); +int u64log2(u64int v); +u64int unittoull(char *s); +int unpackarena(Arena *arena, u8int *buf); +int unpackarenahead(ArenaHead 
*head, u8int *buf); +int unpackarenapart(ArenaPart *as, u8int *buf); +int unpackclump(Clump *c, u8int *buf); +void unpackclumpinfo(ClumpInfo *ci, u8int *buf); +void unpackibucket(IBucket *b, u8int *buf); +void unpackientry(IEntry *i, u8int *buf); +int unpackisect(ISect *is, u8int *buf); +u32int unpackmagic(u8int *buf); +int vtproc(void(*)(void*), void*); +int vttypevalid(int type); +int wbarena(Arena *arena); +int wbarenahead(Arena *arena); +int wbarenamap(AMap *am, int n, Part *part, u64int base, u64int size); +int wbarenapart(ArenaPart *ap); +int wbisect(ISect *is); +int wbindex(Index *ix); +int whackblock(u8int *dst, u8int *src, int ssize); +u64int writeaclump(Arena *a, Clump *c, u8int *clbuf); +u32int writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n); +int writeclumpinfo(Arena *arean, int clump, ClumpInfo *ci); +u64int writeiclump(Index *ix, Clump *c, u8int *clbuf); +int writelump(Packet *p, u8int *score, int type, u32int creator); +int writepart(Part *part, u64int addr, u8int *buf, u32int n); +int writeqlump(Lump *u, Packet *p, int creator); +Packet *zblock2packet(ZBlock *zb, u32int size); +void zeropart(Part *part, int blocksize); + +/* +#pragma varargck argpos sysfatal 1 +#pragma varargck argpos logerr 2 +#pragma varargck argpos SetErr 2 +*/ + +#define scorecmp(h1,h2) memcmp((h1),(h2),VtScoreSize) +#define scorecp(h1,h2) memmove((h1),(h2),VtScoreSize) + +#define MK(t) ((t*)emalloc(sizeof(t))) +#define MKZ(t) ((t*)ezmalloc(sizeof(t))) +#define MKN(t,n) ((t*)emalloc((n)*sizeof(t))) +#define MKNZ(t,n) ((t*)ezmalloc((n)*sizeof(t))) +#define MKNA(t,at,n) ((t*)emalloc(sizeof(t) + (n)*sizeof(at))) diff --git a/src/cmd/venti/httpd.c b/src/cmd/venti/httpd.c new file mode 100644 index 00000000..cd207cd1 --- /dev/null +++ b/src/cmd/venti/httpd.c @@ -0,0 +1,441 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" +#include "httpd.h" +#include "xml.h" + +typedef struct HttpObj HttpObj; + +enum +{ + ObjNameSize = 64, + MaxObjs = 16 +}; + +struct HttpObj 
+{ + char name[ObjNameSize]; + int (*f)(HConnect*); +}; + +static HttpObj objs[MaxObjs]; + +static void listenproc(void*); +static int estats(HConnect *c); +static int dindex(HConnect *c); +static int xindex(HConnect *c); +static int sindex(HConnect *c); +static int notfound(HConnect *c); +static int httpdobj(char *name, int (*f)(HConnect*)); + +int +httpdinit(char *address) +{ + fmtinstall('D', hdatefmt); + fmtinstall('H', httpfmt); + fmtinstall('U', hurlfmt); + + if(address == nil) + address = "tcp!*!http"; + + httpdobj("/stats", estats); + httpdobj("/index", dindex); + httpdobj("/storage", sindex); + httpdobj("/xindex", xindex); + + if(vtproc(listenproc, address) < 0) + return -1; + return 0; +} + +static int +httpdobj(char *name, int (*f)(HConnect*)) +{ + int i; + + if(name == nil || strlen(name) >= ObjNameSize) + return -1; + for(i = 0; i < MaxObjs; i++){ + if(objs[i].name[0] == '\0'){ + strcpy(objs[i].name, name); + objs[i].f = f; + return 0; + } + if(strcmp(objs[i].name, name) == 0) + return -1; + } + return -1; +} + +static HConnect* +mkconnect(void) +{ + HConnect *c; + + c = mallocz(sizeof(HConnect), 1); + if(c == nil) + sysfatal("out of memory"); + c->replog = nil; + c->hpos = c->header; + c->hstop = c->header; + return c; +} + +void httpproc(void*); + +static void +listenproc(void *vaddress) +{ + HConnect *c; + char *address, ndir[NETPATHLEN], dir[NETPATHLEN]; + int ctl, nctl, data; + +//sleep(1000); /* let strace find us */ + + address = vaddress; + ctl = announce(address, dir); + if(ctl < 0){ + fprint(2, "venti: httpd can't announce on %s: %r\n", address); + return; + } + +print("announce ctl %d dir %s\n", ctl, dir); + for(;;){ + /* + * wait for a call (or an error) + */ + nctl = listen(dir, ndir); +print("httpd listen %d %s...\n", nctl, ndir); + if(nctl < 0){ + fprint(2, "venti: httpd can't listen on %s: %r\n", address); + return; + } + + data = accept(ctl, ndir); +print("httpd accept %d...\n", data); + if(data < 0){ + fprint(2, "venti: httpd accept: 
%r\n"); + close(nctl); + continue; + } +print("httpd close nctl %d\n", nctl); + close(nctl); + c = mkconnect(); + hinit(&c->hin, data, Hread); + hinit(&c->hout, data, Hwrite); + vtproc(httpproc, c); + } +} + +void +httpproc(void *v) +{ + HConnect *c; + int ok, t, i; + +//sleep(1000); /* let strace find us */ + c = v; + + for(t = 15*60*1000; ; t = 15*1000){ + if(hparsereq(c, t) <= 0) + break; + + ok = -1; + for(i = 0; i < MaxObjs && objs[i].name[0]; i++){ + if(strcmp(c->req.uri, objs[i].name) == 0){ + ok = (*objs[i].f)(c); + break; + } + } + if(i == MaxObjs) + ok = notfound(c); + if(c->head.closeit) + ok = -1; + hreqcleanup(c); + + if(ok < 0) + break; + } +print("httpd cleanup %d\n", c->hin.fd); + hreqcleanup(c); +print("close %d\n", c->hin.fd); + close(c->hin.fd); + free(c); +} + +static int +percent(long v, long total) +{ + if(total == 0) + total = 1; + if(v < 1000*1000) + return (v * 100) / total; + total /= 100; + if(total == 0) + total = 1; + return v / total; +} + +static int +preq(HConnect *c) +{ + if(hparseheaders(c, 15*60*1000) < 0) + return -1; + if(strcmp(c->req.meth, "GET") != 0 + && strcmp(c->req.meth, "HEAD") != 0) + return hunallowed(c, "GET, HEAD"); + if(c->head.expectother || c->head.expectcont) + return hfail(c, HExpectFail, nil); + return 0; +} + +static int +preqtext(HConnect *c) +{ + Hio *hout; + int r; + + r = preq(c); + if(r <= 0) + return r; + + hout = &c->hout; + if(c->req.vermaj){ + hokheaders(c); + hprint(hout, "Content-type: text/plain\r\n"); + if(http11(c)) + hprint(hout, "Transfer-Encoding: chunked\r\n"); + hprint(hout, "\r\n"); + } + + if(http11(c)) + hxferenc(hout, 1); + else + c->head.closeit = 1; + return 0; +} + +static int +notfound(HConnect *c) +{ + int r; + + r = preq(c); + if(r <= 0) + return r; + return hfail(c, HNotFound, c->req.uri); +} + +static int +estats(HConnect *c) +{ + Hio *hout; + int r; + + r = preqtext(c); + if(r <= 0) + return r; + + hout = &c->hout; + hprint(hout, "lump writes=%,ld\n", stats.lumpwrites); + 
hprint(hout, "lump reads=%,ld\n", stats.lumpreads); + hprint(hout, "lump cache read hits=%,ld\n", stats.lumphit); + hprint(hout, "lump cache read misses=%,ld\n", stats.lumpmiss); + + hprint(hout, "clump disk writes=%,ld\n", stats.clumpwrites); + hprint(hout, "clump disk bytes written=%,lld\n", stats.clumpbwrites); + hprint(hout, "clump disk bytes compressed=%,lld\n", stats.clumpbcomp); + hprint(hout, "clump disk reads=%,ld\n", stats.clumpreads); + hprint(hout, "clump disk bytes read=%,lld\n", stats.clumpbreads); + hprint(hout, "clump disk bytes uncompressed=%,lld\n", stats.clumpbuncomp); + + hprint(hout, "clump directory disk writes=%,ld\n", stats.ciwrites); + hprint(hout, "clump directory disk reads=%,ld\n", stats.cireads); + + hprint(hout, "index disk writes=%,ld\n", stats.indexwrites); + hprint(hout, "index disk reads=%,ld\n", stats.indexreads); + hprint(hout, "index disk reads for modify=%,ld\n", stats.indexwreads); + hprint(hout, "index disk reads for allocation=%,ld\n", stats.indexareads); + + hprint(hout, "index cache lookups=%,ld\n", stats.iclookups); + hprint(hout, "index cache hits=%,ld %d%%\n", stats.ichits, + percent(stats.ichits, stats.iclookups)); + hprint(hout, "index cache fills=%,ld %d%%\n", stats.icfills, + percent(stats.icfills, stats.iclookups)); + hprint(hout, "index cache inserts=%,ld\n", stats.icinserts); + + hprint(hout, "disk cache hits=%,ld\n", stats.pchit); + hprint(hout, "disk cache misses=%,ld\n", stats.pcmiss); + hprint(hout, "disk cache reads=%,ld\n", stats.pcreads); + hprint(hout, "disk cache bytes read=%,lld\n", stats.pcbreads); + + hprint(hout, "disk writes=%,ld\n", stats.diskwrites); + hprint(hout, "disk bytes written=%,lld\n", stats.diskbwrites); + hprint(hout, "disk reads=%,ld\n", stats.diskreads); + hprint(hout, "disk bytes read=%,lld\n", stats.diskbreads); + + hflush(hout); + return 0; +} + +static int +sindex(HConnect *c) +{ + Hio *hout; + Index *ix; + Arena *arena; + vlong clumps, cclumps, uncsize, used, size; + int i, r, 
active; + + r = preqtext(c); + if(r <= 0) + return r; + hout = &c->hout; + + ix = mainindex; + + hprint(hout, "index=%s\n", ix->name); + + active = 0; + clumps = 0; + cclumps = 0; + uncsize = 0; + used = 0; + size = 0; + for(i = 0; i < ix->narenas; i++){ + arena = ix->arenas[i]; + if(arena != nil && arena->clumps != 0){ + active++; + clumps += arena->clumps; + cclumps += arena->cclumps; + uncsize += arena->uncsize; + used += arena->used; + } + size += arena->size; + } + hprint(hout, "total arenas=%d active=%d\n", ix->narenas, active); + hprint(hout, "total space=%lld used=%lld\n", size, used + clumps * ClumpInfoSize); + hprint(hout, "clumps=%lld compressed clumps=%lld data=%lld compressed data=%lld\n", + clumps, cclumps, uncsize, used - clumps * ClumpSize); + hflush(hout); + return 0; +} + +static void +darena(Hio *hout, Arena *arena) +{ + hprint(hout, "arena='%s' on %s at [%lld,%lld)\n\tversion=%d created=%d modified=%d", + arena->name, arena->part->name, arena->base, arena->base + arena->size + 2 * arena->blocksize, + arena->version, arena->ctime, arena->wtime); + if(arena->sealed) + hprint(hout, " sealed\n"); + else + hprint(hout, "\n"); + if(scorecmp(zeroscore, arena->score) != 0) + hprint(hout, "\tscore=%V\n", arena->score); + + hprint(hout, "\tclumps=%d compressed clumps=%d data=%lld compressed data=%lld disk storage=%lld\n", + arena->clumps, arena->cclumps, arena->uncsize, + arena->used - arena->clumps * ClumpSize, + arena->used + arena->clumps * ClumpInfoSize); +} + +static int +dindex(HConnect *c) +{ + Hio *hout; + Index *ix; + int i, r; + + r = preqtext(c); + if(r <= 0) + return r; + hout = &c->hout; + + + ix = mainindex; + hprint(hout, "index=%s version=%d blocksize=%d tabsize=%d\n", + ix->name, ix->version, ix->blocksize, ix->tabsize); + hprint(hout, "\tbuckets=%d div=%d\n", ix->buckets, ix->div); + for(i = 0; i < ix->nsects; i++) + hprint(hout, "\tsect=%s for buckets [%lld,%lld)\n", ix->smap[i].name, ix->smap[i].start, ix->smap[i].stop); + for(i = 0; i 
< ix->narenas; i++){ + if(ix->arenas[i] != nil && ix->arenas[i]->clumps != 0){ + hprint(hout, "arena=%s at index [%lld,%lld)\n\t", ix->amap[i].name, ix->amap[i].start, ix->amap[i].stop); + darena(hout, ix->arenas[i]); + } + } + hflush(hout); + return 0; +} + +static int +xindex(HConnect *c) +{ + Hio *hout; + int r; + + r = preq(c); + if(r <= 0) + return r; + + hout = &c->hout; + if(c->req.vermaj){ + hokheaders(c); + hprint(hout, "Content-type: text/xml\r\n"); + if(http11(c)) + hprint(hout, "Transfer-Encoding: chunked\r\n"); + hprint(hout, "\r\n"); + } + + if(http11(c)) + hxferenc(hout, 1); + else + c->head.closeit = 1; + xmlindex(hout, mainindex, "index", 0); + hflush(hout); + return 0; +} + +void +xmlindent(Hio *hout, int indent) +{ + int i; + + for(i = 0; i < indent; i++) + hputc(hout, '\t'); +} + +void +xmlaname(Hio *hout, char *v, char *tag) +{ + hprint(hout, " %s=\"%s\"", tag, v); +} + +void +xmlscore(Hio *hout, u8int *v, char *tag) +{ + if(scorecmp(zeroscore, v) == 0) + return; + hprint(hout, " %s=\"%V\"", tag, v); +} + +void +xmlsealed(Hio *hout, int v, char *tag) +{ + if(!v) + return; + hprint(hout, " %s=\"yes\"", tag); +} + +void +xmlu32int(Hio *hout, u32int v, char *tag) +{ + hprint(hout, " %s=\"%ud\"", tag, v); +} + +void +xmlu64int(Hio *hout, u64int v, char *tag) +{ + hprint(hout, " %s=\"%llud\"", tag, v); +} diff --git a/src/cmd/venti/icache.c b/src/cmd/venti/icache.c new file mode 100644 index 00000000..04f1134e --- /dev/null +++ b/src/cmd/venti/icache.c @@ -0,0 +1,199 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +struct ICache +{ + QLock lock; /* locks hash table & all associated data */ + IEntry **heads; /* heads of all the hash chains */ + int bits; /* bits to use for indexing heads */ + u32int size; /* number of heads; == 1 << bits, should be < entries */ + IEntry *base; /* all allocated hash table entries */ + u32int entries; /* elements in base */ + u32int unused; /* index of first unused element in base */ + u32int stolen; /* 
last head from which an element was stolen */ +}; + +static ICache icache; + +static IEntry *icachealloc(IAddr *ia, u8int *score); + +/* + * bits is the number of bits in the icache hash table + * depth is the average depth + * memory usage is about (1<<bits) * depth * sizeof(IEntry) + (1<<bits) * sizeof(IEntry*) + */ +void +initicache(int bits, int depth) +{ + icache.bits = bits; + icache.size = 1 << bits; + icache.entries = depth * icache.size; + icache.base = MKNZ(IEntry, icache.entries); + icache.heads = MKNZ(IEntry*, icache.size); +} + +u32int +hashbits(u8int *sc, int bits) +{ + u32int v; + + v = (sc[0] << 24) | (sc[1] << 16) | (sc[2] << 8) | sc[3]; + if(bits < 32) + v >>= (32 - bits); + return v; +} + +/* +ZZZ need to think about evicting the correct IEntry, +and writing back the wtime. + * look up data score in the index cache + * if this fails, pull it in from the disk index table, if it exists. + * + * must be called with the lump for this score locked + */ +int +lookupscore(u8int *score, int type, IAddr *ia, int *rac) +{ + IEntry d, *ie, *last; + u32int h; + +fprint(2, "lookupscore %V %d\n", score, type); + qlock(&stats.lock); + stats.iclookups++; + qunlock(&stats.lock); + + qlock(&icache.lock); + h = hashbits(score, icache.bits); + last = nil; + for(ie = icache.heads[h]; ie != nil; ie = ie->next){ + if(ie->ia.type == type && scorecmp(ie->score, score)==0){ + if(last != nil) + last->next = ie->next; + else + icache.heads[h] = ie->next; + qlock(&stats.lock); + stats.ichits++; + qunlock(&stats.lock); + ie->rac = 1; + goto found; + } + last = ie; + } + + qunlock(&icache.lock); + + if(loadientry(mainindex, score, type, &d) < 0) + return -1; + + /* + * no one else can load an entry for this score, + * since we have the overall score lock. 
+ */ + qlock(&stats.lock); + stats.icfills++; + qunlock(&stats.lock); + + qlock(&icache.lock); + + ie = icachealloc(&d.ia, score); + +found: + ie->next = icache.heads[h]; + icache.heads[h] = ie; + + *ia = ie->ia; + *rac = ie->rac; + + qunlock(&icache.lock); + + return 0; +} + +/* + * insert a new element in the hash table. + */ +int +insertscore(u8int *score, IAddr *ia, int write) +{ + IEntry *ie, se; + u32int h; + + qlock(&stats.lock); + stats.icinserts++; + qunlock(&stats.lock); + + qlock(&icache.lock); + h = hashbits(score, icache.bits); + + ie = icachealloc(ia, score); + + ie->next = icache.heads[h]; + icache.heads[h] = ie; + + se = *ie; + + qunlock(&icache.lock); + + if(!write) + return 0; + + return storeientry(mainindex, &se); +} + +/* + * allocate a index cache entry which hasn't been used in a while. + * must be called with icache.lock locked + * if the score is already in the table, update the entry. + */ +static IEntry * +icachealloc(IAddr *ia, u8int *score) +{ + IEntry *ie, *last, *next; + u32int h; + + h = hashbits(score, icache.bits); + last = nil; + for(ie = icache.heads[h]; ie != nil; ie = ie->next){ + if(ie->ia.type == ia->type && scorecmp(ie->score, score)==9){ + if(last != nil) + last->next = ie->next; + else + icache.heads[h] = ie->next; + ie->rac = 1; + return ie; + } + last = ie; + } + + h = icache.unused; + if(h < icache.entries){ + ie = &icache.base[h++]; + icache.unused = h; + goto Found; + } + + h = icache.stolen; + for(;;){ + h++; + if(h >= icache.size) + h = 0; + ie = icache.heads[h]; + if(ie != nil){ + last = nil; + for(; next = ie->next; ie = next) + last = ie; + if(last != nil) + last->next = nil; + else + icache.heads[h] = nil; + icache.stolen = h; + goto Found; + } + } +Found: + ie->ia = *ia; + scorecp(ie->score, score); + ie->rac = 0; + return ie; +} diff --git a/src/cmd/venti/ifile.c b/src/cmd/venti/ifile.c new file mode 100644 index 00000000..811e487c --- /dev/null +++ b/src/cmd/venti/ifile.c @@ -0,0 +1,93 @@ +#include "stdinc.h" 
+#include "dat.h" +#include "fns.h" + +int +readifile(IFile *f, char *name) +{ + ZBlock *b; + + b = readfile(name); + if(b == nil) + return -1; + f->name = name; + f->b = b; + f->pos = 0; + return 0; +} + +void +freeifile(IFile *f) +{ + freezblock(f->b); + f->b = nil; + f->pos = 0; +} + +int +partifile(IFile *f, Part *part, u64int start, u32int size) +{ + ZBlock *b; + + b = alloczblock(size, 0); + if(b == nil) + return -1; + if(readpart(part, start, b->data, size) < 0){ + seterr(EAdmin, "can't read %s: %r", part->name); + freezblock(b); + return -1; + } + f->name = part->name; + f->b = b; + f->pos = 0; + return 0; +} + +/* + * return the next non-blank input line, + * stripped of leading white space and with # comments eliminated + */ +char* +ifileline(IFile *f) +{ + char *s, *e, *t; + int c; + + for(;;){ + s = (char*)&f->b->data[f->pos]; + e = memchr(s, '\n', f->b->len - f->pos); + if(e == nil) + return nil; + *e++ = '\0'; + f->pos = e - (char*)f->b->data; + t = strchr(s, '#'); + if(t != nil) + *t = '\0'; + for(; c = *s; s++) + if(c != ' ' && c != '\t' && c != '\r') + return s; + } +} + +int +ifilename(IFile *f, char *dst) +{ + char *s; + + s = ifileline(f); + if(s == nil || strlen(s) >= ANameSize) + return -1; + namecp(dst, s); + return 0; +} + +int +ifileu32int(IFile *f, u32int *r) +{ + char *s; + + s = ifileline(f); + if(s == nil) + return -1; + return stru32int(s, r); +} diff --git a/src/cmd/venti/index.c b/src/cmd/venti/index.c new file mode 100644 index 00000000..9d3ad8c4 --- /dev/null +++ b/src/cmd/venti/index.c @@ -0,0 +1,790 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +static int bucklook(u8int *score, int type, u8int *data, int n); +static int writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b); +static int okibucket(IBucket *ib, ISect *is); +static ISect *initisect1(ISect *is); + +//static QLock indexlock; //ZZZ + +static char IndexMagic[] = "venti index configuration"; + +Index* +initindex(char *name, ISect **sects, int n) +{ 
+ IFile f; + Index *ix; + ISect *is; + u32int last, blocksize, tabsize; + int i; + + if(n <= 0){ + seterr(EOk, "no index sections to initialize index"); + return nil; + } + ix = MKZ(Index); + if(ix == nil){ + seterr(EOk, "can't initialize index: out of memory"); + freeindex(ix); + return nil; + } + + tabsize = sects[0]->tabsize; + if(partifile(&f, sects[0]->part, sects[0]->tabbase, tabsize) < 0) + return nil; + if(parseindex(&f, ix) < 0){ + freeifile(&f); + freeindex(ix); + return nil; + } + freeifile(&f); + if(namecmp(ix->name, name) != 0){ + seterr(ECorrupt, "mismatched index name: found %s expected %s", ix->name, name); + return nil; + } + if(ix->nsects != n){ + seterr(ECorrupt, "mismatched number index sections: found %d expected %d", n, ix->nsects); + freeindex(ix); + return nil; + } + ix->sects = sects; + last = 0; + blocksize = ix->blocksize; + for(i = 0; i < ix->nsects; i++){ + is = sects[i]; + if(namecmp(ix->name, is->index) != 0 + || is->blocksize != blocksize + || is->tabsize != tabsize + || namecmp(is->name, ix->smap[i].name) != 0 + || is->start != ix->smap[i].start + || is->stop != ix->smap[i].stop + || last != is->start + || is->start > is->stop){ + seterr(ECorrupt, "inconsistent index sections in %s", ix->name); + freeindex(ix); + return nil; + } + last = is->stop; + } + ix->tabsize = tabsize; + ix->buckets = last; + + ix->div = (((u64int)1 << 32) + last - 1) / last; + last = (((u64int)1 << 32) - 1) / ix->div + 1; + if(last != ix->buckets){ + seterr(ECorrupt, "inconsistent math for buckets in %s", ix->name); + freeindex(ix); + return nil; + } + + ix->arenas = MKNZ(Arena*, ix->narenas); + if(maparenas(ix->amap, ix->arenas, ix->narenas, ix->name) < 0){ + freeindex(ix); + return nil; + } + return ix; +} + +int +wbindex(Index *ix) +{ + Fmt f; + ZBlock *b; + int i; + + if(ix->nsects == 0){ + seterr(EOk, "no sections in index %s", ix->name); + return -1; + } + b = alloczblock(ix->tabsize, 1); + if(b == nil){ + seterr(EOk, "can't write index configuration: 
out of memory"); + return -1; + } + fmtzbinit(&f, b); + if(outputindex(&f, ix) < 0){ + seterr(EOk, "can't make index configuration: table storage too small %d", ix->tabsize); + freezblock(b); + return -1; + } + for(i = 0; i < ix->nsects; i++){ + if(writepart(ix->sects[i]->part, ix->sects[i]->tabbase, b->data, ix->tabsize) < 0){ + seterr(EOk, "can't write index: %r"); + freezblock(b); + return -1; + } + } + freezblock(b); + + for(i = 0; i < ix->nsects; i++) + if(wbisect(ix->sects[i]) < 0) + return -1; + + return 0; +} + +/* + * index: IndexMagic '\n' version '\n' name '\n' blocksize '\n' sections arenas + * version, blocksize: u32int + * name: max. ANameSize string + * sections, arenas: AMap + */ +int +outputindex(Fmt *f, Index *ix) +{ + if(fmtprint(f, "%s\n%ud\n%s\n%ud\n", IndexMagic, ix->version, ix->name, ix->blocksize) < 0 + || outputamap(f, ix->smap, ix->nsects) < 0 + || outputamap(f, ix->amap, ix->narenas) < 0) + return -1; + return 0; +} + +int +parseindex(IFile *f, Index *ix) +{ + AMapN amn; + u32int v; + char *s; + + /* + * magic + */ + s = ifileline(f); + if(s == nil || strcmp(s, IndexMagic) != 0){ + seterr(ECorrupt, "bad index magic for %s", f->name); + return -1; + } + + /* + * version + */ + if(ifileu32int(f, &v) < 0){ + seterr(ECorrupt, "syntax error: bad version number in %s", f->name); + return -1; + } + ix->version = v; + if(ix->version != IndexVersion){ + seterr(ECorrupt, "bad version number in %s", f->name); + return -1; + } + + /* + * name + */ + if(ifilename(f, ix->name) < 0){ + seterr(ECorrupt, "syntax error: bad index name in %s", f->name); + return -1; + } + + /* + * block size + */ + if(ifileu32int(f, &v) < 0){ + seterr(ECorrupt, "syntax error: bad version number in %s", f->name); + return -1; + } + ix->blocksize = v; + + if(parseamap(f, &amn) < 0) + return -1; + ix->nsects = amn.n; + ix->smap = amn.map; + + if(parseamap(f, &amn) < 0) + return -1; + ix->narenas = amn.n; + ix->amap = amn.map; + + return 0; +} + +/* + * initialize an entirely 
new index + */ +Index * +newindex(char *name, ISect **sects, int n) +{ + Index *ix; + AMap *smap; + u64int nb; + u32int div, ub, xb, start, stop, blocksize, tabsize; + int i, j; + + if(n < 1){ + seterr(EOk, "creating index with no index sections"); + return nil; + } + + /* + * compute the total buckets available in the index, + * and the total buckets which are used. + */ + nb = 0; + blocksize = sects[0]->blocksize; + tabsize = sects[0]->tabsize; + for(i = 0; i < n; i++){ + if(sects[i]->start != 0 || sects[i]->stop != 0 + || sects[i]->index[0] != '\0'){ + seterr(EOk, "creating new index using non-empty section %s", sects[i]->name); + return nil; + } + if(blocksize != sects[i]->blocksize){ + seterr(EOk, "mismatched block sizes in index sections"); + return nil; + } + if(tabsize != sects[i]->tabsize){ + seterr(EOk, "mismatched config table sizes in index sections"); + return nil; + } + nb += sects[i]->blocks; + } + + /* + * check for duplicate names + */ + for(i = 0; i < n; i++){ + for(j = i + 1; j < n; j++){ + if(namecmp(sects[i]->name, sects[j]->name) == 0){ + seterr(EOk, "duplicate section name %s for index %s", sects[i]->name, name); + return nil; + } + } + } + + if(nb >= ((u64int)1 << 32)){ + seterr(EBug, "index too large"); + return nil; + } + div = (((u64int)1 << 32) + nb - 1) / nb; + ub = (((u64int)1 << 32) - 1) / div + 1; + if(div < 100){ + seterr(EBug, "index divisor too coarse"); + return nil; + } + if(ub > nb){ + seterr(EBug, "index initialization math wrong"); + return nil; + } + + /* + * initialize each of the index sections + * and the section map table + */ + smap = MKNZ(AMap, n); + if(smap == nil){ + seterr(EOk, "can't create new index: out of memory"); + return nil; + } + xb = nb - ub; + start = 0; + for(i = 0; i < n; i++){ + stop = start + sects[i]->blocks - xb / n; + if(i == n - 1) + stop = ub; + sects[i]->start = start; + sects[i]->stop = stop; + namecp(sects[i]->index, name); + + smap[i].start = start; + smap[i].stop = stop; + 
namecp(smap[i].name, sects[i]->name); + start = stop; + } + + /* + * initialize the index itself + */ + ix = MKZ(Index); + if(ix == nil){ + seterr(EOk, "can't create new index: out of memory"); + free(smap); + return nil; + } + ix->version = IndexVersion; + namecp(ix->name, name); + ix->sects = sects; + ix->smap = smap; + ix->nsects = n; + ix->blocksize = blocksize; + ix->div = div; + ix->buckets = ub; + ix->tabsize = tabsize; + return ix; +} + +ISect* +initisect(Part *part) +{ + ISect *is; + ZBlock *b; + int ok; + + b = alloczblock(HeadSize, 0); + if(b == nil || readpart(part, PartBlank, b->data, HeadSize) < 0){ + seterr(EAdmin, "can't read index section header: %r"); + return nil; + } + + is = MKZ(ISect); + if(is == nil){ + freezblock(b); + return nil; + } + is->part = part; + ok = unpackisect(is, b->data); + freezblock(b); + if(ok < 0){ + seterr(ECorrupt, "corrupted index section header: %r"); + freeisect(is); + return nil; + } + + if(is->version != ISectVersion){ + seterr(EAdmin, "unknown index section version %d", is->version); + freeisect(is); + return nil; + } + + return initisect1(is); +} + +ISect* +newisect(Part *part, char *name, u32int blocksize, u32int tabsize) +{ + ISect *is; + u32int tabbase; + + is = MKZ(ISect); + if(is == nil) + return nil; + + namecp(is->name, name); + is->version = ISectVersion; + is->part = part; + is->blocksize = blocksize; + is->start = 0; + is->stop = 0; + tabbase = (PartBlank + HeadSize + blocksize - 1) & ~(blocksize - 1); + is->blockbase = (tabbase + tabsize + blocksize - 1) & ~(blocksize - 1); + is->blocks = is->part->size / blocksize - is->blockbase / blocksize; + + is = initisect1(is); + if(is == nil) + return nil; + + return is; +} + +/* + * initialize the computed paramaters for an index + */ +static ISect* +initisect1(ISect *is) +{ + u64int v; + + is->buckmax = (is->blocksize - IBucketSize) / IEntrySize; + is->blocklog = u64log2(is->blocksize); + if(is->blocksize != (1 << is->blocklog)){ + seterr(ECorrupt, "illegal 
non-power-of-2 bucket size %d\n", is->blocksize); + freeisect(is); + return nil; + } + partblocksize(is->part, is->blocksize); + is->tabbase = (PartBlank + HeadSize + is->blocksize - 1) & ~(is->blocksize - 1); + if(is->tabbase >= is->blockbase){ + seterr(ECorrupt, "index section config table overlaps bucket storage"); + freeisect(is); + return nil; + } + is->tabsize = is->blockbase - is->tabbase; + v = is->part->size & ~(u64int)(is->blocksize - 1); + if(is->blockbase + (u64int)is->blocks * is->blocksize != v){ + seterr(ECorrupt, "invalid blocks in index section %s", is->name); +//ZZZZZZZZZ +// freeisect(is); +// return nil; + } + + if(is->stop - is->start > is->blocks){ + seterr(ECorrupt, "index section overflows available space"); + freeisect(is); + return nil; + } + if(is->start > is->stop){ + seterr(ECorrupt, "invalid index section range"); + freeisect(is); + return nil; + } + + return is; +} + +int +wbisect(ISect *is) +{ + ZBlock *b; + + b = alloczblock(HeadSize, 1); + if(b == nil) +//ZZZ set error? + return -1; + + if(packisect(is, b->data) < 0){ + seterr(ECorrupt, "can't make index section header: %r"); + freezblock(b); + return -1; + } + if(writepart(is->part, PartBlank, b->data, HeadSize) < 0){ + seterr(EAdmin, "can't write index section header: %r"); + freezblock(b); + return -1; + } + freezblock(b); + + return 0; +} + +void +freeisect(ISect *is) +{ + if(is == nil) + return; + free(is); +} + +void +freeindex(Index *ix) +{ + int i; + + if(ix == nil) + return; + free(ix->amap); + free(ix->arenas); + if(ix->sects) + for(i = 0; i < ix->nsects; i++) + freeisect(ix->sects[i]); + free(ix->sects); + free(ix->smap); + free(ix); +} + +/* + * write a clump to an available arena in the index + * and return the address of the clump within the index. +ZZZ question: should this distinguish between an arena +filling up and real errors writing the clump? 
+ */ +u64int +writeiclump(Index *ix, Clump *c, u8int *clbuf) +{ + u64int a; + int i; + + for(i = ix->mapalloc; i < ix->narenas; i++){ + a = writeaclump(ix->arenas[i], c, clbuf); + if(a != TWID64) + return a + ix->amap[i].start; + } + + seterr(EAdmin, "no space left in arenas"); + return -1; +} + +/* + * convert an arena index to an relative address address + */ +Arena* +amapitoa(Index *ix, u64int a, u64int *aa) +{ + int i, r, l, m; + + l = 1; + r = ix->narenas - 1; + while(l <= r){ + m = (r + l) / 2; + if(ix->amap[m].start <= a) + l = m + 1; + else + r = m - 1; + } + l--; + + if(a > ix->amap[l].stop){ +for(i=0; i<ix->narenas; i++) + print("arena %d: %llux - %llux\n", i, ix->amap[i].start, ix->amap[i].stop); +print("want arena %d for %llux\n", l, a); + seterr(ECrash, "unmapped address passed to amapitoa"); + return nil; + } + + if(ix->arenas[l] == nil){ + seterr(ECrash, "unmapped arena selected in amapitoa"); + return nil; + } + *aa = a - ix->amap[l].start; + return ix->arenas[l]; +} + +int +iaddrcmp(IAddr *ia1, IAddr *ia2) +{ + return ia1->type != ia2->type + || ia1->size != ia2->size + || ia1->blocks != ia2->blocks + || ia1->addr != ia2->addr; +} + +/* + * lookup the score in the partition + * + * nothing needs to be explicitly locked: + * only static parts of ix are used, and + * the bucket is locked by the DBlock lock. 
+ */ +int +loadientry(Index *ix, u8int *score, int type, IEntry *ie) +{ + ISect *is; + DBlock *b; + IBucket ib; + u32int buck; + int h, ok; + +fprint(2, "loadientry %V %d\n", score, type); + buck = hashbits(score, 32) / ix->div; + ok = -1; + for(;;){ + qlock(&stats.lock); + stats.indexreads++; + qunlock(&stats.lock); + is = findisect(ix, buck); + if(is == nil){ + seterr(EAdmin, "bad math in loadientry"); + return -1; + } + buck -= is->start; + b = getdblock(is->part, is->blockbase + ((u64int)buck << is->blocklog), 1); + if(b == nil) + break; + + unpackibucket(&ib, b->data); + if(okibucket(&ib, is) < 0) + break; + + h = bucklook(score, type, ib.data, ib.n); + if(h & 1){ + h ^= 1; + unpackientry(ie, &ib.data[h]); + ok = 0; + break; + } + + break; + } + putdblock(b); + return ok; +} + +/* + * insert or update an index entry into the appropriate bucket + */ +int +storeientry(Index *ix, IEntry *ie) +{ + ISect *is; + DBlock *b; + IBucket ib; + u32int buck; + int h, ok; + + buck = hashbits(ie->score, 32) / ix->div; + ok = 0; + for(;;){ + qlock(&stats.lock); + stats.indexwreads++; + qunlock(&stats.lock); + is = findisect(ix, buck); + if(is == nil){ + seterr(EAdmin, "bad math in storeientry"); + return -1; + } + buck -= is->start; + b = getdblock(is->part, is->blockbase + ((u64int)buck << is->blocklog), 1); + if(b == nil) + break; + + unpackibucket(&ib, b->data); + if(okibucket(&ib, is) < 0) + break; + + h = bucklook(ie->score, ie->ia.type, ib.data, ib.n); + if(h & 1){ + h ^= 1; + packientry(ie, &ib.data[h]); + ok = writebucket(is, buck, &ib, b); + break; + } + + if(ib.n < is->buckmax){ + memmove(&ib.data[h + IEntrySize], &ib.data[h], ib.n * IEntrySize - h); + ib.n++; + + packientry(ie, &ib.data[h]); + ok = writebucket(is, buck, &ib, b); + break; + } + + break; + } + + putdblock(b); + return ok; +} + +static int +writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b) +{ + if(buck >= is->blocks) + seterr(EAdmin, "index write out of bounds: %d >= %d\n", + buck, 
is->blocks); + qlock(&stats.lock); + stats.indexwrites++; + qunlock(&stats.lock); + packibucket(ib, b->data); + return writepart(is->part, is->blockbase + ((u64int)buck << is->blocklog), b->data, is->blocksize); +} + +/* + * find the number of the index section holding score + */ +int +indexsect(Index *ix, u8int *score) +{ + u32int buck; + int r, l, m; + + buck = hashbits(score, 32) / ix->div; + l = 1; + r = ix->nsects - 1; + while(l <= r){ + m = (r + l) >> 1; + if(ix->sects[m]->start <= buck) + l = m + 1; + else + r = m - 1; + } + return l - 1; +} + +/* + * find the index section which holds buck + */ +ISect* +findisect(Index *ix, u32int buck) +{ + ISect *is; + int r, l, m; + + l = 1; + r = ix->nsects - 1; + while(l <= r){ + m = (r + l) >> 1; + if(ix->sects[m]->start <= buck) + l = m + 1; + else + r = m - 1; + } + is = ix->sects[l - 1]; + if(is->start <= buck && is->stop > buck) + return is; + return nil; +} + +static int +okibucket(IBucket *ib, ISect *is) +{ + if(ib->n <= is->buckmax && (ib->next == 0 || ib->next >= is->start && ib->next < is->stop)) + return 0; + + seterr(EICorrupt, "corrupted disk index bucket: n=%ud max=%ud, next=%lud range=[%lud,%lud)", + ib->n, is->buckmax, ib->next, is->start, is->stop); + return -1; +} + +/* + * look for score within data; + * return 1 | byte index of matching index, + * or 0 | index of least element > score + */ +static int +bucklook(u8int *score, int otype, u8int *data, int n) +{ + int i, r, l, m, h, c, cc, type; + + type = vttodisktype(otype); +fprint(2, "bucklook %V %d->%d %d\n", score, otype, type, n); + l = 0; + r = n - 1; + while(l <= r){ + m = (r + l) >> 1; + h = m * IEntrySize; +fprint(2, "perhaps %V %d\n", data+h, data[h+IEntryTypeOff]); + for(i = 0; i < VtScoreSize; i++){ + c = score[i]; + cc = data[h + i]; + if(c != cc){ + if(c > cc) + l = m + 1; + else + r = m - 1; + goto cont; + } + } + cc = data[h + IEntryTypeOff]; + if(type != cc){ + if(type > cc) + l = m + 1; + else + r = m - 1; + goto cont; + } + return h 
| 1; + cont:; + } + + return l * IEntrySize; +} + +/* + * compare two IEntries; consistent with bucklook + */ +int +ientrycmp(const void *vie1, const void *vie2) +{ + u8int *ie1, *ie2; + int i, v1, v2; + + ie1 = (u8int*)vie1; + ie2 = (u8int*)vie2; + for(i = 0; i < VtScoreSize; i++){ + v1 = ie1[i]; + v2 = ie2[i]; + if(v1 != v2){ + if(v1 < v2) + return -1; + return 0; + } + } + v1 = ie1[IEntryTypeOff]; + v2 = ie2[IEntryTypeOff]; + if(v1 != v2){ + if(v1 < v2) + return -1; + return 0; + } + return -1; +} diff --git a/src/cmd/venti/lump.c b/src/cmd/venti/lump.c new file mode 100644 index 00000000..c449f022 --- /dev/null +++ b/src/cmd/venti/lump.c @@ -0,0 +1,206 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +int queuewrites = 0; + +static Packet *readilump(Lump *u, IAddr *ia, u8int *score, int rac); + +Packet* +readlump(u8int *score, int type, u32int size) +{ + Lump *u; + Packet *p; + IAddr ia; + u32int n; + int rac; + + qlock(&stats.lock); + stats.lumpreads++; + qunlock(&stats.lock); + u = lookuplump(score, type); + if(u->data != nil){ + n = packetsize(u->data); + if(n > size){ + seterr(EOk, "read too small: asked for %d need at least %d", size, n); + putlump(u); + + return nil; + } + p = packetdup(u->data, 0, n); + putlump(u); + + return p; + } + + if(lookupscore(score, type, &ia, &rac) < 0){ + //ZZZ place to check for someone trying to guess scores + seterr(EOk, "no block with that score exists"); + + putlump(u); + return nil; + } + if(ia.size > size){ + seterr(EOk, "read too small 1: asked for %d need at least %d", size, ia.size); + + putlump(u); + return nil; + } + + p = readilump(u, &ia, score, rac); + putlump(u); + + return p; +} + +/* + * save away a lump, and return it's score. + * doesn't store duplicates, but checks that the data is really the same. 
+ */ +int +writelump(Packet *p, u8int *score, int type, u32int creator) +{ + Lump *u; + int ok; + + qlock(&stats.lock); + stats.lumpwrites++; + qunlock(&stats.lock); + + packetsha1(p, score); + + u = lookuplump(score, type); + if(u->data != nil){ + ok = 0; + if(packetcmp(p, u->data) != 0){ + seterr(EStrange, "score collision"); + ok = -1; + } + packetfree(p); + putlump(u); + return ok; + } + +print("writelump %08x\n", mainindex->arenas[0]); + if(queuewrites) + return queuewrite(u, p, creator); + + ok = writeqlump(u, p, creator); + + putlump(u); + return ok; +} + +int +writeqlump(Lump *u, Packet *p, int creator) +{ + ZBlock *flat; + Packet *old; + IAddr ia; + int ok; + int rac; + + if(lookupscore(u->score, u->type, &ia, &rac) == 0){ + /* + * if the read fails, + * assume it was corrupted data and store the block again + */ + old = readilump(u, &ia, u->score, rac); + if(old != nil){ + ok = 0; + if(packetcmp(p, old) != 0){ + seterr(EStrange, "score collision"); + ok = -1; + } + packetfree(p); + packetfree(old); + + return ok; + } + logerr(EAdmin, "writelump: read %V failed, rewriting: %r\n", u->score); + } + + flat = packet2zblock(p, packetsize(p)); + ok = storeclump(mainindex, flat, u->score, u->type, creator, &ia); + freezblock(flat); + if(ok == 0) + ok = insertscore(u->score, &ia, 1); + if(ok == 0) + insertlump(u, p); + else + packetfree(p); + + return ok; +} + +static void +readahead(u64int a, Arena *arena, u64int aa, int n) +{ + u8int buf[ClumpSize]; + Clump cl; + IAddr ia; + + while(n > 0) { + if (aa >= arena->used) + break; + if(readarena(arena, aa, buf, ClumpSize) < ClumpSize) + break; + if(unpackclump(&cl, buf) < 0) + break; + ia.addr = a; + ia.type = cl.info.type; + ia.size = cl.info.uncsize; + ia.blocks = (cl.info.size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog; + insertscore(cl.info.score, &ia, 0); + a += ClumpSize + cl.info.size; + aa += ClumpSize + cl.info.size; + n--; + } +} + +static Packet* +readilump(Lump *u, IAddr *ia, u8int *score, int rac) 
+{ + Arena *arena; + ZBlock *zb; + Packet *p, *pp; + Clump cl; + u64int a, aa; + u8int sc[VtScoreSize]; + + arena = amapitoa(mainindex, ia->addr, &aa); + if(arena == nil) + return nil; + + zb = loadclump(arena, aa, ia->blocks, &cl, sc, paranoid); + if(zb == nil) + return nil; + + if(ia->size != cl.info.uncsize){ + seterr(EInconsist, "index and clump size mismatch"); + freezblock(zb); + return nil; + } + if(ia->type != cl.info.type){ + seterr(EInconsist, "index and clump type mismatch"); + freezblock(zb); + return nil; + } + if(scorecmp(score, sc) != 0){ + seterr(ECrash, "score mismatch"); + freezblock(zb); + return nil; + } + + if(rac == 0) { + a = ia->addr + ClumpSize + cl.info.size; + aa += ClumpSize + cl.info.size; + readahead(a, arena, aa, 20); + } + + p = zblock2packet(zb, cl.info.uncsize); + freezblock(zb); + pp = packetdup(p, 0, packetsize(p)); + insertlump(u, pp); + return p; +} diff --git a/src/cmd/venti/lumpcache.c b/src/cmd/venti/lumpcache.c new file mode 100644 index 00000000..979271af --- /dev/null +++ b/src/cmd/venti/lumpcache.c @@ -0,0 +1,380 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +typedef struct LumpCache LumpCache; + +enum +{ + HashLog = 9, + HashSize = 1<<HashLog, + HashMask = HashSize - 1, +}; + +struct LumpCache +{ + QLock lock; + Rendez full; + Lump *free; /* list of available lumps */ + u32int allowed; /* total allowable space for packets */ + u32int avail; /* remaining space for packets */ + u32int now; /* ticks for usage timestamps */ + Lump **heads; /* hash table for finding address */ + int nheap; /* number of available victims */ + Lump **heap; /* heap for locating victims */ + int nblocks; /* number of blocks allocated */ + Lump *blocks; /* array of block descriptors */ +}; + +static LumpCache lumpcache; + +static void delheap(Lump *db); +static int downheap(int i, Lump *b); +static void fixheap(int i, Lump *b); +static int upheap(int i, Lump *b); +static Lump *bumplump(void); + +void +initlumpcache(u32int size, 
u32int nblocks) +{ + Lump *last, *b; + int i; + + lumpcache.full.l = &lumpcache.lock; + lumpcache.nblocks = nblocks; + lumpcache.allowed = size; + lumpcache.avail = size; + lumpcache.heads = MKNZ(Lump*, HashSize); + lumpcache.heap = MKNZ(Lump*, nblocks); + lumpcache.blocks = MKNZ(Lump, nblocks); + + last = nil; + for(i = 0; i < nblocks; i++){ + b = &lumpcache.blocks[i]; + b->type = TWID8; + b->heap = TWID32; + b->next = last; + last = b; + } + lumpcache.free = last; + lumpcache.nheap = 0; +} + +Lump* +lookuplump(u8int *score, int type) +{ + Lump *b; + u32int h; + + h = hashbits(score, HashLog); + + /* + * look for the block in the cache + */ +//checklumpcache(); + qlock(&lumpcache.lock); +again: + for(b = lumpcache.heads[h]; b != nil; b = b->next){ + if(scorecmp(score, b->score)==0 && type == b->type){ + qlock(&stats.lock); + stats.lumphit++; + qunlock(&stats.lock); + goto found; + } + } + + /* + * missed: locate the block with the oldest second to last use. + * remove it from the heap, and fix up the heap. 
+ */ + while(lumpcache.free == nil){ + if(bumplump() == nil){ + logerr(EAdmin, "all lump cache blocks in use"); + rsleep(&lumpcache.full); + goto again; + } + } + qlock(&stats.lock); + stats.lumpmiss++; + qunlock(&stats.lock); + + b = lumpcache.free; + lumpcache.free = b->next; + + /* + * the new block has no last use, so assume it happens sometime in the middle +ZZZ this is not reasonable + */ + b->used = (b->used2 + lumpcache.now) / 2; + + /* + * rechain the block on the correct hash chain + */ + b->next = lumpcache.heads[h]; + lumpcache.heads[h] = b; + if(b->next != nil) + b->next->prev = b; + b->prev = nil; + + scorecp(b->score, score); + b->type = type; + b->size = 0; + b->data = nil; + +found: + b->ref++; + b->used2 = b->used; + b->used = lumpcache.now++; + if(b->heap != TWID32) + fixheap(b->heap, b); + qunlock(&lumpcache.lock); + +//checklumpcache(); + + qlock(&b->lock); + + return b; +} + +void +insertlump(Lump *b, Packet *p) +{ + u32int size; + + /* + * look for the block in the cache + */ +//checklumpcache(); + qlock(&lumpcache.lock); +again: + + /* + * missed: locate the block with the oldest second to last use. + * remove it from the heap, and fix up the heap. 
+ */ + size = packetasize(p); +//ZZZ + while(lumpcache.avail < size){ + if(bumplump() == nil){ + logerr(EAdmin, "all lump cache blocks in use"); + rsleep(&lumpcache.full); + goto again; + } + } + b->data = p; + b->size = size; + lumpcache.avail -= size; + + qunlock(&lumpcache.lock); +//checklumpcache(); +} + +void +putlump(Lump *b) +{ + if(b == nil) + return; + + qunlock(&b->lock); +//checklumpcache(); + qlock(&lumpcache.lock); + if(--b->ref == 0){ + if(b->heap == TWID32) + upheap(lumpcache.nheap++, b); + rwakeup(&lumpcache.full); + } + + qunlock(&lumpcache.lock); +//checklumpcache(); +} + +/* + * remove some lump from use and update the free list and counters + */ +static Lump* +bumplump(void) +{ + Lump *b; + u32int h; + + /* + * remove blocks until we find one that is unused + * referenced blocks are left in the heap even though + * they can't be scavenged; this is simple a speed optimization + */ + for(;;){ + if(lumpcache.nheap == 0) + return nil; + b = lumpcache.heap[0]; + delheap(b); + if(!b->ref){ + rwakeup(&lumpcache.full); + break; + } + } + + /* + * unchain the block + */ + if(b->prev == nil){ + h = hashbits(b->score, HashLog); + if(lumpcache.heads[h] != b) + sysfatal("bad hash chains in lump cache"); + lumpcache.heads[h] = b->next; + }else + b->prev->next = b->next; + if(b->next != nil) + b->next->prev = b->prev; + + if(b->data != nil){ + packetfree(b->data); + b->data = nil; + lumpcache.avail += b->size; + b->size = 0; + } + b->type = TWID8; + + b->next = lumpcache.free; + lumpcache.free = b; + + return b; +} + +/* + * delete an arbitrary block from the heap + */ +static void +delheap(Lump *db) +{ + fixheap(db->heap, lumpcache.heap[--lumpcache.nheap]); + db->heap = TWID32; +} + +/* + * push an element up or down to it's correct new location + */ +static void +fixheap(int i, Lump *b) +{ + if(upheap(i, b) == i) + downheap(i, b); +} + +static int +upheap(int i, Lump *b) +{ + Lump *bb; + u32int now; + int p; + + now = lumpcache.now; + for(; i != 0; i = p){ + 
p = (i - 1) >> 1; + bb = lumpcache.heap[p]; + if(b->used2 - now >= bb->used2 - now) + break; + lumpcache.heap[i] = bb; + bb->heap = i; + } + + lumpcache.heap[i] = b; + b->heap = i; + return i; +} + +static int +downheap(int i, Lump *b) +{ + Lump *bb; + u32int now; + int k; + + now = lumpcache.now; + for(; ; i = k){ + k = (i << 1) + 1; + if(k >= lumpcache.nheap) + break; + if(k + 1 < lumpcache.nheap && lumpcache.heap[k]->used2 - now > lumpcache.heap[k + 1]->used2 - now) + k++; + bb = lumpcache.heap[k]; + if(b->used2 - now <= bb->used2 - now) + break; + lumpcache.heap[i] = bb; + bb->heap = i; + } + + lumpcache.heap[i] = b; + b->heap = i; + return i; +} + +static void +findblock(Lump *bb) +{ + Lump *b, *last; + int h; + + last = nil; + h = hashbits(bb->score, HashLog); + for(b = lumpcache.heads[h]; b != nil; b = b->next){ + if(last != b->prev) + sysfatal("bad prev link"); + if(b == bb) + return; + last = b; + } + sysfatal("block score=%V type=%#x missing from hash table", bb->score, bb->type); +} + +void +checklumpcache(void) +{ + Lump *b; + u32int size, now, nfree; + int i, k, refed; + + qlock(&lumpcache.lock); + now = lumpcache.now; + for(i = 0; i < lumpcache.nheap; i++){ + if(lumpcache.heap[i]->heap != i) + sysfatal("lc: mis-heaped at %d: %d", i, lumpcache.heap[i]->heap); + if(i > 0 && lumpcache.heap[(i - 1) >> 1]->used2 - now > lumpcache.heap[i]->used2 - now) + sysfatal("lc: bad heap ordering"); + k = (i << 1) + 1; + if(k < lumpcache.nheap && lumpcache.heap[i]->used2 - now > lumpcache.heap[k]->used2 - now) + sysfatal("lc: bad heap ordering"); + k++; + if(k < lumpcache.nheap && lumpcache.heap[i]->used2 - now > lumpcache.heap[k]->used2 - now) + sysfatal("lc: bad heap ordering"); + } + + refed = 0; + size = 0; + for(i = 0; i < lumpcache.nblocks; i++){ + b = &lumpcache.blocks[i]; + if(b->data == nil && b->size != 0) + sysfatal("bad size: %d data=%p", b->size, b->data); + if(b->ref && b->heap == TWID32) + refed++; + if(b->type != TWID8){ + findblock(b); + size += 
b->size; + } + if(b->heap != TWID32 + && lumpcache.heap[b->heap] != b) + sysfatal("lc: spurious heap value"); + } + if(lumpcache.avail != lumpcache.allowed - size) + sysfatal("mismatched available=%d and allowed=%d - used=%d space", lumpcache.avail, lumpcache.allowed, size); + + nfree = 0; + for(b = lumpcache.free; b != nil; b = b->next){ + if(b->type != TWID8 || b->heap != TWID32) + sysfatal("lc: bad free list"); + nfree++; + } + + if(lumpcache.nheap + nfree + refed != lumpcache.nblocks) + sysfatal("lc: missing blocks: %d %d %d %d", lumpcache.nheap, refed, nfree, lumpcache.nblocks); + qunlock(&lumpcache.lock); +} diff --git a/src/cmd/venti/lumpqueue.c b/src/cmd/venti/lumpqueue.c new file mode 100644 index 00000000..450c7271 --- /dev/null +++ b/src/cmd/venti/lumpqueue.c @@ -0,0 +1,150 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +typedef struct LumpQueue LumpQueue; +typedef struct WLump WLump; + +enum +{ + MaxLumpQ = 1 << 3 /* max. lumps on a single write queue, must be pow 2 */ +}; + +struct WLump +{ + Lump *u; + Packet *p; + int creator; + int gen; +}; + +struct LumpQueue +{ + QLock lock; + Rendez flush; + Rendez full; + Rendez empty; + WLump q[MaxLumpQ]; + int w; + int r; +}; + +static LumpQueue *lumpqs; +static int nqs; + +static QLock glk; +static int gen; + +static void queueproc(void *vq); + +int +initlumpqueues(int nq) +{ + LumpQueue *q; + + int i; + nqs = nq; + + lumpqs = MKNZ(LumpQueue, nq); + + for(i = 0; i < nq; i++){ + q = &lumpqs[i]; + q->full.l = &q->lock; + q->empty.l = &q->lock; + q->flush.l = &q->lock; + + if(vtproc(queueproc, q) < 0){ + seterr(EOk, "can't start write queue slave: %r"); + return -1; + } + } + + return 0; +} + +/* + * queue a lump & it's packet data for writing + */ +int +queuewrite(Lump *u, Packet *p, int creator) +{ + LumpQueue *q; + int i; + + i = indexsect(mainindex, u->score); + if(i < 0 || i >= nqs){ + seterr(EBug, "internal error: illegal index section in queuewrite"); + return -1; + } + + q = &lumpqs[i]; + 
+ qlock(&q->lock); + while(q->r == ((q->w + 1) & (MaxLumpQ - 1))) + rsleep(&q->full); + + q->q[q->w].u = u; + q->q[q->w].p = p; + q->q[q->w].creator = creator; + q->q[q->w].gen = gen; + q->w = (q->w + 1) & (MaxLumpQ - 1); + + rwakeup(&q->empty); + + qunlock(&q->lock); + + return 0; +} + +void +queueflush(void) +{ + int i; + LumpQueue *q; + + qlock(&glk); + gen++; + qunlock(&glk); + + for(i=0; i<mainindex->nsects; i++){ + q = &lumpqs[i]; + qlock(&q->lock); + while(q->w != q->r && gen - q->q[q->r].gen > 0) + rsleep(&q->flush); + qunlock(&q->lock); + } +} + +static void +queueproc(void *vq) +{ + LumpQueue *q; + Lump *u; + Packet *p; + int creator; + + q = vq; + for(;;){ + qlock(&q->lock); + while(q->w == q->r) + rsleep(&q->empty); + + u = q->q[q->r].u; + p = q->q[q->r].p; + creator = q->q[q->r].creator; + + rwakeup(&q->full); + + qunlock(&q->lock); + + if(writeqlump(u, p, creator) < 0) + fprint(2, "failed to write lump for %V: %r", u->score); + + qlock(&q->lock); + q->r = (q->r + 1) & (MaxLumpQ - 1); + rwakeup(&q->flush); + qunlock(&q->lock); + + putlump(u); + } +} diff --git a/src/cmd/venti/mkfile b/src/cmd/venti/mkfile new file mode 100644 index 00000000..48f2c60f --- /dev/null +++ b/src/cmd/venti/mkfile @@ -0,0 +1,89 @@ +PLAN9=../../.. 
+<$PLAN9/src/mkhdr + +LIBOFILES=\ + arena.$O\ + arenas.$O\ + buildbuck.$O\ + clump.$O\ + config.$O\ + conv.$O\ + dcache.$O\ + dump.$O\ + httpd.$O\ + icache.$O\ + ifile.$O\ + index.$O\ + lump.$O\ + lumpcache.$O\ + lumpqueue.$O\ + part.$O\ + score.$O\ + sortientry.$O\ + stats.$O\ + syncarena.$O\ + syncindex0.$O\ + unwhack.$O\ + utils.$O\ + unittoull.$O\ + whack.$O\ + xml.$O\ + zeropart.$O\ + +SLIB=libvs.a + +LIB=$SLIB\ + $PLAN9/lib/libventi.a\ + $PLAN9/lib/libhttpd.a\ + $PLAN9/lib/libbin.a\ + $PLAN9/lib/libsec.a\ + $PLAN9/lib/libthread.a\ + $PLAN9/lib/lib9.a\ + $PLAN9/lib/libfmt.a\ + $PLAN9/lib/libutf.a\ + +HFILES= dat.h\ + fns.h\ + stdinc.h\ + +TARG=\ +# venti\ + fmtarenas\ + fmtisect\ + fmtindex\ + buildindex\ + checkarenas\ + checkindex\ + clumpstats\ + findscore\ + rdarena\ + wrarena\ + syncindex\ + verifyarena\ + sync\ + read\ + write\ + copy\ + +BIN=$BIN/venti + +it:V: all + +<$PLAN9/src/mkmany + +# xml.c:D: mkxml dat.h +# ./mkxml dat.h > xml.c + +$SLIB(%.$O):N: %.$O +$SLIB: ${LIBOFILES:%=$SLIB(%)} + names=`echo $newprereq | sed -E 's/'$SLIB'\(([^)]+)\)/\1/g'` + # names = `{echo $newprereq |sed 's/ /\n/g' |sed -n 's/'$SLIB'\(([^)]+)\)/\1/gp'} + ar rvc $SLIB $names +# rm $names + +ainstall:V: ${TARG:%=%.ainstall} + +%.ainstall:V: $O.% + scp $prereq amsterdam:/usr/local/bin/venti/$stem + +LDFLAGS=$LDFLAGS -l9 diff --git a/src/cmd/venti/mkroot.c b/src/cmd/venti/mkroot.c new file mode 100644 index 00000000..67e5d0e9 --- /dev/null +++ b/src/cmd/venti/mkroot.c @@ -0,0 +1,59 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +char *host; + +void +usage(void) +{ + fprint(2, "usage: mkroot [-h host] name type score blocksize prev\n"); + threadexitsall("usage"); +} + +void +threadmain(int argc, char *argv[]) +{ + uchar score[VtScoreSize]; + uchar buf[VtRootSize]; + VtConn *z; + VtRoot root; + + ARGBEGIN{ + case 'h': + host = EARGF(usage()); + break; + default: + usage(); + break; + }ARGEND + + if(argc != 5) + usage(); + + fmtinstall('V', vtscorefmt); + + 
strecpy(root.name, root.name+sizeof root.name, argv[0]); + strecpy(root.type, root.type+sizeof root.type, argv[1]); + if(vtparsescore(argv[2], strlen(argv[2]), nil, root.score) < 0) + sysfatal("bad score '%s'", argv[2]); + root.blocksize = atoi(argv[3]); + if(vtparsescore(argv[4], strlen(argv[4]), nil, root.prev) < 0) + sysfatal("bad score '%s'", argv[4]); + vtrootpack(&root, buf); + + z = vtdial(host); + if(z == nil) + sysfatal("could not connect to server: %r"); + + if(vtconnect(z) < 0) + sysfatal("vtconnect: %r"); + + if(vtwrite(z, score, VtRootType, buf, VtRootSize) < 0) + sysfatal("vtwrite: %r"); + if(vtsync(z) < 0) + sysfatal("vtsync: %r"); + vthangup(z); + print("%V\n", score); + threadexitsall(0); +} diff --git a/src/cmd/venti/part.c b/src/cmd/venti/part.c new file mode 100644 index 00000000..dbf7b860 --- /dev/null +++ b/src/cmd/venti/part.c @@ -0,0 +1,130 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +u32int maxblocksize; +int readonly; + +Part* +initpart(char *name, int writable) +{ + Part *part; + Dir *dir; + int how; + + part = MK(Part); + part->name = estrdup(name); + if(!writable && readonly) + how = OREAD; + else + how = ORDWR; + part->fd = open(name, how); + if(part->fd < 0){ + if(how == ORDWR) + part->fd = open(name, OREAD); + if(part->fd < 0){ + freepart(part); + seterr(EOk, "can't open partition='%s': %r", name); + return nil; + } + fprint(2, "warning: %s opened for reading only\n", name); + } + dir = dirfstat(part->fd); + if(dir == nil){ + freepart(part); + seterr(EOk, "can't stat partition='%s': %r", name); + return nil; + } + part->size = dir->length; + part->blocksize = 0; + free(dir); + return part; +} + +void +freepart(Part *part) +{ + if(part == nil) + return; + close(part->fd); + free(part->name); + free(part); +} + +void +partblocksize(Part *part, u32int blocksize) +{ + if(part->blocksize) + sysfatal("resetting partition=%s's block size", part->name); + part->blocksize = blocksize; + if(blocksize > maxblocksize) + 
maxblocksize = blocksize; +} + +int +writepart(Part *part, u64int addr, u8int *buf, u32int n) +{ + long m, mm, nn; + + qlock(&stats.lock); + stats.diskwrites++; + stats.diskbwrites += n; + qunlock(&stats.lock); + + if(addr > part->size || addr + n > part->size){ + seterr(ECorrupt, "out of bounds write to partition='%s'", part->name); + return -1; + } + print("write %s %lud at %llud\n", part->name, n, addr); + for(nn = 0; nn < n; nn += m){ + mm = n - nn; + if(mm > MaxIo) + mm = MaxIo; + m = pwrite(part->fd, &buf[nn], mm, addr + nn); + if(m != mm){ + if(m < 0){ + seterr(EOk, "can't write partition='%s': %r", part->name); + return -1; + } + logerr(EOk, "truncated write to partition='%s' n=%ld wrote=%ld", part->name, mm, m); + } + } + return 0; +} + +int +readpart(Part *part, u64int addr, u8int *buf, u32int n) +{ + long m, mm, nn; + int i; + + qlock(&stats.lock); + stats.diskreads++; + stats.diskbreads += n; + qunlock(&stats.lock); + + if(addr > part->size || addr + n > part->size){ + seterr(ECorrupt, "out of bounds read from partition='%s': addr=%lld n=%d size=%lld", part->name, addr, n, part->size); + return -1; + } + print("read %s %lud at %llud\n", part->name, n, addr); + for(nn = 0; nn < n; nn += m){ + mm = n - nn; + if(mm > MaxIo) + mm = MaxIo; + m = -1; + for(i=0; i<4; i++) { + m = pread(part->fd, &buf[nn], mm, addr + nn); + if(m == mm) + break; + } + if(m != mm){ + if(m < 0){ + seterr(EOk, "can't read partition='%s': %r", part->name); + return -1; + } + logerr(EOk, "warning: truncated read from partition='%s' n=%ld read=%ld", part->name, mm, m); + } + } + return 0; +} diff --git a/src/cmd/venti/printarena.c b/src/cmd/venti/printarena.c new file mode 100644 index 00000000..95121a9f --- /dev/null +++ b/src/cmd/venti/printarena.c @@ -0,0 +1,128 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +int readonly = 1; /* for part.c */ + +void +usage(void) +{ + fprint(2, "usage: printarena arenafile [offset]\n"); + threadexitsall("usage"); +} + +static void 
+rdarena(Arena *arena, u64int offset) +{ + u64int a, aa, e; + u32int magic; + Clump cl; + uchar score[VtScoreSize]; + ZBlock *lump; + + printarena(2, arena); + + a = arena->base; + e = arena->base + arena->size; + if(offset != ~(u64int)0) { + if(offset >= e-a) + sysfatal("bad offset %llud >= %llud\n", + offset, e-a); + aa = offset; + } else + aa = 0; + + for(; aa < e; aa += ClumpSize+cl.info.size) { + magic = clumpmagic(arena, aa); + if(magic == ClumpFreeMagic) + break; + if(magic != ClumpMagic) { + fprint(2, "illegal clump magic number %#8.8ux offset %llud\n", + magic, aa); + break; + } + lump = loadclump(arena, aa, 0, &cl, score, 0); + if(lump == nil) { + fprint(2, "clump %llud failed to read: %r\n", aa); + break; + } + if(cl.info.type != VtTypeCorrupt) { + scoremem(score, lump->data, cl.info.uncsize); + if(scorecmp(cl.info.score, score) != 0) { + fprint(2, "clump %llud has mismatched score\n", aa); + break; + } + if(vttypevalid(cl.info.type) < 0) { + fprint(2, "clump %llud has bad type %d\n", aa, cl.info.type); + break; + } + } + print("%llud %V %d %d\n", aa, score, cl.info.type, cl.info.uncsize); + freezblock(lump); + } + print("end offset %llud\n", aa); +} + +void +threadmain(int argc, char *argv[]) +{ + char *file; + Arena *arena; + u64int offset, aoffset; + Part *part; + Dir *d; + uchar buf[8192]; + ArenaHead head; + + aoffset = 0; + ARGBEGIN{ + case 'o': + aoffset = strtoull(EARGF(usage()), 0, 0); + break; + default: + usage(); + break; + }ARGEND + + offset = ~(u64int)0; + switch(argc) { + default: + usage(); + case 2: + offset = strtoull(argv[1], 0, 0); + /* fall through */ + case 1: + file = argv[0]; + } + + + fmtinstall('V', vtscorefmt); + + statsinit(); + + if((d = dirstat(file)) == nil) + sysfatal("can't stat file %s: %r", file); + + part = initpart(file, 0); + if(part == nil) + sysfatal("can't open file %s: %r", file); + if(readpart(part, aoffset, buf, sizeof buf) < 0) + sysfatal("can't read file %s: %r", file); + + if(unpackarenahead(&head, buf) < 0) 
/*
 * print the rdarena command synopsis on standard error and exit.
 * exits with the status string "usage", like the other venti tools
 * in this directory, so a bad invocation is not reported as success
 * (the previous version passed 0, i.e. a successful exit).
 */
void
usage(void)
{
	fprint(2, "usage: rdarena [-v] arenapart arena\n");
	threadexitsall("usage");
}
if(verbose) + printarenapart(2, ap); + + initdcache(8 * MaxDiskBlock); + + for(i = 0; i < ap->narenas; i++){ + if(strcmp(ap->arenas[i]->name, aname) == 0){ + rdarena(ap->arenas[i]); + threadexitsall(0); + } + } + + sysfatal("couldn't find arena %s\n", aname); +} diff --git a/src/cmd/venti/read.c b/src/cmd/venti/read.c new file mode 100644 index 00000000..969a0593 --- /dev/null +++ b/src/cmd/venti/read.c @@ -0,0 +1,102 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +char *host; + +void +usage(void) +{ + fprint(2, "usage: read [-h host] score [type]\n"); + threadexitsall("usage"); +} + +int +parsescore(uchar *score, char *buf, int n) +{ + int i, c; + + memset(score, 0, VtScoreSize); + + if(n != VtScoreSize*2){ + werrstr("score wrong length %d", n); + return -1; + } + for(i=0; i<VtScoreSize*2; i++) { + if(buf[i] >= '0' && buf[i] <= '9') + c = buf[i] - '0'; + else if(buf[i] >= 'a' && buf[i] <= 'f') + c = buf[i] - 'a' + 10; + else if(buf[i] >= 'A' && buf[i] <= 'F') + c = buf[i] - 'A' + 10; + else { + c = buf[i]; + werrstr("bad score char %d '%c'", c, c); + return -1; + } + + if((i & 1) == 0) + c <<= 4; + + score[i>>1] |= c; + } + return 0; +} + +void +threadmain(int argc, char *argv[]) +{ + int type, n; + uchar score[VtScoreSize]; + uchar *buf; + VtConn *z; + + ARGBEGIN{ + case 'h': + host = EARGF(usage()); + break; + default: + usage(); + break; + }ARGEND + + if(argc != 1 && argc != 2) + usage(); + + + fmtinstall('V', vtscorefmt); + + if(parsescore(score, argv[0], strlen(argv[0])) < 0) + sysfatal("could not parse score '%s': %r", argv[0]); + + buf = vtmallocz(VtMaxLumpSize); + + z = vtdial(host); + if(z == nil) + sysfatal("could not connect to server: %r"); + + if(vtconnect(z) < 0) + sysfatal("vtconnect: %r"); + + if(argc == 1){ + n = -1; + for(type=0; type<VtMaxType; type++){ + n = vtread(z, score, type, buf, VtMaxLumpSize); + if(n >= 0){ + fprint(2, "venti/read%s%s %V %d\n", host ? " -h" : "", host ? 
/*
 * value of the hex digit c (either case), or -1 if c is not a hex digit.
 */
static int
hexv(int c)
{
	int v;

	v = -1;
	if(c >= '0' && c <= '9')
		v = c - '0';
	else if(c >= 'a' && c <= 'f')
		v = 10 + (c - 'a');
	else if(c >= 'A' && c <= 'F')
		v = 10 + (c - 'A');
	return v;
}
u32int size; /* bytes in each of the buckets chunks */ + u32int usable; /* amount usable for IEntry data */ + u8int *buf; /* buffer for all chunks */ + IEBuck *bucks; +}; + +#define U32GET(p) (((p)[0]<<24)|((p)[1]<<16)|((p)[2]<<8)|(p)[3]) +#define U32PUT(p,v) (p)[0]=(v)>>24;(p)[1]=(v)>>16;(p)[2]=(v)>>8;(p)[3]=(v) + +static IEBucks *initiebucks(Part *part, int bits, u32int size); +static int flushiebuck(IEBucks *ib, int b, int reset); +static int flushiebucks(IEBucks *ib); +static u32int sortiebuck(IEBucks *ib, int b); +static u64int sortiebucks(IEBucks *ib); +static int sprayientry(IEBucks *ib, IEntry *ie); +static u32int readarenainfo(IEBucks *ib, Arena *arena, u64int a); +static u32int readiebuck(IEBucks *ib, int b); +static void freeiebucks(IEBucks *ib); + +/* + * build a sorted file with all Ientries which should be in ix. + * assumes the arenas' directories are up to date. + * reads each, converts the entries to index entries, + * and sorts them. + */ +u64int +sortrawientries(Index *ix, Part *tmp, u64int *base) +{ + IEBucks *ib; + u64int clumps, sorted; + u32int n; + int i, ok; + +//ZZZ should allow configuration of bits, bucket size + ib = initiebucks(tmp, 8, 64*1024); + if(ib == nil){ + seterr(EOk, "can't create sorting buckets: %r"); + return TWID64; + } + ok = 0; + clumps = 0; + for(i = 0; i < ix->narenas; i++){ + n = readarenainfo(ib, ix->arenas[i], ix->amap[i].start); + if(n == TWID32){ + ok = -1; + break; + } + clumps += n; + } +fprint(2, "got %lld clumps - starting sort\n", clumps); + if(ok){ + sorted = sortiebucks(ib); + *base = (u64int)ib->chunks * ib->size; + if(sorted != clumps){ + fprint(2, "sorting messed up: clumps=%lld sorted=%lld\n", clumps, sorted); + ok = -1; + } + } + freeiebucks(ib); + if(ok < 0) + return TWID64; + return clumps; +} + +/* + * read in all of the arena's clump directory, + * convert to IEntry format, and bucket sort based + * on the first few bits. 
+ */ +static u32int +readarenainfo(IEBucks *ib, Arena *arena, u64int a) +{ + IEntry ie; + ClumpInfo *ci, *cis; + u32int clump; + int i, n, ok; + +//ZZZ remove fprint? + if(arena->clumps) + fprint(2, "reading directory for arena=%s with %d entries\n", arena->name, arena->clumps); + + cis = MKN(ClumpInfo, ClumpChunks); + ok = 0; + memset(&ie, 0, sizeof(IEntry)); + for(clump = 0; clump < arena->clumps; clump += n){ + n = ClumpChunks; + if(n > arena->clumps - clump) + n = arena->clumps - clump; + if(readclumpinfos(arena, clump, cis, n) != n){ + seterr(EOk, "arena directory read failed: %r"); + ok = -1; + break; + } + + for(i = 0; i < n; i++){ + ci = &cis[i]; + ie.ia.type = ci->type; + ie.ia.size = ci->uncsize; + ie.ia.addr = a; + a += ci->size + ClumpSize; + ie.ia.blocks = (ci->size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog; + scorecp(ie.score, ci->score); + sprayientry(ib, &ie); + } + } + free(cis); + if(ok < 0) + return TWID32; + return clump; +} + +/* + * initialize the external bucket sorting data structures + */ +static IEBucks* +initiebucks(Part *part, int bits, u32int size) +{ + IEBucks *ib; + int i; + + ib = MKZ(IEBucks); + if(ib == nil){ + seterr(EOk, "out of memory"); + return nil; + } + ib->bits = bits; + ib->nbucks = 1 << bits; + ib->size = size; + ib->usable = (size - U32Size) / IEntrySize * IEntrySize; + ib->bucks = MKNZ(IEBuck, ib->nbucks); + if(ib->bucks == nil){ + seterr(EOk, "out of memory allocation sorting buckets"); + freeiebucks(ib); + return nil; + } + ib->buf = MKN(u8int, size * (1 << bits)); + if(ib->buf == nil){ + seterr(EOk, "out of memory allocating sorting buckets' buffers"); + freeiebucks(ib); + return nil; + } + for(i = 0; i < ib->nbucks; i++){ + ib->bucks[i].head = TWID32; + ib->bucks[i].buf = &ib->buf[i * size]; + } + ib->part = part; + return ib; +} + +static void +freeiebucks(IEBucks *ib) +{ + if(ib == nil) + return; + free(ib->bucks); + free(ib->buf); + free(ib); +} + +/* + * initial sort: put the entry into the correct bucket 
+ */ +static int +sprayientry(IEBucks *ib, IEntry *ie) +{ + u32int n; + int b; + + b = hashbits(ie->score, ib->bits); + n = ib->bucks[b].used; + packientry(ie, &ib->bucks[b].buf[n]); + n += IEntrySize; + ib->bucks[b].used = n; + if(n + IEntrySize <= ib->usable) + return 0; + return flushiebuck(ib, b, 1); +} + +/* + * finish sorting: + * for each bucket, read it in and sort it + * write out the the final file + */ +static u64int +sortiebucks(IEBucks *ib) +{ + u64int tot; + u32int n; + int i; + + if(flushiebucks(ib) < 0) + return TWID64; + for(i = 0; i < ib->nbucks; i++) + ib->bucks[i].buf = nil; + ib->off = (u64int)ib->chunks * ib->size; + free(ib->buf); +fprint(2, "ib->max = %lld\n", ib->max); +fprint(2, "ib->chunks = %ud\n", ib->chunks); + ib->buf = MKN(u8int, ib->max + U32Size); + if(ib->buf == nil){ + seterr(EOk, "out of memory allocating final sorting buffer; try more buckets"); + return TWID64; + } + tot = 0; + for(i = 0; i < ib->nbucks; i++){ + n = sortiebuck(ib, i); + if(n == TWID32) + return TWID64; + tot += n; + } + return tot; + return 0; +} + +/* + * sort from bucket b of ib into the output file to + */ +static u32int +sortiebuck(IEBucks *ib, int b) +{ + u32int n; + + n = readiebuck(ib, b); + if(n == TWID32) + return TWID32; + qsort(ib->buf, n, IEntrySize, ientrycmp); + if(writepart(ib->part, ib->off, ib->buf, n * IEntrySize) < 0){ + seterr(EOk, "can't write sorted bucket: %r"); + return TWID32; + } + ib->off += n * IEntrySize; + return n; +} + +/* + * write out a single bucket + */ +static int +flushiebuck(IEBucks *ib, int b, int reset) +{ + u32int n; + + if(ib->bucks[b].used == 0) + return 0; + n = ib->bucks[b].used; + U32PUT(&ib->bucks[b].buf[n], ib->bucks[b].head); + n += U32Size; + if(writepart(ib->part, (u64int)ib->chunks * ib->size, ib->bucks[b].buf, n) < 0){ + seterr(EOk, "can't write sorting bucket to file: %r"); + return -1; + } + ib->bucks[b].head = ib->chunks++; + ib->bucks[b].total += ib->bucks[b].used; + if(reset) + ib->bucks[b].used = 0; + 
return 0; +} + +/* + * write out all of the buckets, and compute + * the maximum size of any bucket + */ +static int +flushiebucks(IEBucks *ib) +{ + int i; + + for(i = 0; i < ib->nbucks; i++){ + if(flushiebuck(ib, i, 0) < 0) + return -1; + if(ib->bucks[i].total > ib->max) + ib->max = ib->bucks[i].total; + } + return 0; +} + +/* + * read in the chained buffers for bucket b into ib->buf, + * and return its total number of IEntries, or TWID32 on a read error. + * the first chunk read holds bucks[b].used bytes (ib->usable if that is 0), + * every later one ib->usable bytes; each chunk is followed by a u32int + * naming the next chunk to read, and TWID32 ends the chain. + */ +static u32int +readiebuck(IEBucks *ib, int b) +{ + u32int head, n, m; + + head = ib->bucks[b].head; + n = 0; + m = ib->bucks[b].used; + if(m == 0) + m = ib->usable; +fprint(2, "%d total = %lld\n", b, ib->bucks[b].total); + while(head != TWID32){ + if(readpart(ib->part, (u64int)head * ib->size, &ib->buf[n], m + U32Size) < 0){ +fprint(2, "n = %ud\n", n); + seterr(EOk, "can't read index sort bucket: %r"); + return TWID32; + } + n += m; + head = U32GET(&ib->buf[n]); + m = ib->usable; + } +fprint(2, "n = %ud\n", n); + return n / IEntrySize; +} diff --git a/src/cmd/venti/stats.c b/src/cmd/venti/stats.c new file mode 100644 index 00000000..edf79bd7 --- /dev/null +++ b/src/cmd/venti/stats.c @@ -0,0 +1,64 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +Stats stats; + +void +statsinit(void) +{ +} + +/* + * v as a whole-number percentage of total; a zero total is treated as 1. + * for v of a million or more, total is divided by 100 instead of + * multiplying v (presumably to keep v * 100 from overflowing a long). + */ +static int +percent(long v, long total) +{ + if(total == 0) + total = 1; + if(v < 1000*1000) + return (v * 100) / total; + total /= 100; + if(total == 0) + total = 1; + return v / total; +} + +void +printstats(void) +{ + fprint(2, "lump writes=%,ld\n", stats.lumpwrites); + fprint(2, "lump reads=%,ld\n", stats.lumpreads); + fprint(2, "lump cache read hits=%,ld\n", stats.lumphit); + fprint(2, "lump cache read misses=%,ld\n", stats.lumpmiss); + + fprint(2, "clump disk writes=%,ld\n", stats.clumpwrites); + fprint(2, "clump disk bytes written=%,lld\n", stats.clumpbwrites); + fprint(2, "clump disk bytes compressed=%,lld\n", stats.clumpbcomp); + fprint(2, "clump disk reads=%,ld\n", stats.clumpreads); + fprint(2, "clump disk bytes read=%,lld\n", 
stats.clumpbreads); + fprint(2, "clump disk bytes uncompressed=%,lld\n", stats.clumpbuncomp); + + fprint(2, "clump directory disk writes=%,ld\n", stats.ciwrites); + fprint(2, "clump directory disk reads=%,ld\n", stats.cireads); + + fprint(2, "index disk writes=%,ld\n", stats.indexwrites); + fprint(2, "index disk reads=%,ld\n", stats.indexreads); + fprint(2, "index disk reads for modify=%,ld\n", stats.indexwreads); + fprint(2, "index disk reads for allocation=%,ld\n", stats.indexareads); + + fprint(2, "index cache lookups=%,ld\n", stats.iclookups); + fprint(2, "index cache hits=%,ld %d%%\n", stats.ichits, + percent(stats.ichits, stats.iclookups)); + fprint(2, "index cache fills=%,ld %d%%\n", stats.icfills, + percent(stats.icfills, stats.iclookups)); + fprint(2, "index cache inserts=%,ld\n", stats.icinserts); + + fprint(2, "disk cache hits=%,ld\n", stats.pchit); + fprint(2, "disk cache misses=%,ld\n", stats.pcmiss); + fprint(2, "disk cache reads=%,ld\n", stats.pcreads); + fprint(2, "disk cache bytes read=%,lld\n", stats.pcbreads); + + fprint(2, "disk writes=%,ld\n", stats.diskwrites); + fprint(2, "disk bytes written=%,lld\n", stats.diskbwrites); + fprint(2, "disk reads=%,ld\n", stats.diskreads); + fprint(2, "disk bytes read=%,lld\n", stats.diskbreads); +} diff --git a/src/cmd/venti/stdinc.h b/src/cmd/venti/stdinc.h new file mode 100644 index 00000000..6f8f5ba5 --- /dev/null +++ b/src/cmd/venti/stdinc.h @@ -0,0 +1,6 @@ +#include <u.h> +#include <libc.h> +#include <venti.h> +#include <libsec.h> +#include <thread.h> + diff --git a/src/cmd/venti/sync.c b/src/cmd/venti/sync.c new file mode 100644 index 00000000..74461400 --- /dev/null +++ b/src/cmd/venti/sync.c @@ -0,0 +1,48 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +char *host; + +void +usage(void) +{ + fprint(2, "usage: sync [-h host]\n"); + threadexitsall("usage"); +} + +void +threadmain(int argc, char *argv[]) +{ + VtConn *z; + + ARGBEGIN{ + case 'h': + host = EARGF(usage()); + if(host == nil) + 
usage(); + break; + default: + usage(); + break; + }ARGEND + + if(argc != 0) + usage(); + + + fmtinstall('V', vtscorefmt); + + z = vtdial(host); + if(z == nil) + sysfatal("could not connect to server: %r"); + + if(vtconnect(z) < 0) + sysfatal("vtconnect: %r"); + + if(vtsync(z) < 0) + sysfatal("vtsync: %r"); + + vthangup(z); + threadexitsall(0); +} diff --git a/src/cmd/venti/syncarena.c b/src/cmd/venti/syncarena.c new file mode 100644 index 00000000..78a6bd0f --- /dev/null +++ b/src/cmd/venti/syncarena.c @@ -0,0 +1,164 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +static int writeclumphead(Arena *arena, u64int aa, Clump *cl); +static int writeclumpmagic(Arena *arena, u64int aa, u32int magic); + +int +clumpinfocmp(ClumpInfo *c, ClumpInfo *d) +{ + return c->type != d->type + || c->size != d->size + || c->uncsize != d->uncsize + || scorecmp(c->score, d->score)!=0; +} + +/* + * synchronize the clump info directory with + * with the clumps actually stored in the arena. + * the directory should be at least as up to date + * as the arena's trailer. + * + * checks/updates at most n clumps. + * + * returns 0 if ok, flags if error occurred + */ +int +syncarena(Arena *arena, u32int n, int zok, int fix) +{ + ZBlock *lump; + Clump cl; + ClumpInfo ci; + static ClumpInfo zci = { .type = -1 }; + u8int score[VtScoreSize]; + u64int uncsize, used, aa; + u32int clump, clumps, cclumps, magic; + int err, flush, broken; + + used = arena->used; + clumps = arena->clumps; + cclumps = arena->cclumps; + uncsize = arena->uncsize; + + flush = 0; + err = 0; + for(; n; n--){ + aa = arena->used; + clump = arena->clumps; + magic = clumpmagic(arena, aa); + if(magic == ClumpFreeMagic) + break; + if(magic != ClumpMagic){ + fprint(2, "illegal clump magic number=%#8.8ux at clump=%d\n", magic, clump); + err |= SyncDataErr; +//ZZZ write a zero here? 
+ if(0 && fix && writeclumpmagic(arena, aa, ClumpFreeMagic) < 0){ + fprint(2, "can't write corrected clump free magic: %r"); + err |= SyncFixErr; + } + break; + } + arena->clumps++; + + broken = 0; + lump = loadclump(arena, aa, 0, &cl, score, 0); + if(lump == nil){ + fprint(2, "clump=%d failed to read correctly: %r\n", clump); + err |= SyncDataErr; + }else if(cl.info.type != VtTypeCorrupt){ + scoremem(score, lump->data, cl.info.uncsize); + if(scorecmp(cl.info.score, score) != 0){ + fprint(2, "clump=%d has mismatched score\n", clump); + err |= SyncDataErr; + broken = 1; + }else if(vttypevalid(cl.info.type) < 0){ + fprint(2, "clump=%d has invalid type %d", clump, cl.info.type); + err |= SyncDataErr; + broken = 1; + } + if(broken && fix){ + cl.info.type = VtTypeCorrupt; + if(writeclumphead(arena, aa, &cl) < 0){ + fprint(2, "can't write corrected clump header: %r"); + err |= SyncFixErr; + } + } + } + freezblock(lump); + arena->used += ClumpSize + cl.info.size; + + if(!broken && readclumpinfo(arena, clump, &ci)<0){ + fprint(2, "arena directory read failed\n"); + broken = 1; + }else if(!broken && clumpinfocmp(&ci, &cl.info)!=0){ + if(clumpinfocmp(&ci, &zci) == 0){ + err |= SyncCIZero; + if(!zok) + fprint(2, "unwritten clump info for clump=%d\n", clump); + }else{ + err |= SyncCIErr; + fprint(2, "bad clump info for clump=%d\n", clump); + fprint(2, "\texpected score=%V type=%d size=%d uncsize=%d\n", + cl.info.score, cl.info.type, cl.info.size, cl.info.uncsize); + fprint(2, "\tfound score=%V type=%d size=%d uncsize=%d\n", + ci.score, ci.type, ci.size, ci.uncsize); + } + broken = 1; + } + if(broken && fix){ + flush = 1; + ci = cl.info; + if(writeclumpinfo(arena, clump, &ci) < 0){ + fprint(2, "can't write correct clump directory: %r\n"); + err |= SyncFixErr; + } + } + + arena->uncsize += cl.info.uncsize; + if(cl.info.size < cl.info.uncsize) + arena->cclumps++; + } + + if(flush){ + arena->wtime = now(); + if(arena->ctime == 0 && arena->clumps) + arena->ctime = arena->wtime; + 
if(flushciblocks(arena) < 0){ + fprint(2, "can't flush arena directory cache: %r"); + err |= SyncFixErr; + } + } + + if(used != arena->used + || clumps != arena->clumps + || cclumps != arena->cclumps + || uncsize != arena->uncsize) + err |= SyncHeader; + + return err; +} + +/* + * pack cl into a fresh ClumpSize block and write it at arena address aa. + * returns 0 on success, -1 if allocation, packing, or the write fails. + */ +static int +writeclumphead(Arena *arena, u64int aa, Clump *cl) +{ + ZBlock *zb; + int bad; + + zb = alloczblock(ClumpSize, 0); + if(zb == nil) + return -1; + bad = packclump(cl, zb->data)<0 + || writearena(arena, aa, zb->data, ClumpSize) != ClumpSize; + freezblock(zb); + return bad ? -1 : 0; +} + +/* + * overwrite the magic number at arena address aa. + * returns 0 on success, -1 if the write came up short; + * same convention as writeclumphead, and what syncarena tests with < 0. + */ +static int +writeclumpmagic(Arena *arena, u64int aa, u32int magic) +{ + u8int buf[U32Size]; + + packmagic(magic, buf); + if(writearena(arena, aa, buf, U32Size) != U32Size) + return -1; + return 0; +} diff --git a/src/cmd/venti/syncindex.c b/src/cmd/venti/syncindex.c new file mode 100644 index 00000000..75bb0696 --- /dev/null +++ b/src/cmd/venti/syncindex.c @@ -0,0 +1,56 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +static int verbose; +void +usage(void) +{ + fprint(2, "usage: syncindex [-fv] [-B blockcachesize] config\n"); + threadexitsall("usage"); +} + +void +threadmain(int argc, char *argv[]) +{ + u32int bcmem; + int fix; + + fix = 0; + bcmem = 0; + ARGBEGIN{ + case 'B': + bcmem = unittoull(ARGF()); + break; + case 'f': + fix++; + break; + case 'v': + verbose++; + break; + default: + usage(); + break; + }ARGEND + + if(!fix) + readonly = 1; + + if(argc != 1) + usage(); + + if(initventi(argv[0]) < 0) + sysfatal("can't init venti: %r"); + + if(bcmem < maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16)) + bcmem = maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16); + fprint(2, "initialize %d bytes of disk block cache\n", bcmem); + initdcache(bcmem); + + if(verbose) + printindex(2, mainindex); + if(syncindex(mainindex, fix) < 0) + sysfatal("failed to sync index=%s: %r\n", mainindex->name); + + threadexitsall(0); +} diff --git a/src/cmd/venti/syncindex0.c 
b/src/cmd/venti/syncindex0.c new file mode 100644 index 00000000..95db1208 --- /dev/null +++ b/src/cmd/venti/syncindex0.c @@ -0,0 +1,144 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +enum +{ + ClumpChunks = 32*1024 +}; + +/* + * shell sort is plenty good enough + * because we're going to do a bunch of disk i/o's + */ +static void +sortclumpinfo(ClumpInfo *ci, int *s, int n) +{ + int i, j, m, t; + + for(m = (n + 3) / 5; m > 0; m = (m + 1) / 3){ + for(i = n - m; i-- > 0;){ + for(j = i + m; j < n; j += m){ + if(memcmp(ci[s[j - m]].score, ci[s[j]].score, VtScoreSize) <= 0) + break; + t = s[j]; + s[j] = s[j - m]; + s[j - m] = t; + } + } + } +} + +int +syncarenaindex(Index *ix, Arena *arena, u32int clump, u64int a, int fix) +{ + Packet *pack; + IEntry ie; + IAddr ia; + ClumpInfo *ci, *cis; + u32int now; + u64int *addrs; + int i, n, ok, *s; + + now = time(nil); + cis = MKN(ClumpInfo, ClumpChunks); + addrs = MKN(u64int, ClumpChunks); + s = MKN(int, ClumpChunks); + ok = 0; + for(; clump < arena->clumps; clump += n){ + n = ClumpChunks; + if(n > arena->clumps - clump) + n = arena->clumps - clump; + n = readclumpinfos(arena, clump, cis, n); + if(n <= 0){ + fprint(2, "arena directory read failed\n"); + ok = -1; + break; + } + + for(i = 0; i < n; i++){ + addrs[i] = a; + a += cis[i].size + ClumpSize; + s[i] = i; + } + + sortclumpinfo(cis, s, n); + + for(i = 0; i < n; i++){ + ci = &cis[s[i]]; + ia.type = ci->type; + ia.size = ci->uncsize; + ia.addr = addrs[s[i]]; + ia.blocks = (ci->size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog; + + if(loadientry(ix, ci->score, ci->type, &ie) < 0) + fprint(2, "missing block type=%d score=%V\n", ci->type, ci->score); + else if(iaddrcmp(&ia, &ie.ia) != 0){ + fprint(2, "\nmismatched index entry and clump at %d\n", clump + i); + fprint(2, "\tclump: type=%d size=%d blocks=%d addr=%lld\n", ia.type, ia.size, ia.blocks, ia.addr); + fprint(2, "\tindex: type=%d size=%d block=%d addr=%lld\n", ie.ia.type, ie.ia.size, ie.ia.blocks, 
ie.ia.addr); + pack = readlump(ie.score, ie.ia.type, ie.ia.size); + packetfree(pack); + if(pack != nil){ + fprint(2, "duplicated lump\n"); + continue; + } + }else + continue; + if(!fix){ + ok = -1; + continue; + } + ie.ia = ia; + scorecp(ie.score, ci->score); + ie.train = 0; + ie.wtime = now; + if(storeientry(ix, &ie) < 0){ + fprint(2, "can't fix index: %r"); + ok = -1; + } + } + + if(0 && clump / 1000 != (clump + n) / 1000) + fprint(2, "."); + } + free(cis); + free(addrs); + free(s); + return ok; +} + +int +syncindex(Index *ix, int fix) +{ + Arena *arena; + u64int a; + u32int clump; + int i, e, e1, ok, ok1; + + ok = 0; + for(i = 0; i < ix->narenas; i++){ + arena = ix->arenas[i]; + clump = arena->clumps; + a = arena->used; + e = syncarena(arena, TWID32, fix, fix); + e1 = e; + if(fix) + e1 &= ~(SyncHeader|SyncCIZero); + if(e1 == SyncHeader) + fprint(2, "arena %s: header is out-of-date\n", arena->name); + if(e1) + ok = -1; + else{ + ok1 = syncarenaindex(ix, arena, clump, a + ix->amap[i].start, fix); + if(fix && ok1==0 && (e & SyncHeader) && wbarena(arena) < 0) + fprint(2, "arena=%s header write failed: %r\n", arena->name); + ok |= ok1; + } + } + if(fix && wbindex(ix) < 0){ + fprint(2, "can't write back index header for %s: %r\n", ix->name); + return -1; + } + return ok; +} diff --git a/src/cmd/venti/unittoull.c b/src/cmd/venti/unittoull.c new file mode 100644 index 00000000..db35aa0f --- /dev/null +++ b/src/cmd/venti/unittoull.c @@ -0,0 +1,27 @@ +#include "stdinc.h" + +#define TWID64 ((u64int)~(u64int)0) + +/* + * parse a number (any base strtoull accepts with base 0) followed by + * an optional k, m, or g suffix meaning 2^10, 2^20, or 2^30. + * returns TWID64 if s is nil or anything follows the number and suffix. + * strtoull rather than strtoul, so the value is not cut to the width + * of an unsigned long before it is stored in the u64int. + */ +u64int +unittoull(char *s) +{ + char *es; + u64int n; + + if(s == nil) + return TWID64; + n = strtoull(s, &es, 0); + if(*es == 'k' || *es == 'K'){ + n *= 1024; + es++; + }else if(*es == 'm' || *es == 'M'){ + n *= 1024*1024; + es++; + }else if(*es == 'g' || *es == 'G'){ + n *= 1024*1024*1024; + es++; + } + if(*es != '\0') + return TWID64; + return n; +} diff --git a/src/cmd/venti/unwhack.c b/src/cmd/venti/unwhack.c new file mode 100644 index 00000000..5530bd07 --- 
/dev/null +++ b/src/cmd/venti/unwhack.c @@ -0,0 +1,179 @@ +#include "stdinc.h" +#include "whack.h" + +enum +{ + DMaxFastLen = 7, + DBigLenCode = 0x3c, /* minimum code for large lenth encoding */ + DBigLenBits = 6, + DBigLenBase = 1 /* starting items to encode for big lens */ +}; + +static uchar lenval[1 << (DBigLenBits - 1)] = +{ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 3, 3, 3, 3, 3, 3, 3, 3, + 4, 4, 4, 4, + 5, + 6, + 255, + 255 +}; + +static uchar lenbits[] = +{ + 0, 0, 0, + 2, 3, 5, 5, +}; + +static uchar offbits[16] = +{ + 5, 5, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10, 12, 13 +}; + +static ushort offbase[16] = +{ + 0, 0x20, + 0x40, 0x60, + 0x80, 0xc0, + 0x100, 0x180, + 0x200, 0x300, + 0x400, 0x600, + 0x800, 0xc00, + 0x1000, + 0x2000 +}; + +void +unwhackinit(Unwhack *uw) +{ + uw->err[0] = '\0'; +} + +int +unwhack(Unwhack *uw, uchar *dst, int ndst, uchar *src, int nsrc) +{ + uchar *s, *d, *dmax, *smax, lit; + ulong uwbits, lithist; + int i, off, len, bits, use, code, uwnbits, overbits; + + d = dst; + dmax = d + ndst; + + smax = src + nsrc; + uwnbits = 0; + uwbits = 0; + overbits = 0; + lithist = ~0; + while(src < smax || uwnbits - overbits >= MinDecode){ + while(uwnbits <= 24){ + uwbits <<= 8; + if(src < smax) + uwbits |= *src++; + else + overbits += 8; + uwnbits += 8; + } + + /* + * literal + */ + len = lenval[(uwbits >> (uwnbits - 5)) & 0x1f]; + if(len == 0){ + if(lithist & 0xf){ + uwnbits -= 9; + lit = (uwbits >> uwnbits) & 0xff; + lit &= 255; + }else{ + uwnbits -= 8; + lit = (uwbits >> uwnbits) & 0x7f; + if(lit < 32){ + if(lit < 24){ + uwnbits -= 2; + lit = (lit << 2) | ((uwbits >> uwnbits) & 3); + }else{ + uwnbits -= 3; + lit = (lit << 3) | ((uwbits >> uwnbits) & 7); + } + lit = (lit - 64) & 0xff; + } + } + if(d >= dmax){ + snprint(uw->err, WhackErrLen, "too much output"); + return -1; + } + *d++ = lit; + lithist = (lithist << 1) | (lit < 32) | (lit > 127); + continue; + } + + /* + * length + */ + if(len < 255) + uwnbits -= lenbits[len]; + else{ + 
uwnbits -= DBigLenBits; + code = ((uwbits >> uwnbits) & ((1 << DBigLenBits) - 1)) - DBigLenCode; + len = DMaxFastLen; + use = DBigLenBase; + bits = (DBigLenBits & 1) ^ 1; + while(code >= use){ + len += use; + code -= use; + code <<= 1; + uwnbits--; + if(uwnbits < 0){ + snprint(uw->err, WhackErrLen, "len out of range"); + return -1; + } + code |= (uwbits >> uwnbits) & 1; + use <<= bits; + bits ^= 1; + } + len += code; + + while(uwnbits <= 24){ + uwbits <<= 8; + if(src < smax) + uwbits |= *src++; + else + overbits += 8; + uwnbits += 8; + } + } + + /* + * offset + */ + uwnbits -= 4; + bits = (uwbits >> uwnbits) & 0xf; + off = offbase[bits]; + bits = offbits[bits]; + + uwnbits -= bits; + off |= (uwbits >> uwnbits) & ((1 << bits) - 1); + off++; + + if(off > d - dst){ + snprint(uw->err, WhackErrLen, "offset out of range: off=%d d=%ld len=%d nbits=%d", off, d - dst, len, uwnbits); + return -1; + } + if(d + len > dmax){ + snprint(uw->err, WhackErrLen, "len out of range"); + return -1; + } + s = d - off; + for(i = 0; i < len; i++) + d[i] = s[i]; + d += len; + } + if(uwnbits < overbits){ + snprint(uw->err, WhackErrLen, "compressed data overrun"); + return -1; + } + + len = d - dst; + + return len; +} diff --git a/src/cmd/venti/utils.c b/src/cmd/venti/utils.c new file mode 100644 index 00000000..26a0ef82 --- /dev/null +++ b/src/cmd/venti/utils.c @@ -0,0 +1,361 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +int +namecmp(char *s, char *t) +{ + return strncmp(s, t, ANameSize); +} + +void +namecp(char *dst, char *src) +{ + strncpy(dst, src, ANameSize - 1); + dst[ANameSize - 1] = '\0'; +} + +int +nameok(char *name) +{ + char *t; + int c; + + if(name == nil) + return -1; + for(t = name; c = *t; t++) + if(t - name >= ANameSize + || c < ' ' || c >= 0x7f) + return -1; + return 0; +} + +int +stru32int(char *s, u32int *r) +{ + char *t; + u32int n, nn, m; + int c; + + m = TWID32 / 10; + n = 0; + for(t = s; ; t++){ + c = *t; + if(c < '0' || c > '9') + break; + if(n > m) + 
return -1; + nn = n * 10 + c - '0'; + if(nn < n) + return -1; + n = nn; + } + *r = n; + return s != t && *t == '\0'; +} + +int +stru64int(char *s, u64int *r) +{ + char *t; + u64int n, nn, m; + int c; + + m = TWID64 / 10; + n = 0; + for(t = s; ; t++){ + c = *t; + if(c < '0' || c > '9') + break; + if(n > m) + return -1; + nn = n * 10 + c - '0'; + if(nn < n) + return -1; + n = nn; + } + *r = n; + return s != t && *t == '\0'; +} + +int +vttypevalid(int type) +{ + return type < VtMaxType; +} + +void +fmtzbinit(Fmt *f, ZBlock *b) +{ + f->runes = 0; + f->start = b->data; + f->to = f->start; + f->stop = (char*)f->start + b->len; + f->flush = nil; + f->farg = nil; + f->nfmt = 0; + f->args = nil; +} + +static int +sflush(Fmt *f) +{ + char *s; + int n; + + n = (int)f->farg; + n += 256; + f->farg = (void*)n; + s = f->start; + f->start = realloc(s, n); + if(f->start == nil){ + f->start = s; + return 0; + } + f->to = (char*)f->start + ((char*)f->to - s); + f->stop = (char*)f->start + n - 1; + return 1; +} + +static char* +logit(int severity, char *fmt, va_list args) +{ + Fmt f; + int n; + + f.runes = 0; + n = 32; + f.start = malloc(n); + if(f.start == nil) + return nil; + f.to = f.start; + f.stop = (char*)f.start + n - 1; + f.flush = sflush; + f.farg = (void*)n; + f.nfmt = 0; + f.args = args; + n = dofmt(&f, fmt); + if(n < 0) +{ +fprint(2, "dofmt %s failed\n", fmt); + return nil; +} + *(char*)f.to = '\0'; + + if(argv0 == nil) + fprint(2, "%s: err %d: %s\n", argv0, severity, f.start); + else + fprint(2, "err %d: %s\n", severity, f.start); + return f.start; +} + +void +seterr(int severity, char *fmt, ...) +{ + char *s; + va_list args; + + va_start(args, fmt); + s = logit(severity, fmt, args); + va_end(args); + if(s == nil) + werrstr("error setting error"); + else{ + werrstr("%s", s); + free(s); + } +} + +void +logerr(int severity, char *fmt, ...) 
+{ + char *s; + va_list args; + + va_start(args, fmt); + s = logit(severity, fmt, args); + va_end(args); + free(s); +} + +u32int +now(void) +{ + return time(nil); +} + +void +fatal(char *fmt, ...) +{ + Fmt f; + char buf[256]; + + f.runes = 0; + f.start = buf; + f.to = buf; + f.stop = buf + sizeof(buf); + f.flush = fmtfdflush; + f.farg = (void*)2; + f.nfmt = 0; + fmtprint(&f, "fatal %s error:", argv0); + va_start(f.args, fmt); + dofmt(&f, fmt); + va_end(f.args); + fmtprint(&f, "\n"); + fmtfdflush(&f); + if(0) + abort(); + threadexitsall(buf); +} + +ZBlock * +alloczblock(u32int size, int zeroed) +{ + ZBlock *b; + static ZBlock z; + + b = malloc(sizeof(ZBlock) + size); + if(b == nil){ + seterr(EOk, "out of memory"); + return nil; + } + + *b = z; + b->data = (u8int*)&b[1]; + b->len = size; + if(zeroed) + memset(b->data, 0, size); + return b; +} + +void +freezblock(ZBlock *b) +{ + free(b); +} + +ZBlock* +packet2zblock(Packet *p, u32int size) +{ + ZBlock *b; + + if(p == nil) + return nil; + b = alloczblock(size, 0); + if(b == nil) + return nil; + b->len = size; + if(packetcopy(p, b->data, 0, size) < 0){ + freezblock(b); + return nil; + } + return b; +} + +Packet* +zblock2packet(ZBlock *zb, u32int size) +{ + Packet *p; + + if(zb == nil) + return nil; + p = packetalloc(); + packetappend(p, zb->data, size); + return p; +} + +void * +emalloc(ulong n) +{ + void *p; + + p = malloc(n); + if(p == nil) + sysfatal("out of memory"); + memset(p, 0xa5, n); +if(0)print("emalloc %p-%p by %lux\n", p, (char*)p+n, getcallerpc(&n)); + return p; +} + +void * +ezmalloc(ulong n) +{ + void *p; + + p = malloc(n); + if(p == nil) + sysfatal("out of memory"); + memset(p, 0, n); +if(0)print("ezmalloc %p-%p by %lux\n", p, (char*)p+n, getcallerpc(&n)); + return p; +} + +void * +erealloc(void *p, ulong n) +{ + p = realloc(p, n); + if(p == nil) + sysfatal("out of memory"); +if(0)print("erealloc %p-%p by %lux\n", p, (char*)p+n, getcallerpc(&p)); + return p; +} + +char * +estrdup(char *s) +{ + char *t; + 
int n; + + n = strlen(s) + 1; + t = emalloc(n); + memmove(t, s, n); +if(0)print("estrdup %p-%p by %lux\n", t, (char*)t+n, getcallerpc(&s)); + return t; +} + +ZBlock* +readfile(char *name) +{ + Part *p; + ZBlock *b; + + p = initpart(name, 1); + if(p == nil) + return nil; + b = alloczblock(p->size, 0); + if(b == nil){ + seterr(EOk, "can't alloc %s: %r", name); + freepart(p); + return nil; + } + if(readpart(p, 0, b->data, p->size) < 0){ + seterr(EOk, "can't read %s: %r", name); + freepart(p); + freezblock(b); + return nil; + } + freepart(p); + return b; +} + +/* + * return floor(log2(v)) + */ +int +u64log2(u64int v) +{ + int i; + + for(i = 0; i < 64; i++) + if((v >> i) <= 1) + break; + return i; +} + +/* + * start fn(arg) in a new proc with a 256k stack. + * returns 0 on success, -1 if proccreate fails, + * so callers that test for < 0 actually see the failure. + */ +int +vtproc(void (*fn)(void*), void *arg) +{ + if(proccreate(fn, arg, 256*1024) < 0) + return -1; + return 0; +} + diff --git a/src/cmd/venti/venti.c b/src/cmd/venti/venti.c new file mode 100644 index 00000000..6423f7fa --- /dev/null +++ b/src/cmd/venti/venti.c @@ -0,0 +1,172 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +#include "whack.h" + +int debug; +int mainstacksize = 256*1024; + +static void ventiserver(char *vaddr); + +void +threadmain(int argc, char *argv[]) +{ + char *config, *haddr, *vaddr; + u32int mem, icmem, bcmem, minbcmem; + + vaddr = "tcp!*!venti"; + haddr = nil; + config = nil; + mem = 0xffffffffUL; + icmem = 0; + bcmem = 0; + ARGBEGIN{ + case 'a': + vaddr = ARGF(); + if(vaddr == nil) + goto usage; + break; + case 'B': + bcmem = unittoull(ARGF()); + break; + case 'c': + config = ARGF(); + if(config == nil) + goto usage; + break; + case 'C': + mem = unittoull(ARGF()); + break; + case 'd': + debug = 1; + break; + case 'h': + haddr = ARGF(); + if(haddr == nil) + goto usage; + break; + case 'I': + icmem = unittoull(ARGF()); + break; + case 'w': + queuewrites = 1; + break; + default: + goto usage; + }ARGEND + +print("whack %d\n", sizeof(Whack)); + + if(argc){ + usage: + fprint(2, "usage: venti [-dw] [-a ventiaddress] [-h httpaddress] [-c config] [-C cachesize] 
[-I icachesize] [-B blockcachesize]\n"); + threadexitsall("usage"); + } + + fmtinstall('V', vtscorefmt); + fmtinstall('H', encodefmt); + fmtinstall('F', vtfcallfmt); + + if(config == nil) + config = "venti.conf"; + + + if(initarenasum() < 0) + fprint(2, "warning: can't initialize arena summing process: %r"); + + if(initventi(config) < 0) + sysfatal("can't init server: %r"); + + if(mem == 0xffffffffUL) + mem = 1 * 1024 * 1024; + fprint(2, "initialize %d bytes of lump cache for %d lumps\n", + mem, mem / (8 * 1024)); + initlumpcache(mem, mem / (8 * 1024)); + + icmem = u64log2(icmem / (sizeof(IEntry)+sizeof(IEntry*)) / ICacheDepth); + if(icmem < 4) + icmem = 4; + fprint(2, "initialize %d bytes of index cache for %d index entries\n", + (sizeof(IEntry)+sizeof(IEntry*)) * (1 << icmem) * ICacheDepth, + (1 << icmem) * ICacheDepth); + initicache(icmem, ICacheDepth); + + /* + * need a block for every arena and every process + */ + minbcmem = maxblocksize * + (mainindex->narenas + mainindex->nsects*4 + 16); + if(bcmem < minbcmem) + bcmem = minbcmem; + + fprint(2, "initialize %d bytes of disk block cache\n", bcmem); + initdcache(bcmem); + + fprint(2, "sync arenas and index...\n"); + if(syncindex(mainindex, 1) < 0) + sysfatal("can't sync server: %r"); + + if(queuewrites){ + fprint(2, "initialize write queue...\n"); + if(initlumpqueues(mainindex->nsects) < 0){ + fprint(2, "can't initialize lump queues," + " disabling write queueing: %r"); + queuewrites = 0; + } + } + + if(haddr){ + fprint(2, "starting http server at %s\n", haddr); + if(httpdinit(haddr) < 0) + fprint(2, "warning: can't start http server: %r"); + } + + ventiserver(vaddr); + threadexitsall(0); +} + +static void +vtrerror(VtReq *r, char *error) +{ + r->rx.type = VtRerror; + r->rx.error = estrdup(error); +} + +static void +ventiserver(char *addr) +{ + Packet *p; + VtReq *r; + VtSrv *s; + + s = vtlisten(addr); + if(s == nil) + sysfatal("can't announce %s: %r", addr); + + while((r = vtgetreq(s)) != nil){ + r->rx.type = 
r->tx.type+1; + print("req (arenas[0]=%p sects[0]=%p) %F\n", + mainindex->arenas[0], mainindex->sects[0], &r->tx); + switch(r->tx.type){ + default: + vtrerror(r, "unknown request"); + break; + case VtTread: + if((r->rx.data = readlump(r->tx.score, r->tx.dtype, r->tx.count)) == nil) + vtrerror(r, gerrstr()); + break; + case VtTwrite: + p = r->tx.data; + r->tx.data = nil; + if(writelump(p, r->rx.score, r->tx.dtype, 0) < 0) + vtrerror(r, gerrstr()); + break; + case VtTsync: + queueflush(); + break; + } + vtrespond(r); + } +} + diff --git a/src/cmd/venti/verifyarena.c b/src/cmd/venti/verifyarena.c new file mode 100644 index 00000000..9e7dc31b --- /dev/null +++ b/src/cmd/venti/verifyarena.c @@ -0,0 +1,127 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +static int verbose; + +void +usage(void) +{ + fprint(2, "usage: verifyarena [-v]\n"); + threadexitsall(0); +} + +static void +readblock(uchar *buf, int n) +{ + int nr, m; + + for(nr = 0; nr < n; nr += m){ + m = n - nr; + m = read(0, &buf[nr], m); + if(m <= 0) + sysfatal("can't read arena from standard input: %r"); + } +} + +static void +verifyarena(void) +{ + Arena arena; + ArenaHead head; + ZBlock *b; + DigestState s; + u64int n, e; + u32int bs; + u8int score[VtScoreSize]; + + fprint(2, "verify arena from standard input\n"); + + memset(&arena, 0, sizeof arena); + memset(&s, 0, sizeof s); + + /* + * read the little bit, which will included the header + */ + bs = MaxIoSize; + b = alloczblock(bs, 0); + readblock(b->data, HeadSize); + sha1(b->data, HeadSize, nil, &s); + if(unpackarenahead(&head, b->data) < 0) + sysfatal("corrupted arena header: %r"); + if(head.version != ArenaVersion) + fprint(2, "warning: unknown arena version %d\n", head.version); + + /* + * now we know how much to read + * read everything but the last block, which is special + */ + e = head.size - head.blocksize; + for(n = HeadSize; n < e; n += bs){ + if(n + bs > e) + bs = e - n; + readblock(b->data, bs); + sha1(b->data, bs, nil, &s); + } + 
+ /* + * read the last block update the sum. + * the sum is calculated assuming the slot for the sum is zero. + */ + bs = head.blocksize; + readblock(b->data, bs); + sha1(b->data, bs-VtScoreSize, nil, &s); + sha1(zeroscore, VtScoreSize, nil, &s); + sha1(nil, 0, score, &s); + + /* + * validity check on the trailer + */ + arena.blocksize = head.blocksize; + if(unpackarena(&arena, b->data) < 0) + sysfatal("corrupted arena trailer: %r"); + scorecp(arena.score, &b->data[arena.blocksize - VtScoreSize]); + + if(namecmp(arena.name, head.name) != 0) + sysfatal("arena header and trailer names clash: %s vs. %s\n", head.name, arena.name); + if(arena.version != head.version) + sysfatal("arena header and trailer versions clash: %d vs. %d\n", head.version, arena.version); + arena.size = head.size - 2 * head.blocksize; + + /* + * check for no checksum or the same + */ + if(scorecmp(score, arena.score) != 0){ + if(scorecmp(zeroscore, arena.score) != 0) + fprint(2, "warning: mismatched checksums for arena=%s, found=%V calculated=%V", + arena.name, arena.score, score); + scorecp(arena.score, score); + }else + fprint(2, "matched score\n"); + + printarena(2, &arena); +} + +void +threadmain(int argc, char *argv[]) +{ + fmtinstall('V', vtscorefmt); + statsinit(); + + ARGBEGIN{ + case 'v': + verbose++; + break; + default: + usage(); + break; + }ARGEND + + readonly = 1; + + if(argc != 0) + usage(); + + verifyarena(); + threadexitsall(0); +} diff --git a/src/cmd/venti/whack.c b/src/cmd/venti/whack.c new file mode 100644 index 00000000..9aa6b6d2 --- /dev/null +++ b/src/cmd/venti/whack.c @@ -0,0 +1,330 @@ +#include "stdinc.h" +#include "whack.h" + +typedef struct Huff Huff; + +enum +{ + MaxFastLen = 9, + BigLenCode = 0x1f4, /* minimum code for large lenth encoding */ + BigLenBits = 9, + BigLenBase = 4, /* starting items to encode for big lens */ + + MinOffBits = 6, + MaxOffBits = MinOffBits + 8, + + MaxLen = 2051 /* max. 
length encodable in 24 bits */ +}; + +enum +{ + StatBytes, + StatOutBytes, + StatLits, + StatMatches, + StatLitBits, + StatOffBits, + StatLenBits, + + MaxStat +}; + +struct Huff +{ + short bits; /* length of the code */ + ulong encode; /* the code */ +}; + +static Huff lentab[MaxFastLen] = +{ + {2, 0x2}, /* 10 */ + {3, 0x6}, /* 110 */ + {5, 0x1c}, /* 11100 */ + {5, 0x1d}, /* 11101 */ + {6, 0x3c}, /* 111100 */ + {7, 0x7a}, /* 1111010 */ + {7, 0x7b}, /* 1111011 */ + {8, 0xf8}, /* 11111000 */ + {8, 0xf9}, /* 11111001 */ +}; + +static int thwmaxcheck; + +void +whackinit(Whack *tw, int level) +{ + thwmaxcheck = (1 << level); + thwmaxcheck -= thwmaxcheck >> 2; + if(thwmaxcheck < 2) + thwmaxcheck = 2; + else if(thwmaxcheck > 1024) + thwmaxcheck = 1024; + memset(tw, 0, sizeof *tw); + tw->begin = 2 * WhackMaxOff; +} + +/* + * find a string in the dictionary + */ +static int +whackmatch(Whack *b, uchar **ss, uchar *esrc, ulong h, ulong now) +{ + ushort then, off, last; + int bestoff, bestlen, check; + uchar *s, *t; + + s = *ss; + if(esrc < s + MinMatch) + return -1; + if(s + MaxLen < esrc) + esrc = s + MaxLen; + + bestoff = 0; + bestlen = 0; + check = thwmaxcheck; + last = 0; + for(then = b->hash[h]; check-- > 0; then = b->next[then & (WhackMaxOff - 1)]){ + off = now - then; + if(off <= last || off > WhackMaxOff) + break; + + /* + * don't need to check for the end because + * 1) s too close check above + */ + t = s - off; + if(s[0] == t[0] && s[1] == t[1] && s[2] == t[2]){ + if(!bestlen || esrc - s > bestlen && s[bestlen] == t[bestlen]){ + t += 3; + for(s += 3; s < esrc; s++){ + if(*s != *t) + break; + t++; + } + if(s - *ss > bestlen){ + bestlen = s - *ss; + bestoff = off; + if(bestlen > thwmaxcheck) + break; + } + } + } + s = *ss; + last = off; + } + *ss += bestlen; + return bestoff; +} + +/* + * knuth vol. 
3 multiplicative hashing + * each byte x chosen according to rules + * 1/4 < x < 3/10, 1/3 x < < 3/7, 4/7 < x < 2/3, 7/10 < x < 3/4 + * with reasonable spread between the bytes & their complements + * + * the 3 byte value appears to be as almost good as the 4 byte value, + * and might be faster on some machines + */ +/* +#define hashit(c) ((((ulong)(c) * 0x6b43a9) >> (24 - HashLog)) & HashMask) +*/ +#define hashit(c) (((((ulong)(c) & 0xffffff) * 0x6b43a9b5) >> (32 - HashLog)) & HashMask) + +/* + * lz77 compression with single lookup in a hash table for each block + * + * compresses the n bytes at src into dst, writing at most n bytes. + * returns the number of bytes written, or -1 if n < MinMatch, if the + * output would not fit in n bytes, or if the progress check in the + * literal path gives up. stats[] is only added to after the main + * loop has finished. + */ +int +whack(Whack *w, uchar *dst, uchar *src, int n, ulong stats[WhackStats]) +{ + uchar *s, *ss, *sss, *esrc, *half, *wdst, *wdmax; + ulong cont, code, wbits; + ushort now; + int toff, lithist, h, len, bits, use, wnbits, lits, matches, offbits, lenbits; + + if(n < MinMatch) + return -1; + + wdst = dst; + wdmax = dst + n; + + now = w->begin; + s = src; + w->data = s; + + cont = (s[0] << 16) | (s[1] << 8) | s[2]; + + esrc = s + n; + half = s + (n >> 1); + wnbits = 0; + wbits = 0; + lits = 0; + matches = 0; + offbits = 0; + lenbits = 0; + lithist = ~0; + while(s < esrc){ + h = hashit(cont); + + sss = s; + toff = whackmatch(w, &sss, esrc, h, now); + ss = sss; + + len = ss - s; + for(; wnbits >= 8; wnbits -= 8){ /* flush whole bytes from the bit buffer */ + if(wdst >= wdmax){ + w->begin = now; + return -1; + } + *wdst++ = wbits >> (wnbits - 8); + } + if(len < MinMatch){ /* no usable match: code the byte at s as a literal */ + toff = *s; + lithist = (lithist << 1) | toff < 32 | toff > 127; /* low bit set if this byte is outside 32..127; higher bits are earlier literals */ + if(lithist & 0x1e){ /* one of the previous four literals was outside the range: 9 bit code */ + wbits = (wbits << 9) | toff; + wnbits += 9; + }else if(lithist & 1){ /* only this byte is outside the range: 10 or 11 bit code */ + toff = (toff + 64) & 0xff; + if(toff < 96){ + wbits = (wbits << 10) | toff; + wnbits += 10; + }else{ + wbits = (wbits << 11) | toff; + wnbits += 11; + } + }else{ /* this and the previous four literals are all in range: 8 bit code */ + wbits = (wbits << 8) | toff; + wnbits += 8; + } + lits++; + + /* + * speed hack + * check for compression progress, bail if none achieved + * (done once, on the first literal past the halfway point) + */ + if(s > half){ + if(4 * (s - src) < 5 * lits){ + w->begin = now; + return -1; + } + half = esrc; + } + + if(s + MinMatch 
<= esrc){ + w->next[now & (WhackMaxOff - 1)] = w->hash[h]; + w->hash[h] = now; + if(s + MinMatch < esrc) + cont = (cont << 8) | s[MinMatch]; + } + now++; + s++; + continue; + } + + matches++; + + /* + * length of match: after subtracting MinMatch, lengths below + * MaxFastLen are coded from lentab, longer ones with an + * escape code that widens as the length grows + */ + if(len > MaxLen){ + len = MaxLen; + ss = s + len; + } + len -= MinMatch; + if(len < MaxFastLen){ + bits = lentab[len].bits; + wbits = (wbits << bits) | lentab[len].encode; + wnbits += bits; + lenbits += bits; + }else{ + code = BigLenCode; + bits = BigLenBits; + use = BigLenBase; + len -= MaxFastLen; + while(len >= use){ + len -= use; + code = (code + use) << 1; + use <<= (bits & 1) ^ 1; + bits++; + } + + wbits = (wbits << bits) | (code + len); + wnbits += bits; + lenbits += bits; + + for(; wnbits >= 8; wnbits -= 8){ + if(wdst >= wdmax){ + w->begin = now; + return -1; + } + *wdst++ = wbits >> (wnbits - 8); + } + } + + /* + * offset in history: a 3 or 4 bit prefix selecting the width, + * then the low bits of offset-1 + */ + toff--; + for(bits = MinOffBits; toff >= (1 << bits); bits++) + ; + if(bits < MaxOffBits-1){ + wbits = (wbits << 3) | (bits - MinOffBits); + if(bits != MinOffBits) + bits--; + wnbits += bits + 3; + offbits += bits + 3; + }else{ + wbits = (wbits << 4) | 0xe | (bits - (MaxOffBits-1)); + bits--; + wnbits += bits + 4; + offbits += bits + 4; + } + wbits = (wbits << bits) | toff & ((1 << bits) - 1); + + for(; s != ss; s++){ /* enter each matched byte into the hash chains */ + if(s + MinMatch <= esrc){ + h = hashit(cont); + w->next[now & (WhackMaxOff - 1)] = w->hash[h]; + w->hash[h] = now; + if(s + MinMatch < esrc) + cont = (cont << 8) | s[MinMatch]; + } + now++; + } + } + + w->begin = now; + + stats[StatBytes] += esrc - src; + stats[StatLits] += lits; + stats[StatMatches] += matches; + stats[StatLitBits] += (wdst - (dst + 2)) * 8 + wnbits - offbits - lenbits; + stats[StatOffBits] += offbits; + stats[StatLenBits] += lenbits; + + if(wnbits & 7){ /* pad the final partial byte with zero bits */ + wbits <<= 8 - (wnbits & 7); + wnbits += 8 - (wnbits & 7); + } + for(; wnbits >= 8; wnbits -= 8){ + if(wdst >= wdmax) + return -1; + *wdst++ = wbits >> (wnbits - 8); + } + + stats[StatOutBytes] += wdst - dst; + + 
return wdst - dst; +} + +int +whackblock(uchar *dst, uchar *src, int ssize) +{ + Whack w; + ulong stats[MaxStat]; + int r; + + whackinit(&w, 6); + r = whack(&w, dst, src, ssize, stats); + return r; +} diff --git a/src/cmd/venti/whack.h b/src/cmd/venti/whack.h new file mode 100644 index 00000000..fb966169 --- /dev/null +++ b/src/cmd/venti/whack.h @@ -0,0 +1,40 @@ +typedef struct Whack Whack; +typedef struct Unwhack Unwhack; + +enum +{ + WhackStats = 8, + WhackErrLen = 64, /* max length of error message from thwack or unthwack */ + WhackMaxOff = 16*1024, /* max allowed offset */ + + HashLog = 14, + HashSize = 1<<HashLog, + HashMask = HashSize - 1, + + MinMatch = 3, /* shortest match possible */ + + MinDecode = 8, /* minimum bits to decode a match or lit; >= 8 */ + + MaxSeqMask = 8, /* number of bits in coding block mask */ + MaxSeqStart = 256 /* max offset of initial coding block */ +}; + +struct Whack +{ + ushort begin; /* time of first byte in hash */ + ushort hash[HashSize]; + ushort next[WhackMaxOff]; + uchar *data; +}; + +struct Unwhack +{ + char err[WhackErrLen]; +}; + +void whackinit(Whack*, int level); +void unwhackinit(Unwhack*); +int whack(Whack*, uchar *dst, uchar *src, int nsrc, ulong stats[WhackStats]); +int unwhack(Unwhack*, uchar *dst, int ndst, uchar *src, int nsrc); + +int whackblock(uchar *dst, uchar *src, int ssize); diff --git a/src/cmd/venti/wrarena.c b/src/cmd/venti/wrarena.c new file mode 100644 index 00000000..f232928a --- /dev/null +++ b/src/cmd/venti/wrarena.c @@ -0,0 +1,148 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +char *host; +int readonly = 1; /* for part.c */ + +void +usage(void) +{ + fprint(2, "usage: wrarena [-h host] arenafile [offset]\n"); + threadexitsall("usage"); +} + +static void +rdarena(VtConn *z, Arena *arena, u64int offset) +{ + u64int a, aa, e; + u32int magic; + Clump cl; + uchar score[VtScoreSize]; + ZBlock *lump; + + fprint(2, "copying %s to venti\n", arena->name); + printarena(2, arena); + + a = 
arena->base; + e = arena->base + arena->size; + if(offset != ~(u64int)0) { + if(offset >= e-a) + sysfatal("bad offset %llud >= %llud\n", + offset, e-a); + aa = offset; + } else + aa = 0; + + /* aa is relative to arena->base (see the offset check above), so the limit is the arena size, e-a */ + for(; aa < e-a; aa += ClumpSize+cl.info.size) { + magic = clumpmagic(arena, aa); + if(magic == ClumpFreeMagic) + break; + if(magic != ClumpMagic) { + fprint(2, "illegal clump magic number %#8.8ux offset %llud\n", + magic, aa); + break; + } + lump = loadclump(arena, aa, 0, &cl, score, 0); + if(lump == nil) { + fprint(2, "clump %llud failed to read: %r\n", aa); + break; + } + if(cl.info.type != VtTypeCorrupt) { /* clumps marked corrupt are passed on without these checks */ + scoremem(score, lump->data, cl.info.uncsize); + if(scorecmp(cl.info.score, score) != 0) { + fprint(2, "clump %llud has mismatched score\n", aa); + freezblock(lump); /* do not leak the block when giving up */ + break; + } + if(vttypevalid(cl.info.type) < 0) { + fprint(2, "clump %llud has bad type %d\n", aa, cl.info.type); + freezblock(lump); /* do not leak the block when giving up */ + break; + } + } + if(z && vtwrite(z, score, cl.info.type, lump->data, cl.info.uncsize) < 0) + sysfatal("failed writing clump %llud: %r", aa); + freezblock(lump); + } + if(z && vtsync(z) < 0) + sysfatal("failed executing sync: %r"); + + print("end offset %llud\n", aa); +} + +/* + * open the arena stored at byte offset aoffset (-o, default 0) of + * arenafile and hand it to rdarena, starting at the optional clump + * offset argument. the clumps are sent to the venti server named + * by -h; with no host, or host /dev/null, they are only read and checked. + */ +void +threadmain(int argc, char *argv[]) +{ + char *file; + VtConn *z; + Arena *arena; + u64int offset, aoffset; + Part *part; + Dir *d; + uchar buf[8192]; + ArenaHead head; + + aoffset = 0; + ARGBEGIN{ + case 'h': + host = EARGF(usage()); + break; + case 'o': + aoffset = strtoull(EARGF(usage()), 0, 0); + break; + default: + usage(); + break; + }ARGEND + + offset = ~(u64int)0; /* ~0 tells rdarena to start at the beginning */ + switch(argc) { + default: + usage(); + case 2: + offset = strtoull(argv[1], 0, 0); + /* fall through */ + case 1: + file = argv[0]; + } + + + fmtinstall('V', vtscorefmt); + + statsinit(); + + if((d = dirstat(file)) == nil) + sysfatal("can't stat file %s: %r", file); + + part = initpart(file, 0); + if(part == nil) + sysfatal("can't open file %s: %r", file); + if(readpart(part, aoffset, buf, sizeof buf) < 0) + sysfatal("can't read file %s: %r", file); + + 
if(unpackarenahead(&head, buf) < 0) + sysfatal("corrupted arena header: %r"); + + if(aoffset+head.size > d->length) + sysfatal("arena is truncated: want %llud bytes have %llud\n", + head.size, d->length); + + partblocksize(part, head.blocksize); + initdcache(8 * MaxDiskBlock); + + arena = initarena(part, aoffset, head.size, head.blocksize); + if(arena == nil) + sysfatal("initarena: %r"); + + if(host && strcmp(host, "/dev/null") != 0){ + z = vtdial(host); + if(z == nil) + sysfatal("could not connect to server: %r"); + if(vtconnect(z) < 0) + sysfatal("vtconnect: %r"); + }else + z = nil; + + rdarena(z, arena, offset); + vthangup(z); + threadexitsall(0); +} diff --git a/src/cmd/venti/write.c b/src/cmd/venti/write.c new file mode 100644 index 00000000..94b61be4 --- /dev/null +++ b/src/cmd/venti/write.c @@ -0,0 +1,60 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" + +char *host; + +void +usage(void) +{ + fprint(2, "usage: write [-z] [-h host] [-t type] <datablock\n"); + threadexitsall("usage"); +} + +void +threadmain(int argc, char *argv[]) +{ + uchar *p, score[VtScoreSize]; + int type, n, dotrunc; + VtConn *z; + + dotrunc = 0; + type = VtDataType; + ARGBEGIN{ + case 'z': + dotrunc = 1; + break; + case 'h': + host = EARGF(usage()); + break; + case 't': + type = atoi(EARGF(usage())); + break; + default: + usage(); + break; + }ARGEND + + if(argc != 0) + usage(); + + + fmtinstall('V', vtscorefmt); + + p = ezmalloc(VtMaxLumpSize+1); + n = readn(0, p, VtMaxLumpSize+1); + if(n > VtMaxLumpSize) + sysfatal("data too big"); + z = vtdial(host); + if(z == nil) + sysfatal("could not connect to server: %r"); + if(vtconnect(z) < 0) + sysfatal("vtconnect: %r"); + if(dotrunc) + n = vtzerotruncate(type, p, n); + if(vtwrite(z, score, type, p, n) < 0) + sysfatal("vtwrite: %r"); + vthangup(z); + print("%V\n", score); + threadexitsall(0); +} diff --git a/src/cmd/venti/xml.c b/src/cmd/venti/xml.c new file mode 100644 index 00000000..d50d5bc9 --- /dev/null +++ b/src/cmd/venti/xml.c 
@@ -0,0 +1,69 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" +#include "httpd.h" +#include "xml.h" +/* + * print arena s on hout as one empty element named tag, indented by indent; + * the xml* helpers emit the individual fields inside the opening tag. + * NOTE(review): "stop" is base + 2*blocksize rather than base + size -- confirm intent + */ +void xmlarena(Hio *hout, Arena *s, char *tag, int indent){ + xmlindent(hout, indent); + hprint(hout, "<%s", tag); + xmlaname(hout, s->name, "name"); + xmlu32int(hout, s->version, "version"); + xmlaname(hout, s->part->name, "partition"); + xmlu32int(hout, s->blocksize, "blocksize"); + xmlu64int(hout, s->base, "start"); + xmlu64int(hout, s->base+2*s->blocksize, "stop"); + xmlu32int(hout, s->ctime, "created"); + xmlu32int(hout, s->wtime, "modified"); + xmlsealed(hout, s->sealed, "sealed"); + xmlscore(hout, s->score, "score"); + xmlu32int(hout, s->clumps, "clumps"); + xmlu32int(hout, s->cclumps, "compressedclumps"); + xmlu64int(hout, s->uncsize, "data"); + xmlu64int(hout, s->used - s->clumps * ClumpSize, "compresseddata"); + xmlu64int(hout, s->used + s->clumps * ClumpInfoSize, "storage"); + hprint(hout, "/>\n"); +} +/* + * print index s on hout as an element named tag whose children are + * the <sects>, <amaps> and <arenas> lists, one entry per section or arena + */ +void xmlindex(Hio *hout, Index *s, char *tag, int indent){ + int i; + xmlindent(hout, indent); + hprint(hout, "<%s", tag); + xmlaname(hout, s->name, "name"); + xmlu32int(hout, s->version, "version"); + xmlu32int(hout, s->blocksize, "blocksize"); + xmlu32int(hout, s->tabsize, "tabsize"); + xmlu32int(hout, s->buckets, "buckets"); + xmlu32int(hout, s->div, "buckdiv"); + hprint(hout, ">\n"); + xmlindent(hout, indent + 1); + hprint(hout, "<sects>\n"); + for(i = 0; i < s->nsects; i++) + xmlamap(hout, &s->smap[i], "sect", indent + 2); + xmlindent(hout, indent + 1); + hprint(hout, "</sects>\n"); + xmlindent(hout, indent + 1); + hprint(hout, "<amaps>\n"); + for(i = 0; i < s->narenas; i++) + xmlamap(hout, &s->amap[i], "amap", indent + 2); + xmlindent(hout, indent + 1); + hprint(hout, "</amaps>\n"); + xmlindent(hout, indent + 1); + hprint(hout, "<arenas>\n"); + for(i = 0; i < s->narenas; i++) + xmlarena(hout, s->arenas[i], "arena", indent + 2); + xmlindent(hout, indent + 1); + hprint(hout, "</arenas>\n"); + xmlindent(hout, indent); + hprint(hout, 
"</%s>\n", tag); +} +/* + * print address map entry s (name, start, stop) on hout as one empty element named tag + */ +void xmlamap(Hio *hout, AMap *s, char *tag, int indent){ + xmlindent(hout, indent); + hprint(hout, "<%s", tag); + xmlaname(hout, s->name, "name"); + xmlu64int(hout, s->start, "start"); + xmlu64int(hout, s->stop, "stop"); + hprint(hout, "/>\n"); +} + diff --git a/src/cmd/venti/xml.h b/src/cmd/venti/xml.h new file mode 100644 index 00000000..c9e52b0b --- /dev/null +++ b/src/cmd/venti/xml.h @@ -0,0 +1,11 @@ +void xmlamap(Hio *hout, AMap *v, char *tag, int indent); +void xmlarena(Hio *hout, Arena *v, char *tag, int indent); +void xmlindex(Hio *hout, Index *v, char *tag, int indent); +/* single-value printers called by the three functions above; each takes the value and the name to print it under */ +void xmlaname(Hio *hout, char *v, char *tag); +void xmlscore(Hio *hout, u8int *v, char *tag); +void xmlsealed(Hio *hout, int v, char *tag); +void xmlu32int(Hio *hout, u32int v, char *tag); +void xmlu64int(Hio *hout, u64int v, char *tag); + +void xmlindent(Hio *hout, int indent); diff --git a/src/cmd/venti/zeropart.c b/src/cmd/venti/zeropart.c new file mode 100644 index 00000000..b24c98c6 --- /dev/null +++ b/src/cmd/venti/zeropart.c @@ -0,0 +1,28 @@ +#include "stdinc.h" +#include "dat.h" +#include "fns.h" +/* + * overwrite part with the contents of a freshly allocated block, from + * PartBlank up to the last whole blocksize unit that fits in part->size: + * MaxIoSize bytes at a time, then blocksize bytes at a time for the tail. + * any write failure is fatal. + * NOTE(review): the block presumably comes back zeroed from alloczblock(MaxIoSize, 1) -- confirm + */ +void +zeropart(Part *part, int blocksize) +{ + ZBlock *b; + u64int addr; + int w; + + fprint(2, "clearing the partition\n"); + + b = alloczblock(MaxIoSize, 1); + + w = 0; /* counts the MaxIoSize writes, for the error message only */ + for(addr = PartBlank; addr + MaxIoSize <= part->size; addr += MaxIoSize){ + if(writepart(part, addr, b->data, MaxIoSize) < 0) + sysfatal("can't initialize %s, writing block %d failed: %r", part->name, w); + w++; + } + + for(; addr + blocksize <= part->size; addr += blocksize) /* tail that is smaller than MaxIoSize */ + if(writepart(part, addr, b->data, blocksize) < 0) + sysfatal("can't initialize %s: %r", part->name); + + freezblock(b); +} |