• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GThreadPool: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 /*
22  * MT safe
23  */
24 
25 #include "config.h"
26 
27 #include "gthreadpool.h"
28 
29 #include "gasyncqueue.h"
30 #include "gasyncqueueprivate.h"
31 #include "gmain.h"
32 #include "gtestutils.h"
33 #include "gthreadprivate.h"
34 #include "gtimer.h"
35 #include "gutils.h"
36 
37 /**
38  * SECTION:thread_pools
39  * @title: Thread Pools
40  * @short_description: pools of threads to execute work concurrently
41  * @see_also: #GThread
42  *
43  * Sometimes you wish to asynchronously fork out the execution of work
44  * and continue working in your own thread. If that will happen often,
45  * the overhead of starting and destroying a thread each time might be
46  * too high. In such cases reusing already started threads seems like a
47  * good idea. And it indeed is, but implementing this can be tedious
48  * and error-prone.
49  *
50  * Therefore GLib provides thread pools for your convenience. An added
51  * advantage is, that the threads can be shared between the different
52  * subsystems of your program, when they are using GLib.
53  *
54  * To create a new thread pool, you use g_thread_pool_new().
55  * It is destroyed by g_thread_pool_free().
56  *
57  * If you want to execute a certain task within a thread pool,
58  * you call g_thread_pool_push().
59  *
60  * To get the current number of running threads you call
61  * g_thread_pool_get_num_threads(). To get the number of still
62  * unprocessed tasks you call g_thread_pool_unprocessed(). To control
63  * the maximal number of threads for a thread pool, you use
64  * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
65  *
66  * Finally you can control the number of unused threads, that are kept
67  * alive by GLib for future use. The current number can be fetched with
68  * g_thread_pool_get_num_unused_threads(). The maximal number can be
69  * controlled by g_thread_pool_get_max_unused_threads() and
70  * g_thread_pool_set_max_unused_threads(). All currently unused threads
71  * can be stopped by calling g_thread_pool_stop_unused_threads().
72  */
73 
74 #define DEBUG_MSG(x)
75 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
76 
77 typedef struct _GRealThreadPool GRealThreadPool;
78 
79 /**
80  * GThreadPool:
81  * @func: the function to execute in the threads of this pool
82  * @user_data: the user data for the threads of this pool
83  * @exclusive: are all threads exclusive to this pool
84  *
85  * The #GThreadPool struct represents a thread pool. It has three
86  * public read-only members, but the underlying struct is bigger,
87  * so you must not copy this struct.
88  */
89 struct _GRealThreadPool
90 {
91   GThreadPool pool;
92   GAsyncQueue *queue;
93   GCond cond;
94   gint max_threads;
95   guint num_threads;
96   gboolean running;
97   gboolean immediate;
98   gboolean waiting;
99   GCompareDataFunc sort_func;
100   gpointer sort_user_data;
101 };
102 
103 /* The following is just an address to mark the wakeup order for a
104  * thread, it could be any address (as long, as it isn't a valid
105  * GThreadPool address)
106  */
107 static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
108 static gint wakeup_thread_serial = 0;
109 
110 /* Here all unused threads are waiting  */
111 static GAsyncQueue *unused_thread_queue = NULL;
112 static gint unused_threads = 0;
113 static gint max_unused_threads = 2;
114 static gint kill_unused_threads = 0;
115 static guint max_idle_time = 15 * 1000;
116 
117 static GThreadSchedulerSettings shared_thread_scheduler_settings;
118 static gboolean have_shared_thread_scheduler_settings = FALSE;
119 
120 typedef struct
121 {
122   /* Either thread or error are set in the end. Both transfer-full. */
123   GThreadPool *pool;
124   GThread *thread;
125   GError *error;
126 } SpawnThreadData;
127 
128 static GCond spawn_thread_cond;
129 static GAsyncQueue *spawn_thread_queue;
130 
131 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
132                                                            gpointer          data);
133 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
134 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
135 static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
136                                                            GError          **error);
137 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
138 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
139 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
140 
141 static void
g_thread_pool_queue_push_unlocked(GRealThreadPool * pool,gpointer data)142 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
143                                    gpointer         data)
144 {
145   if (pool->sort_func)
146     g_async_queue_push_sorted_unlocked (pool->queue,
147                                         data,
148                                         pool->sort_func,
149                                         pool->sort_user_data);
150   else
151     g_async_queue_push_unlocked (pool->queue, data);
152 }
153 
154 static GRealThreadPool*
g_thread_pool_wait_for_new_pool(void)155 g_thread_pool_wait_for_new_pool (void)
156 {
157   GRealThreadPool *pool;
158   gint local_wakeup_thread_serial;
159   guint local_max_unused_threads;
160   gint local_max_idle_time;
161   gint last_wakeup_thread_serial;
162   gboolean have_relayed_thread_marker = FALSE;
163 
164   local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
165   local_max_idle_time = g_atomic_int_get (&max_idle_time);
166   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
167 
168   g_atomic_int_inc (&unused_threads);
169 
170   do
171     {
172       if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
173         {
174           /* If this is a superfluous thread, stop it. */
175           pool = NULL;
176         }
177       else if (local_max_idle_time > 0)
178         {
179           /* If a maximal idle time is given, wait for the given time. */
180           DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
181                       g_thread_self (), local_max_idle_time / 1000.0));
182 
183           pool = g_async_queue_timeout_pop (unused_thread_queue,
184 					    local_max_idle_time * 1000);
185         }
186       else
187         {
188           /* If no maximal idle time is given, wait indefinitely. */
189           DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
190           pool = g_async_queue_pop (unused_thread_queue);
191         }
192 
193       if (pool == wakeup_thread_marker)
194         {
195           local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
196           if (last_wakeup_thread_serial == local_wakeup_thread_serial)
197             {
198               if (!have_relayed_thread_marker)
199               {
200                 /* If this wakeup marker has been received for
201                  * the second time, relay it.
202                  */
203                 DEBUG_MSG (("thread %p relaying wakeup message to "
204                             "waiting thread with lower serial.",
205                             g_thread_self ()));
206 
207                 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
208                 have_relayed_thread_marker = TRUE;
209 
210                 /* If a wakeup marker has been relayed, this thread
211                  * will get out of the way for 100 microseconds to
212                  * avoid receiving this marker again.
213                  */
214                 g_usleep (100);
215               }
216             }
217           else
218             {
219               if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
220                 {
221                   pool = NULL;
222                   break;
223                 }
224 
225               DEBUG_MSG (("thread %p updating to new limits.",
226                           g_thread_self ()));
227 
228               local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
229               local_max_idle_time = g_atomic_int_get (&max_idle_time);
230               last_wakeup_thread_serial = local_wakeup_thread_serial;
231 
232               have_relayed_thread_marker = FALSE;
233             }
234         }
235     }
236   while (pool == wakeup_thread_marker);
237 
238   g_atomic_int_add (&unused_threads, -1);
239 
240   return pool;
241 }
242 
243 static gpointer
g_thread_pool_wait_for_new_task(GRealThreadPool * pool)244 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
245 {
246   gpointer task = NULL;
247 
248   if (pool->running || (!pool->immediate &&
249                         g_async_queue_length_unlocked (pool->queue) > 0))
250     {
251       /* This thread pool is still active. */
252       if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
253         {
254           /* This is a superfluous thread, so it goes to the global pool. */
255           DEBUG_MSG (("superfluous thread %p in pool %p.",
256                       g_thread_self (), pool));
257         }
258       else if (pool->pool.exclusive)
259         {
260           /* Exclusive threads stay attached to the pool. */
261           task = g_async_queue_pop_unlocked (pool->queue);
262 
263           DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
264                       "(%d running, %d unprocessed).",
265                       g_thread_self (), pool, pool->num_threads,
266                       g_async_queue_length_unlocked (pool->queue)));
267         }
268       else
269         {
270           /* A thread will wait for new tasks for at most 1/2
271            * second before going to the global pool.
272            */
273           DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
274                       "(%d running, %d unprocessed).",
275                       g_thread_self (), pool, pool->num_threads,
276                       g_async_queue_length_unlocked (pool->queue)));
277 
278           task = g_async_queue_timeout_pop_unlocked (pool->queue,
279 						     G_USEC_PER_SEC / 2);
280         }
281     }
282   else
283     {
284       /* This thread pool is inactive, it will no longer process tasks. */
285       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
286                   "(running: %s, immediate: %s, len: %d).",
287                   pool, g_thread_self (),
288                   pool->running ? "true" : "false",
289                   pool->immediate ? "true" : "false",
290                   g_async_queue_length_unlocked (pool->queue)));
291     }
292 
293   return task;
294 }
295 
296 static gpointer
g_thread_pool_spawn_thread(gpointer data)297 g_thread_pool_spawn_thread (gpointer data)
298 {
299   while (TRUE)
300     {
301       SpawnThreadData *spawn_thread_data;
302       GThread *thread = NULL;
303       GError *error = NULL;
304       const gchar *prgname = g_get_prgname ();
305       gchar name[16] = "pool";
306 
307       if (prgname)
308         g_snprintf (name, sizeof (name), "pool-%s", prgname);
309 
310       g_async_queue_lock (spawn_thread_queue);
311       /* Spawn a new thread for the given pool and wake the requesting thread
312        * up again with the result. This new thread will have the scheduler
313        * settings inherited from this thread and in extension of the thread
314        * that created the first non-exclusive thread-pool. */
315       spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
316       thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
317 
318       spawn_thread_data->thread = g_steal_pointer (&thread);
319       spawn_thread_data->error = g_steal_pointer (&error);
320 
321       g_cond_broadcast (&spawn_thread_cond);
322       g_async_queue_unlock (spawn_thread_queue);
323     }
324 
325   return NULL;
326 }
327 
328 static gpointer
g_thread_pool_thread_proxy(gpointer data)329 g_thread_pool_thread_proxy (gpointer data)
330 {
331   GRealThreadPool *pool;
332 
333   pool = data;
334 
335   DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
336 
337   g_async_queue_lock (pool->queue);
338 
339   while (TRUE)
340     {
341       gpointer task;
342 
343       task = g_thread_pool_wait_for_new_task (pool);
344       if (task)
345         {
346           if (pool->running || !pool->immediate)
347             {
348               /* A task was received and the thread pool is active,
349                * so execute the function.
350                */
351               g_async_queue_unlock (pool->queue);
352               DEBUG_MSG (("thread %p in pool %p calling func.",
353                           g_thread_self (), pool));
354               pool->pool.func (task, pool->pool.user_data);
355               g_async_queue_lock (pool->queue);
356             }
357         }
358       else
359         {
360           /* No task was received, so this thread goes to the global pool. */
361           gboolean free_pool = FALSE;
362 
363           DEBUG_MSG (("thread %p leaving pool %p for global pool.",
364                       g_thread_self (), pool));
365           pool->num_threads--;
366 
367           if (!pool->running)
368             {
369               if (!pool->waiting)
370                 {
371                   if (pool->num_threads == 0)
372                     {
373                       /* If the pool is not running and no other
374                        * thread is waiting for this thread pool to
375                        * finish and this is the last thread of this
376                        * pool, free the pool.
377                        */
378                       free_pool = TRUE;
379                     }
380                   else
381                     {
382                       /* If the pool is not running and no other
383                        * thread is waiting for this thread pool to
384                        * finish and this is not the last thread of
385                        * this pool and there are no tasks left in the
386                        * queue, wakeup the remaining threads.
387                        */
388                       if (g_async_queue_length_unlocked (pool->queue) ==
389                           (gint) -pool->num_threads)
390                         g_thread_pool_wakeup_and_stop_all (pool);
391                     }
392                 }
393               else if (pool->immediate ||
394                        g_async_queue_length_unlocked (pool->queue) <= 0)
395                 {
396                   /* If the pool is not running and another thread is
397                    * waiting for this thread pool to finish and there
398                    * are either no tasks left or the pool shall stop
399                    * immediately, inform the waiting thread of a change
400                    * of the thread pool state.
401                    */
402                   g_cond_broadcast (&pool->cond);
403                 }
404             }
405 
406           g_async_queue_unlock (pool->queue);
407 
408           if (free_pool)
409             g_thread_pool_free_internal (pool);
410 
411           if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL)
412             break;
413 
414           g_async_queue_lock (pool->queue);
415 
416           DEBUG_MSG (("thread %p entering pool %p from global pool.",
417                       g_thread_self (), pool));
418 
419           /* pool->num_threads++ is not done here, but in
420            * g_thread_pool_start_thread to make the new started
421            * thread known to the pool before itself can do it.
422            */
423         }
424     }
425 
426   return NULL;
427 }
428 
429 static gboolean
g_thread_pool_start_thread(GRealThreadPool * pool,GError ** error)430 g_thread_pool_start_thread (GRealThreadPool  *pool,
431                             GError          **error)
432 {
433   gboolean success = FALSE;
434 
435   if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
436     /* Enough threads are already running */
437     return TRUE;
438 
439   g_async_queue_lock (unused_thread_queue);
440 
441   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
442     {
443       g_async_queue_push_unlocked (unused_thread_queue, pool);
444       success = TRUE;
445     }
446 
447   g_async_queue_unlock (unused_thread_queue);
448 
449   if (!success)
450     {
451       const gchar *prgname = g_get_prgname ();
452       gchar name[16] = "pool";
453       GThread *thread;
454 
455       if (prgname)
456         g_snprintf (name, sizeof (name), "pool-%s", prgname);
457 
458       /* No thread was found, we have to start a new one */
459       if (pool->pool.exclusive)
460         {
461           /* For exclusive thread-pools this is directly called from new() and
462            * we simply start new threads that inherit the scheduler settings
463            * from the current thread.
464            */
465           thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
466         }
467       else
468         {
469           /* For non-exclusive thread-pools this can be called at any time
470            * when a new thread is needed. We make sure to create a new thread
471            * here with the correct scheduler settings: either by directly
472            * providing them if supported by the GThread implementation or by
473            * going via our helper thread.
474            */
475           if (have_shared_thread_scheduler_settings)
476             {
477               thread = g_thread_new_internal (name, g_thread_proxy, g_thread_pool_thread_proxy, pool, 0, &shared_thread_scheduler_settings, error);
478             }
479           else
480             {
481               SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
482 
483               g_async_queue_lock (spawn_thread_queue);
484 
485               g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
486 
487               while (!spawn_thread_data.thread && !spawn_thread_data.error)
488                 g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
489 
490               thread = spawn_thread_data.thread;
491               if (!thread)
492                 g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
493               g_async_queue_unlock (spawn_thread_queue);
494             }
495         }
496 
497       if (thread == NULL)
498         return FALSE;
499 
500       g_thread_unref (thread);
501     }
502 
503   /* See comment in g_thread_pool_thread_proxy as to why this is done
504    * here and not there
505    */
506   pool->num_threads++;
507 
508   return TRUE;
509 }
510 
511 /**
512  * g_thread_pool_new:
513  * @func: a function to execute in the threads of the new thread pool
514  * @user_data: user data that is handed over to @func every time it
515  *     is called
516  * @max_threads: the maximal number of threads to execute concurrently
517  *     in  the new thread pool, -1 means no limit
518  * @exclusive: should this thread pool be exclusive?
519  * @error: return location for error, or %NULL
520  *
521  * This function creates a new thread pool.
522  *
523  * Whenever you call g_thread_pool_push(), either a new thread is
524  * created or an unused one is reused. At most @max_threads threads
525  * are running concurrently for this thread pool. @max_threads = -1
526  * allows unlimited threads to be created for this thread pool. The
527  * newly created or reused thread now executes the function @func
528  * with the two arguments. The first one is the parameter to
529  * g_thread_pool_push() and the second one is @user_data.
530  *
531  * Pass g_get_num_processors() to @max_threads to create as many threads as
532  * there are logical processors on the system. This will not pin each thread to
533  * a specific processor.
534  *
535  * The parameter @exclusive determines whether the thread pool owns
536  * all threads exclusive or shares them with other thread pools.
537  * If @exclusive is %TRUE, @max_threads threads are started
538  * immediately and they will run exclusively for this thread pool
539  * until it is destroyed by g_thread_pool_free(). If @exclusive is
540  * %FALSE, threads are created when needed and shared between all
541  * non-exclusive thread pools. This implies that @max_threads may
542  * not be -1 for exclusive thread pools. Besides, exclusive thread
543  * pools are not affected by g_thread_pool_set_max_idle_time()
544  * since their threads are never considered idle and returned to the
545  * global pool.
546  *
547  * @error can be %NULL to ignore errors, or non-%NULL to report
548  * errors. An error can only occur when @exclusive is set to %TRUE
549  * and not all @max_threads threads could be created.
550  * See #GThreadError for possible errors that may occur.
551  * Note, even in case of error a valid #GThreadPool is returned.
552  *
553  * Returns: the new #GThreadPool
554  */
555 GThreadPool *
g_thread_pool_new(GFunc func,gpointer user_data,gint max_threads,gboolean exclusive,GError ** error)556 g_thread_pool_new (GFunc      func,
557                    gpointer   user_data,
558                    gint       max_threads,
559                    gboolean   exclusive,
560                    GError   **error)
561 {
562   GRealThreadPool *retval;
563   G_LOCK_DEFINE_STATIC (init);
564 
565   g_return_val_if_fail (func, NULL);
566   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
567   g_return_val_if_fail (max_threads >= -1, NULL);
568 
569   retval = g_new (GRealThreadPool, 1);
570 
571   retval->pool.func = func;
572   retval->pool.user_data = user_data;
573   retval->pool.exclusive = exclusive;
574   retval->queue = g_async_queue_new ();
575   g_cond_init (&retval->cond);
576   retval->max_threads = max_threads;
577   retval->num_threads = 0;
578   retval->running = TRUE;
579   retval->immediate = FALSE;
580   retval->waiting = FALSE;
581   retval->sort_func = NULL;
582   retval->sort_user_data = NULL;
583 
584   G_LOCK (init);
585   if (!unused_thread_queue)
586       unused_thread_queue = g_async_queue_new ();
587 
588   /* For the very first non-exclusive thread-pool we remember the thread
589    * scheduler settings of the thread creating the pool, if supported by
590    * the GThread implementation. This is then used for making sure that
591    * all threads created on the non-exclusive thread-pool have the same
592    * scheduler settings, and more importantly don't just inherit them
593    * from the thread that just happened to push a new task and caused
594    * a new thread to be created.
595    *
596    * Not doing so could cause real-time priority threads or otherwise
597    * threads with problematic scheduler settings to be part of the
598    * non-exclusive thread-pools.
599    *
600    * If this is not supported by the GThread implementation then we here
601    * start a thread that will inherit the scheduler settings from this
602    * very thread and whose only purpose is to spawn new threads with the
603    * same settings for use by the non-exclusive thread-pools.
604    *
605    *
606    * For non-exclusive thread-pools this is not required as all threads
607    * are created immediately below and are running forever, so they will
608    * automatically inherit the scheduler settings from this very thread.
609    */
610   if (!exclusive && !have_shared_thread_scheduler_settings && !spawn_thread_queue)
611     {
612       if (g_thread_get_scheduler_settings (&shared_thread_scheduler_settings))
613         {
614           have_shared_thread_scheduler_settings = TRUE;
615         }
616       else
617         {
618           spawn_thread_queue = g_async_queue_new ();
619           g_cond_init (&spawn_thread_cond);
620           g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
621         }
622     }
623   G_UNLOCK (init);
624 
625   if (retval->pool.exclusive)
626     {
627       g_async_queue_lock (retval->queue);
628 
629       while (retval->num_threads < (guint) retval->max_threads)
630         {
631           GError *local_error = NULL;
632 
633           if (!g_thread_pool_start_thread (retval, &local_error))
634             {
635               g_propagate_error (error, local_error);
636               break;
637             }
638         }
639 
640       g_async_queue_unlock (retval->queue);
641     }
642 
643   return (GThreadPool*) retval;
644 }
645 
646 /**
647  * g_thread_pool_push:
648  * @pool: a #GThreadPool
649  * @data: a new task for @pool
650  * @error: return location for error, or %NULL
651  *
652  * Inserts @data into the list of tasks to be executed by @pool.
653  *
654  * When the number of currently running threads is lower than the
655  * maximal allowed number of threads, a new thread is started (or
656  * reused) with the properties given to g_thread_pool_new().
657  * Otherwise, @data stays in the queue until a thread in this pool
658  * finishes its previous task and processes @data.
659  *
660  * @error can be %NULL to ignore errors, or non-%NULL to report
661  * errors. An error can only occur when a new thread couldn't be
662  * created. In that case @data is simply appended to the queue of
663  * work to do.
664  *
665  * Before version 2.32, this function did not return a success status.
666  *
667  * Returns: %TRUE on success, %FALSE if an error occurred
668  */
669 gboolean
g_thread_pool_push(GThreadPool * pool,gpointer data,GError ** error)670 g_thread_pool_push (GThreadPool  *pool,
671                     gpointer      data,
672                     GError      **error)
673 {
674   GRealThreadPool *real;
675   gboolean result;
676 
677   real = (GRealThreadPool*) pool;
678 
679   g_return_val_if_fail (real, FALSE);
680   g_return_val_if_fail (real->running, FALSE);
681 
682   result = TRUE;
683 
684   g_async_queue_lock (real->queue);
685 
686   if (g_async_queue_length_unlocked (real->queue) >= 0)
687     {
688       /* No thread is waiting in the queue */
689       GError *local_error = NULL;
690 
691       if (!g_thread_pool_start_thread (real, &local_error))
692         {
693           g_propagate_error (error, local_error);
694           result = FALSE;
695         }
696     }
697 
698   g_thread_pool_queue_push_unlocked (real, data);
699   g_async_queue_unlock (real->queue);
700 
701   return result;
702 }
703 
704 /**
705  * g_thread_pool_set_max_threads:
706  * @pool: a #GThreadPool
707  * @max_threads: a new maximal number of threads for @pool,
708  *     or -1 for unlimited
709  * @error: return location for error, or %NULL
710  *
711  * Sets the maximal allowed number of threads for @pool.
712  * A value of -1 means that the maximal number of threads
713  * is unlimited. If @pool is an exclusive thread pool, setting
714  * the maximal number of threads to -1 is not allowed.
715  *
716  * Setting @max_threads to 0 means stopping all work for @pool.
717  * It is effectively frozen until @max_threads is set to a non-zero
718  * value again.
719  *
720  * A thread is never terminated while calling @func, as supplied by
721  * g_thread_pool_new(). Instead the maximal number of threads only
722  * has effect for the allocation of new threads in g_thread_pool_push().
723  * A new thread is allocated, whenever the number of currently
724  * running threads in @pool is smaller than the maximal number.
725  *
726  * @error can be %NULL to ignore errors, or non-%NULL to report
727  * errors. An error can only occur when a new thread couldn't be
728  * created.
729  *
730  * Before version 2.32, this function did not return a success status.
731  *
732  * Returns: %TRUE on success, %FALSE if an error occurred
733  */
734 gboolean
g_thread_pool_set_max_threads(GThreadPool * pool,gint max_threads,GError ** error)735 g_thread_pool_set_max_threads (GThreadPool  *pool,
736                                gint          max_threads,
737                                GError      **error)
738 {
739   GRealThreadPool *real;
740   gint to_start;
741   gboolean result;
742 
743   real = (GRealThreadPool*) pool;
744 
745   g_return_val_if_fail (real, FALSE);
746   g_return_val_if_fail (real->running, FALSE);
747   g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
748   g_return_val_if_fail (max_threads >= -1, FALSE);
749 
750   result = TRUE;
751 
752   g_async_queue_lock (real->queue);
753 
754   real->max_threads = max_threads;
755 
756   if (pool->exclusive)
757     to_start = real->max_threads - real->num_threads;
758   else
759     to_start = g_async_queue_length_unlocked (real->queue);
760 
761   for ( ; to_start > 0; to_start--)
762     {
763       GError *local_error = NULL;
764 
765       if (!g_thread_pool_start_thread (real, &local_error))
766         {
767           g_propagate_error (error, local_error);
768           result = FALSE;
769           break;
770         }
771     }
772 
773   g_async_queue_unlock (real->queue);
774 
775   return result;
776 }
777 
778 /**
779  * g_thread_pool_get_max_threads:
780  * @pool: a #GThreadPool
781  *
782  * Returns the maximal number of threads for @pool.
783  *
784  * Returns: the maximal number of threads
785  */
786 gint
g_thread_pool_get_max_threads(GThreadPool * pool)787 g_thread_pool_get_max_threads (GThreadPool *pool)
788 {
789   GRealThreadPool *real;
790   gint retval;
791 
792   real = (GRealThreadPool*) pool;
793 
794   g_return_val_if_fail (real, 0);
795   g_return_val_if_fail (real->running, 0);
796 
797   g_async_queue_lock (real->queue);
798   retval = real->max_threads;
799   g_async_queue_unlock (real->queue);
800 
801   return retval;
802 }
803 
804 /**
805  * g_thread_pool_get_num_threads:
806  * @pool: a #GThreadPool
807  *
808  * Returns the number of threads currently running in @pool.
809  *
810  * Returns: the number of threads currently running
811  */
812 guint
g_thread_pool_get_num_threads(GThreadPool * pool)813 g_thread_pool_get_num_threads (GThreadPool *pool)
814 {
815   GRealThreadPool *real;
816   guint retval;
817 
818   real = (GRealThreadPool*) pool;
819 
820   g_return_val_if_fail (real, 0);
821   g_return_val_if_fail (real->running, 0);
822 
823   g_async_queue_lock (real->queue);
824   retval = real->num_threads;
825   g_async_queue_unlock (real->queue);
826 
827   return retval;
828 }
829 
830 /**
831  * g_thread_pool_unprocessed:
832  * @pool: a #GThreadPool
833  *
834  * Returns the number of tasks still unprocessed in @pool.
835  *
836  * Returns: the number of unprocessed tasks
837  */
838 guint
g_thread_pool_unprocessed(GThreadPool * pool)839 g_thread_pool_unprocessed (GThreadPool *pool)
840 {
841   GRealThreadPool *real;
842   gint unprocessed;
843 
844   real = (GRealThreadPool*) pool;
845 
846   g_return_val_if_fail (real, 0);
847   g_return_val_if_fail (real->running, 0);
848 
849   unprocessed = g_async_queue_length (real->queue);
850 
851   return MAX (unprocessed, 0);
852 }
853 
854 /**
855  * g_thread_pool_free:
856  * @pool: a #GThreadPool
857  * @immediate: should @pool shut down immediately?
858  * @wait_: should the function wait for all tasks to be finished?
859  *
860  * Frees all resources allocated for @pool.
861  *
862  * If @immediate is %TRUE, no new task is processed for @pool.
863  * Otherwise @pool is not freed before the last task is processed.
864  * Note however, that no thread of this pool is interrupted while
865  * processing a task. Instead at least all still running threads
866  * can finish their tasks before the @pool is freed.
867  *
868  * If @wait_ is %TRUE, this function does not return before all
869  * tasks to be processed (dependent on @immediate, whether all
870  * or only the currently running) are ready.
871  * Otherwise this function returns immediately.
872  *
873  * After calling this function @pool must not be used anymore.
874  */
875 void
g_thread_pool_free(GThreadPool * pool,gboolean immediate,gboolean wait_)876 g_thread_pool_free (GThreadPool *pool,
877                     gboolean     immediate,
878                     gboolean     wait_)
879 {
880   GRealThreadPool *real;
881 
882   real = (GRealThreadPool*) pool;
883 
884   g_return_if_fail (real);
885   g_return_if_fail (real->running);
886 
887   /* If there's no thread allowed here, there is not much sense in
888    * not stopping this pool immediately, when it's not empty
889    */
890   g_return_if_fail (immediate ||
891                     real->max_threads != 0 ||
892                     g_async_queue_length (real->queue) == 0);
893 
894   g_async_queue_lock (real->queue);
895 
896   real->running = FALSE;
897   real->immediate = immediate;
898   real->waiting = wait_;
899 
900   if (wait_)
901     {
902       while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
903              !(immediate && real->num_threads == 0))
904         g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
905     }
906 
907   if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
908     {
909       /* No thread is currently doing something (and nothing is left
910        * to process in the queue)
911        */
912       if (real->num_threads == 0)
913         {
914           /* No threads left, we clean up */
915           g_async_queue_unlock (real->queue);
916           g_thread_pool_free_internal (real);
917           return;
918         }
919 
920       g_thread_pool_wakeup_and_stop_all (real);
921     }
922 
923   /* The last thread should cleanup the pool */
924   real->waiting = FALSE;
925   g_async_queue_unlock (real->queue);
926 }
927 
928 static void
g_thread_pool_free_internal(GRealThreadPool * pool)929 g_thread_pool_free_internal (GRealThreadPool* pool)
930 {
931   g_return_if_fail (pool);
932   g_return_if_fail (pool->running == FALSE);
933   g_return_if_fail (pool->num_threads == 0);
934 
935   g_async_queue_unref (pool->queue);
936   g_cond_clear (&pool->cond);
937 
938   g_free (pool);
939 }
940 
941 static void
g_thread_pool_wakeup_and_stop_all(GRealThreadPool * pool)942 g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
943 {
944   guint i;
945 
946   g_return_if_fail (pool);
947   g_return_if_fail (pool->running == FALSE);
948   g_return_if_fail (pool->num_threads != 0);
949 
950   pool->immediate = TRUE;
951 
952   /*
953    * So here we're sending bogus data to the pool threads, which
954    * should cause them each to wake up, and check the above
955    * pool->immediate condition. However we don't want that
956    * data to be sorted (since it'll crash the sorter).
957    */
958   for (i = 0; i < pool->num_threads; i++)
959     g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
960 }
961 
962 /**
963  * g_thread_pool_set_max_unused_threads:
964  * @max_threads: maximal number of unused threads
965  *
966  * Sets the maximal number of unused threads to @max_threads.
967  * If @max_threads is -1, no limit is imposed on the number
968  * of unused threads.
969  *
970  * The default value is 2.
971  */
972 void
g_thread_pool_set_max_unused_threads(gint max_threads)973 g_thread_pool_set_max_unused_threads (gint max_threads)
974 {
975   g_return_if_fail (max_threads >= -1);
976 
977   g_atomic_int_set (&max_unused_threads, max_threads);
978 
979   if (max_threads != -1)
980     {
981       max_threads -= g_atomic_int_get (&unused_threads);
982       if (max_threads < 0)
983         {
984           g_atomic_int_set (&kill_unused_threads, -max_threads);
985           g_atomic_int_inc (&wakeup_thread_serial);
986 
987           g_async_queue_lock (unused_thread_queue);
988 
989           do
990             {
991               g_async_queue_push_unlocked (unused_thread_queue,
992                                            wakeup_thread_marker);
993             }
994           while (++max_threads);
995 
996           g_async_queue_unlock (unused_thread_queue);
997         }
998     }
999 }
1000 
1001 /**
1002  * g_thread_pool_get_max_unused_threads:
1003  *
1004  * Returns the maximal allowed number of unused threads.
1005  *
1006  * Returns: the maximal number of unused threads
1007  */
1008 gint
g_thread_pool_get_max_unused_threads(void)1009 g_thread_pool_get_max_unused_threads (void)
1010 {
1011   return g_atomic_int_get (&max_unused_threads);
1012 }
1013 
1014 /**
1015  * g_thread_pool_get_num_unused_threads:
1016  *
1017  * Returns the number of currently unused threads.
1018  *
1019  * Returns: the number of currently unused threads
1020  */
1021 guint
g_thread_pool_get_num_unused_threads(void)1022 g_thread_pool_get_num_unused_threads (void)
1023 {
1024   return (guint) g_atomic_int_get (&unused_threads);
1025 }
1026 
1027 /**
1028  * g_thread_pool_stop_unused_threads:
1029  *
1030  * Stops all currently unused threads. This does not change the
1031  * maximal number of unused threads. This function can be used to
1032  * regularly stop all unused threads e.g. from g_timeout_add().
1033  */
1034 void
g_thread_pool_stop_unused_threads(void)1035 g_thread_pool_stop_unused_threads (void)
1036 {
1037   guint oldval;
1038 
1039   oldval = g_thread_pool_get_max_unused_threads ();
1040 
1041   g_thread_pool_set_max_unused_threads (0);
1042   g_thread_pool_set_max_unused_threads (oldval);
1043 }
1044 
1045 /**
1046  * g_thread_pool_set_sort_function:
1047  * @pool: a #GThreadPool
1048  * @func: the #GCompareDataFunc used to sort the list of tasks.
1049  *     This function is passed two tasks. It should return
1050  *     0 if the order in which they are handled does not matter,
1051  *     a negative value if the first task should be processed before
1052  *     the second or a positive value if the second task should be
1053  *     processed first.
1054  * @user_data: user data passed to @func
1055  *
1056  * Sets the function used to sort the list of tasks. This allows the
1057  * tasks to be processed by a priority determined by @func, and not
1058  * just in the order in which they were added to the pool.
1059  *
1060  * Note, if the maximum number of threads is more than 1, the order
1061  * that threads are executed cannot be guaranteed 100%. Threads are
1062  * scheduled by the operating system and are executed at random. It
1063  * cannot be assumed that threads are executed in the order they are
1064  * created.
1065  *
1066  * Since: 2.10
1067  */
1068 void
g_thread_pool_set_sort_function(GThreadPool * pool,GCompareDataFunc func,gpointer user_data)1069 g_thread_pool_set_sort_function (GThreadPool      *pool,
1070                                  GCompareDataFunc  func,
1071                                  gpointer          user_data)
1072 {
1073   GRealThreadPool *real;
1074 
1075   real = (GRealThreadPool*) pool;
1076 
1077   g_return_if_fail (real);
1078   g_return_if_fail (real->running);
1079 
1080   g_async_queue_lock (real->queue);
1081 
1082   real->sort_func = func;
1083   real->sort_user_data = user_data;
1084 
1085   if (func)
1086     g_async_queue_sort_unlocked (real->queue,
1087                                  real->sort_func,
1088                                  real->sort_user_data);
1089 
1090   g_async_queue_unlock (real->queue);
1091 }
1092 
1093 /**
1094  * g_thread_pool_move_to_front:
1095  * @pool: a #GThreadPool
1096  * @data: an unprocessed item in the pool
1097  *
1098  * Moves the item to the front of the queue of unprocessed
1099  * items, so that it will be processed next.
1100  *
1101  * Returns: %TRUE if the item was found and moved
1102  *
1103  * Since: 2.46
1104  */
1105 gboolean
g_thread_pool_move_to_front(GThreadPool * pool,gpointer data)1106 g_thread_pool_move_to_front (GThreadPool *pool,
1107                              gpointer     data)
1108 {
1109   GRealThreadPool *real = (GRealThreadPool*) pool;
1110   gboolean found;
1111 
1112   g_async_queue_lock (real->queue);
1113 
1114   found = g_async_queue_remove_unlocked (real->queue, data);
1115   if (found)
1116     g_async_queue_push_front_unlocked (real->queue, data);
1117 
1118   g_async_queue_unlock (real->queue);
1119 
1120   return found;
1121 }
1122 
1123 /**
1124  * g_thread_pool_set_max_idle_time:
1125  * @interval: the maximum @interval (in milliseconds)
1126  *     a thread can be idle
1127  *
1128  * This function will set the maximum @interval that a thread
1129  * waiting in the pool for new tasks can be idle for before
1130  * being stopped. This function is similar to calling
1131  * g_thread_pool_stop_unused_threads() on a regular timeout,
1132  * except this is done on a per thread basis.
1133  *
1134  * By setting @interval to 0, idle threads will not be stopped.
1135  *
1136  * The default value is 15000 (15 seconds).
1137  *
1138  * Since: 2.10
1139  */
1140 void
g_thread_pool_set_max_idle_time(guint interval)1141 g_thread_pool_set_max_idle_time (guint interval)
1142 {
1143   guint i;
1144 
1145   g_atomic_int_set (&max_idle_time, interval);
1146 
1147   i = (guint) g_atomic_int_get (&unused_threads);
1148   if (i > 0)
1149     {
1150       g_atomic_int_inc (&wakeup_thread_serial);
1151       g_async_queue_lock (unused_thread_queue);
1152 
1153       do
1154         {
1155           g_async_queue_push_unlocked (unused_thread_queue,
1156                                        wakeup_thread_marker);
1157         }
1158       while (--i);
1159 
1160       g_async_queue_unlock (unused_thread_queue);
1161     }
1162 }
1163 
1164 /**
1165  * g_thread_pool_get_max_idle_time:
1166  *
1167  * This function will return the maximum @interval that a
1168  * thread will wait in the thread pool for new tasks before
1169  * being stopped.
1170  *
1171  * If this function returns 0, threads waiting in the thread
1172  * pool for new work are not stopped.
1173  *
1174  * Returns: the maximum @interval (milliseconds) to wait
1175  *     for new tasks in the thread pool before stopping the
1176  *     thread
1177  *
1178  * Since: 2.10
1179  */
1180 guint
g_thread_pool_get_max_idle_time(void)1181 g_thread_pool_get_max_idle_time (void)
1182 {
1183   return (guint) g_atomic_int_get (&max_idle_time);
1184 }
1185