aboutsummaryrefslogtreecommitdiff
path: root/src/libthread/wait.c
blob: ba2301f547619feb9082e4020d911f6427005877 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <u.h>
#include <libc.h>
#include <thread.h>

typedef struct Waiter Waiter;

struct {
	QLock lk;
	Waitmsg **msg;
	int nmsg;
	int muxer;
	Waiter *head;
} waiting;

struct Waiter
{
	Rendez r;
	Waitmsg *msg;
	int pid;
	Waiter *next;
	Waiter *prev;
};

/* see src/libmux/mux.c */
Waitmsg*
procwait(int pid)
{
	Waiter *w;
	Waiter me;
	Waitmsg *msg;
	int i;

	memset(&me, 0, sizeof me);
	me.pid = pid;
	me.r.l = &waiting.lk;

	qlock(&waiting.lk);
	for(i=0; i<waiting.nmsg; i++){
		if(waiting.msg[i]->pid == pid){
			msg = waiting.msg[i];
			waiting.msg[i] = waiting.msg[--waiting.nmsg];
			qunlock(&waiting.lk);
			return msg;
		}
	}
	me.next = waiting.head;
	me.prev = nil;
	if(me.next)
		me.next->prev = &me;
	waiting.head = &me;
	while(waiting.muxer && me.msg==nil)
		rsleep(&me.r);

	if(!me.msg){
		if(waiting.muxer)
			abort();
		waiting.muxer = 1;
		while(!me.msg){
			qunlock(&waiting.lk);
			msg = recvp(threadwaitchan());
			qlock(&waiting.lk);
			if(msg == nil)	/* shouldn't happen */
				break;
			for(w=waiting.head; w; w=w->next)
				if(w->pid == msg->pid)
					break;
			if(w){
				if(w->prev)
					w->prev->next = w->next;
				else
					waiting.head = w->next;
				if(w->next)
					w->next->prev = w->prev;
				me.msg = msg;
				rwakeup(&w->r);
			}else{
				waiting.msg = realloc(waiting.msg, (waiting.nmsg+1)*sizeof waiting.msg[0]);
				if(waiting.msg == nil)
					sysfatal("out of memory");
				waiting.msg[waiting.nmsg++] = msg;
			}
		}
		waiting.muxer = 0;
		if(waiting.head)
			rwakeup(&waiting.head->r);
	}
	qunlock(&waiting.lk);
	if (me.msg->pid < 0) {
		free(me.msg);
		me.msg = 0;
	}
	return me.msg;
}