/*
 * vbackup [-Dnv] fspartition [score]
 *
 * Copy a file system to a disk image stored on Venti.
 * Prints a vnfs config line for the copied image.
 *
 *	-D	print debugging
 *	-f	'fast' writes - skip write if block exists on server
 *	-m	set mount name
 *	-n	nop -- don't actually write blocks
 *	-s	print status updates
 *	-v	print debugging trace
 *	-w	write parallelism
 * 
 * If score is given on the command line, it should be the
 * score from a previous vbackup on this fspartition.
 * In this mode, only the new blocks are stored to Venti.
 * The result is still a complete image, but requires many
 * fewer Venti writes in the common case.
 *
 * This program is structured as three processes connected
 * by buffered queues:
 *
 * 	fsysproc | cmpproc | ventiproc
 * 
 * Fsysproc reads the disk and queues the blocks.
 * Cmpproc compares the blocks against the SHA1 hashes
 * in the old image, if any.  It discards the unchanged blocks
 * and queues the changed ones.  Ventiproc writes blocks to Venti.
 * 
 * There is a fourth proc, statusproc, which prints status
 * updates about how the various procs are progressing.
 */

#include <u.h>
#include <libc.h>
#include <bio.h>
#include <thread.h>
#include <libsec.h>
#include <venti.h>
#include <diskfs.h>
#include "queue.h"

enum
{
	STACK = 32768
};

typedef struct WriteReq WriteReq;
struct WriteReq
{
	Packet *p;
	uint type;
	uchar score[VtScoreSize];
};

Biobuf	bscores;		/* biobuf filled with block scores */
int		debug;		/* debugging flag (not used) */
Disk*	disk;			/* disk being backed up */
RWLock	endlk;		/* silly synchonization */
int		errors;		/* are we exiting with an error status? */
int		fastwrites;		/* do not write blocks already on server */
int		fsscanblock;	/* last block scanned */
Fsys*	fsys;			/* file system being backed up */
int		nchange;		/* number of changed blocks */
int		nop;			/* don't actually send blocks to venti */
int		nskip;		/* number of blocks skipped (already on server) */
int		nwritethread;	/* number of write-behind threads */
Queue*	qcmp;		/* queue fsys->cmp */
Queue*	qventi;		/* queue cmp->venti */
int		statustime;	/* print status every _ seconds */
int		verbose;		/* print extra stuff */
VtFile*	vfile;			/* venti file being written */
Channel*	writechan;	/* chan(WriteReq) */
VtConn*	z;			/* connection to venti */
VtCache*	zcache;		/* cache of venti blocks */
uchar*	zero;			/* blocksize zero bytes */

int nsend, nrecv;

void		cmpproc(void*);
void		fsysproc(void*);
void		statusproc(void*);
void		ventiproc(void*);
int		timefmt(Fmt*);
char*	mountplace(char *dev);

void
usage(void)
{
	fprint(2, "usage: vbackup [-DVnv] [-m mtpt] [-s secs] [-w n] disk [score]\n");
	threadexitsall("usage");
}

