1 /* MIT License
2 *
3 * Copyright (c) 2024 Brad House
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice (including the next
13 * paragraph) shall be included in all copies or substantial portions of the
14 * Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 *
24 * SPDX-License-Identifier: MIT
25 */
26 #include "ares_private.h"
27 #include "ares_event.h"
28
29 #ifdef CARES_THREADS
ares_event_destroy_cb(void * arg)30 static void ares_event_destroy_cb(void *arg)
31 {
32 ares_event_t *event = arg;
33 if (event == NULL) {
34 return; /* LCOV_EXCL_LINE: DefensiveCoding */
35 }
36
37 /* Unregister from the event thread if it was registered with one */
38 if (event->e) {
39 const ares_event_thread_t *e = event->e;
40 e->ev_sys->event_del(event);
41 event->e = NULL;
42 }
43
44 if (event->free_data_cb && event->data) {
45 event->free_data_cb(event->data);
46 }
47
48 ares_free(event);
49 }
50
ares_event_signal(const ares_event_t * event)51 static void ares_event_signal(const ares_event_t *event)
52 {
53 if (event == NULL || event->signal_cb == NULL) {
54 return; /* LCOV_EXCL_LINE: DefensiveCoding */
55 }
56 event->signal_cb(event);
57 }
58
ares_event_thread_wake(const ares_event_thread_t * e)59 static void ares_event_thread_wake(const ares_event_thread_t *e)
60 {
61 if (e == NULL) {
62 return; /* LCOV_EXCL_LINE: DefensiveCoding */
63 }
64
65 ares_event_signal(e->ev_signal);
66 }
67
68 /* See if a pending update already exists. We don't want to enqueue multiple
69 * updates for the same event handle. Right now this is O(n) based on number
70 * of updates already enqueued. In the future, it might make sense to make
71 * this O(1) with a hashtable.
72 * NOTE: in some cases a delete then re-add of the same fd, but really pointing
73 * to a different destination can happen due to a quick close of a
74 * connection then creation of a new one. So we need to look at the
75 * flags and ignore any delete events when finding a match since we
76 * need to process the delete always, it can't be combined with other
77 * updates. */
ares_event_update_find(ares_event_thread_t * e,ares_socket_t fd,const void * data)78 static ares_event_t *ares_event_update_find(ares_event_thread_t *e,
79 ares_socket_t fd, const void *data)
80 {
81 ares_llist_node_t *node;
82
83 for (node = ares_llist_node_first(e->ev_updates); node != NULL;
84 node = ares_llist_node_next(node)) {
85 ares_event_t *ev = ares_llist_node_val(node);
86
87 if (fd != ARES_SOCKET_BAD && fd == ev->fd && ev->flags != 0) {
88 return ev;
89 }
90
91 if (fd == ARES_SOCKET_BAD && ev->fd == ARES_SOCKET_BAD &&
92 data == ev->data && ev->flags != 0) {
93 return ev;
94 }
95 }
96
97 return NULL;
98 }
99
ares_event_update(ares_event_t ** event,ares_event_thread_t * e,ares_event_flags_t flags,ares_event_cb_t cb,ares_socket_t fd,void * data,ares_event_free_data_t free_data_cb,ares_event_signal_cb_t signal_cb)100 ares_status_t ares_event_update(ares_event_t **event, ares_event_thread_t *e,
101 ares_event_flags_t flags, ares_event_cb_t cb,
102 ares_socket_t fd, void *data,
103 ares_event_free_data_t free_data_cb,
104 ares_event_signal_cb_t signal_cb)
105 {
106 ares_event_t *ev = NULL;
107 ares_status_t status;
108
109 if (e == NULL) {
110 return ARES_EFORMERR; /* LCOV_EXCL_LINE: DefensiveCoding */
111 }
112
113 /* Callback must be specified if not a removal event. */
114 if (flags != ARES_EVENT_FLAG_NONE && cb == NULL) {
115 return ARES_EFORMERR;
116 }
117
118 if (event != NULL) {
119 *event = NULL;
120 }
121
122 /* Validate flags */
123 if (fd == ARES_SOCKET_BAD) {
124 if (flags & (ARES_EVENT_FLAG_READ | ARES_EVENT_FLAG_WRITE)) {
125 return ARES_EFORMERR;
126 }
127 if (!(flags & ARES_EVENT_FLAG_OTHER)) {
128 return ARES_EFORMERR;
129 }
130 } else {
131 if (flags & ARES_EVENT_FLAG_OTHER) {
132 return ARES_EFORMERR;
133 }
134 }
135
136 /* That's all the validation we can really do */
137
138 ares_thread_mutex_lock(e->mutex);
139
140 /* See if we have a queued update already */
141 ev = ares_event_update_find(e, fd, data);
142 if (ev == NULL) {
143 /* Allocate a new one */
144 ev = ares_malloc_zero(sizeof(*ev));
145 if (ev == NULL) {
146 status = ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
147 goto done; /* LCOV_EXCL_LINE: OutOfMemory */
148 }
149
150 if (ares_llist_insert_last(e->ev_updates, ev) == NULL) {
151 ares_free(ev); /* LCOV_EXCL_LINE: OutOfMemory */
152 status = ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
153 goto done; /* LCOV_EXCL_LINE: OutOfMemory */
154 }
155 }
156
157 ev->flags = flags;
158 ev->fd = fd;
159 if (ev->cb == NULL) {
160 ev->cb = cb;
161 }
162 if (ev->data == NULL) {
163 ev->data = data;
164 }
165 if (ev->free_data_cb == NULL) {
166 ev->free_data_cb = free_data_cb;
167 }
168 if (ev->signal_cb == NULL) {
169 ev->signal_cb = signal_cb;
170 }
171
172 if (event != NULL) {
173 *event = ev;
174 }
175
176 status = ARES_SUCCESS;
177
178 done:
179 if (status == ARES_SUCCESS) {
180 /* Wake event thread if successful so it can pull the updates */
181 ares_event_thread_wake(e);
182 }
183
184 ares_thread_mutex_unlock(e->mutex);
185
186 return status;
187 }
188
ares_event_thread_process_fd(ares_event_thread_t * e,ares_socket_t fd,void * data,ares_event_flags_t flags)189 static void ares_event_thread_process_fd(ares_event_thread_t *e,
190 ares_socket_t fd, void *data,
191 ares_event_flags_t flags)
192 {
193 ares_fd_events_t event;
194 (void)data;
195
196 event.fd = fd;
197 event.events = 0;
198 if (flags & ARES_EVENT_FLAG_READ) {
199 event.events |= ARES_FD_EVENT_READ;
200 }
201 if (flags & ARES_EVENT_FLAG_WRITE) {
202 event.events |= ARES_FD_EVENT_WRITE;
203 }
204 ares_process_fds(e->channel, &event, 1, ARES_PROCESS_FLAG_SKIP_NON_FD);
205 }
206
ares_event_thread_sockstate_cb(void * data,ares_socket_t socket_fd,int readable,int writable)207 static void ares_event_thread_sockstate_cb(void *data, ares_socket_t socket_fd,
208 int readable, int writable)
209 {
210 ares_event_thread_t *e = data;
211 ares_event_flags_t flags = ARES_EVENT_FLAG_NONE;
212
213 if (readable) {
214 flags |= ARES_EVENT_FLAG_READ;
215 }
216
217 if (writable) {
218 flags |= ARES_EVENT_FLAG_WRITE;
219 }
220
221 /* Update channel fd. This function will lock e->mutex and also wake the
222 * event thread to process the update */
223 ares_event_update(NULL, e, flags, ares_event_thread_process_fd, socket_fd,
224 NULL, NULL, NULL);
225 }
226
notifywrite_cb(void * data)227 static void notifywrite_cb(void *data)
228 {
229 ares_event_thread_t *e = data;
230
231 ares_thread_mutex_lock(e->mutex);
232 e->process_pending_write = ARES_TRUE;
233 ares_thread_mutex_unlock(e->mutex);
234
235 ares_event_thread_wake(e);
236 }
237
ares_event_process_updates(ares_event_thread_t * e)238 static void ares_event_process_updates(ares_event_thread_t *e)
239 {
240 ares_llist_node_t *node;
241
242 /* Iterate across all updates and apply to internal list, removing from update
243 * list */
244 while ((node = ares_llist_node_first(e->ev_updates)) != NULL) {
245 ares_event_t *newev = ares_llist_node_claim(node);
246 ares_event_t *oldev;
247
248 if (newev->fd == ARES_SOCKET_BAD) {
249 oldev = ares_htable_vpvp_get_direct(e->ev_cust_handles, newev->data);
250 } else {
251 oldev = ares_htable_asvp_get_direct(e->ev_sock_handles, newev->fd);
252 }
253
254 /* Adding new */
255 if (oldev == NULL) {
256 newev->e = e;
257 /* Don't try to add a new event if all flags are cleared, that's basically
258 * someone trying to delete something already deleted. Also if it fails
259 * to add, cleanup. */
260 if (newev->flags == ARES_EVENT_FLAG_NONE ||
261 !e->ev_sys->event_add(newev)) {
262 newev->e = NULL;
263 ares_event_destroy_cb(newev);
264 } else {
265 if (newev->fd == ARES_SOCKET_BAD) {
266 ares_htable_vpvp_insert(e->ev_cust_handles, newev->data, newev);
267 } else {
268 ares_htable_asvp_insert(e->ev_sock_handles, newev->fd, newev);
269 }
270 }
271 continue;
272 }
273
274 /* Removal request */
275 if (newev->flags == ARES_EVENT_FLAG_NONE) {
276 /* the callback for the removal will call e->ev_sys->event_del(e, event)
277 */
278 if (newev->fd == ARES_SOCKET_BAD) {
279 ares_htable_vpvp_remove(e->ev_cust_handles, newev->data);
280 } else {
281 ares_htable_asvp_remove(e->ev_sock_handles, newev->fd);
282 }
283 ares_free(newev);
284 continue;
285 }
286
287 /* Modify request -- only flags can be changed */
288 e->ev_sys->event_mod(oldev, newev->flags);
289 oldev->flags = newev->flags;
290 ares_free(newev);
291 }
292 }
293
ares_event_thread_cleanup(ares_event_thread_t * e)294 static void ares_event_thread_cleanup(ares_event_thread_t *e)
295 {
296 /* Manually free any updates that weren't processed */
297 if (e->ev_updates != NULL) {
298 ares_llist_node_t *node;
299
300 while ((node = ares_llist_node_first(e->ev_updates)) != NULL) {
301 ares_event_destroy_cb(ares_llist_node_claim(node));
302 }
303 ares_llist_destroy(e->ev_updates);
304 e->ev_updates = NULL;
305 }
306
307 if (e->ev_sock_handles != NULL) {
308 ares_htable_asvp_destroy(e->ev_sock_handles);
309 e->ev_sock_handles = NULL;
310 }
311
312 if (e->ev_cust_handles != NULL) {
313 ares_htable_vpvp_destroy(e->ev_cust_handles);
314 e->ev_cust_handles = NULL;
315 }
316
317 if (e->ev_sys != NULL && e->ev_sys->destroy != NULL) {
318 e->ev_sys->destroy(e);
319 e->ev_sys = NULL;
320 }
321 }
322
ares_event_thread(void * arg)323 static void *ares_event_thread(void *arg)
324 {
325 ares_event_thread_t *e = arg;
326 ares_thread_mutex_lock(e->mutex);
327
328 while (e->isup) {
329 struct timeval tv;
330 const struct timeval *tvout;
331 unsigned long timeout_ms = 0; /* 0 = unlimited */
332 ares_bool_t process_pending_write;
333
334 ares_event_process_updates(e);
335
336 /* Don't hold a mutex while waiting on events or calling into anything
337 * that might require a c-ares channel lock since a callback could be
338 * triggered cross-thread */
339 ares_thread_mutex_unlock(e->mutex);
340
341 tvout = ares_timeout(e->channel, NULL, &tv);
342 if (tvout != NULL) {
343 timeout_ms =
344 (unsigned long)((tvout->tv_sec * 1000) + (tvout->tv_usec / 1000) + 1);
345 }
346
347 e->ev_sys->wait(e, timeout_ms);
348
349 /* Process pending write operation */
350 ares_thread_mutex_lock(e->mutex);
351 process_pending_write = e->process_pending_write;
352 e->process_pending_write = ARES_FALSE;
353 ares_thread_mutex_unlock(e->mutex);
354 if (process_pending_write) {
355 ares_process_pending_write(e->channel);
356 }
357
358 /* Relock before we loop again */
359 ares_thread_mutex_lock(e->mutex);
360
361 /* Each iteration should do timeout processing and any other cleanup
362 * that may not have been performed */
363 if (e->isup) {
364 ares_thread_mutex_unlock(e->mutex);
365 ares_process_fds(e->channel, NULL, 0, ARES_PROCESS_FLAG_NONE);
366 ares_thread_mutex_lock(e->mutex);
367 }
368 }
369
370 /* Lets cleanup while we're in the thread itself */
371 ares_event_thread_cleanup(e);
372
373 ares_thread_mutex_unlock(e->mutex);
374
375 return NULL;
376 }
377
ares_event_thread_destroy_int(ares_event_thread_t * e)378 static void ares_event_thread_destroy_int(ares_event_thread_t *e)
379 {
380 /* Wake thread and tell it to shutdown if it exists */
381 ares_thread_mutex_lock(e->mutex);
382 if (e->isup) {
383 e->isup = ARES_FALSE;
384 ares_event_thread_wake(e);
385 }
386 ares_thread_mutex_unlock(e->mutex);
387
388 /* Wait for thread to shutdown */
389 if (e->thread) {
390 void *rv = NULL;
391 ares_thread_join(e->thread, &rv);
392 e->thread = NULL;
393 }
394
395 /* If the event thread ever got to the point of starting, this is a no-op
396 * as it runs this same cleanup when it shuts down */
397 ares_event_thread_cleanup(e);
398
399 ares_thread_mutex_destroy(e->mutex);
400 e->mutex = NULL;
401
402 ares_free(e);
403 }
404
ares_event_thread_destroy(ares_channel_t * channel)405 void ares_event_thread_destroy(ares_channel_t *channel)
406 {
407 ares_event_thread_t *e = channel->sock_state_cb_data;
408
409 if (e == NULL) {
410 return; /* LCOV_EXCL_LINE: DefensiveCoding */
411 }
412
413 ares_event_thread_destroy_int(e);
414 channel->sock_state_cb_data = NULL;
415 channel->sock_state_cb = NULL;
416 channel->notify_pending_write_cb = NULL;
417 channel->notify_pending_write_cb_data = NULL;
418 }
419
ares_event_fetch_sys(ares_evsys_t evsys)420 static const ares_event_sys_t *ares_event_fetch_sys(ares_evsys_t evsys)
421 {
422 switch (evsys) {
423 case ARES_EVSYS_WIN32:
424 #if defined(USE_WINSOCK)
425 return &ares_evsys_win32;
426 #else
427 return NULL;
428 #endif
429
430 case ARES_EVSYS_EPOLL:
431 #if defined(HAVE_EPOLL)
432 return &ares_evsys_epoll;
433 #else
434 return NULL;
435 #endif
436
437 case ARES_EVSYS_KQUEUE:
438 #if defined(HAVE_KQUEUE)
439 return &ares_evsys_kqueue;
440 #else
441 return NULL;
442 #endif
443
444 case ARES_EVSYS_POLL:
445 #if defined(HAVE_POLL)
446 return &ares_evsys_poll;
447 #else
448 return NULL;
449 #endif
450
451 case ARES_EVSYS_SELECT:
452 #if defined(HAVE_PIPE)
453 return &ares_evsys_select;
454 #else
455 return NULL;
456 #endif
457
458 /* case ARES_EVSYS_DEFAULT: */
459 default:
460 break;
461 }
462
463 /* default */
464 #if defined(USE_WINSOCK)
465 return &ares_evsys_win32;
466 #elif defined(HAVE_KQUEUE)
467 return &ares_evsys_kqueue;
468 #elif defined(HAVE_EPOLL)
469 return &ares_evsys_epoll;
470 #elif defined(HAVE_POLL)
471 return &ares_evsys_poll;
472 #elif defined(HAVE_PIPE)
473 return &ares_evsys_select;
474 #else
475 return NULL;
476 #endif
477 }
478
ares_event_thread_init(ares_channel_t * channel)479 ares_status_t ares_event_thread_init(ares_channel_t *channel)
480 {
481 ares_event_thread_t *e;
482
483 e = ares_malloc_zero(sizeof(*e));
484 if (e == NULL) {
485 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
486 }
487
488 e->mutex = ares_thread_mutex_create();
489 if (e->mutex == NULL) {
490 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */
491 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
492 }
493
494 e->ev_updates = ares_llist_create(NULL);
495 if (e->ev_updates == NULL) {
496 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */
497 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
498 }
499
500 e->ev_sock_handles = ares_htable_asvp_create(ares_event_destroy_cb);
501 if (e->ev_sock_handles == NULL) {
502 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */
503 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
504 }
505
506 e->ev_cust_handles = ares_htable_vpvp_create(NULL, ares_event_destroy_cb);
507 if (e->ev_cust_handles == NULL) {
508 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */
509 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
510 }
511
512 e->channel = channel;
513 e->isup = ARES_TRUE;
514 e->ev_sys = ares_event_fetch_sys(channel->evsys);
515 if (e->ev_sys == NULL) {
516 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: UntestablePath */
517 return ARES_ENOTIMP; /* LCOV_EXCL_LINE: UntestablePath */
518 }
519
520 channel->sock_state_cb = ares_event_thread_sockstate_cb;
521 channel->sock_state_cb_data = e;
522 channel->notify_pending_write_cb = notifywrite_cb;
523 channel->notify_pending_write_cb_data = e;
524
525 if (!e->ev_sys->init(e)) {
526 /* LCOV_EXCL_START: UntestablePath */
527 ares_event_thread_destroy_int(e);
528 channel->sock_state_cb = NULL;
529 channel->sock_state_cb_data = NULL;
530 return ARES_ESERVFAIL;
531 /* LCOV_EXCL_STOP */
532 }
533
534 /* Before starting the thread, process any possible events the initialization
535 * might have enqueued as we may actually depend on these being valid
536 * immediately upon return, which may mean before the thread is fully spawned
537 * and processed the list itself. We don't want any sort of race conditions
538 * (like the event system wake handle itself). */
539 ares_event_process_updates(e);
540
541 /* Start thread */
542 if (ares_thread_create(&e->thread, ares_event_thread, e) != ARES_SUCCESS) {
543 /* LCOV_EXCL_START: UntestablePath */
544 ares_event_thread_destroy_int(e);
545 channel->sock_state_cb = NULL;
546 channel->sock_state_cb_data = NULL;
547 return ARES_ESERVFAIL;
548 /* LCOV_EXCL_STOP */
549 }
550
551 return ARES_SUCCESS;
552 }
553
554 #else
555
ares_event_thread_init(ares_channel_t * channel)556 ares_status_t ares_event_thread_init(ares_channel_t *channel)
557 {
558 (void)channel;
559 return ARES_ENOTIMP;
560 }
561
ares_event_thread_destroy(ares_channel_t * channel)562 void ares_event_thread_destroy(ares_channel_t *channel)
563 {
564 (void)channel;
565 }
566
567 #endif
568