• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  * 3. The name of the author may not be used to endorse or promote products
14  *    derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include <sys/types.h>
29 
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33 
34 #ifdef HAVE_SYS_TIME_H
35 #include <sys/time.h>
36 #endif
37 
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
42 #ifdef HAVE_STDARG_H
43 #include <stdarg.h>
44 #endif
45 
46 #ifdef WIN32
47 #include <winsock2.h>
48 #endif
49 
50 #include "evutil.h"
51 #include "event.h"
52 
53 /* prototypes */
54 
55 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
56 
/*
 * Schedule ev via event_add().  A non-zero timeout is used as the number
 * of seconds for the event's timeout; zero schedules it without one.
 * The result of event_add() is returned unchanged.
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval tv;

	/* No timeout requested: hand event_add() a NULL timeval. */
	if (timeout == 0)
		return (event_add(ev, NULL));

	evutil_timerclear(&tv);
	tv.tv_sec = timeout;

	return (event_add(ev, &tv));
}
70 
71 /*
72  * This callback is executed when the size of the input buffer changes.
73  * We use it to apply back pressure on the reading side.
74  */
75 
76 void
bufferevent_read_pressure_cb(struct evbuffer * buf,size_t old,size_t now,void * arg)77 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
78     void *arg) {
79 	struct bufferevent *bufev = arg;
80 	/*
81 	 * If we are below the watermark then reschedule reading if it's
82 	 * still enabled.
83 	 */
84 	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
85 		evbuffer_setcb(buf, NULL, NULL);
86 
87 		if (bufev->enabled & EV_READ)
88 			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
89 	}
90 }
91 
92 static void
bufferevent_readcb(int fd,short event,void * arg)93 bufferevent_readcb(int fd, short event, void *arg)
94 {
95 	struct bufferevent *bufev = arg;
96 	int res = 0;
97 	short what = EVBUFFER_READ;
98 	size_t len;
99 	int howmuch = -1;
100 
101 	if (event == EV_TIMEOUT) {
102 		what |= EVBUFFER_TIMEOUT;
103 		goto error;
104 	}
105 
106 	/*
107 	 * If we have a high watermark configured then we don't want to
108 	 * read more data than would make us reach the watermark.
109 	 */
110 	if (bufev->wm_read.high != 0) {
111 		howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
112 		/* we might have lowered the watermark, stop reading */
113 		if (howmuch <= 0) {
114 			struct evbuffer *buf = bufev->input;
115 			event_del(&bufev->ev_read);
116 			evbuffer_setcb(buf,
117 			    bufferevent_read_pressure_cb, bufev);
118 			return;
119 		}
120 	}
121 
122 	res = evbuffer_read(bufev->input, fd, howmuch);
123 	if (res == -1) {
124 		if (errno == EAGAIN || errno == EINTR)
125 			goto reschedule;
126 		/* error case */
127 		what |= EVBUFFER_ERROR;
128 	} else if (res == 0) {
129 		/* eof case */
130 		what |= EVBUFFER_EOF;
131 	}
132 
133 	if (res <= 0)
134 		goto error;
135 
136 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
137 
138 	/* See if this callbacks meets the water marks */
139 	len = EVBUFFER_LENGTH(bufev->input);
140 	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
141 		return;
142 	if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
143 		struct evbuffer *buf = bufev->input;
144 		event_del(&bufev->ev_read);
145 
146 		/* Now schedule a callback for us when the buffer changes */
147 		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
148 	}
149 
150 	/* Invoke the user callback - must always be called last */
151 	if (bufev->readcb != NULL)
152 		(*bufev->readcb)(bufev, bufev->cbarg);
153 	return;
154 
155  reschedule:
156 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
157 	return;
158 
159  error:
160 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
161 }
162 
163 static void
bufferevent_writecb(int fd,short event,void * arg)164 bufferevent_writecb(int fd, short event, void *arg)
165 {
166 	struct bufferevent *bufev = arg;
167 	int res = 0;
168 	short what = EVBUFFER_WRITE;
169 
170 	if (event == EV_TIMEOUT) {
171 		what |= EVBUFFER_TIMEOUT;
172 		goto error;
173 	}
174 
175 	if (EVBUFFER_LENGTH(bufev->output)) {
176 	    res = evbuffer_write(bufev->output, fd);
177 	    if (res == -1) {
178 #ifndef WIN32
179 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
180  *set errno. thus this error checking is not portable*/
181 		    if (errno == EAGAIN ||
182 			errno == EINTR ||
183 			errno == EINPROGRESS)
184 			    goto reschedule;
185 		    /* error case */
186 		    what |= EVBUFFER_ERROR;
187 
188 #else
189 				goto reschedule;
190 #endif
191 
192 	    } else if (res == 0) {
193 		    /* eof case */
194 		    what |= EVBUFFER_EOF;
195 	    }
196 	    if (res <= 0)
197 		    goto error;
198 	}
199 
200 	if (EVBUFFER_LENGTH(bufev->output) != 0)
201 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
202 
203 	/*
204 	 * Invoke the user callback if our buffer is drained or below the
205 	 * low watermark.
206 	 */
207 	if (bufev->writecb != NULL &&
208 	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
209 		(*bufev->writecb)(bufev, bufev->cbarg);
210 
211 	return;
212 
213  reschedule:
214 	if (EVBUFFER_LENGTH(bufev->output) != 0)
215 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
216 	return;
217 
218  error:
219 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
220 }
221 
222 /*
223  * Create a new buffered event object.
224  *
225  * The read callback is invoked whenever we read new data.
226  * The write callback is invoked whenever the output buffer is drained.
227  * The error callback is invoked on a write/read error or on EOF.
228  *
 * Both read and write callbacks may be NULL.  The error callback is not
 * allowed to be NULL and always has to be provided.
231  */
232 
233 struct bufferevent *
bufferevent_new(int fd,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)234 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
235     everrorcb errorcb, void *cbarg)
236 {
237 	struct bufferevent *bufev;
238 
239 	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
240 		return (NULL);
241 
242 	if ((bufev->input = evbuffer_new()) == NULL) {
243 		free(bufev);
244 		return (NULL);
245 	}
246 
247 	if ((bufev->output = evbuffer_new()) == NULL) {
248 		evbuffer_free(bufev->input);
249 		free(bufev);
250 		return (NULL);
251 	}
252 
253 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
254 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
255 
256 	bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
257 
258 	/*
259 	 * Set to EV_WRITE so that using bufferevent_write is going to
260 	 * trigger a callback.  Reading needs to be explicitly enabled
261 	 * because otherwise no data will be available.
262 	 */
263 	bufev->enabled = EV_WRITE;
264 
265 	return (bufev);
266 }
267 
268 void
bufferevent_setcb(struct bufferevent * bufev,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)269 bufferevent_setcb(struct bufferevent *bufev,
270     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
271 {
272 	bufev->readcb = readcb;
273 	bufev->writecb = writecb;
274 	bufev->errorcb = errorcb;
275 
276 	bufev->cbarg = cbarg;
277 }
278 
279 void
bufferevent_setfd(struct bufferevent * bufev,int fd)280 bufferevent_setfd(struct bufferevent *bufev, int fd)
281 {
282 	event_del(&bufev->ev_read);
283 	event_del(&bufev->ev_write);
284 
285 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
286 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
287 	if (bufev->ev_base != NULL) {
288 		event_base_set(bufev->ev_base, &bufev->ev_read);
289 		event_base_set(bufev->ev_base, &bufev->ev_write);
290 	}
291 
292 	/* might have to manually trigger event registration */
293 }
294 
295 int
bufferevent_priority_set(struct bufferevent * bufev,int priority)296 bufferevent_priority_set(struct bufferevent *bufev, int priority)
297 {
298 	if (event_priority_set(&bufev->ev_read, priority) == -1)
299 		return (-1);
300 	if (event_priority_set(&bufev->ev_write, priority) == -1)
301 		return (-1);
302 
303 	return (0);
304 }
305 
306 /* Closing the file descriptor is the responsibility of the caller */
307 
308 void
bufferevent_free(struct bufferevent * bufev)309 bufferevent_free(struct bufferevent *bufev)
310 {
311 	event_del(&bufev->ev_read);
312 	event_del(&bufev->ev_write);
313 
314 	evbuffer_free(bufev->input);
315 	evbuffer_free(bufev->output);
316 
317 	free(bufev);
318 }
319 
320 /*
321  * Returns 0 on success;
322  *        -1 on failure.
323  */
324 
325 int
bufferevent_write(struct bufferevent * bufev,const void * data,size_t size)326 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
327 {
328 	int res;
329 
330 	res = evbuffer_add(bufev->output, data, size);
331 
332 	if (res == -1)
333 		return (res);
334 
335 	/* If everything is okay, we need to schedule a write */
336 	if (size > 0 && (bufev->enabled & EV_WRITE))
337 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
338 
339 	return (res);
340 }
341 
342 int
bufferevent_write_buffer(struct bufferevent * bufev,struct evbuffer * buf)343 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
344 {
345 	int res;
346 
347 	res = bufferevent_write(bufev, buf->buffer, buf->off);
348 	if (res != -1)
349 		evbuffer_drain(buf, buf->off);
350 
351 	return (res);
352 }
353 
354 size_t
bufferevent_read(struct bufferevent * bufev,void * data,size_t size)355 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
356 {
357 	struct evbuffer *buf = bufev->input;
358 
359 	if (buf->off < size)
360 		size = buf->off;
361 
362 	/* Copy the available data to the user buffer */
363 	memcpy(data, buf->buffer, size);
364 
365 	if (size)
366 		evbuffer_drain(buf, size);
367 
368 	return (size);
369 }
370 
371 int
bufferevent_enable(struct bufferevent * bufev,short event)372 bufferevent_enable(struct bufferevent *bufev, short event)
373 {
374 	if (event & EV_READ) {
375 		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
376 			return (-1);
377 	}
378 	if (event & EV_WRITE) {
379 		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
380 			return (-1);
381 	}
382 
383 	bufev->enabled |= event;
384 	return (0);
385 }
386 
387 int
bufferevent_disable(struct bufferevent * bufev,short event)388 bufferevent_disable(struct bufferevent *bufev, short event)
389 {
390 	if (event & EV_READ) {
391 		if (event_del(&bufev->ev_read) == -1)
392 			return (-1);
393 	}
394 	if (event & EV_WRITE) {
395 		if (event_del(&bufev->ev_write) == -1)
396 			return (-1);
397 	}
398 
399 	bufev->enabled &= ~event;
400 	return (0);
401 }
402 
403 /*
404  * Sets the read and write timeout for a buffered event.
405  */
406 
407 void
bufferevent_settimeout(struct bufferevent * bufev,int timeout_read,int timeout_write)408 bufferevent_settimeout(struct bufferevent *bufev,
409     int timeout_read, int timeout_write) {
410 	bufev->timeout_read = timeout_read;
411 	bufev->timeout_write = timeout_write;
412 
413 	if (event_pending(&bufev->ev_read, EV_READ, NULL))
414 		bufferevent_add(&bufev->ev_read, timeout_read);
415 	if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
416 		bufferevent_add(&bufev->ev_write, timeout_write);
417 }
418 
419 /*
420  * Sets the water marks
421  */
422 
423 void
bufferevent_setwatermark(struct bufferevent * bufev,short events,size_t lowmark,size_t highmark)424 bufferevent_setwatermark(struct bufferevent *bufev, short events,
425     size_t lowmark, size_t highmark)
426 {
427 	if (events & EV_READ) {
428 		bufev->wm_read.low = lowmark;
429 		bufev->wm_read.high = highmark;
430 	}
431 
432 	if (events & EV_WRITE) {
433 		bufev->wm_write.low = lowmark;
434 		bufev->wm_write.high = highmark;
435 	}
436 
437 	/* If the watermarks changed then see if we should call read again */
438 	bufferevent_read_pressure_cb(bufev->input,
439 	    0, EVBUFFER_LENGTH(bufev->input), bufev);
440 }
441 
442 int
bufferevent_base_set(struct event_base * base,struct bufferevent * bufev)443 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
444 {
445 	int res;
446 
447 	bufev->ev_base = base;
448 
449 	res = event_base_set(base, &bufev->ev_read);
450 	if (res == -1)
451 		return (res);
452 
453 	res = event_base_set(base, &bufev->ev_write);
454 	return (res);
455 }
456