diff options
author | rsc <devnull@localhost> | 2003-10-14 02:35:00 +0000 |
---|---|---|
committer | rsc <devnull@localhost> | 2003-10-14 02:35:00 +0000 |
commit | 3d7e9092a436b707f2160fb869ab68e2a222bc4e (patch) | |
tree | 1ef451afc85f7ecba3c8f97ad5222b7ccc515454 /src/libplumb/event.c | |
parent | 169aba14a4766b3d15695ef27681d0f1d04f8521 (diff) | |
download | plan9port-3d7e9092a436b707f2160fb869ab68e2a222bc4e.tar.gz plan9port-3d7e9092a436b707f2160fb869ab68e2a222bc4e.tar.bz2 plan9port-3d7e9092a436b707f2160fb869ab68e2a222bc4e.zip |
Single-threaded plumber that can run "start" rules.
Thanks to Caerwyn Jones.
Diffstat (limited to 'src/libplumb/event.c')
-rwxr-xr-x | src/libplumb/event.c | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/src/libplumb/event.c b/src/libplumb/event.c new file mode 100755 index 00000000..36a95d68 --- /dev/null +++ b/src/libplumb/event.c @@ -0,0 +1,108 @@ +#include <u.h> +#include <libc.h> +#include <draw.h> +#include <event.h> +#include "plumb.h" + +typedef struct EQueue EQueue; + +struct EQueue +{ + int id; + char *buf; + int nbuf; + EQueue *next; +}; + +static EQueue *equeue; +static Lock eqlock; + +static +int +partial(int id, Event *e, uchar *b, int n) +{ + EQueue *eq, *p; + int nmore; + + lock(&eqlock); + for(eq = equeue; eq != nil; eq = eq->next) + if(eq->id == id) + break; + unlock(&eqlock); + if(eq == nil) + return 0; + /* partial message exists for this id */ + eq->buf = realloc(eq->buf, eq->nbuf+n); + if(eq->buf == nil) + drawerror(display, "eplumb: cannot allocate buffer"); + memmove(eq->buf+eq->nbuf, b, n); + eq->nbuf += n; + e->v = plumbunpackpartial((char*)eq->buf, eq->nbuf, &nmore); + if(nmore == 0){ /* no more to read in this message */ + lock(&eqlock); + if(eq == equeue) + equeue = eq->next; + else{ + for(p = equeue; p!=nil && p->next!=eq; p = p->next) + ; + if(p == nil) + drawerror(display, "eplumb: bad event queue"); + p->next = eq->next; + } + unlock(&eqlock); + free(eq->buf); + free(eq); + } + return 1; +} + +static +void +addpartial(int id, char *b, int n) +{ + EQueue *eq; + + eq = malloc(sizeof(EQueue)); + if(eq == nil) + return; + eq->id = id; + eq->nbuf = n; + eq->buf = malloc(n); + if(eq->buf == nil){ + free(eq); + return; + } + memmove(eq->buf, b, n); + lock(&eqlock); + eq->next = equeue; + equeue = eq; + unlock(&eqlock); +} + +static +int +plumbevent(int id, Event *e, uchar *b, int n) +{ + int nmore; + + if(partial(id, e, b, n) == 0){ + /* no partial message already waiting for this id */ + e->v = plumbunpackpartial((char*)b, n, &nmore); + if(nmore > 0) /* incomplete message */ + addpartial(id, (char*)b, n); + } + if(e->v == nil) + return 0; + return id; +} + +int +eplumb(int key, char *port) +{ + int fd; + + fd = plumbopen(port, OREAD|OCEXEC); + if(fd < 0) + return -1; + return estartfn(key, fd, 8192, plumbevent); +} |