• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GLIB - Library of useful routines for C programming
2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
3  *
4  * GAsyncQueue: asynchronous queue implementation, based on GQueue.
5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 /*
22  * MT safe
23  */
24 
25 #include "config.h"
26 
27 #include "gasyncqueue.h"
28 #include "gasyncqueueprivate.h"
29 
30 #include "gmain.h"
31 #include "gmem.h"
32 #include "gqueue.h"
33 #include "gtestutils.h"
34 #include "gtimer.h"
35 #include "gthread.h"
36 #include "deprecated/gthread.h"
37 
38 
39 /**
40  * SECTION:async_queues
41  * @title: Asynchronous Queues
42  * @short_description: asynchronous communication between threads
43  * @see_also: #GThreadPool
44  *
45  * Often you need to communicate between different threads. In general
46  * it's safer not to do this by shared memory, but by explicit message
47  * passing. These messages only make sense asynchronously for
48  * multi-threaded applications though, as a synchronous operation could
49  * as well be done in the same thread.
50  *
51  * Asynchronous queues are an exception from most other GLib data
52  * structures, as they can be used simultaneously from multiple threads
53  * without explicit locking and they bring their own builtin reference
54  * counting. This is because the nature of an asynchronous queue is that
55  * it will always be used by at least 2 concurrent threads.
56  *
57  * For using an asynchronous queue you first have to create one with
58  * g_async_queue_new(). #GAsyncQueue structs are reference counted,
59  * use g_async_queue_ref() and g_async_queue_unref() to manage your
60  * references.
61  *
62  * A thread which wants to send a message to that queue simply calls
63  * g_async_queue_push() to push the message to the queue.
64  *
65  * A thread which is expecting messages from an asynchronous queue
66  * simply calls g_async_queue_pop() for that queue. If no message is
67  * available in the queue at that point, the thread is now put to sleep
68  * until a message arrives. The message will be removed from the queue
69  * and returned. The functions g_async_queue_try_pop() and
70  * g_async_queue_timeout_pop() can be used to only check for the presence
71  * of messages or to only wait a certain time for messages respectively.
72  *
73  * For almost every function there exist two variants, one that locks
74  * the queue and one that doesn't. That way you can hold the queue lock
75  * (acquire it with g_async_queue_lock() and release it with
76  * g_async_queue_unlock()) over multiple queue accessing instructions.
77  * This can be necessary to ensure the integrity of the queue, but should
78  * only be used when really necessary, as it can make your life harder
79  * if used unwisely. Normally you should only use the locking function
80  * variants (those without the _unlocked suffix).
81  *
82  * In many cases, it may be more convenient to use #GThreadPool when
83  * you need to distribute work to a set of worker threads instead of
84  * using #GAsyncQueue manually. #GThreadPool uses a GAsyncQueue
85  * internally.
86  */
87 
88 /**
89  * GAsyncQueue:
90  *
91  * The GAsyncQueue struct is an opaque data structure which represents
92  * an asynchronous queue. It should only be accessed through the
93  * g_async_queue_* functions.
94  */
95 struct _GAsyncQueue
96 {
97   GMutex mutex;
98   GCond cond;
99   GQueue queue;
100   GDestroyNotify item_free_func;
101   guint waiting_threads;
102   gint ref_count;
103 };
104 
105 typedef struct
106 {
107   GCompareDataFunc func;
108   gpointer         user_data;
109 } SortData;
110 
111 /**
112  * g_async_queue_new:
113  *
114  * Creates a new asynchronous queue.
115  *
116  * Returns: a new #GAsyncQueue. Free with g_async_queue_unref()
117  */
118 GAsyncQueue *
g_async_queue_new(void)119 g_async_queue_new (void)
120 {
121   return g_async_queue_new_full (NULL);
122 }
123 
124 /**
125  * g_async_queue_new_full:
126  * @item_free_func: function to free queue elements
127  *
128  * Creates a new asynchronous queue and sets up a destroy notify
129  * function that is used to free any remaining queue items when
130  * the queue is destroyed after the final unref.
131  *
132  * Returns: a new #GAsyncQueue. Free with g_async_queue_unref()
133  *
134  * Since: 2.16
135  */
136 GAsyncQueue *
g_async_queue_new_full(GDestroyNotify item_free_func)137 g_async_queue_new_full (GDestroyNotify item_free_func)
138 {
139   GAsyncQueue *queue;
140 
141   queue = g_new (GAsyncQueue, 1);
142   g_mutex_init (&queue->mutex);
143   g_cond_init (&queue->cond);
144   g_queue_init (&queue->queue);
145   queue->waiting_threads = 0;
146   queue->ref_count = 1;
147   queue->item_free_func = item_free_func;
148 
149   return queue;
150 }
151 
152 /**
153  * g_async_queue_ref:
154  * @queue: a #GAsyncQueue
155  *
156  * Increases the reference count of the asynchronous @queue by 1.
157  * You do not need to hold the lock to call this function.
158  *
159  * Returns: the @queue that was passed in (since 2.6)
160  */
161 GAsyncQueue *
g_async_queue_ref(GAsyncQueue * queue)162 g_async_queue_ref (GAsyncQueue *queue)
163 {
164   g_return_val_if_fail (queue, NULL);
165 
166   g_atomic_int_inc (&queue->ref_count);
167 
168   return queue;
169 }
170 
171 /**
172  * g_async_queue_ref_unlocked:
173  * @queue: a #GAsyncQueue
174  *
175  * Increases the reference count of the asynchronous @queue by 1.
176  *
177  * Deprecated: 2.8: Reference counting is done atomically.
178  * so g_async_queue_ref() can be used regardless of the @queue's
179  * lock.
180  */
181 void
g_async_queue_ref_unlocked(GAsyncQueue * queue)182 g_async_queue_ref_unlocked (GAsyncQueue *queue)
183 {
184   g_return_if_fail (queue);
185 
186   g_atomic_int_inc (&queue->ref_count);
187 }
188 
189 /**
190  * g_async_queue_unref_and_unlock:
191  * @queue: a #GAsyncQueue
192  *
193  * Decreases the reference count of the asynchronous @queue by 1
194  * and releases the lock. This function must be called while holding
195  * the @queue's lock. If the reference count went to 0, the @queue
196  * will be destroyed and the memory allocated will be freed.
197  *
198  * Deprecated: 2.8: Reference counting is done atomically.
199  * so g_async_queue_unref() can be used regardless of the @queue's
200  * lock.
201  */
202 void
g_async_queue_unref_and_unlock(GAsyncQueue * queue)203 g_async_queue_unref_and_unlock (GAsyncQueue *queue)
204 {
205   g_return_if_fail (queue);
206 
207   g_mutex_unlock (&queue->mutex);
208   g_async_queue_unref (queue);
209 }
210 
211 /**
212  * g_async_queue_unref:
213  * @queue: a #GAsyncQueue.
214  *
215  * Decreases the reference count of the asynchronous @queue by 1.
216  *
217  * If the reference count went to 0, the @queue will be destroyed
218  * and the memory allocated will be freed. So you are not allowed
219  * to use the @queue afterwards, as it might have disappeared.
220  * You do not need to hold the lock to call this function.
221  */
222 void
g_async_queue_unref(GAsyncQueue * queue)223 g_async_queue_unref (GAsyncQueue *queue)
224 {
225   g_return_if_fail (queue);
226 
227   if (g_atomic_int_dec_and_test (&queue->ref_count))
228     {
229       g_return_if_fail (queue->waiting_threads == 0);
230       g_mutex_clear (&queue->mutex);
231       g_cond_clear (&queue->cond);
232       if (queue->item_free_func)
233         g_queue_foreach (&queue->queue, (GFunc) queue->item_free_func, NULL);
234       g_queue_clear (&queue->queue);
235       g_free (queue);
236     }
237 }
238 
239 /**
240  * g_async_queue_lock:
241  * @queue: a #GAsyncQueue
242  *
243  * Acquires the @queue's lock. If another thread is already
244  * holding the lock, this call will block until the lock
245  * becomes available.
246  *
247  * Call g_async_queue_unlock() to drop the lock again.
248  *
249  * While holding the lock, you can only call the
250  * g_async_queue_*_unlocked() functions on @queue. Otherwise,
251  * deadlock may occur.
252  */
253 void
g_async_queue_lock(GAsyncQueue * queue)254 g_async_queue_lock (GAsyncQueue *queue)
255 {
256   g_return_if_fail (queue);
257 
258   g_mutex_lock (&queue->mutex);
259 }
260 
261 /**
262  * g_async_queue_unlock:
263  * @queue: a #GAsyncQueue
264  *
265  * Releases the queue's lock.
266  *
267  * Calling this function when you have not acquired
268  * the with g_async_queue_lock() leads to undefined
269  * behaviour.
270  */
271 void
g_async_queue_unlock(GAsyncQueue * queue)272 g_async_queue_unlock (GAsyncQueue *queue)
273 {
274   g_return_if_fail (queue);
275 
276   g_mutex_unlock (&queue->mutex);
277 }
278 
279 /**
280  * g_async_queue_push:
281  * @queue: a #GAsyncQueue
282  * @data: @data to push into the @queue
283  *
284  * Pushes the @data into the @queue. @data must not be %NULL.
285  */
286 void
g_async_queue_push(GAsyncQueue * queue,gpointer data)287 g_async_queue_push (GAsyncQueue *queue,
288                     gpointer     data)
289 {
290   g_return_if_fail (queue);
291   g_return_if_fail (data);
292 
293   g_mutex_lock (&queue->mutex);
294   g_async_queue_push_unlocked (queue, data);
295   g_mutex_unlock (&queue->mutex);
296 }
297 
298 /**
299  * g_async_queue_push_unlocked:
300  * @queue: a #GAsyncQueue
301  * @data: @data to push into the @queue
302  *
303  * Pushes the @data into the @queue. @data must not be %NULL.
304  *
305  * This function must be called while holding the @queue's lock.
306  */
307 void
g_async_queue_push_unlocked(GAsyncQueue * queue,gpointer data)308 g_async_queue_push_unlocked (GAsyncQueue *queue,
309                              gpointer     data)
310 {
311   g_return_if_fail (queue);
312   g_return_if_fail (data);
313 
314   g_queue_push_head (&queue->queue, data);
315   if (queue->waiting_threads > 0)
316     g_cond_signal (&queue->cond);
317 }
318 
319 /**
320  * g_async_queue_push_sorted:
321  * @queue: a #GAsyncQueue
322  * @data: the @data to push into the @queue
323  * @func: the #GCompareDataFunc is used to sort @queue
324  * @user_data: user data passed to @func.
325  *
326  * Inserts @data into @queue using @func to determine the new
327  * position.
328  *
329  * This function requires that the @queue is sorted before pushing on
330  * new elements, see g_async_queue_sort().
331  *
332  * This function will lock @queue before it sorts the queue and unlock
333  * it when it is finished.
334  *
335  * For an example of @func see g_async_queue_sort().
336  *
337  * Since: 2.10
338  */
339 void
g_async_queue_push_sorted(GAsyncQueue * queue,gpointer data,GCompareDataFunc func,gpointer user_data)340 g_async_queue_push_sorted (GAsyncQueue      *queue,
341                            gpointer          data,
342                            GCompareDataFunc  func,
343                            gpointer          user_data)
344 {
345   g_return_if_fail (queue != NULL);
346 
347   g_mutex_lock (&queue->mutex);
348   g_async_queue_push_sorted_unlocked (queue, data, func, user_data);
349   g_mutex_unlock (&queue->mutex);
350 }
351 
352 static gint
g_async_queue_invert_compare(gpointer v1,gpointer v2,SortData * sd)353 g_async_queue_invert_compare (gpointer  v1,
354                               gpointer  v2,
355                               SortData *sd)
356 {
357   return -sd->func (v1, v2, sd->user_data);
358 }
359 
360 /**
361  * g_async_queue_push_sorted_unlocked:
362  * @queue: a #GAsyncQueue
363  * @data: the @data to push into the @queue
364  * @func: the #GCompareDataFunc is used to sort @queue
365  * @user_data: user data passed to @func.
366  *
367  * Inserts @data into @queue using @func to determine the new
368  * position.
369  *
370  * The sort function @func is passed two elements of the @queue.
371  * It should return 0 if they are equal, a negative value if the
372  * first element should be higher in the @queue or a positive value
373  * if the first element should be lower in the @queue than the second
374  * element.
375  *
376  * This function requires that the @queue is sorted before pushing on
377  * new elements, see g_async_queue_sort().
378  *
379  * This function must be called while holding the @queue's lock.
380  *
381  * For an example of @func see g_async_queue_sort().
382  *
383  * Since: 2.10
384  */
385 void
g_async_queue_push_sorted_unlocked(GAsyncQueue * queue,gpointer data,GCompareDataFunc func,gpointer user_data)386 g_async_queue_push_sorted_unlocked (GAsyncQueue      *queue,
387                                     gpointer          data,
388                                     GCompareDataFunc  func,
389                                     gpointer          user_data)
390 {
391   SortData sd;
392 
393   g_return_if_fail (queue != NULL);
394 
395   sd.func = func;
396   sd.user_data = user_data;
397 
398   g_queue_insert_sorted (&queue->queue,
399                          data,
400                          (GCompareDataFunc)g_async_queue_invert_compare,
401                          &sd);
402   if (queue->waiting_threads > 0)
403     g_cond_signal (&queue->cond);
404 }
405 
406 static gpointer
g_async_queue_pop_intern_unlocked(GAsyncQueue * queue,gboolean wait,gint64 end_time)407 g_async_queue_pop_intern_unlocked (GAsyncQueue *queue,
408                                    gboolean     wait,
409                                    gint64       end_time)
410 {
411   gpointer retval;
412 
413   if (!g_queue_peek_tail_link (&queue->queue) && wait)
414     {
415       queue->waiting_threads++;
416       while (!g_queue_peek_tail_link (&queue->queue))
417         {
418 	  if (end_time == -1)
419 	    g_cond_wait (&queue->cond, &queue->mutex);
420 	  else
421 	    {
422 	      if (!g_cond_wait_until (&queue->cond, &queue->mutex, end_time))
423 		break;
424 	    }
425         }
426       queue->waiting_threads--;
427     }
428 
429   retval = g_queue_pop_tail (&queue->queue);
430 
431   g_assert (retval || !wait || end_time > 0);
432 
433   return retval;
434 }
435 
436 /**
437  * g_async_queue_pop:
438  * @queue: a #GAsyncQueue
439  *
440  * Pops data from the @queue. If @queue is empty, this function
441  * blocks until data becomes available.
442  *
443  * Returns: data from the queue
444  */
445 gpointer
g_async_queue_pop(GAsyncQueue * queue)446 g_async_queue_pop (GAsyncQueue *queue)
447 {
448   gpointer retval;
449 
450   g_return_val_if_fail (queue, NULL);
451 
452   g_mutex_lock (&queue->mutex);
453   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, -1);
454   g_mutex_unlock (&queue->mutex);
455 
456   return retval;
457 }
458 
459 /**
460  * g_async_queue_pop_unlocked:
461  * @queue: a #GAsyncQueue
462  *
463  * Pops data from the @queue. If @queue is empty, this function
464  * blocks until data becomes available.
465  *
466  * This function must be called while holding the @queue's lock.
467  *
468  * Returns: data from the queue.
469  */
470 gpointer
g_async_queue_pop_unlocked(GAsyncQueue * queue)471 g_async_queue_pop_unlocked (GAsyncQueue *queue)
472 {
473   g_return_val_if_fail (queue, NULL);
474 
475   return g_async_queue_pop_intern_unlocked (queue, TRUE, -1);
476 }
477 
478 /**
479  * g_async_queue_try_pop:
480  * @queue: a #GAsyncQueue
481  *
482  * Tries to pop data from the @queue. If no data is available,
483  * %NULL is returned.
484  *
485  * Returns: (nullable): data from the queue or %NULL, when no data is
486  *     available immediately.
487  */
488 gpointer
g_async_queue_try_pop(GAsyncQueue * queue)489 g_async_queue_try_pop (GAsyncQueue *queue)
490 {
491   gpointer retval;
492 
493   g_return_val_if_fail (queue, NULL);
494 
495   g_mutex_lock (&queue->mutex);
496   retval = g_async_queue_pop_intern_unlocked (queue, FALSE, -1);
497   g_mutex_unlock (&queue->mutex);
498 
499   return retval;
500 }
501 
502 /**
503  * g_async_queue_try_pop_unlocked:
504  * @queue: a #GAsyncQueue
505  *
506  * Tries to pop data from the @queue. If no data is available,
507  * %NULL is returned.
508  *
509  * This function must be called while holding the @queue's lock.
510  *
511  * Returns: (nullable): data from the queue or %NULL, when no data is
512  *     available immediately.
513  */
514 gpointer
g_async_queue_try_pop_unlocked(GAsyncQueue * queue)515 g_async_queue_try_pop_unlocked (GAsyncQueue *queue)
516 {
517   g_return_val_if_fail (queue, NULL);
518 
519   return g_async_queue_pop_intern_unlocked (queue, FALSE, -1);
520 }
521 
522 /**
523  * g_async_queue_timeout_pop:
524  * @queue: a #GAsyncQueue
525  * @timeout: the number of microseconds to wait
526  *
527  * Pops data from the @queue. If the queue is empty, blocks for
528  * @timeout microseconds, or until data becomes available.
529  *
530  * If no data is received before the timeout, %NULL is returned.
531  *
532  * Returns: (nullable): data from the queue or %NULL, when no data is
533  *     received before the timeout.
534  */
535 gpointer
g_async_queue_timeout_pop(GAsyncQueue * queue,guint64 timeout)536 g_async_queue_timeout_pop (GAsyncQueue *queue,
537 			   guint64      timeout)
538 {
539   gint64 end_time = g_get_monotonic_time () + timeout;
540   gpointer retval;
541 
542   g_return_val_if_fail (queue != NULL, NULL);
543 
544   g_mutex_lock (&queue->mutex);
545   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, end_time);
546   g_mutex_unlock (&queue->mutex);
547 
548   return retval;
549 }
550 
551 /**
552  * g_async_queue_timeout_pop_unlocked:
553  * @queue: a #GAsyncQueue
554  * @timeout: the number of microseconds to wait
555  *
556  * Pops data from the @queue. If the queue is empty, blocks for
557  * @timeout microseconds, or until data becomes available.
558  *
559  * If no data is received before the timeout, %NULL is returned.
560  *
561  * This function must be called while holding the @queue's lock.
562  *
563  * Returns: (nullable): data from the queue or %NULL, when no data is
564  *     received before the timeout.
565  */
566 gpointer
g_async_queue_timeout_pop_unlocked(GAsyncQueue * queue,guint64 timeout)567 g_async_queue_timeout_pop_unlocked (GAsyncQueue *queue,
568 				    guint64      timeout)
569 {
570   gint64 end_time = g_get_monotonic_time () + timeout;
571 
572   g_return_val_if_fail (queue != NULL, NULL);
573 
574   return g_async_queue_pop_intern_unlocked (queue, TRUE, end_time);
575 }
576 
577 /**
578  * g_async_queue_timed_pop:
579  * @queue: a #GAsyncQueue
580  * @end_time: a #GTimeVal, determining the final time
581  *
582  * Pops data from the @queue. If the queue is empty, blocks until
583  * @end_time or until data becomes available.
584  *
585  * If no data is received before @end_time, %NULL is returned.
586  *
587  * To easily calculate @end_time, a combination of g_get_real_time()
588  * and g_time_val_add() can be used.
589  *
590  * Returns: (nullable): data from the queue or %NULL, when no data is
591  *     received before @end_time.
592  *
593  * Deprecated: use g_async_queue_timeout_pop().
594  */
595 G_GNUC_BEGIN_IGNORE_DEPRECATIONS
596 gpointer
g_async_queue_timed_pop(GAsyncQueue * queue,GTimeVal * end_time)597 g_async_queue_timed_pop (GAsyncQueue *queue,
598                          GTimeVal    *end_time)
599 {
600   gint64 m_end_time;
601   gpointer retval;
602 
603   g_return_val_if_fail (queue, NULL);
604 
605   if (end_time != NULL)
606     {
607       m_end_time = g_get_monotonic_time () +
608         ((gint64) end_time->tv_sec * G_USEC_PER_SEC + end_time->tv_usec - g_get_real_time ());
609     }
610   else
611     m_end_time = -1;
612 
613   g_mutex_lock (&queue->mutex);
614   retval = g_async_queue_pop_intern_unlocked (queue, TRUE, m_end_time);
615   g_mutex_unlock (&queue->mutex);
616 
617   return retval;
618 }
619 G_GNUC_END_IGNORE_DEPRECATIONS
620 
621 /**
622  * g_async_queue_timed_pop_unlocked:
623  * @queue: a #GAsyncQueue
624  * @end_time: a #GTimeVal, determining the final time
625  *
626  * Pops data from the @queue. If the queue is empty, blocks until
627  * @end_time or until data becomes available.
628  *
629  * If no data is received before @end_time, %NULL is returned.
630  *
631  * To easily calculate @end_time, a combination of g_get_real_time()
632  * and g_time_val_add() can be used.
633  *
634  * This function must be called while holding the @queue's lock.
635  *
636  * Returns: (nullable): data from the queue or %NULL, when no data is
637  *     received before @end_time.
638  *
639  * Deprecated: use g_async_queue_timeout_pop_unlocked().
640  */
641 G_GNUC_BEGIN_IGNORE_DEPRECATIONS
642 gpointer
g_async_queue_timed_pop_unlocked(GAsyncQueue * queue,GTimeVal * end_time)643 g_async_queue_timed_pop_unlocked (GAsyncQueue *queue,
644                                   GTimeVal    *end_time)
645 {
646   gint64 m_end_time;
647 
648   g_return_val_if_fail (queue, NULL);
649 
650   if (end_time != NULL)
651     {
652       m_end_time = g_get_monotonic_time () +
653         ((gint64) end_time->tv_sec * G_USEC_PER_SEC + end_time->tv_usec - g_get_real_time ());
654     }
655   else
656     m_end_time = -1;
657 
658   return g_async_queue_pop_intern_unlocked (queue, TRUE, m_end_time);
659 }
660 G_GNUC_END_IGNORE_DEPRECATIONS
661 
662 /**
663  * g_async_queue_length:
664  * @queue: a #GAsyncQueue.
665  *
666  * Returns the length of the queue.
667  *
668  * Actually this function returns the number of data items in
669  * the queue minus the number of waiting threads, so a negative
670  * value means waiting threads, and a positive value means available
671  * entries in the @queue. A return value of 0 could mean n entries
672  * in the queue and n threads waiting. This can happen due to locking
673  * of the queue or due to scheduling.
674  *
675  * Returns: the length of the @queue
676  */
677 gint
g_async_queue_length(GAsyncQueue * queue)678 g_async_queue_length (GAsyncQueue *queue)
679 {
680   gint retval;
681 
682   g_return_val_if_fail (queue, 0);
683 
684   g_mutex_lock (&queue->mutex);
685   retval = queue->queue.length - queue->waiting_threads;
686   g_mutex_unlock (&queue->mutex);
687 
688   return retval;
689 }
690 
691 /**
692  * g_async_queue_length_unlocked:
693  * @queue: a #GAsyncQueue
694  *
695  * Returns the length of the queue.
696  *
697  * Actually this function returns the number of data items in
698  * the queue minus the number of waiting threads, so a negative
699  * value means waiting threads, and a positive value means available
700  * entries in the @queue. A return value of 0 could mean n entries
701  * in the queue and n threads waiting. This can happen due to locking
702  * of the queue or due to scheduling.
703  *
704  * This function must be called while holding the @queue's lock.
705  *
706  * Returns: the length of the @queue.
707  */
708 gint
g_async_queue_length_unlocked(GAsyncQueue * queue)709 g_async_queue_length_unlocked (GAsyncQueue *queue)
710 {
711   g_return_val_if_fail (queue, 0);
712 
713   return queue->queue.length - queue->waiting_threads;
714 }
715 
716 /**
717  * g_async_queue_sort:
718  * @queue: a #GAsyncQueue
719  * @func: the #GCompareDataFunc is used to sort @queue
720  * @user_data: user data passed to @func
721  *
722  * Sorts @queue using @func.
723  *
724  * The sort function @func is passed two elements of the @queue.
725  * It should return 0 if they are equal, a negative value if the
726  * first element should be higher in the @queue or a positive value
727  * if the first element should be lower in the @queue than the second
728  * element.
729  *
730  * This function will lock @queue before it sorts the queue and unlock
731  * it when it is finished.
732  *
733  * If you were sorting a list of priority numbers to make sure the
734  * lowest priority would be at the top of the queue, you could use:
735  * |[<!-- language="C" -->
736  *  gint32 id1;
737  *  gint32 id2;
738  *
739  *  id1 = GPOINTER_TO_INT (element1);
740  *  id2 = GPOINTER_TO_INT (element2);
741  *
742  *  return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
743  * ]|
744  *
745  * Since: 2.10
746  */
747 void
g_async_queue_sort(GAsyncQueue * queue,GCompareDataFunc func,gpointer user_data)748 g_async_queue_sort (GAsyncQueue      *queue,
749                     GCompareDataFunc  func,
750                     gpointer          user_data)
751 {
752   g_return_if_fail (queue != NULL);
753   g_return_if_fail (func != NULL);
754 
755   g_mutex_lock (&queue->mutex);
756   g_async_queue_sort_unlocked (queue, func, user_data);
757   g_mutex_unlock (&queue->mutex);
758 }
759 
760 /**
761  * g_async_queue_sort_unlocked:
762  * @queue: a #GAsyncQueue
763  * @func: the #GCompareDataFunc is used to sort @queue
764  * @user_data: user data passed to @func
765  *
766  * Sorts @queue using @func.
767  *
768  * The sort function @func is passed two elements of the @queue.
769  * It should return 0 if they are equal, a negative value if the
770  * first element should be higher in the @queue or a positive value
771  * if the first element should be lower in the @queue than the second
772  * element.
773  *
774  * This function must be called while holding the @queue's lock.
775  *
776  * Since: 2.10
777  */
778 void
g_async_queue_sort_unlocked(GAsyncQueue * queue,GCompareDataFunc func,gpointer user_data)779 g_async_queue_sort_unlocked (GAsyncQueue      *queue,
780                              GCompareDataFunc  func,
781                              gpointer          user_data)
782 {
783   SortData sd;
784 
785   g_return_if_fail (queue != NULL);
786   g_return_if_fail (func != NULL);
787 
788   sd.func = func;
789   sd.user_data = user_data;
790 
791   g_queue_sort (&queue->queue,
792                 (GCompareDataFunc)g_async_queue_invert_compare,
793                 &sd);
794 }
795 
796 /**
797  * g_async_queue_remove:
798  * @queue: a #GAsyncQueue
799  * @item: the data to remove from the @queue
800  *
801  * Remove an item from the queue.
802  *
803  * Returns: %TRUE if the item was removed
804  *
805  * Since: 2.46
806  */
807 gboolean
g_async_queue_remove(GAsyncQueue * queue,gpointer item)808 g_async_queue_remove (GAsyncQueue *queue,
809                       gpointer     item)
810 {
811   gboolean ret;
812 
813   g_return_val_if_fail (queue != NULL, FALSE);
814   g_return_val_if_fail (item != NULL, FALSE);
815 
816   g_mutex_lock (&queue->mutex);
817   ret = g_async_queue_remove_unlocked (queue, item);
818   g_mutex_unlock (&queue->mutex);
819 
820   return ret;
821 }
822 
823 /**
824  * g_async_queue_remove_unlocked:
825  * @queue: a #GAsyncQueue
826  * @item: the data to remove from the @queue
827  *
828  * Remove an item from the queue.
829  *
830  * This function must be called while holding the @queue's lock.
831  *
832  * Returns: %TRUE if the item was removed
833  *
834  * Since: 2.46
835  */
836 gboolean
g_async_queue_remove_unlocked(GAsyncQueue * queue,gpointer item)837 g_async_queue_remove_unlocked (GAsyncQueue *queue,
838                                gpointer     item)
839 {
840   g_return_val_if_fail (queue != NULL, FALSE);
841   g_return_val_if_fail (item != NULL, FALSE);
842 
843   return g_queue_remove (&queue->queue, item);
844 }
845 
846 /**
847  * g_async_queue_push_front:
848  * @queue: a #GAsyncQueue
849  * @item: data to push into the @queue
850  *
851  * Pushes the @item into the @queue. @item must not be %NULL.
852  * In contrast to g_async_queue_push(), this function
853  * pushes the new item ahead of the items already in the queue,
854  * so that it will be the next one to be popped off the queue.
855  *
856  * Since: 2.46
857  */
858 void
g_async_queue_push_front(GAsyncQueue * queue,gpointer item)859 g_async_queue_push_front (GAsyncQueue *queue,
860                           gpointer     item)
861 {
862   g_return_if_fail (queue != NULL);
863   g_return_if_fail (item != NULL);
864 
865   g_mutex_lock (&queue->mutex);
866   g_async_queue_push_front_unlocked (queue, item);
867   g_mutex_unlock (&queue->mutex);
868 }
869 
870 /**
871  * g_async_queue_push_front_unlocked:
872  * @queue: a #GAsyncQueue
873  * @item: data to push into the @queue
874  *
875  * Pushes the @item into the @queue. @item must not be %NULL.
876  * In contrast to g_async_queue_push_unlocked(), this function
877  * pushes the new item ahead of the items already in the queue,
878  * so that it will be the next one to be popped off the queue.
879  *
880  * This function must be called while holding the @queue's lock.
881  *
882  * Since: 2.46
883  */
884 void
g_async_queue_push_front_unlocked(GAsyncQueue * queue,gpointer item)885 g_async_queue_push_front_unlocked (GAsyncQueue *queue,
886                                    gpointer     item)
887 {
888   g_return_if_fail (queue != NULL);
889   g_return_if_fail (item != NULL);
890 
891   g_queue_push_tail (&queue->queue, item);
892   if (queue->waiting_threads > 0)
893     g_cond_signal (&queue->cond);
894 }
895 
896 /*
897  * Private API
898  */
899 
900 GMutex *
_g_async_queue_get_mutex(GAsyncQueue * queue)901 _g_async_queue_get_mutex (GAsyncQueue *queue)
902 {
903   g_return_val_if_fail (queue, NULL);
904 
905   return &queue->mutex;
906 }
907