• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu>
3  * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  * 3. The name of the author may not be used to endorse or promote products
14  *    derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include <sys/types.h>
29 
30 #include "event2/event-config.h"
31 
32 #ifdef _EVENT_HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 
36 #include <errno.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #ifdef _EVENT_HAVE_STDARG_H
41 #include <stdarg.h>
42 #endif
43 
44 #ifdef WIN32
45 #include <winsock2.h>
46 #endif
47 #include <errno.h>
48 
49 #include "event2/util.h"
50 #include "event2/buffer.h"
51 #include "event2/buffer_compat.h"
52 #include "event2/bufferevent.h"
53 #include "event2/bufferevent_struct.h"
54 #include "event2/bufferevent_compat.h"
55 #include "event2/event.h"
56 #include "log-internal.h"
57 #include "mm-internal.h"
58 #include "bufferevent-internal.h"
59 #include "evbuffer-internal.h"
60 #include "util-internal.h"
61 
62 static void _bufferevent_cancel_all(struct bufferevent *bev);
63 
64 
65 void
bufferevent_suspend_read(struct bufferevent * bufev,bufferevent_suspend_flags what)66 bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
67 {
68 	struct bufferevent_private *bufev_private =
69 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
70 	BEV_LOCK(bufev);
71 	if (!bufev_private->read_suspended)
72 		bufev->be_ops->disable(bufev, EV_READ);
73 	bufev_private->read_suspended |= what;
74 	BEV_UNLOCK(bufev);
75 }
76 
77 void
bufferevent_unsuspend_read(struct bufferevent * bufev,bufferevent_suspend_flags what)78 bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
79 {
80 	struct bufferevent_private *bufev_private =
81 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
82 	BEV_LOCK(bufev);
83 	bufev_private->read_suspended &= ~what;
84 	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
85 		bufev->be_ops->enable(bufev, EV_READ);
86 	BEV_UNLOCK(bufev);
87 }
88 
89 void
bufferevent_suspend_write(struct bufferevent * bufev,bufferevent_suspend_flags what)90 bufferevent_suspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what)
91 {
92 	struct bufferevent_private *bufev_private =
93 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
94 	BEV_LOCK(bufev);
95 	if (!bufev_private->write_suspended)
96 		bufev->be_ops->disable(bufev, EV_WRITE);
97 	bufev_private->write_suspended |= what;
98 	BEV_UNLOCK(bufev);
99 }
100 
101 void
bufferevent_unsuspend_write(struct bufferevent * bufev,bufferevent_suspend_flags what)102 bufferevent_unsuspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what)
103 {
104 	struct bufferevent_private *bufev_private =
105 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
106 	BEV_LOCK(bufev);
107 	bufev_private->write_suspended &= ~what;
108 	if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
109 		bufev->be_ops->enable(bufev, EV_WRITE);
110 	BEV_UNLOCK(bufev);
111 }
112 
113 
114 /* Callback to implement watermarks on the input buffer.  Only enabled
115  * if the watermark is set. */
116 static void
bufferevent_inbuf_wm_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)117 bufferevent_inbuf_wm_cb(struct evbuffer *buf,
118     const struct evbuffer_cb_info *cbinfo,
119     void *arg)
120 {
121 	struct bufferevent *bufev = arg;
122 	size_t size;
123 
124 	size = evbuffer_get_length(buf);
125 
126 	if (size >= bufev->wm_read.high)
127 		bufferevent_wm_suspend_read(bufev);
128 	else
129 		bufferevent_wm_unsuspend_read(bufev);
130 }
131 
132 static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb * _,void * arg)133 bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
134 {
135 	struct bufferevent_private *bufev_private = arg;
136 	struct bufferevent *bufev = &bufev_private->bev;
137 
138 	BEV_LOCK(bufev);
139 	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
140 	    bufev->errorcb) {
141 		/* The "connected" happened before any reads or writes, so
142 		   send it first. */
143 		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
144 		bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
145 	}
146 	if (bufev_private->readcb_pending && bufev->readcb) {
147 		bufev_private->readcb_pending = 0;
148 		bufev->readcb(bufev, bufev->cbarg);
149 	}
150 	if (bufev_private->writecb_pending && bufev->writecb) {
151 		bufev_private->writecb_pending = 0;
152 		bufev->writecb(bufev, bufev->cbarg);
153 	}
154 	if (bufev_private->eventcb_pending && bufev->errorcb) {
155 		short what = bufev_private->eventcb_pending;
156 		int err = bufev_private->errno_pending;
157 		bufev_private->eventcb_pending = 0;
158 		bufev_private->errno_pending = 0;
159 		EVUTIL_SET_SOCKET_ERROR(err);
160 		bufev->errorcb(bufev, what, bufev->cbarg);
161 	}
162 	_bufferevent_decref_and_unlock(bufev);
163 }
164 
165 static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb * _,void * arg)166 bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
167 {
168 	struct bufferevent_private *bufev_private = arg;
169 	struct bufferevent *bufev = &bufev_private->bev;
170 
171 	BEV_LOCK(bufev);
172 #define UNLOCKED(stmt) \
173 	do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)
174 
175 	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
176 	    bufev->errorcb) {
177 		/* The "connected" happened before any reads or writes, so
178 		   send it first. */
179 		bufferevent_event_cb errorcb = bufev->errorcb;
180 		void *cbarg = bufev->cbarg;
181 		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
182 		UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
183 	}
184 	if (bufev_private->readcb_pending && bufev->readcb) {
185 		bufferevent_data_cb readcb = bufev->readcb;
186 		void *cbarg = bufev->cbarg;
187 		bufev_private->readcb_pending = 0;
188 		UNLOCKED(readcb(bufev, cbarg));
189 	}
190 	if (bufev_private->writecb_pending && bufev->writecb) {
191 		bufferevent_data_cb writecb = bufev->writecb;
192 		void *cbarg = bufev->cbarg;
193 		bufev_private->writecb_pending = 0;
194 		UNLOCKED(writecb(bufev, cbarg));
195 	}
196 	if (bufev_private->eventcb_pending && bufev->errorcb) {
197 		bufferevent_event_cb errorcb = bufev->errorcb;
198 		void *cbarg = bufev->cbarg;
199 		short what = bufev_private->eventcb_pending;
200 		int err = bufev_private->errno_pending;
201 		bufev_private->eventcb_pending = 0;
202 		bufev_private->errno_pending = 0;
203 		EVUTIL_SET_SOCKET_ERROR(err);
204 		UNLOCKED(errorcb(bufev,what,cbarg));
205 	}
206 	_bufferevent_decref_and_unlock(bufev);
207 #undef UNLOCKED
208 }
209 
/* Take a reference on the bufferevent embedded in 'bevp' (a struct
 * bufferevent_private *) and schedule its 'deferred' callback on the
 * deferred-callback queue of its event base.  'bevp' is expanded several
 * times, so it must be free of side effects. */
