• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * Copyright (C) 2010, Thiago Santos <thiago.sousa.santos@collabora.co.uk>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <gst/check/check.h>
26 #include <gst/app/gstappsrc.h>
27 #include <gst/app/gstappsink.h>
28 
29 #ifdef HAVE_VALGRIND
30 #include <valgrind/valgrind.h>
31 #else
32 #define RUNNING_ON_VALGRIND FALSE
33 #endif
34 
35 #define SAMPLE_CAPS "application/x-gst-check-test"
36 
/* Sink pad the tests attach to the appsrc under test; it is assigned from
 * gst_check_setup_sink_pad() by the code that sets up each test. */
static GstPad *mysinkpad;

/* Template for mysinkpad: an always-present sink pad accepting ANY caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
43 
44 static GstElement *
setup_appsrc(void)45 setup_appsrc (void)
46 {
47   GstElement *appsrc;
48 
49   GST_DEBUG ("setup_appsrc");
50   appsrc = gst_check_setup_element ("appsrc");
51   mysinkpad = gst_check_setup_sink_pad (appsrc, &sinktemplate);
52 
53   gst_pad_set_active (mysinkpad, TRUE);
54 
55   return appsrc;
56 }
57 
58 static void
cleanup_appsrc(GstElement * appsrc)59 cleanup_appsrc (GstElement * appsrc)
60 {
61   GST_DEBUG ("cleanup_appsrc");
62 
63   gst_check_drop_buffers ();
64   gst_check_teardown_sink_pad (appsrc);
65   gst_check_teardown_element (appsrc);
66 }
67 
68 /*
69  * Pushes 4 buffers into appsrc and checks the caps on them on the output.
70  *
71  * Appsrc is configured with caps=SAMPLE_CAPS, so the buffers should have the
72  * same caps that they were pushed with.
73  *
74  * The 4 buffers have NULL, SAMPLE_CAPS, NULL, SAMPLE_CAPS caps,
75  * respectively.
76  */
GST_START_TEST(test_appsrc_non_null_caps)77 GST_START_TEST (test_appsrc_non_null_caps)
78 {
79   GstElement *src;
80   GstBuffer *buffer;
81   GstCaps *caps, *ccaps;
82 
83   src = setup_appsrc ();
84 
85   caps = gst_caps_from_string (SAMPLE_CAPS);
86   g_object_set (src, "caps", caps, NULL);
87 
88   ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
89 
90   buffer = gst_buffer_new_and_alloc (4);
91   fail_unless (gst_app_src_push_buffer (GST_APP_SRC (src),
92           buffer) == GST_FLOW_OK);
93 
94   buffer = gst_buffer_new_and_alloc (4);
95   fail_unless (gst_app_src_push_buffer (GST_APP_SRC (src),
96           buffer) == GST_FLOW_OK);
97 
98   buffer = gst_buffer_new_and_alloc (4);
99   fail_unless (gst_app_src_push_buffer (GST_APP_SRC (src),
100           buffer) == GST_FLOW_OK);
101 
102   buffer = gst_buffer_new_and_alloc (4);
103   fail_unless (gst_app_src_push_buffer (GST_APP_SRC (src),
104           buffer) == GST_FLOW_OK);
105 
106   fail_unless (gst_app_src_end_of_stream (GST_APP_SRC (src)) == GST_FLOW_OK);
107 
108   /* Give some time to the appsrc loop to push the buffers */
109   g_usleep (G_USEC_PER_SEC * 3);
110 
111   /* Check the output caps */
112   fail_unless (g_list_length (buffers) == 4);
113 
114   ccaps = gst_pad_get_current_caps (mysinkpad);
115   fail_unless (gst_caps_is_equal (ccaps, caps));
116   gst_caps_unref (ccaps);
117 
118   ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
119   gst_caps_unref (caps);
120   cleanup_appsrc (src);
121 }
122 
123 GST_END_TEST;
124 
/* Callback table handed to the appsink; test_appsrc_block_deadlock fills in
 * the new_sample member before installing it. */
static GstAppSinkCallbacks app_callbacks;

