1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #if !defined(_GNU_SOURCE)
26 #define _GNU_SOURCE
27 #endif
28
29 #if defined(WIN32)
30 #define HAVE_STRUCT_TIMESPEC
31 #if defined(pid_t)
32 #undef pid_t
33 #endif
34 #endif
35 #include <pthread.h>
36
37 #include "private-lib-core.h"
38
39 #include <string.h>
40 #include <stdio.h>
41
struct lws_threadpool;

/*
 * One unit of work given to a threadpool.
 *
 * While pending or finished, the task is linked on one of the pool's
 * singly-linked lists (tp->task_queue_head or tp->task_done_head) through
 * task_queue_next; while a worker holds it, it is referenced from that
 * worker's struct lws_pool instead.
 */
struct lws_threadpool_task {
	struct lws_threadpool_task *task_queue_next; /* next on pending / done list */

	struct lws_threadpool *tp;	/* threadpool the task was enqueued on */
	char name[32];			/* formatted from the enqueue format string */
	struct lws_threadpool_task_args args; /* copy of the args given at enqueue */

	lws_dll2_t list;		/* links the task into the wsi's tp_task_owner */

	lws_usec_t created;		/* timestamp taken at enqueue */
	lws_usec_t acquired;		/* timestamp a worker took it; 0 while queued */
	lws_usec_t done;		/* timestamp it joined the done list; 0 before */
	lws_usec_t entered_state;	/* timestamp of the last state_transition() */

	lws_usec_t acc_running;		/* accumulated us spent inside the task callback */
	lws_usec_t acc_syncing;		/* accumulated us spent in worker sync */

	pthread_cond_t wake_idle;	/* the worker waits on this while syncing */

	enum lws_threadpool_task_status status;

	int late_sync_retries;		/* number of sync waits that timed out */

	char wanted_writeable_cb;	/* asks the service thread to request writable */
	char outlive;			/* set when the callback returned
					 * LWS_TP_RETURN_FLAG_OUTLIVE */
};
70
/*
 * Per-worker-thread bookkeeping; the threadpool holds an array of these in
 * pool_list, one per worker.
 */
struct lws_pool {
	struct lws_threadpool *tp;	/* threadpool this worker belongs to */
	pthread_t thread;		/* handle of the worker thread */
	pthread_mutex_t lock; /* part of task wake_idle: the mutex used for
			       * the sync condition wait */
	struct lws_threadpool_task *task; /* task the worker holds, NULL if none */
	lws_usec_t acquired;		/* timestamp the current task was taken */
	int worker_index;		/* index assigned at creation, used in logs */
};
79
/*
 * A pool of worker threads plus its pending and done task lists.
 */
struct lws_threadpool {
	pthread_mutex_t lock; /* protects all pool lists */
	pthread_cond_t wake_idle;	/* idle workers wait here for queued tasks */
	struct lws_pool *pool_list;	/* worker array, allocated right after
					 * this struct in the same allocation */

	struct lws_context *context;	/* context the pool is registered with */
	struct lws_threadpool *tp_list; /* context list of threadpools */

	struct lws_threadpool_task *task_queue_head; /* pending tasks; new ones
						      * are added at the head,
						      * workers take the tail */
	struct lws_threadpool_task *task_done_head; /* tasks waiting to be reaped */

	char name[32];			/* formatted from the create format string */

	int threads_in_pool;		/* number of workers successfully started */
	int queue_depth;		/* count of tasks on the pending list */
	int done_queue_depth;		/* count of tasks on the done list */
	int max_queue_depth;		/* enqueue is refused at this pending depth */
	int running_tasks;		/* count of tasks currently held by workers */

