• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  */
24 
25 #if !defined(_GNU_SOURCE)
26 #define _GNU_SOURCE
27 #endif
28 
29 #include <pthread.h>
30 
31 #include "private-lib-core.h"
32 
33 #include <string.h>
34 #include <stdio.h>
35 
36 struct lws_threadpool;
37 
38 struct lws_threadpool_task {
39 	struct lws_threadpool_task *task_queue_next;
40 
41 	struct lws_threadpool *tp;
42 	char name[32];
43 	struct lws_threadpool_task_args args;
44 
45 	lws_usec_t created;
46 	lws_usec_t acquired;
47 	lws_usec_t done;
48 	lws_usec_t entered_state;
49 
50 	lws_usec_t acc_running;
51 	lws_usec_t acc_syncing;
52 
53 	pthread_cond_t wake_idle;
54 
55 	enum lws_threadpool_task_status status;
56 
57 	int late_sync_retries;
58 
59 	char wanted_writeable_cb;
60 	char outlive;
61 };
62 
63 struct lws_pool {
64 	struct lws_threadpool *tp;
65 	pthread_t thread;
66 	pthread_mutex_t lock; /* part of task wake_idle */
67 	struct lws_threadpool_task *task;
68 	lws_usec_t acquired;
69 	int worker_index;
70 };
71 
72 struct lws_threadpool {
73 	pthread_mutex_t lock; /* protects all pool lists */
74 	pthread_cond_t wake_idle;
75 	struct lws_pool *pool_list;
76 
77 	struct lws_context *context;
78 	struct lws_threadpool *tp_list; /* context list of threadpools */
79 
80 	struct lws_threadpool_task *task_queue_head;
81 	struct lws_threadpool_task *task_done_head;
82 
83 	char name[32];
84 
85 	int threads_in_pool;
86 	int queue_depth;
87 	int done_queue_depth;
88 	int max_queue_depth;
89 	int running_tasks;
90 
91 	unsigned int destroying:1;
92 };
93 
94 static int
ms_delta(lws_usec_t now,lws_usec_t then)95 ms_delta(lws_usec_t now, lws_usec_t then)
96 {
97 	return (int)((now - then) / 1000);
98 }
99 
100 static void
us_accrue(lws_usec_t * acc,lws_usec_t then)101 us_accrue(lws_usec_t *acc, lws_usec_t then)
102 {
103 	lws_usec_t now = lws_now_usecs();
104 
105 	*acc += now - then;
106 }
107 
108 static int
pc_delta(lws_usec_t now,lws_usec_t then,lws_usec_t us)109 pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
110 {
111 	lws_usec_t delta = (now - then) + 1;
112 
113 	return (int)((us * 100) / delta);
114 }
115 
116 static void
__lws_threadpool_task_dump(struct lws_threadpool_task * task,char * buf,int len)117 __lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
118 {
119 	lws_usec_t now = lws_now_usecs();
120 	char *end = buf + len - 1;
121 	int syncms = 0, runms = 0;
122 
123 	if (!task->acquired) {
124 		buf += lws_snprintf(buf, end - buf,
125 				    "task: %s, QUEUED queued: %dms",
126 				    task->name, ms_delta(now, task->created));
127 
128 		return;
129 	}
130 
131 	if (task->acc_running)
132 		runms = task->acc_running;
133 
134 	if (task->acc_syncing)
135 		syncms = task->acc_syncing;
136 
137 	if (!task->done) {
138 		buf += lws_snprintf(buf, end - buf,
139 			"task: %s, ONGOING state %d (%dms) alive: %dms "
140 			"(queued %dms, acquired: %dms, "
141 			"run: %d%%, sync: %d%%)", task->name, task->status,
142 			ms_delta(now, task->entered_state),
143 			ms_delta(now, task->created),
144 			ms_delta(task->acquired, task->created),
145 			ms_delta(now, task->acquired),
146 			pc_delta(now, task->acquired, runms),
147 			pc_delta(now, task->acquired, syncms));
148 
149 		return;
150 	}
151 
152 	lws_snprintf(buf, end - buf,
153 		"task: %s, DONE state %d lived: %dms "
154 		"(queued %dms, on thread: %dms, "
155 		"ran: %d%%, synced: %d%%)", task->name, task->status,
156 		ms_delta(task->done, task->created),
157 		ms_delta(task->acquired, task->created),
158 		ms_delta(task->done, task->acquired),
159 		pc_delta(task->done, task->acquired, runms),
160 		pc_delta(task->done, task->acquired, syncms));
161 }
162 
163 void
lws_threadpool_dump(struct lws_threadpool * tp)164 lws_threadpool_dump(struct lws_threadpool *tp)
165 {
166 #if defined(_DEBUG)
167 	struct lws_threadpool_task **c;
168 	char buf[160];
169 	int n, count;
170 
171 	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
172 
173 	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
174 		    tp->name, tp->queue_depth, tp->running_tasks,
175 		    tp->done_queue_depth);
176 
177 	count = 0;
178 	c = &tp->task_queue_head;
179 	while (*c) {
180 		struct lws_threadpool_task *task = *c;
181 		__lws_threadpool_task_dump(task, buf, sizeof(buf));
182 		lwsl_thread("  - %s\n", buf);
183 		count++;
184 
185 		c = &(*c)->task_queue_next;
186 	}
187 
188 	if (count != tp->queue_depth)
189 		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
190 			 __func__, tp->queue_depth, count);
191 
192 	count = 0;
193 	for (n = 0; n < tp->threads_in_pool; n++) {
194 		struct lws_pool *pool = &tp->pool_list[n];
195 		struct lws_threadpool_task *task = pool->task;
196 
197 		if (task) {
198 			__lws_threadpool_task_dump(task, buf, sizeof(buf));
199 			lwsl_thread("  - worker %d: %s\n", n, buf);
200 			count++;
201 		}
202 	}
203 
204 	if (count != tp->running_tasks)
205 		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
206 			 __func__, tp->running_tasks, count);
207 
208 	count = 0;
209 	c = &tp->task_done_head;
210 	while (*c) {
211 		struct lws_threadpool_task *task = *c;
212 		__lws_threadpool_task_dump(task, buf, sizeof(buf));
213 		lwsl_thread("  - %s\n", buf);
214 		count++;
215 
216 		c = &(*c)->task_queue_next;
217 	}
218 
219 	if (count != tp->done_queue_depth)
220 		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
221 			 __func__, tp->done_queue_depth, count);
222 
223 	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
224 #endif
225 }
226 
227 static void
state_transition(struct lws_threadpool_task * task,enum lws_threadpool_task_status status)228 state_transition(struct lws_threadpool_task *task,
229 		 enum lws_threadpool_task_status status)
230 {
231 	task->entered_state = lws_now_usecs();
232 	task->status = status;
233 }
234 
235 static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task * task)236 lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
237 {
238 	if (task->args.cleanup)
239 		task->args.cleanup(task->args.wsi, task->args.user);
240 
241 	if (task->args.wsi)
242 		task->args.wsi->tp_task = NULL;
243 
244 	lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n",
245 		    __func__, task->tp, task->args.wsi);
246 
247 	lws_free(task);
248 }
249 
250 static void
__lws_threadpool_reap(struct lws_threadpool_task * task)251 __lws_threadpool_reap(struct lws_threadpool_task *task)
252 {
253 	struct lws_threadpool_task **c, *t = NULL;
254 	struct lws_threadpool *tp = task->tp;
255 
256 	/* remove the task from the done queue */
257 
258 	c = &tp->task_done_head;
259 
260 	while (*c) {
261 		if ((*c) == task) {
262 			t = *c;
263 			*c = t->task_queue_next;
264 			t->task_queue_next = NULL;
265 			tp->done_queue_depth--;
266 
267 			lwsl_thread("%s: tp %s: reaped task wsi %p\n", __func__,
268 				   tp->name, task->args.wsi);
269 
270 			break;
271 		}
272 		c = &(*c)->task_queue_next;
273 	}
274 
275 	if (!t)
276 		lwsl_err("%s: task %p not in done queue\n", __func__, task);
277 
278 	/* call the task's cleanup and delete the task itself */
279 
280 	lws_threadpool_task_cleanup_destroy(task);
281 }
282 
283 /*
284  * this gets called from each tsi service context after the service was
285  * cancelled... we need to ask for the writable callback from the matching
286  * tsi context for any wsis bound to a worked thread that need it
287  */
288 
289 int
lws_threadpool_tsi_context(struct lws_context * context,int tsi)290 lws_threadpool_tsi_context(struct lws_context *context, int tsi)
291 {
292 	struct lws_threadpool_task **c, *task = NULL;
293 	struct lws_threadpool *tp;
294 	struct lws *wsi;
295 
296 	lws_context_lock(context, __func__);
297 
298 	tp = context->tp_list_head;
299 	while (tp) {
300 		int n;
301 
302 		/* for the running (syncing...) tasks... */
303 
304 		for (n = 0; n < tp->threads_in_pool; n++) {
305 			struct lws_pool *pool = &tp->pool_list[n];
306 
307 			task = pool->task;
308 			if (!task)
309 				continue;
310 
311 			wsi = task->args.wsi;
312 			if (!wsi || wsi->tsi != tsi ||
313 			    !task->wanted_writeable_cb)
314 				continue;
315 
316 			task->wanted_writeable_cb = 0;
317 			lws_memory_barrier();
318 
319 			/*
320 			 * finally... we can ask for the callback on
321 			 * writable from the correct service thread
322 			 * context
323 			 */
324 
325 			lws_callback_on_writable(wsi);
326 		}
327 
328 		/* for the done tasks... */
329 
330 		c = &tp->task_done_head;
331 
332 		while (*c) {
333 			task = *c;
334 			wsi = task->args.wsi;
335 
336 			if (wsi && wsi->tsi == tsi &&
337 			    task->wanted_writeable_cb) {
338 
339 				task->wanted_writeable_cb = 0;
340 				lws_memory_barrier();
341 
342 				/*
343 				 * finally... we can ask for the callback on
344 				 * writable from the correct service thread
345 				 * context
346 				 */
347 
348 				lws_callback_on_writable(wsi);
349 			}
350 
351 			c = &task->task_queue_next;
352 		}
353 
354 		tp = tp->tp_list;
355 	}
356 
357 	lws_context_unlock(context);
358 
359 	return 0;
360 }
361 
362 static int
lws_threadpool_worker_sync(struct lws_pool * pool,struct lws_threadpool_task * task)363 lws_threadpool_worker_sync(struct lws_pool *pool,
364 			   struct lws_threadpool_task *task)
365 {
366 	enum lws_threadpool_task_status temp;
367 	struct timespec abstime;
368 	struct lws *wsi;
369 	int tries = 15;
370 
371 	/* block until writable acknowledges */
372 	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
373 	pthread_mutex_lock(&pool->lock); /* ======================= pool lock */
374 
375 	lwsl_info("%s: %s: task %p (%s): syncing with wsi %p\n", __func__,
376 		    pool->tp->name, task, task->name, task->args.wsi);
377 
378 	temp = task->status;
379 	state_transition(task, LWS_TP_STATUS_SYNCING);
380 	while (tries--) {
381 		wsi = task->args.wsi;
382 
383 		/*
384 		 * if the wsi is no longer attached to this task, there is
385 		 * nothing we can sync to usefully.  Since the work wants to
386 		 * sync, it means we should react to the situation by telling
387 		 * the task it can't continue usefully by stopping it.
388 		 */
389 
390 		if (!wsi) {
391 			lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
392 				 "wsi to sync to\n", __func__, pool->tp->name,
393 				 task, task->name);
394 
395 			state_transition(task, LWS_TP_STATUS_STOPPING);
396 			goto done;
397 		}
398 
399 		/*
400 		 * So tries times this is the maximum time between SYNC asking
401 		 * for a callback on writable and actually getting it we are
402 		 * willing to sit still for.
403 		 *
404 		 * If it is exceeded, we will stop the task.
405 		 */
406 		abstime.tv_sec = time(NULL) + 2;
407 		abstime.tv_nsec = 0;
408 
409 		task->wanted_writeable_cb = 1;
410 		lws_memory_barrier();
411 
412 		/*
413 		 * This will cause lws_threadpool_tsi_context() to get called
414 		 * from each tsi service context, where we can safely ask for
415 		 * a callback on writeable on the wsi we are associated with.
416 		 */
417 		lws_cancel_service(lws_get_context(wsi));
418 
419 		/*
420 		 * so the danger here is that we asked for a writable callback
421 		 * on the wsi, but for whatever reason, we are never going to
422 		 * get one.  To avoid deadlocking forever, we allow a set time
423 		 * for the sync to happen naturally, otherwise the cond wait
424 		 * times out and we stop the task.
425 		 */
426 
427 		if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
428 					   &abstime) == ETIMEDOUT) {
429 			task->late_sync_retries++;
430 			if (!tries) {
431 				lwsl_err("%s: %s: task %p (%s): SYNC timed out "
432 					 "(associated wsi %p)\n",
433 					 __func__, pool->tp->name, task,
434 					 task->name, task->args.wsi);
435 
436 				state_transition(task, LWS_TP_STATUS_STOPPING);
437 				goto done;
438 			}
439 
440 			continue;
441 		} else
442 			break;
443 	}
444 
445 	if (task->status == LWS_TP_STATUS_SYNCING)
446 		state_transition(task, temp);
447 
448 	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);
449 
450 done:
451 	pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */
452 
453 	return 0;
454 }
455 
456 static void *
lws_threadpool_worker(void * d)457 lws_threadpool_worker(void *d)
458 {
459 	struct lws_threadpool_task **c, **c2, *task;
460 	struct lws_pool *pool = d;
461 	struct lws_threadpool *tp = pool->tp;
462 	char buf[160];
463 
464 	while (!tp->destroying) {
465 
466 		/* we have no running task... wait and get one from the queue */
467 
468 		pthread_mutex_lock(&tp->lock); /* =================== tp lock */
469 
470 		/*
471 		 * if there's no task already waiting in the queue, wait for
472 		 * the wake_idle condition to signal us that might have changed
473 		 */
474 		while (!tp->task_queue_head && !tp->destroying)
475 			pthread_cond_wait(&tp->wake_idle, &tp->lock);
476 
477 		if (tp->destroying) {
478 			pthread_mutex_unlock(&tp->lock);  /* ------ tp unlock */
479 			continue;
480 		}
481 
482 		c = &tp->task_queue_head;
483 		c2 = NULL;
484 		task = NULL;
485 		pool->task = NULL;
486 
487 		/* look at the queue tail */
488 		while (*c) {
489 			c2 = c;
490 			c = &(*c)->task_queue_next;
491 		}
492 
493 		/* is there a task at the queue tail? */
494 		if (c2 && *c2) {
495 			pool->task = task = *c2;
496 			task->acquired = pool->acquired = lws_now_usecs();
497 			/* remove it from the queue */
498 			*c2 = task->task_queue_next;
499 			task->task_queue_next = NULL;
500 			tp->queue_depth--;
501 			/* mark it as running */
502 			state_transition(task, LWS_TP_STATUS_RUNNING);
503 		}
504 
505 		/* someone else got it first... wait and try again */
506 		if (!task) {
507 			pthread_mutex_unlock(&tp->lock);  /* ------ tp unlock */
508 			continue;
509 		}
510 
511 		task->wanted_writeable_cb = 0;
512 
513 		/* we have acquired a new task */
514 
515 		__lws_threadpool_task_dump(task, buf, sizeof(buf));
516 
517 		lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
518 			    __func__, tp->name, pool->worker_index, buf);
519 		tp->running_tasks++;
520 
521 		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
522 
523 		/*
524 		 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
525 		 * "resurface" periodically, and get called again with
526 		 * cont = 1 immediately to indicate it is picking up where it
527 		 * left off if the task is not being "stopped".
528 		 *
529 		 * This allows long tasks to respond to requests to stop in
530 		 * a clean and opaque way.
531 		 *
532 		 * 2) The task can return with LWS_TP_RETURN_SYNC to register
533 		 * a "callback on writable" request on the service thread and
534 		 * block until it hears back from the WRITABLE handler.
535 		 *
536 		 * This allows the work on the thread to be synchronized to the
537 		 * previous work being dispatched cleanly.
538 		 *
539 		 * 3) The task can return with LWS_TP_RETURN_FINISHED to
540 		 * indicate its work is completed nicely.
541 		 *
542 		 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
543 		 * it stopped and cleaned up after incomplete work.
544 		 */
545 
546 		do {
547 			lws_usec_t then;
548 			int n;
549 
550 			if (tp->destroying || !task->args.wsi) {
551 				lwsl_info("%s: stopping on wsi gone\n", __func__);
552 				state_transition(task, LWS_TP_STATUS_STOPPING);
553 			}
554 
555 			then = lws_now_usecs();
556 			n = task->args.task(task->args.user, task->status);
557 			lwsl_debug("   %d, status %d\n", n, task->status);
558 			us_accrue(&task->acc_running, then);
559 			if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
560 				task->outlive = 1;
561 			switch (n & 7) {
562 			case LWS_TP_RETURN_CHECKING_IN:
563 				/* if not destroying the tp, continue */
564 				break;
565 			case LWS_TP_RETURN_SYNC:
566 				if (!task->args.wsi) {
567 					lwsl_debug("%s: task that wants to "
568 						    "outlive lost wsi asked "
569 						    "to sync: bypassed\n",
570 						    __func__);
571 					break;
572 				}
573 				/* block until writable acknowledges */
574 				then = lws_now_usecs();
575 				lws_threadpool_worker_sync(pool, task);
576 				us_accrue(&task->acc_syncing, then);
577 				break;
578 			case LWS_TP_RETURN_FINISHED:
579 				state_transition(task, LWS_TP_STATUS_FINISHED);
580 				break;
581 			case LWS_TP_RETURN_STOPPED:
582 				state_transition(task, LWS_TP_STATUS_STOPPED);
583 				break;
584 			}
585 		} while (task->status == LWS_TP_STATUS_RUNNING);
586 
587 		pthread_mutex_lock(&tp->lock); /* =================== tp lock */
588 
589 		tp->running_tasks--;
590 
591 		if (pool->task->status == LWS_TP_STATUS_STOPPING)
592 			state_transition(task, LWS_TP_STATUS_STOPPED);
593 
594 		/* move the task to the done queue */
595 
596 		pool->task->task_queue_next = tp->task_done_head;
597 		tp->task_done_head = task;
598 		tp->done_queue_depth++;
599 		pool->task->done = lws_now_usecs();
600 
601 		if (!pool->task->args.wsi &&
602 		    (pool->task->status == LWS_TP_STATUS_STOPPED ||
603 		     pool->task->status == LWS_TP_STATUS_FINISHED)) {
604 
605 			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
606 			lwsl_thread("%s: %s: worker %d REAPING: %s\n",
607 				    __func__, tp->name, pool->worker_index,
608 				    buf);
609 
610 			/*
611 			 * there is no longer any wsi attached, so nothing is
612 			 * going to take care of reaping us.  So we must take
613 			 * care of it ourselves.
614 			 */
615 			__lws_threadpool_reap(pool->task);
616 		} else {
617 
618 			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
619 			lwsl_thread("%s: %s: worker %d DONE: %s\n",
620 				    __func__, tp->name, pool->worker_index,
621 				    buf);
622 
623 			/* signal the associated wsi to take a fresh look at
624 			 * task status */
625 
626 			if (pool->task->args.wsi) {
627 				task->wanted_writeable_cb = 1;
628 
629 				lws_cancel_service(
630 					lws_get_context(pool->task->args.wsi));
631 			}
632 		}
633 
634 		pool->task = NULL;
635 		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
636 	}
637 
638 	/* threadpool is being destroyed */
639 
640 	pthread_exit(NULL);
641 
642 	return NULL;
643 }
644 
645 struct lws_threadpool *
lws_threadpool_create(struct lws_context * context,const struct lws_threadpool_create_args * args,const char * format,...)646 lws_threadpool_create(struct lws_context *context,
647 		      const struct lws_threadpool_create_args *args,
648 		      const char *format, ...)
649 {
650 	struct lws_threadpool *tp;
651 	va_list ap;
652 	int n;
653 
654 	tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * args->threads),
655 			"threadpool alloc");
656 	if (!tp)
657 		return NULL;
658 
659 	memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * args->threads));
660 	tp->pool_list = (struct lws_pool *)(tp + 1);
661 	tp->max_queue_depth = args->max_queue_depth;
662 
663 	va_start(ap, format);
664 	n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
665 	va_end(ap);
666 
667 	lws_context_lock(context, __func__);
668 
669 	tp->context = context;
670 	tp->tp_list = context->tp_list_head;
671 	context->tp_list_head = tp;
672 
673 	lws_context_unlock(context);
674 
675 	pthread_mutex_init(&tp->lock, NULL);
676 	pthread_cond_init(&tp->wake_idle, NULL);
677 
678 	for (n = 0; n < args->threads; n++) {
679 #if defined(LWS_HAS_PTHREAD_SETNAME_NP)
680 		char name[16];
681 #endif
682 		tp->pool_list[n].tp = tp;
683 		tp->pool_list[n].worker_index = n;
684 		pthread_mutex_init(&tp->pool_list[n].lock, NULL);
685 		if (pthread_create(&tp->pool_list[n].thread, NULL,
686 				   lws_threadpool_worker, &tp->pool_list[n])) {
687 			lwsl_err("thread creation failed\n");
688 		} else {
689 #if defined(LWS_HAS_PTHREAD_SETNAME_NP)
690 			lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
691 			pthread_setname_np(tp->pool_list[n].thread, name);
692 #endif
693 			tp->threads_in_pool++;
694 		}
695 	}
696 
697 	return tp;
698 }
699 
700 void
lws_threadpool_finish(struct lws_threadpool * tp)701 lws_threadpool_finish(struct lws_threadpool *tp)
702 {
703 	struct lws_threadpool_task **c, *task;
704 
705 	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
706 
707 	/* nothing new can start, running jobs will abort as STOPPED and the
708 	 * pool threads will exit ASAP (they are joined in destroy) */
709 	tp->destroying = 1;
710 
711 	/* stop everyone in the pending queue and move to the done queue */
712 
713 	c = &tp->task_queue_head;
714 	while (*c) {
715 		task = *c;
716 		*c = task->task_queue_next;
717 		task->task_queue_next = tp->task_done_head;
718 		tp->task_done_head = task;
719 		state_transition(task, LWS_TP_STATUS_STOPPED);
720 		tp->queue_depth--;
721 		tp->done_queue_depth++;
722 		task->done = lws_now_usecs();
723 
724 		c = &task->task_queue_next;
725 	}
726 
727 	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
728 
729 	pthread_cond_broadcast(&tp->wake_idle);
730 }
731 
732 void
lws_threadpool_destroy(struct lws_threadpool * tp)733 lws_threadpool_destroy(struct lws_threadpool *tp)
734 {
735 	struct lws_threadpool_task *task, *next;
736 	struct lws_threadpool **ptp;
737 	void *retval;
738 	int n;
739 
740 	/* remove us from the context list of threadpools */
741 
742 	lws_context_lock(tp->context, __func__);
743 
744 	ptp = &tp->context->tp_list_head;
745 	while (*ptp) {
746 		if (*ptp == tp) {
747 			*ptp = tp->tp_list;
748 			break;
749 		}
750 		ptp = &(*ptp)->tp_list;
751 	}
752 
753 	lws_context_unlock(tp->context);
754 
755 
756 	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
757 
758 	tp->destroying = 1;
759 	pthread_cond_broadcast(&tp->wake_idle);
760 	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
761 
762 	lws_threadpool_dump(tp);
763 
764 	for (n = 0; n < tp->threads_in_pool; n++) {
765 		task = tp->pool_list[n].task;
766 
767 		/* he could be sitting waiting for SYNC */
768 
769 		if (task != NULL)
770 			pthread_cond_broadcast(&task->wake_idle);
771 
772 		pthread_join(tp->pool_list[n].thread, &retval);
773 		pthread_mutex_destroy(&tp->pool_list[n].lock);
774 	}
775 	lwsl_info("%s: all threadpools exited\n", __func__);
776 
777 	task = tp->task_done_head;
778 	while (task) {
779 		next = task->task_queue_next;
780 		lws_threadpool_task_cleanup_destroy(task);
781 		tp->done_queue_depth--;
782 		task = next;
783 	}
784 
785 	pthread_mutex_destroy(&tp->lock);
786 
787 	lws_free(tp);
788 }
789 
790 /*
791  * we want to stop and destroy the task and related priv.  The wsi may no
792  * longer exist.
793  */
794 
795 int
lws_threadpool_dequeue(struct lws * wsi)796 lws_threadpool_dequeue(struct lws *wsi)
797 {
798 	struct lws_threadpool *tp;
799 	struct lws_threadpool_task **c, *task;
800 	int n;
801 
802 	task = wsi->tp_task;
803 	if (!task)
804 		return 0;
805 
806 	tp = task->tp;
807 	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
808 
809 	if (task->outlive && !tp->destroying) {
810 
811 		/* disconnect from wsi, and wsi from task */
812 
813 		wsi->tp_task = NULL;
814 		task->args.wsi = NULL;
815 
816 		goto bail;
817 	}
818 
819 
820 	c = &tp->task_queue_head;
821 
822 	/* is he queued waiting for a chance to run?  Mark him as stopped and
823 	 * move him on to the done queue */
824 
825 	while (*c) {
826 		if ((*c) == task) {
827 			*c = task->task_queue_next;
828 			task->task_queue_next = tp->task_done_head;
829 			tp->task_done_head = task;
830 			state_transition(task, LWS_TP_STATUS_STOPPED);
831 			tp->queue_depth--;
832 			tp->done_queue_depth++;
833 			task->done = lws_now_usecs();
834 
835 			lwsl_debug("%s: tp %p: removed queued task wsi %p\n",
836 				    __func__, tp, task->args.wsi);
837 
838 			break;
839 		}
840 		c = &(*c)->task_queue_next;
841 	}
842 
843 	/* is he on the done queue? */
844 
845 	c = &tp->task_done_head;
846 	while (*c) {
847 		if ((*c) == task) {
848 			*c = task->task_queue_next;
849 			task->task_queue_next = NULL;
850 			lws_threadpool_task_cleanup_destroy(task);
851 			tp->done_queue_depth--;
852 			goto bail;
853 		}
854 		c = &(*c)->task_queue_next;
855 	}
856 
857 	/* he's not in the queue... is he already running on a thread? */
858 
859 	for (n = 0; n < tp->threads_in_pool; n++) {
860 		if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
861 			continue;
862 
863 		/*
864 		 * ensure we don't collide with tests or changes in the
865 		 * worker thread
866 		 */
867 		pthread_mutex_lock(&tp->pool_list[n].lock);
868 
869 		/*
870 		 * mark him as having been requested to stop...
871 		 * the caller will hear about it in his service thread
872 		 * context as a request to close
873 		 */
874 		state_transition(task, LWS_TP_STATUS_STOPPING);
875 
876 		/* disconnect from wsi, and wsi from task */
877 
878 		task->args.wsi->tp_task = NULL;
879 		task->args.wsi = NULL;
880 
881 		pthread_mutex_unlock(&tp->pool_list[n].lock);
882 
883 		lwsl_debug("%s: tp %p: request stop running task "
884 			    "for wsi %p\n", __func__, tp, task->args.wsi);
885 
886 		break;
887 	}
888 
889 	if (n == tp->threads_in_pool) {
890 		/* can't find it */
891 		lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n",
892 			    __func__, tp, task->args.wsi);
893 		task->args.wsi->tp_task = NULL;
894 		task->args.wsi = NULL;
895 	}
896 
897 bail:
898 	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
899 
900 	return 0;
901 }
902 
903 struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool * tp,const struct lws_threadpool_task_args * args,const char * format,...)904 lws_threadpool_enqueue(struct lws_threadpool *tp,
905 		       const struct lws_threadpool_task_args *args,
906 		       const char *format, ...)
907 {
908 	struct lws_threadpool_task *task = NULL;
909 	va_list ap;
910 
911 	if (tp->destroying)
912 		return NULL;
913 
914 	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
915 
916 	/*
917 	 * if there's room on the queue, the job always goes on the queue
918 	 * first, then any free thread may pick it up after the wake_idle
919 	 */
920 
921 	if (tp->queue_depth == tp->max_queue_depth) {
922 		lwsl_notice("%s: queue reached limit %d\n", __func__,
923 			    tp->max_queue_depth);
924 
925 		goto bail;
926 	}
927 
928 	/*
929 	 * create the task object
930 	 */
931 
932 	task = lws_malloc(sizeof(*task), __func__);
933 	if (!task)
934 		goto bail;
935 
936 	memset(task, 0, sizeof(*task));
937 	pthread_cond_init(&task->wake_idle, NULL);
938 	task->args = *args;
939 	task->tp = tp;
940 	task->created = lws_now_usecs();
941 
942 	va_start(ap, format);
943 	vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
944 	va_end(ap);
945 
946 	/*
947 	 * add him on the tp task queue
948 	 */
949 
950 	task->task_queue_next = tp->task_queue_head;
951 	state_transition(task, LWS_TP_STATUS_QUEUED);
952 	tp->task_queue_head = task;
953 	tp->queue_depth++;
954 
955 	/*
956 	 * mark the wsi itself as depending on this tp (so wsi close for
957 	 * whatever reason can clean up)
958 	 */
959 
960 	args->wsi->tp_task = task;
961 
962 	lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n",
963 		    __func__, tp->name, task, task->name, args->wsi,
964 		    tp->queue_depth);
965 
966 	/* alert any idle thread there's something new on the task list */
967 
968 	lws_memory_barrier();
969 	pthread_cond_signal(&tp->wake_idle);
970 
971 bail:
972 	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
973 
974 	return task;
975 }
976 
977 /* this should be called from the service thread */
978 
979 enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws * wsi,struct lws_threadpool_task ** task,void ** user)980 lws_threadpool_task_status_wsi(struct lws *wsi,
981 			       struct lws_threadpool_task **task, void **user)
982 {
983 	enum lws_threadpool_task_status status;
984 	struct lws_threadpool *tp;
985 
986 	*task = wsi->tp_task;
987 	if (!*task)
988 		return -1;
989 
990 	tp = (*task)->tp;
991 	*user = (*task)->args.user;
992 	status = (*task)->status;
993 
994 	if (status == LWS_TP_STATUS_FINISHED ||
995 	    status == LWS_TP_STATUS_STOPPED) {
996 		char buf[160];
997 
998 		pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
999 		__lws_threadpool_task_dump(*task, buf, sizeof(buf));
1000 		lwsl_thread("%s: %s: service thread REAPING: %s\n",
1001 			    __func__, tp->name, buf);
1002 		__lws_threadpool_reap(*task);
1003 		lws_memory_barrier();
1004 		pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
1005 	}
1006 
1007 	return status;
1008 }
1009 
1010 void
lws_threadpool_task_sync(struct lws_threadpool_task * task,int stop)1011 lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
1012 {
1013 	lwsl_debug("%s\n", __func__);
1014 
1015 	if (stop)
1016 		state_transition(task, LWS_TP_STATUS_STOPPING);
1017 
1018 	pthread_cond_signal(&task->wake_idle);
1019 }
1020