1 /* GLIB - Library of useful routines for C programming
2 * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
3 *
4 * GThreadPool: thread pool implementation.
5 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22 * MT safe
23 */
24
25 #include "config.h"
26
27 #include "gthreadpool.h"
28
29 #include "gasyncqueue.h"
30 #include "gasyncqueueprivate.h"
31 #include "gmain.h"
32 #include "gtestutils.h"
33 #include "gthreadprivate.h"
34 #include "gtimer.h"
35 #include "gutils.h"
36
37 /**
38 * SECTION:thread_pools
39 * @title: Thread Pools
40 * @short_description: pools of threads to execute work concurrently
41 * @see_also: #GThread
42 *
43 * Sometimes you wish to asynchronously fork out the execution of work
44 * and continue working in your own thread. If that will happen often,
45 * the overhead of starting and destroying a thread each time might be
46 * too high. In such cases reusing already started threads seems like a
47 * good idea. And it indeed is, but implementing this can be tedious
48 * and error-prone.
49 *
 * Therefore GLib provides thread pools for your convenience. An added
 * advantage is that the threads can be shared between the different
 * subsystems of your program when they are using GLib.
53 *
54 * To create a new thread pool, you use g_thread_pool_new().
55 * It is destroyed by g_thread_pool_free().
56 *
57 * If you want to execute a certain task within a thread pool,
58 * you call g_thread_pool_push().
59 *
60 * To get the current number of running threads you call
61 * g_thread_pool_get_num_threads(). To get the number of still
62 * unprocessed tasks you call g_thread_pool_unprocessed(). To control
63 * the maximal number of threads for a thread pool, you use
64 * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
65 *
 * Finally, you can control the number of unused threads that are kept
 * alive by GLib for future use. The current number can be fetched with
 * g_thread_pool_get_num_unused_threads(). The maximal number can be
 * controlled by g_thread_pool_get_max_unused_threads() and
 * g_thread_pool_set_max_unused_threads(). All currently unused threads
 * can be stopped by calling g_thread_pool_stop_unused_threads().
72 */
73
/* Debug tracing hook. The active definition expands to nothing, so every
 * DEBUG_MSG ((...)) statement in this file compiles away. Swap in the
 * commented-out definition below to print the messages with g_printerr();
 * callers pass the whole printf-style argument list in one extra pair of
 * parentheses.
 */
#define DEBUG_MSG(x)
/* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n"); */
76
typedef struct _GRealThreadPool GRealThreadPool;

/**
 * GThreadPool:
 * @func: the function to execute in the threads of this pool
 * @user_data: the user data for the threads of this pool
 * @exclusive: are all threads exclusive to this pool
 *
 * The #GThreadPool struct represents a thread pool. It has three
 * public read-only members, but the underlying struct is bigger,
 * so you must not copy this struct.
 */

/* Private representation of a thread pool. The public GThreadPool is the
 * first member, and this file casts between GThreadPool * and
 * GRealThreadPool *. The functions in this file take the lock of @queue
 * before reading or writing the other fields (the g_return_*_if_fail
 * checks on @running are the exception).
 */
struct _GRealThreadPool
{
  GThreadPool pool;             /* public part: func, user_data, exclusive */
  GAsyncQueue *queue;           /* pending tasks for this pool */
  GCond cond;                   /* broadcast by workers while g_thread_pool_free() waits */
  gint max_threads;             /* thread limit; -1 means unlimited */
  guint num_threads;            /* threads currently attached to this pool */
  gboolean running;             /* TRUE until g_thread_pool_free() is called */
  gboolean immediate;           /* once stopped: do not process queued tasks */
  gboolean waiting;             /* g_thread_pool_free() is blocked waiting on @cond */
  GCompareDataFunc sort_func;   /* optional ordering for pushed tasks, or NULL */
  gpointer sort_user_data;      /* user data handed to @sort_func */
};
102
/* The following is just an address used to mark a wakeup order for an
 * idle thread. It could be any address, as long as it is not a valid
 * GThreadPool address.
 */
static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
/* Incremented whenever g_thread_pool_set_max_unused_threads() sends wakeup
 * markers, so idle threads can tell a fresh marker from one they have
 * already acted upon.
 */
static gint wakeup_thread_serial = 0;

/* Here all unused threads are waiting */
static GAsyncQueue *unused_thread_queue = NULL;
/* Number of threads currently inside g_thread_pool_wait_for_new_pool(). */
static gint unused_threads = 0;
/* Limit for unused_threads; -1 means no limit. */
static gint max_unused_threads = 2;
/* Number of idle threads that still have to stop after the limit was lowered. */
static gint kill_unused_threads = 0;
/* How long an idle thread waits for a new pool, in milliseconds; with 0
 * idle threads wait indefinitely.
 */
static guint max_idle_time = 15 * 1000;

/* Scheduler settings captured in g_thread_pool_new() when the first
 * non-exclusive pool is created; only valid when the flag below is TRUE.
 */
