• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GThreadPool: thread pool implementation.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 /*
22  * MT safe
23  */
24 
25 #include "config.h"
26 
27 #include "gthreadpool.h"
28 
29 #include "gasyncqueue.h"
30 #include "gasyncqueueprivate.h"
31 #include "gmain.h"
32 #include "gtestutils.h"
33 #include "gtimer.h"
34 #include "gutils.h"
35 
36 /**
37  * SECTION:thread_pools
38  * @title: Thread Pools
39  * @short_description: pools of threads to execute work concurrently
40  * @see_also: #GThread
41  *
42  * Sometimes you wish to asynchronously fork out the execution of work
43  * and continue working in your own thread. If that will happen often,
44  * the overhead of starting and destroying a thread each time might be
45  * too high. In such cases reusing already started threads seems like a
46  * good idea. And it indeed is, but implementing this can be tedious
47  * and error-prone.
48  *
49  * Therefore GLib provides thread pools for your convenience. An added
50  * advantage is, that the threads can be shared between the different
51  * subsystems of your program, when they are using GLib.
52  *
53  * To create a new thread pool, you use g_thread_pool_new().
54  * It is destroyed by g_thread_pool_free().
55  *
56  * If you want to execute a certain task within a thread pool,
57  * you call g_thread_pool_push().
58  *
59  * To get the current number of running threads you call
60  * g_thread_pool_get_num_threads(). To get the number of still
61  * unprocessed tasks you call g_thread_pool_unprocessed(). To control
62  * the maximal number of threads for a thread pool, you use
63  * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads().
64  *
65  * Finally you can control the number of unused threads, that are kept
66  * alive by GLib for future use. The current number can be fetched with
67  * g_thread_pool_get_num_unused_threads(). The maximal number can be
68  * controlled by g_thread_pool_get_max_unused_threads() and
69  * g_thread_pool_set_max_unused_threads(). All currently unused threads
70  * can be stopped by calling g_thread_pool_stop_unused_threads().
71  */
72 
73 #define DEBUG_MSG(x)
74 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
75 
76 typedef struct _GRealThreadPool GRealThreadPool;
77 
78 /**
79  * GThreadPool:
80  * @func: the function to execute in the threads of this pool
81  * @user_data: the user data for the threads of this pool
82  * @exclusive: are all threads exclusive to this pool
83  *
84  * The #GThreadPool struct represents a thread pool. It has three
85  * public read-only members, but the underlying struct is bigger,
86  * so you must not copy this struct.
87  */
88 struct _GRealThreadPool
89 {
90   GThreadPool pool;
91   GAsyncQueue *queue;
92   GCond cond;
93   gint max_threads;
94   guint num_threads;
95   gboolean running;
96   gboolean immediate;
97   gboolean waiting;
98   GCompareDataFunc sort_func;
99   gpointer sort_user_data;
100 };
101 
102 /* The following is just an address to mark the wakeup order for a
103  * thread, it could be any address (as long, as it isn't a valid
104  * GThreadPool address)
105  */
106 static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
107 static gint wakeup_thread_serial = 0;
108 
109 /* Here all unused threads are waiting  */
110 static GAsyncQueue *unused_thread_queue = NULL;
111 static gint unused_threads = 0;
112 static gint max_unused_threads = 2;
113 static gint kill_unused_threads = 0;
114 static guint max_idle_time = 15 * 1000;
115 
116 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
117                                                            gpointer          data);
118 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
119 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
120 static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
121                                                            GError          **error);
122 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
123 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
124 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
125 
126 static void
g_thread_pool_queue_push_unlocked(GRealThreadPool * pool,gpointer data)127 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
128                                    gpointer         data)
129 {
130   if (pool->sort_func)
131     g_async_queue_push_sorted_unlocked (pool->queue,
132                                         data,
133                                         pool->sort_func,
134                                         pool->sort_user_data);
135   else
136     g_async_queue_push_unlocked (pool->queue, data);
137 }
138 
139 static GRealThreadPool*
g_thread_pool_wait_for_new_pool(void)140 g_thread_pool_wait_for_new_pool (void)
141 {
142   GRealThreadPool *pool;
143   gint local_wakeup_thread_serial;
144   guint local_max_unused_threads;
145   gint local_max_idle_time;
146   gint last_wakeup_thread_serial;
147   gboolean have_relayed_thread_marker = FALSE;
148 
149   local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
150   local_max_idle_time = g_atomic_int_get (&max_idle_time);
151   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
152 
153   g_atomic_int_inc (&unused_threads);
154 
155   do
156     {
157       if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
158         {
159           /* If this is a superfluous thread, stop it. */
160           pool = NULL;
161         }
162       else if (local_max_idle_time > 0)
163         {
164           /* If a maximal idle time is given, wait for the given time. */
165           DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
166                       g_thread_self (), local_max_idle_time / 1000.0));
167 
168           pool = g_async_queue_timeout_pop (unused_thread_queue,
169 					    local_max_idle_time * 1000);
170         }
171       else
172         {
173           /* If no maximal idle time is given, wait indefinitely. */
174           DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
175           pool = g_async_queue_pop (unused_thread_queue);
176         }
177 
178       if (pool == wakeup_thread_marker)
179         {
180           local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
181           if (last_wakeup_thread_serial == local_wakeup_thread_serial)
182             {
183               if (!have_relayed_thread_marker)
184               {
185                 /* If this wakeup marker has been received for
186                  * the second time, relay it.
187                  */
188                 DEBUG_MSG (("thread %p relaying wakeup message to "
189                             "waiting thread with lower serial.",
190                             g_thread_self ()));
191 
192                 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
193                 have_relayed_thread_marker = TRUE;
194 
195                 /* If a wakeup marker has been relayed, this thread
196                  * will get out of the way for 100 microseconds to
197                  * avoid receiving this marker again.
198                  */
199                 g_usleep (100);
200               }
201             }
202           else
203             {
204               if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
205                 {
206                   pool = NULL;
207                   break;
208                 }
209 
210               DEBUG_MSG (("thread %p updating to new limits.",
211                           g_thread_self ()));
212 
213               local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
214               local_max_idle_time = g_atomic_int_get (&max_idle_time);
215               last_wakeup_thread_serial = local_wakeup_thread_serial;
216 
217               have_relayed_thread_marker = FALSE;
218             }
219         }
220     }
221   while (pool == wakeup_thread_marker);
222 
223   g_atomic_int_add (&unused_threads, -1);
224 
225   return pool;
226 }
227 
228 static gpointer
g_thread_pool_wait_for_new_task(GRealThreadPool * pool)229 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
230 {
231   gpointer task = NULL;
232 
233   if (pool->running || (!pool->immediate &&
234                         g_async_queue_length_unlocked (pool->queue) > 0))
235     {
236       /* This thread pool is still active. */
237       if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
238         {
239           /* This is a superfluous thread, so it goes to the global pool. */
240           DEBUG_MSG (("superfluous thread %p in pool %p.",
241                       g_thread_self (), pool));
242         }
243       else if (pool->pool.exclusive)
244         {
245           /* Exclusive threads stay attached to the pool. */
246           task = g_async_queue_pop_unlocked (pool->queue);
247 
248           DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
249                       "(%d running, %d unprocessed).",
250                       g_thread_self (), pool, pool->num_threads,
251                       g_async_queue_length_unlocked (pool->queue)));
252         }
253       else
254         {
255           /* A thread will wait for new tasks for at most 1/2
256            * second before going to the global pool.
257            */
258           DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
259                       "(%d running, %d unprocessed).",
260                       g_thread_self (), pool, pool->num_threads,
261                       g_async_queue_length_unlocked (pool->queue)));
262 
263           task = g_async_queue_timeout_pop_unlocked (pool->queue,
264 						     G_USEC_PER_SEC / 2);
265         }
266     }
267   else
268     {
269       /* This thread pool is inactive, it will no longer process tasks. */
270       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
271                   "(running: %s, immediate: %s, len: %d).",
272                   pool, g_thread_self (),
273                   pool->running ? "true" : "false",
274                   pool->immediate ? "true" : "false",
275                   g_async_queue_length_unlocked (pool->queue)));
276     }
277 
278   return task;
279 }
280 
281 
282 static gpointer
g_thread_pool_thread_proxy(gpointer data)283 g_thread_pool_thread_proxy (gpointer data)
284 {
285   GRealThreadPool *pool;
286 
287   pool = data;
288 
289   DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
290 
291   g_async_queue_lock (pool->queue);
292 
293   while (TRUE)
294     {
295       gpointer task;
296 
297       task = g_thread_pool_wait_for_new_task (pool);
298       if (task)
299         {
300           if (pool->running || !pool->immediate)
301             {
302               /* A task was received and the thread pool is active,
303                * so execute the function.
304                */
305               g_async_queue_unlock (pool->queue);
306               DEBUG_MSG (("thread %p in pool %p calling func.",
307                           g_thread_self (), pool));
308               pool->pool.func (task, pool->pool.user_data);
309               g_async_queue_lock (pool->queue);
310             }
311         }
312       else
313         {
314           /* No task was received, so this thread goes to the global pool. */
315           gboolean free_pool = FALSE;
316 
317           DEBUG_MSG (("thread %p leaving pool %p for global pool.",
318                       g_thread_self (), pool));
319           pool->num_threads--;
320 
321           if (!pool->running)
322             {
323               if (!pool->waiting)
324                 {
325                   if (pool->num_threads == 0)
326                     {
327                       /* If the pool is not running and no other
328                        * thread is waiting for this thread pool to
329                        * finish and this is the last thread of this
330                        * pool, free the pool.
331                        */
332                       free_pool = TRUE;
333                     }
334                   else
335                     {
336                       /* If the pool is not running and no other
337                        * thread is waiting for this thread pool to
338                        * finish and this is not the last thread of
339                        * this pool and there are no tasks left in the
340                        * queue, wakeup the remaining threads.
341                        */
342                       if (g_async_queue_length_unlocked (pool->queue) ==
343                           (gint) -pool->num_threads)
344                         g_thread_pool_wakeup_and_stop_all (pool);
345                     }
346                 }
347               else if (pool->immediate ||
348                        g_async_queue_length_unlocked (pool->queue) <= 0)
349                 {
350                   /* If the pool is not running and another thread is
351                    * waiting for this thread pool to finish and there
352                    * are either no tasks left or the pool shall stop
353                    * immediately, inform the waiting thread of a change
354                    * of the thread pool state.
355                    */
356                   g_cond_broadcast (&pool->cond);
357                 }
358             }
359 
360           g_async_queue_unlock (pool->queue);
361 
362           if (free_pool)
363             g_thread_pool_free_internal (pool);
364 
365           if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL)
366             break;
367 
368           g_async_queue_lock (pool->queue);
369 
370           DEBUG_MSG (("thread %p entering pool %p from global pool.",
371                       g_thread_self (), pool));
372 
373           /* pool->num_threads++ is not done here, but in
374            * g_thread_pool_start_thread to make the new started
375            * thread known to the pool before itself can do it.
376            */
377         }
378     }
379 
380   return NULL;
381 }
382 
383 static gboolean
g_thread_pool_start_thread(GRealThreadPool * pool,GError ** error)384 g_thread_pool_start_thread (GRealThreadPool  *pool,
385                             GError          **error)
386 {
387   gboolean success = FALSE;
388 
389   if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
390     /* Enough threads are already running */
391     return TRUE;
392 
393   g_async_queue_lock (unused_thread_queue);
394 
395   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
396     {
397       g_async_queue_push_unlocked (unused_thread_queue, pool);
398       success = TRUE;
399     }
400 
401   g_async_queue_unlock (unused_thread_queue);
402 
403   if (!success)
404     {
405       const gchar *prgname = g_get_prgname ();
406       gchar name[16] = "pool";
407       GThread *thread;
408 
409       if (prgname)
410         g_snprintf (name, sizeof (name), "pool-%s", prgname);
411 
412       /* No thread was found, we have to start a new one */
413       thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
414 
415       if (thread == NULL)
416         return FALSE;
417 
418       g_thread_unref (thread);
419     }
420 
421   /* See comment in g_thread_pool_thread_proxy as to why this is done
422    * here and not there
423    */
424   pool->num_threads++;
425 
426   return TRUE;
427 }
428 
429 /**
430  * g_thread_pool_new:
431  * @func: a function to execute in the threads of the new thread pool
432  * @user_data: user data that is handed over to @func every time it
433  *     is called
434  * @max_threads: the maximal number of threads to execute concurrently
435  *     in  the new thread pool, -1 means no limit
436  * @exclusive: should this thread pool be exclusive?
437  * @error: return location for error, or %NULL
438  *
439  * This function creates a new thread pool.
440  *
441  * Whenever you call g_thread_pool_push(), either a new thread is
442  * created or an unused one is reused. At most @max_threads threads
443  * are running concurrently for this thread pool. @max_threads = -1
444  * allows unlimited threads to be created for this thread pool. The
445  * newly created or reused thread now executes the function @func
446  * with the two arguments. The first one is the parameter to
447  * g_thread_pool_push() and the second one is @user_data.
448  *
449  * The parameter @exclusive determines whether the thread pool owns
450  * all threads exclusive or shares them with other thread pools.
451  * If @exclusive is %TRUE, @max_threads threads are started
452  * immediately and they will run exclusively for this thread pool
453  * until it is destroyed by g_thread_pool_free(). If @exclusive is
454  * %FALSE, threads are created when needed and shared between all
455  * non-exclusive thread pools. This implies that @max_threads may
456  * not be -1 for exclusive thread pools. Besides, exclusive thread
457  * pools are not affected by g_thread_pool_set_max_idle_time()
458  * since their threads are never considered idle and returned to the
459  * global pool.
460  *
461  * @error can be %NULL to ignore errors, or non-%NULL to report
462  * errors. An error can only occur when @exclusive is set to %TRUE
463  * and not all @max_threads threads could be created.
464  * See #GThreadError for possible errors that may occur.
465  * Note, even in case of error a valid #GThreadPool is returned.
466  *
467  * Returns: the new #GThreadPool
468  */
469 GThreadPool *
g_thread_pool_new(GFunc func,gpointer user_data,gint max_threads,gboolean exclusive,GError ** error)470 g_thread_pool_new (GFunc      func,
471                    gpointer   user_data,
472                    gint       max_threads,
473                    gboolean   exclusive,
474                    GError   **error)
475 {
476   GRealThreadPool *retval;
477   G_LOCK_DEFINE_STATIC (init);
478 
479   g_return_val_if_fail (func, NULL);
480   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
481   g_return_val_if_fail (max_threads >= -1, NULL);
482 
483   retval = g_new (GRealThreadPool, 1);
484 
485   retval->pool.func = func;
486   retval->pool.user_data = user_data;
487   retval->pool.exclusive = exclusive;
488   retval->queue = g_async_queue_new ();
489   g_cond_init (&retval->cond);
490   retval->max_threads = max_threads;
491   retval->num_threads = 0;
492   retval->running = TRUE;
493   retval->immediate = FALSE;
494   retval->waiting = FALSE;
495   retval->sort_func = NULL;
496   retval->sort_user_data = NULL;
497 
498   G_LOCK (init);
499   if (!unused_thread_queue)
500       unused_thread_queue = g_async_queue_new ();
501   G_UNLOCK (init);
502 
503   if (retval->pool.exclusive)
504     {
505       g_async_queue_lock (retval->queue);
506 
507       while (retval->num_threads < (guint) retval->max_threads)
508         {
509           GError *local_error = NULL;
510 
511           if (!g_thread_pool_start_thread (retval, &local_error))
512             {
513               g_propagate_error (error, local_error);
514               break;
515             }
516         }
517 
518       g_async_queue_unlock (retval->queue);
519     }
520 
521   return (GThreadPool*) retval;
522 }
523 
524 /**
525  * g_thread_pool_push:
526  * @pool: a #GThreadPool
527  * @data: a new task for @pool
528  * @error: return location for error, or %NULL
529  *
530  * Inserts @data into the list of tasks to be executed by @pool.
531  *
532  * When the number of currently running threads is lower than the
533  * maximal allowed number of threads, a new thread is started (or
534  * reused) with the properties given to g_thread_pool_new().
535  * Otherwise, @data stays in the queue until a thread in this pool
536  * finishes its previous task and processes @data.
537  *
538  * @error can be %NULL to ignore errors, or non-%NULL to report
539  * errors. An error can only occur when a new thread couldn't be
540  * created. In that case @data is simply appended to the queue of
541  * work to do.
542  *
543  * Before version 2.32, this function did not return a success status.
544  *
545  * Returns: %TRUE on success, %FALSE if an error occurred
546  */
547 gboolean
g_thread_pool_push(GThreadPool * pool,gpointer data,GError ** error)548 g_thread_pool_push (GThreadPool  *pool,
549                     gpointer      data,
550                     GError      **error)
551 {
552   GRealThreadPool *real;
553   gboolean result;
554 
555   real = (GRealThreadPool*) pool;
556 
557   g_return_val_if_fail (real, FALSE);
558   g_return_val_if_fail (real->running, FALSE);
559 
560   result = TRUE;
561 
562   g_async_queue_lock (real->queue);
563 
564   if (g_async_queue_length_unlocked (real->queue) >= 0)
565     {
566       /* No thread is waiting in the queue */
567       GError *local_error = NULL;
568 
569       if (!g_thread_pool_start_thread (real, &local_error))
570         {
571           g_propagate_error (error, local_error);
572           result = FALSE;
573         }
574     }
575 
576   g_thread_pool_queue_push_unlocked (real, data);
577   g_async_queue_unlock (real->queue);
578 
579   return result;
580 }
581 
582 /**
583  * g_thread_pool_set_max_threads:
584  * @pool: a #GThreadPool
585  * @max_threads: a new maximal number of threads for @pool,
586  *     or -1 for unlimited
587  * @error: return location for error, or %NULL
588  *
589  * Sets the maximal allowed number of threads for @pool.
590  * A value of -1 means that the maximal number of threads
591  * is unlimited. If @pool is an exclusive thread pool, setting
592  * the maximal number of threads to -1 is not allowed.
593  *
594  * Setting @max_threads to 0 means stopping all work for @pool.
595  * It is effectively frozen until @max_threads is set to a non-zero
596  * value again.
597  *
598  * A thread is never terminated while calling @func, as supplied by
599  * g_thread_pool_new(). Instead the maximal number of threads only
600  * has effect for the allocation of new threads in g_thread_pool_push().
601  * A new thread is allocated, whenever the number of currently
602  * running threads in @pool is smaller than the maximal number.
603  *
604  * @error can be %NULL to ignore errors, or non-%NULL to report
605  * errors. An error can only occur when a new thread couldn't be
606  * created.
607  *
608  * Before version 2.32, this function did not return a success status.
609  *
610  * Returns: %TRUE on success, %FALSE if an error occurred
611  */
612 gboolean
g_thread_pool_set_max_threads(GThreadPool * pool,gint max_threads,GError ** error)613 g_thread_pool_set_max_threads (GThreadPool  *pool,
614                                gint          max_threads,
615                                GError      **error)
616 {
617   GRealThreadPool *real;
618   gint to_start;
619   gboolean result;
620 
621   real = (GRealThreadPool*) pool;
622 
623   g_return_val_if_fail (real, FALSE);
624   g_return_val_if_fail (real->running, FALSE);
625   g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
626   g_return_val_if_fail (max_threads >= -1, FALSE);
627 
628   result = TRUE;
629 
630   g_async_queue_lock (real->queue);
631 
632   real->max_threads = max_threads;
633 
634   if (pool->exclusive)
635     to_start = real->max_threads - real->num_threads;
636   else
637     to_start = g_async_queue_length_unlocked (real->queue);
638 
639   for ( ; to_start > 0; to_start--)
640     {
641       GError *local_error = NULL;
642 
643       if (!g_thread_pool_start_thread (real, &local_error))
644         {
645           g_propagate_error (error, local_error);
646           result = FALSE;
647           break;
648         }
649     }
650 
651   g_async_queue_unlock (real->queue);
652 
653   return result;
654 }
655 
656 /**
657  * g_thread_pool_get_max_threads:
658  * @pool: a #GThreadPool
659  *
660  * Returns the maximal number of threads for @pool.
661  *
662  * Returns: the maximal number of threads
663  */
664 gint
g_thread_pool_get_max_threads(GThreadPool * pool)665 g_thread_pool_get_max_threads (GThreadPool *pool)
666 {
667   GRealThreadPool *real;
668   gint retval;
669 
670   real = (GRealThreadPool*) pool;
671 
672   g_return_val_if_fail (real, 0);
673   g_return_val_if_fail (real->running, 0);
674 
675   g_async_queue_lock (real->queue);
676   retval = real->max_threads;
677   g_async_queue_unlock (real->queue);
678 
679   return retval;
680 }
681 
682 /**
683  * g_thread_pool_get_num_threads:
684  * @pool: a #GThreadPool
685  *
686  * Returns the number of threads currently running in @pool.
687  *
688  * Returns: the number of threads currently running
689  */
690 guint
g_thread_pool_get_num_threads(GThreadPool * pool)691 g_thread_pool_get_num_threads (GThreadPool *pool)
692 {
693   GRealThreadPool *real;
694   guint retval;
695 
696   real = (GRealThreadPool*) pool;
697 
698   g_return_val_if_fail (real, 0);
699   g_return_val_if_fail (real->running, 0);
700 
701   g_async_queue_lock (real->queue);
702   retval = real->num_threads;
703   g_async_queue_unlock (real->queue);
704 
705   return retval;
706 }
707 
708 /**
709  * g_thread_pool_unprocessed:
710  * @pool: a #GThreadPool
711  *
712  * Returns the number of tasks still unprocessed in @pool.
713  *
714  * Returns: the number of unprocessed tasks
715  */
716 guint
g_thread_pool_unprocessed(GThreadPool * pool)717 g_thread_pool_unprocessed (GThreadPool *pool)
718 {
719   GRealThreadPool *real;
720   gint unprocessed;
721 
722   real = (GRealThreadPool*) pool;
723 
724   g_return_val_if_fail (real, 0);
725   g_return_val_if_fail (real->running, 0);
726 
727   unprocessed = g_async_queue_length (real->queue);
728 
729   return MAX (unprocessed, 0);
730 }
731 
732 /**
733  * g_thread_pool_free:
734  * @pool: a #GThreadPool
735  * @immediate: should @pool shut down immediately?
736  * @wait_: should the function wait for all tasks to be finished?
737  *
738  * Frees all resources allocated for @pool.
739  *
740  * If @immediate is %TRUE, no new task is processed for @pool.
741  * Otherwise @pool is not freed before the last task is processed.
742  * Note however, that no thread of this pool is interrupted while
743  * processing a task. Instead at least all still running threads
744  * can finish their tasks before the @pool is freed.
745  *
746  * If @wait_ is %TRUE, the functions does not return before all
747  * tasks to be processed (dependent on @immediate, whether all
748  * or only the currently running) are ready.
749  * Otherwise the function returns immediately.
750  *
751  * After calling this function @pool must not be used anymore.
752  */
753 void
g_thread_pool_free(GThreadPool * pool,gboolean immediate,gboolean wait_)754 g_thread_pool_free (GThreadPool *pool,
755                     gboolean     immediate,
756                     gboolean     wait_)
757 {
758   GRealThreadPool *real;
759 
760   real = (GRealThreadPool*) pool;
761 
762   g_return_if_fail (real);
763   g_return_if_fail (real->running);
764 
765   /* If there's no thread allowed here, there is not much sense in
766    * not stopping this pool immediately, when it's not empty
767    */
768   g_return_if_fail (immediate ||
769                     real->max_threads != 0 ||
770                     g_async_queue_length (real->queue) == 0);
771 
772   g_async_queue_lock (real->queue);
773 
774   real->running = FALSE;
775   real->immediate = immediate;
776   real->waiting = wait_;
777 
778   if (wait_)
779     {
780       while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
781              !(immediate && real->num_threads == 0))
782         g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
783     }
784 
785   if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
786     {
787       /* No thread is currently doing something (and nothing is left
788        * to process in the queue)
789        */
790       if (real->num_threads == 0)
791         {
792           /* No threads left, we clean up */
793           g_async_queue_unlock (real->queue);
794           g_thread_pool_free_internal (real);
795           return;
796         }
797 
798       g_thread_pool_wakeup_and_stop_all (real);
799     }
800 
801   /* The last thread should cleanup the pool */
802   real->waiting = FALSE;
803   g_async_queue_unlock (real->queue);
804 }
805 
806 static void
g_thread_pool_free_internal(GRealThreadPool * pool)807 g_thread_pool_free_internal (GRealThreadPool* pool)
808 {
809   g_return_if_fail (pool);
810   g_return_if_fail (pool->running == FALSE);
811   g_return_if_fail (pool->num_threads == 0);
812 
813   g_async_queue_unref (pool->queue);
814   g_cond_clear (&pool->cond);
815 
816   g_free (pool);
817 }
818 
819 static void
g_thread_pool_wakeup_and_stop_all(GRealThreadPool * pool)820 g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
821 {
822   guint i;
823 
824   g_return_if_fail (pool);
825   g_return_if_fail (pool->running == FALSE);
826   g_return_if_fail (pool->num_threads != 0);
827 
828   pool->immediate = TRUE;
829 
830   /*
831    * So here we're sending bogus data to the pool threads, which
832    * should cause them each to wake up, and check the above
833    * pool->immediate condition. However we don't want that
834    * data to be sorted (since it'll crash the sorter).
835    */
836   for (i = 0; i < pool->num_threads; i++)
837     g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
838 }
839 
840 /**
841  * g_thread_pool_set_max_unused_threads:
842  * @max_threads: maximal number of unused threads
843  *
844  * Sets the maximal number of unused threads to @max_threads.
845  * If @max_threads is -1, no limit is imposed on the number
846  * of unused threads.
847  *
848  * The default value is 2.
849  */
850 void
g_thread_pool_set_max_unused_threads(gint max_threads)851 g_thread_pool_set_max_unused_threads (gint max_threads)
852 {
853   g_return_if_fail (max_threads >= -1);
854 
855   g_atomic_int_set (&max_unused_threads, max_threads);
856 
857   if (max_threads != -1)
858     {
859       max_threads -= g_atomic_int_get (&unused_threads);
860       if (max_threads < 0)
861         {
862           g_atomic_int_set (&kill_unused_threads, -max_threads);
863           g_atomic_int_inc (&wakeup_thread_serial);
864 
865           g_async_queue_lock (unused_thread_queue);
866 
867           do
868             {
869               g_async_queue_push_unlocked (unused_thread_queue,
870                                            wakeup_thread_marker);
871             }
872           while (++max_threads);
873 
874           g_async_queue_unlock (unused_thread_queue);
875         }
876     }
877 }
878 
879 /**
880  * g_thread_pool_get_max_unused_threads:
881  *
882  * Returns the maximal allowed number of unused threads.
883  *
884  * Returns: the maximal number of unused threads
885  */
886 gint
g_thread_pool_get_max_unused_threads(void)887 g_thread_pool_get_max_unused_threads (void)
888 {
889   return g_atomic_int_get (&max_unused_threads);
890 }
891 
892 /**
893  * g_thread_pool_get_num_unused_threads:
894  *
895  * Returns the number of currently unused threads.
896  *
897  * Returns: the number of currently unused threads
898  */
899 guint
g_thread_pool_get_num_unused_threads(void)900 g_thread_pool_get_num_unused_threads (void)
901 {
902   return g_atomic_int_get (&unused_threads);
903 }
904 
905 /**
906  * g_thread_pool_stop_unused_threads:
907  *
908  * Stops all currently unused threads. This does not change the
909  * maximal number of unused threads. This function can be used to
910  * regularly stop all unused threads e.g. from g_timeout_add().
911  */
912 void
g_thread_pool_stop_unused_threads(void)913 g_thread_pool_stop_unused_threads (void)
914 {
915   guint oldval;
916 
917   oldval = g_thread_pool_get_max_unused_threads ();
918 
919   g_thread_pool_set_max_unused_threads (0);
920   g_thread_pool_set_max_unused_threads (oldval);
921 }
922 
923 /**
924  * g_thread_pool_set_sort_function:
925  * @pool: a #GThreadPool
926  * @func: the #GCompareDataFunc used to sort the list of tasks.
927  *     This function is passed two tasks. It should return
928  *     0 if the order in which they are handled does not matter,
929  *     a negative value if the first task should be processed before
930  *     the second or a positive value if the second task should be
931  *     processed first.
932  * @user_data: user data passed to @func
933  *
934  * Sets the function used to sort the list of tasks. This allows the
935  * tasks to be processed by a priority determined by @func, and not
936  * just in the order in which they were added to the pool.
937  *
938  * Note, if the maximum number of threads is more than 1, the order
939  * that threads are executed cannot be guaranteed 100%. Threads are
940  * scheduled by the operating system and are executed at random. It
941  * cannot be assumed that threads are executed in the order they are
942  * created.
943  *
944  * Since: 2.10
945  */
946 void
g_thread_pool_set_sort_function(GThreadPool * pool,GCompareDataFunc func,gpointer user_data)947 g_thread_pool_set_sort_function (GThreadPool      *pool,
948                                  GCompareDataFunc  func,
949                                  gpointer          user_data)
950 {
951   GRealThreadPool *real;
952 
953   real = (GRealThreadPool*) pool;
954 
955   g_return_if_fail (real);
956   g_return_if_fail (real->running);
957 
958   g_async_queue_lock (real->queue);
959 
960   real->sort_func = func;
961   real->sort_user_data = user_data;
962 
963   if (func)
964     g_async_queue_sort_unlocked (real->queue,
965                                  real->sort_func,
966                                  real->sort_user_data);
967 
968   g_async_queue_unlock (real->queue);
969 }
970 
971 /**
972  * g_thread_pool_move_to_front:
973  * @pool: a #GThreadPool
974  * @data: an unprocessed item in the pool
975  *
976  * Moves the item to the front of the queue of unprocessed
977  * items, so that it will be processed next.
978  *
979  * Returns: %TRUE if the item was found and moved
980  *
981  * Since: 2.46
982  */
983 gboolean
g_thread_pool_move_to_front(GThreadPool * pool,gpointer data)984 g_thread_pool_move_to_front (GThreadPool *pool,
985                              gpointer     data)
986 {
987   GRealThreadPool *real = (GRealThreadPool*) pool;
988   gboolean found;
989 
990   g_async_queue_lock (real->queue);
991 
992   found = g_async_queue_remove_unlocked (real->queue, data);
993   if (found)
994     g_async_queue_push_front_unlocked (real->queue, data);
995 
996   g_async_queue_unlock (real->queue);
997 
998   return found;
999 }
1000 
1001 /**
1002  * g_thread_pool_set_max_idle_time:
1003  * @interval: the maximum @interval (in milliseconds)
1004  *     a thread can be idle
1005  *
1006  * This function will set the maximum @interval that a thread
1007  * waiting in the pool for new tasks can be idle for before
1008  * being stopped. This function is similar to calling
1009  * g_thread_pool_stop_unused_threads() on a regular timeout,
1010  * except this is done on a per thread basis.
1011  *
1012  * By setting @interval to 0, idle threads will not be stopped.
1013  *
1014  * The default value is 15000 (15 seconds).
1015  *
1016  * Since: 2.10
1017  */
1018 void
g_thread_pool_set_max_idle_time(guint interval)1019 g_thread_pool_set_max_idle_time (guint interval)
1020 {
1021   guint i;
1022 
1023   g_atomic_int_set (&max_idle_time, interval);
1024 
1025   i = g_atomic_int_get (&unused_threads);
1026   if (i > 0)
1027     {
1028       g_atomic_int_inc (&wakeup_thread_serial);
1029       g_async_queue_lock (unused_thread_queue);
1030 
1031       do
1032         {
1033           g_async_queue_push_unlocked (unused_thread_queue,
1034                                        wakeup_thread_marker);
1035         }
1036       while (--i);
1037 
1038       g_async_queue_unlock (unused_thread_queue);
1039     }
1040 }
1041 
1042 /**
1043  * g_thread_pool_get_max_idle_time:
1044  *
1045  * This function will return the maximum @interval that a
1046  * thread will wait in the thread pool for new tasks before
1047  * being stopped.
1048  *
1049  * If this function returns 0, threads waiting in the thread
1050  * pool for new work are not stopped.
1051  *
1052  * Returns: the maximum @interval (milliseconds) to wait
1053  *     for new tasks in the thread pool before stopping the
1054  *     thread
1055  *
1056  * Since: 2.10
1057  */
1058 guint
g_thread_pool_get_max_idle_time(void)1059 g_thread_pool_get_max_idle_time (void)
1060 {
1061   return g_atomic_int_get (&max_idle_time);
1062 }
1063