aboutsummaryrefslogtreecommitdiff
path: root/src/libventi/queue.c
blob: 3a1f6ea56d71bacf8dd503fe2ad6de5ebcb29972 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <u.h>
#include <libc.h>
#include <venti.h>
#include "queue.h"

typedef struct Qel Qel;
struct Qel
{
	Qel *next;
	void *p;
};

struct Queue
{
	int hungup;
	QLock lk;
	Rendez r;
	Qel *head;
	Qel *tail;
};

Queue*
_vtqalloc(void)
{
	Queue *q;

	q = vtmallocz(sizeof(Queue));
	q->r.l = &q->lk;
	return q;
}

void
_vtqfree(Queue *q)
{
	Qel *e;
	
	/* Leaks the pointers e->p! */
	while(q->head){
		e = q->head;
		q->head = e->next;
		free(e);
	}
	free(q);
}

int
_vtqsend(Queue *q, void *p)
{
	Qel *e;

	e = vtmalloc(sizeof(Qel));
	qlock(&q->lk);
	if(q->hungup){
		werrstr("hungup queue");
		qunlock(&q->lk);
		return -1;
	}
	e->p = p;
	e->next = nil;
	if(q->head == nil)
		q->head = e;
	else
		q->tail->next = e;
	q->tail = e;
	rwakeup(&q->r);
	qunlock(&q->lk);
	return 0;
}

void*
_vtqrecv(Queue *q)
{
	void *p;
	Qel *e;

	qlock(&q->lk);
	while(q->head == nil && !q->hungup)
		rsleep(&q->r);
	if(q->hungup){
		qunlock(&q->lk);
		return nil;
	}
	e = q->head;
	q->head = e->next;
	qunlock(&q->lk);
	p = e->p;
	vtfree(e);
	return p;
}

void*
_vtnbqrecv(Queue *q)
{
	void *p;
	Qel *e;

	qlock(&q->lk);
	if(q->head == nil){
		qunlock(&q->lk);
		return nil;
	}
	e = q->head;
	q->head = e->next;
	qunlock(&q->lk);
	p = e->p;
	vtfree(e);
	return p;
}

void
_vtqhangup(Queue *q)
{
	qlock(&q->lk);
	q->hungup = 1;
	rwakeupall(&q->r);
	qunlock(&q->lk);
}