static GThreadSchedulerSettings shared_thread_scheduler_settings;
static gboolean have_shared_thread_scheduler_settings = FALSE;
119
/* Request record exchanged with the "pool-spawner" helper thread: the
 * requester fills in @pool and pushes the record onto spawn_thread_queue,
 * the helper fills in @thread or @error and broadcasts spawn_thread_cond.
 */
typedef struct
{
  /* Either thread or error are set in the end. Both transfer-full. */
  GThreadPool *pool;   /* pool the new thread is started for */
  GThread *thread;     /* result: the started thread, or NULL */
  GError *error;       /* result: why starting the thread failed, or NULL */
} SpawnThreadData;
127
/* Rendezvous with the helper thread that spawns pool threads. Both are
 * initialized in g_thread_pool_new(), and only when the shared scheduler
 * settings could not be obtained. The condition is waited on together
 * with the mutex of spawn_thread_queue.
 */
static GCond spawn_thread_cond;
static GAsyncQueue *spawn_thread_queue;
130
/* Forward declarations of the file-local helpers defined further down. */
static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
                                                           gpointer          data);
static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
                                                           GError          **error);
static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
140
141 static void
g_thread_pool_queue_push_unlocked(GRealThreadPool * pool,gpointer data)142 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
143 gpointer data)
144 {
145 if (pool->sort_func)
146 g_async_queue_push_sorted_unlocked (pool->queue,
147 data,
148 pool->sort_func,
149 pool->sort_user_data);
150 else
151 g_async_queue_push_unlocked (pool->queue, data);
152 }
153
154 static GRealThreadPool*
g_thread_pool_wait_for_new_pool(void)155 g_thread_pool_wait_for_new_pool (void)
156 {
157 GRealThreadPool *pool;
158 gint local_wakeup_thread_serial;
159 guint local_max_unused_threads;
160 gint local_max_idle_time;
161 gint last_wakeup_thread_serial;
162 gboolean have_relayed_thread_marker = FALSE;
163
164 local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
165 local_max_idle_time = g_atomic_int_get (&max_idle_time);
166 last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
167
168 g_atomic_int_inc (&unused_threads);
169
170 do
171 {
172 if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
173 {
174 /* If this is a superfluous thread, stop it. */
175 pool = NULL;
176 }
177 else if (local_max_idle_time > 0)
178 {
179 /* If a maximal idle time is given, wait for the given time. */
180 DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
181 g_thread_self (), local_max_idle_time / 1000.0));
182
183 pool = g_async_queue_timeout_pop (unused_thread_queue,
184 local_max_idle_time * 1000);
185 }
186 else
187 {
188 /* If no maximal idle time is given, wait indefinitely. */
189 DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
190 pool = g_async_queue_pop (unused_thread_queue);
191 }
192
193 if (pool == wakeup_thread_marker)
194 {
195 local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
196 if (last_wakeup_thread_serial == local_wakeup_thread_serial)
197 {
198 if (!have_relayed_thread_marker)
199 {
200 /* If this wakeup marker has been received for
201 * the second time, relay it.
202 */
203 DEBUG_MSG (("thread %p relaying wakeup message to "
204 "waiting thread with lower serial.",
205 g_thread_self ()));
206
207 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
208 have_relayed_thread_marker = TRUE;
209
210 /* If a wakeup marker has been relayed, this thread
211 * will get out of the way for 100 microseconds to
212 * avoid receiving this marker again.
213 */
214 g_usleep (100);
215 }
216 }
217 else
218 {
219 if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
220 {
221 pool = NULL;
222 break;
223 }
224
225 DEBUG_MSG (("thread %p updating to new limits.",
226 g_thread_self ()));
227
228 local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
229 local_max_idle_time = g_atomic_int_get (&max_idle_time);
230 last_wakeup_thread_serial = local_wakeup_thread_serial;
231
232 have_relayed_thread_marker = FALSE;
233 }
234 }
235 }
236 while (pool == wakeup_thread_marker);
237
238 g_atomic_int_add (&unused_threads, -1);
239
240 return pool;
241 }
242
243 static gpointer
g_thread_pool_wait_for_new_task(GRealThreadPool * pool)244 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
245 {
246 gpointer task = NULL;
247
248 if (pool->running || (!pool->immediate &&
249 g_async_queue_length_unlocked (pool->queue) > 0))
250 {
251 /* This thread pool is still active. */
252 if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
253 {
254 /* This is a superfluous thread, so it goes to the global pool. */
255 DEBUG_MSG (("superfluous thread %p in pool %p.",
256 g_thread_self (), pool));
257 }
258 else if (pool->pool.exclusive)
259 {
260 /* Exclusive threads stay attached to the pool. */
261 task = g_async_queue_pop_unlocked (pool->queue);
262
263 DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
264 "(%d running, %d unprocessed).",
265 g_thread_self (), pool, pool->num_threads,
266 g_async_queue_length_unlocked (pool->queue)));
267 }
268 else
269 {
270 /* A thread will wait for new tasks for at most 1/2
271 * second before going to the global pool.
272 */
273 DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
274 "(%d running, %d unprocessed).",
275 g_thread_self (), pool, pool->num_threads,
276 g_async_queue_length_unlocked (pool->queue)));
277
278 task = g_async_queue_timeout_pop_unlocked (pool->queue,
279 G_USEC_PER_SEC / 2);
280 }
281 }
282 else
283 {
284 /* This thread pool is inactive, it will no longer process tasks. */
285 DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
286 "(running: %s, immediate: %s, len: %d).",
287 pool, g_thread_self (),
288 pool->running ? "true" : "false",
289 pool->immediate ? "true" : "false",
290 g_async_queue_length_unlocked (pool->queue)));
291 }
292
293 return task;
294 }
295
296 static gpointer
g_thread_pool_spawn_thread(gpointer data)297 g_thread_pool_spawn_thread (gpointer data)
298 {
299 while (TRUE)
300 {
301 SpawnThreadData *spawn_thread_data;
302 GThread *thread = NULL;
303 GError *error = NULL;
304 const gchar *prgname = g_get_prgname ();
305 gchar name[16] = "pool";
306
307 if (prgname)
308 g_snprintf (name, sizeof (name), "pool-%s", prgname);
309
310 g_async_queue_lock (spawn_thread_queue);
311 /* Spawn a new thread for the given pool and wake the requesting thread
312 * up again with the result. This new thread will have the scheduler
313 * settings inherited from this thread and in extension of the thread
314 * that created the first non-exclusive thread-pool. */
315 spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
316 thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
317
318 spawn_thread_data->thread = g_steal_pointer (&thread);
319 spawn_thread_data->error = g_steal_pointer (&error);
320
321 g_cond_broadcast (&spawn_thread_cond);
322 g_async_queue_unlock (spawn_thread_queue);
323 }
324
325 return NULL;
326 }
327
328 static gpointer
g_thread_pool_thread_proxy(gpointer data)329 g_thread_pool_thread_proxy (gpointer data)
330 {
331 GRealThreadPool *pool;
332
333 pool = data;
334
335 DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
336
337 g_async_queue_lock (pool->queue);
338
339 while (TRUE)
340 {
341 gpointer task;
342
343 task = g_thread_pool_wait_for_new_task (pool);
344 if (task)
345 {
346 if (pool->running || !pool->immediate)
347 {
348 /* A task was received and the thread pool is active,
349 * so execute the function.
350 */
351 g_async_queue_unlock (pool->queue);
352 DEBUG_MSG (("thread %p in pool %p calling func.",
353 g_thread_self (), pool));
354 pool->pool.func (task, pool->pool.user_data);
355 g_async_queue_lock (pool->queue);
356 }
357 }
358 else
359 {
360 /* No task was received, so this thread goes to the global pool. */
361 gboolean free_pool = FALSE;
362
363 DEBUG_MSG (("thread %p leaving pool %p for global pool.",
364 g_thread_self (), pool));
365 pool->num_threads--;
366
367 if (!pool->running)
368 {
369 if (!pool->waiting)
370 {
371 if (pool->num_threads == 0)
372 {
373 /* If the pool is not running and no other
374 * thread is waiting for this thread pool to
375 * finish and this is the last thread of this
376 * pool, free the pool.
377 */
378 free_pool = TRUE;
379 }
380 else
381 {
382 /* If the pool is not running and no other
383 * thread is waiting for this thread pool to
384 * finish and this is not the last thread of
385 * this pool and there are no tasks left in the
386 * queue, wakeup the remaining threads.
387 */
388 if (g_async_queue_length_unlocked (pool->queue) ==
389 (gint) -pool->num_threads)
390 g_thread_pool_wakeup_and_stop_all (pool);
391 }
392 }
393 else if (pool->immediate ||
394 g_async_queue_length_unlocked (pool->queue) <= 0)
395 {
396 /* If the pool is not running and another thread is
397 * waiting for this thread pool to finish and there
398 * are either no tasks left or the pool shall stop
399 * immediately, inform the waiting thread of a change
400 * of the thread pool state.
401 */
402 g_cond_broadcast (&pool->cond);
403 }
404 }
405
406 g_async_queue_unlock (pool->queue);
407
408 if (free_pool)
409 g_thread_pool_free_internal (pool);
410
411 if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL)
412 break;
413
414 g_async_queue_lock (pool->queue);
415
416 DEBUG_MSG (("thread %p entering pool %p from global pool.",
417 g_thread_self (), pool));
418
419 /* pool->num_threads++ is not done here, but in
420 * g_thread_pool_start_thread to make the new started
421 * thread known to the pool before itself can do it.
422 */
423 }
424 }
425
426 return NULL;
427 }
428
429 static gboolean
g_thread_pool_start_thread(GRealThreadPool * pool,GError ** error)430 g_thread_pool_start_thread (GRealThreadPool *pool,
431 GError **error)
432 {
433 gboolean success = FALSE;
434
435 if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
436 /* Enough threads are already running */
437 return TRUE;
438
439 g_async_queue_lock (unused_thread_queue);
440
441 if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
442 {
443 g_async_queue_push_unlocked (unused_thread_queue, pool);
444 success = TRUE;
445 }
446
447 g_async_queue_unlock (unused_thread_queue);
448
449 if (!success)
450 {
451 const gchar *prgname = g_get_prgname ();
452 gchar name[16] = "pool";
453 GThread *thread;
454
455 if (prgname)
456 g_snprintf (name, sizeof (name), "pool-%s", prgname);
457
458 /* No thread was found, we have to start a new one */
459 if (pool->pool.exclusive)
460 {
461 /* For exclusive thread-pools this is directly called from new() and
462 * we simply start new threads that inherit the scheduler settings
463 * from the current thread.
464 */
465 thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
466 }
467 else
468 {
469 /* For non-exclusive thread-pools this can be called at any time
470 * when a new thread is needed. We make sure to create a new thread
471 * here with the correct scheduler settings: either by directly
472 * providing them if supported by the GThread implementation or by
473 * going via our helper thread.
474 */
475 if (have_shared_thread_scheduler_settings)
476 {
477 thread = g_thread_new_internal (name, g_thread_proxy, g_thread_pool_thread_proxy, pool, 0, &shared_thread_scheduler_settings, error);
478 }
479 else
480 {
481 SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
482
483 g_async_queue_lock (spawn_thread_queue);
484
485 g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
486
487 while (!spawn_thread_data.thread && !spawn_thread_data.error)
488 g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
489
490 thread = spawn_thread_data.thread;
491 if (!thread)
492 g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
493 g_async_queue_unlock (spawn_thread_queue);
494 }
495 }
496
497 if (thread == NULL)
498 return FALSE;
499
500 g_thread_unref (thread);
501 }
502
503 /* See comment in g_thread_pool_thread_proxy as to why this is done
504 * here and not there
505 */
506 pool->num_threads++;
507
508 return TRUE;
509 }
510
511 /**
512 * g_thread_pool_new:
513 * @func: a function to execute in the threads of the new thread pool
514 * @user_data: user data that is handed over to @func every time it
515 * is called
516 * @max_threads: the maximal number of threads to execute concurrently
517 * in the new thread pool, -1 means no limit
518 * @exclusive: should this thread pool be exclusive?
519 * @error: return location for error, or %NULL
520 *
521 * This function creates a new thread pool.
522 *
523 * Whenever you call g_thread_pool_push(), either a new thread is
524 * created or an unused one is reused. At most @max_threads threads
525 * are running concurrently for this thread pool. @max_threads = -1
526 * allows unlimited threads to be created for this thread pool. The
527 * newly created or reused thread now executes the function @func
528 * with the two arguments. The first one is the parameter to
529 * g_thread_pool_push() and the second one is @user_data.
530 *
531 * Pass g_get_num_processors() to @max_threads to create as many threads as
532 * there are logical processors on the system. This will not pin each thread to
533 * a specific processor.
534 *
535 * The parameter @exclusive determines whether the thread pool owns
536 * all threads exclusive or shares them with other thread pools.
537 * If @exclusive is %TRUE, @max_threads threads are started
538 * immediately and they will run exclusively for this thread pool
539 * until it is destroyed by g_thread_pool_free(). If @exclusive is
540 * %FALSE, threads are created when needed and shared between all
541 * non-exclusive thread pools. This implies that @max_threads may
542 * not be -1 for exclusive thread pools. Besides, exclusive thread
543 * pools are not affected by g_thread_pool_set_max_idle_time()
544 * since their threads are never considered idle and returned to the
545 * global pool.
546 *
547 * @error can be %NULL to ignore errors, or non-%NULL to report
548 * errors. An error can only occur when @exclusive is set to %TRUE
549 * and not all @max_threads threads could be created.
550 * See #GThreadError for possible errors that may occur.
551 * Note, even in case of error a valid #GThreadPool is returned.
552 *
553 * Returns: the new #GThreadPool
554 */
555 GThreadPool *
g_thread_pool_new(GFunc func,gpointer user_data,gint max_threads,gboolean exclusive,GError ** error)556 g_thread_pool_new (GFunc func,
557 gpointer user_data,
558 gint max_threads,
559 gboolean exclusive,
560 GError **error)
561 {
562 GRealThreadPool *retval;
563 G_LOCK_DEFINE_STATIC (init);
564
565 g_return_val_if_fail (func, NULL);
566 g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
567 g_return_val_if_fail (max_threads >= -1, NULL);
568
569 retval = g_new (GRealThreadPool, 1);
570
571 retval->pool.func = func;
572 retval->pool.user_data = user_data;
573 retval->pool.exclusive = exclusive;
574 retval->queue = g_async_queue_new ();
575 g_cond_init (&retval->cond);
576 retval->max_threads = max_threads;
577 retval->num_threads = 0;
578 retval->running = TRUE;
579 retval->immediate = FALSE;
580 retval->waiting = FALSE;
581 retval->sort_func = NULL;
582 retval->sort_user_data = NULL;
583
584 G_LOCK (init);
585 if (!unused_thread_queue)
586 unused_thread_queue = g_async_queue_new ();
587
588 /* For the very first non-exclusive thread-pool we remember the thread
589 * scheduler settings of the thread creating the pool, if supported by
590 * the GThread implementation. This is then used for making sure that
591 * all threads created on the non-exclusive thread-pool have the same
592 * scheduler settings, and more importantly don't just inherit them
593 * from the thread that just happened to push a new task and caused
594 * a new thread to be created.
595 *
596 * Not doing so could cause real-time priority threads or otherwise
597 * threads with problematic scheduler settings to be part of the
598 * non-exclusive thread-pools.
599 *
600 * If this is not supported by the GThread implementation then we here
601 * start a thread that will inherit the scheduler settings from this
602 * very thread and whose only purpose is to spawn new threads with the
603 * same settings for use by the non-exclusive thread-pools.
604 *
605 *
606 * For non-exclusive thread-pools this is not required as all threads
607 * are created immediately below and are running forever, so they will
608 * automatically inherit the scheduler settings from this very thread.
609 */
610 if (!exclusive && !have_shared_thread_scheduler_settings && !spawn_thread_queue)
611 {
612 if (g_thread_get_scheduler_settings (&shared_thread_scheduler_settings))
613 {
614 have_shared_thread_scheduler_settings = TRUE;
615 }
616 else
617 {
618 spawn_thread_queue = g_async_queue_new ();
619 g_cond_init (&spawn_thread_cond);
620 g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
621 }
622 }
623 G_UNLOCK (init);
624
625 if (retval->pool.exclusive)
626 {
627 g_async_queue_lock (retval->queue);
628
629 while (retval->num_threads < (guint) retval->max_threads)
630 {
631 GError *local_error = NULL;
632
633 if (!g_thread_pool_start_thread (retval, &local_error))
634 {
635 g_propagate_error (error, local_error);
636 break;
637 }
638 }
639
640 g_async_queue_unlock (retval->queue);
641 }
642
643 return (GThreadPool*) retval;
644 }
645
646 /**
647 * g_thread_pool_push:
648 * @pool: a #GThreadPool
649 * @data: a new task for @pool
650 * @error: return location for error, or %NULL
651 *
652 * Inserts @data into the list of tasks to be executed by @pool.
653 *
654 * When the number of currently running threads is lower than the
655 * maximal allowed number of threads, a new thread is started (or
656 * reused) with the properties given to g_thread_pool_new().
657 * Otherwise, @data stays in the queue until a thread in this pool
658 * finishes its previous task and processes @data.
659 *
660 * @error can be %NULL to ignore errors, or non-%NULL to report
661 * errors. An error can only occur when a new thread couldn't be
662 * created. In that case @data is simply appended to the queue of
663 * work to do.
664 *
665 * Before version 2.32, this function did not return a success status.
666 *
667 * Returns: %TRUE on success, %FALSE if an error occurred
668 */
669 gboolean
g_thread_pool_push(GThreadPool * pool,gpointer data,GError ** error)670 g_thread_pool_push (GThreadPool *pool,
671 gpointer data,
672 GError **error)
673 {
674 GRealThreadPool *real;
675 gboolean result;
676
677 real = (GRealThreadPool*) pool;
678
679 g_return_val_if_fail (real, FALSE);
680 g_return_val_if_fail (real->running, FALSE);
681
682 result = TRUE;
683
684 g_async_queue_lock (real->queue);
685
686 if (g_async_queue_length_unlocked (real->queue) >= 0)
687 {
688 /* No thread is waiting in the queue */
689 GError *local_error = NULL;
690
691 if (!g_thread_pool_start_thread (real, &local_error))
692 {
693 g_propagate_error (error, local_error);
694 result = FALSE;
695 }
696 }
697
698 g_thread_pool_queue_push_unlocked (real, data);
699 g_async_queue_unlock (real->queue);
700
701 return result;
702 }
703
704 /**
705 * g_thread_pool_set_max_threads:
706 * @pool: a #GThreadPool
707 * @max_threads: a new maximal number of threads for @pool,
708 * or -1 for unlimited
709 * @error: return location for error, or %NULL
710 *
711 * Sets the maximal allowed number of threads for @pool.
712 * A value of -1 means that the maximal number of threads
713 * is unlimited. If @pool is an exclusive thread pool, setting
714 * the maximal number of threads to -1 is not allowed.
715 *
716 * Setting @max_threads to 0 means stopping all work for @pool.
717 * It is effectively frozen until @max_threads is set to a non-zero
718 * value again.
719 *
720 * A thread is never terminated while calling @func, as supplied by
721 * g_thread_pool_new(). Instead the maximal number of threads only
722 * has effect for the allocation of new threads in g_thread_pool_push().
723 * A new thread is allocated, whenever the number of currently
724 * running threads in @pool is smaller than the maximal number.
725 *
726 * @error can be %NULL to ignore errors, or non-%NULL to report
727 * errors. An error can only occur when a new thread couldn't be
728 * created.
729 *
730 * Before version 2.32, this function did not return a success status.
731 *
732 * Returns: %TRUE on success, %FALSE if an error occurred
733 */
734 gboolean
g_thread_pool_set_max_threads(GThreadPool * pool,gint max_threads,GError ** error)735 g_thread_pool_set_max_threads (GThreadPool *pool,
736 gint max_threads,
737 GError **error)
738 {
739 GRealThreadPool *real;
740 gint to_start;
741 gboolean result;
742
743 real = (GRealThreadPool*) pool;
744
745 g_return_val_if_fail (real, FALSE);
746 g_return_val_if_fail (real->running, FALSE);
747 g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
748 g_return_val_if_fail (max_threads >= -1, FALSE);
749
750 result = TRUE;
751
752 g_async_queue_lock (real->queue);
753
754 real->max_threads = max_threads;
755
756 if (pool->exclusive)
757 to_start = real->max_threads - real->num_threads;
758 else
759 to_start = g_async_queue_length_unlocked (real->queue);
760
761 for ( ; to_start > 0; to_start--)
762 {
763 GError *local_error = NULL;
764
765 if (!g_thread_pool_start_thread (real, &local_error))
766 {
767 g_propagate_error (error, local_error);
768 result = FALSE;
769 break;
770 }
771 }
772
773 g_async_queue_unlock (real->queue);
774
775 return result;
776 }
777
778 /**
779 * g_thread_pool_get_max_threads:
780 * @pool: a #GThreadPool
781 *
782 * Returns the maximal number of threads for @pool.
783 *
784 * Returns: the maximal number of threads
785 */
786 gint
g_thread_pool_get_max_threads(GThreadPool * pool)787 g_thread_pool_get_max_threads (GThreadPool *pool)
788 {
789 GRealThreadPool *real;
790 gint retval;
791
792 real = (GRealThreadPool*) pool;
793
794 g_return_val_if_fail (real, 0);
795 g_return_val_if_fail (real->running, 0);
796
797 g_async_queue_lock (real->queue);
798 retval = real->max_threads;
799 g_async_queue_unlock (real->queue);
800
801 return retval;
802 }
803
804 /**
805 * g_thread_pool_get_num_threads:
806 * @pool: a #GThreadPool
807 *
808 * Returns the number of threads currently running in @pool.
809 *
810 * Returns: the number of threads currently running
811 */
812 guint
g_thread_pool_get_num_threads(GThreadPool * pool)813 g_thread_pool_get_num_threads (GThreadPool *pool)
814 {
815 GRealThreadPool *real;
816 guint retval;
817
818 real = (GRealThreadPool*) pool;
819
820 g_return_val_if_fail (real, 0);
821 g_return_val_if_fail (real->running, 0);
822
823 g_async_queue_lock (real->queue);
824 retval = real->num_threads;
825 g_async_queue_unlock (real->queue);
826
827 return retval;
828 }
829
830 /**
831 * g_thread_pool_unprocessed:
832 * @pool: a #GThreadPool
833 *
834 * Returns the number of tasks still unprocessed in @pool.
835 *
836 * Returns: the number of unprocessed tasks
837 */
838 guint
g_thread_pool_unprocessed(GThreadPool * pool)839 g_thread_pool_unprocessed (GThreadPool *pool)
840 {
841 GRealThreadPool *real;
842 gint unprocessed;
843
844 real = (GRealThreadPool*) pool;
845
846 g_return_val_if_fail (real, 0);
847 g_return_val_if_fail (real->running, 0);
848
849 unprocessed = g_async_queue_length (real->queue);
850
851 return MAX (unprocessed, 0);
852 }
853
854 /**
855 * g_thread_pool_free:
856 * @pool: a #GThreadPool
857 * @immediate: should @pool shut down immediately?
858 * @wait_: should the function wait for all tasks to be finished?
859 *
860 * Frees all resources allocated for @pool.
861 *
862 * If @immediate is %TRUE, no new task is processed for @pool.
863 * Otherwise @pool is not freed before the last task is processed.
864 * Note however, that no thread of this pool is interrupted while
865 * processing a task. Instead at least all still running threads
866 * can finish their tasks before the @pool is freed.
867 *
868 * If @wait_ is %TRUE, this function does not return before all
869 * tasks to be processed (dependent on @immediate, whether all
870 * or only the currently running) are ready.
871 * Otherwise this function returns immediately.
872 *
873 * After calling this function @pool must not be used anymore.
874 */
875 void
g_thread_pool_free(GThreadPool * pool,gboolean immediate,gboolean wait_)876 g_thread_pool_free (GThreadPool *pool,
877 gboolean immediate,
878 gboolean wait_)
879 {
880 GRealThreadPool *real;
881
882 real = (GRealThreadPool*) pool;
883
884 g_return_if_fail (real);
885 g_return_if_fail (real->running);
886
887 /* If there's no thread allowed here, there is not much sense in
888 * not stopping this pool immediately, when it's not empty
889 */
890 g_return_if_fail (immediate ||
891 real->max_threads != 0 ||
892 g_async_queue_length (real->queue) == 0);
893
894 g_async_queue_lock (real->queue);
895
896 real->running = FALSE;
897 real->immediate = immediate;
898 real->waiting = wait_;
899
900 if (wait_)
901 {
902 while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
903 !(immediate && real->num_threads == 0))
904 g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
905 }
906
907 if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
908 {
909 /* No thread is currently doing something (and nothing is left
910 * to process in the queue)
911 */
912 if (real->num_threads == 0)
913 {
914 /* No threads left, we clean up */
915 g_async_queue_unlock (real->queue);
916 g_thread_pool_free_internal (real);
917 return;
918 }
919
920 g_thread_pool_wakeup_and_stop_all (real);
921 }
922
923 /* The last thread should cleanup the pool */
924 real->waiting = FALSE;
925 g_async_queue_unlock (real->queue);
926 }
927
928 static void
g_thread_pool_free_internal(GRealThreadPool * pool)929 g_thread_pool_free_internal (GRealThreadPool* pool)
930 {
931 g_return_if_fail (pool);
932 g_return_if_fail (pool->running == FALSE);
933 g_return_if_fail (pool->num_threads == 0);
934
935 g_async_queue_unref (pool->queue);
936 g_cond_clear (&pool->cond);
937
938 g_free (pool);
939 }
940
941 static void
g_thread_pool_wakeup_and_stop_all(GRealThreadPool * pool)942 g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
943 {
944 guint i;
945
946 g_return_if_fail (pool);
947 g_return_if_fail (pool->running == FALSE);
948 g_return_if_fail (pool->num_threads != 0);
949
950 pool->immediate = TRUE;
951
952 /*
953 * So here we're sending bogus data to the pool threads, which
954 * should cause them each to wake up, and check the above
955 * pool->immediate condition. However we don't want that
956 * data to be sorted (since it'll crash the sorter).
957 */
958 for (i = 0; i < pool->num_threads; i++)
959 g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
960 }
961
962 /**
963 * g_thread_pool_set_max_unused_threads:
964 * @max_threads: maximal number of unused threads
965 *
966 * Sets the maximal number of unused threads to @max_threads.
967 * If @max_threads is -1, no limit is imposed on the number
968 * of unused threads.
969 *
970 * The default value is 2.
971 */
972 void
g_thread_pool_set_max_unused_threads(gint max_threads)973 g_thread_pool_set_max_unused_threads (gint max_threads)
974 {
975 g_return_if_fail (max_threads >= -1);
976
977 g_atomic_int_set (&max_unused_threads, max_threads);
978
979 if (max_threads != -1)
980 {
981 max_threads -= g_atomic_int_get (&unused_threads);
982 if (max_threads < 0)
983 {
984 g_atomic_int_set (&kill_unused_threads, -max_threads);
985 g_atomic_int_inc (&wakeup_thread_serial);
986
987 g_async_queue_lock (unused_thread_queue);
988
989 do
990 {
991 g_async_queue_push_unlocked (unused_thread_queue,
992 wakeup_thread_marker);
993 }
994 while (++max_threads);
995
996 g_async_queue_unlock (unused_thread_queue);
997 }
998 }
999 }
1000
1001 /**
1002 * g_thread_pool_get_max_unused_threads:
1003 *
1004 * Returns the maximal allowed number of unused threads.
1005 *
1006 * Returns: the maximal number of unused threads
1007 */
1008 gint
g_thread_pool_get_max_unused_threads(void)1009 g_thread_pool_get_max_unused_threads (void)
1010 {
1011 return g_atomic_int_get (&max_unused_threads);
1012 }
1013
1014 /**
1015 * g_thread_pool_get_num_unused_threads:
1016 *
1017 * Returns the number of currently unused threads.
1018 *
1019 * Returns: the number of currently unused threads
1020 */
1021 guint
g_thread_pool_get_num_unused_threads(void)1022 g_thread_pool_get_num_unused_threads (void)
1023 {
1024 return (guint) g_atomic_int_get (&unused_threads);
1025 }
1026
1027 /**
1028 * g_thread_pool_stop_unused_threads:
1029 *
1030 * Stops all currently unused threads. This does not change the
1031 * maximal number of unused threads. This function can be used to
1032 * regularly stop all unused threads e.g. from g_timeout_add().
1033 */
1034 void
g_thread_pool_stop_unused_threads(void)1035 g_thread_pool_stop_unused_threads (void)
1036 {
1037 guint oldval;
1038
1039 oldval = g_thread_pool_get_max_unused_threads ();
1040
1041 g_thread_pool_set_max_unused_threads (0);
1042 g_thread_pool_set_max_unused_threads (oldval);
1043 }
1044
1045 /**
1046 * g_thread_pool_set_sort_function:
1047 * @pool: a #GThreadPool
1048 * @func: the #GCompareDataFunc used to sort the list of tasks.
1049 * This function is passed two tasks. It should return
1050 * 0 if the order in which they are handled does not matter,
1051 * a negative value if the first task should be processed before
1052 * the second or a positive value if the second task should be
1053 * processed first.
1054 * @user_data: user data passed to @func
1055 *
1056 * Sets the function used to sort the list of tasks. This allows the
1057 * tasks to be processed by a priority determined by @func, and not
1058 * just in the order in which they were added to the pool.
1059 *
1060 * Note, if the maximum number of threads is more than 1, the order
1061 * that threads are executed cannot be guaranteed 100%. Threads are
1062 * scheduled by the operating system and are executed at random. It
1063 * cannot be assumed that threads are executed in the order they are
1064 * created.
1065 *
1066 * Since: 2.10
1067 */
1068 void
g_thread_pool_set_sort_function(GThreadPool * pool,GCompareDataFunc func,gpointer user_data)1069 g_thread_pool_set_sort_function (GThreadPool *pool,
1070 GCompareDataFunc func,
1071 gpointer user_data)
1072 {
1073 GRealThreadPool *real;
1074
1075 real = (GRealThreadPool*) pool;
1076
1077 g_return_if_fail (real);
1078 g_return_if_fail (real->running);
1079
1080 g_async_queue_lock (real->queue);
1081
1082 real->sort_func = func;
1083 real->sort_user_data = user_data;
1084
1085 if (func)
1086 g_async_queue_sort_unlocked (real->queue,
1087 real->sort_func,
1088 real->sort_user_data);
1089
1090 g_async_queue_unlock (real->queue);
1091 }
1092
1093 /**
1094 * g_thread_pool_move_to_front:
1095 * @pool: a #GThreadPool
1096 * @data: an unprocessed item in the pool
1097 *
1098 * Moves the item to the front of the queue of unprocessed
1099 * items, so that it will be processed next.
1100 *
1101 * Returns: %TRUE if the item was found and moved
1102 *
1103 * Since: 2.46
1104 */
1105 gboolean
g_thread_pool_move_to_front(GThreadPool * pool,gpointer data)1106 g_thread_pool_move_to_front (GThreadPool *pool,
1107 gpointer data)
1108 {
1109 GRealThreadPool *real = (GRealThreadPool*) pool;
1110 gboolean found;
1111
1112 g_async_queue_lock (real->queue);
1113
1114 found = g_async_queue_remove_unlocked (real->queue, data);
1115 if (found)
1116 g_async_queue_push_front_unlocked (real->queue, data);
1117
1118 g_async_queue_unlock (real->queue);
1119
1120 return found;
1121 }
1122
1123 /**
1124 * g_thread_pool_set_max_idle_time:
1125 * @interval: the maximum @interval (in milliseconds)
1126 * a thread can be idle
1127 *
1128 * This function will set the maximum @interval that a thread
1129 * waiting in the pool for new tasks can be idle for before
1130 * being stopped. This function is similar to calling
1131 * g_thread_pool_stop_unused_threads() on a regular timeout,
1132 * except this is done on a per thread basis.
1133 *
1134 * By setting @interval to 0, idle threads will not be stopped.
1135 *
1136 * The default value is 15000 (15 seconds).
1137 *
1138 * Since: 2.10
1139 */
1140 void
g_thread_pool_set_max_idle_time(guint interval)1141 g_thread_pool_set_max_idle_time (guint interval)
1142 {
1143 guint i;
1144
1145 g_atomic_int_set (&max_idle_time, interval);
1146
1147 i = (guint) g_atomic_int_get (&unused_threads);
1148 if (i > 0)
1149 {
1150 g_atomic_int_inc (&wakeup_thread_serial);
1151 g_async_queue_lock (unused_thread_queue);
1152
1153 do
1154 {
1155 g_async_queue_push_unlocked (unused_thread_queue,
1156 wakeup_thread_marker);
1157 }
1158 while (--i);
1159
1160 g_async_queue_unlock (unused_thread_queue);
1161 }
1162 }
1163
1164 /**
1165 * g_thread_pool_get_max_idle_time:
1166 *
1167 * This function will return the maximum @interval that a
1168 * thread will wait in the thread pool for new tasks before
1169 * being stopped.
1170 *
1171 * If this function returns 0, threads waiting in the thread
1172 * pool for new work are not stopped.
1173 *
1174 * Returns: the maximum @interval (milliseconds) to wait
1175 * for new tasks in the thread pool before stopping the
1176 * thread
1177 *
1178 * Since: 2.10
1179 */
1180 guint
g_thread_pool_get_max_idle_time(void)1181 g_thread_pool_get_max_idle_time (void)
1182 {
1183 return (guint) g_atomic_int_get (&max_idle_time);
1184 }
1185