• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2005 Thomas Vander Stichele <thomas at apestaart dot org>
3  *
4  * gsttask.c: Unit test for GstTask
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <gst/check/gstcheck.h>
26 
/* Mutex/condition pair used by the task functions below to wake up the
 * test thread. */
static GMutex task_lock;
static GCond task_cond;

/* Recursive mutex the tests hand to gst_task_set_lock(). */
static GRecMutex task_mutex;

/* Number of start/stop/join cycles run by test_pause_stop_race. */
#define TEST_RACE_ITERATIONS 1000
33 
34 static void
task_resume_func(void * data)35 task_resume_func (void *data)
36 {
37   g_mutex_lock (&task_lock);
38   g_cond_signal (&task_cond);
39   g_mutex_unlock (&task_lock);
40 }
41 
GST_START_TEST(test_resume)42 GST_START_TEST (test_resume)
43 {
44   GstTask *t;
45 
46   t = gst_task_new (task_resume_func, &t, NULL);
47   fail_if (t == NULL);
48 
49   g_rec_mutex_init (&task_mutex);
50   gst_task_set_lock (t, &task_mutex);
51 
52   g_cond_init (&task_cond);
53   g_mutex_init (&task_lock);
54 
55   g_mutex_lock (&task_lock);
56 
57   /* Pause the task, and resume it. */
58   fail_unless (gst_task_pause (t));
59   fail_unless (gst_task_resume (t));
60 
61   while (GST_TASK_STATE (t) != GST_TASK_STARTED)
62     g_cond_wait (&task_cond, &task_lock);
63 
64   fail_unless (gst_task_stop (t));
65   g_mutex_unlock (&task_lock);
66   fail_unless (gst_task_join (t));
67 
68   /* Make sure we cannot resume from stopped. */
69   fail_if (gst_task_resume (t));
70 
71   gst_object_unref (t);
72 }
73 
74 GST_END_TEST;
75 
76 static void
task_signal_pause_func(void * data)77 task_signal_pause_func (void *data)
78 {
79   GstTask **t = data;
80 
81   g_mutex_lock (&task_lock);
82   GST_DEBUG ("signal");
83   g_cond_signal (&task_cond);
84 
85   gst_task_pause (*t);
86   g_mutex_unlock (&task_lock);
87 }
88 
GST_START_TEST(test_pause_stop_race)89 GST_START_TEST (test_pause_stop_race)
90 {
91   guint it = TEST_RACE_ITERATIONS;
92   GstTask *t;
93   gboolean ret;
94 
95   t = gst_task_new (task_signal_pause_func, &t, NULL);
96   fail_if (t == NULL);
97 
98   g_rec_mutex_init (&task_mutex);
99   gst_task_set_lock (t, &task_mutex);
100 
101   g_cond_init (&task_cond);
102   g_mutex_init (&task_lock);
103 
104   while (it-- > 0) {
105     g_mutex_lock (&task_lock);
106     GST_DEBUG ("starting");
107     ret = gst_task_start (t);
108     fail_unless (ret == TRUE);
109     /* wait for it to spin up */
110     GST_DEBUG ("waiting");
111     g_cond_wait (&task_cond, &task_lock);
112     GST_DEBUG ("done waiting");
113     g_mutex_unlock (&task_lock);
114 
115     GST_DEBUG ("starting");
116     ret = gst_task_stop (t);
117     fail_unless (ret == TRUE);
118 
119     GST_DEBUG ("joining");
120     ret = gst_task_join (t);
121     fail_unless (ret == TRUE);
122   }
123 
124   g_cond_clear (&task_cond);
125   g_mutex_clear (&task_lock);
126 
127   gst_object_unref (t);
128 }
129 
130 GST_END_TEST;
131 
132 static void
task_func2(void * data)133 task_func2 (void *data)
134 {
135   gboolean ret;
136   GstTask *t = *((GstTask **) data);
137 
138   g_mutex_lock (&task_lock);
139   GST_DEBUG ("signal");
140   g_cond_signal (&task_cond);
141   g_mutex_unlock (&task_lock);
142 
143   ASSERT_WARNING (ret = gst_task_join (t));
144   fail_unless (ret == FALSE);
145 }
146 
GST_START_TEST(test_join)147 GST_START_TEST (test_join)
148 {
149   GstTask *t;
150   gboolean ret;
151 
152   t = gst_task_new (task_func2, &t, NULL);
153   fail_if (t == NULL);
154 
155   g_rec_mutex_init (&task_mutex);
156   gst_task_set_lock (t, &task_mutex);
157 
158   g_cond_init (&task_cond);
159   g_mutex_init (&task_lock);
160 
161   g_mutex_lock (&task_lock);
162   GST_DEBUG ("starting");
163   ret = gst_task_start (t);
164   fail_unless (ret == TRUE);
165   /* wait for it to spin up */
166   GST_DEBUG ("waiting");
167   g_cond_wait (&task_cond, &task_lock);
168   GST_DEBUG ("done waiting");
169   g_mutex_unlock (&task_lock);
170 
171   GST_DEBUG ("joining");
172   ret = gst_task_join (t);
173   fail_unless (ret == TRUE);
174 
175   gst_task_cleanup_all ();
176 
177   gst_object_unref (t);
178 }
179 
180 GST_END_TEST;
181 
182 static void
task_func(void * data)183 task_func (void *data)
184 {
185   g_mutex_lock (&task_lock);
186   GST_DEBUG ("signal");
187   g_cond_signal (&task_cond);
188   g_mutex_unlock (&task_lock);
189 }
190 
GST_START_TEST(test_lock_start)191 GST_START_TEST (test_lock_start)
192 {
193   GstTask *t;
194   gboolean ret;
195 
196   t = gst_task_new (task_func, NULL, NULL);
197   fail_if (t == NULL);
198 
199   g_rec_mutex_init (&task_mutex);
200   gst_task_set_lock (t, &task_mutex);
201 
202   g_cond_init (&task_cond);
203   g_mutex_init (&task_lock);
204 
205   g_mutex_lock (&task_lock);
206   GST_DEBUG ("starting");
207   ret = gst_task_start (t);
208   fail_unless (ret == TRUE);
209   /* wait for it to spin up */
210   GST_DEBUG ("waiting");
211   g_cond_wait (&task_cond, &task_lock);
212   GST_DEBUG ("done waiting");
213   g_mutex_unlock (&task_lock);
214 
215   /* cannot set mutex now */
216   ASSERT_WARNING (gst_task_set_lock (t, &task_mutex));
217 
218   GST_DEBUG ("joining");
219   ret = gst_task_join (t);
220   fail_unless (ret == TRUE);
221 
222   gst_task_cleanup_all ();
223 
224   gst_object_unref (t);
225 }
226 
227 GST_END_TEST;
228 
GST_START_TEST(test_lock)229 GST_START_TEST (test_lock)
230 {
231   GstTask *t;
232   gboolean ret;
233 
234   t = gst_task_new (task_func, NULL, NULL);
235   fail_if (t == NULL);
236 
237   g_rec_mutex_init (&task_mutex);
238   gst_task_set_lock (t, &task_mutex);
239 
240   GST_DEBUG ("pause");
241   ret = gst_task_pause (t);
242   fail_unless (ret == TRUE);
243 
244   g_usleep (1 * G_USEC_PER_SEC / 2);
245 
246   GST_DEBUG ("joining");
247   ret = gst_task_join (t);
248   fail_unless (ret == TRUE);
249 
250   g_usleep (1 * G_USEC_PER_SEC / 2);
251 
252   gst_object_unref (t);
253 }
254 
255 GST_END_TEST;
256 
GST_START_TEST(test_no_lock)257 GST_START_TEST (test_no_lock)
258 {
259   GstTask *t;
260   gboolean ret;
261 
262   t = gst_task_new (task_func, NULL, NULL);
263   fail_if (t == NULL);
264 
265   /* stop should be possible without lock */
266   gst_task_stop (t);
267 
268   /* pause should give a warning */
269   ASSERT_WARNING (ret = gst_task_pause (t));
270   fail_unless (ret == FALSE);
271 
272   /* start should give a warning */
273   ASSERT_WARNING (ret = gst_task_start (t));
274   fail_unless (ret == FALSE);
275 
276   /* stop should be possible without lock */
277   gst_task_stop (t);
278 
279   gst_object_unref (t);
280 }
281 
282 GST_END_TEST;
283 
GST_START_TEST(test_create)284 GST_START_TEST (test_create)
285 {
286   GstTask *t;
287 
288   t = gst_task_new (task_func, NULL, NULL);
289   fail_if (t == NULL);
290 
291   gst_object_unref (t);
292 }
293 
294 GST_END_TEST;
295 
/* Per-task state shared between a task pool test and task_cb(). */
typedef struct
{
  /* Set to TRUE by task_cb() once it runs. */
  gboolean called;
  /* Result of g_thread_self() in the thread that ran task_cb(). */
  gpointer caller_thread;

  /* task_cb() sets `blocked` and signals `blocked_cond` (both under
   * `blocked_lock`) before it starts waiting to be unblocked. */
  GCond blocked_cond;
  GMutex blocked_lock;
  gboolean blocked;

  /* task_cb() waits on `unblock_cond` (under `unblock_lock`) until
   * `unblock` becomes TRUE. */
  GCond unblock_cond;
  GMutex unblock_lock;
  gboolean unblock;
} TaskData;
309 
310 static void
task_cb(TaskData * tdata)311 task_cb (TaskData * tdata)
312 {
313   tdata->called = TRUE;
314   tdata->caller_thread = g_thread_self ();
315 
316   g_mutex_lock (&tdata->blocked_lock);
317   tdata->blocked = TRUE;
318   g_cond_signal (&tdata->blocked_cond);
319   g_mutex_unlock (&tdata->blocked_lock);
320 
321   g_mutex_lock (&tdata->unblock_lock);
322   while (!tdata->unblock)
323     g_cond_wait (&tdata->unblock_cond, &tdata->unblock_lock);
324 
325   g_mutex_unlock (&tdata->unblock_lock);
326 }
327 
328 static void
init_task_data(TaskData * tdata)329 init_task_data (TaskData * tdata)
330 {
331   tdata->called = FALSE;
332   tdata->caller_thread = NULL;
333   tdata->unblock = FALSE;
334   g_cond_init (&tdata->unblock_cond);
335   g_mutex_init (&tdata->unblock_lock);
336 
337   tdata->blocked = FALSE;
338   g_cond_init (&tdata->blocked_cond);
339   g_mutex_init (&tdata->blocked_lock);
340 }
341 
342 static void
cleanup_task_data(TaskData * tdata)343 cleanup_task_data (TaskData * tdata)
344 {
345   g_mutex_clear (&tdata->unblock_lock);
346   g_cond_clear (&tdata->unblock_cond);
347   g_mutex_clear (&tdata->blocked_lock);
348   g_cond_clear (&tdata->blocked_cond);
349 }
350 
351 /* In this test, we use a shared task pool with max-threads=1 and verify
352  * that the caller thread for two tasks is the same */
GST_START_TEST(test_shared_task_pool_shared_thread)353 GST_START_TEST (test_shared_task_pool_shared_thread)
354 {
355   GstTaskPool *pool;
356   gpointer handle, handle2;
357   GError *err = NULL;
358   TaskData tdata, tdata2;
359 
360   init_task_data (&tdata);
361   init_task_data (&tdata2);
362 
363   pool = gst_shared_task_pool_new ();
364   gst_task_pool_prepare (pool, &err);
365 
366   fail_unless (err == NULL);
367 
368   /* We request that two tasks be executed, and our task function is blocking.
369    * This means no new thread is available to spawn, and the second task should
370    * be queued up on the first thread */
371   handle =
372       gst_task_pool_push (pool, (GstTaskPoolFunction) task_cb, &tdata, &err);
373   fail_unless (err == NULL);
374   handle2 =
375       gst_task_pool_push (pool, (GstTaskPoolFunction) task_cb, &tdata2, &err);
376   fail_unless (err == NULL);
377 
378   g_mutex_lock (&tdata.unblock_lock);
379   tdata.unblock = TRUE;
380   g_cond_signal (&tdata.unblock_cond);
381   g_mutex_unlock (&tdata.unblock_lock);
382 
383   g_mutex_lock (&tdata2.unblock_lock);
384   tdata2.unblock = TRUE;
385   g_cond_signal (&tdata2.unblock_cond);
386   g_mutex_unlock (&tdata2.unblock_lock);
387 
388   gst_task_pool_join (pool, handle);
389   gst_task_pool_join (pool, handle2);
390 
391   fail_unless (tdata.called == TRUE);
392   fail_unless (tdata2.called == TRUE);
393   fail_unless (tdata.caller_thread == tdata2.caller_thread);
394 
395   cleanup_task_data (&tdata);
396   cleanup_task_data (&tdata2);
397 
398   gst_task_pool_cleanup (pool);
399 
400   g_object_unref (pool);
401 }
402 
403 GST_END_TEST;
404 
405 /* In this test, we use a shared task pool with max-threads=2 and verify
406  * that the caller thread for two tasks is different */
GST_START_TEST(test_shared_task_pool_two_threads)407 GST_START_TEST (test_shared_task_pool_two_threads)
408 {
409   GstTaskPool *pool;
410   gpointer handle, handle2;
411   GError *err = NULL;
412   TaskData tdata, tdata2;
413 
414   init_task_data (&tdata);
415   init_task_data (&tdata2);
416 
417   pool = gst_shared_task_pool_new ();
418   gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 2);
419   gst_task_pool_prepare (pool, &err);
420 
421   fail_unless (err == NULL);
422 
423   /* We request that two tasks be executed, and our task function is blocking.
424    * This means the pool will have to spawn a new thread to handle the task */
425   handle =
426       gst_task_pool_push (pool, (GstTaskPoolFunction) task_cb, &tdata, &err);
427   fail_unless (err == NULL);
428   handle2 =
429       gst_task_pool_push (pool, (GstTaskPoolFunction) task_cb, &tdata2, &err);
430   fail_unless (err == NULL);
431 
432   /* Make sure that the second task has started executing before unblocking */
433   g_mutex_lock (&tdata2.blocked_lock);
434   while (!tdata2.blocked) {
435     g_cond_wait (&tdata2.blocked_cond, &tdata2.blocked_lock);
436   }
437   g_mutex_unlock (&tdata2.blocked_lock);
438 
439   g_mutex_lock (&tdata.unblock_lock);
440   tdata.unblock = TRUE;
441   g_cond_signal (&tdata.unblock_cond);
442   g_mutex_unlock (&tdata.unblock_lock);
443 
444   g_mutex_lock (&tdata2.unblock_lock);
445   tdata2.unblock = TRUE;
446   g_cond_signal (&tdata2.unblock_cond);
447   g_mutex_unlock (&tdata2.unblock_lock);
448 
449   gst_task_pool_join (pool, handle);
450   gst_task_pool_join (pool, handle2);
451 
452   fail_unless (tdata.called == TRUE);
453   fail_unless (tdata2.called == TRUE);
454 
455   fail_unless (tdata.caller_thread != tdata2.caller_thread);
456 
457   cleanup_task_data (&tdata);
458   cleanup_task_data (&tdata2);
459 
460   gst_task_pool_cleanup (pool);
461 
462   g_object_unref (pool);
463 }
464 
465 GST_END_TEST;
466 
467 static Suite *
gst_task_suite(void)468 gst_task_suite (void)
469 {
470   Suite *s = suite_create ("GstTask");
471   TCase *tc_chain = tcase_create ("task tests");
472 
473   suite_add_tcase (s, tc_chain);
474   tcase_add_test (tc_chain, test_create);
475   tcase_add_test (tc_chain, test_no_lock);
476   tcase_add_test (tc_chain, test_lock);
477   tcase_add_test (tc_chain, test_lock_start);
478   tcase_add_test (tc_chain, test_join);
479   tcase_add_test (tc_chain, test_pause_stop_race);
480   tcase_add_test (tc_chain, test_resume);
481   tcase_add_test (tc_chain, test_shared_task_pool_shared_thread);
482   tcase_add_test (tc_chain, test_shared_task_pool_two_threads);
483 
484   return s;
485 }
486 
/* Test program entry point, generated by the gstcheck helper macro; the
 * gst_task argument presumably selects gst_task_suite() above. */
GST_CHECK_MAIN (gst_task);
488