void
threadmain(int argc, char **argv)
{
	char *pref, *mountname;
	uchar score[VtScoreSize], prev[VtScoreSize];
	int i, fd, csize;
	vlong bsize;
	Tm tm;
	VtEntry e;
	VtBlock *b;
	VtCache *c;
	VtRoot root;
	char *tmp, *tmpnam;

	fmtinstall('F', vtfcallfmt);
	fmtinstall('H', encodefmt);
	fmtinstall('T', timefmt);
	fmtinstall('V', vtscorefmt);

	mountname = sysname();
	ARGBEGIN{
	default:
		usage();
		break;
	case 'D':
		debug++;
		break;
	case 'V':
		chattyventi = 1;
		break;
	case 'f':
		fastwrites = 1;
		break;
	case 'm':
		mountname = EARGF(usage());
		break;
	case 'n':
		nop = 1;
		break;
	case 's':
		statustime = atoi(EARGF(usage()));
		break;
	case 'v':
		verbose = 1;
		break;
	case 'w':
		nwritethread = atoi(EARGF(usage()));
		break;
	}ARGEND

	if(argc != 1 && argc != 2)
		usage();

	if(statustime)
		print("# %T vbackup %s %s\n", argv[0], argc>=2 ? argv[1] : "");

	/*
	 * open fs
	 */
	if((disk = diskopenfile(argv[0])) == nil)
		sysfatal("diskopen: %r");
	if((disk = diskcache(disk, 32768, 2*MAXQ+16)) == nil)
		sysfatal("diskcache: %r");
	if((fsys = fsysopen(disk)) == nil)
		sysfatal("ffsopen: %r");

	/*
	 * connect to venti
	 */
	if((z = vtdial(nil)) == nil)
		sysfatal("vtdial: %r");
	if(vtconnect(z) < 0)
		sysfatal("vtconnect: %r");

	/*
	 * set up venti block cache
	 */
	zero = vtmallocz(fsys->blocksize);
	bsize = fsys->blocksize;
	csize = 50;	/* plenty; could probably do with 5 */

	if(verbose)
		fprint(2, "cache %d blocks\n", csize);
	c = vtcachealloc(z, bsize, csize);
	zcache = c;

	/*
	 * parse starting score
	 */
	memset(prev, 0, sizeof prev);
	if(argc == 1){
		vfile = vtfilecreateroot(c, (fsys->blocksize/VtScoreSize)*VtScoreSize,
			fsys->blocksize, VtDataType);
		if(vfile == nil)
			sysfatal("vtfilecreateroot: %r");
		vtfilelock(vfile, VtORDWR);
		if(vtfilewrite(vfile, zero, 1, bsize*fsys->nblock-1) != 1)
			sysfatal("vtfilewrite: %r");
		if(vtfileflush(vfile) < 0)
			sysfatal("vtfileflush: %r");
	}else{
		if(vtparsescore(argv[1], &pref, score) < 0)
			sysfatal("bad score: %r");
		if(pref!=nil && strcmp(pref, fsys->type) != 0)
			sysfatal("score is %s but fsys is %s", pref, fsys->type);
		b = vtcacheglobal(c, score, VtRootType);
		if(b){
			if(vtrootunpack(&root, b->data) < 0)
				sysfatal("bad root: %r");
			if(strcmp(root.type, fsys->type) != 0)
				sysfatal("root is %s but fsys is %s", root.type, fsys->type);
			memmove(prev, score, VtScoreSize);
			memmove(score, root.score, VtScoreSize);
			vtblockput(b);
		}
		b = vtcacheglobal(c, score, VtDirType);
		if(b == nil)
			sysfatal("vtcacheglobal %V: %r", score);
		if(vtentryunpack(&e, b->data, 0) < 0)
			sysfatal("%V: vtentryunpack failed", score);
		if(verbose)
			fprint(2, "entry: size %llud psize %d dsize %d\n",
				e.size, e.psize, e.dsize);
		vtblockput(b);
		if((vfile = vtfileopenroot(c, &e)) == nil)
			sysfatal("vtfileopenroot: %r");
		vtfilelock(vfile, VtORDWR);
		if(e.dsize != bsize)
			sysfatal("file system block sizes don't match %d %lld", e.dsize, bsize);
		if(e.size != fsys->nblock*bsize)
			sysfatal("file system block counts don't match %lld %lld", e.size, fsys->nblock*bsize);
	}

	/*
	 * write scores of blocks into temporary file
	 */
	if((tmp = getenv("TMP")) != nil){
		/* okay, good */
	}else if(access("/var/tmp", 0) >= 0)
		tmp = "/var/tmp";
	else
		tmp = "/tmp";
	tmpnam = smprint("%s/vbackup.XXXXXX", tmp);
	if(tmpnam == nil)
		sysfatal("smprint: %r");

	if((fd = opentemp(tmpnam)) < 0)
		sysfatal("opentemp %s: %r", tmpnam);
	if(statustime)
		print("# %T reading scores into %s\n", tmpnam);
	if(verbose)
		fprint(2, "read scores into %s...\n", tmpnam);

	Binit(&bscores, fd, OWRITE);
	for(i=0; i<fsys->nblock; i++){
		if(vtfileblockscore(vfile, i, score) < 0)
			sysfatal("vtfileblockhash %d: %r", i);
		if(Bwrite(&bscores, score, VtScoreSize) != VtScoreSize)
			sysfatal("Bwrite: %r");
	}
	Bterm(&bscores);
	vtfileunlock(vfile);

	/*
	 * prep scores for rereading
	 */
	seek(fd, 0, 0);
	Binit(&bscores, fd, OREAD);

	/*
	 * start the main processes 
	 */
	if(statustime)
		print("# %T starting procs\n");
	qcmp = qalloc();
	qventi = qalloc();

	rlock(&endlk);
	proccreate(fsysproc, nil, STACK);
	rlock(&endlk);
	proccreate(ventiproc, nil, STACK);
	rlock(&endlk);
	proccreate(cmpproc, nil, STACK);
	if(statustime){
		rlock(&endlk);
		proccreate(statusproc, nil, STACK);
	}

	/*
	 * wait for processes to finish
	 */
	wlock(&endlk);

	if(statustime)
		print("# %T procs exited: %d blocks changed, %d read, %d written, %d skipped, %d copied\n",
			nchange, vtcachenread, vtcachenwrite, nskip, vtcachencopy);

	/*
	 * prepare root block
	 */
	vtfilelock(vfile, -1);
	if(vtfileflush(vfile) < 0)
		sysfatal("vtfileflush: %r");
	if(vtfilegetentry(vfile, &e) < 0)
		sysfatal("vtfilegetentry: %r");

	b = vtcacheallocblock(c, VtDirType);
	if(b == nil)
		sysfatal("vtcacheallocblock: %r");
	vtentrypack(&e, b->data, 0);
	if(vtblockwrite(b) < 0)
		sysfatal("vtblockwrite: %r");

	memset(&root, 0, sizeof root);
	strecpy(root.name, root.name+sizeof root.name, argv[0]);
	strecpy(root.type, root.type+sizeof root.type, fsys->type);
	memmove(root.score, b->score, VtScoreSize);
	root.blocksize = fsys->blocksize;
	memmove(root.prev, prev, VtScoreSize);
	vtblockput(b);

	b = vtcacheallocblock(c, VtRootType);
	if(b == nil)
		sysfatal("vtcacheallocblock: %r");
	vtrootpack(&root, b->data);
	if(vtblockwrite(b) < 0)
		sysfatal("vtblockwrite: %r");

	tm = *localtime(time(0));
	tm.year += 1900;
	tm.mon++;
	print("mount /%s/%d/%02d%02d%s %s:%V %d/%02d%02d/%02d%02d\n",
		mountname, tm.year, tm.mon, tm.mday, 
		mountplace(argv[0]),
		root.type, b->score,
		tm.year, tm.mon, tm.mday, tm.hour, tm.min);
	print("# %T %s %s:%V\n", argv[0], root.type, b->score);
	if(statustime)
		print("# %T venti sync\n");
	vtblockput(b);
	if(vtsync(z) < 0)
		sysfatal("vtsync: %r");
	if(statustime)
		print("# %T synced\n");
	threadexitsall(nil);
}