/* The two pipelines used by test_appsrc_block_deadlock. */
typedef struct
{
  /* pipeline that ends in the appsink named "testsink" */
  GstElement *source;
  /* pipeline that starts with the appsrc named "testsource" */
  GstElement *sink;
} ProgramData;
132 
133 static GstFlowReturn
on_new_sample_from_source(GstAppSink * elt,gpointer user_data)134 on_new_sample_from_source (GstAppSink * elt, gpointer user_data)
135 {
136   ProgramData *data = (ProgramData *) user_data;
137   GstSample *sample;
138   GstBuffer *buffer;
139   GstElement *source;
140 
141   sample = gst_app_sink_pull_sample (GST_APP_SINK (elt));
142   buffer = gst_sample_get_buffer (sample);
143   source = gst_bin_get_by_name (GST_BIN (data->sink), "testsource");
144   gst_app_src_push_buffer (GST_APP_SRC (source), gst_buffer_ref (buffer));
145   gst_sample_unref (sample);
146   g_object_unref (source);
147   return GST_FLOW_OK;
148 }
149 
150 /*
151  * appsink => appsrc pipelines executed 100 times:
152  * - appsink pipeline has sync=false
153  * - appsrc pipeline has sync=true
154  * - appsrc has block=true
155  * after 1 second an error message is posted on appsink pipeline bus
156  * when the error is received the appsrc pipeline is set to NULL
157  * and then the appsink pipeline is
158  * set to NULL too, this must not deadlock
159  */
160 
GST_START_TEST(test_appsrc_block_deadlock)161 GST_START_TEST (test_appsrc_block_deadlock)
162 {
163   GstElement *testsink;
164   ProgramData *data;
165 
166   GST_INFO ("iteration %d", __i__);
167 
168   data = g_new0 (ProgramData, 1);
169 
170   data->source =
171       gst_parse_launch ("videotestsrc ! video/x-raw,width=16,height=16 ! "
172       "appsink sync=false name=testsink", NULL);
173 
174   fail_unless (data->source != NULL);
175 
176   app_callbacks.new_sample = on_new_sample_from_source;
177   testsink = gst_bin_get_by_name (GST_BIN (data->source), "testsink");
178   gst_app_sink_set_callbacks (GST_APP_SINK_CAST (testsink), &app_callbacks,
179       data, NULL);
180 
181   gst_object_unref (testsink);
182 
183   data->sink =
184       gst_parse_launch
185       ("appsrc name=testsource block=1 max-bytes=1000 is-live=true ! "
186       "fakesink sync=true", NULL);
187 
188   fail_unless (data->sink != NULL);
189 
190   ASSERT_SET_STATE (data->sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
191   ASSERT_SET_STATE (data->source, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
192 
193   /* wait for preroll */
194   gst_element_get_state (data->source, NULL, NULL, GST_CLOCK_TIME_NONE);
195   gst_element_get_state (data->sink, NULL, NULL, GST_CLOCK_TIME_NONE);
196 
197   g_usleep (50 * (G_USEC_PER_SEC / 1000));
198 
199   ASSERT_SET_STATE (data->sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
200   ASSERT_SET_STATE (data->source, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
201 
202   gst_object_unref (data->source);
203   gst_object_unref (data->sink);
204   g_free (data);
205 }
206 
207 GST_END_TEST;
208 
/* State shared between test_appsrc_set_caps_twice and its signal handlers. */
typedef struct
{
  /* first caps set on the appsrc */
  GstCaps *caps1;
  /* second caps set on the appsrc */
  GstCaps *caps2;
  /* caps that caps_notify_cb() compares against; assigned from caps1 or
   * caps2 without taking an extra reference */
  GstCaps *expected_caps;
} Helper;
215 
216 static void
caps_notify_cb(GObject * obj,GObject * child,GParamSpec * pspec,Helper * h)217 caps_notify_cb (GObject * obj, GObject * child, GParamSpec * pspec, Helper * h)
218 {
219   GstCaps *caps = NULL;
220 
221   g_object_get (child, "caps", &caps, NULL);
222   if (caps) {
223     GST_LOG_OBJECT (child, "expected caps: %" GST_PTR_FORMAT, h->expected_caps);
224     GST_LOG_OBJECT (child, "caps set to  : %" GST_PTR_FORMAT, caps);
225     fail_unless (gst_caps_is_equal (caps, h->expected_caps));
226     gst_caps_unref (caps);
227   }
228 }
229 
230 static void
handoff_cb(GstElement * sink,GstBuffer * buf,GstPad * pad,Helper * h)231 handoff_cb (GstElement * sink, GstBuffer * buf, GstPad * pad, Helper * h)
232 {
233   /* have our buffer, now the caps should change */
234   h->expected_caps = h->caps2;
235   GST_INFO ("got buffer, expect caps %" GST_PTR_FORMAT " next", h->caps2);
236 }
237 
238 /* Make sure that if set_caps() is called twice before the source is started,
239  * the caps are just replaced and not put into the internal queue */
GST_START_TEST(test_appsrc_set_caps_twice)240 GST_START_TEST (test_appsrc_set_caps_twice)
241 {
242   GstElement *pipe, *src, *sink;
243   GstMessage *msg;
244   GstCaps *caps;
245   Helper h;
246 
247   h.caps1 = gst_caps_new_simple ("foo/bar", "bleh", G_TYPE_INT, 2, NULL);
248   h.caps2 = gst_caps_new_simple ("bar/foo", "xyz", G_TYPE_INT, 3, NULL);
249 
250   pipe = gst_pipeline_new ("pipeline");
251   src = gst_element_factory_make ("appsrc", NULL);
252   sink = gst_element_factory_make ("fakesink", NULL);
253   gst_bin_add_many (GST_BIN (pipe), src, sink, NULL);
254   gst_element_link (src, sink);
255 
256   g_signal_connect (pipe, "deep-notify::caps", G_CALLBACK (caps_notify_cb), &h);
257 
258   g_object_set (sink, "signal-handoffs", TRUE, NULL);
259   g_signal_connect (sink, "handoff", G_CALLBACK (handoff_cb), &h);
260 
261   /* case 1: set caps to caps1, then set again to caps2, all this before
262    * appsrc is started and before any buffers are in the queue yet. We don't
263    * want to see any trace of caps1 during negotiation in this case. */
264   gst_app_src_set_caps (GST_APP_SRC (src), h.caps1);
265   caps = gst_app_src_get_caps (GST_APP_SRC (src));
266   fail_unless (gst_caps_is_equal (caps, h.caps1));
267   gst_caps_unref (caps);
268 
269   gst_app_src_set_caps (GST_APP_SRC (src), h.caps2);
270   caps = gst_app_src_get_caps (GST_APP_SRC (src));
271   fail_unless (gst_caps_is_equal (caps, h.caps2));
272   gst_caps_unref (caps);
273 
274   gst_app_src_end_of_stream (GST_APP_SRC (src));
275 
276   h.expected_caps = h.caps2;
277 
278   gst_element_set_state (pipe, GST_STATE_PLAYING);
279 
280   msg =
281       gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1, GST_MESSAGE_EOS);
282   gst_message_unref (msg);
283 
284   gst_element_set_state (pipe, GST_STATE_NULL);
285   gst_object_unref (pipe);
286 
287   GST_INFO ("Case #2");
288 
289   /* case 2: set caps to caps1, then push a buffer and set to caps2, again
290    * before appsrc is started. In this case appsrc should negotiate to caps1
291    * first, and then caps2 after pushing the first buffer. */
292 
293   /* We're creating a new pipeline/appsrc here because appsrc's behaviour
294    * change slightly after setting it to NULL/READY and then re-using it */
295   pipe = gst_pipeline_new ("pipeline");
296   src = gst_element_factory_make ("appsrc", NULL);
297   sink = gst_element_factory_make ("fakesink", NULL);
298   gst_bin_add_many (GST_BIN (pipe), src, sink, NULL);
299   gst_element_link (src, sink);
300 
301   g_signal_connect (pipe, "deep-notify::caps", G_CALLBACK (caps_notify_cb), &h);
302 
303   g_object_set (sink, "signal-handoffs", TRUE, NULL);
304   g_signal_connect (sink, "handoff", G_CALLBACK (handoff_cb), &h);
305 
306   gst_app_src_set_caps (GST_APP_SRC (src), h.caps1);
307   caps = gst_app_src_get_caps (GST_APP_SRC (src));
308   fail_unless (gst_caps_is_equal (caps, h.caps1));
309   gst_caps_unref (caps);
310 
311   /* first caps1, then buffer, then later caps2 */
312   h.expected_caps = h.caps1;
313 
314   gst_element_set_state (pipe, GST_STATE_PLAYING);
315 
316   gst_app_src_push_buffer (GST_APP_SRC (src), gst_buffer_new ());
317 
318   gst_app_src_set_caps (GST_APP_SRC (src), h.caps2);
319   caps = gst_app_src_get_caps (GST_APP_SRC (src));
320   fail_unless (gst_caps_is_equal (caps, h.caps2));
321   gst_caps_unref (caps);
322 
323   gst_app_src_end_of_stream (GST_APP_SRC (src));
324 
325   msg =
326       gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1, GST_MESSAGE_EOS);
327   gst_message_unref (msg);
328 
329   gst_element_set_state (pipe, GST_STATE_NULL);
330   gst_object_unref (pipe);
331 
332   gst_caps_unref (h.caps2);
333   gst_caps_unref (h.caps1);
334 }
335 
336 GST_END_TEST;
337 
338 static gboolean
seek_cb(GstAppSrc * src,guint64 offset,gpointer data)339 seek_cb (GstAppSrc * src, guint64 offset, gpointer data)
340 {
341   /* Return fake true */
342   return TRUE;
343 }
344 
345 static void
caps_cb(GObject * obj,GObject * child,GParamSpec * pspec,GstCaps ** received_caps)346 caps_cb (GObject * obj, GObject * child, GParamSpec * pspec,
347     GstCaps ** received_caps)
348 {
349   GstCaps *caps = NULL;
350 
351   /* Collect the caps */
352   g_object_get (child, "caps", &caps, NULL);
353   if (caps) {
354     GST_LOG_OBJECT (child, "caps set to  : %" GST_PTR_FORMAT, caps);
355     gst_caps_replace (received_caps, caps);
356     gst_caps_unref (caps);
357   }
358 }
359 
GST_START_TEST(test_appsrc_caps_in_push_modes)360 GST_START_TEST (test_appsrc_caps_in_push_modes)
361 {
362   GstElement *pipe, *src, *sink;
363   GstMessage *msg;
364   GstCaps *caps, *caps1, *received_caps;
365   gint i;
366   GstMessageType msg_types;
367   GstAppSrcCallbacks cb = { 0 };
368   GstAppStreamType modes[] = { GST_APP_STREAM_TYPE_STREAM,
369     GST_APP_STREAM_TYPE_SEEKABLE,
370     GST_APP_STREAM_TYPE_RANDOM_ACCESS
371   };
372 
373   for (i = 0; i < sizeof (modes) / sizeof (modes[0]); i++) {
374     GST_INFO ("checking mode %d", modes[i]);
375     caps1 = gst_caps_new_simple ("foo/bar", "bleh", G_TYPE_INT, 2, NULL);
376     received_caps = NULL;
377 
378     pipe = gst_pipeline_new ("pipeline");
379     src = gst_element_factory_make ("appsrc", NULL);
380     sink = gst_element_factory_make ("fakesink", NULL);
381     gst_bin_add_many (GST_BIN (pipe), src, sink, NULL);
382     gst_element_link (src, sink);
383 
384     g_object_set (G_OBJECT (src), "stream-type", modes[i], NULL);
385     if (modes[i] != GST_APP_STREAM_TYPE_STREAM) {
386       cb.seek_data = seek_cb;
387       gst_app_src_set_callbacks (GST_APP_SRC (src), &cb, NULL, NULL);
388     }
389     g_signal_connect (pipe, "deep-notify::caps", G_CALLBACK (caps_cb),
390         &received_caps);
391 
392     gst_app_src_set_caps (GST_APP_SRC (src), caps1);
393     caps = gst_app_src_get_caps (GST_APP_SRC (src));
394     fail_unless (gst_caps_is_equal (caps, caps1));
395     gst_caps_unref (caps);
396 
397     gst_element_set_state (pipe, GST_STATE_PLAYING);
398 
399     if (modes[i] != GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
400       gst_app_src_end_of_stream (GST_APP_SRC (src));
401       msg_types = GST_MESSAGE_EOS;
402     } else {
403       gst_app_src_push_buffer (GST_APP_SRC (src), gst_buffer_new ());
404       msg_types = GST_MESSAGE_ASYNC_DONE;
405     }
406 
407     msg = gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1, msg_types);
408     gst_message_unref (msg);
409     /* The collected caps should match with one that was pushed */
410     fail_unless (received_caps && gst_caps_is_equal (received_caps, caps1));
411 
412     gst_element_set_state (pipe, GST_STATE_NULL);
413     gst_object_unref (pipe);
414     gst_caps_unref (caps1);
415     if (received_caps)
416       gst_caps_unref (received_caps);
417   }
418 }
419 
420 GST_END_TEST;
421 
/* This test simulates a pipeline blocked pushing caps using a blocking pad
 * probe. This state is seen if the application pushes buffers and later
 * changes the caps on one stream before the other stream has prerolled. In
 * this state, GStreamer 1.12 and earlier would deadlock inside GstBaseSrc, as
 * it was holding the live lock while calling create(). AppSrc serializes the
 * caps event into its queue and then pushes it downstream when create() is
 * called. */
429 
430 static GstPadProbeReturn
caps_event_probe_cb(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)431 caps_event_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
432 {
433   GMainLoop *loop = user_data;
434 
435   if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
436     g_main_loop_quit (loop);
437     return GST_PAD_PROBE_OK;
438   }
439 
440   return GST_PAD_PROBE_PASS;
441 }
442 
GST_START_TEST(test_appsrc_blocked_on_caps)443 GST_START_TEST (test_appsrc_blocked_on_caps)
444 {
445   GstElement *pipeline = NULL, *app = NULL;
446   GstPad *pad = NULL;
447   GstCaps *caps = NULL;
448   GError *error = NULL;
449   GMainLoop *loop;
450 
451   loop = g_main_loop_new (NULL, FALSE);
452 
453   pipeline = gst_parse_launch ("appsrc is-live=1 name=app ! fakesink", &error);
454   g_assert_no_error (error);
455 
456   app = gst_bin_get_by_name (GST_BIN (pipeline), "app");
457   pad = gst_element_get_static_pad (app, "src");
458 
459   gst_pad_add_probe (pad,
460       GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
461       caps_event_probe_cb, loop, NULL);
462   gst_object_unref (app);
463   gst_object_unref (pad);
464 
465   gst_element_set_state (pipeline, GST_STATE_PLAYING);
466 
467   caps = gst_caps_from_string ("application/x-test");
468   gst_app_src_set_caps (GST_APP_SRC (app), caps);
469   gst_caps_unref (caps);
470 
471   g_main_loop_run (loop);
472 
473 #if 0
474   /* This would work around the issue by deblocking the source on older
475    * version of GStreamer */
476   gst_element_send_event (app, gst_event_new_flush_start ());
477 #endif
478 
479   /* As appsrc change the caps GstBaseSrc::create() virtual function, the live
480    * lock use to remains held and prevented the state change from happening. */
481   gst_element_set_state (pipeline, GST_STATE_NULL);
482   gst_object_unref (pipeline);
483   g_main_loop_unref (loop);
484 }
485 
486 GST_END_TEST;
487 
/* Offset the next buffer seen by chain_____func()/chainlist_func() must
 * carry; reset by test_appsrc_push_buffer_list */
