1 /*
2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include <sys/types.h>
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33
34 #ifdef HAVE_SYS_TIME_H
35 #include <sys/time.h>
36 #endif
37
38 #include <errno.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42 #ifdef HAVE_STDARG_H
43 #include <stdarg.h>
44 #endif
45
46 #ifdef WIN32
47 #include <winsock2.h>
48 #endif
49
50 #include "evutil.h"
51 #include "event.h"
52
53 /* prototypes */
54
55 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
56
57 static int
bufferevent_add(struct event * ev,int timeout)58 bufferevent_add(struct event *ev, int timeout)
59 {
60 struct timeval tv, *ptv = NULL;
61
62 if (timeout) {
63 evutil_timerclear(&tv);
64 tv.tv_sec = timeout;
65 ptv = &tv;
66 }
67
68 return (event_add(ev, ptv));
69 }
70
71 /*
72 * This callback is executed when the size of the input buffer changes.
73 * We use it to apply back pressure on the reading side.
74 */
75
76 void
bufferevent_read_pressure_cb(struct evbuffer * buf,size_t old,size_t now,void * arg)77 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
78 void *arg) {
79 struct bufferevent *bufev = arg;
80 /*
81 * If we are below the watermark then reschedule reading if it's
82 * still enabled.
83 */
84 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
85 evbuffer_setcb(buf, NULL, NULL);
86
87 if (bufev->enabled & EV_READ)
88 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
89 }
90 }
91
92 static void
bufferevent_readcb(int fd,short event,void * arg)93 bufferevent_readcb(int fd, short event, void *arg)
94 {
95 struct bufferevent *bufev = arg;
96 int res = 0;
97 short what = EVBUFFER_READ;
98 size_t len;
99 int howmuch = -1;
100
101 if (event == EV_TIMEOUT) {
102 what |= EVBUFFER_TIMEOUT;
103 goto error;
104 }
105
106 /*
107 * If we have a high watermark configured then we don't want to
108 * read more data than would make us reach the watermark.
109 */
110 if (bufev->wm_read.high != 0) {
111 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
112 /* we might have lowered the watermark, stop reading */
113 if (howmuch <= 0) {
114 struct evbuffer *buf = bufev->input;
115 event_del(&bufev->ev_read);
116 evbuffer_setcb(buf,
117 bufferevent_read_pressure_cb, bufev);
118 return;
119 }
120 }
121
122 res = evbuffer_read(bufev->input, fd, howmuch);
123 if (res == -1) {
124 if (errno == EAGAIN || errno == EINTR)
125 goto reschedule;
126 /* error case */
127 what |= EVBUFFER_ERROR;
128 } else if (res == 0) {
129 /* eof case */
130 what |= EVBUFFER_EOF;
131 }
132
133 if (res <= 0)
134 goto error;
135
136 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
137
138 /* See if this callbacks meets the water marks */
139 len = EVBUFFER_LENGTH(bufev->input);
140 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
141 return;
142 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
143 struct evbuffer *buf = bufev->input;
144 event_del(&bufev->ev_read);
145
146 /* Now schedule a callback for us when the buffer changes */
147 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
148 }
149
150 /* Invoke the user callback - must always be called last */
151 if (bufev->readcb != NULL)
152 (*bufev->readcb)(bufev, bufev->cbarg);
153 return;
154
155 reschedule:
156 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
157 return;
158
159 error:
160 (*bufev->errorcb)(bufev, what, bufev->cbarg);
161 }
162
163 static void
bufferevent_writecb(int fd,short event,void * arg)164 bufferevent_writecb(int fd, short event, void *arg)
165 {
166 struct bufferevent *bufev = arg;
167 int res = 0;
168 short what = EVBUFFER_WRITE;
169
170 if (event == EV_TIMEOUT) {
171 what |= EVBUFFER_TIMEOUT;
172 goto error;
173 }
174
175 if (EVBUFFER_LENGTH(bufev->output)) {
176 res = evbuffer_write(bufev->output, fd);
177 if (res == -1) {
178 #ifndef WIN32
179 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
180 *set errno. thus this error checking is not portable*/
181 if (errno == EAGAIN ||
182 errno == EINTR ||
183 errno == EINPROGRESS)
184 goto reschedule;
185 /* error case */
186 what |= EVBUFFER_ERROR;
187
188 #else
189 goto reschedule;
190 #endif
191
192 } else if (res == 0) {
193 /* eof case */
194 what |= EVBUFFER_EOF;
195 }
196 if (res <= 0)
197 goto error;
198 }
199
200 if (EVBUFFER_LENGTH(bufev->output) != 0)
201 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
202
203 /*
204 * Invoke the user callback if our buffer is drained or below the
205 * low watermark.
206 */
207 if (bufev->writecb != NULL &&
208 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
209 (*bufev->writecb)(bufev, bufev->cbarg);
210
211 return;
212
213 reschedule:
214 if (EVBUFFER_LENGTH(bufev->output) != 0)
215 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
216 return;
217
218 error:
219 (*bufev->errorcb)(bufev, what, bufev->cbarg);
220 }
221
222 /*
223 * Create a new buffered event object.
224 *
225 * The read callback is invoked whenever we read new data.
226 * The write callback is invoked whenever the output buffer is drained.
227 * The error callback is invoked on a write/read error or on EOF.
228 *
229 * Both read and write callbacks maybe NULL. The error callback is not
230 * allowed to be NULL and have to be provided always.
231 */
232
233 struct bufferevent *
bufferevent_new(int fd,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)234 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
235 everrorcb errorcb, void *cbarg)
236 {
237 struct bufferevent *bufev;
238
239 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
240 return (NULL);
241
242 if ((bufev->input = evbuffer_new()) == NULL) {
243 free(bufev);
244 return (NULL);
245 }
246
247 if ((bufev->output = evbuffer_new()) == NULL) {
248 evbuffer_free(bufev->input);
249 free(bufev);
250 return (NULL);
251 }
252
253 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
254 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
255
256 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
257
258 /*
259 * Set to EV_WRITE so that using bufferevent_write is going to
260 * trigger a callback. Reading needs to be explicitly enabled
261 * because otherwise no data will be available.
262 */
263 bufev->enabled = EV_WRITE;
264
265 return (bufev);
266 }
267
268 void
bufferevent_setcb(struct bufferevent * bufev,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)269 bufferevent_setcb(struct bufferevent *bufev,
270 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
271 {
272 bufev->readcb = readcb;
273 bufev->writecb = writecb;
274 bufev->errorcb = errorcb;
275
276 bufev->cbarg = cbarg;
277 }
278
279 void
bufferevent_setfd(struct bufferevent * bufev,int fd)280 bufferevent_setfd(struct bufferevent *bufev, int fd)
281 {
282 event_del(&bufev->ev_read);
283 event_del(&bufev->ev_write);
284
285 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
286 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
287 if (bufev->ev_base != NULL) {
288 event_base_set(bufev->ev_base, &bufev->ev_read);
289 event_base_set(bufev->ev_base, &bufev->ev_write);
290 }
291
292 /* might have to manually trigger event registration */
293 }
294
295 int
bufferevent_priority_set(struct bufferevent * bufev,int priority)296 bufferevent_priority_set(struct bufferevent *bufev, int priority)
297 {
298 if (event_priority_set(&bufev->ev_read, priority) == -1)
299 return (-1);
300 if (event_priority_set(&bufev->ev_write, priority) == -1)
301 return (-1);
302
303 return (0);
304 }
305
306 /* Closing the file descriptor is the responsibility of the caller */
307
308 void
bufferevent_free(struct bufferevent * bufev)309 bufferevent_free(struct bufferevent *bufev)
310 {
311 event_del(&bufev->ev_read);
312 event_del(&bufev->ev_write);
313
314 evbuffer_free(bufev->input);
315 evbuffer_free(bufev->output);
316
317 free(bufev);
318 }
319
320 /*
321 * Returns 0 on success;
322 * -1 on failure.
323 */
324
325 int
bufferevent_write(struct bufferevent * bufev,const void * data,size_t size)326 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
327 {
328 int res;
329
330 res = evbuffer_add(bufev->output, data, size);
331
332 if (res == -1)
333 return (res);
334
335 /* If everything is okay, we need to schedule a write */
336 if (size > 0 && (bufev->enabled & EV_WRITE))
337 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
338
339 return (res);
340 }
341
342 int
bufferevent_write_buffer(struct bufferevent * bufev,struct evbuffer * buf)343 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
344 {
345 int res;
346
347 res = bufferevent_write(bufev, buf->buffer, buf->off);
348 if (res != -1)
349 evbuffer_drain(buf, buf->off);
350
351 return (res);
352 }
353
354 size_t
bufferevent_read(struct bufferevent * bufev,void * data,size_t size)355 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
356 {
357 struct evbuffer *buf = bufev->input;
358
359 if (buf->off < size)
360 size = buf->off;
361
362 /* Copy the available data to the user buffer */
363 memcpy(data, buf->buffer, size);
364
365 if (size)
366 evbuffer_drain(buf, size);
367
368 return (size);
369 }
370
371 int
bufferevent_enable(struct bufferevent * bufev,short event)372 bufferevent_enable(struct bufferevent *bufev, short event)
373 {
374 if (event & EV_READ) {
375 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
376 return (-1);
377 }
378 if (event & EV_WRITE) {
379 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
380 return (-1);
381 }
382
383 bufev->enabled |= event;
384 return (0);
385 }
386
387 int
bufferevent_disable(struct bufferevent * bufev,short event)388 bufferevent_disable(struct bufferevent *bufev, short event)
389 {
390 if (event & EV_READ) {
391 if (event_del(&bufev->ev_read) == -1)
392 return (-1);
393 }
394 if (event & EV_WRITE) {
395 if (event_del(&bufev->ev_write) == -1)
396 return (-1);
397 }
398
399 bufev->enabled &= ~event;
400 return (0);
401 }
402
403 /*
404 * Sets the read and write timeout for a buffered event.
405 */
406
407 void
bufferevent_settimeout(struct bufferevent * bufev,int timeout_read,int timeout_write)408 bufferevent_settimeout(struct bufferevent *bufev,
409 int timeout_read, int timeout_write) {
410 bufev->timeout_read = timeout_read;
411 bufev->timeout_write = timeout_write;
412
413 if (event_pending(&bufev->ev_read, EV_READ, NULL))
414 bufferevent_add(&bufev->ev_read, timeout_read);
415 if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
416 bufferevent_add(&bufev->ev_write, timeout_write);
417 }
418
419 /*
420 * Sets the water marks
421 */
422
423 void
bufferevent_setwatermark(struct bufferevent * bufev,short events,size_t lowmark,size_t highmark)424 bufferevent_setwatermark(struct bufferevent *bufev, short events,
425 size_t lowmark, size_t highmark)
426 {
427 if (events & EV_READ) {
428 bufev->wm_read.low = lowmark;
429 bufev->wm_read.high = highmark;
430 }
431
432 if (events & EV_WRITE) {
433 bufev->wm_write.low = lowmark;
434 bufev->wm_write.high = highmark;
435 }
436
437 /* If the watermarks changed then see if we should call read again */
438 bufferevent_read_pressure_cb(bufev->input,
439 0, EVBUFFER_LENGTH(bufev->input), bufev);
440 }
441
442 int
bufferevent_base_set(struct event_base * base,struct bufferevent * bufev)443 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
444 {
445 int res;
446
447 bufev->ev_base = base;
448
449 res = event_base_set(base, &bufev->ev_read);
450 if (res == -1)
451 return (res);
452
453 res = event_base_set(base, &bufev->ev_write);
454 return (res);
455 }
456