/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */

#include <u.h>
#include <libc.h>
#include <mux.h>

/*
 * If you fork off two procs running _muxrecvproc and _muxsendproc,
 * then _muxrecv/_muxsend (and thus muxrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
/*
 * Receive-side helper proc; v is the Mux to serve.
 *
 * Installs a freshly allocated queue as mux->readq, then keeps
 * calling mux->recv and forwarding every message to that queue
 * until recv returns nil or the queue rejects a message.
 * On the way out the queue is hung up, drained, freed, and
 * mux->readq is reset to nil.  mux->rpcfork is woken once after
 * setup and once after teardown.
 */
void
_muxrecvproc(void *v)
{
	Mux *mux;
	Muxqueue *inq;
	void *msg;

	mux = v;
	inq = _muxqalloc();

	/* publish the queue; mux->inlk stays held for the whole receive loop */
	qlock(&mux->lk);
	mux->readq = inq;
	qlock(&mux->inlk);
	rwakeup(&mux->rpcfork);
	qunlock(&mux->lk);

	for(;;){
		msg = mux->recv(mux);
		if(msg == nil)
			break;
		if(_muxqsend(inq, msg) < 0){
			/* the queue did not accept it; free it here */
			free(msg);
			break;
		}
	}
	qunlock(&mux->inlk);

	/* teardown happens under mux->lk */
	qlock(&mux->lk);
	_muxqhangup(inq);
	for(;;){
		/* start from nil so a stale pointer is never freed twice */
		msg = nil;
		if(!_muxnbqrecv(inq, &msg) || msg == nil)
			break;
		free(msg);
	}
	free(inq);
	mux->readq = nil;
	rwakeup(&mux->rpcfork);
	qunlock(&mux->lk);
}

/*
 * Send-side helper proc; v is the Mux to serve.
 *
 * Installs a freshly allocated queue as mux->writeq, then takes
 * messages off that queue with _muxqrecv and hands each one to
 * mux->send until the queue yields nil or send reports an error
 * (negative return).  On the way out the queue is hung up, any
 * messages still queued are freed, the queue itself is freed,
 * and mux->writeq is reset to nil.  mux->rpcfork is woken once
 * after setup and once after teardown.
 */
void
_muxsendproc(void *v)
{
	Muxqueue *q;
	void *p;
	Mux *mux;

	mux = v;
	q = _muxqalloc();

	/* publish the queue; mux->outlk stays held for the whole send loop */
	qlock(&mux->lk);
	mux->writeq = q;
	qlock(&mux->outlk);
	rwakeup(&mux->rpcfork);
	qunlock(&mux->lk);

	while((p = _muxqrecv(q)) != nil)
		if(mux->send(mux, p) < 0)
			break;
	qunlock(&mux->outlk);

	qlock(&mux->lk);
	_muxqhangup(q);
	/*
	 * Drain leftovers.  Stop on a nil message, exactly as the
	 * drain loop in _muxrecvproc does: a hung-up, empty queue may
	 * keep reporting success with a nil message, and looping on
	 * the return value alone would then spin forever while
	 * holding mux->lk.
	 */
	p = nil;
	while(_muxnbqrecv(q, &p) && p != nil){
		free(p);
		p = nil;
	}
	free(q);
	mux->writeq = nil;
	rwakeup(&mux->rpcfork);
	qunlock(&mux->lk);
}

/*
 * Fetch one incoming message for mux and store it in *vp.
 *
 * If a read queue is installed (mux->readq is non-nil) the message
 * comes from that queue; otherwise mux->recv or mux->nbrecv is
 * called directly while holding mux->inlk.
 *
 * canblock != 0: wait for a message; always returns 1.
 * canblock == 0: returns whatever _muxnbqrecv or mux->nbrecv
 * returns.  If the mux has no nbrecv hook, *vp is set to nil and
 * 1 is returned, i.e. the caller is told eof rather than
 * "no packet ready".
 *
 * NOTE(review): mux->readq is read again after mux->lk has been
 * released; confirm the queue cannot be torn down by
 * _muxrecvproc inside that window.
 */
int
_muxrecv(Mux *mux, int canblock, void **vp)
{
	void *msg;
	int status;

	qlock(&mux->lk);
	if(mux->readq != nil){
		/* queued path: the receive proc is feeding mux->readq */
		qunlock(&mux->lk);
		if(!canblock)
			return _muxnbqrecv(mux->readq, vp);
		*vp = _muxqrecv(mux->readq);
		return 1;
	}

	/* direct path: take inlk before dropping lk */
	qlock(&mux->inlk);
	qunlock(&mux->lk);
	if(canblock){
		msg = mux->recv(mux);
		status = 1;
	}else if(mux->nbrecv != nil){
		status = mux->nbrecv(mux, &msg);
	}else{
		/* send eof, not "no packet ready" */
		msg = nil;
		status = 1;
	}
	qunlock(&mux->inlk);

	*vp = msg;
	return status;
}

/*
 * Send message p on mux.
 *
 * If a write queue is installed (mux->writeq is non-nil) the
 * message is appended to that queue; if the queue rejects it,
 * p is freed here and -1 is returned.  Otherwise mux->send is
 * called directly while holding mux->outlk; a negative result
 * is reported as -1 and p is not freed here.
 *
 * Returns 0 on success, -1 on failure.
 */
int
_muxsend(Mux *mux, void *p)
{
	int rc;

	qlock(&mux->lk);
	if(mux->writeq != nil){
		/* queued path: the send proc will pick it up */
		qunlock(&mux->lk);
		if(_muxqsend(mux->writeq, p) >= 0)
			return 0;
		free(p);
		return -1;
	}

	/* direct path: take outlk before dropping lk */
	qlock(&mux->outlk);
	qunlock(&mux->lk);
	rc = mux->send(mux, p);
	qunlock(&mux->outlk);

	return rc < 0 ? -1 : 0;
}