#define SCHEDULE_DEFERRED(bevp)						\
	do {								\
		bufferevent_incref(&(bevp)->bev);			\
		event_deferred_cb_schedule(				\
		    event_base_get_deferred_cb_queue(			\
			(bevp)->bev.ev_base),				\
		    &(bevp)->deferred);					\
	} while (0)
217 
218 
219 void
_bufferevent_run_readcb(struct bufferevent * bufev)220 _bufferevent_run_readcb(struct bufferevent *bufev)
221 {
222 	/* Requires that we hold the lock and a reference */
223 	struct bufferevent_private *p =
224 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
225 	if (bufev->readcb == NULL)
226 		return;
227 	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
228 		p->readcb_pending = 1;
229 		if (!p->deferred.queued)
230 			SCHEDULE_DEFERRED(p);
231 	} else {
232 		bufev->readcb(bufev, bufev->cbarg);
233 	}
234 }
235 
236 void
_bufferevent_run_writecb(struct bufferevent * bufev)237 _bufferevent_run_writecb(struct bufferevent *bufev)
238 {
239 	/* Requires that we hold the lock and a reference */
240 	struct bufferevent_private *p =
241 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
242 	if (bufev->writecb == NULL)
243 		return;
244 	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
245 		p->writecb_pending = 1;
246 		if (!p->deferred.queued)
247 			SCHEDULE_DEFERRED(p);
248 	} else {
249 		bufev->writecb(bufev, bufev->cbarg);
250 	}
251 }
252 
253 void
_bufferevent_run_eventcb(struct bufferevent * bufev,short what)254 _bufferevent_run_eventcb(struct bufferevent *bufev, short what)
255 {
256 	/* Requires that we hold the lock and a reference */
257 	struct bufferevent_private *p =
258 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
259 	if (bufev->errorcb == NULL)
260 		return;
261 	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
262 		p->eventcb_pending |= what;
263 		p->errno_pending = EVUTIL_SOCKET_ERROR();
264 		if (!p->deferred.queued)
265 			SCHEDULE_DEFERRED(p);
266 	} else {
267 		bufev->errorcb(bufev, what, bufev->cbarg);
268 	}
269 }
270 
271 int
bufferevent_init_common(struct bufferevent_private * bufev_private,struct event_base * base,const struct bufferevent_ops * ops,enum bufferevent_options options)272 bufferevent_init_common(struct bufferevent_private *bufev_private,
273     struct event_base *base,
274     const struct bufferevent_ops *ops,
275     enum bufferevent_options options)
276 {
277 	struct bufferevent *bufev = &bufev_private->bev;
278 
279 	if (!bufev->input) {
280 		if ((bufev->input = evbuffer_new()) == NULL)
281 			return -1;
282 	}
283 
284 	if (!bufev->output) {
285 		if ((bufev->output = evbuffer_new()) == NULL) {
286 			evbuffer_free(bufev->input);
287 			return -1;
288 		}
289 	}
290 
291 	bufev_private->refcnt = 1;
292 	bufev->ev_base = base;
293 
294 	/* Disable timeouts. */
295 	evutil_timerclear(&bufev->timeout_read);
296 	evutil_timerclear(&bufev->timeout_write);
297 
298 	bufev->be_ops = ops;
299 
300 	/*
301 	 * Set to EV_WRITE so that using bufferevent_write is going to
302 	 * trigger a callback.  Reading needs to be explicitly enabled
303 	 * because otherwise no data will be available.
304 	 */
305 	bufev->enabled = EV_WRITE;
306 
307 #ifndef _EVENT_DISABLE_THREAD_SUPPORT
308 	if (options & BEV_OPT_THREADSAFE) {
309 		if (bufferevent_enable_locking(bufev, NULL) < 0) {
310 			/* cleanup */
311 			evbuffer_free(bufev->input);
312 			evbuffer_free(bufev->output);
313 			bufev->input = NULL;
314 			bufev->output = NULL;
315 			return -1;
316 		}
317 	}
318 #endif
319 	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
320 	    == BEV_OPT_UNLOCK_CALLBACKS) {
321 		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
322 		return -1;
323 	}
324 	if (options & BEV_OPT_DEFER_CALLBACKS) {
325 		if (options & BEV_OPT_UNLOCK_CALLBACKS)
326 			event_deferred_cb_init(&bufev_private->deferred,
327 			    bufferevent_run_deferred_callbacks_unlocked,
328 			    bufev_private);
329 		else
330 			event_deferred_cb_init(&bufev_private->deferred,
331 			    bufferevent_run_deferred_callbacks_locked,
332 			    bufev_private);
333 	}
334 
335 	bufev_private->options = options;
336 
337 	evbuffer_set_parent(bufev->input, bufev);
338 	evbuffer_set_parent(bufev->output, bufev);
339 
340 	return 0;
341 }
342 
343 void
bufferevent_setcb(struct bufferevent * bufev,bufferevent_data_cb readcb,bufferevent_data_cb writecb,bufferevent_event_cb eventcb,void * cbarg)344 bufferevent_setcb(struct bufferevent *bufev,
345     bufferevent_data_cb readcb, bufferevent_data_cb writecb,
346     bufferevent_event_cb eventcb, void *cbarg)
347 {
348 	BEV_LOCK(bufev);
349 
350 	bufev->readcb = readcb;
351 	bufev->writecb = writecb;
352 	bufev->errorcb = eventcb;
353 
354 	bufev->cbarg = cbarg;
355 	BEV_UNLOCK(bufev);
356 }
357 
358 struct evbuffer *
bufferevent_get_input(struct bufferevent * bufev)359 bufferevent_get_input(struct bufferevent *bufev)
360 {
361 	return bufev->input;
362 }
363 
364 struct evbuffer *
bufferevent_get_output(struct bufferevent * bufev)365 bufferevent_get_output(struct bufferevent *bufev)
366 {
367 	return bufev->output;
368 }
369 
370 struct event_base *
bufferevent_get_base(struct bufferevent * bufev)371 bufferevent_get_base(struct bufferevent *bufev)
372 {
373 	return bufev->ev_base;
374 }
375 
376 int
bufferevent_write(struct bufferevent * bufev,const void * data,size_t size)377 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
378 {
379 	if (evbuffer_add(bufev->output, data, size) == -1)
380 		return (-1);
381 
382 	return 0;
383 }
384 
385 int
bufferevent_write_buffer(struct bufferevent * bufev,struct evbuffer * buf)386 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
387 {
388 	if (evbuffer_add_buffer(bufev->output, buf) == -1)
389 		return (-1);
390 
391 	return 0;
392 }
393 
394 size_t
bufferevent_read(struct bufferevent * bufev,void * data,size_t size)395 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
396 {
397 	return (evbuffer_remove(bufev->input, data, size));
398 }
399 
400 int
bufferevent_read_buffer(struct bufferevent * bufev,struct evbuffer * buf)401 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
402 {
403 	return (evbuffer_add_buffer(buf, bufev->input));
404 }
405 
406 int
bufferevent_enable(struct bufferevent * bufev,short event)407 bufferevent_enable(struct bufferevent *bufev, short event)
408 {
409 	struct bufferevent_private *bufev_private =
410 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
411 	short impl_events = event;
412 	int r = 0;
413 
414 	_bufferevent_incref_and_lock(bufev);
415 	if (bufev_private->read_suspended)
416 		impl_events &= ~EV_READ;
417 	if (bufev_private->write_suspended)
418 		impl_events &= ~EV_WRITE;
419 
420 	bufev->enabled |= event;
421 
422 	if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
423 		r = -1;
424 
425 	_bufferevent_decref_and_unlock(bufev);
426 	return r;
427 }
428 
429 int
bufferevent_set_timeouts(struct bufferevent * bufev,const struct timeval * tv_read,const struct timeval * tv_write)430 bufferevent_set_timeouts(struct bufferevent *bufev,
431 			 const struct timeval *tv_read,
432 			 const struct timeval *tv_write)
433 {
434 	int r = 0;
435 	BEV_LOCK(bufev);
436 	if (tv_read) {
437 		bufev->timeout_read = *tv_read;
438 	} else {
439 		evutil_timerclear(&bufev->timeout_read);
440 	}
441 	if (tv_write) {
442 		bufev->timeout_write = *tv_write;
443 	} else {
444 		evutil_timerclear(&bufev->timeout_write);
445 	}
446 
447 	if (bufev->be_ops->adj_timeouts)
448 		r = bufev->be_ops->adj_timeouts(bufev);
449 	BEV_UNLOCK(bufev);
450 
451 	return r;
452 }
453 
454 
/* Obsolete; use bufferevent_set_timeouts instead.
 *
 * 'timeout_read' and 'timeout_write' are whole seconds.  A value of 0
 * means "no timeout" for that direction and is forwarded as a NULL
 * timeval pointer. */
