/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#if !defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif

#if defined(WIN32)
#define HAVE_STRUCT_TIMESPEC
#if defined(pid_t)
#undef pid_t
#endif
#endif
#include <pthread.h>

#include "private-lib-core.h"

#include <string.h>
#include <stdio.h>

struct lws_threadpool;

struct lws_threadpool_task {
	struct lws_threadpool_task	*task_queue_next;

	struct lws_threadpool		*tp;
	char				name[32];
	struct lws_threadpool_task_args args;

	lws_dll2_t			list;

	lws_usec_t			created;
	lws_usec_t			acquired;
	lws_usec_t			done;
	lws_usec_t			entered_state;

	lws_usec_t			acc_running;
	lws_usec_t			acc_syncing;

	pthread_cond_t			wake_idle;

	enum lws_threadpool_task_status status;

	int				late_sync_retries;

	char				wanted_writeable_cb;
	char				outlive;
};

struct lws_pool {
	struct lws_threadpool		*tp;
	pthread_t			thread;
	pthread_mutex_t			lock; /* part of task wake_idle */
	struct lws_threadpool_task	*task;
	lws_usec_t			acquired;
	int				worker_index;
};

struct lws_threadpool {
	pthread_mutex_t			lock; /* protects all pool lists */
	pthread_cond_t			wake_idle;
	struct lws_pool			*pool_list;

	struct lws_context		*context;
	struct lws_threadpool		*tp_list; /* context list of threadpools */

	struct lws_threadpool_task	*task_queue_head;
	struct lws_threadpool_task	*task_done_head;

	char				name[32];

	int				threads_in_pool;
	int				queue_depth;
	int				done_queue_depth;
	int				max_queue_depth;
	int				running_tasks;

	unsigned int			destroying:1;
};

static int
ms_delta(lws_usec_t now, lws_usec_t then)
{
	return (int)((now - then) / 1000);
}

static void
us_accrue(lws_usec_t *acc, lws_usec_t then)
{
	lws_usec_t now = lws_now_usecs();

	*acc += now - then;
}

static int
pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
{
	lws_usec_t delta = (now - then) + 1;

	return (int)((us * 100) / delta);
}
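
/*
 * For example, if a task was acquired 20ms ago (now - then == 20000us) and
 * has accrued 5000us of run time, pc_delta() reports (5000 * 100) / 20001,
 * ie, roughly 25% of its wall time spent running.  The +1 avoids a divide
 * by zero when now == then.
 */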

static void
__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
{
	lws_usec_t now = lws_now_usecs();
	char *end = buf + len - 1;
	int syncms = 0, runms = 0;

	if (!task->acquired) {
		buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
				    "task: %s, QUEUED queued: %dms",
				    task->name, ms_delta(now, task->created));

		return;
	}

	if (task->acc_running)
		runms = (int)task->acc_running;

	if (task->acc_syncing)
		syncms = (int)task->acc_syncing;

	if (!task->done) {
		buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
			"task: %s, ONGOING state %d (%dms) alive: %dms "
			"(queued %dms, acquired: %dms, "
			"run: %d%%, sync: %d%%)", task->name, task->status,
			ms_delta(now, task->entered_state),
			ms_delta(now, task->created),
			ms_delta(task->acquired, task->created),
			ms_delta(now, task->acquired),
			pc_delta(now, task->acquired, runms),
			pc_delta(now, task->acquired, syncms));

		return;
	}

	lws_snprintf(buf, lws_ptr_diff_size_t(end, buf),
		"task: %s, DONE state %d lived: %dms "
		"(queued %dms, on thread: %dms, "
		"ran: %d%%, synced: %d%%)", task->name, task->status,
		ms_delta(task->done, task->created),
		ms_delta(task->acquired, task->created),
		ms_delta(task->done, task->acquired),
		pc_delta(task->done, task->acquired, runms),
		pc_delta(task->done, task->acquired, syncms));
}

void
lws_threadpool_dump(struct lws_threadpool *tp)
{
#if 0
	//defined(_DEBUG)
	struct lws_threadpool_task **c;
	char buf[160];
	int n, count;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
		    tp->name, tp->queue_depth, tp->running_tasks,
		    tp->done_queue_depth);

	count = 0;
	c = &tp->task_queue_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("  - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->queue_depth)
		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
			 __func__, tp->queue_depth, count);

	count = 0;
	for (n = 0; n < tp->threads_in_pool; n++) {
		struct lws_pool *pool = &tp->pool_list[n];
		struct lws_threadpool_task *task = pool->task;

		if (task) {
			__lws_threadpool_task_dump(task, buf, sizeof(buf));
			lwsl_thread("  - worker %d: %s\n", n, buf);
			count++;
		}
	}

	if (count != tp->running_tasks)
		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
			 __func__, tp->running_tasks, count);

	count = 0;
	c = &tp->task_done_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("  - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->done_queue_depth)
		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
			 __func__, tp->done_queue_depth, count);

	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
#endif
}

static void
state_transition(struct lws_threadpool_task *task,
		 enum lws_threadpool_task_status status)
{
	task->entered_state = lws_now_usecs();
	task->status = status;
}

static struct lws *
task_to_wsi(struct lws_threadpool_task *task)
{
#if defined(LWS_WITH_SECURE_STREAMS)
	if (task->args.ss)
		return task->args.ss->wsi;
#endif
	return task->args.wsi;
}

static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
{
	if (task->args.cleanup)
		task->args.cleanup(task_to_wsi(task), task->args.user);

	lws_dll2_remove(&task->list);

	lwsl_thread("%s: tp %p: cleaned finished task for %s\n",
		    __func__, task->tp, lws_wsi_tag(task_to_wsi(task)));

	lws_free(task);
}

static void
__lws_threadpool_reap(struct lws_threadpool_task *task)
{
	struct lws_threadpool_task **c, *t = NULL;
	struct lws_threadpool *tp = task->tp;

	/* remove the task from the done queue */

	if (tp) {
		c = &tp->task_done_head;

		while (*c) {
			if ((*c) == task) {
				t = *c;
				*c = t->task_queue_next;
				t->task_queue_next = NULL;
				tp->done_queue_depth--;

				lwsl_thread("%s: tp %s: reaped task %s\n", __func__,
					   tp->name, lws_wsi_tag(task_to_wsi(task)));

				break;
			}
			c = &(*c)->task_queue_next;
		}

		if (!t) {
			lwsl_err("%s: task %p not in done queue\n", __func__, task);
			/*
			 * This shouldn't occur, but in that case it is not
			 * really safe to assume there's a task to destroy
			 */
			return;
		}
	} else
		lwsl_err("%s: task->tp NULL already\n", __func__);

	/* call the task's cleanup and delete the task itself */

	lws_threadpool_task_cleanup_destroy(task);
}

/*
 * this gets called from each tsi service context after the service was
 * cancelled... we need to ask for the writable callback from the matching
 * tsi context for any wsis bound to a worker thread that need it
 */

int
lws_threadpool_tsi_context(struct lws_context *context, int tsi)
{
	struct lws_threadpool_task **c, *task = NULL;
	struct lws_threadpool *tp;
	struct lws *wsi;

	lws_context_lock(context, __func__);

	tp = context->tp_list_head;
	while (tp) {
		int n;

		/* for the running (syncing...) tasks... */

		for (n = 0; n < tp->threads_in_pool; n++) {
			struct lws_pool *pool = &tp->pool_list[n];

			task = pool->task;
			if (!task)
				continue;

			wsi = task_to_wsi(task);
			if (!wsi || wsi->tsi != tsi ||
			    (!task->wanted_writeable_cb &&
			     task->status != LWS_TP_STATUS_SYNCING))
				continue;

			task->wanted_writeable_cb = 0;
			lws_memory_barrier();

			/*
			 * finally... we can ask for the callback on
			 * writable from the correct service thread
			 * context
			 */

			lws_callback_on_writable(wsi);
		}

		/* for the done tasks... */

		c = &tp->task_done_head;

		while (*c) {
			task = *c;
			wsi = task_to_wsi(task);

			if (wsi && wsi->tsi == tsi &&
			    (task->wanted_writeable_cb ||
			     task->status == LWS_TP_STATUS_SYNCING)) {

				task->wanted_writeable_cb = 0;
				lws_memory_barrier();

				/*
				 * finally... we can ask for the callback on
				 * writable from the correct service thread
				 * context
				 */

				lws_callback_on_writable(wsi);
			}

			c = &task->task_queue_next;
		}

		tp = tp->tp_list;
	}

	lws_context_unlock(context);

	return 0;
}

static int
lws_threadpool_worker_sync(struct lws_pool *pool,
			   struct lws_threadpool_task *task)
{
	enum lws_threadpool_task_status temp;
	struct timespec abstime;
	struct lws *wsi;
	int tries = 15;

	/* block until writable acknowledges */
	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
	pthread_mutex_lock(&pool->lock); /* ======================= pool lock */

	lwsl_info("%s: %s: task %p (%s): syncing with %s\n", __func__,
		    pool->tp->name, task, task->name, lws_wsi_tag(task_to_wsi(task)));

	temp = task->status;
	state_transition(task, LWS_TP_STATUS_SYNCING);
	while (tries--) {
		wsi = task_to_wsi(task);

		/*
		 * if the wsi is no longer attached to this task, there is
		 * nothing we can sync to usefully.  Since the work wants to
		 * sync, it means we should react to the situation by telling
		 * the task it can't continue usefully by stopping it.
		 */

		if (!wsi) {
			lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
				 "wsi to sync to\n", __func__, pool->tp->name,
				 task, task->name);

			state_transition(task, LWS_TP_STATUS_STOPPING);
			goto done;
		}

		/*
		 * This wait, times "tries", is the maximum time we are
		 * willing to sit still between SYNC asking for a callback
		 * on writable and actually getting it.
		 *
		 * If it is exceeded, we will stop the task.
		 */
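		/* with tries == 15 and a 3s wait each time, that is a
		 * worst case of ~45s before the task is given up on */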
		abstime.tv_sec = time(NULL) + 3;
		abstime.tv_nsec = 0;

		task->wanted_writeable_cb = 1;
		lws_memory_barrier();

		/*
		 * This will cause lws_threadpool_tsi_context() to get called
		 * from each tsi service context, where we can safely ask for
		 * a callback on writeable on the wsi we are associated with.
		 */
		lws_cancel_service(lws_get_context(wsi));

		/*
		 * so the danger here is that we asked for a writable callback
		 * on the wsi, but for whatever reason, we are never going to
		 * get one.  To avoid deadlocking forever, we allow a set time
		 * for the sync to happen naturally, otherwise the cond wait
		 * times out and we stop the task.
		 */

		if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
					   &abstime) == ETIMEDOUT) {
			task->late_sync_retries++;
			if (!tries) {
				lwsl_err("%s: %s: task %p (%s): SYNC timed out "
					 "(associated %s)\n",
					 __func__, pool->tp->name, task,
					 task->name, lws_wsi_tag(task_to_wsi(task)));

				pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */
				lws_threadpool_dequeue_task(task);
				return 1; /* destroyed task */
			}

			continue;
		} else
			break;
	}

	if (task->status == LWS_TP_STATUS_SYNCING)
		state_transition(task, temp);

	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);

done:
	pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */

	return 0;
}

#if !defined(WIN32)
static int dummy;
#endif

static void *
lws_threadpool_worker(void *d)
{
	struct lws_threadpool_task **c, **c2, *task;
	struct lws_pool *pool = d;
	struct lws_threadpool *tp = pool->tp;
	char buf[160];

	while (!tp->destroying) {

		/* we have no running task... wait and get one from the queue */

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		/*
		 * if there's no task already waiting in the queue, wait for
		 * the wake_idle condition to signal us that might have changed
		 */
		while (!tp->task_queue_head && !tp->destroying)
			pthread_cond_wait(&tp->wake_idle, &tp->lock);

		if (tp->destroying) {
			lwsl_notice("%s: bailing\n", __func__);
			goto doneski;
		}

		c = &tp->task_queue_head;
		c2 = NULL;
		task = NULL;
		pool->task = NULL;

		/* look at the queue tail */
		while (*c) {
			c2 = c;
			c = &(*c)->task_queue_next;
		}

		/* is there a task at the queue tail? */
		if (c2 && *c2) {
			pool->task = task = *c2;
			task->acquired = pool->acquired = lws_now_usecs();
			/* remove it from the queue */
			*c2 = task->task_queue_next;
			task->task_queue_next = NULL;
			tp->queue_depth--;
			/* mark it as running */
			state_transition(task, LWS_TP_STATUS_RUNNING);
		}

		/* someone else got it first... wait and try again */
		if (!task) {
			pthread_mutex_unlock(&tp->lock);  /* ------ tp unlock */
			continue;
		}

		task->wanted_writeable_cb = 0;

		/* we have acquired a new task */

		__lws_threadpool_task_dump(task, buf, sizeof(buf));

		lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
			    __func__, tp->name, pool->worker_index, buf);
		tp->running_tasks++;

		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */

		/*
		 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
		 * "resurface" periodically, and get called again with
		 * cont = 1 immediately to indicate it is picking up where it
		 * left off if the task is not being "stopped".
		 *
		 * This allows long tasks to respond to requests to stop in
		 * a clean and opaque way.
		 *
		 * 2) The task can return with LWS_TP_RETURN_SYNC to register
		 * a "callback on writable" request on the service thread and
		 * block until it hears back from the WRITABLE handler.
		 *
		 * This allows the work on the thread to be synchronized to the
		 * previous work being dispatched cleanly.
		 *
		 * 3) The task can return with LWS_TP_RETURN_FINISHED to
		 * indicate its work is completed nicely.
		 *
		 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
		 * it stopped and cleaned up after incomplete work.
		 */
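
		/*
		 * A minimal sketch of a user task honouring this contract
		 * (hypothetical user code, shown only to illustrate the
		 * return values; my_priv / my_work_chunk are assumptions,
		 * not part of lws):
		 *
		 *	static enum lws_threadpool_return
		 *	my_task(void *user, enum lws_threadpool_task_status s)
		 *	{
		 *		struct my_priv *p = user;
		 *
		 *		if (s == LWS_TP_STATUS_STOPPING)
		 *			return LWS_TP_RETURN_STOPPED;
		 *
		 *		if (my_work_chunk(p))	// more work remains
		 *			return LWS_TP_RETURN_CHECKING_IN;
		 *
		 *		if (p->have_result)	// wait for WRITABLE
		 *			return LWS_TP_RETURN_SYNC;
		 *
		 *		return LWS_TP_RETURN_FINISHED;
		 *	}
		 */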

		do {
			lws_usec_t then;
			int n;

			if (tp->destroying || !task_to_wsi(task)) {
				lwsl_info("%s: stopping on wsi gone\n", __func__);
				state_transition(task, LWS_TP_STATUS_STOPPING);
			}

			then = lws_now_usecs();
			n = (int)task->args.task(task->args.user, task->status);
			lwsl_debug("   %d, status %d\n", n, task->status);
			us_accrue(&task->acc_running, then);
			if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
				task->outlive = 1;
			switch (n & 7) {
			case LWS_TP_RETURN_CHECKING_IN:
				/* if not destroying the tp, continue */
				break;
			case LWS_TP_RETURN_SYNC:
				if (!task_to_wsi(task)) {
					lwsl_debug("%s: task that wants to "
						    "outlive lost wsi asked "
						    "to sync: bypassed\n",
						    __func__);
					break;
				}
				/* block until writable acknowledges */
				then = lws_now_usecs();
				if (lws_threadpool_worker_sync(pool, task)) {
					lwsl_notice("%s: Sync failed\n", __func__);
					/*
					 * the task was already dealt with on
					 * the sync timeout path: retake the
					 * tp lock and drop the running count,
					 * since the common doneski exit below
					 * expects to hold the lock
					 */
					pthread_mutex_lock(&tp->lock);
					tp->running_tasks--;
					goto doneski;
				}
				us_accrue(&task->acc_syncing, then);
				break;
			case LWS_TP_RETURN_FINISHED:
				state_transition(task, LWS_TP_STATUS_FINISHED);
				break;
			case LWS_TP_RETURN_STOPPED:
				state_transition(task, LWS_TP_STATUS_STOPPED);
				break;
			}
		} while (task->status == LWS_TP_STATUS_RUNNING);

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		tp->running_tasks--;

		if (pool->task->status == LWS_TP_STATUS_STOPPING)
			state_transition(task, LWS_TP_STATUS_STOPPED);

		/* move the task to the done queue */

		pool->task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		tp->done_queue_depth++;
		pool->task->done = lws_now_usecs();

		if (!pool->task->args.wsi &&
		    (pool->task->status == LWS_TP_STATUS_STOPPED ||
		     pool->task->status == LWS_TP_STATUS_FINISHED)) {

			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
			lwsl_thread("%s: %s: worker %d REAPING: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/*
			 * there is no longer any wsi attached, so nothing is
			 * going to take care of reaping us.  So we must take
			 * care of it ourselves.
			 */
			__lws_threadpool_reap(pool->task);
		} else {

			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
			lwsl_thread("%s: %s: worker %d DONE: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/* signal the associated wsi to take a fresh look at
			 * task status */

			if (task_to_wsi(pool->task)) {
				task->wanted_writeable_cb = 1;

				lws_cancel_service(
					lws_get_context(task_to_wsi(pool->task)));
			}
		}

doneski:
		pool->task = NULL;
		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
	}

	lwsl_notice("%s: Exiting\n", __func__);

	/* threadpool is being destroyed */
#if !defined(WIN32)
	pthread_exit(&dummy);
#endif

	return NULL;
}

struct lws_threadpool *
lws_threadpool_create(struct lws_context *context,
		      const struct lws_threadpool_create_args *args,
		      const char *format, ...)
{
	struct lws_threadpool *tp;
	va_list ap;
	int n;

	tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads),
			"threadpool alloc");
	if (!tp)
		return NULL;

	memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * (unsigned int)args->threads));
	tp->pool_list = (struct lws_pool *)(tp + 1);
	tp->max_queue_depth = args->max_queue_depth;

	va_start(ap, format);
	n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
	va_end(ap);

	lws_context_lock(context, __func__);

	tp->context = context;
	tp->tp_list = context->tp_list_head;
	context->tp_list_head = tp;

	lws_context_unlock(context);

	pthread_mutex_init(&tp->lock, NULL);
	pthread_cond_init(&tp->wake_idle, NULL);

	for (n = 0; n < args->threads; n++) {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
		char name[16];
#endif
		tp->pool_list[n].tp = tp;
		tp->pool_list[n].worker_index = n;
		pthread_mutex_init(&tp->pool_list[n].lock, NULL);
		if (pthread_create(&tp->pool_list[n].thread, NULL,
				   lws_threadpool_worker, &tp->pool_list[n])) {
			lwsl_err("thread creation failed\n");
		} else {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
			lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
			pthread_setname_np(tp->pool_list[n].thread, name);
#endif
			tp->threads_in_pool++;
		}
	}

	return tp;
}
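
/*
 * A typical creation sketch (hypothetical caller code; only .threads and
 * .max_queue_depth are consumed from the args by this file):
 *
 *	struct lws_threadpool_create_args cargs;
 *
 *	memset(&cargs, 0, sizeof(cargs));
 *	cargs.threads = 4;
 *	cargs.max_queue_depth = 8;
 *	tp = lws_threadpool_create(context, &cargs, "%s", "mypool");
 */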

void
lws_threadpool_finish(struct lws_threadpool *tp)
{
	struct lws_threadpool_task **c, *task;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/* nothing new can start, running jobs will abort as STOPPED and the
	 * pool threads will exit ASAP (they are joined in destroy) */
	tp->destroying = 1;

	/* stop everyone in the pending queue and move to the done queue */

	c = &tp->task_queue_head;
	while (*c) {
		task = *c;
		*c = task->task_queue_next;
		task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		state_transition(task, LWS_TP_STATUS_STOPPED);
		tp->queue_depth--;
		tp->done_queue_depth++;
		task->done = lws_now_usecs();

		c = &task->task_queue_next;
	}

	pthread_cond_broadcast(&tp->wake_idle);
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
}

void
lws_threadpool_destroy(struct lws_threadpool *tp)
{
	struct lws_threadpool_task *task, *next;
	struct lws_threadpool **ptp;
	void *retval;
	int n;

	/* remove us from the context list of threadpools */

	lws_context_lock(tp->context, __func__);
	ptp = &tp->context->tp_list_head;

	while (*ptp) {
		if (*ptp == tp) {
			*ptp = tp->tp_list;
			break;
		}
		ptp = &(*ptp)->tp_list;
	}

	lws_context_unlock(tp->context);

	/*
	 * Wake up the threadpool guys and tell them to exit
	 */

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
	tp->destroying = 1;
	pthread_cond_broadcast(&tp->wake_idle);
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	lws_threadpool_dump(tp);

	lwsl_info("%s: waiting for threads to rejoin\n", __func__);
#if defined(WIN32)
	Sleep(1000);
#endif

	for (n = 0; n < tp->threads_in_pool; n++) {
		task = tp->pool_list[n].task;

		pthread_join(tp->pool_list[n].thread, &retval);
		pthread_mutex_destroy(&tp->pool_list[n].lock);
	}
	lwsl_info("%s: all pool threads rejoined\n", __func__);
#if defined(WIN32)
	Sleep(1000);
#endif

	task = tp->task_done_head;
	while (task) {
		next = task->task_queue_next;
		lws_threadpool_task_cleanup_destroy(task);
		tp->done_queue_depth--;
		task = next;
	}

	pthread_mutex_destroy(&tp->lock);

	memset(tp, 0xdd, sizeof(*tp));
	lws_free(tp);
}
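
/*
 * At shutdown the expected ordering (as in the lws minimal threadpool
 * example) is to stop new work first and then destroy, eg:
 *
 *	lws_threadpool_finish(tp);
 *	lws_threadpool_destroy(tp);
 */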

/*
 * We want to stop and destroy the tasks and related priv.
 */

int
lws_threadpool_dequeue_task(struct lws_threadpool_task *task)
{
	struct lws_threadpool *tp;
	struct lws_threadpool_task **c;
	int n;

	tp = task->tp;
	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	if (task->outlive && !tp->destroying) {

		/* disconnect from wsi, and wsi from task */

		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif

		goto bail;
	}

	c = &tp->task_queue_head;

	/* is he queued waiting for a chance to run?  Mark him as stopped and
	 * move him on to the done queue */

	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = tp->task_done_head;
			tp->task_done_head = task;
			state_transition(task, LWS_TP_STATUS_STOPPED);
			tp->queue_depth--;
			tp->done_queue_depth++;
			task->done = lws_now_usecs();

			lwsl_debug("%s: tp %p: removed queued task %s\n",
				    __func__, tp, lws_wsi_tag(task_to_wsi(task)));

			break;
		}
		c = &(*c)->task_queue_next;
	}

	/* is he on the done queue? */

	c = &tp->task_done_head;
	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = NULL;
			lws_threadpool_task_cleanup_destroy(task);
			tp->done_queue_depth--;
			goto bail;
		}
		c = &(*c)->task_queue_next;
	}

	/* he's not in the queue... is he already running on a thread? */

	for (n = 0; n < tp->threads_in_pool; n++) {
		if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
			continue;

		/*
		 * ensure we don't collide with tests or changes in the
		 * worker thread
		 */
		pthread_mutex_lock(&tp->pool_list[n].lock);

		/*
		 * mark him as having been requested to stop...
		 * the caller will hear about it in his service thread
		 * context as a request to close
		 */
		state_transition(task, LWS_TP_STATUS_STOPPING);

		/* disconnect from wsi, and wsi from task */

		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif

		pthread_mutex_unlock(&tp->pool_list[n].lock);

		lwsl_debug("%s: tp %p: request stop running task "
			    "for %s\n", __func__, tp,
			    lws_wsi_tag(task_to_wsi(task)));

		break;
	}

	if (n == tp->threads_in_pool) {
		/* can't find it */
		lwsl_notice("%s: tp %p: no task for %s, decoupling\n",
			    __func__, tp, lws_wsi_tag(task_to_wsi(task)));
		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif
	}

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return 0;
}

int
lws_threadpool_dequeue(struct lws *wsi) /* deprecated */
{
	struct lws_threadpool_task *task;

	if (!wsi->tp_task_owner.count)
		return 0;
	assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */

	task = lws_container_of(wsi->tp_task_owner.head,
				struct lws_threadpool_task, list);

	return lws_threadpool_dequeue_task(task);
}

struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
		       const struct lws_threadpool_task_args *args,
		       const char *format, ...)
{
	struct lws_threadpool_task *task = NULL;
	va_list ap;

	if (tp->destroying)
		return NULL;

#if defined(LWS_WITH_SECURE_STREAMS)
	assert(args->ss || args->wsi);
#endif

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/*
	 * if there's room on the queue, the job always goes on the queue
	 * first, then any free thread may pick it up after the wake_idle
	 */

	if (tp->queue_depth == tp->max_queue_depth) {
		lwsl_notice("%s: queue reached limit %d\n", __func__,
			    tp->max_queue_depth);

		goto bail;
	}

	/*
	 * create the task object
	 */

	task = lws_malloc(sizeof(*task), __func__);
	if (!task)
		goto bail;

	memset(task, 0, sizeof(*task));
	pthread_cond_init(&task->wake_idle, NULL);
	task->args = *args;
	task->tp = tp;
	task->created = lws_now_usecs();

	va_start(ap, format);
	vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
	va_end(ap);

	/*
	 * add him on the tp task queue
	 */

	task->task_queue_next = tp->task_queue_head;
	state_transition(task, LWS_TP_STATUS_QUEUED);
	tp->task_queue_head = task;
	tp->queue_depth++;

	/*
	 * mark the wsi itself as depending on this tp (so wsi close for
	 * whatever reason can clean up)
	 */

#if defined(LWS_WITH_SECURE_STREAMS)
	if (args->ss)
		lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner);
	else
#endif
		lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner);

	lwsl_thread("%s: tp %s: enqueued task %p (%s) for %s, depth %d\n",
		    __func__, tp->name, task, task->name,
		    lws_wsi_tag(task_to_wsi(task)), tp->queue_depth);

	/* alert any idle thread there's something new on the task list */

	lws_memory_barrier();
	pthread_cond_signal(&tp->wake_idle);

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return task;
}
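
/*
 * A minimal enqueue sketch from a wsi callback (hypothetical caller code;
 * my_task / my_priv / my_cleanup are assumptions, see the task sketch
 * earlier; tp came from lws_threadpool_create()):
 *
 *	struct lws_threadpool_task_args targs;
 *
 *	memset(&targs, 0, sizeof(targs));
 *	targs.wsi = wsi;
 *	targs.user = &my_priv;
 *	targs.task = my_task;
 *	targs.cleanup = my_cleanup;
 *	if (!lws_threadpool_enqueue(tp, &targs, "task-%s", "demo"))
 *		// the queue was full, or the alloc failed
 */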

/* this should be called from the service thread */

enum lws_threadpool_task_status
lws_threadpool_task_status(struct lws_threadpool_task *task, void **user)
{
	enum lws_threadpool_task_status status;
	struct lws_threadpool *tp = task->tp;

	if (!tp)
		return LWS_TP_STATUS_FINISHED;

	*user = task->args.user;
	status = task->status;

	if (status == LWS_TP_STATUS_FINISHED ||
	    status == LWS_TP_STATUS_STOPPED) {
		char buf[160];

		pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("%s: %s: service thread REAPING: %s\n",
			    __func__, tp->name, buf);
		__lws_threadpool_reap(task);
		lws_memory_barrier();
		pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
	}

	return status;
}
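
/*
 * A sketch of the expected service-thread usage, eg, from a WRITABLE
 * callback (hypothetical caller code; task obtained via
 * lws_threadpool_get_task_wsi(wsi)):
 *
 *	void *user;
 *
 *	switch (lws_threadpool_task_status(task, &user)) {
 *	case LWS_TP_STATUS_FINISHED:
 *	case LWS_TP_STATUS_STOPPED:
 *		// the task was reaped above; drop our reference to it
 *		break;
 *	case LWS_TP_STATUS_SYNCING:
 *		// the task is blocked in SYNC; release it with
 *		// lws_threadpool_task_sync() below
 *		break;
 *	default:
 *		break;
 *	}
 */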

enum lws_threadpool_task_status
lws_threadpool_task_status_noreap(struct lws_threadpool_task *task)
{
	return task->status;
}

enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
			       struct lws_threadpool_task **_task, void **user)
{
	struct lws_threadpool_task *task;

	if (!wsi->tp_task_owner.count) {
		lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__);
		return LWS_TP_STATUS_FINISHED;
	}

	assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */

	task = lws_container_of(wsi->tp_task_owner.head,
				struct lws_threadpool_task, list);

	*_task = task;

	return lws_threadpool_task_status(task, user);
}

void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
{
	lwsl_debug("%s\n", __func__);
	if (!task)
		return;

	if (stop)
		state_transition(task, LWS_TP_STATUS_STOPPING);

	pthread_mutex_lock(&task->tp->lock);
	pthread_cond_signal(&task->wake_idle);
	pthread_mutex_unlock(&task->tp->lock);
}
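
/*
 * This is the service-thread half of the SYNC handshake: a task blocked in
 * lws_threadpool_worker_sync() above is released by signalling its
 * wake_idle condition.  A sketch of the WRITABLE handler side (hypothetical
 * caller code):
 *
 *	case LWS_CALLBACK_SERVER_WRITEABLE:
 *		// ...after consuming the task's pending result...
 *		lws_threadpool_task_sync(task, 0);	// or 1 to stop it
 *		break;
 */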

int
lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user,
				int (*cb)(struct lws_threadpool_task *task,
					  void *user))
{
	struct lws_threadpool_task *task1;

	if (wsi->tp_task_owner.head == NULL)
		return 0;

	task1 = lws_container_of(wsi->tp_task_owner.head,
				 struct lws_threadpool_task, list);

	pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */

	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
				   wsi->tp_task_owner.head) {
		struct lws_threadpool_task *task = lws_container_of(d,
					struct lws_threadpool_task, list);

		if (cb(task, user)) {
			pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
			return 1;
		}

	} lws_end_foreach_dll_safe(d, d1);

	pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */

	return 0;
}

#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user,
			       int (*cb)(struct lws_threadpool_task *task,
					 void *user))
{
	if (!ss->wsi)
		return 0;

	return lws_threadpool_foreach_task_wsi(ss->wsi, user, cb);
}
#endif

static int
disassociate_wsi(struct lws_threadpool_task *task,
		  void *user)
{
	task->args.wsi = NULL;
	lws_dll2_remove(&task->list);

	return 0;
}

void
lws_threadpool_wsi_closing(struct lws *wsi)
{
	lws_threadpool_foreach_task_wsi(wsi, NULL, disassociate_wsi);
}

struct lws_threadpool_task *
lws_threadpool_get_task_wsi(struct lws *wsi)
{
	if (wsi->tp_task_owner.head == NULL)
		return NULL;

	return lws_container_of(wsi->tp_task_owner.head,
				 struct lws_threadpool_task, list);
}

#if defined(LWS_WITH_SECURE_STREAMS)
struct lws_threadpool_task *
lws_threadpool_get_task_ss(struct lws_ss_handle *ss)
{
	return lws_threadpool_get_task_wsi(ss->wsi);
}
#endif
1202