void
fsysproc(void *dummy)
{
	u32int i;
	Block *db;

	USED(dummy);

	for(i=0; i<fsys->nblock; i++){
		fsscanblock = i;
		if((db = fsysreadblock(fsys, i)) != nil)
			qwrite(qcmp, db, i);
	}
	fsscanblock = i;
	qclose(qcmp);

	if(statustime)
		print("# %T fsys proc exiting\n");
	runlock(&endlk);
}

void
cmpproc(void *dummy)
{
	uchar *data;
	Block *db;
	u32int bno, bsize;
	uchar score[VtScoreSize];
	uchar score1[VtScoreSize];

	USED(dummy);

	bsize = fsys->blocksize;
	while((db = qread(qcmp, &bno)) != nil){
		data = db->data;
		sha1(data, vtzerotruncate(VtDataType, data, bsize), score, nil);
		if(Bseek(&bscores, (vlong)bno*VtScoreSize, 0) < 0)
			sysfatal("cmpproc Bseek: %r");
		if(Bread(&bscores, score1, VtScoreSize) != VtScoreSize)
			sysfatal("cmpproc Bread: %r");
		if(memcmp(score, score1, VtScoreSize) != 0){
			nchange++;
			if(verbose)
				print("# block %ud: old %V new %V\n", bno, score1, score);
			qwrite(qventi, db, bno);
		}else
			blockput(db);
	}
	qclose(qventi);
	if(statustime)
		print("# %T cmp proc exiting\n");
	runlock(&endlk);
}