	unsigned int destroying:1;	/* set by finish / destroy: workers exit
					 * and enqueue is refused */
};
101
102 static int
ms_delta(lws_usec_t now,lws_usec_t then)103 ms_delta(lws_usec_t now, lws_usec_t then)
104 {
105 return (int)((now - then) / 1000);
106 }
107
108 static void
us_accrue(lws_usec_t * acc,lws_usec_t then)109 us_accrue(lws_usec_t *acc, lws_usec_t then)
110 {
111 lws_usec_t now = lws_now_usecs();
112
113 *acc += now - then;
114 }
115
116 static int
pc_delta(lws_usec_t now,lws_usec_t then,lws_usec_t us)117 pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
118 {
119 lws_usec_t delta = (now - then) + 1;
120
121 return (int)((us * 100) / delta);
122 }
123
124 static void
__lws_threadpool_task_dump(struct lws_threadpool_task * task,char * buf,int len)125 __lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
126 {
127 lws_usec_t now = lws_now_usecs();
128 char *end = buf + len - 1;
129 int syncms = 0, runms = 0;
130
131 if (!task->acquired) {
132 buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
133 "task: %s, QUEUED queued: %dms",
134 task->name, ms_delta(now, task->created));
135
136 return;
137 }
138
139 if (task->acc_running)
140 runms = (int)task->acc_running;
141
142 if (task->acc_syncing)
143 syncms = (int)task->acc_syncing;
144
145 if (!task->done) {
146 buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
147 "task: %s, ONGOING state %d (%dms) alive: %dms "
148 "(queued %dms, acquired: %dms, "
149 "run: %d%%, sync: %d%%)", task->name, task->status,
150 ms_delta(now, task->entered_state),
151 ms_delta(now, task->created),
152 ms_delta(task->acquired, task->created),
153 ms_delta(now, task->acquired),
154 pc_delta(now, task->acquired, runms),
155 pc_delta(now, task->acquired, syncms));
156
157 return;
158 }
159
160 lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
161 "task: %s, DONE state %d lived: %dms "
162 "(queued %dms, on thread: %dms, "
163 "ran: %d%%, synced: %d%%)", task->name, task->status,
164 ms_delta(task->done, task->created),
165 ms_delta(task->acquired, task->created),
166 ms_delta(task->done, task->acquired),
167 pc_delta(task->done, task->acquired, runms),
168 pc_delta(task->done, task->acquired, syncms));
169 }
170
/*
 * Debug helper: log the pending, running and done tasks of the threadpool
 * and cross-check the list lengths against the pool's counters.
 *
 * The whole body is currently compiled out with #if 0, so the function does
 * nothing.
 */
void
lws_threadpool_dump(struct lws_threadpool *tp)
{
#if 0
	//defined(_DEBUG)
	struct lws_threadpool_task *task;
	char buf[160];
	int n, count;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
		    tp->name, tp->queue_depth, tp->running_tasks,
		    tp->done_queue_depth);

	/* pending list */

	count = 0;
	for (task = tp->task_queue_head; task; task = task->task_queue_next) {
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - %s\n", buf);
		count++;
	}

	if (count != tp->queue_depth)
		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
			 __func__, tp->queue_depth, count);

	/* tasks held by workers */

	count = 0;
	for (n = 0; n < tp->threads_in_pool; n++) {
		task = tp->pool_list[n].task;
		if (!task)
			continue;

		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - worker %d: %s\n", n, buf);
		count++;
	}

	if (count != tp->running_tasks)
		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
			 __func__, tp->running_tasks, count);

	/* done list */

	count = 0;
	for (task = tp->task_done_head; task; task = task->task_queue_next) {
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - %s\n", buf);
		count++;
	}

	if (count != tp->done_queue_depth)
		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
			 __func__, tp->done_queue_depth, count);

	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