void
bufferevent_settimeout(struct bufferevent *bufev,
		       int timeout_read, int timeout_write)
{
	struct timeval rd, wr;
	struct timeval *rd_ptr = NULL;
	struct timeval *wr_ptr = NULL;

	memset(&rd, 0, sizeof(rd));
	memset(&wr, 0, sizeof(wr));

	if (timeout_read != 0) {
		rd.tv_sec = timeout_read;
		rd_ptr = &rd;
	}
	if (timeout_write != 0) {
		wr.tv_sec = timeout_write;
		wr_ptr = &wr;
	}

	bufferevent_set_timeouts(bufev, rd_ptr, wr_ptr);
}
477 
478 
479 int
bufferevent_disable_hard(struct bufferevent * bufev,short event)480 bufferevent_disable_hard(struct bufferevent *bufev, short event)
481 {
482 	int r = 0;
483 	struct bufferevent_private *bufev_private =
484 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
485 
486 	BEV_LOCK(bufev);
487 	bufev->enabled &= ~event;
488 
489 	bufev_private->connecting = 0;
490 	if (bufev->be_ops->disable(bufev, event) < 0)
491 		r = -1;
492 
493 	BEV_UNLOCK(bufev);
494 	return r;
495 }
496 
497 int
bufferevent_disable(struct bufferevent * bufev,short event)498 bufferevent_disable(struct bufferevent *bufev, short event)
499 {
500 	int r = 0;
501 
502 	BEV_LOCK(bufev);
503 	bufev->enabled &= ~event;
504 
505 	if (bufev->be_ops->disable(bufev, event) < 0)
506 		r = -1;
507 
508 	BEV_UNLOCK(bufev);
509 	return r;
510 }
511 
512 /*
513  * Sets the water marks
514  */
515 
516 void
bufferevent_setwatermark(struct bufferevent * bufev,short events,size_t lowmark,size_t highmark)517 bufferevent_setwatermark(struct bufferevent *bufev, short events,
518     size_t lowmark, size_t highmark)
519 {
520 	struct bufferevent_private *bufev_private =
521 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
522 
523 	BEV_LOCK(bufev);
524 	if (events & EV_WRITE) {
525 		bufev->wm_write.low = lowmark;
526 		bufev->wm_write.high = highmark;
527 	}
528 
529 	if (events & EV_READ) {
530 		bufev->wm_read.low = lowmark;
531 		bufev->wm_read.high = highmark;
532 
533 		if (highmark) {
534 			/* There is now a new high-water mark for read.
535 			   enable the callback if needed, and see if we should
536 			   suspend/bufferevent_wm_unsuspend. */
537 
538 			if (bufev_private->read_watermarks_cb == NULL) {
539 				bufev_private->read_watermarks_cb =
540 				    evbuffer_add_cb(bufev->input,
541 						    bufferevent_inbuf_wm_cb,
542 						    bufev);
543 			}
544 			evbuffer_cb_set_flags(bufev->input,
545 				      bufev_private->read_watermarks_cb,
546 				      EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);
547 
548 			if (evbuffer_get_length(bufev->input) >= highmark)
549 				bufferevent_wm_suspend_read(bufev);
550 			else if (evbuffer_get_length(bufev->input) < highmark)
551 				bufferevent_wm_unsuspend_read(bufev);
552 		} else {
553 			/* There is now no high-water mark for read. */
554 			if (bufev_private->read_watermarks_cb)
555 				evbuffer_cb_clear_flags(bufev->input,
556 				    bufev_private->read_watermarks_cb,
557 				    EVBUFFER_CB_ENABLED);
558 			bufferevent_wm_unsuspend_read(bufev);
559 		}
560 	}
561 	BEV_UNLOCK(bufev);
562 }
563 
564 int
bufferevent_flush(struct bufferevent * bufev,short iotype,enum bufferevent_flush_mode mode)565 bufferevent_flush(struct bufferevent *bufev,
566     short iotype,
567     enum bufferevent_flush_mode mode)
568 {
569 	int r = -1;
570 	BEV_LOCK(bufev);
571 	if (bufev->be_ops->flush)
572 		r = bufev->be_ops->flush(bufev, iotype, mode);
573 	BEV_UNLOCK(bufev);
574 	return r;
575 }
576 
577 void
_bufferevent_incref_and_lock(struct bufferevent * bufev)578 _bufferevent_incref_and_lock(struct bufferevent *bufev)
579 {
580 	struct bufferevent_private *bufev_private =
581 	    BEV_UPCAST(bufev);
582 	BEV_LOCK(bufev);
583 	++bufev_private->refcnt;
584 }
585 
#if 0
/* Compiled out.  Moves ownership of a shared lock from 'donor' to
 * 'recipient': only acts when both use the same lock, the recipient does
 * not already own it, and the donor does. */