static guint expect_offset;
/* Set to TRUE by chainlist_func() when a buffer list reached the sink pad */
static gboolean chainlist_called;
/* Set to TRUE by event_func() on EOS while holding check_mutex */
static gboolean done;
491 
492 static gboolean
event_func(GstPad * pad,GstObject * parent,GstEvent * event)493 event_func (GstPad * pad, GstObject * parent, GstEvent * event)
494 {
495   GST_LOG ("event %" GST_PTR_FORMAT, event);
496   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
497     g_mutex_lock (&check_mutex);
498     done = TRUE;
499     g_cond_signal (&check_cond);
500     g_mutex_unlock (&check_mutex);
501   }
502   gst_event_unref (event);
503   return TRUE;
504 }
505 
506 static GstFlowReturn
chain_____func(GstPad * pad,GstObject * parent,GstBuffer * buf)507 chain_____func (GstPad * pad, GstObject * parent, GstBuffer * buf)
508 {
509   GST_LOG ("  buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
510 
511   fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
512   ++expect_offset;
513   gst_buffer_unref (buf);
514 
515   return GST_FLOW_OK;
516 }
517 
518 static GstFlowReturn
chainlist_func(GstPad * pad,GstObject * parent,GstBufferList * list)519 chainlist_func (GstPad * pad, GstObject * parent, GstBufferList * list)
520 {
521   guint i, len;
522 
523   len = gst_buffer_list_length (list);
524 
525   GST_DEBUG ("buffer list with %u buffers", len);
526   for (i = 0; i < len; ++i) {
527     GstBuffer *buf = gst_buffer_list_get (list, i);
528     GST_LOG ("  buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
529 
530     fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
531     ++expect_offset;
532   }
533   chainlist_called = TRUE;
534   gst_buffer_list_unref (list);
535   return GST_FLOW_OK;
536 }
537 
GST_START_TEST(test_appsrc_push_buffer_list)538 GST_START_TEST (test_appsrc_push_buffer_list)
539 {
540   GstElement *src;
541   guint i;
542 
543   src = gst_element_factory_make ("appsrc", "appsrc");
544 
545   mysinkpad = gst_check_setup_sink_pad (src, &sinktemplate);
546   gst_pad_set_chain_function (mysinkpad, chain_____func);
547   gst_pad_set_chain_list_function (mysinkpad, chainlist_func);
548   gst_pad_set_event_function (mysinkpad, event_func);
549   gst_pad_set_active (mysinkpad, TRUE);
550 
551   expect_offset = 0;
552   chainlist_called = FALSE;
553   done = FALSE;
554 
555   gst_element_set_state (src, GST_STATE_PLAYING);
556 
557 #define NUM_BUFFERS 100
558 
559   for (i = 0; i < NUM_BUFFERS; ++i) {
560     GstFlowReturn flow;
561     GstBuffer *buf;
562 
563     buf = gst_buffer_new ();
564     GST_BUFFER_OFFSET (buf) = i;
565 
566     if (i == 0 || g_random_boolean ()) {
567       GstBufferList *buflist = gst_buffer_list_new ();
568 
569       gst_buffer_list_add (buflist, buf);
570 
571       buf = gst_buffer_new ();
572       GST_BUFFER_OFFSET (buf) = ++i;
573       gst_buffer_list_add (buflist, buf);
574       if (g_random_boolean ()) {
575         flow = gst_app_src_push_buffer_list (GST_APP_SRC (src), buflist);
576       } else {
577         g_signal_emit_by_name (src, "push-buffer-list", buflist, &flow);
578         gst_buffer_list_unref (buflist);
579       }
580     } else {
581       flow = gst_app_src_push_buffer (GST_APP_SRC (src), buf);
582     }
583     fail_unless_equals_int (flow, GST_FLOW_OK);
584   }
585 
586   gst_app_src_end_of_stream (GST_APP_SRC (src));
587 
588   g_mutex_lock (&check_mutex);
589   while (!done)
590     g_cond_wait (&check_cond, &check_mutex);
591   g_mutex_unlock (&check_mutex);
592 
593   gst_element_set_state (src, GST_STATE_NULL);
594 
595   /* make sure the buffer list was pushed out as list! */
596   fail_unless (chainlist_called);
597 
598   /* can be NUM_BUFFERS or NUM_BUFFERS + 1 depending on whether last item
599    * was buffer list or not */
600   fail_unless (expect_offset >= NUM_BUFFERS);
601 
602   gst_check_teardown_sink_pad (src);
603 
604   gst_object_unref (src);
605 }
606 
607 GST_END_TEST;
608 
609 static GstPadProbeReturn
appsrc_pad_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)610 appsrc_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
611 {
612   GList **expected = (GList **) (user_data);
613   GList *next;
614   GstEvent *exp;
615   GstBuffer *exp_buf;
616 
617   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
618     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
619     GST_DEBUG ("Got event %s", GST_EVENT_TYPE_NAME (ev));
620     switch (GST_EVENT_TYPE (ev)) {
621       case GST_EVENT_SEGMENT:
622       {
623         fail_if (*expected == NULL,
624             "appsrc pushed a SEGMENT event but we didn't expect any");
625         next = (*expected)->next;
626         fail_unless (GST_IS_EVENT ((*expected)->data),
627             "appsrc pushed a SEGMENT event but we expected any others");
628         exp = GST_EVENT ((*expected)->data);
629         fail_unless (GST_EVENT_TYPE (ev) == GST_EVENT_TYPE (exp),
630             "Got event of type %s but expected event was %s",
631             GST_EVENT_TYPE_NAME (ev), GST_EVENT_TYPE_NAME (exp));
632 
633         {
634           const GstSegment *recvseg, *expectseg;
635 
636           /* Compare segment values */
637           gst_event_parse_segment (ev, &recvseg);
638           gst_event_parse_segment (exp, &expectseg);
639 
640           fail_unless_equals_int (recvseg->format, expectseg->format);
641           fail_unless_equals_uint64 (recvseg->offset, expectseg->offset);
642           fail_unless_equals_uint64 (recvseg->start, expectseg->start);
643           fail_unless_equals_uint64 (recvseg->stop, expectseg->stop);
644           fail_unless_equals_uint64 (recvseg->time, expectseg->time);
645         }
646 
647         gst_event_unref (exp);
648         g_list_free1 (*expected);
649         *expected = next;
650       }
651         break;
652       case GST_EVENT_EOS:
653         fail_if (*expected == NULL,
654             "appsrc pushed a EOS event but we didn't expect any");
655         next = (*expected)->next;
656         fail_unless (GST_IS_EVENT ((*expected)->data),
657             "appsrc pushed a EOS event but we expected any others");
658         exp = GST_EVENT ((*expected)->data);
659         fail_unless (GST_EVENT_TYPE (ev) == GST_EVENT_TYPE (exp),
660             "Got event of type %s but expected event was %s",
661             GST_EVENT_TYPE_NAME (ev), GST_EVENT_TYPE_NAME (exp));
662 
663         gst_event_unref (exp);
664         g_list_free1 (*expected);
665         *expected = next;
666         break;
667       case GST_EVENT_CAPS:
668       {
669         GstCaps *caps;
670         fail_if (*expected == NULL,
671             "appsrc pushed a CAPS event but we didn't expect any");
672         next = (*expected)->next;
673         fail_unless (GST_IS_EVENT ((*expected)->data),
674             "appsrc pushed a CAPS event but we expected any others");
675         exp = GST_EVENT ((*expected)->data);
676         fail_unless (GST_EVENT_TYPE (ev) == GST_EVENT_TYPE (exp),
677             "Got event of type %s but expected event was %s",
678             GST_EVENT_TYPE_NAME (ev), GST_EVENT_TYPE_NAME (exp));
679         gst_event_parse_caps (ev, &caps);
680         GST_DEBUG ("caps set to  : %" GST_PTR_FORMAT, caps);
681 
682         gst_event_unref (exp);
683         g_list_free1 (*expected);
684         *expected = next;
685         break;
686       }
687 
688       default:
689         break;
690     }
691   } else if (GST_IS_BUFFER (GST_PAD_PROBE_INFO_DATA (info))) {
692     GstBuffer *recvbuf = GST_PAD_PROBE_INFO_BUFFER (info);
693     GST_DEBUG ("Got buffer");
694     fail_if (*expected == NULL,
695         "appsrc pushed a buffer but we didn't expect any");
696     next = (*expected)->next;
697     fail_unless (GST_IS_BUFFER ((*expected)->data),
698         "appsrc pushed a buffer but we expected that it's not a event");
699 
700     exp_buf = GST_BUFFER ((*expected)->data);
701     fail_unless_equals_uint64 (GST_BUFFER_PTS (recvbuf),
702         GST_BUFFER_PTS (exp_buf));
703     fail_unless_equals_uint64 (GST_BUFFER_DTS (recvbuf),
704         GST_BUFFER_DTS (exp_buf));
705     fail_unless_equals_uint64 (GST_BUFFER_DURATION (recvbuf),
706         GST_BUFFER_DURATION (exp_buf));
707 
708     g_list_free1 (*expected);
709     *expected = next;
710   }
711 
712   return GST_PAD_PROBE_OK;
713 }
714 
/* State shared between test_appsrc_period_with_custom_segment and
 * custom_segment_handoff_cb(). */
