1 /* GStreamer message bus unit tests
2 * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
3 * Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23
24 #include <gst/check/gstcheck.h>
25
26 static GstBus *test_bus = NULL;
27 static GMainLoop *main_loop;
28
29 static GType foo_device_get_type (void);
30
31 #define NUM_MESSAGES 1000
32 #define NUM_THREADS 10
33
34 static gpointer
pound_bus_with_messages(gpointer data)35 pound_bus_with_messages (gpointer data)
36 {
37 gint thread_id = GPOINTER_TO_INT (data);
38 gint i;
39
40 for (i = 0; i < NUM_MESSAGES; i++) {
41 GstMessage *m;
42 GstStructure *s;
43
44 s = gst_structure_new ("test_message",
45 "thread_id", G_TYPE_INT, thread_id, "msg_id", G_TYPE_INT, i, NULL);
46 m = gst_message_new_application (NULL, s);
47 gst_bus_post (test_bus, m);
48 }
49 return NULL;
50 }
51
52 static void
pull_messages(void)53 pull_messages (void)
54 {
55 GstMessage *m;
56 const GstStructure *s;
57 guint message_ids[NUM_THREADS];
58 gint i;
59
60 for (i = 0; i < NUM_THREADS; i++)
61 message_ids[i] = 0;
62
63 while (1) {
64 gint _t, _i;
65
66 m = gst_bus_pop (test_bus);
67 if (!m)
68 break;
69 g_return_if_fail (GST_MESSAGE_TYPE (m) == GST_MESSAGE_APPLICATION);
70
71 s = gst_message_get_structure (m);
72 if (!gst_structure_get_int (s, "thread_id", &_t))
73 g_critical ("Invalid message");
74 if (!gst_structure_get_int (s, "msg_id", &_i))
75 g_critical ("Invalid message");
76
77 g_return_if_fail (_t < NUM_THREADS);
78 g_return_if_fail (_i == message_ids[_t]++);
79
80 gst_message_unref (m);
81 }
82
83 for (i = 0; i < NUM_THREADS; i++)
84 g_return_if_fail (message_ids[i] == NUM_MESSAGES);
85 }
86
GST_START_TEST(test_hammer_bus)87 GST_START_TEST (test_hammer_bus)
88 {
89 GThread *threads[NUM_THREADS];
90 gint i;
91
92 test_bus = gst_bus_new ();
93
94 for (i = 0; i < NUM_THREADS; i++)
95 threads[i] = g_thread_try_new ("gst-check", pound_bus_with_messages,
96 GINT_TO_POINTER (i), NULL);
97
98 for (i = 0; i < NUM_THREADS; i++)
99 g_thread_join (threads[i]);
100
101 pull_messages ();
102
103 gst_object_unref ((GstObject *) test_bus);
104 }
105
106 GST_END_TEST;
107
108 static gboolean
message_func_eos(GstBus * bus,GstMessage * message,guint * p_counter)109 message_func_eos (GstBus * bus, GstMessage * message, guint * p_counter)
110 {
111 const GstStructure *s;
112 gint i;
113
114 g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS, FALSE);
115
116 GST_DEBUG ("got EOS message");
117
118 s = gst_message_get_structure (message);
119 if (!gst_structure_get_int (s, "msg_id", &i))
120 g_critical ("Invalid message");
121
122 if (p_counter != NULL)
123 *p_counter += 1;
124
125 return i != 9;
126 }
127
128 static gboolean
message_func_app(GstBus * bus,GstMessage * message,guint * p_counter)129 message_func_app (GstBus * bus, GstMessage * message, guint * p_counter)
130 {
131 const GstStructure *s;
132 gint i;
133
134 g_return_val_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION,
135 FALSE);
136
137 GST_DEBUG ("got APP message");
138
139 s = gst_message_get_structure (message);
140 if (!gst_structure_get_int (s, "msg_id", &i))
141 g_critical ("Invalid message");
142
143 if (p_counter != NULL)
144 *p_counter += 1;
145
146 return i != 9;
147 }
148
149 static gboolean
send_messages(gpointer data)150 send_messages (gpointer data)
151 {
152 GstMessage *m;
153 GstStructure *s;
154 gint i;
155
156 for (i = 0; i < 10; i++) {
157 s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
158 m = gst_message_new_application (NULL, s);
159 gst_bus_post (test_bus, m);
160 s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
161 m = gst_message_new_custom (GST_MESSAGE_EOS, NULL, s);
162 gst_bus_post (test_bus, m);
163 }
164
165 return FALSE;
166 }
167
168 /* test if adding a signal watch for different message types calls the
169 * respective callbacks. */
GST_START_TEST(test_watch)170 GST_START_TEST (test_watch)
171 {
172 guint num_eos = 0;
173 guint num_app = 0;
174 guint id;
175
176 test_bus = gst_bus_new ();
177
178 main_loop = g_main_loop_new (NULL, FALSE);
179
180 id = gst_bus_add_watch (test_bus, gst_bus_async_signal_func, NULL);
181 fail_if (id == 0);
182 g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
183 &num_eos);
184 g_signal_connect (test_bus, "message::application",
185 (GCallback) message_func_app, &num_app);
186
187 g_idle_add ((GSourceFunc) send_messages, NULL);
188 while (g_main_context_pending (NULL))
189 g_main_context_iteration (NULL, FALSE);
190
191 fail_unless_equals_int (num_eos, 10);
192 fail_unless_equals_int (num_app, 10);
193
194 fail_unless (gst_bus_remove_watch (test_bus));
195 g_main_loop_unref (main_loop);
196
197 gst_object_unref ((GstObject *) test_bus);
198 }
199
200 GST_END_TEST;
201
202 /* test if adding a signal watch for different message types calls the
203 * respective callbacks. */
GST_START_TEST(test_watch_with_custom_context)204 GST_START_TEST (test_watch_with_custom_context)
205 {
206 GMainContext *ctx;
207 GSource *source;
208 guint num_eos = 0;
209 guint num_app = 0;
210 guint id;
211
212 test_bus = gst_bus_new ();
213
214 ctx = g_main_context_new ();
215 main_loop = g_main_loop_new (ctx, FALSE);
216
217 source = gst_bus_create_watch (test_bus);
218 g_source_set_callback (source, (GSourceFunc) gst_bus_async_signal_func, NULL,
219 NULL);
220 id = g_source_attach (source, ctx);
221 g_source_unref (source);
222 fail_if (id == 0);
223
224 g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
225 &num_eos);
226 g_signal_connect (test_bus, "message::application",
227 (GCallback) message_func_app, &num_app);
228
229 source = g_idle_source_new ();
230 g_source_set_callback (source, (GSourceFunc) send_messages, NULL, NULL);
231 g_source_attach (source, ctx);
232 g_source_unref (source);
233
234 while (g_main_context_pending (ctx))
235 g_main_context_iteration (ctx, FALSE);
236
237 fail_unless_equals_int (num_eos, 10);
238 fail_unless_equals_int (num_app, 10);
239
240 if ((source = g_main_context_find_source_by_id (ctx, id)))
241 g_source_destroy (source);
242 g_main_loop_unref (main_loop);
243 g_main_context_unref (ctx);
244
245 gst_object_unref (test_bus);
246 }
247
248 GST_END_TEST;
249
250 /* test if adding a signal watch for different message types calls the
251 * respective callbacks. */
GST_START_TEST(test_add_watch_with_custom_context)252 GST_START_TEST (test_add_watch_with_custom_context)
253 {
254 GMainContext *ctx;
255 GSource *source;
256 guint num_eos = 0;
257 guint num_app = 0;
258
259 test_bus = gst_bus_new ();
260
261 ctx = g_main_context_new ();
262 main_loop = g_main_loop_new (ctx, FALSE);
263
264 g_main_context_push_thread_default (ctx);
265 gst_bus_add_signal_watch (test_bus);
266 g_main_context_pop_thread_default (ctx);
267
268 g_signal_connect (test_bus, "message::eos", (GCallback) message_func_eos,
269 &num_eos);
270 g_signal_connect (test_bus, "message::application",
271 (GCallback) message_func_app, &num_app);
272
273 source = g_idle_source_new ();
274 g_source_set_callback (source, (GSourceFunc) send_messages, NULL, NULL);
275 g_source_attach (source, ctx);
276 g_source_unref (source);
277
278 while (g_main_context_pending (ctx))
279 g_main_context_iteration (ctx, FALSE);
280
281 fail_unless_equals_int (num_eos, 10);
282 fail_unless_equals_int (num_app, 10);
283
284 g_main_loop_unref (main_loop);
285 g_main_context_unref (ctx);
286
287 gst_object_unref (test_bus);
288 }
289
290 GST_END_TEST;
291
292 static gboolean
dummy_bus_func(GstBus * bus,GstMessage * msg,gpointer user_data)293 dummy_bus_func (GstBus * bus, GstMessage * msg, gpointer user_data)
294 {
295 return TRUE;
296 }
297
GST_START_TEST(test_remove_watch)298 GST_START_TEST (test_remove_watch)
299 {
300 test_bus = gst_bus_new ();
301
302 /* removing a non-existing watch should fail */
303 fail_if (gst_bus_remove_watch (test_bus));
304
305 gst_bus_add_watch (test_bus, dummy_bus_func, NULL);
306
307 fail_unless (gst_bus_remove_watch (test_bus));
308
309 /* now it should fail to remove the watch again */
310 fail_if (gst_bus_remove_watch (test_bus));
311
312 gst_object_unref (test_bus);
313 }
314
315 GST_END_TEST;
316
317 static gint messages_seen;
318
319 static void
message_func(GstBus * bus,GstMessage * message,gpointer data)320 message_func (GstBus * bus, GstMessage * message, gpointer data)
321 {
322 g_return_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION);
323
324 messages_seen++;
325 }
326
327 static void
send_5app_1el_1err_2app_1eos_messages(guint interval_usecs)328 send_5app_1el_1err_2app_1eos_messages (guint interval_usecs)
329 {
330 GstMessage *m;
331 GstStructure *s;
332 gint i;
333
334 for (i = 0; i < 5; i++) {
335 s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
336 m = gst_message_new_application (NULL, s);
337 GST_LOG ("posting application message");
338 gst_bus_post (test_bus, m);
339 g_usleep (interval_usecs);
340 }
341 for (i = 0; i < 1; i++) {
342 s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
343 m = gst_message_new_element (NULL, s);
344 GST_LOG ("posting element message");
345 gst_bus_post (test_bus, m);
346 g_usleep (interval_usecs);
347 }
348 for (i = 0; i < 1; i++) {
349 m = gst_message_new_error (NULL, NULL, "debug string");
350 GST_LOG ("posting error message");
351 gst_bus_post (test_bus, m);
352 g_usleep (interval_usecs);
353 }
354 for (i = 0; i < 2; i++) {
355 s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
356 m = gst_message_new_application (NULL, s);
357 GST_LOG ("posting application message");
358 gst_bus_post (test_bus, m);
359 g_usleep (interval_usecs);
360 }
361 for (i = 0; i < 1; i++) {
362 m = gst_message_new_eos (NULL);
363 GST_LOG ("posting EOS message");
364 gst_bus_post (test_bus, m);
365 g_usleep (interval_usecs);
366 }
367 }
368
369 static void
send_extended_messages(guint interval_usecs)370 send_extended_messages (guint interval_usecs)
371 {
372 GstMessage *msg;
373 GstDevice *device;
374
375 device = g_object_new (foo_device_get_type (), NULL);
376
377 msg = gst_message_new_device_added (NULL, device);
378 GST_LOG ("posting device-added message");
379 gst_bus_post (test_bus, msg);
380 g_usleep (interval_usecs);
381
382 msg = gst_message_new_device_removed (NULL, device);
383 GST_LOG ("posting device-removed message");
384 gst_bus_post (test_bus, msg);
385 g_usleep (interval_usecs);
386
387 gst_object_unref (device);
388 }
389
390
391 static void
send_10_app_messages(void)392 send_10_app_messages (void)
393 {
394 GstMessage *m;
395 GstStructure *s;
396 gint i;
397
398 for (i = 0; i < 10; i++) {
399 s = gst_structure_new ("test_message", "msg_id", G_TYPE_INT, i, NULL);
400 m = gst_message_new_application (NULL, s);
401 gst_bus_post (test_bus, m);
402 }
403 }
404
405 /* test that you get the same messages from a poll as from signal watches. */
GST_START_TEST(test_watch_with_poll)406 GST_START_TEST (test_watch_with_poll)
407 {
408 guint i;
409
410 test_bus = gst_bus_new ();
411 messages_seen = 0;
412
413 gst_bus_add_signal_watch (test_bus);
414 g_signal_connect (test_bus, "message", (GCallback) message_func, NULL);
415
416 send_10_app_messages ();
417
418 for (i = 0; i < 10; i++)
419 gst_message_unref (gst_bus_poll (test_bus, GST_MESSAGE_APPLICATION,
420 GST_CLOCK_TIME_NONE));
421
422 fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
423 fail_unless (messages_seen == 10, "signal handler didn't get 10 messages");
424
425 gst_bus_remove_signal_watch (test_bus);
426
427 gst_object_unref (test_bus);
428 }
429
430 GST_END_TEST;
431
432 static gint check_messages_seen;
433
434 static void
send_check_messages(guint index)435 send_check_messages (guint index)
436 {
437 GstMessage *m;
438 GstStructure *s;
439
440 s = gst_structure_new ("test_message", "index", G_TYPE_INT, index, NULL);
441 m = gst_message_new_application (NULL, s);
442 gst_bus_post (test_bus, m);
443 }
444
445 static void
message_check_func(GstBus * bus,GstMessage * message,gpointer data)446 message_check_func (GstBus * bus, GstMessage * message, gpointer data)
447 {
448 const GstStructure *s;
449 int index;
450
451 g_return_if_fail (GST_MESSAGE_TYPE (message) == GST_MESSAGE_APPLICATION);
452
453 s = gst_message_get_structure (message);
454 if (!gst_structure_get_int (s, "index", &index))
455 g_critical ("Invalid message");
456
457 check_messages_seen |= 1 << index;
458 }
459
460 /* test that removing and adding the signal watch again works */
GST_START_TEST(test_watch_twice)461 GST_START_TEST (test_watch_twice)
462 {
463 GMainContext *ctx;
464 guint expect = 0;
465
466 test_bus = gst_bus_new ();
467 ctx = g_main_context_new ();
468
469 g_main_context_push_thread_default (ctx);
470
471 send_check_messages (0);
472 expect |= 1 << 0;
473
474 gst_bus_add_signal_watch (test_bus);
475
476 g_signal_connect (test_bus, "message::application",
477 (GCallback) message_check_func, NULL);
478
479 send_check_messages (1);
480 expect |= 1 << 1;
481
482 fail_unless (g_main_context_pending (ctx));
483
484 gst_bus_remove_signal_watch (test_bus);
485
486 send_check_messages (2);
487 expect |= 1 << 2;
488
489 gst_bus_add_signal_watch (test_bus);
490
491 send_check_messages (3);
492 expect |= 1 << 3;
493
494 while (g_main_context_pending (ctx))
495 g_main_context_iteration (ctx, FALSE);
496
497 /* this message will not arrive */
498 send_check_messages (4);
499 expect |= 0 << 4;
500
501 fail_unless (g_main_context_pending (ctx));
502
503 gst_bus_remove_signal_watch (test_bus);
504
505 /* this message should not arrive */
506 send_check_messages (5);
507 expect |= 0 << 5;
508
509 while (g_main_context_pending (ctx))
510 g_main_context_iteration (ctx, FALSE);
511
512 fail_unless (check_messages_seen == expect);
513
514 g_main_context_unref (ctx);
515 gst_object_unref (test_bus);
516 }
517
518 GST_END_TEST;
519
520 /* test that you get the messages with pop. */
GST_START_TEST(test_timed_pop)521 GST_START_TEST (test_timed_pop)
522 {
523 guint i;
524
525 test_bus = gst_bus_new ();
526
527 send_10_app_messages ();
528
529 for (i = 0; i < 10; i++)
530 gst_message_unref (gst_bus_timed_pop (test_bus, GST_CLOCK_TIME_NONE));
531
532 fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
533
534 gst_object_unref (test_bus);
535 }
536
537 GST_END_TEST;
538
539 typedef struct
540 {
541 GstDevice device;
542 } FooDevice;
543 typedef struct
544 {
545 GstDeviceClass device_klass;
546 } FooDeviceClass;
547
548 G_DEFINE_TYPE (FooDevice, foo_device, GST_TYPE_DEVICE);
549
550 static void
foo_device_class_init(FooDeviceClass * klass)551 foo_device_class_init (FooDeviceClass * klass)
552 {
553 /* nothing to do here */
554 }
555
556 static void
foo_device_init(FooDevice * device)557 foo_device_init (FooDevice * device)
558 {
559 /* nothing to do here */
560 }
561
562 /* test that you get the messages with pop_filtered */
GST_START_TEST(test_timed_pop_filtered)563 GST_START_TEST (test_timed_pop_filtered)
564 {
565 GstMessage *msg;
566 guint i;
567
568 test_bus = gst_bus_new ();
569
570 send_10_app_messages ();
571 for (i = 0; i < 10; i++) {
572 msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
573 GST_MESSAGE_ANY);
574 fail_unless (msg != NULL);
575 gst_message_unref (msg);
576 }
577
578 /* should flush all messages on the bus with types not matching */
579 send_10_app_messages ();
580 msg = gst_bus_timed_pop_filtered (test_bus, 0,
581 GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
582 fail_unless (msg == NULL);
583 msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 2,
584 GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
585 fail_unless (msg == NULL);
586 /* there should be nothing on the bus now */
587 fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
588 msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ANY);
589 fail_unless (msg == NULL);
590
591 send_5app_1el_1err_2app_1eos_messages (0);
592 msg = gst_bus_timed_pop_filtered (test_bus, 0,
593 GST_MESSAGE_ANY ^ GST_MESSAGE_APPLICATION);
594 fail_unless (msg != NULL);
595 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
596 gst_message_unref (msg);
597 fail_unless (gst_bus_have_pending (test_bus), "expected messages on bus");
598 msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_APPLICATION);
599 fail_unless (msg != NULL);
600 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
601 gst_message_unref (msg);
602 msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ERROR);
603 fail_unless (msg == NULL);
604
605 gst_object_unref (test_bus);
606
607 /* Test extended messages */
608 GST_DEBUG
609 ("Checking extended messages received from gst_bus_timed_pop_filtered");
610 test_bus = gst_bus_new ();
611
612 send_5app_1el_1err_2app_1eos_messages (0);
613 send_extended_messages (0);
614 send_5app_1el_1err_2app_1eos_messages (0);
615 msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_EXTENDED);
616 fail_unless (msg != NULL);
617 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_DEVICE_ADDED);
618 gst_message_unref (msg);
619
620 msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_EXTENDED);
621 fail_unless (msg != NULL);
622 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_DEVICE_REMOVED);
623 gst_message_unref (msg);
624 gst_object_unref (test_bus);
625
626 /* Now check extended messages don't appear when we don't ask for them */
627 GST_DEBUG
628 ("Checking extended messages *not* received from gst_bus_timed_pop_filtered when not wanted");
629 test_bus = gst_bus_new ();
630
631 send_extended_messages (0);
632 send_5app_1el_1err_2app_1eos_messages (0);
633
634 msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_ERROR);
635 fail_unless (msg != NULL);
636 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ERROR);
637 gst_message_unref (msg);
638
639 msg = gst_bus_timed_pop_filtered (test_bus, 0, GST_MESSAGE_EOS);
640 fail_unless (msg != NULL);
641 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_EOS);
642 gst_message_unref (msg);
643
644 gst_object_unref (test_bus);
645 }
646
647 GST_END_TEST;
648
649 static gpointer
post_delayed_thread(gpointer data)650 post_delayed_thread (gpointer data)
651 {
652 THREAD_START ();
653 send_5app_1el_1err_2app_1eos_messages (1 * G_USEC_PER_SEC);
654 return NULL;
655 }
656
657 /* test that you get the messages with pop_filtered if there's a timeout*/
GST_START_TEST(test_timed_pop_filtered_with_timeout)658 GST_START_TEST (test_timed_pop_filtered_with_timeout)
659 {
660 GstMessage *msg;
661
662 MAIN_INIT ();
663
664 test_bus = gst_bus_new ();
665
666 MAIN_START_THREAD_FUNCTIONS (1, post_delayed_thread, NULL);
667
668 MAIN_SYNCHRONIZE ();
669
670 msg = gst_bus_timed_pop_filtered (test_bus, 2 * GST_SECOND,
671 GST_MESSAGE_ERROR);
672 fail_unless (msg == NULL, "Got unexpected %s message",
673 (msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
674 msg = gst_bus_timed_pop_filtered (test_bus, (3 + 1 + 1 + 1) * GST_SECOND,
675 GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT);
676 fail_unless (msg != NULL, "expected element message, but got nothing");
677 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_ELEMENT);
678 gst_message_unref (msg);
679 msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
680 GST_MESSAGE_APPLICATION);
681 fail_unless (msg != NULL, "expected application message, but got nothing");
682 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
683 gst_message_unref (msg);
684 msg = gst_bus_timed_pop_filtered (test_bus, GST_CLOCK_TIME_NONE,
685 GST_MESSAGE_APPLICATION);
686 fail_unless (msg != NULL, "expected application message, but got nothing");
687 fail_unless_equals_int (GST_MESSAGE_TYPE (msg), GST_MESSAGE_APPLICATION);
688 gst_message_unref (msg);
689 msg = gst_bus_timed_pop_filtered (test_bus, GST_SECOND / 4,
690 GST_MESSAGE_TAG | GST_MESSAGE_ERROR);
691 fail_unless (msg == NULL, "Got unexpected %s message",
692 (msg) ? GST_MESSAGE_TYPE_NAME (msg) : "");
693
694 MAIN_STOP_THREADS ();
695
696 gst_object_unref (test_bus);
697 }
698
699 GST_END_TEST;
700
701 /* test that you get the messages with pop from another thread. */
702 static gpointer
pop_thread(gpointer data)703 pop_thread (gpointer data)
704 {
705 GstBus *bus = GST_BUS_CAST (data);
706 guint i;
707
708 for (i = 0; i < 10; i++)
709 gst_message_unref (gst_bus_timed_pop (bus, GST_CLOCK_TIME_NONE));
710
711 return NULL;
712 }
713
GST_START_TEST(test_timed_pop_thread)714 GST_START_TEST (test_timed_pop_thread)
715 {
716 GThread *thread;
717 GError *error = NULL;
718
719 test_bus = gst_bus_new ();
720
721 thread = g_thread_try_new ("gst-chek", pop_thread, test_bus, &error);
722 fail_if (error != NULL);
723
724 send_10_app_messages ();
725
726 g_thread_join (thread);
727
728 fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
729
730 /* try to pop a message without timeout. */
731 fail_if (gst_bus_timed_pop (test_bus, 0) != NULL);
732
733 /* with a small timeout */
734 fail_if (gst_bus_timed_pop (test_bus, 1000) != NULL);
735
736 gst_object_unref (test_bus);
737 }
738
739 GST_END_TEST;
740
741 static gboolean
cb_bus_call(GstBus * bus,GstMessage * msg,gpointer data)742 cb_bus_call (GstBus * bus, GstMessage * msg, gpointer data)
743 {
744 GMainLoop *loop = data;
745
746 switch (GST_MESSAGE_TYPE (msg)) {
747 case GST_MESSAGE_EOS:
748 {
749 GST_INFO ("End-of-stream");
750 g_main_loop_quit (loop);
751 break;
752 }
753 case GST_MESSAGE_ERROR:
754 {
755 GError *err = NULL;
756
757 gst_message_parse_error (msg, &err, NULL);
758 g_error ("Error: %s", err->message);
759 g_error_free (err);
760
761 g_main_loop_quit (loop);
762 break;
763 }
764 default:
765 {
766 GST_LOG ("BUS MESSAGE: type=%s", GST_MESSAGE_TYPE_NAME (msg));
767 break;
768 }
769 }
770
771 return TRUE;
772 }
773
GST_START_TEST(test_custom_main_context)774 GST_START_TEST (test_custom_main_context)
775 {
776 GMainContext *ctx;
777 GMainLoop *loop;
778 GstElement *pipeline;
779 GstElement *src;
780 GstElement *sink;
781 GSource *source;
782 GstBus *bus;
783
784 ctx = g_main_context_new ();
785 loop = g_main_loop_new (ctx, FALSE);
786
787 pipeline = gst_pipeline_new (NULL);
788 src = gst_element_factory_make ("fakesrc", NULL);
789 g_object_set (src, "num-buffers", 2000, NULL);
790
791 sink = gst_element_factory_make ("fakesink", NULL);
792
793 fail_unless (gst_bin_add (GST_BIN (pipeline), src));
794 fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
795 fail_unless (gst_element_link (src, sink));
796
797 bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
798 source = gst_bus_create_watch (bus);
799 g_source_attach (source, ctx);
800 g_source_set_callback (source, (GSourceFunc) cb_bus_call, loop, NULL);
801 g_source_unref (source);
802 gst_object_unref (bus);
803
804 GST_INFO ("starting pipeline");
805
806 gst_element_set_state (pipeline, GST_STATE_PLAYING);
807 gst_element_get_state (pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
808
809 GST_INFO ("running event loop, ctx=%p", ctx);
810 g_main_loop_run (loop);
811
812 gst_element_set_state (pipeline, GST_STATE_NULL);
813
814 /* clean up */
815 if (ctx)
816 g_main_context_unref (ctx);
817 g_main_loop_unref (loop);
818 gst_object_unref (pipeline);
819 }
820
821 GST_END_TEST;
822
823 static GstBusSyncReply
test_async_sync_handler(GstBus * bus,GstMessage * msg,gpointer user_data)824 test_async_sync_handler (GstBus * bus, GstMessage * msg, gpointer user_data)
825 {
826 GArray *timestamps = user_data;
827 gint64 ts = g_get_monotonic_time () * 1000; /* microsecs -> nanosecs */
828
829 g_array_append_val (timestamps, ts);
830 GST_INFO ("[msg %p] %" GST_PTR_FORMAT, msg, msg);
831
832 return GST_BUS_ASYNC;
833 }
834
835 static gpointer
post_10_app_messages_thread(gpointer data)836 post_10_app_messages_thread (gpointer data)
837 {
838 THREAD_START ();
839 send_10_app_messages ();
840 return NULL;
841 }
842
843 /* Test GST_BUS_ASYNC actually causes the thread posting the message to
844 * block until the message has been freed. We spawn a thread to post ten
845 * messages. We install a bus sync handler to get the timestamp of each
846 * message as it is being posted, and to return GST_BUS_ASYNC. In the main
847 * thread we sleep a bit after we pop off a message and before we free it.
848 * The posting thread should be blocked while the main thread sleeps, so
849 * we expect the interval as the messages are posted to be roughly the same
850 * as the sleep time in the main thread. g_usleep() is not super-precise, so
851 * we allow for some slack there, we just want to check that the posting
852 * thread was blocked at all really. */
GST_START_TEST(test_async_message)853 GST_START_TEST (test_async_message)
854 {
855 GArray *timestamps;
856 guint i;
857
858 MAIN_INIT ();
859
860 timestamps = g_array_sized_new (FALSE, FALSE, sizeof (gint64), 10);
861
862 test_bus = gst_bus_new ();
863
864 gst_bus_set_sync_handler (test_bus, test_async_sync_handler, timestamps,
865 NULL);
866
867 MAIN_START_THREAD_FUNCTIONS (1, post_10_app_messages_thread, NULL);
868
869 MAIN_SYNCHRONIZE ();
870
871 for (i = 0; i < 10; i++) {
872 GstMessage *msg;
873
874 GST_LOG ("(%d) waiting for message..", i);
875 msg = gst_bus_timed_pop (test_bus, GST_CLOCK_TIME_NONE);
876 GST_LOG ("(%d) got message, sleeping a bit", i);
877 g_usleep (60 * GST_MSECOND / (GST_SECOND / G_USEC_PER_SEC));
878 GST_LOG ("(%d) about to free message", i);
879 gst_message_unref (msg);
880 }
881
882 for (i = 1; i < 10; i++) {
883 gint64 prev_ts = g_array_index (timestamps, gint64, i - 1);
884 gint64 ts = g_array_index (timestamps, gint64, i);
885 gint64 diff = ts - prev_ts;
886
887 fail_unless (prev_ts < ts);
888 fail_unless (diff >= 20 * GST_MSECOND, "interval between messages being "
889 "posted was just %" G_GINT64_FORMAT "ms", diff / GST_MSECOND);
890 }
891
892 fail_if (gst_bus_have_pending (test_bus), "unexpected messages on bus");
893
894 MAIN_STOP_THREADS ();
895
896 gst_object_unref (test_bus);
897
898 g_array_unref (timestamps);
899 }
900
901 GST_END_TEST;
902
GST_START_TEST(test_single_gsource)903 GST_START_TEST (test_single_gsource)
904 {
905 GstBus *bus = gst_bus_new ();
906 GSource *source = gst_bus_create_watch (bus);
907 g_source_attach (source, NULL);
908 g_source_unref (source);
909
910 source = gst_bus_create_watch (bus);
911 fail_if (source, "Only one GSource can be added to a bus");
912
913 ASSERT_CRITICAL (gst_bus_add_signal_watch (bus));
914 ASSERT_CRITICAL (gst_bus_remove_signal_watch (bus));
915
916 fail_unless (gst_bus_remove_watch (bus), "Could not remove watch");
917 gst_bus_add_signal_watch (bus);
918
919 fail_if (gst_bus_remove_watch (bus), "Signal watch should be removed"
920 " with gst_bus_remove_signal_watch");
921
922 gst_bus_remove_signal_watch (bus);
923
924 gst_object_unref (bus);
925 }
926
927 GST_END_TEST;
928
929 static Suite *
gst_bus_suite(void)930 gst_bus_suite (void)
931 {
932 Suite *s = suite_create ("GstBus");
933 TCase *tc_chain = tcase_create ("stresstest");
934
935 tcase_set_timeout (tc_chain, 60);
936
937 suite_add_tcase (s, tc_chain);
938 tcase_add_test (tc_chain, test_hammer_bus);
939 tcase_add_test (tc_chain, test_watch);
940 tcase_add_test (tc_chain, test_watch_with_poll);
941 tcase_add_test (tc_chain, test_watch_twice);
942 tcase_add_test (tc_chain, test_watch_with_custom_context);
943 tcase_add_test (tc_chain, test_add_watch_with_custom_context);
944 tcase_add_test (tc_chain, test_remove_watch);
945 tcase_add_test (tc_chain, test_timed_pop);
946 tcase_add_test (tc_chain, test_timed_pop_thread);
947 tcase_add_test (tc_chain, test_timed_pop_filtered);
948 tcase_add_test (tc_chain, test_timed_pop_filtered_with_timeout);
949 tcase_add_test (tc_chain, test_custom_main_context);
950 tcase_add_test (tc_chain, test_async_message);
951 tcase_add_test (tc_chain, test_single_gsource);
952 return s;
953 }
954
955 GST_CHECK_MAIN (gst_bus);
956