void
writethread(void *v)
{
	WriteReq wr;
	char err[ERRMAX];

	USED(v);

	while(recv(writechan, &wr) == 1){
		nrecv++;
		if(wr.p == nil)
			break;
		
		if(fastwrites && vtread(z, wr.score, wr.type, nil, 0) < 0){
			rerrstr(err, sizeof err);
			if(strstr(err, "read too small")){	/* already exists */
				nskip++;
				packetfree(wr.p);
				continue;
			}
		}
		if(vtwritepacket(z, wr.score, wr.type, wr.p) < 0)
			sysfatal("vtwritepacket: %r");
	}
}

int
myvtwrite(VtConn *z, uchar score[VtScoreSize], uint type, uchar *buf, int n)
{
	WriteReq wr;

	if(nwritethread == 0)
		return vtwrite(z, score, type, buf, n);

	wr.p = packetalloc();
	packetappend(wr.p, buf, n);
	packetsha1(wr.p, score);
	memmove(wr.score, score, VtScoreSize);
	wr.type = type;
	nsend++;
	send(writechan, &wr);
	return 0;
}

void
ventiproc(void *dummy)
{
	int i;
	Block *db;
	u32int bno;
	u64int bsize;

	USED(dummy);

	proccreate(vtsendproc, z, STACK);
	proccreate(vtrecvproc, z, STACK);

	writechan = chancreate(sizeof(WriteReq), 0);
	for(i=0; i<nwritethread; i++)
		threadcreate(writethread, nil, STACK);
	vtcachesetwrite(zcache, myvtwrite);

	bsize = fsys->blocksize;
	vtfilelock(vfile, -1);
	while((db = qread(qventi, &bno)) != nil){
		if(nop){
			blockput(db);
			continue;
		}
		if(vtfilewrite(vfile, db->data, bsize, bno*bsize) != bsize)
			sysfatal("ventiproc vtfilewrite: %r");
		if(vtfileflushbefore(vfile, (bno+1)*bsize) < 0)
			sysfatal("ventiproc vtfileflushbefore: %r");
		blockput(db);
	}
	vtfileunlock(vfile);
	vtcachesetwrite(zcache, nil);
	for(i=0; i<nwritethread; i++)
		send(writechan, nil);
	if(statustime)
		print("# %T venti proc exiting - nsend %d nrecv %d\n", nsend, nrecv);
	runlock(&endlk);
}

static int
percent(u32int a, u32int b)
{
	return (vlong)a*100/b;
}

void
statusproc(void *dummy)
{
	int n;
	USED(dummy);

	for(n=0;;n++){
		sleep(1000);
		if(qcmp->closed && qcmp->nel==0 && qventi->closed && qventi->nel==0)
			break;
		if(n < statustime)
			continue;
		n = 0;
		print("# %T fsscan=%d%% cmpq=%d%% ventiq=%d%%\n",
			percent(fsscanblock, fsys->nblock),
			percent(qcmp->nel, MAXQ),
			percent(qventi->nel, MAXQ));
	}
	print("# %T status proc exiting\n");
	runlock(&endlk);
}

int
timefmt(Fmt *fmt)
{
	vlong ns;
	Tm tm;
	ns = nsec();
	tm = *localtime(time(0));
	return fmtprint(fmt, "%04d/%02d%02d %02d:%02d:%02d.%03d", 
		tm.year+1900, tm.mon+1, tm.mday, tm.hour, tm.min, tm.sec,
		(int)(ns%1000000000)/1000000);
}

char*
mountplace(char *dev)
{
	char *cmd, *q;
	int p[2], fd[3], n;
	char buf[100];
	
	if(pipe(p) < 0)
		sysfatal("pipe: %r");

	fd[0] = -1;
	fd[1] = p[1];
	fd[2] = -1;
	cmd = smprint("mount | awk '$1==\"%s\" && $2 == \"on\" {print $3}'", dev);
	if(threadspawnl(fd, "sh", "sh", "-c", cmd, nil) < 0)
		sysfatal("exec mount|awk (to find mtpt of %s): %r", dev);
	/* threadspawnl closed p[1] */
	n = readn(p[0], buf, sizeof buf-1);
	close(p[0]);
	if(n <= 0)
		return dev;
	buf[n] = 0;
	if((q = strchr(buf, '\n')) == nil)
		return dev;
	*q = 0;
	q = buf+strlen(buf);
	if(q>buf && *(q-1) == '/')
		*--q = 0;
	return strdup(buf);
}