1 /* GStreamer
2 * Copyright (C) 2005 Thomas Vander Stichele <thomas at apestaart dot org>
3 *
4 * gsttask.c: Unit test for GstTask
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <gst/check/gstcheck.h>
26
27 static GMutex task_lock;
28 static GCond task_cond;
29
30 static GRecMutex task_mutex;
31
32 #define TEST_RACE_ITERATIONS 1000
33
34 static void
task_resume_func(void * data)35 task_resume_func (void *data)
36 {
37 g_mutex_lock (&task_lock);
38 g_cond_signal (&task_cond);
39 g_mutex_unlock (&task_lock);
40 }
41
GST_START_TEST(test_resume)42 GST_START_TEST (test_resume)
43 {
44 GstTask *t;
45
46 t = gst_task_new (task_resume_func, &t, NULL);
47 fail_if (t == NULL);
48
49 g_rec_mutex_init (&task_mutex);
50 gst_task_set_lock (t, &task_mutex);
51
52 g_cond_init (&task_cond);
53 g_mutex_init (&task_lock);
54
55 g_mutex_lock (&task_lock);
56
57 /* Pause the task, and resume it. */
58 fail_unless (gst_task_pause (t));
59 fail_unless (gst_task_resume (t));
60
61 while (GST_TASK_STATE (t) != GST_TASK_STARTED)
62 g_cond_wait (&task_cond, &task_lock);
63
64 fail_unless (gst_task_stop (t));
65 g_mutex_unlock (&task_lock);
66 fail_unless (gst_task_join (t));
67
68 /* Make sure we cannot resume from stopped. */
69 fail_if (gst_task_resume (t));
70
71 gst_object_unref (t);
72 }
73
74 GST_END_TEST;
75
76 static void
task_signal_pause_func(void * data)77 task_signal_pause_func (void *data)
78 {
79 GstTask **t = data;
80
81 g_mutex_lock (&task_lock);
82 GST_DEBUG ("signal");
83 g_cond_signal (&task_cond);
84
85 gst_task_pause (*t);
86 g_mutex_unlock (&task_lock);
87 }
88
GST_START_TEST(test_pause_stop_race)89 GST_START_TEST (test_pause_stop_race)
90 {
91 guint it = TEST_RACE_ITERATIONS;
92 GstTask *t;
93 gboolean ret;
94
95 t = gst_task_new (task_signal_pause_func, &t, NULL);
96 fail_if (t == NULL);
97
98 g_rec_mutex_init (&task_mutex);
99 gst_task_set_lock (t, &task_mutex);
100
101 g_cond_init (&task_cond);
102 g_mutex_init (&task_lock);
103
104 while (it-- > 0) {
105 g_mutex_lock (&task_lock);
106 GST_DEBUG ("starting");
107 ret = gst_task_start (t);
108 fail_unless (ret == TRUE);
109 /* wait for it to spin up */
110 GST_DEBUG ("waiting");
111 g_cond_wait (&task_cond, &task_lock);
112 GST_DEBUG ("done waiting");
113 g_mutex_unlock (&task_lock);
114
115 GST_DEBUG ("starting");
116 ret = gst_task_stop (t);
117 fail_unless (ret == TRUE);
118
119 GST_DEBUG ("joining");
120 ret = gst_task_join (t);
121 fail_unless (ret == TRUE);
122 }
123
124 g_cond_clear (&task_cond);
125 g_mutex_clear (&task_lock);
126
127 gst_object_unref (t);
128 }
129
130 GST_END_TEST;
131
132 static void
task_func2(void * data)133 task_func2 (void *data)
134 {
135 gboolean ret;
136 GstTask *t = *((GstTask **) data);
137
138 g_mutex_lock (&task_lock);
139 GST_DEBUG ("signal");
140 g_cond_signal (&task_cond);
141 g_mutex_unlock (&task_lock);
142
143 ASSERT_WARNING (ret = gst_task_join (t));
144 fail_unless (ret == FALSE);
145 }
146
GST_START_TEST(test_join)147 GST_START_TEST (test_join)
148 {
149 GstTask *t;
150 gboolean ret;
151
152 t = gst_task_new (task_func2, &t, NULL);
153 fail_if (t == NULL);
154
155 g_rec_mutex_init (&task_mutex);
156 gst_task_set_lock (t, &task_mutex);
157
158 g_cond_init (&task_cond);
159 g_mutex_init (&task_lock);
160
161 g_mutex_lock (&task_lock);
162 GST_DEBUG ("starting");
163 ret = gst_task_start (t);
164 fail_unless (ret == TRUE);
165 /* wait for it to spin up */
166 GST_DEBUG ("waiting");
167 g_cond_wait (&task_cond, &task_lock);
168 GST_DEBUG ("done waiting");
169 g_mutex_unlock (&task_lock);
170
171 GST_DEBUG ("joining");
172 ret = gst_task_join (t);
173 fail_unless (ret == TRUE);
174
175 gst_task_cleanup_all ();
176
177 gst_object_unref (t);
178 }
179
180 GST_END_TEST;
181
182 static void
task_func(void * data)183 task_func (void *data)
184 {
185 g_mutex_lock (&task_lock);
186 GST_DEBUG ("signal");
187 g_cond_signal (&task_cond);
188 g_mutex_unlock (&task_lock);
189 }
190
GST_START_TEST(test_lock_start)191 GST_START_TEST (test_lock_start)
192 {
193 GstTask *t;
194 gboolean ret;
195
196 t = gst_task_new (task_func, NULL, NULL);
197 fail_if (t == NULL);
198
199 g_rec_mutex_init (&task_mutex);
200 gst_task_set_lock (t, &task_mutex);
201
202 g_cond_init (&task_cond);
203 g_mutex_init (&task_lock);
204
205 g_mutex_lock (&task_lock);
206 GST_DEBUG ("starting");
207 ret = gst_task_start (t);
208 fail_unless (ret == TRUE);
209 /* wait for it to spin up */
210 GST_DEBUG ("waiting");
211 g_cond_wait (&task_cond, &task_lock);
212 GST_DEBUG ("done waiting");
213 g_mutex_unlock (&task_lock);
214
215 /* cannot set mutex now */
216 ASSERT_WARNING (gst_task_set_lock (t, &task_mutex));
217
218 GST_DEBUG ("joining");
219 ret = gst_task_join (t);
220 fail_unless (ret == TRUE);
221
222 gst_task_cleanup_all ();
223
224 gst_object_unref (t);
225 }
226
227 GST_END_TEST;
228
GST_START_TEST(test_lock)229 GST_START_TEST (test_lock)
230 {
231 GstTask *t;
232 gboolean ret;
233
234 t = gst_task_new (task_func, NULL, NULL);
235 fail_if (t == NULL);
236
237 g_rec_mutex_init (&task_mutex);
238 gst_task_set_lock (t, &task_mutex);
239
240 GST_DEBUG ("pause");
241 ret = gst_task_pause (t);
242 fail_unless (ret == TRUE);
243
244 g_usleep (1 * G_USEC_PER_SEC / 2);
245
246 GST_DEBUG ("joining");
247 ret = gst_task_join (t);
248 fail_unless (ret == TRUE);
249
250 g_usleep (1 * G_USEC_PER_SEC / 2);
251
252 gst_object_unref (t);
253 }
254
255 GST_END_TEST;
256
GST_START_TEST(test_no_lock)257 GST_START_TEST (test_no_lock)
258 {
259 GstTask *t;
260 gboolean ret;
261
262 t = gst_task_new (task_func, NULL, NULL);
263 fail_if (t == NULL);
264
265 /* stop should be possible without lock */
266 gst_task_stop (t);
267
268 /* pause should give a warning */
269 ASSERT_WARNING (ret = gst_task_pause (t));
270 fail_unless (ret == FALSE);
271
272 /* start should give a warning */
273 ASSERT_WARNING (ret = gst_task_start (t));
274 fail_unless (ret == FALSE);
275
276 /* stop should be possible without lock */
277 gst_task_stop (t);
278
279 gst_object_unref (t);
280 }
281
282 GST_END_TEST;
283
GST_START_TEST(test_create)284 GST_START_TEST (test_create)
285 {
286 GstTask *t;
287
288 t = gst_task_new (task_func, NULL, NULL);
289 fail_if (t == NULL);
290
291 gst_object_unref (t);
292 }
293
294 GST_END_TEST;
295
296 typedef struct
297 {
298 gboolean called;
299 gpointer caller_thread;
300
301 GCond blocked_cond;
302 GMutex blocked_lock;
303 gboolean blocked;
304
305 GCond unblock_cond;
306 GMutex unblock_lock;
307 gboolean unblock;
308 } TaskData;
309
310 static void
task_cb(TaskData * tdata)311 task_cb (TaskData * tdata)
312 {
313 tdata->called = TRUE;
314 tdata->caller_thread = g_thread_self ();
315
316 g_mutex_lock (&tdata->blocked_lock);
317 tdata->blocked = TRUE;
318 g_cond_signal (&tdata->blocked_cond);
319 g_mutex_unlock (&tdata->blocked_lock);
320
321 g_mutex_lock (&tdata->unblock_lock);
322 while (!tdata->unblock)
323 g_cond_wait (&tdata->unblock_cond, &tdata->unblock_lock);
324
325 g_mutex_unlock (&tdata->unblock_lock);
326 }
327
328 static void
init_task_data(TaskData * tdata)329 init_task_data (TaskData * tdata)
330 {
331 tdata->called = FALSE;
332 tdata->caller_thread = NULL;
333 tdata->unblock = FALSE;
334 g_cond_init (&tdata->unblock_cond);
335 g_mutex_init (&tdata->unblock_lock);
336
337 tdata->blocked = FALSE;
338 g_cond_init (&tdata->blocked_cond);
339 g_mutex_init (&tdata->blocked_lock);
340 }
341
342 static void
cleanup_task_data(TaskData * tdata)343 cleanup_task_data (TaskData * tdata)
344 {
345 g_mutex_clear (&tdata->unblock_lock);
346 g_cond_clear (&tdata->unblock_cond);
347 g_mutex_clear (&tdata->blocked_lock);
348 g_cond_clear (&tdata->blocked_cond);
349 }
350
351 /* In this test, we use a shared task pool with max-threads=1 and verify
352 * that the caller thread for two tasks is the same */
GST_START_TEST(test_shared_task_pool_shared_thread)353 GST_START_TEST (test_shared_task_pool_shared_thread)
354 {
355 GstTaskPool *pool;
356 gpointer handle, handle2;
357 GError *err = NULL;
358 TaskData tdata, tdata2;
359
360 init_task_data (&tdata);
361 init_task_data (&tdata2);
362
363 pool = gst_shared_task_pool_new ();
364 gst_task_pool_prepare (pool, &err);
365
366 fail_unless (err == NULL);
367
368 /* We request that two tasks be executed, and our task function is blocking.
369 * This means no new thread is available to spawn, and the second task should
370 * be queued up on the first thread */
371 handle =
372 gst_task_pool_push (pool, (GstTaskPoolFunction) task_cb, &tdata, &err);
373 fail_unless (err == NULL);
374 handle2 =
375 gst_task_pool_push (pool, (GstTaskPoolFunction) task_cb, &tdata2, &err);
376 fail_unless (err == NULL);
377
378 g_mutex_lock (&tdata.unblock_lock);
379 tdata.unblock = TRUE;
380 g_cond_signal (&tdata.unblock_cond);
381 g_mutex_unlock (&tdata.unblock_lock);
382
383 g_mutex_lock (&tdata2.unblock_lock);
384 tdata2.unblock = TRUE;
385 g_cond_signal (&tdata2.unblock_cond);
386 g_mutex_unlock (&tdata2.unblock_lock);
387
388 gst_task_pool_join (pool, handle);
389 gst_task_pool_join (pool, handle2);
390
391 fail_unless (tdata.called == TRUE);
392 fail_unless (tdata2.called == TRUE);
393 fail_unless (tdata.caller_thread == tdata2.caller_thread);
394
395 cleanup_task_data (&tdata);
396 cleanup_task_data (&tdata2);
397
398 gst_task_pool_cleanup (pool);
399
400 g_object_unref (pool);
401 }
402
403 GST_END_TEST;
404
405 /* In this test, we use a shared task pool with max-threads=2 and verify
406 * that the caller thread for two tasks is different */
GST_START_TEST(test_shared_task_pool_two_threads)407 GST_START_TEST (test_shared_task_pool_two_threads)
408 {
409 GstTaskPool *pool;
410 gpointer handle, handle2;
411 GError *err = NULL;
412 TaskData tdata, tdata2;
413
414 init_task_data (&tdata);
415 init_task_data (&tdata2);
416
417 pool = gst_shared_task_pool_new ();
418 gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 2);
419 gst_task_pool_prepare (pool, &err);
420
421 fail_unless (err == NULL);
422
423 /* We request that two tasks be executed, and our task function is blocking.
424 * This means the pool will have to spawn a new thread to handle the task */
425 handle =
426 gst_task_pool_push (pool, (GstTaskPoolFunction) task_cb, &tdata, &err);
427 fail_unless (err == NULL);
428 handle2 =
429 gst_task_pool_push (pool, (GstTaskPoolFunction) task_cb, &tdata2, &err);
430 fail_unless (err == NULL);
431
432 /* Make sure that the second task has started executing before unblocking */
433 g_mutex_lock (&tdata2.blocked_lock);
434 while (!tdata2.blocked) {
435 g_cond_wait (&tdata2.blocked_cond, &tdata2.blocked_lock);
436 }
437 g_mutex_unlock (&tdata2.blocked_lock);
438
439 g_mutex_lock (&tdata.unblock_lock);
440 tdata.unblock = TRUE;
441 g_cond_signal (&tdata.unblock_cond);
442 g_mutex_unlock (&tdata.unblock_lock);
443
444 g_mutex_lock (&tdata2.unblock_lock);
445 tdata2.unblock = TRUE;
446 g_cond_signal (&tdata2.unblock_cond);
447 g_mutex_unlock (&tdata2.unblock_lock);
448
449 gst_task_pool_join (pool, handle);
450 gst_task_pool_join (pool, handle2);
451
452 fail_unless (tdata.called == TRUE);
453 fail_unless (tdata2.called == TRUE);
454
455 fail_unless (tdata.caller_thread != tdata2.caller_thread);
456
457 cleanup_task_data (&tdata);
458 cleanup_task_data (&tdata2);
459
460 gst_task_pool_cleanup (pool);
461
462 g_object_unref (pool);
463 }
464
465 GST_END_TEST;
466
467 static Suite *
gst_task_suite(void)468 gst_task_suite (void)
469 {
470 Suite *s = suite_create ("GstTask");
471 TCase *tc_chain = tcase_create ("task tests");
472
473 suite_add_tcase (s, tc_chain);
474 tcase_add_test (tc_chain, test_create);
475 tcase_add_test (tc_chain, test_no_lock);
476 tcase_add_test (tc_chain, test_lock);
477 tcase_add_test (tc_chain, test_lock_start);
478 tcase_add_test (tc_chain, test_join);
479 tcase_add_test (tc_chain, test_pause_stop_race);
480 tcase_add_test (tc_chain, test_resume);
481 tcase_add_test (tc_chain, test_shared_task_pool_shared_thread);
482 tcase_add_test (tc_chain, test_shared_task_pool_two_threads);
483
484 return s;
485 }
486
487 GST_CHECK_MAIN (gst_task);
488