• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * aggregator.c - GstAggregator testsuite
3  * Copyright (C) 2006 Alessandro Decina <alessandro.d@gmail.com>
4  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@oencreed.com>
5  * Copyright (C) 2014 Thibault Saunier <tsaunier@opencreed.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26 
27 #include <stdlib.h>
28 #include <gst/check/gstcheck.h>
29 #include <gst/base/gstaggregator.h>
30 
31 /* dummy aggregator based element */
32 
33 #define GST_TYPE_TEST_AGGREGATOR            (gst_test_aggregator_get_type ())
34 #define GST_TEST_AGGREGATOR(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_TEST_AGGREGATOR, GstTestAggregator))
35 #define GST_TEST_AGGREGATOR_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_TEST_AGGREGATOR, GstTestAggregatorClass))
36 #define GST_TEST_AGGREGATOR_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_TEST_AGGREGATOR, GstTestAggregatorClass))
37 
38 #define fail_error_message(msg)     \
39   G_STMT_START {        \
40     GError *error;        \
41     gst_message_parse_error(msg, &error, NULL);       \
42     fail_unless(FALSE, "Error Message from %s : %s",      \
43     GST_OBJECT_NAME (GST_MESSAGE_SRC(msg)), error->message); \
44     g_error_free (error);           \
45   } G_STMT_END;
46 
47 typedef struct _GstTestAggregator GstTestAggregator;
48 typedef struct _GstTestAggregatorClass GstTestAggregatorClass;
49 
50 static GType gst_test_aggregator_get_type (void);
51 
52 #define BUFFER_DURATION 100000000       /* 10 frames per second */
53 #define TEST_GAP_PTS 0
54 #define TEST_GAP_DURATION (5 * GST_SECOND)
55 
56 struct _GstTestAggregator
57 {
58   GstAggregator parent;
59 
60   guint64 timestamp;
61   gboolean gap_expected;
62   gboolean do_flush_on_aggregate;
63   gboolean do_remove_pad_on_aggregate;
64 };
65 
66 struct _GstTestAggregatorClass
67 {
68   GstAggregatorClass parent_class;
69 };
70 
71 static GstFlowReturn
gst_test_aggregator_aggregate(GstAggregator * aggregator,gboolean timeout)72 gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
73 {
74   GstIterator *iter;
75   gboolean all_eos = TRUE;
76   GstTestAggregator *testagg;
77   GstBuffer *buf;
78 
79   gboolean done_iterating = FALSE;
80 
81   testagg = GST_TEST_AGGREGATOR (aggregator);
82 
83   iter = gst_element_iterate_sink_pads (GST_ELEMENT (testagg));
84   while (!done_iterating) {
85     GValue value = { 0, };
86     GstAggregatorPad *pad;
87 
88     switch (gst_iterator_next (iter, &value)) {
89       case GST_ITERATOR_OK:
90         pad = g_value_get_object (&value);
91 
92         if (gst_aggregator_pad_is_eos (pad) == FALSE)
93           all_eos = FALSE;
94 
95         if (testagg->gap_expected == TRUE) {
96           buf = gst_aggregator_pad_peek_buffer (pad);
97           fail_unless (buf);
98           fail_unless (GST_BUFFER_PTS (buf) == TEST_GAP_PTS);
99           fail_unless (GST_BUFFER_DURATION (buf) == TEST_GAP_DURATION);
100           fail_unless (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_GAP));
101           fail_unless (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DROPPABLE));
102           gst_buffer_unref (buf);
103           testagg->gap_expected = FALSE;
104         }
105 
106         if (testagg->do_flush_on_aggregate) {
107           GstBuffer *popped_buf;
108           buf = gst_aggregator_pad_peek_buffer (pad);
109 
110           GST_DEBUG_OBJECT (pad, "Flushing on aggregate");
111 
112           gst_pad_send_event (GST_PAD (pad), gst_event_new_flush_start ());
113           popped_buf = gst_aggregator_pad_pop_buffer (pad);
114 
115           fail_unless (buf == popped_buf);
116           gst_buffer_unref (buf);
117           gst_buffer_unref (popped_buf);
118         } else if (testagg->do_remove_pad_on_aggregate) {
119           buf = gst_aggregator_pad_peek_buffer (pad);
120 
121           GST_DEBUG_OBJECT (pad, "Removing pad on aggregate");
122 
123           gst_buffer_unref (buf);
124           gst_element_release_request_pad (GST_ELEMENT (aggregator),
125               GST_PAD (pad));
126         } else {
127           gst_aggregator_pad_drop_buffer (pad);
128         }
129 
130         g_value_reset (&value);
131         break;
132       case GST_ITERATOR_RESYNC:
133         gst_iterator_resync (iter);
134         break;
135       case GST_ITERATOR_ERROR:
136         GST_WARNING_OBJECT (testagg, "Sinkpads iteration error");
137         done_iterating = TRUE;
138         break;
139       case GST_ITERATOR_DONE:
140         done_iterating = TRUE;
141         break;
142     }
143   }
144   gst_iterator_free (iter);
145 
146   if (all_eos == TRUE) {
147     GST_INFO_OBJECT (testagg, "no data available, must be EOS");
148     gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
149     return GST_FLOW_EOS;
150   }
151 
152   buf = gst_buffer_new ();
153   GST_BUFFER_TIMESTAMP (buf) = testagg->timestamp;
154   GST_BUFFER_DURATION (buf) = BUFFER_DURATION;
155   testagg->timestamp += BUFFER_DURATION;
156 
157   gst_aggregator_finish_buffer (aggregator, buf);
158 
159   /* We just check finish_frame return FLOW_OK */
160   return GST_FLOW_OK;
161 }
162 
163 #define gst_test_aggregator_parent_class parent_class
164 G_DEFINE_TYPE (GstTestAggregator, gst_test_aggregator, GST_TYPE_AGGREGATOR);
165 
166 static void
gst_test_aggregator_class_init(GstTestAggregatorClass * klass)167 gst_test_aggregator_class_init (GstTestAggregatorClass * klass)
168 {
169   GstElementClass *gstelement_class = (GstElementClass *) klass;
170   GstAggregatorClass *base_aggregator_class = (GstAggregatorClass *) klass;
171 
172   static GstStaticPadTemplate _src_template =
173       GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS,
174       GST_STATIC_CAPS_ANY);
175 
176   static GstStaticPadTemplate _sink_template =
177       GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK, GST_PAD_REQUEST,
178       GST_STATIC_CAPS_ANY);
179 
180   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
181       &_src_template, GST_TYPE_AGGREGATOR_PAD);
182 
183   gst_element_class_add_static_pad_template_with_gtype (gstelement_class,
184       &_sink_template, GST_TYPE_AGGREGATOR_PAD);
185 
186   gst_element_class_set_static_metadata (gstelement_class, "Aggregator",
187       "Testing", "Combine N buffers", "Stefan Sauer <ensonic@users.sf.net>");
188 
189   base_aggregator_class->aggregate =
190       GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate);
191 }
192 
193 static void
gst_test_aggregator_init(GstTestAggregator * self)194 gst_test_aggregator_init (GstTestAggregator * self)
195 {
196   GstAggregator *agg = GST_AGGREGATOR (self);
197   gst_segment_init (&GST_AGGREGATOR_PAD (agg->srcpad)->segment,
198       GST_FORMAT_TIME);
199   self->timestamp = 0;
200   self->gap_expected = FALSE;
201 }
202 
203 static gboolean
gst_test_aggregator_plugin_init(GstPlugin * plugin)204 gst_test_aggregator_plugin_init (GstPlugin * plugin)
205 {
206   return gst_element_register (plugin, "testaggregator", GST_RANK_NONE,
207       GST_TYPE_TEST_AGGREGATOR);
208 }
209 
210 static gboolean
gst_test_aggregator_plugin_register(void)211 gst_test_aggregator_plugin_register (void)
212 {
213   return gst_plugin_register_static (GST_VERSION_MAJOR,
214       GST_VERSION_MINOR,
215       "testaggregator",
216       "Combine buffers",
217       gst_test_aggregator_plugin_init,
218       VERSION, GST_LICENSE, PACKAGE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN);
219 }
220 
221 /* test helpers */
222 
223 typedef struct
224 {
225   GQueue *queue;
226   GstElement *aggregator;
227   GstPad *sinkpad, *srcpad;
228   GstFlowReturn expected_result;
229 
230   /*                       ------------------
231    * -----------   --------|--              |
232    * | srcpad | -- | sinkpad |  aggregator  |
233    * -----------   --------|--              |
234    *                       ------------------
235    *  This is for 1 Chain, we can have several
236    */
237 } ChainData;
238 
239 typedef struct
240 {
241   GMainLoop *ml;
242   GstPad *srcpad,               /* srcpad of the GstAggregator */
243    *sinkpad;                    /* fake sinkpad to which GstAggregator.srcpad is linked */
244   guint timeout_id;
245   GstElement *aggregator;
246 
247   /* -----------------|
248    * |             ----------    -----------
249    * | aggregator  | srcpad | -- | sinkpad |
250    * |             ----------    -----------
251    * -----------------|
252    */
253 
254   gint flush_start_events, flush_stop_events;
255 } TestData;
256 
257 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
258     GST_PAD_SRC,
259     GST_PAD_ALWAYS,
260     GST_STATIC_CAPS_ANY);
261 
262 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
263     GST_PAD_SINK,
264     GST_PAD_ALWAYS,
265     GST_STATIC_CAPS_ANY);
266 
267 static void
start_flow(ChainData * chain_data)268 start_flow (ChainData * chain_data)
269 {
270   GstSegment segment;
271   GstCaps *caps;
272 
273   gst_pad_push_event (chain_data->srcpad, gst_event_new_stream_start ("test"));
274 
275   caps = gst_caps_new_empty_simple ("foo/x-bar");
276   gst_pad_push_event (chain_data->srcpad, gst_event_new_caps (caps));
277   gst_caps_unref (caps);
278 
279   gst_segment_init (&segment, GST_FORMAT_TIME);
280   gst_pad_push_event (chain_data->srcpad, gst_event_new_segment (&segment));
281 }
282 
283 static gpointer
push_data(gpointer user_data)284 push_data (gpointer user_data)
285 {
286   ChainData *chain_data = (ChainData *) user_data;
287   GstTestAggregator *aggregator = (GstTestAggregator *) chain_data->aggregator;
288   GstPad *sinkpad = chain_data->sinkpad;
289   GstPad *srcpad = chain_data->srcpad;
290   gpointer data;
291 
292   start_flow (chain_data);
293 
294   while ((data = g_queue_pop_head (chain_data->queue))) {
295     GST_DEBUG_OBJECT (sinkpad, "Pushing %" GST_PTR_FORMAT, data);
296 
297     /* switch on the data type and push */
298     if (GST_IS_BUFFER (data)) {
299       GstFlowReturn flow = gst_pad_push (srcpad, GST_BUFFER_CAST (data));
300       fail_unless (flow == chain_data->expected_result,
301           "got flow %s instead of %s on %s:%s", gst_flow_get_name (flow),
302           gst_flow_get_name (chain_data->expected_result),
303           GST_DEBUG_PAD_NAME (sinkpad));
304     } else if (GST_IS_EVENT (data)) {
305       switch (GST_EVENT_TYPE (data)) {
306         case GST_EVENT_GAP:
307           aggregator->gap_expected = TRUE;
308           break;
309         default:
310           break;
311       }
312       fail_unless (gst_pad_push_event (srcpad, GST_EVENT_CAST (data)));
313     } else if (GST_IS_QUERY (data)) {
314       /* we don't care whether the query actually got handled */
315       gst_pad_peer_query (srcpad, GST_QUERY_CAST (data));
316       gst_query_unref (GST_QUERY_CAST (data));
317     } else {
318       GST_WARNING_OBJECT (sinkpad, "bad queue entry: %" GST_PTR_FORMAT, data);
319     }
320   }
321   GST_DEBUG_OBJECT (sinkpad, "All data from queue sent");
322 
323   return NULL;
324 }
325 
326 static gboolean
_aggregate_timeout(GMainLoop * ml)327 _aggregate_timeout (GMainLoop * ml)
328 {
329   g_main_loop_quit (ml);
330 
331   fail_unless ("No buffer found on aggregator.srcpad -> TIMEOUT" == NULL);
332 
333   return FALSE;
334 }
335 
336 static gboolean
_quit(GMainLoop * ml)337 _quit (GMainLoop * ml)
338 {
339   GST_DEBUG ("QUITTING ML");
340   g_main_loop_quit (ml);
341 
342   return G_SOURCE_REMOVE;
343 }
344 
345 static GstPadProbeReturn
_aggregated_cb(GstPad * pad,GstPadProbeInfo * info,GMainLoop * ml)346 _aggregated_cb (GstPad * pad, GstPadProbeInfo * info, GMainLoop * ml)
347 {
348   GST_DEBUG ("Received data %" GST_PTR_FORMAT, info->data);
349   GST_DEBUG ("Should quit ML");
350   g_idle_add ((GSourceFunc) _quit, ml);
351 
352   return GST_PAD_PROBE_REMOVE;
353 }
354 
355 static GstPadProbeReturn
_downstream_probe_cb(GstPad * pad,GstPadProbeInfo * info,TestData * test)356 _downstream_probe_cb (GstPad * pad, GstPadProbeInfo * info, TestData * test)
357 {
358   GST_DEBUG ("PROBING ");
359   if (info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
360     if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
361         GST_EVENT_FLUSH_START) {
362 
363       g_atomic_int_inc (&test->flush_start_events);
364       GST_DEBUG ("==========> FLUSH: %i", test->flush_start_events);
365     } else if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_EVENT (info)) ==
366         GST_EVENT_FLUSH_STOP)
367       g_atomic_int_inc (&test->flush_stop_events);
368   }
369   return GST_PAD_PROBE_OK;
370 }
371 
372 /*
373  * Not thread safe, will create a new ChainData which contains
374  * an activated src pad linked to a requested sink pad of @agg, and
375  * a newly allocated buffer ready to be pushed. Caller needs to
376  * clear with _chain_data_clear after.
377  */
378 static void
_chain_data_init(ChainData * data,GstElement * agg,...)379 _chain_data_init (ChainData * data, GstElement * agg, ...)
380 {
381   static gint num_src_pads = 0;
382   gchar *pad_name = g_strdup_printf ("src%d", num_src_pads);
383   va_list var_args;
384   gpointer d;
385 
386   num_src_pads += 1;
387 
388   data->srcpad = gst_pad_new_from_static_template (&srctemplate, pad_name);
389   g_free (pad_name);
390   gst_pad_set_active (data->srcpad, TRUE);
391   data->aggregator = agg;
392   data->sinkpad = gst_element_request_pad_simple (agg, "sink_%u");
393   fail_unless (GST_IS_PAD (data->sinkpad));
394   fail_unless (gst_pad_link (data->srcpad, data->sinkpad) == GST_PAD_LINK_OK);
395 
396   /* add data items */
397   data->queue = g_queue_new ();
398   va_start (var_args, agg);
399   while (TRUE) {
400     if (!(d = va_arg (var_args, gpointer)))
401       break;
402     g_queue_push_tail (data->queue, d);
403     GST_DEBUG_OBJECT (data->sinkpad, "Adding to queue: %" GST_PTR_FORMAT, d);
404   }
405   va_end (var_args);
406 }
407 
408 static void
_chain_data_clear(ChainData * chain_data)409 _chain_data_clear (ChainData * chain_data)
410 {
411   gpointer data;
412 
413   while ((data = g_queue_pop_head (chain_data->queue))) {
414     /* switch on the data type and free */
415     if (GST_IS_BUFFER (data)) {
416       gst_buffer_unref (GST_BUFFER_CAST (data));
417     } else if (GST_IS_EVENT (data)) {
418       gst_event_unref (GST_EVENT_CAST (data));
419     } else if (GST_IS_QUERY (data)) {
420       gst_query_unref (GST_QUERY_CAST (data));
421     } else {
422       GST_WARNING_OBJECT (chain_data->sinkpad, "bad queue entry: %"
423           GST_PTR_FORMAT, data);
424     }
425   }
426   g_queue_free (chain_data->queue);
427 
428   if (chain_data->srcpad)
429     gst_object_unref (chain_data->srcpad);
430   if (chain_data->sinkpad)
431     gst_object_unref (chain_data->sinkpad);
432 }
433 
434 static GstFlowReturn
_test_chain(GstPad * pad,GstObject * object,GstBuffer * buffer)435 _test_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
436 {
437   /* accept any buffers */
438   gst_buffer_unref (buffer);
439   return GST_FLOW_OK;
440 }
441 
442 static void
_test_data_init(TestData * test,gboolean needs_flushing)443 _test_data_init (TestData * test, gboolean needs_flushing)
444 {
445   const gchar *timeout_factor_str = g_getenv ("TIMEOUT_FACTOR");
446   gint timeout = 1000;
447 
448   test->aggregator = gst_element_factory_make ("testaggregator", NULL);
449   gst_element_set_state (test->aggregator, GST_STATE_PLAYING);
450   test->ml = g_main_loop_new (NULL, TRUE);
451   test->srcpad = GST_AGGREGATOR (test->aggregator)->srcpad;
452 
453   GST_DEBUG_OBJECT (test->srcpad, "Init test data for srcpad");
454 
455   if (needs_flushing) {
456     static gint num_sink_pads = 0;
457     gchar *pad_name = g_strdup_printf ("sink%d", num_sink_pads);
458 
459     num_sink_pads += 1;
460     test->sinkpad = gst_pad_new_from_static_template (&sinktemplate, pad_name);
461     gst_pad_set_chain_function (test->sinkpad, _test_chain);
462     gst_pad_set_active (test->sinkpad, TRUE);
463     g_free (pad_name);
464     fail_unless (gst_pad_link (test->srcpad, test->sinkpad) == GST_PAD_LINK_OK);
465     gst_pad_add_probe (test->srcpad, GST_PAD_PROBE_TYPE_EVENT_FLUSH,
466         (GstPadProbeCallback) _downstream_probe_cb, test, NULL);
467   } else {
468     gst_pad_add_probe (test->srcpad, GST_PAD_PROBE_TYPE_BUFFER,
469         (GstPadProbeCallback) _aggregated_cb, test->ml, NULL);
470   }
471 
472   if (timeout_factor_str) {
473     gint factor = g_ascii_strtoll (timeout_factor_str, NULL, 10);
474     if (factor)
475       timeout *= factor;
476   }
477 
478   test->timeout_id =
479       g_timeout_add (timeout, (GSourceFunc) _aggregate_timeout, test->ml);
480 }
481 
482 static void
_test_data_clear(TestData * test)483 _test_data_clear (TestData * test)
484 {
485   gst_element_set_state (test->aggregator, GST_STATE_NULL);
486   gst_object_unref (test->aggregator);
487 
488   if (test->sinkpad)
489     gst_object_unref (test->sinkpad);
490 
491   g_main_loop_unref (test->ml);
492 }
493 
494 /* tests */
495 
GST_START_TEST(test_aggregate)496 GST_START_TEST (test_aggregate)
497 {
498   GThread *thread1, *thread2;
499   ChainData data1 = { 0, };
500   ChainData data2 = { 0, };
501   TestData test = { 0, };
502 
503   _test_data_init (&test, FALSE);
504   _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
505   _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
506 
507   thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
508   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
509 
510   g_main_loop_run (test.ml);
511   g_source_remove (test.timeout_id);
512 
513   /* these will return immediately as when the data is popped the threads are
514    * unlocked and will terminate */
515   g_thread_join (thread1);
516   g_thread_join (thread2);
517 
518   _chain_data_clear (&data1);
519   _chain_data_clear (&data2);
520   _test_data_clear (&test);
521 }
522 
523 GST_END_TEST;
524 
GST_START_TEST(test_aggregate_eos)525 GST_START_TEST (test_aggregate_eos)
526 {
527   GThread *thread1, *thread2;
528   ChainData data1 = { 0, };
529   ChainData data2 = { 0, };
530   TestData test = { 0, };
531 
532   _test_data_init (&test, FALSE);
533   _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
534   _chain_data_init (&data2, test.aggregator, gst_event_new_eos (), NULL);
535 
536   thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
537   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
538 
539   g_main_loop_run (test.ml);
540   g_source_remove (test.timeout_id);
541 
542   /* these will return immediately as when the data is popped the threads are
543    * unlocked and will terminate */
544   g_thread_join (thread1);
545   g_thread_join (thread2);
546 
547   _chain_data_clear (&data1);
548   _chain_data_clear (&data2);
549   _test_data_clear (&test);
550 }
551 
552 GST_END_TEST;
553 
GST_START_TEST(test_aggregate_gap)554 GST_START_TEST (test_aggregate_gap)
555 {
556   GThread *thread;
557   ChainData data = { 0, };
558   TestData test = { 0, };
559 
560   _test_data_init (&test, FALSE);
561   _chain_data_init (&data, test.aggregator,
562       gst_event_new_gap (TEST_GAP_PTS, TEST_GAP_DURATION), NULL);
563 
564   thread = g_thread_try_new ("gst-check", push_data, &data, NULL);
565 
566   g_main_loop_run (test.ml);
567   g_source_remove (test.timeout_id);
568 
569   /* these will return immediately as when the data is popped the threads are
570    * unlocked and will terminate */
571   g_thread_join (thread);
572 
573   _chain_data_clear (&data);
574   _test_data_clear (&test);
575 }
576 
577 GST_END_TEST;
578 
GST_START_TEST(test_aggregate_handle_events)579 GST_START_TEST (test_aggregate_handle_events)
580 {
581   GThread *thread1, *thread2;
582   ChainData data1 = { 0, };
583   ChainData data2 = { 0, };
584   TestData test = { 0, };
585 
586   _test_data_init (&test, FALSE);
587   _chain_data_init (&data1, test.aggregator,
588       gst_event_new_tag (gst_tag_list_new_empty ()), gst_buffer_new (), NULL);
589   _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
590 
591   thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
592   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
593 
594   g_main_loop_run (test.ml);
595   g_source_remove (test.timeout_id);
596 
597   /* these will return immediately as when the data is popped the threads are
598    * unlocked and will terminate */
599   g_thread_join (thread1);
600   g_thread_join (thread2);
601 
602   _chain_data_clear (&data1);
603   _chain_data_clear (&data2);
604   _test_data_clear (&test);
605 }
606 
607 GST_END_TEST;
608 
GST_START_TEST(test_aggregate_handle_queries)609 GST_START_TEST (test_aggregate_handle_queries)
610 {
611   GThread *thread1, *thread2;
612   ChainData data1 = { 0, };
613   ChainData data2 = { 0, };
614   TestData test = { 0, };
615   GstCaps *caps;
616 
617   _test_data_init (&test, FALSE);
618 
619   caps = gst_caps_new_empty_simple ("foo/x-bar");
620   _chain_data_init (&data1, test.aggregator,
621       gst_query_new_allocation (caps, FALSE), gst_buffer_new (), NULL);
622   gst_caps_unref (caps);
623 
624   _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
625 
626   thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
627   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
628 
629   g_main_loop_run (test.ml);
630   g_source_remove (test.timeout_id);
631 
632   /* these will return immediately as when the data is popped the threads are
633    * unlocked and will terminate */
634   g_thread_join (thread1);
635   g_thread_join (thread2);
636 
637   _chain_data_clear (&data1);
638   _chain_data_clear (&data2);
639   _test_data_clear (&test);
640 }
641 
642 GST_END_TEST;
643 
644 #define NUM_BUFFERS 3
645 static void
handoff(GstElement * fakesink,GstBuffer * buf,GstPad * pad,guint * count)646 handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
647 {
648   *count = *count + 1;
649   GST_DEBUG ("HANDOFF: %i", *count);
650 }
651 
652 /* Test a linear pipeline using aggregator */
GST_START_TEST(test_linear_pipeline)653 GST_START_TEST (test_linear_pipeline)
654 {
655   GstBus *bus;
656   GstMessage *msg;
657   GstElement *pipeline, *src, *agg, *sink;
658   gint count = 0;
659 
660   pipeline = gst_pipeline_new ("pipeline");
661   src = gst_check_setup_element ("fakesrc");
662   g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
663       NULL);
664   agg = gst_check_setup_element ("testaggregator");
665   sink = gst_check_setup_element ("fakesink");
666   g_object_set (sink, "signal-handoffs", TRUE, NULL);
667   g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
668 
669   fail_unless (gst_bin_add (GST_BIN (pipeline), src));
670   fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
671   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
672   fail_unless (gst_element_link (src, agg));
673   fail_unless (gst_element_link (agg, sink));
674 
675   bus = gst_element_get_bus (pipeline);
676   fail_if (bus == NULL);
677   gst_element_set_state (pipeline, GST_STATE_PLAYING);
678 
679   msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
680   fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
681   gst_message_unref (msg);
682 
683   fail_unless_equals_int (count, NUM_BUFFERS);
684 
685   gst_element_set_state (pipeline, GST_STATE_NULL);
686   gst_object_unref (bus);
687   gst_object_unref (pipeline);
688 }
689 
690 GST_END_TEST;
691 
GST_START_TEST(test_two_src_pipeline)692 GST_START_TEST (test_two_src_pipeline)
693 {
694   GstBus *bus;
695   GstMessage *msg;
696   GstElement *pipeline, *src, *src1, *agg, *sink;
697   gint count = 0;
698 
699   pipeline = gst_pipeline_new ("pipeline");
700   src = gst_element_factory_make ("fakesrc", NULL);
701   g_object_set (src, "num-buffers", NUM_BUFFERS, "sizetype", 2, "sizemax", 4,
702       NULL);
703 
704   src1 = gst_element_factory_make ("fakesrc", NULL);
705   g_object_set (src1, "num-buffers", NUM_BUFFERS + 1, "sizetype", 2, "sizemax",
706       4, NULL);
707 
708   agg = gst_check_setup_element ("testaggregator");
709   sink = gst_check_setup_element ("fakesink");
710   g_object_set (sink, "signal-handoffs", TRUE, NULL);
711   g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
712 
713   fail_unless (gst_bin_add (GST_BIN (pipeline), src));
714   fail_unless (gst_bin_add (GST_BIN (pipeline), src1));
715   fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
716   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
717   fail_unless (gst_element_link (src, agg));
718   fail_unless (gst_element_link (src1, agg));
719   fail_unless (gst_element_link (agg, sink));
720 
721   bus = gst_element_get_bus (pipeline);
722   fail_if (bus == NULL);
723   gst_element_set_state (pipeline, GST_STATE_PLAYING);
724 
725   msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
726   fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
727   gst_message_unref (msg);
728 
729   fail_unless_equals_int (count, NUM_BUFFERS + 1);
730 
731   gst_element_set_state (pipeline, GST_STATE_NULL);
732   gst_object_unref (bus);
733   gst_object_unref (pipeline);
734 }
735 
736 GST_END_TEST;
737 
738 static GstPadProbeReturn
_drop_buffer_probe_cb(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)739 _drop_buffer_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
740 {
741   gint wait;
742 
743   if (GST_IS_BUFFER (info->data)) {
744     wait = GPOINTER_TO_INT (user_data);
745     if (wait > 0)
746       g_usleep (wait / 1000);
747     return GST_PAD_PROBE_DROP;
748   }
749 
750   return GST_PAD_PROBE_PASS;
751 }
752 
753 #define TIMEOUT_NUM_BUFFERS 20
754 static void
_test_timeout(gint buffer_wait)755 _test_timeout (gint buffer_wait)
756 {
757   GstBus *bus;
758   GstMessage *msg;
759   GstElement *pipeline, *src, *src1, *agg, *sink;
760   GstPad *src1pad;
761   gint count = 0;
762 
763   pipeline = gst_pipeline_new ("pipeline");
764   src = gst_element_factory_make ("fakesrc", NULL);
765   g_object_set (src, "num-buffers", TIMEOUT_NUM_BUFFERS, "sizetype", 2,
766       "sizemax", 4, "is-live", TRUE, "datarate", 4000, NULL);
767 
768   src1 = gst_element_factory_make ("fakesrc", NULL);
769   g_object_set (src1, "num-buffers", TIMEOUT_NUM_BUFFERS, "sizetype", 2,
770       "sizemax", 4, "is-live", TRUE, "datarate", 4000, NULL);
771 
772   agg = gst_check_setup_element ("testaggregator");
773   g_object_set (agg, "latency", GST_USECOND, NULL);
774   sink = gst_check_setup_element ("fakesink");
775   g_object_set (sink, "signal-handoffs", TRUE, NULL);
776   g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
777 
778   fail_unless (gst_bin_add (GST_BIN (pipeline), src));
779   fail_unless (gst_bin_add (GST_BIN (pipeline), src1));
780   fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
781   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
782 
783   src1pad = gst_element_get_static_pad (src1, "src");
784   fail_if (src1pad == NULL);
785   gst_pad_add_probe (src1pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
786       (GstPadProbeCallback) _drop_buffer_probe_cb,
787       GINT_TO_POINTER (buffer_wait), NULL);
788 
789   fail_unless (gst_element_link (src, agg));
790   fail_unless (gst_element_link (src1, agg));
791   fail_unless (gst_element_link (agg, sink));
792 
793   bus = gst_element_get_bus (pipeline);
794   fail_if (bus == NULL);
795   gst_element_set_state (pipeline, GST_STATE_PLAYING);
796 
797   msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
798   fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
799   gst_message_unref (msg);
800 
801   /* cannot rely on the exact number of buffers as the timeout may produce
802    * more buffers with the unsynchronized _aggregate() implementation in
803    * testaggregator */
804   fail_if (count < TIMEOUT_NUM_BUFFERS);
805 
806   gst_element_set_state (pipeline, GST_STATE_NULL);
807   gst_object_unref (src1pad);
808   gst_object_unref (bus);
809   gst_object_unref (pipeline);
810 }
811 
GST_START_TEST(test_timeout_pipeline)812 GST_START_TEST (test_timeout_pipeline)
813 {
814   _test_timeout (0);
815 }
816 
817 GST_END_TEST;
818 
GST_START_TEST(test_timeout_pipeline_with_wait)819 GST_START_TEST (test_timeout_pipeline_with_wait)
820 {
821   _test_timeout (1000000 /* 1 ms */ );
822 }
823 
824 GST_END_TEST;
825 
GST_START_TEST(test_flushing_seek)826 GST_START_TEST (test_flushing_seek)
827 {
828   GstEvent *event;
829   GThread *thread1, *thread2;
830   ChainData data1 = { 0, };
831   ChainData data2 = { 0, };
832   TestData test = { 0, };
833   GstBuffer *buf;
834   guint32 seqnum;
835 
836   _test_data_init (&test, TRUE);
837 
838   /* Queue a buffer in agg:sink_1. Then do a flushing seek and check that the
839    * new flushing seek logic is triggered. On the first FLUSH_START call the
840    * buffers queued in collectpads should get flushed. Only one FLUSH_START and
841    * one FLUSH_STOP should be forwarded downstream.
842    */
843   _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
844 
845   buf = gst_buffer_new ();
846   GST_BUFFER_TIMESTAMP (buf) = 0;
847   _chain_data_init (&data2, test.aggregator, buf, NULL);
848 
849   gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test.
850               aggregator)->srcpad)->segment, GST_FORMAT_TIME);
851 
852   /* now do a successful flushing seek */
853   event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
854       GST_SEEK_TYPE_SET, 0, GST_SEEK_TYPE_SET, 10 * GST_SECOND);
855   seqnum = gst_event_get_seqnum (event);
856   fail_unless (gst_pad_send_event (test.srcpad, event));
857 
858   /* flushing starts when a flushing seek is received, and stops
859    * when all sink pads have received FLUSH_STOP */
860   fail_unless_equals_int (test.flush_start_events, 1);
861   fail_unless_equals_int (test.flush_stop_events, 0);
862 
863   /* send a first FLUSH_START on agg:sink_0, nothing will be sent
864    * downstream */
865   GST_DEBUG_OBJECT (data2.sinkpad, "send flush_start");
866   event = gst_event_new_flush_start ();
867   gst_event_set_seqnum (event, seqnum);
868   fail_unless (gst_pad_push_event (data2.srcpad, event));
869   fail_unless_equals_int (test.flush_start_events, 1);
870   fail_unless_equals_int (test.flush_stop_events, 0);
871 
872   /* expect this buffer to be flushed */
873   data2.expected_result = GST_FLOW_FLUSHING;
874   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
875 
876   /* this should send no additional flush_start */
877   GST_DEBUG_OBJECT (data1.sinkpad, "send flush_start");
878   event = gst_event_new_flush_start ();
879   gst_event_set_seqnum (event, seqnum);
880   fail_unless (gst_pad_push_event (data1.srcpad, event));
881   fail_unless_equals_int (test.flush_start_events, 1);
882   fail_unless_equals_int (test.flush_stop_events, 0);
883 
884   /* the first FLUSH_STOP is not forwarded downstream */
885   GST_DEBUG_OBJECT (data1.srcpad, "send flush_stop");
886   event = gst_event_new_flush_stop (TRUE);
887   gst_event_set_seqnum (event, seqnum);
888   fail_unless (gst_pad_push_event (data1.srcpad, event));
889   fail_unless_equals_int (test.flush_start_events, 1);
890   fail_unless_equals_int (test.flush_stop_events, 0);
891 
892   /* at this point even the other pad agg:sink_1 should be flushing so thread2
893    * should have stopped */
894   g_thread_join (thread2);
895 
896   /* push a buffer on agg:sink_0 to trigger one collect after flushing to verify
897    * that flushing completes once all the pads have been flushed */
898   thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
899 
900   /* flush agg:sink_1 as well. This completes the flushing seek so a FLUSH_STOP is
901    * sent downstream */
902   GST_DEBUG_OBJECT (data2.srcpad, "send flush_stop");
903   event = gst_event_new_flush_stop (TRUE);
904   gst_event_set_seqnum (event, seqnum);
905   gst_pad_push_event (data2.srcpad, event);
906 
907   /* and the last FLUSH_STOP is forwarded downstream */
908   fail_unless_equals_int (test.flush_stop_events, 1);
909 
910   /*  Check collected */
911   gst_pad_add_probe (test.srcpad, GST_PAD_PROBE_TYPE_BUFFER,
912       (GstPadProbeCallback) _aggregated_cb, test.ml, NULL);
913 
914   g_queue_push_tail (data2.queue, gst_event_new_eos ());
915   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
916 
917   g_main_loop_run (test.ml);
918   g_source_remove (test.timeout_id);
919 
920   fail_unless_equals_int (test.flush_stop_events, 1);
921 
922   /* these will return immediately as at this point the threads have been
923    * unlocked and are finished */
924   g_thread_join (thread1);
925   g_thread_join (thread2);
926 
927   _chain_data_clear (&data1);
928   _chain_data_clear (&data2);
929   _test_data_clear (&test);
930 
931 }
932 
933 GST_END_TEST;
934 
935 static void
infinite_seek(guint num_srcs,guint num_seeks,gboolean is_live)936 infinite_seek (guint num_srcs, guint num_seeks, gboolean is_live)
937 {
938   GstBus *bus;
939   GstMessage *message;
940   GstElement *pipeline, *src, *agg, *sink;
941   gint count = 0, i;
942   gboolean seek_res, carry_on = TRUE;
943 
944   pipeline = gst_pipeline_new ("pipeline");
945 
946   agg = gst_check_setup_element ("testaggregator");
947   sink = gst_check_setup_element ("fakesink");
948 
949   if (is_live)
950     g_object_set (agg, "latency", GST_MSECOND, NULL);
951 
952   fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
953   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
954   fail_unless (gst_element_link (agg, sink));
955 
956   for (i = 0; i < num_srcs; i++) {
957     src = gst_element_factory_make ("fakesrc", NULL);
958     g_object_set (src, "sizetype", 2, "sizemax", 4,
959         "format", GST_FORMAT_TIME, "datarate", 1000, NULL);
960     if (is_live)
961       g_object_set (src, "is-live", TRUE, NULL);
962     fail_unless (gst_bin_add (GST_BIN (pipeline), src));
963     fail_unless (gst_element_link (src, agg));
964   }
965 
966   bus = gst_element_get_bus (pipeline);
967   fail_if (bus == NULL);
968   gst_element_set_state (pipeline, GST_STATE_PLAYING);
969   while (count < num_seeks && carry_on) {
970     message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 10);
971     if (message) {
972       switch (GST_MESSAGE_TYPE (message)) {
973         case GST_MESSAGE_EOS:
974         {
975           /* we should check if we really finished here */
976           GST_WARNING ("Got an EOS");
977           carry_on = FALSE;
978           break;
979         }
980         case GST_MESSAGE_STATE_CHANGED:
981         {
982           GstState new;
983 
984           if (GST_MESSAGE_SRC (message) == GST_OBJECT (pipeline)) {
985             gst_message_parse_state_changed (message, NULL, &new, NULL);
986 
987             if (new != GST_STATE_PLAYING)
988               break;
989 
990             GST_INFO ("Seeking (num: %i)", count);
991             seek_res =
992                 gst_element_seek_simple (sink, GST_FORMAT_TIME,
993                 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, 0);
994             GST_INFO ("seek result is : %d", seek_res);
995             fail_unless (seek_res != 0);
996             count++;
997           }
998 
999           break;
1000         }
1001         case GST_MESSAGE_ERROR:
1002           GST_ERROR ("Error on the bus: %" GST_PTR_FORMAT, message);
1003           carry_on = FALSE;
1004           fail_error_message (message);
1005           break;
1006         default:
1007           break;
1008       }
1009       gst_message_unref (message);
1010     }
1011   }
1012 
1013   gst_element_set_state (pipeline, GST_STATE_NULL);
1014   gst_object_unref (bus);
1015   gst_object_unref (pipeline);
1016 }
1017 
GST_START_TEST(test_infinite_seek)1018 GST_START_TEST (test_infinite_seek)
1019 {
1020   infinite_seek (2, 500, FALSE);
1021 }
1022 
1023 GST_END_TEST;
1024 
GST_START_TEST(test_infinite_seek_50_src)1025 GST_START_TEST (test_infinite_seek_50_src)
1026 {
1027   infinite_seek (50, 100, FALSE);
1028 }
1029 
1030 GST_END_TEST;
1031 
GST_START_TEST(test_infinite_seek_50_src_live)1032 GST_START_TEST (test_infinite_seek_50_src_live)
1033 {
1034   infinite_seek (50, 100, TRUE);
1035 }
1036 
1037 GST_END_TEST;
1038 
1039 typedef struct
1040 {
1041   GstElement *agg, *src, *pipeline;
1042   GCond *cond;
1043   GMutex *lock;
1044 } RemoveElementData;
1045 
1046 static GstPadProbeReturn
pad_probe_cb(GstPad * pad,GstPadProbeInfo * info,RemoveElementData * data)1047 pad_probe_cb (GstPad * pad, GstPadProbeInfo * info, RemoveElementData * data)
1048 {
1049   GstPad *peer;
1050 
1051   GST_INFO_OBJECT (pad, "Removing pad");
1052 
1053   peer = gst_pad_get_peer (pad);
1054   gst_pad_unlink (pad, peer);
1055   gst_element_release_request_pad (data->agg, peer);
1056   fail_unless (gst_bin_remove (GST_BIN (data->pipeline), data->src));
1057   gst_object_unref (peer);
1058 
1059   g_mutex_lock (data->lock);
1060   g_cond_broadcast (data->cond);
1061   g_mutex_unlock (data->lock);
1062 
1063   return GST_PAD_PROBE_OK;
1064 }
1065 
GST_START_TEST(test_add_remove)1066 GST_START_TEST (test_add_remove)
1067 {
1068   /* Used to notify that we removed the pad from  */
1069   GCond cond;
1070   GMutex lock;
1071   GstBus *bus;
1072   GstState state;
1073   GstMessage *message;
1074   gboolean carry_on = TRUE;
1075   guint num_iterations = 100;
1076   GstPad *pad;
1077   GstElement *pipeline, *src, *src1 = NULL, *agg, *sink;
1078   gint count = 0;
1079 
1080   g_mutex_init (&lock);
1081   g_cond_init (&cond);
1082 
1083   pipeline = gst_pipeline_new ("pipeline");
1084 
1085   agg = gst_check_setup_element ("testaggregator");
1086   sink = gst_check_setup_element ("fakesink");
1087 
1088   fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
1089   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
1090   fail_unless (gst_element_link (agg, sink));
1091 
1092   bus = gst_element_get_bus (pipeline);
1093   while (count < num_iterations) {
1094 
1095     src = gst_element_factory_make ("fakesrc", NULL);
1096     g_object_set (src, "num-buffers", 100000, "sizetype", 2, "sizemax", 4,
1097         "format", GST_FORMAT_TIME, "datarate", 1000, NULL);
1098     gst_element_set_locked_state (src, TRUE);
1099     fail_unless (gst_bin_add (GST_BIN (pipeline), src));
1100     fail_unless (gst_element_link (src, agg));
1101     gst_element_set_locked_state (src, FALSE);
1102     fail_unless (gst_element_sync_state_with_parent (src));
1103 
1104     if (count == 0)
1105       gst_element_set_state (pipeline, GST_STATE_PLAYING);
1106 
1107     /* Now make sure the seek happened */
1108     carry_on = TRUE;
1109     do {
1110       message = gst_bus_timed_pop (bus, -1);
1111       switch (GST_MESSAGE_TYPE (message)) {
1112         case GST_MESSAGE_EOS:
1113         {
1114           /* we should check if we really finished here */
1115           GST_WARNING ("Got an EOS");
1116           carry_on = FALSE;
1117           break;
1118         }
1119         case GST_MESSAGE_STATE_CHANGED:
1120         {
1121           if (GST_MESSAGE_SRC (message) == GST_OBJECT (pipeline)) {
1122             gst_message_parse_state_changed (message, NULL, &state, NULL);
1123 
1124             if (state == GST_STATE_PLAYING) {
1125               RemoveElementData data;
1126 
1127               carry_on = FALSE;
1128               if (count == 0) {
1129                 GST_DEBUG ("First run, not removing any element yet");
1130 
1131                 break;
1132               }
1133 
1134               data.src = gst_object_ref (src1);
1135               data.agg = agg;
1136               data.lock = &lock;
1137               data.cond = &cond;
1138               data.pipeline = pipeline;
1139               pad = gst_element_get_static_pad (data.src, "src");
1140 
1141               g_mutex_lock (&lock);
1142               gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
1143                   (GstPadProbeCallback) pad_probe_cb, &data, NULL);
1144               GST_INFO ("Waiting for %" GST_PTR_FORMAT " %s", pad,
1145                   gst_element_state_get_name (GST_STATE (data.src)));
1146               g_cond_wait (&cond, &lock);
1147               g_mutex_unlock (&lock);
1148               gst_object_unref (pad);
1149 
1150               /*  We can not set state from the streaming thread so we
1151                *  need to make sure that the source has been removed
1152                *  before setting its state to NULL */
1153               gst_element_set_state (data.src, GST_STATE_NULL);
1154 
1155               gst_object_unref (data.src);
1156             }
1157           }
1158 
1159           break;
1160         }
1161         case GST_MESSAGE_ERROR:
1162         {
1163           GST_ERROR ("Error on the bus: %" GST_PTR_FORMAT, message);
1164           carry_on = FALSE;
1165           fail_error_message (message);
1166           break;
1167         }
1168         default:
1169           break;
1170       }
1171 
1172       gst_message_unref (message);
1173     } while (carry_on);
1174 
1175     GST_INFO ("Seeking");
1176     fail_unless (gst_element_seek_simple (pipeline, GST_FORMAT_TIME,
1177             GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, 0));
1178 
1179     count++;
1180     src1 = src;
1181   }
1182   gst_element_set_state (pipeline, GST_STATE_NULL);
1183   gst_object_unref (bus);
1184   gst_object_unref (pipeline);
1185   g_mutex_clear (&lock);
1186   g_cond_clear (&cond);
1187 }
1188 
1189 GST_END_TEST;
1190 
GST_START_TEST(test_change_state_intensive)1191 GST_START_TEST (test_change_state_intensive)
1192 {
1193   GstBus *bus;
1194   GstMessage *message;
1195   GstElement *pipeline, *src, *agg, *sink;
1196   gint i, state_i = 0, num_srcs = 3;
1197   gboolean carry_on = TRUE, ready = FALSE;
1198   GstStateChangeReturn state_return;
1199   GstState wanted_state, wanted_states[] = {
1200     GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_READY,
1201     GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_READY,
1202     GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_READY,
1203     GST_STATE_PAUSED, GST_STATE_READY, GST_STATE_PAUSED, GST_STATE_READY,
1204     GST_STATE_PAUSED, GST_STATE_READY, GST_STATE_PAUSED, GST_STATE_NULL,
1205     GST_STATE_PAUSED, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_NULL,
1206     GST_STATE_PAUSED, GST_STATE_NULL, GST_STATE_PAUSED, GST_STATE_NULL,
1207     GST_STATE_PAUSED, GST_STATE_NULL, GST_STATE_PLAYING, GST_STATE_NULL,
1208     GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PLAYING, GST_STATE_NULL,
1209     GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PLAYING, GST_STATE_NULL,
1210     GST_STATE_PLAYING, GST_STATE_NULL, GST_STATE_PLAYING, GST_STATE_NULL,
1211   };
1212 
1213   pipeline = gst_pipeline_new ("pipeline");
1214 
1215   agg = gst_check_setup_element ("testaggregator");
1216   sink = gst_check_setup_element ("fakesink");
1217 
1218   fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
1219   fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
1220   fail_unless (gst_element_link (agg, sink));
1221 
1222   for (i = 0; i < num_srcs; i++) {
1223     src = gst_element_factory_make ("fakesrc", NULL);
1224     g_object_set (src, "sizetype", 2, "sizemax", 4, NULL);
1225     fail_unless (gst_bin_add (GST_BIN (pipeline), src));
1226     fail_unless (gst_element_link (src, agg));
1227   }
1228 
1229   bus = gst_element_get_bus (pipeline);
1230   fail_if (bus == NULL);
1231 
1232   wanted_state = wanted_states[state_i++];
1233   state_return = gst_element_set_state (pipeline, wanted_state);
1234 
1235   while (state_i < G_N_ELEMENTS (wanted_states) && carry_on) {
1236     if (state_return == GST_STATE_CHANGE_SUCCESS && ready) {
1237       wanted_state = wanted_states[state_i++];
1238       fail_unless (gst_element_set_state (pipeline, wanted_state),
1239           GST_STATE_CHANGE_SUCCESS);
1240       GST_INFO ("Wanted state: %s", gst_element_state_get_name (wanted_state));
1241     }
1242 
1243     message = gst_bus_poll (bus, GST_MESSAGE_ANY, GST_SECOND / 10);
1244     if (message) {
1245       switch (GST_MESSAGE_TYPE (message)) {
1246         case GST_MESSAGE_EOS:
1247         {
1248           /* we should check if we really finished here */
1249           GST_WARNING ("Got an EOS");
1250           carry_on = FALSE;
1251           break;
1252         }
1253         case GST_MESSAGE_STATE_CHANGED:
1254         {
1255           GstState new;
1256 
1257           if (GST_MESSAGE_SRC (message) == GST_OBJECT (pipeline)) {
1258             gst_message_parse_state_changed (message, NULL, &new, NULL);
1259 
1260             if (new != wanted_state) {
1261               ready = FALSE;
1262               break;
1263             }
1264 
1265             GST_DEBUG ("State %s reached",
1266                 gst_element_state_get_name (wanted_state));
1267             wanted_state = wanted_states[state_i++];
1268             GST_DEBUG ("Wanted state: %s",
1269                 gst_element_state_get_name (wanted_state));
1270             state_return = gst_element_set_state (pipeline, wanted_state);
1271             fail_unless (state_return == GST_STATE_CHANGE_SUCCESS ||
1272                 state_return == GST_STATE_CHANGE_ASYNC);
1273             ready = TRUE;
1274           }
1275 
1276           break;
1277         }
1278         case GST_MESSAGE_ERROR:
1279           GST_ERROR ("Error on the bus: %" GST_PTR_FORMAT, message);
1280           carry_on = FALSE;
1281           break;
1282         default:
1283           break;
1284       }
1285       gst_message_unref (message);
1286     }
1287   }
1288 
1289   gst_element_set_state (pipeline, GST_STATE_NULL);
1290   gst_object_unref (bus);
1291   gst_object_unref (pipeline);
1292 }
1293 
1294 GST_END_TEST;
1295 
GST_START_TEST(test_flush_on_aggregate)1296 GST_START_TEST (test_flush_on_aggregate)
1297 {
1298   GThread *thread1, *thread2;
1299   ChainData data1 = { 0, };
1300   ChainData data2 = { 0, };
1301   TestData test = { 0, };
1302 
1303   _test_data_init (&test, FALSE);
1304   ((GstTestAggregator *) test.aggregator)->do_flush_on_aggregate = TRUE;
1305   _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
1306   _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
1307 
1308   thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
1309   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
1310 
1311   g_main_loop_run (test.ml);
1312   g_source_remove (test.timeout_id);
1313 
1314   /* these will return immediately as when the data is popped the threads are
1315    * unlocked and will terminate */
1316   g_thread_join (thread1);
1317   g_thread_join (thread2);
1318 
1319   _chain_data_clear (&data1);
1320   _chain_data_clear (&data2);
1321   _test_data_clear (&test);
1322 }
1323 
1324 GST_END_TEST;
1325 
GST_START_TEST(test_remove_pad_on_aggregate)1326 GST_START_TEST (test_remove_pad_on_aggregate)
1327 {
1328   GThread *thread1, *thread2;
1329   ChainData data1 = { 0, };
1330   ChainData data2 = { 0, };
1331   TestData test = { 0, };
1332 
1333   _test_data_init (&test, FALSE);
1334   ((GstTestAggregator *) test.aggregator)->do_remove_pad_on_aggregate = TRUE;
1335   _chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
1336   _chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
1337 
1338   thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
1339   thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
1340 
1341   g_main_loop_run (test.ml);
1342   g_source_remove (test.timeout_id);
1343 
1344   /* these will return immediately as when the data is popped the threads are
1345    * unlocked and will terminate */
1346   g_thread_join (thread1);
1347   g_thread_join (thread2);
1348 
1349   _chain_data_clear (&data1);
1350   _chain_data_clear (&data2);
1351   _test_data_clear (&test);
1352 }
1353 
1354 GST_END_TEST;
1355 
1356 static Suite *
gst_aggregator_suite(void)1357 gst_aggregator_suite (void)
1358 {
1359   Suite *suite;
1360   TCase *general;
1361 
1362   gst_test_aggregator_plugin_register ();
1363 
1364   suite = suite_create ("GstAggregator");
1365 
1366   general = tcase_create ("general");
1367   suite_add_tcase (suite, general);
1368   tcase_add_test (general, test_aggregate);
1369   tcase_add_test (general, test_aggregate_eos);
1370   tcase_add_test (general, test_aggregate_gap);
1371   tcase_add_test (general, test_aggregate_handle_events);
1372   tcase_add_test (general, test_aggregate_handle_queries);
1373   tcase_add_test (general, test_flushing_seek);
1374   tcase_add_test (general, test_infinite_seek);
1375   tcase_add_test (general, test_infinite_seek_50_src);
1376   tcase_add_test (general, test_infinite_seek_50_src_live);
1377   tcase_add_test (general, test_linear_pipeline);
1378   tcase_add_test (general, test_two_src_pipeline);
1379   tcase_add_test (general, test_timeout_pipeline);
1380   tcase_add_test (general, test_timeout_pipeline_with_wait);
1381   tcase_add_test (general, test_add_remove);
1382   tcase_add_test (general, test_change_state_intensive);
1383   tcase_add_test (general, test_flush_on_aggregate);
1384   tcase_add_test (general, test_remove_pad_on_aggregate);
1385 
1386   return suite;
1387 }
1388 
1389 GST_CHECK_MAIN (gst_aggregator);
1390