static void
_bufferevent_transfer_lock_ownership(struct bufferevent *donor,
    struct bufferevent *recipient)
{
	struct bufferevent_private *from = BEV_UPCAST(donor);
	struct bufferevent_private *to = BEV_UPCAST(recipient);

	if (from->lock == to->lock && !to->own_lock && from->own_lock) {
		from->own_lock = 0;
		to->own_lock = 1;
	}
}
#endif
603 
604 int
_bufferevent_decref_and_unlock(struct bufferevent * bufev)605 _bufferevent_decref_and_unlock(struct bufferevent *bufev)
606 {
607 	struct bufferevent_private *bufev_private =
608 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
609 	struct bufferevent *underlying;
610 
611 	EVUTIL_ASSERT(bufev_private->refcnt > 0);
612 
613 	if (--bufev_private->refcnt) {
614 		BEV_UNLOCK(bufev);
615 		return 0;
616 	}
617 
618 	underlying = bufferevent_get_underlying(bufev);
619 
620 	/* Clean up the shared info */
621 	if (bufev->be_ops->destruct)
622 		bufev->be_ops->destruct(bufev);
623 
624 	/* XXX what happens if refcnt for these buffers is > 1?
625 	 * The buffers can share a lock with this bufferevent object,
626 	 * but the lock might be destroyed below. */
627 	/* evbuffer will free the callbacks */
628 	evbuffer_free(bufev->input);
629 	evbuffer_free(bufev->output);
630 
631 	if (bufev_private->rate_limiting) {
632 		if (bufev_private->rate_limiting->group)
633 			bufferevent_remove_from_rate_limit_group_internal(bufev,0);
634 		if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
635 			event_del(&bufev_private->rate_limiting->refill_bucket_event);
636 		event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
637 		mm_free(bufev_private->rate_limiting);
638 		bufev_private->rate_limiting = NULL;
639 	}
640 
641 	event_debug_unassign(&bufev->ev_read);
642 	event_debug_unassign(&bufev->ev_write);
643 
644 	BEV_UNLOCK(bufev);
645 	if (bufev_private->own_lock)
646 		EVTHREAD_FREE_LOCK(bufev_private->lock,
647 		    EVTHREAD_LOCKTYPE_RECURSIVE);
648 
649 	/* Free the actual allocated memory. */
650 	mm_free(((char*)bufev) - bufev->be_ops->mem_offset);
651 
652 	/* Release the reference to underlying now that we no longer need the
653 	 * reference to it.  We wait this long mainly in case our lock is
654 	 * shared with underlying.
655 	 *
656 	 * The 'destruct' function will also drop a reference to underlying
657 	 * if BEV_OPT_CLOSE_ON_FREE is set.
658 	 *
659 	 * XXX Should we/can we just refcount evbuffer/bufferevent locks?
660 	 * It would probably save us some headaches.
661 	 */
662 	if (underlying)
663 		bufferevent_decref(underlying);
664 
665 	return 1;
666 }
667 
/* Drop one reference to 'bufev'.  Takes the lock and then delegates to
 * _bufferevent_decref_and_unlock(), whose result is returned unchanged. */
