1 /*
2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 #include <sys/types.h>
30 #include <limits.h>
31 #include <string.h>
32 #include <stdlib.h>
33
34 #include "event2/event.h"
35 #include "event2/event_struct.h"
36 #include "event2/util.h"
37 #include "event2/bufferevent.h"
38 #include "event2/bufferevent_struct.h"
39 #include "event2/buffer.h"
40
41 #include "ratelim-internal.h"
42
43 #include "bufferevent-internal.h"
44 #include "mm-internal.h"
45 #include "util-internal.h"
46 #include "event-internal.h"
47
48 int
ev_token_bucket_init(struct ev_token_bucket * bucket,const struct ev_token_bucket_cfg * cfg,ev_uint32_t current_tick,int reinitialize)49 ev_token_bucket_init(struct ev_token_bucket *bucket,
50 const struct ev_token_bucket_cfg *cfg,
51 ev_uint32_t current_tick,
52 int reinitialize)
53 {
54 if (reinitialize) {
55 /* on reinitialization, we only clip downwards, since we've
56 already used who-knows-how-much bandwidth this tick. We
57 leave "last_updated" as it is; the next update will add the
58 appropriate amount of bandwidth to the bucket.
59 */
60 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
61 bucket->read_limit = cfg->read_maximum;
62 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
63 bucket->write_limit = cfg->write_maximum;
64 } else {
65 bucket->read_limit = cfg->read_rate;
66 bucket->write_limit = cfg->write_rate;
67 bucket->last_updated = current_tick;
68 }
69 return 0;
70 }
71
72 int
ev_token_bucket_update(struct ev_token_bucket * bucket,const struct ev_token_bucket_cfg * cfg,ev_uint32_t current_tick)73 ev_token_bucket_update(struct ev_token_bucket *bucket,
74 const struct ev_token_bucket_cfg *cfg,
75 ev_uint32_t current_tick)
76 {
77 /* It's okay if the tick number overflows, since we'll just
78 * wrap around when we do the unsigned substraction. */
79 unsigned n_ticks = current_tick - bucket->last_updated;
80
81 /* Make sure some ticks actually happened, and that time didn't
82 * roll back. */
83 if (n_ticks == 0 || n_ticks > INT_MAX)
84 return 0;
85
86 /* Naively, we would say
87 bucket->limit += n_ticks * cfg->rate;
88
89 if (bucket->limit > cfg->maximum)
90 bucket->limit = cfg->maximum;
91
92 But we're worried about overflow, so we do it like this:
93 */
94
95 if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
96 bucket->read_limit = cfg->read_maximum;
97 else
98 bucket->read_limit += n_ticks * cfg->read_rate;
99
100
101 if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
102 bucket->write_limit = cfg->write_maximum;
103 else
104 bucket->write_limit += n_ticks * cfg->write_rate;
105
106
107 bucket->last_updated = current_tick;
108
109 return 1;
110 }
111
112 static inline void
bufferevent_update_buckets(struct bufferevent_private * bev)113 bufferevent_update_buckets(struct bufferevent_private *bev)
114 {
115 /* Must hold lock on bev. */
116 struct timeval now;
117 unsigned tick;
118 event_base_gettimeofday_cached(bev->bev.ev_base, &now);
119 tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
120 if (tick != bev->rate_limiting->limit.last_updated)
121 ev_token_bucket_update(&bev->rate_limiting->limit,
122 bev->rate_limiting->cfg, tick);
123 }
124
125 ev_uint32_t
ev_token_bucket_get_tick(const struct timeval * tv,const struct ev_token_bucket_cfg * cfg)126 ev_token_bucket_get_tick(const struct timeval *tv,
127 const struct ev_token_bucket_cfg *cfg)
128 {
129 /* This computation uses two multiplies and a divide. We could do
130 * fewer if we knew that the tick length was an integer number of
131 * seconds, or if we knew it divided evenly into a second. We should
132 * investigate that more.
133 */
134
135 /* We cast to an ev_uint64_t first, since we don't want to overflow
136 * before we do the final divide. */
137 ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
138 return (unsigned)(msec / cfg->msec_per_tick);
139 }
140
141 struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate,size_t read_burst,size_t write_rate,size_t write_burst,const struct timeval * tick_len)142 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
143 size_t write_rate, size_t write_burst,
144 const struct timeval *tick_len)
145 {
146 struct ev_token_bucket_cfg *r;
147 struct timeval g;
148 if (! tick_len) {
149 g.tv_sec = 1;
150 g.tv_usec = 0;
151 tick_len = &g;
152 }
153 if (read_rate > read_burst || write_rate > write_burst ||
154 read_rate < 1 || write_rate < 1)
155 return NULL;
156 if (read_rate > EV_RATE_LIMIT_MAX ||
157 write_rate > EV_RATE_LIMIT_MAX ||
158 read_burst > EV_RATE_LIMIT_MAX ||
159 write_burst > EV_RATE_LIMIT_MAX)
160 return NULL;
161 r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
162 if (!r)
163 return NULL;
164 r->read_rate = read_rate;
165 r->write_rate = write_rate;
166 r->read_maximum = read_burst;
167 r->write_maximum = write_burst;
168 memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
169 r->msec_per_tick = (tick_len->tv_sec * 1000) +
170 (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
171 return r;
172 }
173
174 void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg * cfg)175 ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
176 {
177 mm_free(cfg);
178 }
179
180 /* No matter how big our bucket gets, don't try to read more than this
181 * much in a single read operation. */
182 #define MAX_TO_READ_EVER 16384
183 /* No matter how big our bucket gets, don't try to write more than this
184 * much in a single write operation. */
185 #define MAX_TO_WRITE_EVER 16384
186
187 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
188 #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
189
190 static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
191 static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
192 static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
193 static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
194
195 /** Helper: figure out the maximum amount we should write if is_write, or
196 the maximum amount we should read if is_read. Return that maximum, or
197 0 if our bucket is wholly exhausted.
198 */
199 static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private * bev,int is_write)200 _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
201 {
202 /* needs lock on bev. */
203 ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
204
205 #define LIM(x) \
206 (is_write ? (x).write_limit : (x).read_limit)
207
208 #define GROUP_SUSPENDED(g) \
209 (is_write ? (g)->write_suspended : (g)->read_suspended)
210
211 /* Sets max_so_far to MIN(x, max_so_far) */
212 #define CLAMPTO(x) \
213 do { \
214 if (max_so_far > (x)) \
215 max_so_far = (x); \
216 } while (0);
217
218 if (!bev->rate_limiting)
219 return max_so_far;
220
221 /* If rate-limiting is enabled at all, update the appropriate
222 bucket, and take the smaller of our rate limit and the group
223 rate limit.
224 */
225
226 if (bev->rate_limiting->cfg) {
227 bufferevent_update_buckets(bev);
228 max_so_far = LIM(bev->rate_limiting->limit);
229 }
230 if (bev->rate_limiting->group) {
231 struct bufferevent_rate_limit_group *g =
232 bev->rate_limiting->group;
233 ev_ssize_t share;
234 LOCK_GROUP(g);
235 if (GROUP_SUSPENDED(g)) {
236 /* We can get here if we failed to lock this
237 * particular bufferevent while suspending the whole
238 * group. */
239 if (is_write)
240 bufferevent_suspend_write(&bev->bev,
241 BEV_SUSPEND_BW_GROUP);
242 else
243 bufferevent_suspend_read(&bev->bev,
244 BEV_SUSPEND_BW_GROUP);
245 share = 0;
246 } else {
247 /* XXXX probably we should divide among the active
248 * members, not the total members. */
249 share = LIM(g->rate_limit) / g->n_members;
250 if (share < g->min_share)
251 share = g->min_share;
252 }
253 UNLOCK_GROUP(g);
254 CLAMPTO(share);
255 }
256
257 if (max_so_far < 0)
258 max_so_far = 0;
259 return max_so_far;
260 }
261
262 ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private * bev)263 _bufferevent_get_read_max(struct bufferevent_private *bev)
264 {
265 return _bufferevent_get_rlim_max(bev, 0);
266 }
267
268 ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private * bev)269 _bufferevent_get_write_max(struct bufferevent_private *bev)
270 {
271 return _bufferevent_get_rlim_max(bev, 1);
272 }
273
274 int
_bufferevent_decrement_read_buckets(struct bufferevent_private * bev,ev_ssize_t bytes)275 _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
276 {
277 /* XXXXX Make sure all users of this function check its return value */
278 int r = 0;
279 /* need to hold lock on bev */
280 if (!bev->rate_limiting)
281 return 0;
282
283 if (bev->rate_limiting->cfg) {
284 bev->rate_limiting->limit.read_limit -= bytes;
285 if (bev->rate_limiting->limit.read_limit <= 0) {
286 bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
287 if (event_add(&bev->rate_limiting->refill_bucket_event,
288 &bev->rate_limiting->cfg->tick_timeout) < 0)
289 r = -1;
290 } else if (bev->read_suspended & BEV_SUSPEND_BW) {
291 if (!(bev->write_suspended & BEV_SUSPEND_BW))
292 event_del(&bev->rate_limiting->refill_bucket_event);
293 bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
294 }
295 }
296
297 if (bev->rate_limiting->group) {
298 LOCK_GROUP(bev->rate_limiting->group);
299 bev->rate_limiting->group->rate_limit.read_limit -= bytes;
300 bev->rate_limiting->group->total_read += bytes;
301 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
302 _bev_group_suspend_reading(bev->rate_limiting->group);
303 } else if (bev->rate_limiting->group->read_suspended) {
304 _bev_group_unsuspend_reading(bev->rate_limiting->group);
305 }
306 UNLOCK_GROUP(bev->rate_limiting->group);
307 }
308
309 return r;
310 }
311
312 int
_bufferevent_decrement_write_buckets(struct bufferevent_private * bev,ev_ssize_t bytes)313 _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
314 {
315 /* XXXXX Make sure all users of this function check its return value */
316 int r = 0;
317 /* need to hold lock */
318 if (!bev->rate_limiting)
319 return 0;
320
321 if (bev->rate_limiting->cfg) {
322 bev->rate_limiting->limit.write_limit -= bytes;
323 if (bev->rate_limiting->limit.write_limit <= 0) {
324 bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
325 if (event_add(&bev->rate_limiting->refill_bucket_event,
326 &bev->rate_limiting->cfg->tick_timeout) < 0)
327 r = -1;
328 } else if (bev->write_suspended & BEV_SUSPEND_BW) {
329 if (!(bev->read_suspended & BEV_SUSPEND_BW))
330 event_del(&bev->rate_limiting->refill_bucket_event);
331 bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
332 }
333 }
334
335 if (bev->rate_limiting->group) {
336 LOCK_GROUP(bev->rate_limiting->group);
337 bev->rate_limiting->group->rate_limit.write_limit -= bytes;
338 bev->rate_limiting->group->total_written += bytes;
339 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
340 _bev_group_suspend_writing(bev->rate_limiting->group);
341 } else if (bev->rate_limiting->group->write_suspended) {
342 _bev_group_unsuspend_writing(bev->rate_limiting->group);
343 }
344 UNLOCK_GROUP(bev->rate_limiting->group);
345 }
346
347 return r;
348 }
349
350 /** Stop reading on every bufferevent in <b>g</b> */
351 static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group * g)352 _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
353 {
354 /* Needs group lock */
355 struct bufferevent_private *bev;
356 g->read_suspended = 1;
357 g->pending_unsuspend_read = 0;
358
359 /* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
360 to prevent a deadlock. (Ordinarily, the group lock nests inside
361 the bufferevent locks. If we are unable to lock any individual
362 bufferevent, it will find out later when it looks at its limit
363 and sees that its group is suspended.
364 */
365 TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
366 if (EVLOCK_TRY_LOCK(bev->lock)) {
367 bufferevent_suspend_read(&bev->bev,
368 BEV_SUSPEND_BW_GROUP);
369 EVLOCK_UNLOCK(bev->lock, 0);
370 }
371 }
372 return 0;
373 }
374
375 /** Stop writing on every bufferevent in <b>g</b> */
376 static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group * g)377 _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
378 {
379 /* Needs group lock */
380 struct bufferevent_private *bev;
381 g->write_suspended = 1;
382 g->pending_unsuspend_write = 0;
383 TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
384 if (EVLOCK_TRY_LOCK(bev->lock)) {
385 bufferevent_suspend_write(&bev->bev,
386 BEV_SUSPEND_BW_GROUP);
387 EVLOCK_UNLOCK(bev->lock, 0);
388 }
389 }
390 return 0;
391 }
392
393 /** Timer callback invoked on a single bufferevent with one or more exhausted
394 buckets when they are ready to refill. */
395 static void
_bev_refill_callback(evutil_socket_t fd,short what,void * arg)396 _bev_refill_callback(evutil_socket_t fd, short what, void *arg)
397 {
398 unsigned tick;
399 struct timeval now;
400 struct bufferevent_private *bev = arg;
401 int again = 0;
402 BEV_LOCK(&bev->bev);
403 if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
404 BEV_UNLOCK(&bev->bev);
405 return;
406 }
407
408 /* First, update the bucket */
409 event_base_gettimeofday_cached(bev->bev.ev_base, &now);
410 tick = ev_token_bucket_get_tick(&now,
411 bev->rate_limiting->cfg);
412 ev_token_bucket_update(&bev->rate_limiting->limit,
413 bev->rate_limiting->cfg,
414 tick);
415
416 /* Now unsuspend any read/write operations as appropriate. */
417 if ((bev->read_suspended & BEV_SUSPEND_BW)) {
418 if (bev->rate_limiting->limit.read_limit > 0)
419 bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
420 else
421 again = 1;
422 }
423 if ((bev->write_suspended & BEV_SUSPEND_BW)) {
424 if (bev->rate_limiting->limit.write_limit > 0)
425 bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
426 else
427 again = 1;
428 }
429 if (again) {
430 /* One or more of the buckets may need another refill if they
431 started negative.
432
433 XXXX if we need to be quiet for more ticks, we should
434 maybe figure out what timeout we really want.
435 */
436 /* XXXX Handle event_add failure somehow */
437 event_add(&bev->rate_limiting->refill_bucket_event,
438 &bev->rate_limiting->cfg->tick_timeout);
439 }
440 BEV_UNLOCK(&bev->bev);
441 }
442
443 /** Helper: grab a random element from a bufferevent group. */
444 static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group * group)445 _bev_group_random_element(struct bufferevent_rate_limit_group *group)
446 {
447 int which;
448 struct bufferevent_private *bev;
449
450 /* requires group lock */
451
452 if (!group->n_members)
453 return NULL;
454
455 EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
456
457 which = _evutil_weakrand() % group->n_members;
458
459 bev = TAILQ_FIRST(&group->members);
460 while (which--)
461 bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
462
463 return bev;
464 }
465
466 /** Iterate over the elements of a rate-limiting group 'g' with a random
467 starting point, assigning each to the variable 'bev', and executing the
468 block 'block'.
469
470 We do this in a half-baked effort to get fairness among group members.
471 XXX Round-robin or some kind of priority queue would be even more fair.
472 */
473 #define FOREACH_RANDOM_ORDER(block) \
474 do { \
475 first = _bev_group_random_element(g); \
476 for (bev = first; bev != TAILQ_END(&g->members); \
477 bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
478 block ; \
479 } \
480 for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
481 bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
482 block ; \
483 } \
484 } while (0)
485
486 static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group * g)487 _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
488 {
489 int again = 0;
490 struct bufferevent_private *bev, *first;
491
492 g->read_suspended = 0;
493 FOREACH_RANDOM_ORDER({
494 if (EVLOCK_TRY_LOCK(bev->lock)) {
495 bufferevent_unsuspend_read(&bev->bev,
496 BEV_SUSPEND_BW_GROUP);
497 EVLOCK_UNLOCK(bev->lock, 0);
498 } else {
499 again = 1;
500 }
501 });
502 g->pending_unsuspend_read = again;
503 }
504
505 static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group * g)506 _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
507 {
508 int again = 0;
509 struct bufferevent_private *bev, *first;
510 g->write_suspended = 0;
511
512 FOREACH_RANDOM_ORDER({
513 if (EVLOCK_TRY_LOCK(bev->lock)) {
514 bufferevent_unsuspend_write(&bev->bev,
515 BEV_SUSPEND_BW_GROUP);
516 EVLOCK_UNLOCK(bev->lock, 0);
517 } else {
518 again = 1;
519 }
520 });
521 g->pending_unsuspend_write = again;
522 }
523
524 /** Callback invoked every tick to add more elements to the group bucket
525 and unsuspend group members as needed.
526 */
527 static void
_bev_group_refill_callback(evutil_socket_t fd,short what,void * arg)528 _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
529 {
530 struct bufferevent_rate_limit_group *g = arg;
531 unsigned tick;
532 struct timeval now;
533
534 event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
535
536 LOCK_GROUP(g);
537
538 tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
539 ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
540
541 if (g->pending_unsuspend_read ||
542 (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
543 _bev_group_unsuspend_reading(g);
544 }
545 if (g->pending_unsuspend_write ||
546 (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
547 _bev_group_unsuspend_writing(g);
548 }
549
550 /* XXXX Rather than waiting to the next tick to unsuspend stuff
551 * with pending_unsuspend_write/read, we should do it on the
552 * next iteration of the mainloop.
553 */
554
555 UNLOCK_GROUP(g);
556 }
557
558 int
bufferevent_set_rate_limit(struct bufferevent * bev,struct ev_token_bucket_cfg * cfg)559 bufferevent_set_rate_limit(struct bufferevent *bev,
560 struct ev_token_bucket_cfg *cfg)
561 {
562 struct bufferevent_private *bevp =
563 EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
564 int r = -1;
565 struct bufferevent_rate_limit *rlim;
566 struct timeval now;
567 ev_uint32_t tick;
568 int reinit = 0, suspended = 0;
569 /* XXX reference-count cfg */
570
571 BEV_LOCK(bev);
572
573 if (cfg == NULL) {
574 if (bevp->rate_limiting) {
575 rlim = bevp->rate_limiting;
576 rlim->cfg = NULL;
577 bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
578 bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
579 if (event_initialized(&rlim->refill_bucket_event))
580 event_del(&rlim->refill_bucket_event);
581 }
582 r = 0;
583 goto done;
584 }
585
586 event_base_gettimeofday_cached(bev->ev_base, &now);
587 tick = ev_token_bucket_get_tick(&now, cfg);
588
589 if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
590 /* no-op */
591 r = 0;
592 goto done;
593 }
594 if (bevp->rate_limiting == NULL) {
595 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
596 if (!rlim)
597 goto done;
598 bevp->rate_limiting = rlim;
599 } else {
600 rlim = bevp->rate_limiting;
601 }
602 reinit = rlim->cfg != NULL;
603
604 rlim->cfg = cfg;
605 ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);
606
607 if (reinit) {
608 EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
609 event_del(&rlim->refill_bucket_event);
610 }
611 evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
612 _bev_refill_callback, bevp);
613
614 if (rlim->limit.read_limit > 0) {
615 bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
616 } else {
617 bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
618 suspended=1;
619 }
620 if (rlim->limit.write_limit > 0) {
621 bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
622 } else {
623 bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
624 suspended = 1;
625 }
626
627 if (suspended)
628 event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
629
630 r = 0;
631
632 done:
633 BEV_UNLOCK(bev);
634 return r;
635 }
636
637 struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base * base,const struct ev_token_bucket_cfg * cfg)638 bufferevent_rate_limit_group_new(struct event_base *base,
639 const struct ev_token_bucket_cfg *cfg)
640 {
641 struct bufferevent_rate_limit_group *g;
642 struct timeval now;
643 ev_uint32_t tick;
644
645 event_base_gettimeofday_cached(base, &now);
646 tick = ev_token_bucket_get_tick(&now, cfg);
647
648 g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
649 if (!g)
650 return NULL;
651 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
652 TAILQ_INIT(&g->members);
653
654 ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
655
656 event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
657 _bev_group_refill_callback, g);
658 /*XXXX handle event_add failure */
659 event_add(&g->master_refill_event, &cfg->tick_timeout);
660
661 EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
662
663 bufferevent_rate_limit_group_set_min_share(g, 64);
664
665 return g;
666 }
667
668 int
bufferevent_rate_limit_group_set_cfg(struct bufferevent_rate_limit_group * g,const struct ev_token_bucket_cfg * cfg)669 bufferevent_rate_limit_group_set_cfg(
670 struct bufferevent_rate_limit_group *g,
671 const struct ev_token_bucket_cfg *cfg)
672 {
673 int same_tick;
674 if (!g || !cfg)
675 return -1;
676
677 LOCK_GROUP(g);
678 same_tick = evutil_timercmp(
679 &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
680 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
681
682 if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
683 g->rate_limit.read_limit = cfg->read_maximum;
684 if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
685 g->rate_limit.write_limit = cfg->write_maximum;
686
687 if (!same_tick) {
688 /* This can cause a hiccup in the schedule */
689 event_add(&g->master_refill_event, &cfg->tick_timeout);
690 }
691
692 /* The new limits might force us to adjust min_share differently. */
693 bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
694
695 UNLOCK_GROUP(g);
696 return 0;
697 }
698
699 int
bufferevent_rate_limit_group_set_min_share(struct bufferevent_rate_limit_group * g,size_t share)700 bufferevent_rate_limit_group_set_min_share(
701 struct bufferevent_rate_limit_group *g,
702 size_t share)
703 {
704 if (share > EV_SSIZE_MAX)
705 return -1;
706
707 g->configured_min_share = share;
708
709 /* Can't set share to less than the one-tick maximum. IOW, at steady
710 * state, at least one connection can go per tick. */
711 if (share > g->rate_limit_cfg.read_rate)
712 share = g->rate_limit_cfg.read_rate;
713 if (share > g->rate_limit_cfg.write_rate)
714 share = g->rate_limit_cfg.write_rate;
715
716 g->min_share = share;
717 return 0;
718 }
719
720 void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group * g)721 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
722 {
723 LOCK_GROUP(g);
724 EVUTIL_ASSERT(0 == g->n_members);
725 event_del(&g->master_refill_event);
726 UNLOCK_GROUP(g);
727 EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
728 mm_free(g);
729 }
730
731 int
bufferevent_add_to_rate_limit_group(struct bufferevent * bev,struct bufferevent_rate_limit_group * g)732 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
733 struct bufferevent_rate_limit_group *g)
734 {
735 int wsuspend, rsuspend;
736 struct bufferevent_private *bevp =
737 EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
738 BEV_LOCK(bev);
739
740 if (!bevp->rate_limiting) {
741 struct bufferevent_rate_limit *rlim;
742 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
743 if (!rlim) {
744 BEV_UNLOCK(bev);
745 return -1;
746 }
747 evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
748 _bev_refill_callback, bevp);
749 bevp->rate_limiting = rlim;
750 }
751
752 if (bevp->rate_limiting->group == g) {
753 BEV_UNLOCK(bev);
754 return 0;
755 }
756 if (bevp->rate_limiting->group)
757 bufferevent_remove_from_rate_limit_group(bev);
758
759 LOCK_GROUP(g);
760 bevp->rate_limiting->group = g;
761 ++g->n_members;
762 TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);
763
764 rsuspend = g->read_suspended;
765 wsuspend = g->write_suspended;
766
767 UNLOCK_GROUP(g);
768
769 if (rsuspend)
770 bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
771 if (wsuspend)
772 bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);
773
774 BEV_UNLOCK(bev);
775 return 0;
776 }
777
778 int
bufferevent_remove_from_rate_limit_group(struct bufferevent * bev)779 bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
780 {
781 return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
782 }
783
784 int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent * bev,int unsuspend)785 bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
786 int unsuspend)
787 {
788 struct bufferevent_private *bevp =
789 EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
790 BEV_LOCK(bev);
791 if (bevp->rate_limiting && bevp->rate_limiting->group) {
792 struct bufferevent_rate_limit_group *g =
793 bevp->rate_limiting->group;
794 LOCK_GROUP(g);
795 bevp->rate_limiting->group = NULL;
796 --g->n_members;
797 TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
798 UNLOCK_GROUP(g);
799 }
800 if (unsuspend) {
801 bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
802 bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
803 }
804 BEV_UNLOCK(bev);
805 return 0;
806 }
807
808 /* ===
809 * API functions to expose rate limits.
810 *
811 * Don't use these from inside Libevent; they're meant to be for use by
812 * the program.
813 * === */
814
815 /* Mostly you don't want to use this function from inside libevent;
816 * _bufferevent_get_read_max() is more likely what you want*/
817 ev_ssize_t
bufferevent_get_read_limit(struct bufferevent * bev)818 bufferevent_get_read_limit(struct bufferevent *bev)
819 {
820 ev_ssize_t r;
821 struct bufferevent_private *bevp;
822 BEV_LOCK(bev);
823 bevp = BEV_UPCAST(bev);
824 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
825 bufferevent_update_buckets(bevp);
826 r = bevp->rate_limiting->limit.read_limit;
827 } else {
828 r = EV_SSIZE_MAX;
829 }
830 BEV_UNLOCK(bev);
831 return r;
832 }
833
834 /* Mostly you don't want to use this function from inside libevent;
835 * _bufferevent_get_write_max() is more likely what you want*/
836 ev_ssize_t
bufferevent_get_write_limit(struct bufferevent * bev)837 bufferevent_get_write_limit(struct bufferevent *bev)
838 {
839 ev_ssize_t r;
840 struct bufferevent_private *bevp;
841 BEV_LOCK(bev);
842 bevp = BEV_UPCAST(bev);
843 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
844 bufferevent_update_buckets(bevp);
845 r = bevp->rate_limiting->limit.write_limit;
846 } else {
847 r = EV_SSIZE_MAX;
848 }
849 BEV_UNLOCK(bev);
850 return r;
851 }
852
853 ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent * bev)854 bufferevent_get_max_to_read(struct bufferevent *bev)
855 {
856 ev_ssize_t r;
857 BEV_LOCK(bev);
858 r = _bufferevent_get_read_max(BEV_UPCAST(bev));
859 BEV_UNLOCK(bev);
860 return r;
861 }
862
863 ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent * bev)864 bufferevent_get_max_to_write(struct bufferevent *bev)
865 {
866 ev_ssize_t r;
867 BEV_LOCK(bev);
868 r = _bufferevent_get_write_max(BEV_UPCAST(bev));
869 BEV_UNLOCK(bev);
870 return r;
871 }
872
873
874 /* Mostly you don't want to use this function from inside libevent;
875 * _bufferevent_get_read_max() is more likely what you want*/
876 ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(struct bufferevent_rate_limit_group * grp)877 bufferevent_rate_limit_group_get_read_limit(
878 struct bufferevent_rate_limit_group *grp)
879 {
880 ev_ssize_t r;
881 LOCK_GROUP(grp);
882 r = grp->rate_limit.read_limit;
883 UNLOCK_GROUP(grp);
884 return r;
885 }
886
887 /* Mostly you don't want to use this function from inside libevent;
888 * _bufferevent_get_write_max() is more likely what you want. */
889 ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(struct bufferevent_rate_limit_group * grp)890 bufferevent_rate_limit_group_get_write_limit(
891 struct bufferevent_rate_limit_group *grp)
892 {
893 ev_ssize_t r;
894 LOCK_GROUP(grp);
895 r = grp->rate_limit.write_limit;
896 UNLOCK_GROUP(grp);
897 return r;
898 }
899
900 int
bufferevent_decrement_read_limit(struct bufferevent * bev,ev_ssize_t decr)901 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
902 {
903 int r = 0;
904 ev_ssize_t old_limit, new_limit;
905 struct bufferevent_private *bevp;
906 BEV_LOCK(bev);
907 bevp = BEV_UPCAST(bev);
908 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
909 old_limit = bevp->rate_limiting->limit.read_limit;
910
911 new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
912 if (old_limit > 0 && new_limit <= 0) {
913 bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
914 if (event_add(&bevp->rate_limiting->refill_bucket_event,
915 &bevp->rate_limiting->cfg->tick_timeout) < 0)
916 r = -1;
917 } else if (old_limit <= 0 && new_limit > 0) {
918 if (!(bevp->write_suspended & BEV_SUSPEND_BW))
919 event_del(&bevp->rate_limiting->refill_bucket_event);
920 bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
921 }
922
923 BEV_UNLOCK(bev);
924 return r;
925 }
926
927 int
bufferevent_decrement_write_limit(struct bufferevent * bev,ev_ssize_t decr)928 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
929 {
930 /* XXXX this is mostly copy-and-paste from
931 * bufferevent_decrement_read_limit */
932 int r = 0;
933 ev_ssize_t old_limit, new_limit;
934 struct bufferevent_private *bevp;
935 BEV_LOCK(bev);
936 bevp = BEV_UPCAST(bev);
937 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
938 old_limit = bevp->rate_limiting->limit.write_limit;
939
940 new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
941 if (old_limit > 0 && new_limit <= 0) {
942 bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
943 if (event_add(&bevp->rate_limiting->refill_bucket_event,
944 &bevp->rate_limiting->cfg->tick_timeout) < 0)
945 r = -1;
946 } else if (old_limit <= 0 && new_limit > 0) {
947 if (!(bevp->read_suspended & BEV_SUSPEND_BW))
948 event_del(&bevp->rate_limiting->refill_bucket_event);
949 bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
950 }
951
952 BEV_UNLOCK(bev);
953 return r;
954 }
955
956 int
bufferevent_rate_limit_group_decrement_read(struct bufferevent_rate_limit_group * grp,ev_ssize_t decr)957 bufferevent_rate_limit_group_decrement_read(
958 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
959 {
960 int r = 0;
961 ev_ssize_t old_limit, new_limit;
962 LOCK_GROUP(grp);
963 old_limit = grp->rate_limit.read_limit;
964 new_limit = (grp->rate_limit.read_limit -= decr);
965
966 if (old_limit > 0 && new_limit <= 0) {
967 _bev_group_suspend_reading(grp);
968 } else if (old_limit <= 0 && new_limit > 0) {
969 _bev_group_unsuspend_reading(grp);
970 }
971
972 UNLOCK_GROUP(grp);
973 return r;
974 }
975
976 int
bufferevent_rate_limit_group_decrement_write(struct bufferevent_rate_limit_group * grp,ev_ssize_t decr)977 bufferevent_rate_limit_group_decrement_write(
978 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
979 {
980 int r = 0;
981 ev_ssize_t old_limit, new_limit;
982 LOCK_GROUP(grp);
983 old_limit = grp->rate_limit.write_limit;
984 new_limit = (grp->rate_limit.write_limit -= decr);
985
986 if (old_limit > 0 && new_limit <= 0) {
987 _bev_group_suspend_writing(grp);
988 } else if (old_limit <= 0 && new_limit > 0) {
989 _bev_group_unsuspend_writing(grp);
990 }
991
992 UNLOCK_GROUP(grp);
993 return r;
994 }
995
996 void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group * grp,ev_uint64_t * total_read_out,ev_uint64_t * total_written_out)997 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
998 ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
999 {
1000 EVUTIL_ASSERT(grp != NULL);
1001 if (total_read_out)
1002 *total_read_out = grp->total_read;
1003 if (total_written_out)
1004 *total_written_out = grp->total_written;
1005 }
1006
1007 void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group * grp)1008 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1009 {
1010 grp->total_read = grp->total_written = 0;
1011 }
1012