#endif
}
235
236 static void
state_transition(struct lws_threadpool_task * task,enum lws_threadpool_task_status status)237 state_transition(struct lws_threadpool_task *task,
238 enum lws_threadpool_task_status status)
239 {
240 task->entered_state = lws_now_usecs();
241 task->status = status;
242 }
243
244 static struct lws *
task_to_wsi(struct lws_threadpool_task * task)245 task_to_wsi(struct lws_threadpool_task *task)
246 {
247 #if defined(LWS_WITH_SECURE_STREAMS)
248 if (task->args.ss)
249 return task->args.ss->wsi;
250 #endif
251 return task->args.wsi;
252 }
253
254 static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task * task)255 lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
256 {
257 if (task->args.cleanup)
258 task->args.cleanup(task_to_wsi(task), task->args.user);
259
260 lws_dll2_remove(&task->list);
261
262 lwsl_thread("%s: tp %p: cleaned finished task for %s\n",
263 __func__, task->tp, lws_wsi_tag(task_to_wsi(task)));
264
265 lws_free(task);
266 }
267
268 static void
__lws_threadpool_reap(struct lws_threadpool_task * task)269 __lws_threadpool_reap(struct lws_threadpool_task *task)
270 {
271 struct lws_threadpool_task **c, *t = NULL;
272 struct lws_threadpool *tp = task->tp;
273
274 /* remove the task from the done queue */
275
276 if (tp) {
277 c = &tp->task_done_head;
278
279 while (*c) {
280 if ((*c) == task) {
281 t = *c;
282 *c = t->task_queue_next;
283 t->task_queue_next = NULL;
284 tp->done_queue_depth--;
285
286 lwsl_thread("%s: tp %s: reaped task %s\n", __func__,
287 tp->name, lws_wsi_tag(task_to_wsi(task)));
288
289 break;
290 }
291 c = &(*c)->task_queue_next;
292 }
293
294 if (!t) {
295 lwsl_err("%s: task %p not in done queue\n", __func__, task);
296 /*
297 * This shouldn't occur, but in this case not really
298 * safe to assume there's a task to destroy
299 */
300 return;
301 }
302 } else
303 lwsl_err("%s: task->tp NULL already\n", __func__);
304
305 /* call the task's cleanup and delete the task itself */
306
307 lws_threadpool_task_cleanup_destroy(task);
308 }
309
310 /*
311 * this gets called from each tsi service context after the service was
312 * cancelled... we need to ask for the writable callback from the matching
313 * tsi context for any wsis bound to a worked thread that need it
314 */
315
316 int
lws_threadpool_tsi_context(struct lws_context * context,int tsi)317 lws_threadpool_tsi_context(struct lws_context *context, int tsi)
318 {
319 struct lws_threadpool_task **c, *task = NULL;
320 struct lws_threadpool *tp;
321 struct lws *wsi;
322
323 lws_context_lock(context, __func__);
324
325 tp = context->tp_list_head;
326 while (tp) {
327 int n;
328
329 /* for the running (syncing...) tasks... */
330
331 for (n = 0; n < tp->threads_in_pool; n++) {
332 struct lws_pool *pool = &tp->pool_list[n];
333
334 task = pool->task;
335 if (!task)
336 continue;
337
338 wsi = task_to_wsi(task);
339 if (!wsi || wsi->tsi != tsi ||
340 (!task->wanted_writeable_cb &&
341 task->status != LWS_TP_STATUS_SYNCING))
342 continue;
343
344 task->wanted_writeable_cb = 0;
345 lws_memory_barrier();
346
347 /*
348 * finally... we can ask for the callback on
349 * writable from the correct service thread
350 * context
351 */
352
353 lws_callback_on_writable(wsi);
354 }
355
356 /* for the done tasks... */
357
358 c = &tp->task_done_head;
359
360 while (*c) {
361 task = *c;
362 wsi = task_to_wsi(task);
363
364 if (wsi && wsi->tsi == tsi &&
365 (task->wanted_writeable_cb ||
366 task->status == LWS_TP_STATUS_SYNCING)) {
367
368 task->wanted_writeable_cb = 0;
369 lws_memory_barrier();
370
371 /*
372 * finally... we can ask for the callback on
373 * writable from the correct service thread
374 * context
375 */
376
377 lws_callback_on_writable(wsi);
378 }
379
380 c = &task->task_queue_next;
381 }
382
383 tp = tp->tp_list;
384 }
385
386 lws_context_unlock(context);
387
388 return 0;
389 }
390
391 static int
lws_threadpool_worker_sync(struct lws_pool * pool,struct lws_threadpool_task * task)392 lws_threadpool_worker_sync(struct lws_pool *pool,
393 struct lws_threadpool_task *task)
394 {
395 enum lws_threadpool_task_status temp;
396 struct timespec abstime;
397 struct lws *wsi;
398 int tries = 15;
399
400 /* block until writable acknowledges */
401 lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
402 pthread_mutex_lock(&pool->lock); /* ======================= pool lock */
403
404 lwsl_info("%s: %s: task %p (%s): syncing with %s\n", __func__,
405 pool->tp->name, task, task->name, lws_wsi_tag(task_to_wsi(task)));
406
407 temp = task->status;
408 state_transition(task, LWS_TP_STATUS_SYNCING);
409 while (tries--) {
410 wsi = task_to_wsi(task);
411
412 /*
413 * if the wsi is no longer attached to this task, there is
414 * nothing we can sync to usefully. Since the work wants to
415 * sync, it means we should react to the situation by telling
416 * the task it can't continue usefully by stopping it.
417 */
418
419 if (!wsi) {
420 lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
421 "wsi to sync to\n", __func__, pool->tp->name,
422 task, task->name);
423
424 state_transition(task, LWS_TP_STATUS_STOPPING);
425 goto done;
426 }
427
428 /*
429 * So "tries" times this is the maximum time between SYNC asking
430 * for a callback on writable and actually getting it we are
431 * willing to sit still for.
432 *
433 * If it is exceeded, we will stop the task.
434 */
435 abstime.tv_sec = time(NULL) + 3;
436 abstime.tv_nsec = 0;
437
438 task->wanted_writeable_cb = 1;
439 lws_memory_barrier();
440
441 /*
442 * This will cause lws_threadpool_tsi_context() to get called
443 * from each tsi service context, where we can safely ask for
444 * a callback on writeable on the wsi we are associated with.
445 */
446 lws_cancel_service(lws_get_context(wsi));
447
448 /*
449 * so the danger here is that we asked for a writable callback
450 * on the wsi, but for whatever reason, we are never going to
451 * get one. To avoid deadlocking forever, we allow a set time
452 * for the sync to happen naturally, otherwise the cond wait
453 * times out and we stop the task.
454 */
455
456 if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
457 &abstime) == ETIMEDOUT) {
458 task->late_sync_retries++;
459 if (!tries) {
460 lwsl_err("%s: %s: task %p (%s): SYNC timed out "
461 "(associated %s)\n",
462 __func__, pool->tp->name, task,
463 task->name, lws_wsi_tag(task_to_wsi(task)));
464
465 pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */
466 lws_threadpool_dequeue_task(task);
467 return 1; /* destroyed task */
468 }
469
470 continue;
471 } else
472 break;
473 }
474
475 if (task->status == LWS_TP_STATUS_SYNCING)
476 state_transition(task, temp);
477
478 lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);
479
480 done:
481 pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */
482
483 return 0;
484 }
485
#if !defined(WIN32)
/* its address is the exit value worker threads hand to pthread_exit() */
static int dummy;
#endif
489
490 static void *
lws_threadpool_worker(void * d)491 lws_threadpool_worker(void *d)
492 {
493 struct lws_threadpool_task **c, **c2, *task;
494 struct lws_pool *pool = d;
495 struct lws_threadpool *tp = pool->tp;
496 char buf[160];
497
498 while (!tp->destroying) {
499
500 /* we have no running task... wait and get one from the queue */
501
502 pthread_mutex_lock(&tp->lock); /* =================== tp lock */
503
504 /*
505 * if there's no task already waiting in the queue, wait for
506 * the wake_idle condition to signal us that might have changed
507 */
508 while (!tp->task_queue_head && !tp->destroying)
509 pthread_cond_wait(&tp->wake_idle, &tp->lock);
510
511 if (tp->destroying) {
512 lwsl_notice("%s: bailing\n", __func__);
513 goto doneski;
514 }
515
516 c = &tp->task_queue_head;
517 c2 = NULL;
518 task = NULL;
519 pool->task = NULL;
520
521 /* look at the queue tail */
522 while (*c) {
523 c2 = c;
524 c = &(*c)->task_queue_next;
525 }
526
527 /* is there a task at the queue tail? */
528 if (c2 && *c2) {
529 pool->task = task = *c2;
530 task->acquired = pool->acquired = lws_now_usecs();
531 /* remove it from the queue */
532 *c2 = task->task_queue_next;
533 task->task_queue_next = NULL;
534 tp->queue_depth--;
535 /* mark it as running */
536 state_transition(task, LWS_TP_STATUS_RUNNING);
537 }
538
539 /* someone else got it first... wait and try again */
540 if (!task) {
541 pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */
542 continue;
543 }
544
545 task->wanted_writeable_cb = 0;
546
547 /* we have acquired a new task */
548
549 __lws_threadpool_task_dump(task, buf, sizeof(buf));
550
551 lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
552 __func__, tp->name, pool->worker_index, buf);
553 tp->running_tasks++;
554
555 pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
556
557 /*
558 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
559 * "resurface" periodically, and get called again with
560 * cont = 1 immediately to indicate it is picking up where it
561 * left off if the task is not being "stopped".
562 *
563 * This allows long tasks to respond to requests to stop in
564 * a clean and opaque way.
565 *
566 * 2) The task can return with LWS_TP_RETURN_SYNC to register
567 * a "callback on writable" request on the service thread and
568 * block until it hears back from the WRITABLE handler.
569 *
570 * This allows the work on the thread to be synchronized to the
571 * previous work being dispatched cleanly.
572 *
573 * 3) The task can return with LWS_TP_RETURN_FINISHED to
574 * indicate its work is completed nicely.
575 *
576 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
577 * it stopped and cleaned up after incomplete work.
578 */
579
580 do {
581 lws_usec_t then;
582 int n;
583
584 if (tp->destroying || !task_to_wsi(task)) {
585 lwsl_info("%s: stopping on wsi gone\n", __func__);
586 state_transition(task, LWS_TP_STATUS_STOPPING);
587 }
588
589 then = lws_now_usecs();
590 n = (int)task->args.task(task->args.user, task->status);
591 lwsl_debug(" %d, status %d\n", n, task->status);
592 us_accrue(&task->acc_running, then);
593 if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
594 task->outlive = 1;
595 switch (n & 7) {
596 case LWS_TP_RETURN_CHECKING_IN:
597 /* if not destroying the tp, continue */
598 break;
599 case LWS_TP_RETURN_SYNC:
600 if (!task_to_wsi(task)) {
601 lwsl_debug("%s: task that wants to "
602 "outlive lost wsi asked "
603 "to sync: bypassed\n",
604 __func__);
605 break;
606 }
607 /* block until writable acknowledges */
608 then = lws_now_usecs();
609 if (lws_threadpool_worker_sync(pool, task)) {
610 lwsl_notice("%s: Sync failed\n", __func__);
611 goto doneski;
612 }
613 us_accrue(&task->acc_syncing, then);
614 break;
615 case LWS_TP_RETURN_FINISHED:
616 state_transition(task, LWS_TP_STATUS_FINISHED);
617 break;
618 case LWS_TP_RETURN_STOPPED:
619 state_transition(task, LWS_TP_STATUS_STOPPED);
620 break;
621 }
622 } while (task->status == LWS_TP_STATUS_RUNNING);
623
624 pthread_mutex_lock(&tp->lock); /* =================== tp lock */
625
626 tp->running_tasks--;
627
628 if (pool->task->status == LWS_TP_STATUS_STOPPING)
629 state_transition(task, LWS_TP_STATUS_STOPPED);
630
631 /* move the task to the done queue */
632
633 pool->task->task_queue_next = tp->task_done_head;
634 tp->task_done_head = task;
635 tp->done_queue_depth++;
636 pool->task->done = lws_now_usecs();
637
638 if (!pool->task->args.wsi &&
639 (pool->task->status == LWS_TP_STATUS_STOPPED ||
640 pool->task->status == LWS_TP_STATUS_FINISHED)) {
641
642 __lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
643 lwsl_thread("%s: %s: worker %d REAPING: %s\n",
644 __func__, tp->name, pool->worker_index,
645 buf);
646
647 /*
648 * there is no longer any wsi attached, so nothing is
649 * going to take care of reaping us. So we must take
650 * care of it ourselves.
651 */
652 __lws_threadpool_reap(pool->task);
653 } else {
654
655 __lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
656 lwsl_thread("%s: %s: worker %d DONE: %s\n",
657 __func__, tp->name, pool->worker_index,
658 buf);
659
660 /* signal the associated wsi to take a fresh look at
661 * task status */
662
663 if (task_to_wsi(pool->task)) {
664 task->wanted_writeable_cb = 1;
665
666 lws_cancel_service(
667 lws_get_context(task_to_wsi(pool->task)));
668 }
669 }
670
671 doneski:
672 pool->task = NULL;
673 pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
674 }
675
676 lwsl_notice("%s: Exiting\n", __func__);
677
678 /* threadpool is being destroyed */
679 #if !defined(WIN32)
680 pthread_exit(&dummy);
681 #endif
682
683 return NULL;
684 }
685
686 struct lws_threadpool *
lws_threadpool_create(struct lws_context * context,const struct lws_threadpool_create_args * args,const char * format,...)687 lws_threadpool_create(struct lws_context *context,
688 const struct lws_threadpool_create_args *args,
689 const char *format, ...)
690 {
691 struct lws_threadpool *tp;
692 va_list ap;
693 int n;
694
695 tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads),
696 "threadpool alloc");
697 if (!tp)
698 return NULL;
699
700 memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads));
701 tp->pool_list = (struct lws_pool *)(tp + 1);
702 tp->max_queue_depth = args->max_queue_depth;
703
704 va_start(ap, format);
705 n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
706 va_end(ap);
707
708 lws_context_lock(context, __func__);
709
710 tp->context = context;
711 tp->tp_list = context->tp_list_head;
712 context->tp_list_head = tp;
713
714 lws_context_unlock(context);
715
716 pthread_mutex_init(&tp->lock, NULL);
717 pthread_cond_init(&tp->wake_idle, NULL);
718
719 for (n = 0; n < args->threads; n++) {
720 #if defined(LWS_HAS_PTHREAD_SETNAME_NP)
721 char name[16];
722 #endif
723 tp->pool_list[n].tp = tp;
724 tp->pool_list[n].worker_index = n;
725 pthread_mutex_init(&tp->pool_list[n].lock, NULL);
726 if (pthread_create(&tp->pool_list[n].thread, NULL,
727 lws_threadpool_worker, &tp->pool_list[n])) {
728 lwsl_err("thread creation failed\n");
729 } else {
730 #if defined(LWS_HAS_PTHREAD_SETNAME_NP)
731 lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
732 pthread_setname_np(tp->pool_list[n].thread, name);
733 #endif
734 tp->threads_in_pool++;
735 }
736 }
737
738 return tp;
739 }
740
741 void
lws_threadpool_finish(struct lws_threadpool * tp)742 lws_threadpool_finish(struct lws_threadpool *tp)
743 {
744 struct lws_threadpool_task **c, *task;
745
746 pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
747
748 /* nothing new can start, running jobs will abort as STOPPED and the
749 * pool threads will exit ASAP (they are joined in destroy) */
750 tp->destroying = 1;
751
752 /* stop everyone in the pending queue and move to the done queue */
753
754 c = &tp->task_queue_head;
755 while (*c) {
756 task = *c;
757 *c = task->task_queue_next;
758 task->task_queue_next = tp->task_done_head;
759 tp->task_done_head = task;
760 state_transition(task, LWS_TP_STATUS_STOPPED);
761 tp->queue_depth--;
762 tp->done_queue_depth++;
763 task->done = lws_now_usecs();
764
765 c = &task->task_queue_next;
766 }
767
768 pthread_cond_broadcast(&tp->wake_idle);
769 pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
770 }
771
772 void
lws_threadpool_destroy(struct lws_threadpool * tp)773 lws_threadpool_destroy(struct lws_threadpool *tp)
774 {
775 struct lws_threadpool_task *task, *next;
776 struct lws_threadpool **ptp;
777 void *retval;
778 int n;
779
780 /* remove us from the context list of threadpools */
781
782 lws_context_lock(tp->context, __func__);
783 ptp = &tp->context->tp_list_head;
784
785 while (*ptp) {
786 if (*ptp == tp) {
787 *ptp = tp->tp_list;
788 break;
789 }
790 ptp = &(*ptp)->tp_list;
791 }
792
793 lws_context_unlock(tp->context);
794
795 /*
796 * Wake up the threadpool guys and tell them to exit
797 */
798
799 pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
800 tp->destroying = 1;
801 pthread_cond_broadcast(&tp->wake_idle);
802 pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
803
804 lws_threadpool_dump(tp);
805
806 lwsl_info("%s: waiting for threads to rejoin\n", __func__);
807 #if defined(WIN32)
808 Sleep(1000);
809 #endif
810
811 for (n = 0; n < tp->threads_in_pool; n++) {
812 task = tp->pool_list[n].task;
813
814 pthread_join(tp->pool_list[n].thread, &retval);
815 pthread_mutex_destroy(&tp->pool_list[n].lock);
816 }
817 lwsl_info("%s: all threadpools exited\n", __func__);
818 #if defined(WIN32)
819 Sleep(1000);
820 #endif
821
822 task = tp->task_done_head;
823 while (task) {
824 next = task->task_queue_next;
825 lws_threadpool_task_cleanup_destroy(task);
826 tp->done_queue_depth--;
827 task = next;
828 }
829
830 pthread_mutex_destroy(&tp->lock);
831
832 memset(tp, 0xdd, sizeof(*tp));
833 lws_free(tp);
834 }
835
836 /*
837 * We want to stop and destroy the tasks and related priv.
838 */
839
840 int
lws_threadpool_dequeue_task(struct lws_threadpool_task * task)841 lws_threadpool_dequeue_task(struct lws_threadpool_task *task)
842 {
843 struct lws_threadpool *tp;
844 struct lws_threadpool_task **c;
845 int n;
846
847 tp = task->tp;
848 pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
849
850 if (task->outlive && !tp->destroying) {
851
852 /* disconnect from wsi, and wsi from task */
853
854 lws_dll2_remove(&task->list);
855 task->args.wsi = NULL;
856 #if defined(LWS_WITH_SECURE_STREAMS)
857 task->args.ss = NULL;
858 #endif
859
860 goto bail;
861 }
862
863
864 c = &tp->task_queue_head;
865
866 /* is he queued waiting for a chance to run? Mark him as stopped and
867 * move him on to the done queue */
868
869 while (*c) {
870 if ((*c) == task) {
871 *c = task->task_queue_next;
872 task->task_queue_next = tp->task_done_head;
873 tp->task_done_head = task;
874 state_transition(task, LWS_TP_STATUS_STOPPED);
875 tp->queue_depth--;
876 tp->done_queue_depth++;
877 task->done = lws_now_usecs();
878
879 lwsl_debug("%s: tp %p: removed queued task %s\n",
880 __func__, tp, lws_wsi_tag(task_to_wsi(task)));
881
882 break;
883 }
884 c = &(*c)->task_queue_next;
885 }
886
887 /* is he on the done queue? */
888
889 c = &tp->task_done_head;
890 while (*c) {
891 if ((*c) == task) {
892 *c = task->task_queue_next;
893 task->task_queue_next = NULL;
894 lws_threadpool_task_cleanup_destroy(task);
895 tp->done_queue_depth--;
896 goto bail;
897 }
898 c = &(*c)->task_queue_next;
899 }
900
901 /* he's not in the queue... is he already running on a thread? */
902
903 for (n = 0; n < tp->threads_in_pool; n++) {
904 if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
905 continue;
906
907 /*
908 * ensure we don't collide with tests or changes in the
909 * worker thread
910 */
911 pthread_mutex_lock(&tp->pool_list[n].lock);
912
913 /*
914 * mark him as having been requested to stop...
915 * the caller will hear about it in his service thread
916 * context as a request to close
917 */
918 state_transition(task, LWS_TP_STATUS_STOPPING);
919
920 /* disconnect from wsi, and wsi from task */
921
922 lws_dll2_remove(&task->list);
923 task->args.wsi = NULL;
924 #if defined(LWS_WITH_SECURE_STREAMS)
925 task->args.ss = NULL;
926 #endif
927
928 pthread_mutex_unlock(&tp->pool_list[n].lock);
929
930 lwsl_debug("%s: tp %p: request stop running task "
931 "for %s\n", __func__, tp,
932 lws_wsi_tag(task_to_wsi(task)));
933
934 break;
935 }
936
937 if (n == tp->threads_in_pool) {
938 /* can't find it */
939 lwsl_notice("%s: tp %p: no task for %s, decoupling\n",
940 __func__, tp, lws_wsi_tag(task_to_wsi(task)));
941 lws_dll2_remove(&task->list);
942 task->args.wsi = NULL;
943 #if defined(LWS_WITH_SECURE_STREAMS)
944 task->args.ss = NULL;
945 #endif
946 }
947
948 bail:
949 pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
950
951 return 0;
952 }
953
954 int
lws_threadpool_dequeue(struct lws * wsi)955 lws_threadpool_dequeue(struct lws *wsi) /* deprecated */
956 {
957 struct lws_threadpool_task *task;
958
959 if (!wsi->tp_task_owner.count)
960 return 0;
961 assert(wsi->tp_task_owner.count != 1);
962
963 task = lws_container_of(wsi->tp_task_owner.head,
964 struct lws_threadpool_task, list);
965
966 return lws_threadpool_dequeue_task(task);
967 }
968
969 struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool * tp,const struct lws_threadpool_task_args * args,const char * format,...)970 lws_threadpool_enqueue(struct lws_threadpool *tp,
971 const struct lws_threadpool_task_args *args,
972 const char *format, ...)
973 {
974 struct lws_threadpool_task *task = NULL;
975 va_list ap;
976
977 if (tp->destroying)
978 return NULL;
979
980 #if defined(LWS_WITH_SECURE_STREAMS)
981 assert(args->ss || args->wsi);
982 #endif
983
984 pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
985
986 /*
987 * if there's room on the queue, the job always goes on the queue
988 * first, then any free thread may pick it up after the wake_idle
989 */
990
991 if (tp->queue_depth == tp->max_queue_depth) {
992 lwsl_notice("%s: queue reached limit %d\n", __func__,
993 tp->max_queue_depth);
994
995 goto bail;
996 }
997
998 /*
999 * create the task object
1000 */
1001
1002 task = lws_malloc(sizeof(*task), __func__);
1003 if (!task)
1004 goto bail;
1005
1006 memset(task, 0, sizeof(*task));
1007 pthread_cond_init(&task->wake_idle, NULL);
1008 task->args = *args;
1009 task->tp = tp;
1010 task->created = lws_now_usecs();
1011
1012 va_start(ap, format);
1013 vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
1014 va_end(ap);
1015
1016 /*
1017 * add him on the tp task queue
1018 */
1019
1020 task->task_queue_next = tp->task_queue_head;
1021 state_transition(task, LWS_TP_STATUS_QUEUED);
1022 tp->task_queue_head = task;
1023 tp->queue_depth++;
1024
1025 /*
1026 * mark the wsi itself as depending on this tp (so wsi close for
1027 * whatever reason can clean up)
1028 */
1029
1030 #if defined(LWS_WITH_SECURE_STREAMS)
1031 if (args->ss)
1032 lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner);
1033 else
1034 #endif
1035 lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner);
1036
1037 lwsl_thread("%s: tp %s: enqueued task %p (%s) for %s, depth %d\n",
1038 __func__, tp->name, task, task->name,
1039 lws_wsi_tag(task_to_wsi(task)), tp->queue_depth);
1040
1041 /* alert any idle thread there's something new on the task list */
1042
1043 lws_memory_barrier();
1044 pthread_cond_signal(&tp->wake_idle);
1045
1046 bail:
1047 pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
1048
1049 return task;
1050 }
1051
1052 /* this should be called from the service thread */
1053
1054 enum lws_threadpool_task_status
lws_threadpool_task_status(struct lws_threadpool_task * task,void ** user)1055 lws_threadpool_task_status(struct lws_threadpool_task *task, void **user)
1056 {
1057 enum lws_threadpool_task_status status;
1058 struct lws_threadpool *tp = task->tp;
1059
1060 if (!tp)
1061 return LWS_TP_STATUS_FINISHED;
1062
1063 *user = task->args.user;
1064 status = task->status;
1065
1066 if (status == LWS_TP_STATUS_FINISHED ||
1067 status == LWS_TP_STATUS_STOPPED) {
1068 char buf[160];
1069
1070 pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
1071 __lws_threadpool_task_dump(task, buf, sizeof(buf));
1072 lwsl_thread("%s: %s: service thread REAPING: %s\n",
1073 __func__, tp->name, buf);
1074 __lws_threadpool_reap(task);
1075 lws_memory_barrier();
1076 pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
1077 }
1078
1079 return status;
1080 }
1081
1082 enum lws_threadpool_task_status
lws_threadpool_task_status_noreap(struct lws_threadpool_task * task)1083 lws_threadpool_task_status_noreap(struct lws_threadpool_task *task)
1084 {
1085 return task->status;
1086 }
1087
1088 enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws * wsi,struct lws_threadpool_task ** _task,void ** user)1089 lws_threadpool_task_status_wsi(struct lws *wsi,
1090 struct lws_threadpool_task **_task, void **user)
1091 {
1092 struct lws_threadpool_task *task;
1093
1094 if (!wsi->tp_task_owner.count) {
1095 lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__);
1096 return LWS_TP_STATUS_FINISHED;
1097 }
1098
1099 assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */
1100
1101 task = lws_container_of(wsi->tp_task_owner.head,
1102 struct lws_threadpool_task, list);
1103
1104 *_task = task;
1105
1106 return lws_threadpool_task_status(task, user);
1107 }
1108
1109 void
lws_threadpool_task_sync(struct lws_threadpool_task * task,int stop)1110 lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
1111 {
1112 lwsl_debug("%s\n", __func__);
1113 if (!task)
1114 return;
1115
1116 if (stop)
1117 state_transition(task, LWS_TP_STATUS_STOPPING);
1118
1119 pthread_mutex_lock(&task->tp->lock);
1120 pthread_cond_signal(&task->wake_idle);
1121 pthread_mutex_unlock(&task->tp->lock);
1122 }
1123
1124 int
lws_threadpool_foreach_task_wsi(struct lws * wsi,void * user,int (* cb)(struct lws_threadpool_task * task,void * user))1125 lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user,
1126 int (*cb)(struct lws_threadpool_task *task,
1127 void *user))
1128 {
1129 struct lws_threadpool_task *task1;
1130
1131 if (wsi->tp_task_owner.head == NULL)
1132 return 0;
1133
1134 task1 = lws_container_of(wsi->tp_task_owner.head,
1135 struct lws_threadpool_task, list);
1136
1137 pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */
1138
1139 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
1140 wsi->tp_task_owner.head) {
1141 struct lws_threadpool_task *task = lws_container_of(d,
1142 struct lws_threadpool_task, list);
1143
1144 if (cb(task, user)) {
1145 pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
1146 return 1;
1147 }
1148
1149 } lws_end_foreach_dll_safe(d, d1);
1150
1151 pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
1152
1153 return 0;
1154 }
1155
#if defined(LWS_WITH_SECURE_STREAMS)
/*
 * Secure-streams form of lws_threadpool_foreach_task_wsi(): walks the tasks
 * of the ss handle's wsi.  Returns 0 without calling cb when the handle has
 * no wsi.
 */