int
bufferevent_decref(struct bufferevent *bufev)
{
	int rv;

	BEV_LOCK(bufev);
	rv = _bufferevent_decref_and_unlock(bufev);
	return rv;
}
674 
/* Release the caller's reference to 'bufev'.
 *
 * With the lock held, every user callback is cleared, the backend is asked
 * to cancel all outstanding operations, and one reference is dropped; the
 * final call also releases the lock. */
void
bufferevent_free(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);

	/* Make sure no user callback can fire from here on. */
	bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
	_bufferevent_cancel_all(bufev);

	/* Drops the reference and releases the lock taken above. */
	_bufferevent_decref_and_unlock(bufev);
}
683 
684 void
bufferevent_incref(struct bufferevent * bufev)685 bufferevent_incref(struct bufferevent *bufev)
686 {
687 	struct bufferevent_private *bufev_private =
688 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
689 
690 	BEV_LOCK(bufev);
691 	++bufev_private->refcnt;
692 	BEV_UNLOCK(bufev);
693 }
694 
695 int
bufferevent_enable_locking(struct bufferevent * bufev,void * lock)696 bufferevent_enable_locking(struct bufferevent *bufev, void *lock)
697 {
698 #ifdef _EVENT_DISABLE_THREAD_SUPPORT
699 	return -1;
700 #else
701 	struct bufferevent *underlying;
702 
703 	if (BEV_UPCAST(bufev)->lock)
704 		return -1;
705 	underlying = bufferevent_get_underlying(bufev);
706 
707 	if (!lock && underlying && BEV_UPCAST(underlying)->lock) {
708 		lock = BEV_UPCAST(underlying)->lock;
709 		BEV_UPCAST(bufev)->lock = lock;
710 		BEV_UPCAST(bufev)->own_lock = 0;
711 	} else if (!lock) {
712 		EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
713 		if (!lock)
714 			return -1;
715 		BEV_UPCAST(bufev)->lock = lock;
716 		BEV_UPCAST(bufev)->own_lock = 1;
717 	} else {
718 		BEV_UPCAST(bufev)->lock = lock;
719 		BEV_UPCAST(bufev)->own_lock = 0;
720 	}
721 	evbuffer_enable_locking(bufev->input, lock);
722 	evbuffer_enable_locking(bufev->output, lock);
723 
724 	if (underlying && !BEV_UPCAST(underlying)->lock)
725 		bufferevent_enable_locking(underlying, lock);
726 
727 	return 0;
728 #endif
729 }
730 
731 int
bufferevent_setfd(struct bufferevent * bev,evutil_socket_t fd)732 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
733 {
734 	union bufferevent_ctrl_data d;
735 	int res = -1;
736 	d.fd = fd;
737 	BEV_LOCK(bev);
738 	if (bev->be_ops->ctrl)
739 		res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
740 	BEV_UNLOCK(bev);
741 	return res;
742 }
743 
744 evutil_socket_t
bufferevent_getfd(struct bufferevent * bev)745 bufferevent_getfd(struct bufferevent *bev)
746 {
747 	union bufferevent_ctrl_data d;
748 	int res = -1;
749 	d.fd = -1;
750 	BEV_LOCK(bev);
751 	if (bev->be_ops->ctrl)
752 		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
753 	BEV_UNLOCK(bev);
754 	return (res<0) ? -1 : d.fd;
755 }
756 
757 static void
_bufferevent_cancel_all(struct bufferevent * bev)758 _bufferevent_cancel_all(struct bufferevent *bev)
759 {
760 	union bufferevent_ctrl_data d;
761 	memset(&d, 0, sizeof(d));
762 	BEV_LOCK(bev);
763 	if (bev->be_ops->ctrl)
764 		bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
765 	BEV_UNLOCK(bev);
766 }
767 
768 short
bufferevent_get_enabled(struct bufferevent * bufev)769 bufferevent_get_enabled(struct bufferevent *bufev)
770 {
771 	short r;
772 	BEV_LOCK(bufev);
773 	r = bufev->enabled;
774 	BEV_UNLOCK(bufev);
775 	return r;
776 }
777 
778 struct bufferevent *
bufferevent_get_underlying(struct bufferevent * bev)779 bufferevent_get_underlying(struct bufferevent *bev)
780 {
781 	union bufferevent_ctrl_data d;
782 	int res = -1;
783 	d.ptr = NULL;
784 	BEV_LOCK(bev);
785 	if (bev->be_ops->ctrl)
786 		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
787 	BEV_UNLOCK(bev);
788 	return (res<0) ? NULL : d.ptr;
789 }
790 
791 static void
bufferevent_generic_read_timeout_cb(evutil_socket_t fd,short event,void * ctx)792 bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
793 {
794 	struct bufferevent *bev = ctx;
795 	_bufferevent_incref_and_lock(bev);
796 	bufferevent_disable(bev, EV_READ);
797 	_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
798 	_bufferevent_decref_and_unlock(bev);
799 }
800 static void
bufferevent_generic_write_timeout_cb(evutil_socket_t fd,short event,void * ctx)801 bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
802 {
803 	struct bufferevent *bev = ctx;
804 	_bufferevent_incref_and_lock(bev);
805 	bufferevent_disable(bev, EV_WRITE);
806 	_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
807 	_bufferevent_decref_and_unlock(bev);
808 }
809 
810 void
_bufferevent_init_generic_timeout_cbs(struct bufferevent * bev)811 _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev)
812 {
813 	evtimer_assign(&bev->ev_read, bev->ev_base,
814 	    bufferevent_generic_read_timeout_cb, bev);
815 	evtimer_assign(&bev->ev_write, bev->ev_base,
816 	    bufferevent_generic_write_timeout_cb, bev);
817 }
818 
819 int
_bufferevent_del_generic_timeout_cbs(struct bufferevent * bev)820 _bufferevent_del_generic_timeout_cbs(struct bufferevent *bev)
821 {
822 	int r1,r2;
823 	r1 = event_del(&bev->ev_read);
824 	r2 = event_del(&bev->ev_write);
825 	if (r1<0 || r2<0)
826 		return -1;
827 	return 0;
828 }
829 
830 int
_bufferevent_generic_adj_timeouts(struct bufferevent * bev)831 _bufferevent_generic_adj_timeouts(struct bufferevent *bev)
832 {
833 	const short enabled = bev->enabled;
834 	struct bufferevent_private *bev_p =
835 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
836 	int r1=0, r2=0;
837 	if ((enabled & EV_READ) && !bev_p->read_suspended &&
838 	    evutil_timerisset(&bev->timeout_read))
839 		r1 = event_add(&bev->ev_read, &bev->timeout_read);
840 	else
841 		r1 = event_del(&bev->ev_read);
842 
843 	if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
844 	    evutil_timerisset(&bev->timeout_write) &&
845 	    evbuffer_get_length(bev->output))
846 		r2 = event_add(&bev->ev_write, &bev->timeout_write);
847 	else
848 		r2 = event_del(&bev->ev_write);
849 	if (r1 < 0 || r2 < 0)
850 		return -1;
851 	return 0;
852 }
853 
/* Add 'ev' with the timeout '*tv'.  An all-zero timeval means "no
 * timeout" and is passed to event_add() as NULL.  'tv' itself must not be
 * NULL, since it is dereferenced unconditionally.
 *
 * Returns the result of event_add(). */
int
_bufferevent_add_event(struct event *ev, const struct timeval *tv)
{
	const int no_timeout = (tv->tv_sec == 0 && tv->tv_usec == 0);

	return event_add(ev, no_timeout ? NULL : tv);
}
862 
/* Lock 'bev' on behalf of a user program; a reference is taken as well.
 * Intended for user programs only: internal code should call
 * _bufferevent_incref_and_lock() or BEV_LOCK directly. */
void
bufferevent_lock(struct bufferevent *bev)
{
	_bufferevent_incref_and_lock(bev);
}
870 
/* Counterpart of bufferevent_lock(): drops the reference and releases the
 * lock by delegating to _bufferevent_decref_and_unlock(), whose return
 * value is discarded. */
void
bufferevent_unlock(struct bufferevent *bev)
{
	_bufferevent_decref_and_unlock(bev);
}
876