typedef struct
{
  /* protects last_buf_count */
  GMutex lock;
  /* signalled whenever last_buf_count is incremented */
  GCond cond;

  /* PTS that identifies the last buffer of a period */
  GstClockTime expected_last_pts;
  /* number of buffers seen so far whose PTS equals expected_last_pts */
  guint last_buf_count;
} SegmentTestData;
723 
724 static void
custom_segment_handoff_cb(GstElement * sink,GstBuffer * buf,GstPad * pad,gpointer * user_data)725 custom_segment_handoff_cb (GstElement * sink, GstBuffer * buf, GstPad * pad,
726     gpointer * user_data)
727 {
728   SegmentTestData *data = (SegmentTestData *) user_data;
729 
730   if (GST_BUFFER_PTS (buf) == data->expected_last_pts) {
731     g_mutex_lock (&data->lock);
732     data->last_buf_count++;
733     g_cond_signal (&data->cond);
734     g_mutex_unlock (&data->lock);
735   }
736 }
737 
738 /* Assuming application driven streaming with multiple period.
739  * application provides custom segment per each period */
GST_START_TEST(test_appsrc_period_with_custom_segment)740 GST_START_TEST (test_appsrc_period_with_custom_segment)
741 {
742   GstElement *pipe, *src, *sink;
743   GstMessage *msg;
744   gint i, j, period;
745   GstAppSrcCallbacks cb = { 0 };
746   GstAppStreamType modes[] = { GST_APP_STREAM_TYPE_STREAM,
747     GST_APP_STREAM_TYPE_SEEKABLE
748   };
749   GstSegment segment;
750   GstSample *sample;
751   GstBuffer *buffer;
752   GstClockTime period_duration = 5 * GST_SECOND;
753   GstEvent *event;
754   gulong probe_id;
755   GstPad *pad;
756   GList *expected = NULL;
757   SegmentTestData test_data;
758 
759   g_mutex_init (&test_data.lock);
760   g_cond_init (&test_data.cond);
761   test_data.last_buf_count = 0;
762   test_data.expected_last_pts = 5 * GST_SECOND;
763 
764   for (i = 0; i < G_N_ELEMENTS (modes); i++) {
765     /* mode 0: stream-type == GST_APP_STREAM_TYPE_STREAM
766      * mode 1: stream-type == GST_APP_STREAM_TYPE_SEEKABLE */
767 
768     GST_INFO ("checking mode %d", modes[i]);
769 
770     pipe = gst_pipeline_new ("pipeline");
771     src = gst_element_factory_make ("appsrc", NULL);
772     sink = gst_element_factory_make ("fakesink", NULL);
773     gst_bin_add_many (GST_BIN (pipe), src, sink, NULL);
774     fail_unless (gst_element_link (src, sink) == TRUE);
775     pad = gst_element_get_static_pad (sink, "sink");
776 
777     probe_id = gst_pad_add_probe (pad,
778         GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
779         (GstPadProbeCallback) appsrc_pad_probe, &expected, NULL);
780 
781     g_object_set (G_OBJECT (src), "stream-type", modes[i], "format",
782         GST_FORMAT_TIME, "handle-segment-change", TRUE, NULL);
783 
784     if (modes[i] != GST_APP_STREAM_TYPE_STREAM) {
785       cb.seek_data = seek_cb;
786       gst_app_src_set_callbacks (GST_APP_SRC (src), &cb, NULL, NULL);
787 
788       test_data.last_buf_count = 0;
789 
790       g_object_set (sink, "signal-handoffs", TRUE, NULL);
791       g_signal_connect (sink, "handoff",
792           G_CALLBACK (custom_segment_handoff_cb), &test_data);
793     }
794 
795     ASSERT_SET_STATE (pipe, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
796 
797     /* 2 periods exits */
798     for (period = 0; period < 2; period++) {
799       /* Total presentation timeline is form 0 sec to 10 sec
800        * - Each period's first PTS is 1 sec and last PTS is 5 sec
801        * - First period has presentation timeline with 0 ~ 5
802        * - Last period has presentation timeline with 5 ~ 10
803        */
804 
805       /* PREPARE SEGMENT */
806       gst_segment_init (&segment, GST_FORMAT_TIME);
807       segment.start = segment.position = GST_SECOND;
808       segment.time = period * period_duration;
809       segment.base = period * period_duration;
810 
811       /* PREPARE BUFFER */
812       buffer = gst_buffer_new_and_alloc (4);
813       GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = GST_SECOND;
814       GST_BUFFER_DURATION (buffer) = GST_SECOND;
815 
816       /* PREPARE SAMPLE */
817       sample = gst_sample_new (buffer, NULL, &segment, NULL);
818 
819       expected = g_list_append (expected, gst_event_new_segment (&segment));
820       expected = g_list_append (expected, buffer);
821 
822       /* 1st sample includes buffer and segment */
823       fail_unless (gst_app_src_push_sample (GST_APP_SRC (src), sample)
824           == GST_FLOW_OK);
825 
826       /* CLEAN UP */
827       gst_buffer_unref (buffer);
828       gst_sample_unref (sample);
829 
830       /* Push the left buffers in the current period */
831       for (j = 2; j <= 5; j++) {
832         buffer = gst_buffer_new_and_alloc (4);
833         GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = j * GST_SECOND;
834         GST_BUFFER_DURATION (buffer) = GST_SECOND;
835         expected = g_list_append (expected, buffer);
836         fail_unless (gst_app_src_push_buffer (GST_APP_SRC (src), buffer)
837             == GST_FLOW_OK);
838       }
839     }
840 
841     if (modes[i] != GST_APP_STREAM_TYPE_STREAM) {
842       /* Client request seek to 7 sec position (which belongs to 2nd period)
843        * Application must provides corresponding buffer (of 2nd period) with
844        * new custom segment */
845 
846       GstClockTime requested_pos = 7 * GST_SECOND;
847 
848       /* Wait all buffers of two periods to be consumed */
849       g_mutex_lock (&test_data.lock);
850       while (test_data.last_buf_count != 2)
851         g_cond_wait (&test_data.cond, &test_data.lock);
852       g_mutex_unlock (&test_data.lock);
853 
854       GST_DEBUG ("Seek to %" GST_TIME_FORMAT, GST_TIME_ARGS (requested_pos));
855       event = gst_event_new_seek (1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
856           GST_SEEK_TYPE_SET, requested_pos, GST_SEEK_TYPE_NONE, -1);
857       fail_unless (gst_element_send_event (pipe, event) == TRUE);
858 
859       /* PREPARE SEGMENT */
860       gst_segment_init (&segment, GST_FORMAT_TIME);
861       segment.start = segment.position = 3 * GST_SECOND;
862       segment.time = requested_pos;
863 
864       /* PREPARE BUFFER */
865       buffer = gst_buffer_new_and_alloc (4);
866       GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = 3 * GST_SECOND;
867 
868       /* PREPARE SAMPLE */
869       sample = gst_sample_new (buffer, NULL, &segment, NULL);
870 
871       expected = g_list_append (expected, gst_event_new_segment (&segment));
872       expected = g_list_append (expected, buffer);
873 
874       /* 1st sample includes buffer and segment */
875       fail_unless (gst_app_src_push_sample (GST_APP_SRC (src), sample)
876           == GST_FLOW_OK);
877 
878       /* CLEAN UP */
879       gst_buffer_unref (buffer);
880       gst_sample_unref (sample);
881 
882       /* Push the left buffers in the current period */
883       for (j = 4; j <= 5; j++) {
884         buffer = gst_buffer_new_and_alloc (4);
885         GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = j * GST_SECOND;
886         GST_BUFFER_DURATION (buffer) = GST_SECOND;
887         expected = g_list_append (expected, buffer);
888         fail_unless (gst_app_src_push_buffer (GST_APP_SRC (src), buffer)
889             == GST_FLOW_OK);
890       }
891     }
892 
893     expected = g_list_append (expected, gst_event_new_eos ());
894     fail_unless (gst_app_src_end_of_stream (GST_APP_SRC (src)) == GST_FLOW_OK);
895 
896     msg =
897         gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1,
898         GST_MESSAGE_EOS | GST_MESSAGE_ERROR);
899     fail_unless (msg);
900     fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
901     gst_message_unref (msg);
902 
903     gst_pad_remove_probe (pad, probe_id);
904     gst_object_unref (pad);
905 
906     ASSERT_SET_STATE (pipe, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
907     gst_object_unref (pipe);
908     fail_if (expected != NULL);
909   }
910 
911   g_mutex_clear (&test_data.lock);
912   g_cond_clear (&test_data.cond);
913 }
914 
915 GST_END_TEST;
916 
GST_START_TEST(test_appsrc_custom_segment_twice)917 GST_START_TEST (test_appsrc_custom_segment_twice)
918 {
919   GstElement *pipe, *src, *sink;
920   GstMessage *msg;
921   gint i, tc;
922   GstAppSrcCallbacks cb = { 0 };
923   GstAppStreamType modes[] = { GST_APP_STREAM_TYPE_STREAM,
924     GST_APP_STREAM_TYPE_SEEKABLE
925   };
926   GstSample *sample;
927   gulong probe_id;
928   GstPad *pad;
929   GList *expected = NULL;
930   GstSegment segment;
931   GstBuffer *buffer;
932 
933   for (tc = 0; tc < 4; tc++) {
934     /* Case 0: Push segment1 without buffer,
935      * then push segment1 with buffer again.
936      * Expected behaviour is that pushing segment only once to downstream */
937 
938     /* Case 1: Push segment1 with buffer,
939      * then push segment1 with buffer again.
940      * Expected behaviour is that pushing segment only once to downstream */
941 
942     /* Case 2: Push segment1 without buffer,
943      * then push segment2 with buffer.
944      * Expected behaviour is that pushing only segment2 with buffer
945      * to downstream */
946 
947     /* Case 3: Push segment1 with buffer,
948      * then push segment2 with buffer.
949      * Expected behaviour is that pushing segment1 with buffer,
950      * and then segment2 with buffer */
951 
952     GST_INFO ("Test Case #%d", tc);
953 
954     for (i = 0; i < G_N_ELEMENTS (modes); i++) {
955       GST_INFO ("checking mode %d", modes[i]);
956 
957       pipe = gst_pipeline_new ("pipeline");
958       src = gst_element_factory_make ("appsrc", NULL);
959       sink = gst_element_factory_make ("fakesink", NULL);
960       gst_bin_add_many (GST_BIN (pipe), src, sink, NULL);
961       fail_unless (gst_element_link (src, sink));
962 
963       pad = gst_element_get_static_pad (sink, "sink");
964 
965       probe_id = gst_pad_add_probe (pad,
966           GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
967           (GstPadProbeCallback) appsrc_pad_probe, &expected, NULL);
968 
969       g_object_set (G_OBJECT (src), "stream-type", modes[i], "format",
970           GST_FORMAT_TIME, "handle-segment-change", TRUE, NULL);
971 
972       if (modes[i] != GST_APP_STREAM_TYPE_STREAM) {
973         cb.seek_data = seek_cb;
974         gst_app_src_set_callbacks (GST_APP_SRC (src), &cb, NULL, NULL);
975       }
976 
977       ASSERT_SET_STATE (pipe, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
978 
979       GST_DEBUG ("Prepare/Push the first sample");
980       /* PREPARE SEGMENT */
981       gst_segment_init (&segment, GST_FORMAT_TIME);
982       segment.start = segment.position = segment.time = GST_SECOND;
983 
984       /* PREPARE BUFFER */
985       buffer = gst_buffer_new_and_alloc (4);
986       GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = GST_SECOND;
987       GST_BUFFER_DURATION (buffer) = GST_SECOND;
988 
989       /* PREPARE FIRST SAMPLE */
990       if (tc == 0) {
991         /* Test Case 0: Push a sample without buffer */
992         sample = gst_sample_new (NULL, NULL, &segment, NULL);
993         expected = g_list_append (expected, gst_event_new_segment (&segment));
994       } else if (tc == 2) {
995         /* Test Case 2: Push a sample without buffer.
996          * We don't expect this segment will be used,
997          * because the updated next sample will be actually used */
998         sample = gst_sample_new (NULL, NULL, &segment, NULL);
999       } else {
1000         sample = gst_sample_new (buffer, NULL, &segment, NULL);
1001         expected = g_list_append (expected, gst_event_new_segment (&segment));
1002         expected = g_list_append (expected, buffer);
1003       }
1004       /* PUSH THE FIRST SAMPLE */
1005       fail_unless (gst_app_src_push_sample (GST_APP_SRC (src), sample)
1006           == GST_FLOW_OK);
1007 
1008       /* CLEAN UP */
1009       gst_buffer_unref (buffer);
1010       gst_sample_unref (sample);
1011 
1012       GST_DEBUG ("Prepare/Push the last sample");
1013       /* PREPARE SEGMENT */
1014       gst_segment_init (&segment, GST_FORMAT_TIME);
1015       segment.start = segment.position = segment.time =
1016           (tc == 0 || tc == 1) ? 1 * GST_SECOND : 2 * GST_SECOND;
1017 
1018       /* PREPARE BUFFER */
1019       buffer = gst_buffer_new_and_alloc (4);
1020       GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
1021       GST_BUFFER_DURATION (buffer) = GST_SECOND;
1022 
1023       /* PREPARE THE LAST SAMPLE */
1024       if (tc == 0 || tc == 1) {
1025         /* Test Case 0 or 1: Push a sample with duplicated segment */
1026         sample = gst_sample_new (buffer, NULL, &segment, NULL);
1027         expected = g_list_append (expected, buffer);
1028       } else {
1029         sample = gst_sample_new (buffer, NULL, &segment, NULL);
1030         expected = g_list_append (expected, gst_event_new_segment (&segment));
1031         expected = g_list_append (expected, buffer);
1032       }
1033 
1034       fail_unless (gst_app_src_push_sample (GST_APP_SRC (src), sample)
1035           == GST_FLOW_OK);
1036 
1037       /* CLEAN UP */
1038       gst_buffer_unref (buffer);
1039       gst_sample_unref (sample);
1040 
1041       expected = g_list_append (expected, gst_event_new_eos ());
1042       fail_unless (gst_app_src_end_of_stream (GST_APP_SRC (src)) ==
1043           GST_FLOW_OK);
1044 
1045       msg =
1046           gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1,
1047           GST_MESSAGE_EOS | GST_MESSAGE_ERROR);
1048       fail_unless (msg);
1049       fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
1050       gst_message_unref (msg);
1051 
1052       gst_pad_remove_probe (pad, probe_id);
1053       gst_object_unref (pad);
1054 
1055       ASSERT_SET_STATE (pipe, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
1056       gst_object_unref (pipe);
1057       fail_if (expected != NULL);
1058     }
1059   }
1060 }
1061 
1062 GST_END_TEST;
1063 
1064 static GstPadProbeReturn
block_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1065 block_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1066 {
1067   return GST_PAD_PROBE_OK;
1068 }
1069 
GST_START_TEST(test_appsrc_limits)1070 GST_START_TEST (test_appsrc_limits)
1071 {
1072   GstHarness *h;
1073   GstPad *srcpad;
1074   GstBuffer *buffer;
1075   gulong probe_id;
1076   guint64 current_level;
1077 
1078   /* Test if the bytes limit works correctly with both leaky types */
1079   h = gst_harness_new ("appsrc");
1080   g_object_set (h->element,
1081       "format", GST_FORMAT_TIME,
1082       "max-bytes", G_GUINT64_CONSTANT (200),
1083       "max-time", G_GUINT64_CONSTANT (0),
1084       "max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ ,
1085       NULL);
1086   gst_harness_play (h);
1087   srcpad = gst_element_get_static_pad (h->element, "src");
1088 
1089   /* Pad probe to ensure that the source pad task is blocked and we can
1090    * deterministically test the behaviour of the appsrc queue */
1091   probe_id =
1092       gst_pad_add_probe (srcpad,
1093       GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
1094       GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
1095 
1096   buffer = gst_buffer_new_and_alloc (100);
1097   GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
1098   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1099 
1100   /* wait until the appsrc is blocked downstream */
1101   while (!gst_pad_is_blocking (srcpad))
1102     g_thread_yield ();
1103 
1104   buffer = gst_buffer_new_and_alloc (100);
1105   GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
1106   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1107   buffer = gst_buffer_new_and_alloc (100);
1108   GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
1109   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1110 
1111   /* The first buffer is not queued anymore but inside the pad probe */
1112   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1113   fail_unless_equals_uint64 (current_level, 200);
1114   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1115   fail_unless_equals_uint64 (current_level, 2);
1116   g_object_get (h->element, "current-level-time", &current_level, NULL);
1117   fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
1118 
1119   buffer = gst_buffer_new_and_alloc (100);
1120   GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
1121   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1122 
1123   /* The new buffer was dropped now, otherwise we would have 2 seconds queued */
1124   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1125   fail_unless_equals_uint64 (current_level, 200);
1126   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1127   fail_unless_equals_uint64 (current_level, 2);
1128   g_object_get (h->element, "current-level-time", &current_level, NULL);
1129   fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
1130 
1131   g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
1132 
1133   buffer = gst_buffer_new_and_alloc (100);
1134   GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
1135   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1136 
1137   /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
1138   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1139   fail_unless_equals_uint64 (current_level, 200);
1140   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1141   fail_unless_equals_uint64 (current_level, 2);
1142   /* 3s because the last dequeued buffer had an end timestamp of 0s, the
1143    * buffer with timestamp 1s was dropped and the newly queued buffer has a
1144    * start timestamp of 4s */
1145   g_object_get (h->element, "current-level-time", &current_level, NULL);
1146   fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
1147 
1148   /* Remove probe and check if we get all buffers we're supposed to get */
1149   gst_pad_remove_probe (srcpad, probe_id);
1150 
1151   buffer = gst_harness_pull (h);
1152   fail_unless (buffer);
1153   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
1154   gst_buffer_unref (buffer);
1155 
1156   buffer = gst_harness_pull (h);
1157   fail_unless (buffer);
1158   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
1159   /* DISCONT because the buffer with 1s was dropped */
1160   fail_unless (GST_BUFFER_IS_DISCONT (buffer));
1161   gst_buffer_unref (buffer);
1162 
1163   buffer = gst_harness_pull (h);
1164   fail_unless (buffer);
1165   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
1166   /* DISCONT because the first buffer with 4s was dropped */
1167   fail_unless (GST_BUFFER_IS_DISCONT (buffer));
1168   gst_buffer_unref (buffer);
1169 
1170   gst_object_unref (srcpad);
1171   gst_harness_teardown (h);
1172 
1173   /* Test if the buffers limit works correctly with both leaky types */
1174   h = gst_harness_new ("appsrc");
1175   g_object_set (h->element,
1176       "format", GST_FORMAT_TIME,
1177       "max-bytes", G_GUINT64_CONSTANT (0),
1178       "max-time", G_GUINT64_CONSTANT (0),
1179       "max-buffers", G_GUINT64_CONSTANT (2), "leaky-type", 1 /* upstream */ ,
1180       NULL);
1181   gst_harness_play (h);
1182   srcpad = gst_element_get_static_pad (h->element, "src");
1183 
1184   /* Pad probe to ensure that the source pad task is blocked and we can
1185    * deterministically test the behaviour of the appsrc queue */
1186   probe_id =
1187       gst_pad_add_probe (srcpad,
1188       GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
1189       GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
1190 
1191   buffer = gst_buffer_new_and_alloc (100);
1192   GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
1193   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1194 
1195   /* wait until the appsrc is blocked downstream */
1196   while (!gst_pad_is_blocking (srcpad))
1197     g_thread_yield ();
1198 
1199   buffer = gst_buffer_new_and_alloc (100);
1200   GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
1201   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1202   buffer = gst_buffer_new_and_alloc (100);
1203   GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
1204   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1205 
1206   /* The first buffer is not queued anymore but inside the pad probe */
1207   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1208   fail_unless_equals_uint64 (current_level, 200);
1209   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1210   fail_unless_equals_uint64 (current_level, 2);
1211   g_object_get (h->element, "current-level-time", &current_level, NULL);
1212   fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
1213 
1214   buffer = gst_buffer_new_and_alloc (100);
1215   GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
1216   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1217 
1218   /* The new buffer was dropped now, otherwise we would have 2 seconds queued */
1219   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1220   fail_unless_equals_uint64 (current_level, 200);
1221   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1222   fail_unless_equals_uint64 (current_level, 2);
1223   g_object_get (h->element, "current-level-time", &current_level, NULL);
1224   fail_unless_equals_uint64 (current_level, 1 * GST_SECOND);
1225 
1226   g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
1227 
1228   buffer = gst_buffer_new_and_alloc (100);
1229   GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
1230   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1231 
1232   /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
1233   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1234   fail_unless_equals_uint64 (current_level, 200);
1235   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1236   fail_unless_equals_uint64 (current_level, 2);
1237   /* 3s because the last dequeued buffer had an end timestamp of 0s, the
1238    * buffer with timestamp 1s was dropped and the newly queued buffer has a
1239    * start timestamp of 4s */
1240   g_object_get (h->element, "current-level-time", &current_level, NULL);
1241   fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
1242 
1243   /* Remove probe and check if we get all buffers we're supposed to get */
1244   gst_pad_remove_probe (srcpad, probe_id);
1245 
1246   buffer = gst_harness_pull (h);
1247   fail_unless (buffer);
1248   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
1249   gst_buffer_unref (buffer);
1250 
1251   buffer = gst_harness_pull (h);
1252   fail_unless (buffer);
1253   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
1254   /* DISCONT because the buffer with 1s was dropped */
1255   fail_unless (GST_BUFFER_IS_DISCONT (buffer));
1256   gst_buffer_unref (buffer);
1257 
1258   buffer = gst_harness_pull (h);
1259   fail_unless (buffer);
1260   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
1261   /* DISCONT because the first buffer with 4s was dropped */
1262   fail_unless (GST_BUFFER_IS_DISCONT (buffer));
1263   gst_buffer_unref (buffer);
1264 
1265   gst_object_unref (srcpad);
1266   gst_harness_teardown (h);
1267 
1268   /* Test if the time limit works correctly with both leaky types */
1269   h = gst_harness_new ("appsrc");
1270   g_object_set (h->element,
1271       "format", GST_FORMAT_TIME,
1272       "max-bytes", G_GUINT64_CONSTANT (0),
1273       "max-time", 2 * GST_SECOND,
1274       "max-buffers", G_GUINT64_CONSTANT (0), "leaky-type", 1 /* upstream */ ,
1275       NULL);
1276   gst_harness_play (h);
1277   srcpad = gst_element_get_static_pad (h->element, "src");
1278 
1279   /* Pad probe to ensure that the source pad task is blocked and we can
1280    * deterministically test the behaviour of the appsrc queue */
1281   probe_id =
1282       gst_pad_add_probe (srcpad,
1283       GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST |
1284       GST_PAD_PROBE_TYPE_BLOCKING, block_probe, NULL, NULL);
1285 
1286   buffer = gst_buffer_new_and_alloc (100);
1287   GST_BUFFER_PTS (buffer) = 0 * GST_SECOND;
1288   GST_BUFFER_DURATION (buffer) = GST_SECOND;
1289   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1290 
1291   /* wait until the appsrc is blocked downstream */
1292   while (!gst_pad_is_blocking (srcpad))
1293     g_thread_yield ();
1294 
1295   buffer = gst_buffer_new_and_alloc (100);
1296   GST_BUFFER_PTS (buffer) = 1 * GST_SECOND;
1297   GST_BUFFER_DURATION (buffer) = GST_SECOND;
1298   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1299   buffer = gst_buffer_new_and_alloc (100);
1300   GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
1301   GST_BUFFER_DURATION (buffer) = GST_SECOND;
1302   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1303 
1304   /* The first buffer is not queued anymore but inside the pad probe */
1305   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1306   fail_unless_equals_uint64 (current_level, 200);
1307   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1308   fail_unless_equals_uint64 (current_level, 2);
1309   g_object_get (h->element, "current-level-time", &current_level, NULL);
1310   fail_unless_equals_uint64 (current_level, 2 * GST_SECOND);
1311 
1312   buffer = gst_buffer_new_and_alloc (100);
1313   GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
1314   GST_BUFFER_DURATION (buffer) = GST_SECOND;
1315   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1316 
1317   /* The new buffer was dropped now, otherwise we would have more than 2 seconds queued */
1318   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1319   fail_unless_equals_uint64 (current_level, 200);
1320   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1321   fail_unless_equals_uint64 (current_level, 2);
1322   g_object_get (h->element, "current-level-time", &current_level, NULL);
1323   fail_unless_equals_uint64 (current_level, 2 * GST_SECOND);
1324 
1325   g_object_set (h->element, "leaky-type", 2 /* downstream */ , NULL);
1326 
1327   buffer = gst_buffer_new_and_alloc (100);
1328   GST_BUFFER_PTS (buffer) = 4 * GST_SECOND;
1329   GST_BUFFER_DURATION (buffer) = GST_SECOND;
1330   gst_app_src_push_buffer (GST_APP_SRC (h->element), buffer);
1331 
1332   /* The oldest buffer was dropped now, otherwise we would have only 1 second queued */
1333   g_object_get (h->element, "current-level-bytes", &current_level, NULL);
1334   fail_unless_equals_uint64 (current_level, 200);
1335   g_object_get (h->element, "current-level-buffers", &current_level, NULL);
1336   fail_unless_equals_uint64 (current_level, 2);
1337   /* 3s because the last dequeued buffer had an end timestamp of 0s, the
1338    * buffer with timestamp 1s was dropped and the newly queued buffer has a
1339    * start timestamp of 4s */
1340   g_object_get (h->element, "current-level-time", &current_level, NULL);
1341   fail_unless_equals_uint64 (current_level, 3 * GST_SECOND);
1342 
1343   /* Remove probe and check if we get all buffers we're supposed to get */
1344   gst_pad_remove_probe (srcpad, probe_id);
1345 
1346   buffer = gst_harness_pull (h);
1347   fail_unless (buffer);
1348   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 0 * GST_SECOND);
1349   gst_buffer_unref (buffer);
1350 
1351   buffer = gst_harness_pull (h);
1352   fail_unless (buffer);
1353   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 2 * GST_SECOND);
1354   /* DISCONT because the buffer with 1s was dropped */
1355   fail_unless (GST_BUFFER_IS_DISCONT (buffer));
1356   gst_buffer_unref (buffer);
1357 
1358   buffer = gst_harness_pull (h);
1359   fail_unless (buffer);
1360   fail_unless_equals_uint64 (GST_BUFFER_PTS (buffer), 4 * GST_SECOND);
1361   /* DISCONT because the first buffer with 4s was dropped */
1362   fail_unless (GST_BUFFER_IS_DISCONT (buffer));
1363   gst_buffer_unref (buffer);
1364 
1365   gst_object_unref (srcpad);
1366   gst_harness_teardown (h);
1367 }
1368 
1369 GST_END_TEST;
1370 
1371 static GstFlowReturn
send_event_chain_func(GstPad * pad,GstObject * parent,GstBuffer * buf)1372 send_event_chain_func (GstPad * pad, GstObject * parent, GstBuffer * buf)
1373 {
1374   GST_LOG ("  buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
1375 
1376   fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
1377   ++expect_offset;
1378   gst_buffer_unref (buf);
1379 
1380   if (expect_offset == 2) {
1381     /* test is done */
1382     g_mutex_lock (&check_mutex);
1383     done = TRUE;
1384     g_cond_signal (&check_cond);
1385     g_mutex_unlock (&check_mutex);
1386   }
1387 
1388   return GST_FLOW_OK;
1389 }
1390 
1391 static gboolean
send_event_event_func(GstPad * pad,GstObject * parent,GstEvent * event)1392 send_event_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
1393 {
1394   GST_LOG ("event %" GST_PTR_FORMAT, event);
1395   if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM) {
1396     /* this event should arrive after the first buffer */
1397     fail_unless_equals_int (expect_offset, 1);
1398   }
1399   gst_event_unref (event);
1400   return TRUE;
1401 }
1402 
1403 /* check that custom downstream events are properly serialized with buffers */
GST_START_TEST(test_appsrc_send_custom_event)1404 GST_START_TEST (test_appsrc_send_custom_event)
1405 {
1406   GstElement *src;
1407   GstBuffer *buf;
1408 
1409   src = setup_appsrc ();
1410 
1411   ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
1412 
1413   expect_offset = 0;
1414   gst_pad_set_chain_function (mysinkpad, send_event_chain_func);
1415   gst_pad_set_event_function (mysinkpad, send_event_event_func);
1416 
1417   /* send a buffer, a custom event and a second buffer */
1418   buf = gst_buffer_new_and_alloc (1);
1419   GST_BUFFER_OFFSET (buf) = 0;
1420   fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
1421           buf) == GST_FLOW_OK);
1422 
1423   gst_element_send_event (src,
1424       gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1425           gst_structure_new ("custom", NULL, NULL)));
1426 
1427   buf = gst_buffer_new_and_alloc (2);
1428   GST_BUFFER_OFFSET (buf) = 1;
1429   fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
1430           buf) == GST_FLOW_OK);
1431 
1432   g_mutex_lock (&check_mutex);
1433   while (!done)
1434     g_cond_wait (&check_cond, &check_mutex);
1435   g_mutex_unlock (&check_mutex);
1436 
1437   ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
1438   cleanup_appsrc (src);
1439 }
1440 
1441 GST_END_TEST;
1442 
1443 static Suite *
appsrc_suite(void)1444 appsrc_suite (void)
1445 {
1446   Suite *s = suite_create ("appsrc");
1447   TCase *tc_chain = tcase_create ("general");
1448 
1449   tcase_add_test (tc_chain, test_appsrc_non_null_caps);
1450   tcase_add_test (tc_chain, test_appsrc_set_caps_twice);
1451   tcase_add_test (tc_chain, test_appsrc_caps_in_push_modes);
1452   tcase_add_test (tc_chain, test_appsrc_blocked_on_caps);
1453   tcase_add_test (tc_chain, test_appsrc_push_buffer_list);
1454   tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment);
1455   tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
1456   tcase_add_test (tc_chain, test_appsrc_limits);
1457   tcase_add_test (tc_chain, test_appsrc_send_custom_event);
1458 
1459   if (RUNNING_ON_VALGRIND)
1460     tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);
1461   else
1462     tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 100);
1463 
1464   suite_add_tcase (s, tc_chain);
1465 
1466   return s;
1467 }
1468 
/* Test program entry point. NOTE(review): GST_CHECK_MAIN (appsrc) is expected
 * to expand to a main() that runs the suite returned by appsrc_suite() --
 * confirm against the macro definition in gst/check. */
GST_CHECK_MAIN (appsrc);
1470