int
lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user,
			       int (*cb)(struct lws_threadpool_task *task,
					 void *user))
{
	return ss->wsi ? lws_threadpool_foreach_task_wsi(ss->wsi, user, cb) : 0;
}
#endif
1168
1169 static int
disassociate_wsi(struct lws_threadpool_task * task,void * user)1170 disassociate_wsi(struct lws_threadpool_task *task,
1171 void *user)
1172 {
1173 task->args.wsi = NULL;
1174 lws_dll2_remove(&task->list);
1175
1176 return 0;
1177 }
1178
1179 void
lws_threadpool_wsi_closing(struct lws * wsi)1180 lws_threadpool_wsi_closing(struct lws *wsi)
1181 {
1182 lws_threadpool_foreach_task_wsi(wsi, NULL, disassociate_wsi);
1183 }
1184
1185 struct lws_threadpool_task *
lws_threadpool_get_task_wsi(struct lws * wsi)1186 lws_threadpool_get_task_wsi(struct lws *wsi)
1187 {
1188 if (wsi->tp_task_owner.head == NULL)
1189 return NULL;
1190
1191 return lws_container_of(wsi->tp_task_owner.head,
1192 struct lws_threadpool_task, list);
1193 }
1194
#if defined(LWS_WITH_SECURE_STREAMS)
/*
 * Secure-streams form of lws_threadpool_get_task_wsi(): return the first
 * task linked to the ss handle's wsi.
 *
 * Returns NULL when the handle currently has no wsi (the same situation
 * lws_threadpool_foreach_task_ss() guards against) or the wsi has no task.
 */
struct lws_threadpool_task *
lws_threadpool_get_task_ss(struct lws_ss_handle *ss)
{
	/* without a wsi there is no task list to look at */
	if (!ss->wsi)
		return NULL;

	return lws_threadpool_get_task_wsi(ss->wsi);
}
#endif
1202