aboutsummaryrefslogtreecommitdiff
path: root/src/libthread
diff options
context:
space:
mode:
Diffstat (limited to 'src/libthread')
-rw-r--r--src/libthread/channel.c18
-rw-r--r--src/libthread/create.c4
-rw-r--r--src/libthread/exec-unix.c139
-rw-r--r--src/libthread/main.c7
-rw-r--r--src/libthread/mkfile2
-rw-r--r--src/libthread/proctab.c19
-rw-r--r--src/libthread/sched.c34
-rw-r--r--src/libthread/threadimpl.h3
8 files changed, 124 insertions, 102 deletions
diff --git a/src/libthread/channel.c b/src/libthread/channel.c
index d1ec2985..4f642c84 100644
--- a/src/libthread/channel.c
+++ b/src/libthread/channel.c
@@ -65,9 +65,7 @@ chaninit(Channel *c, int elemsize, int elemcnt)
{
if(elemcnt < 0 || elemsize <= 0 || c == nil)
return -1;
- c->f = 0;
- c->n = 0;
- c->freed = 0;
+ memset(c, 0, sizeof *c);
c->e = elemsize;
c->s = elemcnt;
_threaddebug(DBGCHAN, "chaninit %p", c);
@@ -104,13 +102,16 @@ alt(Alt *alts)
* chanlock. Instead, we delay the note until we've dropped
* the lock.
*/
+
+ /*
+ * T might be nil here -- the scheduler sends on threadwaitchan
+ * directly (in non-blocking mode, of course!).
+ */
t = _threadgetproc()->thread;
- if(t->moribund || _threadexitsallstatus)
+ if((t && t->moribund) || _threadexitsallstatus)
yield(); /* won't return */
s = _procsplhi();
lock(&chanlock);
- t->alt = alts;
- t->chan = Chanalt;
/* test whether any channels can proceed */
n = 0;
@@ -125,7 +126,6 @@ alt(Alt *alts)
if(c==nil){
unlock(&chanlock);
_procsplx(s);
- t->chan = Channone;
return -1;
}
if(canexec(xa))
@@ -138,7 +138,6 @@ alt(Alt *alts)
if(xa->op == CHANNOBLK){
unlock(&chanlock);
_procsplx(s);
- t->chan = Channone;
_threadnalt++;
return xa - alts;
}
@@ -159,6 +158,9 @@ _threadnalt++;
* we need to be here.
*/
Again:
+ t->alt = alts;
+ t->chan = Chanalt;
+
unlock(&chanlock);
_procsplx(s);
r = _threadrendezvous((ulong)&c, 0);
diff --git a/src/libthread/create.c b/src/libthread/create.c
index d487e195..518eaae6 100644
--- a/src/libthread/create.c
+++ b/src/libthread/create.c
@@ -86,6 +86,7 @@ proccreate(void (*f)(void*), void *arg, uint stacksize)
p = _threadgetproc();
if(p->idle){
+ fprint(2, "cannot create procs once there is an idle thread\n");
werrstr("cannot create procs once there is an idle thread");
return -1;
}
@@ -124,6 +125,7 @@ threadcreateidle(void (*f)(void *arg), void *arg, uint stacksize)
int id;
if(_threadprocs!=1){
+ fprint(2, "cannot have idle thread in multi-proc program\n");
werrstr("cannot have idle thread in multi-proc program");
return -1;
}
@@ -153,6 +155,8 @@ _newproc(void (*f)(void *arg), void *arg, uint stacksize, char *name, int grp, i
else
*_threadpq.tail = p;
_threadpq.tail = &p->next;
+ if(_threadprocs == 1)
+ _threadmultiproc();
_threadprocs++;
unlock(&_threadpq.lock);
return p;
diff --git a/src/libthread/exec-unix.c b/src/libthread/exec-unix.c
index c04d414c..bfffa14d 100644
--- a/src/libthread/exec-unix.c
+++ b/src/libthread/exec-unix.c
@@ -2,28 +2,18 @@
#include <unistd.h>
#include "threadimpl.h"
+static void efork(int[3], int[2], char*, char**);
void
-procexec(Channel *pidc, int fd[3], char *prog, char *args[])
+threadexec(Channel *pidc, int fd[3], char *prog, char *args[])
{
- int n;
- Proc *p;
- Thread *t;
-
- _threaddebug(DBGEXEC, "procexec %s", prog);
- /* must be only thread in proc */
- p = _threadgetproc();
- t = p->thread;
- if(p->threads.head != t || p->threads.head->nextt != nil){
- werrstr("not only thread in proc");
- Bad:
- _threaddebug(DBGEXEC, "procexec bad %r");
- if(pidc)
- sendul(pidc, ~0);
- return;
- }
+ int pfd[2];
+ int n, pid;
+ char exitstr[ERRMAX];
+ _threaddebug(DBGEXEC, "threadexec %s", prog);
+
/*
- * We want procexec to behave like exec; if exec succeeds,
+ * We want threadexec to behave like exec; if exec succeeds,
* never return, and if it fails, return with errstr set.
* Unfortunately, the exec happens in another proc since
* we have to wait for the exec'ed process to finish.
@@ -34,114 +24,77 @@ procexec(Channel *pidc, int fd[3], char *prog, char *args[])
* then the proc doing the exec sends the errstr down the
* pipe to us.
*/
- if(pipe(p->exec.fd) < 0)
+ if(pipe(pfd) < 0)
goto Bad;
- if(fcntl(p->exec.fd[0], F_SETFD, 1) < 0)
+ if(fcntl(pfd[0], F_SETFD, 1) < 0)
goto Bad;
- if(fcntl(p->exec.fd[1], F_SETFD, 1) < 0)
+ if(fcntl(pfd[1], F_SETFD, 1) < 0)
goto Bad;
- /* exec in parallel via the scheduler */
- assert(p->needexec==0);
- p->exec.prog = prog;
- p->exec.args = args;
- p->exec.stdfd = fd;
- p->needexec = 1;
- _sched();
+ switch(pid = fork()){
+ case -1:
+ close(pfd[0]);
+ close(pfd[1]);
+ goto Bad;
+ case 0:
+ efork(fd, pfd, prog, args);
+ _exit(0);
+ default:
+ break;
+ }
- close(p->exec.fd[1]);
- if((n = read(p->exec.fd[0], p->exitstr, ERRMAX-1)) > 0){ /* exec failed */
- p->exitstr[n] = '\0';
- errstr(p->exitstr, ERRMAX);
- close(p->exec.fd[0]);
+ close(pfd[1]);
+ if((n = read(pfd[0], exitstr, ERRMAX-1)) > 0){ /* exec failed */
+ exitstr[n] = '\0';
+ errstr(exitstr, ERRMAX);
+ close(pfd[0]);
goto Bad;
}
- close(p->exec.fd[0]);
+ close(pfd[0]);
close(fd[0]);
if(fd[1] != fd[0])
close(fd[1]);
if(fd[2] != fd[1] && fd[2] != fd[0])
close(fd[2]);
if(pidc)
- sendul(pidc, t->ret);
+ sendul(pidc, pid);
- _threaddebug(DBGEXEC, "procexec schedexecwait");
- /* wait for exec'ed program, then exit */
- _schedexecwait();
-}
+ _threaddebug(DBGEXEC, "threadexec schedexecwait");
+ threadexits(0);
-void
-procexecl(Channel *pidc, int fd[3], char *f, ...)
-{
- procexec(pidc, fd, f, &f+1);
+Bad:
+ _threaddebug(DBGEXEC, "threadexec bad %r");
+ if(pidc)
+ sendul(pidc, ~0);
}
void
-_schedexecwait(void)
+threadexecl(Channel *pidc, int fd[3], char *f, ...)
{
- int pid;
- Channel *c;
- Proc *p;
- Thread *t;
- Waitmsg *w;
-
- p = _threadgetproc();
- t = p->thread;
- pid = t->ret;
- _threaddebug(DBGEXEC, "_schedexecwait %d", t->ret);
-
- for(;;){
- w = wait();
- if(w == nil)
- break;
- if(w->pid == pid)
- break;
- free(w);
- }
- if(w != nil){
- if((c = _threadwaitchan) != nil)
- sendp(c, w);
- else
- free(w);
- }
- threadexits("procexec");
+ threadexec(pidc, fd, f, &f+1);
}
static void
-efork(void *ve)
+efork(int stdfd[3], int fd[2], char *prog, char **args)
{
char buf[ERRMAX];
- Execargs *e;
int i;
- e = ve;
- _threaddebug(DBGEXEC, "_schedexec %s -- calling execv", e->prog);
- dup(e->stdfd[0], 0);
- dup(e->stdfd[1], 1);
- dup(e->stdfd[2], 2);
+ _threaddebug(DBGEXEC, "_schedexec %s -- calling execv", prog);
+ dup(stdfd[0], 0);
+ dup(stdfd[1], 1);
+ dup(stdfd[2], 2);
for(i=3; i<40; i++)
- if(i != e->fd[1])
+ if(i != fd[1])
close(i);
rfork(RFNOTEG);
- execvp(e->prog, e->args);
+ execvp(prog, args);
_threaddebug(DBGEXEC, "_schedexec failed: %r");
rerrstr(buf, sizeof buf);
if(buf[0]=='\0')
strcpy(buf, "exec failed");
- write(e->fd[1], buf, strlen(buf));
- close(e->fd[1]);
+ write(fd[1], buf, strlen(buf));
+ close(fd[1]);
_exits(buf);
}
-int
-_schedexec(Execargs *e)
-{
- int pid;
-
- pid = fork();
- if(pid == 0){
- efork(e);
- _exit(1);
- }
- return pid;
-}
diff --git a/src/libthread/main.c b/src/libthread/main.c
index 97a6154a..53061471 100644
--- a/src/libthread/main.c
+++ b/src/libthread/main.c
@@ -24,6 +24,12 @@ _threaddie(int x)
exit(_threadexitsallstatus[0] ? 1 : 0);
}
+static void
+_nop(int x)
+{
+ USED(x);
+}
+
int
main(int argc, char **argv)
{
@@ -31,6 +37,7 @@ main(int argc, char **argv)
Proc *p;
signal(SIGTERM, _threaddie);
+ signal(SIGCHLD, _nop);
// rfork(RFREND);
//_threaddebuglevel = (DBGSCHED|DBGCHAN|DBGREND)^~0;
diff --git a/src/libthread/mkfile b/src/libthread/mkfile
index 8854cde7..58a757b3 100644
--- a/src/libthread/mkfile
+++ b/src/libthread/mkfile
@@ -12,6 +12,7 @@ OFILES=\
debug.$O\
exec-unix.$O\
exit.$O\
+ fdwait.$O\
getpid.$O\
id.$O\
iocall.$O\
@@ -30,6 +31,7 @@ OFILES=\
memsetd.$O\
note.$O\
proctab.$O\
+ read9pmsg.$O\
ref.$O\
rendez.$O\
sched.$O\
diff --git a/src/libthread/proctab.c b/src/libthread/proctab.c
index 5e5dcb2b..ec28d676 100644
--- a/src/libthread/proctab.c
+++ b/src/libthread/proctab.c
@@ -6,6 +6,18 @@ enum
PTABHASH = 257,
};
+static int multi;
+static Proc *theproc;
+
+void
+_threadmultiproc(void)
+{
+ if(multi == 0){
+ multi = 1;
+ _threadsetproc(theproc);
+ }
+}
+
static Lock ptablock;
Proc *ptab[PTABHASH];
@@ -14,6 +26,10 @@ _threadsetproc(Proc *p)
{
int h;
+ if(!multi){
+ theproc = p;
+ return;
+ }
lock(&ptablock);
h = ((unsigned)p->pid)%PTABHASH;
p->link = ptab[h];
@@ -27,6 +43,9 @@ __threadgetproc(int rm)
Proc **l, *p;
int h, pid;
+ if(!multi)
+ return theproc;
+
pid = _threadgetpid();
lock(&ptablock);
diff --git a/src/libthread/sched.c b/src/libthread/sched.c
index d85a76e2..755fc280 100644
--- a/src/libthread/sched.c
+++ b/src/libthread/sched.c
@@ -1,4 +1,5 @@
#include <signal.h>
+#include <errno.h>
#include "threadimpl.h"
//static Thread *runthread(Proc*);
@@ -67,10 +68,12 @@ _schedinit(void *arg)
t = nil;
_sched();
}
+/*
if(p->needexec){
t->ret = _schedexec(&p->exec);
p->needexec = 0;
}
+*/
if(p->newproc){
t->ret = _schedfork(p->newproc);
if(t->ret < 0){
@@ -90,14 +93,45 @@ _schedinit(void *arg)
static Thread*
runthread(Proc *p)
{
+ Channel *c;
Thread *t;
Tqueue *q;
+ Waitmsg *w;
+ int e, sent;
if(p->nthreads==0 || (p->nthreads==1 && p->idle))
return nil;
q = &p->ready;
+relock:
lock(&p->readylock);
if(q->head == nil){
+ e = errno;
+ if((c = _threadwaitchan) != nil){
+ if(c->n <= c->s){
+ sent = 0;
+ for(;;){
+ if((w = p->waitmsg) != nil)
+ p->waitmsg = nil;
+ else
+ w = waitnohang();
+ if(w == nil)
+ break;
+ if(sent == 0){
+ unlock(&p->readylock);
+ sent = 1;
+ }
+ if(nbsendp(c, w) != 1)
+ break;
+ }
+ p->waitmsg = w;
+ if(sent)
+ goto relock;
+ }
+ }else{
+ while((w = waitnohang()) != nil)
+ free(w);
+ }
+ errno = e;
if(p->idle){
if(p->idle->state != Ready){
fprint(2, "everyone is asleep\n");
diff --git a/src/libthread/threadimpl.h b/src/libthread/threadimpl.h
index 0dd1e870..590342ab 100644
--- a/src/libthread/threadimpl.h
+++ b/src/libthread/threadimpl.h
@@ -139,6 +139,7 @@ struct Proc
void *arg; /* passed between shared and unshared stk */
char str[ERRMAX]; /* used by threadexits to avoid malloc */
char errbuf[ERRMAX]; /* errstr */
+ Waitmsg *waitmsg;
void* udata; /* User per-proc data pointer */
};
@@ -181,6 +182,7 @@ void __threaddebug(ulong, char*, ...);
void _threadexitsall(char*);
void _threadflagrendez(Thread*);
Proc* _threadgetproc(void);
+extern void _threadmultiproc(void);
Proc* _threaddelproc(void);
void _threadsetproc(Proc*);
void _threadinitstack(Thread*, void(*)(void*), void*);
@@ -195,7 +197,6 @@ long _xdec(long*);
void _xinc(long*);
void _threadremove(Proc*, Thread*);
-extern int _threadmultiproc;
extern int _threaddebuglevel;
extern char* _threadexitsallstatus;
extern Pqueue _threadpq;