/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#if !defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif

#include <pthread.h>

#include "private-lib-core.h"

#include <string.h>
#include <stdio.h>

struct lws_threadpool;

struct lws_threadpool_task {
	struct lws_threadpool_task *task_queue_next;

	struct lws_threadpool *tp;
	char name[32];
	struct lws_threadpool_task_args args;

	lws_usec_t created;
	lws_usec_t acquired;
	lws_usec_t done;
	lws_usec_t entered_state;

	lws_usec_t acc_running;
	lws_usec_t acc_syncing;

	pthread_cond_t wake_idle;

	enum lws_threadpool_task_status status;

	int late_sync_retries;

	char wanted_writeable_cb;
	char outlive;
};

struct lws_pool {
	struct lws_threadpool *tp;
	pthread_t thread;
	pthread_mutex_t lock; /* part of task wake_idle */
	struct lws_threadpool_task *task;
	lws_usec_t acquired;
	int worker_index;
};

struct lws_threadpool {
	pthread_mutex_t lock; /* protects all pool lists */
	pthread_cond_t wake_idle;
	struct lws_pool *pool_list;

	struct lws_context *context;
	struct lws_threadpool *tp_list; /* context list of threadpools */

	struct lws_threadpool_task *task_queue_head;
	struct lws_threadpool_task *task_done_head;

	char name[32];

	int threads_in_pool;
	int queue_depth;
	int done_queue_depth;
	int max_queue_depth;
	int running_tasks;

	unsigned int destroying:1;
};
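
/*
 * Locking summary (describing the code below, not new rules): tp->lock
 * guards the pending / done task queues and their depth counters; each
 * lws_pool's lock pairs with its running task's wake_idle condition for
 * the SYNC handshake.  Where both are needed, as in
 * lws_threadpool_dequeue(), tp->lock is taken first.
 */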

static int
ms_delta(lws_usec_t now, lws_usec_t then)
{
	return (int)((now - then) / 1000);
}

static void
us_accrue(lws_usec_t *acc, lws_usec_t then)
{
	lws_usec_t now = lws_now_usecs();

	*acc += now - then;
}

static int
pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
{
	lws_usec_t delta = (now - then) + 1;

	return (int)((us * 100) / delta);
}

static void
__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
{
	lws_usec_t now = lws_now_usecs();
	char *end = buf + len - 1;
	int syncms = 0, runms = 0;

	if (!task->acquired) {
		buf += lws_snprintf(buf, end - buf,
				    "task: %s, QUEUED queued: %dms",
				    task->name, ms_delta(now, task->created));

		return;
	}

	if (task->acc_running)
		runms = task->acc_running;

	if (task->acc_syncing)
		syncms = task->acc_syncing;

	if (!task->done) {
		buf += lws_snprintf(buf, end - buf,
			"task: %s, ONGOING state %d (%dms) alive: %dms "
			"(queued %dms, acquired: %dms, "
			"run: %d%%, sync: %d%%)", task->name, task->status,
			ms_delta(now, task->entered_state),
			ms_delta(now, task->created),
			ms_delta(task->acquired, task->created),
			ms_delta(now, task->acquired),
			pc_delta(now, task->acquired, runms),
			pc_delta(now, task->acquired, syncms));

		return;
	}

	lws_snprintf(buf, end - buf,
		"task: %s, DONE state %d lived: %dms "
		"(queued %dms, on thread: %dms, "
		"ran: %d%%, synced: %d%%)", task->name, task->status,
		ms_delta(task->done, task->created),
		ms_delta(task->acquired, task->created),
		ms_delta(task->done, task->acquired),
		pc_delta(task->done, task->acquired, runms),
		pc_delta(task->done, task->acquired, syncms));
}

void
lws_threadpool_dump(struct lws_threadpool *tp)
{
#if defined(_DEBUG)
	struct lws_threadpool_task **c;
	char buf[160];
	int n, count;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
		    tp->name, tp->queue_depth, tp->running_tasks,
		    tp->done_queue_depth);

	count = 0;
	c = &tp->task_queue_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->queue_depth)
		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
			 __func__, tp->queue_depth, count);

	count = 0;
	for (n = 0; n < tp->threads_in_pool; n++) {
		struct lws_pool *pool = &tp->pool_list[n];
		struct lws_threadpool_task *task = pool->task;

		if (task) {
			__lws_threadpool_task_dump(task, buf, sizeof(buf));
			lwsl_thread(" - worker %d: %s\n", n, buf);
			count++;
		}
	}

	if (count != tp->running_tasks)
		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
			 __func__, tp->running_tasks, count);

	count = 0;
	c = &tp->task_done_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->done_queue_depth)
		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
			 __func__, tp->done_queue_depth, count);

	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
#endif
}

static void
state_transition(struct lws_threadpool_task *task,
		 enum lws_threadpool_task_status status)
{
	task->entered_state = lws_now_usecs();
	task->status = status;
}

static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
{
	if (task->args.cleanup)
		task->args.cleanup(task->args.wsi, task->args.user);

	if (task->args.wsi)
		task->args.wsi->tp_task = NULL;

	lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n",
		    __func__, task->tp, task->args.wsi);

	lws_free(task);
}

static void
__lws_threadpool_reap(struct lws_threadpool_task *task)
{
	struct lws_threadpool_task **c, *t = NULL;
	struct lws_threadpool *tp = task->tp;

	/* remove the task from the done queue */

	c = &tp->task_done_head;

	while (*c) {
		if ((*c) == task) {
			t = *c;
			*c = t->task_queue_next;
			t->task_queue_next = NULL;
			tp->done_queue_depth--;

			lwsl_thread("%s: tp %s: reaped task wsi %p\n", __func__,
				    tp->name, task->args.wsi);

			break;
		}
		c = &(*c)->task_queue_next;
	}

	if (!t)
		lwsl_err("%s: task %p not in done queue\n", __func__, task);

	/* call the task's cleanup and delete the task itself */

	lws_threadpool_task_cleanup_destroy(task);
}

/*
 * this gets called from each tsi service context after the service was
 * cancelled... we need to ask for the writable callback from the matching
 * tsi context for any wsis bound to a worker thread that need it
 */

int
lws_threadpool_tsi_context(struct lws_context *context, int tsi)
{
	struct lws_threadpool_task **c, *task = NULL;
	struct lws_threadpool *tp;
	struct lws *wsi;

	lws_context_lock(context, __func__);

	tp = context->tp_list_head;
	while (tp) {
		int n;

		/* for the running (syncing...) tasks... */

		for (n = 0; n < tp->threads_in_pool; n++) {
			struct lws_pool *pool = &tp->pool_list[n];

			task = pool->task;
			if (!task)
				continue;

			wsi = task->args.wsi;
			if (!wsi || wsi->tsi != tsi ||
			    !task->wanted_writeable_cb)
				continue;

			task->wanted_writeable_cb = 0;
			lws_memory_barrier();

			/*
			 * finally... we can ask for the callback on
			 * writable from the correct service thread
			 * context
			 */

			lws_callback_on_writable(wsi);
		}

		/* for the done tasks... */

		c = &tp->task_done_head;

		while (*c) {
			task = *c;
			wsi = task->args.wsi;

			if (wsi && wsi->tsi == tsi &&
			    task->wanted_writeable_cb) {

				task->wanted_writeable_cb = 0;
				lws_memory_barrier();

				/*
				 * finally... we can ask for the callback on
				 * writable from the correct service thread
				 * context
				 */

				lws_callback_on_writable(wsi);
			}

			c = &task->task_queue_next;
		}

		tp = tp->tp_list;
	}

	lws_context_unlock(context);

	return 0;
}

static int
lws_threadpool_worker_sync(struct lws_pool *pool,
			   struct lws_threadpool_task *task)
{
	enum lws_threadpool_task_status temp;
	struct timespec abstime;
	struct lws *wsi;
	int tries = 15;

	/* block until writable acknowledges */
	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
	pthread_mutex_lock(&pool->lock); /* ======================= pool lock */

	lwsl_info("%s: %s: task %p (%s): syncing with wsi %p\n", __func__,
		  pool->tp->name, task, task->name, task->args.wsi);

	temp = task->status;
	state_transition(task, LWS_TP_STATUS_SYNCING);
	while (tries--) {
		wsi = task->args.wsi;

		/*
		 * if the wsi is no longer attached to this task, there is
		 * nothing we can sync to usefully.  Since the work wants to
		 * sync, it means we should react to the situation by telling
		 * the task it can't continue usefully by stopping it.
		 */

		if (!wsi) {
			lwsl_thread("%s: %s: task %p (%s): No longer bound "
				    "to any wsi to sync to\n", __func__,
				    pool->tp->name, task, task->name);

			state_transition(task, LWS_TP_STATUS_STOPPING);
			goto done;
		}

		/*
		 * So, `tries` times this interval is the maximum time we
		 * are willing to sit still between SYNC asking for a
		 * callback on writable and actually getting it.
		 *
		 * If that is exceeded, we will stop the task.
		 */
		abstime.tv_sec = time(NULL) + 2;
		abstime.tv_nsec = 0;

		task->wanted_writeable_cb = 1;
		lws_memory_barrier();

		/*
		 * This will cause lws_threadpool_tsi_context() to get called
		 * from each tsi service context, where we can safely ask for
		 * a callback on writeable on the wsi we are associated with.
		 */
		lws_cancel_service(lws_get_context(wsi));

		/*
		 * so the danger here is that we asked for a writable callback
		 * on the wsi, but for whatever reason, we are never going to
		 * get one.  To avoid deadlocking forever, we allow a set time
		 * for the sync to happen naturally, otherwise the cond wait
		 * times out and we stop the task.
		 */

		if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
					   &abstime) == ETIMEDOUT) {
			task->late_sync_retries++;
			if (!tries) {
				lwsl_err("%s: %s: task %p (%s): SYNC timed out "
					 "(associated wsi %p)\n",
					 __func__, pool->tp->name, task,
					 task->name, task->args.wsi);

				state_transition(task, LWS_TP_STATUS_STOPPING);
				goto done;
			}

			continue;
		} else
			break;
	}

	if (task->status == LWS_TP_STATUS_SYNCING)
		state_transition(task, temp);

	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);

done:
	pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */

	return 0;
}
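
/*
 * The timedwait above is normally satisfied by the service thread side
 * calling lws_threadpool_task_sync() from its WRITEABLE handler, which
 * signals the task's wake_idle condition (a usage sketch follows
 * lws_threadpool_task_status_wsi() below).
 */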

static void *
lws_threadpool_worker(void *d)
{
	struct lws_threadpool_task **c, **c2, *task;
	struct lws_pool *pool = d;
	struct lws_threadpool *tp = pool->tp;
	char buf[160];

	while (!tp->destroying) {

		/* we have no running task... wait and get one from the queue */

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		/*
		 * if there's no task already waiting in the queue, wait for
		 * the wake_idle condition to signal us that that might have
		 * changed
		 */
		while (!tp->task_queue_head && !tp->destroying)
			pthread_cond_wait(&tp->wake_idle, &tp->lock);

		if (tp->destroying) {
			pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */
			continue;
		}

		c = &tp->task_queue_head;
		c2 = NULL;
		task = NULL;
		pool->task = NULL;

		/* look at the queue tail */
		while (*c) {
			c2 = c;
			c = &(*c)->task_queue_next;
		}

		/* is there a task at the queue tail? */
		if (c2 && *c2) {
			pool->task = task = *c2;
			task->acquired = pool->acquired = lws_now_usecs();
			/* remove it from the queue */
			*c2 = task->task_queue_next;
			task->task_queue_next = NULL;
			tp->queue_depth--;
			/* mark it as running */
			state_transition(task, LWS_TP_STATUS_RUNNING);
		}

		/* someone else got it first... wait and try again */
		if (!task) {
			pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */
			continue;
		}

		task->wanted_writeable_cb = 0;

		/* we have acquired a new task */

		__lws_threadpool_task_dump(task, buf, sizeof(buf));

		lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
			    __func__, tp->name, pool->worker_index, buf);
		tp->running_tasks++;

		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */

		/*
		 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
		 * "resurface" periodically, and get called again with
		 * cont = 1 immediately to indicate it is picking up where it
		 * left off if the task is not being "stopped".
		 *
		 * This allows long tasks to respond to requests to stop in
		 * a clean and opaque way.
		 *
		 * 2) The task can return with LWS_TP_RETURN_SYNC to register
		 * a "callback on writable" request on the service thread and
		 * block until it hears back from the WRITABLE handler.
		 *
		 * This allows the work on the thread to be synchronized to the
		 * previous work being dispatched cleanly.
		 *
		 * 3) The task can return with LWS_TP_RETURN_FINISHED to
		 * indicate its work is completed nicely.
		 *
		 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
		 * it stopped and cleaned up after incomplete work.
		 */
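
		/*
		 * A minimal sketch of a conforming task callback; it is
		 * invoked below as args.task(user, status).  my_task_cb()
		 * and do_one_chunk() are hypothetical user code, not lws
		 * API, and the int return is shown to match the call site:
		 *
		 *	static int
		 *	my_task_cb(void *user, enum lws_threadpool_task_status s)
		 *	{
		 *		if (s == LWS_TP_STATUS_STOPPING)
		 *			return LWS_TP_RETURN_STOPPED;
		 *
		 *		if (do_one_chunk(user))
		 *			return LWS_TP_RETURN_FINISHED;
		 *
		 *		return LWS_TP_RETURN_CHECKING_IN;
		 *	}
		 */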

		do {
			lws_usec_t then;
			int n;

			if (tp->destroying || !task->args.wsi) {
				lwsl_info("%s: stopping on wsi gone\n",
					  __func__);
				state_transition(task, LWS_TP_STATUS_STOPPING);
			}

			then = lws_now_usecs();
			n = task->args.task(task->args.user, task->status);
			lwsl_debug(" %d, status %d\n", n, task->status);
			us_accrue(&task->acc_running, then);
			if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
				task->outlive = 1;
			switch (n & 7) {
			case LWS_TP_RETURN_CHECKING_IN:
				/* if not destroying the tp, continue */
				break;
			case LWS_TP_RETURN_SYNC:
				if (!task->args.wsi) {
					lwsl_debug("%s: task that wants to "
						   "outlive lost wsi asked "
						   "to sync: bypassed\n",
						   __func__);
					break;
				}
				/* block until writable acknowledges */
				then = lws_now_usecs();
				lws_threadpool_worker_sync(pool, task);
				us_accrue(&task->acc_syncing, then);
				break;
			case LWS_TP_RETURN_FINISHED:
				state_transition(task, LWS_TP_STATUS_FINISHED);
				break;
			case LWS_TP_RETURN_STOPPED:
				state_transition(task, LWS_TP_STATUS_STOPPED);
				break;
			}
		} while (task->status == LWS_TP_STATUS_RUNNING);

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		tp->running_tasks--;

		if (pool->task->status == LWS_TP_STATUS_STOPPING)
			state_transition(task, LWS_TP_STATUS_STOPPED);

		/* move the task to the done queue */

		pool->task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		tp->done_queue_depth++;
		pool->task->done = lws_now_usecs();

		if (!pool->task->args.wsi &&
		    (pool->task->status == LWS_TP_STATUS_STOPPED ||
		     pool->task->status == LWS_TP_STATUS_FINISHED)) {

			__lws_threadpool_task_dump(pool->task, buf,
						   sizeof(buf));
			lwsl_thread("%s: %s: worker %d REAPING: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/*
			 * there is no longer any wsi attached, so nothing is
			 * going to take care of reaping us.  So we must take
			 * care of it ourselves.
			 */
			__lws_threadpool_reap(pool->task);
		} else {

			__lws_threadpool_task_dump(pool->task, buf,
						   sizeof(buf));
			lwsl_thread("%s: %s: worker %d DONE: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/* signal the associated wsi to take a fresh look at
			 * task status */

			if (pool->task->args.wsi) {
				task->wanted_writeable_cb = 1;

				lws_cancel_service(
					lws_get_context(pool->task->args.wsi));
			}
		}

		pool->task = NULL;
		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
	}

	/* threadpool is being destroyed */

	pthread_exit(NULL);

	return NULL;
}

struct lws_threadpool *
lws_threadpool_create(struct lws_context *context,
		      const struct lws_threadpool_create_args *args,
		      const char *format, ...)
{
	struct lws_threadpool *tp;
	va_list ap;
	int n;

	tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * args->threads),
			"threadpool alloc");
	if (!tp)
		return NULL;

	memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * args->threads));
	tp->pool_list = (struct lws_pool *)(tp + 1);
	tp->max_queue_depth = args->max_queue_depth;

	va_start(ap, format);
	n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
	va_end(ap);

	lws_context_lock(context, __func__);

	tp->context = context;
	tp->tp_list = context->tp_list_head;
	context->tp_list_head = tp;

	lws_context_unlock(context);

	pthread_mutex_init(&tp->lock, NULL);
	pthread_cond_init(&tp->wake_idle, NULL);

	for (n = 0; n < args->threads; n++) {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
		char name[16];
#endif
		tp->pool_list[n].tp = tp;
		tp->pool_list[n].worker_index = n;
		pthread_mutex_init(&tp->pool_list[n].lock, NULL);
		if (pthread_create(&tp->pool_list[n].thread, NULL,
				   lws_threadpool_worker, &tp->pool_list[n])) {
			lwsl_err("thread creation failed\n");
		} else {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
			lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
			pthread_setname_np(tp->pool_list[n].thread, name);
#endif
			tp->threads_in_pool++;
		}
	}

	return tp;
}
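
/*
 * A minimal creation sketch, assuming the create args carry just the
 * thread count and queue bound consumed above; names are illustrative:
 *
 *	struct lws_threadpool_create_args ca;
 *	struct lws_threadpool *tp;
 *
 *	memset(&ca, 0, sizeof(ca));
 *	ca.threads = 4;
 *	ca.max_queue_depth = 8;
 *
 *	tp = lws_threadpool_create(context, &ca, "%s", "demo-tp");
 *	if (!tp)
 *		... bail ...
 */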

void
lws_threadpool_finish(struct lws_threadpool *tp)
{
	struct lws_threadpool_task **c, *task;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/* nothing new can start, running jobs will abort as STOPPED and the
	 * pool threads will exit ASAP (they are joined in destroy) */
	tp->destroying = 1;

	/* stop everyone in the pending queue and move to the done queue */

	c = &tp->task_queue_head;
	while (*c) {
		task = *c;
		*c = task->task_queue_next;
		task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		state_transition(task, LWS_TP_STATUS_STOPPED);
		tp->queue_depth--;
		tp->done_queue_depth++;
		task->done = lws_now_usecs();

		/*
		 * don't advance c: the unlink above already left *c
		 * pointing at the next pending task; advancing into
		 * task->task_queue_next would walk the done list instead
		 */
	}

	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	pthread_cond_broadcast(&tp->wake_idle);
}

void
lws_threadpool_destroy(struct lws_threadpool *tp)
{
	struct lws_threadpool_task *task, *next;
	struct lws_threadpool **ptp;
	void *retval;
	int n;

	/* remove us from the context list of threadpools */

	lws_context_lock(tp->context, __func__);

	ptp = &tp->context->tp_list_head;
	while (*ptp) {
		if (*ptp == tp) {
			*ptp = tp->tp_list;
			break;
		}
		ptp = &(*ptp)->tp_list;
	}

	lws_context_unlock(tp->context);

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	tp->destroying = 1;
	pthread_cond_broadcast(&tp->wake_idle);
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	lws_threadpool_dump(tp);

	for (n = 0; n < tp->threads_in_pool; n++) {
		task = tp->pool_list[n].task;

		/* he could be sitting waiting for SYNC */

		if (task != NULL)
			pthread_cond_broadcast(&task->wake_idle);

		pthread_join(tp->pool_list[n].thread, &retval);
		pthread_mutex_destroy(&tp->pool_list[n].lock);
	}
	lwsl_info("%s: all pool threads exited\n", __func__);

	task = tp->task_done_head;
	while (task) {
		next = task->task_queue_next;
		lws_threadpool_task_cleanup_destroy(task);
		tp->done_queue_depth--;
		task = next;
	}

	pthread_mutex_destroy(&tp->lock);

	lws_free(tp);
}

/*
 * we want to stop and destroy the task and related priv.  The wsi may no
 * longer exist.
 */

int
lws_threadpool_dequeue(struct lws *wsi)
{
	struct lws_threadpool *tp;
	struct lws_threadpool_task **c, *task;
	int n;

	task = wsi->tp_task;
	if (!task)
		return 0;

	tp = task->tp;
	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	if (task->outlive && !tp->destroying) {

		/* disconnect from wsi, and wsi from task */

		wsi->tp_task = NULL;
		task->args.wsi = NULL;

		goto bail;
	}

	c = &tp->task_queue_head;

	/* is he queued waiting for a chance to run?  Mark him as stopped and
	 * move him on to the done queue */

	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = tp->task_done_head;
			tp->task_done_head = task;
			state_transition(task, LWS_TP_STATUS_STOPPED);
			tp->queue_depth--;
			tp->done_queue_depth++;
			task->done = lws_now_usecs();

			lwsl_debug("%s: tp %p: removed queued task wsi %p\n",
				   __func__, tp, task->args.wsi);

			break;
		}
		c = &(*c)->task_queue_next;
	}

	/* is he on the done queue? */

	c = &tp->task_done_head;
	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = NULL;
			lws_threadpool_task_cleanup_destroy(task);
			tp->done_queue_depth--;
			goto bail;
		}
		c = &(*c)->task_queue_next;
	}

	/* he's not in the queue... is he already running on a thread? */

	for (n = 0; n < tp->threads_in_pool; n++) {
		if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
			continue;

		/*
		 * ensure we don't collide with tests or changes in the
		 * worker thread
		 */
		pthread_mutex_lock(&tp->pool_list[n].lock);

		/*
		 * mark him as having been requested to stop...
		 * the caller will hear about it in his service thread
		 * context as a request to close
		 */
		state_transition(task, LWS_TP_STATUS_STOPPING);

		/* disconnect from wsi, and wsi from task... log the wsi
		 * first, since we are about to lose the handle */

		lwsl_debug("%s: tp %p: request stop running task "
			   "for wsi %p\n", __func__, tp, task->args.wsi);

		task->args.wsi->tp_task = NULL;
		task->args.wsi = NULL;

		pthread_mutex_unlock(&tp->pool_list[n].lock);

		break;
	}

	if (n == tp->threads_in_pool) {
		/* can't find it */
		lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n",
			    __func__, tp, task->args.wsi);
		task->args.wsi->tp_task = NULL;
		task->args.wsi = NULL;
	}

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return 0;
}

struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
		       const struct lws_threadpool_task_args *args,
		       const char *format, ...)
{
	struct lws_threadpool_task *task = NULL;
	va_list ap;

	if (tp->destroying)
		return NULL;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/*
	 * if there's room on the queue, the job always goes on the queue
	 * first, then any free thread may pick it up after the wake_idle
	 */

	if (tp->queue_depth == tp->max_queue_depth) {
		lwsl_notice("%s: queue reached limit %d\n", __func__,
			    tp->max_queue_depth);

		goto bail;
	}

	/*
	 * create the task object
	 */

	task = lws_malloc(sizeof(*task), __func__);
	if (!task)
		goto bail;

	memset(task, 0, sizeof(*task));
	pthread_cond_init(&task->wake_idle, NULL);
	task->args = *args;
	task->tp = tp;
	task->created = lws_now_usecs();

	va_start(ap, format);
	vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
	va_end(ap);

	/*
	 * add him on the tp task queue
	 */

	task->task_queue_next = tp->task_queue_head;
	state_transition(task, LWS_TP_STATUS_QUEUED);
	tp->task_queue_head = task;
	tp->queue_depth++;

	/*
	 * mark the wsi itself as depending on this tp (so wsi close for
	 * whatever reason can clean up)
	 */

	args->wsi->tp_task = task;

	lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n",
		    __func__, tp->name, task, task->name, args->wsi,
		    tp->queue_depth);

	/* alert any idle thread there's something new on the task list */

	lws_memory_barrier();
	pthread_cond_signal(&tp->wake_idle);

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return task;
}
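
/*
 * A minimal enqueue sketch from a protocol callback; my_task_cb(),
 * my_cleanup() and priv are hypothetical user code.  Only the args
 * members consumed in this file (wsi, user, task, cleanup) are set:
 *
 *	struct lws_threadpool_task_args args;
 *
 *	memset(&args, 0, sizeof(args));
 *	args.wsi = wsi;
 *	args.user = priv;
 *	args.task = my_task_cb;
 *	args.cleanup = my_cleanup;
 *
 *	if (!lws_threadpool_enqueue(tp, &args, "task-%s", "demo"))
 *		... queue full or pool destroying; fail the connection ...
 */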

/* this should be called from the service thread */

enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
			       struct lws_threadpool_task **task, void **user)
{
	enum lws_threadpool_task_status status;
	struct lws_threadpool *tp;

	*task = wsi->tp_task;
	if (!*task)
		return -1;

	tp = (*task)->tp;
	*user = (*task)->args.user;
	status = (*task)->status;

	if (status == LWS_TP_STATUS_FINISHED ||
	    status == LWS_TP_STATUS_STOPPED) {
		char buf[160];

		pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
		__lws_threadpool_task_dump(*task, buf, sizeof(buf));
		lwsl_thread("%s: %s: service thread REAPING: %s\n",
			    __func__, tp->name, buf);
		__lws_threadpool_reap(*task);
		lws_memory_barrier();
		pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
	}

	return status;
}
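
/*
 * Sketch of typical use from the wsi's WRITEABLE handler (the statuses
 * are the ones managed in this file; the surrounding handler code is
 * hypothetical):
 *
 *	switch (lws_threadpool_task_status_wsi(wsi, &task, &user)) {
 *	case LWS_TP_STATUS_FINISHED:
 *	case LWS_TP_STATUS_STOPPED:
 *		// the call above already reaped the task
 *		return 0;
 *	case LWS_TP_STATUS_SYNCING:
 *		// consume the task's pending output, then unblock it;
 *		// passing nonzero instead asks the task to stop
 *		lws_threadpool_task_sync(task, 0);
 *		break;
 *	default:
 *		break;
 *	}
 */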

void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
{
	lwsl_debug("%s\n", __func__);

	if (stop)
		state_transition(task, LWS_TP_STATUS_STOPPING);

	pthread_cond_signal(&task->wake_idle);
}