• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @title: GstAppSrc
23  * @short_description: Easy way for applications to inject buffers into a
24  *     pipeline
25  * @see_also: #GstBaseSrc, appsink
26  *
27  * The appsrc element can be used by applications to insert data into a
28  * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
29  * external API functions.
30  *
31  * appsrc can be used by linking with the libgstapp library to access the
32  * methods directly or by using the appsrc action signals.
33  *
34  * Before operating appsrc, the caps property must be set to fixed caps
35  * describing the format of the data that will be pushed with appsrc. An
36  * exception to this is when pushing buffers with unknown caps, in which case no
37  * caps should be set. This is typically true of file-like sources that push raw
38  * byte buffers. If you don't want to explicitly set the caps, you can use
39  * gst_app_src_push_sample. This method gets the caps associated with the
40  * sample and sets them on the appsrc replacing any previously set caps (if
41  * different from sample's caps).
42  *
43  * The main way of handing data to the appsrc element is by calling the
44  * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
45  * This will put the buffer onto a queue from which appsrc will read from in its
46  * streaming thread. It is important to note that data transport will not happen
47  * from the thread that performed the push-buffer call.
48  *
49  * The "max-bytes", "max-buffers" and "max-time" properties control how much
50  * data can be queued in appsrc before appsrc considers the queue full. A
51  * filled internal queue will always signal the "enough-data" signal, which
52  * signals the application that it should stop pushing data into appsrc. The
53  * "block" property will cause appsrc to block the push-buffer method until
54  * free data becomes available again.
55  *
56  * When the internal queue is running out of data, the "need-data" signal is
57  * emitted, which signals the application that it should start pushing more data
58  * into appsrc.
59  *
60  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
61  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
62  * "random-access". The signal argument will contain the new desired position in
63  * the stream expressed in the unit set with the "format" property. After
64  * receiving the seek-data signal, the application should push-buffers from the
65  * new position.
66  *
67  * These signals allow the application to operate the appsrc in two different
68  * ways:
69  *
70  * The push mode, in which the application repeatedly calls the push-buffer/push-sample
71  * method with a new buffer/sample. Optionally, the queue size in the appsrc
72  * can be controlled with the enough-data and need-data signals by respectively
73  * stopping/starting the push-buffer/push-sample calls. This is a typical
74  * mode of operation for the stream-type "stream" and "seekable". Use this
75  * mode when implementing various network protocols or hardware devices.
76  *
77  * The pull mode, in which the need-data signal triggers the next push-buffer call.
78  * This mode is typically used in the "random-access" stream-type. Use this
79  * mode for file access or other randomly accessible sources. In this mode, a
80  * buffer of exactly the amount of bytes given by the need-data signal should be
81  * pushed into appsrc.
82  *
83  * In all modes, the size property on appsrc should contain the total stream
84  * size in bytes. Setting this property is mandatory in the random-access mode.
85  * For the stream and seekable modes, setting this property is optional but
86  * recommended.
87  *
88  * When the application has finished pushing data into appsrc, it should call
89  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
90  * this call, no more buffers can be pushed into appsrc until a flushing seek
91  * occurs or the state of the appsrc has gone through READY.
92  */
93 
94 #ifdef HAVE_CONFIG_H
95 #include "config.h"
96 #endif
97 
98 #include <gst/gst.h>
99 #include <gst/base/base.h>
100 
101 #include <string.h>
102 
103 #include "gstappsrc.h"
104 
105 typedef enum
106 {
107   NOONE_WAITING = 0,
108   STREAM_WAITING = 1 << 0,      /* streaming thread is waiting for application thread */
109   APP_WAITING = 1 << 1,         /* application thread is waiting for streaming thread */
110 } GstAppSrcWaitStatus;
111 
112 typedef struct
113 {
114   GstAppSrcCallbacks callbacks;
115   gpointer user_data;
116   GDestroyNotify destroy_notify;
117   gint ref_count;
118 } Callbacks;
119 
120 static Callbacks *
callbacks_ref(Callbacks * callbacks)121 callbacks_ref (Callbacks * callbacks)
122 {
123   g_atomic_int_inc (&callbacks->ref_count);
124 
125   return callbacks;
126 }
127 
128 static void
callbacks_unref(Callbacks * callbacks)129 callbacks_unref (Callbacks * callbacks)
130 {
131   if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
132     return;
133 
134   if (callbacks->destroy_notify)
135     callbacks->destroy_notify (callbacks->user_data);
136 
137   g_free (callbacks);
138 }
139 
140 
141 struct _GstAppSrcPrivate
142 {
143   GCond cond;
144   GMutex mutex;
145   GstQueueArray *queue;
146   GstAppSrcWaitStatus wait_status;
147 
148   GstCaps *last_caps;
149   GstCaps *current_caps;
150   /* last segment received on the input */
151   GstSegment last_segment;
152   /* currently configured segment for the output */
153   GstSegment current_segment;
154   /* queue up a segment event based on last_segment before
155    * the next buffer of buffer list */
156   gboolean pending_custom_segment;
157 
158   /* the next buffer that will be queued needs a discont flag
159    * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
160   gboolean need_discont_upstream;
161   /* the next buffer that will be dequeued needs a discont flag
162    * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
163   gboolean need_discont_downstream;
164 
165   gint64 size;
166   GstClockTime duration;
167   GstAppStreamType stream_type;
168   guint64 max_bytes, max_buffers, max_time;
169   GstFormat format;
170   gboolean block;
171   gchar *uri;
172 
173   gboolean flushing;
174   gboolean started;
175   gboolean is_eos;
176   guint64 queued_bytes, queued_buffers;
177   /* Used to calculate the current time level */
178   GstClockTime last_in_running_time, last_out_running_time;
179   /* Updated based on the above whenever they change */
180   GstClockTime queued_time;
181   guint64 offset;
182   GstAppStreamType current_type;
183 
184   guint64 min_latency;
185   guint64 max_latency;
186   gboolean emit_signals;
187   guint min_percent;
188   gboolean handle_segment_change;
189 
190   GstAppLeakyType leaky_type;
191 
192   Callbacks *callbacks;
193 };
194 
195 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
196 #define GST_CAT_DEFAULT app_src_debug
197 
198 enum
199 {
200   /* signals */
201   SIGNAL_NEED_DATA,
202   SIGNAL_ENOUGH_DATA,
203   SIGNAL_SEEK_DATA,
204 
205   /* actions */
206   SIGNAL_PUSH_BUFFER,
207   SIGNAL_END_OF_STREAM,
208   SIGNAL_PUSH_SAMPLE,
209   SIGNAL_PUSH_BUFFER_LIST,
210 
211   LAST_SIGNAL
212 };
213 
214 #define DEFAULT_PROP_SIZE          -1
215 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
216 #define DEFAULT_PROP_MAX_BYTES     200000
217 #define DEFAULT_PROP_MAX_BUFFERS   0
218 #define DEFAULT_PROP_MAX_TIME      (0 * GST_SECOND)
219 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
220 #define DEFAULT_PROP_BLOCK         FALSE
221 #define DEFAULT_PROP_IS_LIVE       FALSE
222 #define DEFAULT_PROP_MIN_LATENCY   -1
223 #define DEFAULT_PROP_MAX_LATENCY   -1
224 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
225 #define DEFAULT_PROP_MIN_PERCENT   0
226 #define DEFAULT_PROP_CURRENT_LEVEL_BYTES   0
227 #define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0
228 #define DEFAULT_PROP_CURRENT_LEVEL_TIME    0
229 #define DEFAULT_PROP_DURATION      GST_CLOCK_TIME_NONE
230 #define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
231 #define DEFAULT_PROP_LEAKY_TYPE    GST_APP_LEAKY_TYPE_NONE
232 
233 enum
234 {
235   PROP_0,
236   PROP_CAPS,
237   PROP_SIZE,
238   PROP_STREAM_TYPE,
239   PROP_MAX_BYTES,
240   PROP_MAX_BUFFERS,
241   PROP_MAX_TIME,
242   PROP_FORMAT,
243   PROP_BLOCK,
244   PROP_IS_LIVE,
245   PROP_MIN_LATENCY,
246   PROP_MAX_LATENCY,
247   PROP_EMIT_SIGNALS,
248   PROP_MIN_PERCENT,
249   PROP_CURRENT_LEVEL_BYTES,
250   PROP_CURRENT_LEVEL_BUFFERS,
251   PROP_CURRENT_LEVEL_TIME,
252   PROP_DURATION,
253   PROP_HANDLE_SEGMENT_CHANGE,
254   PROP_LEAKY_TYPE,
255   PROP_LAST
256 };
257 
258 static GstStaticPadTemplate gst_app_src_template =
259 GST_STATIC_PAD_TEMPLATE ("src",
260     GST_PAD_SRC,
261     GST_PAD_ALWAYS,
262     GST_STATIC_CAPS_ANY);
263 
264 static void gst_app_src_uri_handler_init (gpointer g_iface,
265     gpointer iface_data);
266 
267 static void gst_app_src_dispose (GObject * object);
268 static void gst_app_src_finalize (GObject * object);
269 
270 static void gst_app_src_set_property (GObject * object, guint prop_id,
271     const GValue * value, GParamSpec * pspec);
272 static void gst_app_src_get_property (GObject * object, guint prop_id,
273     GValue * value, GParamSpec * pspec);
274 
275 static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
276 
277 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
278     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
279 
280 static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
281 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
282     GstCaps * filter);
283 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
284     guint size, GstBuffer ** buf);
285 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
286 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
287 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
288 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
289 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
290 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
291 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
292 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
293 static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
294 
295 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
296     GstBuffer * buffer);
297 static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
298     GstBufferList * buffer_list);
299 static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
300     GstSample * sample);
301 
302 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
303 
304 #define gst_app_src_parent_class parent_class
305 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
306     G_ADD_PRIVATE (GstAppSrc)
307     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
308 
309 static void
gst_app_src_class_init(GstAppSrcClass * klass)310 gst_app_src_class_init (GstAppSrcClass * klass)
311 {
312   GObjectClass *gobject_class = (GObjectClass *) klass;
313   GstElementClass *element_class = (GstElementClass *) klass;
314   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
315 
316   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
317 
318   gobject_class->dispose = gst_app_src_dispose;
319   gobject_class->finalize = gst_app_src_finalize;
320 
321   gobject_class->set_property = gst_app_src_set_property;
322   gobject_class->get_property = gst_app_src_get_property;
323 
324   /**
325    * GstAppSrc:caps:
326    *
327    * The GstCaps that will negotiated downstream and will be put
328    * on outgoing buffers.
329    */
330   g_object_class_install_property (gobject_class, PROP_CAPS,
331       g_param_spec_boxed ("caps", "Caps",
332           "The allowed caps for the src pad", GST_TYPE_CAPS,
333           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334   /**
335    * GstAppSrc:format:
336    *
337    * The format to use for segment events. When the source is producing
338    * timestamped buffers this property should be set to GST_FORMAT_TIME.
339    */
340   g_object_class_install_property (gobject_class, PROP_FORMAT,
341       g_param_spec_enum ("format", "Format",
342           "The format of the segment events and seek", GST_TYPE_FORMAT,
343           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344   /**
345    * GstAppSrc:size:
346    *
347    * The total size in bytes of the data stream. If the total size is known, it
348    * is recommended to configure it with this property.
349    */
350   g_object_class_install_property (gobject_class, PROP_SIZE,
351       g_param_spec_int64 ("size", "Size",
352           "The size of the data stream in bytes (-1 if unknown)",
353           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
354           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
355   /**
356    * GstAppSrc:stream-type:
357    *
358    * The type of stream that this source is producing.  For seekable streams the
359    * application should connect to the seek-data signal.
360    */
361   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
362       g_param_spec_enum ("stream-type", "Stream Type",
363           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
364           DEFAULT_PROP_STREAM_TYPE,
365           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366   /**
367    * GstAppSrc:max-bytes:
368    *
369    * The maximum amount of bytes that can be queued internally.
370    * After the maximum amount of bytes are queued, appsrc will emit the
371    * "enough-data" signal.
372    */
373   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
374       g_param_spec_uint64 ("max-bytes", "Max bytes",
375           "The maximum number of bytes to queue internally (0 = unlimited)",
376           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
377           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378 
379   /**
380    * GstAppSrc:max-buffers:
381    *
382    * The maximum amount of buffers that can be queued internally.
383    * After the maximum amount of buffers are queued, appsrc will emit the
384    * "enough-data" signal.
385    *
386    * Since: 1.20
387    */
388   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
389       g_param_spec_uint64 ("max-buffers", "Max buffers",
390           "The maximum number of buffers to queue internally (0 = unlimited)",
391           0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
392           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393 
394   /**
395    * GstAppSrc:max-time:
396    *
397    * The maximum amount of time that can be queued internally.
398    * After the maximum amount of time are queued, appsrc will emit the
399    * "enough-data" signal.
400    *
401    * Since: 1.20
402    */
403   g_object_class_install_property (gobject_class, PROP_MAX_TIME,
404       g_param_spec_uint64 ("max-time", "Max time",
405           "The maximum amount of time to queue internally (0 = unlimited)",
406           0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
407           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
408 
409   /**
410    * GstAppSrc:block:
411    *
412    * When max-bytes are queued and after the enough-data signal has been emitted,
413    * block any further push-buffer calls until the amount of queued bytes drops
414    * below the max-bytes limit.
415    */
416   g_object_class_install_property (gobject_class, PROP_BLOCK,
417       g_param_spec_boolean ("block", "Block",
418           "Block push-buffer when max-bytes are queued",
419           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
420 
421   /**
422    * GstAppSrc:is-live:
423    *
424    * Instruct the source to behave like a live source. This includes that it
425    * will only push out buffers in the PLAYING state.
426    */
427   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
428       g_param_spec_boolean ("is-live", "Is Live",
429           "Whether to act as a live source",
430           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431   /**
432    * GstAppSrc:min-latency:
433    *
434    * The minimum latency of the source. A value of -1 will use the default
435    * latency calculations of #GstBaseSrc.
436    */
437   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
438       g_param_spec_int64 ("min-latency", "Min Latency",
439           "The minimum latency (-1 = default)",
440           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
441           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442   /**
443    * GstAppSrc::max-latency:
444    *
445    * The maximum latency of the source. A value of -1 means an unlimited amount
446    * of latency.
447    */
448   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
449       g_param_spec_int64 ("max-latency", "Max Latency",
450           "The maximum latency (-1 = unlimited)",
451           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
452           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
453 
454   /**
455    * GstAppSrc:emit-signals:
456    *
457    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
458    * This option is by default enabled for backwards compatibility reasons but
459    * can disabled when needed because signal emission is expensive.
460    */
461   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
462       g_param_spec_boolean ("emit-signals", "Emit signals",
463           "Emit need-data, enough-data and seek-data signals",
464           DEFAULT_PROP_EMIT_SIGNALS,
465           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
466 
467   /**
468    * GstAppSrc:min-percent:
469    *
470    * Make appsrc emit the "need-data" signal when the amount of bytes in the
471    * queue drops below this percentage of max-bytes.
472    */
473   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
474       g_param_spec_uint ("min-percent", "Min Percent",
475           "Emit need-data when queued bytes drops below this percent of max-bytes",
476           0, 100, DEFAULT_PROP_MIN_PERCENT,
477           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478 
479   /**
480    * GstAppSrc:current-level-bytes:
481    *
482    * The number of currently queued bytes inside appsrc.
483    *
484    * Since: 1.2
485    */
486   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
487       g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
488           "The number of currently queued bytes",
489           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
490           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
491 
492   /**
493    * GstAppSrc:current-level-buffers:
494    *
495    * The number of currently queued buffers inside appsrc.
496    *
497    * Since: 1.20
498    */
499   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
500       g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
501           "The number of currently queued buffers",
502           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
503           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
504 
505   /**
506    * GstAppSrc:current-level-time:
507    *
508    * The amount of currently queued time inside appsrc.
509    *
510    * Since: 1.20
511    */
512   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
513       g_param_spec_uint64 ("current-level-time", "Current Level Time",
514           "The amount of currently queued time",
515           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
516           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
517 
518   /**
519    * GstAppSrc:duration:
520    *
521    * The total duration in nanoseconds of the data stream. If the total duration is known, it
522    * is recommended to configure it with this property.
523    *
524    * Since: 1.10
525    */
526   g_object_class_install_property (gobject_class, PROP_DURATION,
527       g_param_spec_uint64 ("duration", "Duration",
528           "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
529           0, G_MAXUINT64, DEFAULT_PROP_DURATION,
530           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
531 
532   /**
533    * GstAppSrc:handle-segment-change:
534    *
535    * When enabled, appsrc will check GstSegment in GstSample which was
536    * pushed via gst_app_src_push_sample() or "push-sample" signal action.
537    * If a GstSegment is changed, corresponding segment event will be followed
538    * by next data flow.
539    *
540    * FIXME: currently only GST_FORMAT_TIME format is supported and therefore
541    * GstAppSrc::format should be time. However, possibly #GstAppSrc can support
542    * other formats.
543    *
544    * Since: 1.18
545    */
546   g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
547       g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
548           "Whether to detect and handle changed time format GstSegment in "
549           "GstSample. User should set valid GstSegment in GstSample. "
550           "Must set format property as \"time\" to enable this property",
551           DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
552           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
553           G_PARAM_STATIC_STRINGS));
554 
555   /**
556    * GstAppSrc:leaky-type:
557    *
558    * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
559    * will drop any buffers that are pushed into it once its internal queue is
560    * full. The selected type defines whether to drop the oldest or new
561    * buffers.
562    *
563    * Since: 1.20
564    */
565   g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
566       g_param_spec_enum ("leaky-type", "Leaky Type",
567           "Whether to drop buffers once the internal queue is full",
568           GST_TYPE_APP_LEAKY_TYPE,
569           DEFAULT_PROP_LEAKY_TYPE,
570           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
571           G_PARAM_STATIC_STRINGS));
572 
573   /**
574    * GstAppSrc::need-data:
575    * @appsrc: the appsrc element that emitted the signal
576    * @length: the amount of bytes needed.
577    *
578    * Signal that the source needs more data. In the callback or from another
579    * thread you should call push-buffer or end-of-stream.
580    *
581    * @length is just a hint and when it is set to -1, any number of bytes can be
582    * pushed into @appsrc.
583    *
584    * You can call push-buffer multiple times until the enough-data signal is
585    * fired.
586    */
587   gst_app_src_signals[SIGNAL_NEED_DATA] =
588       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
589       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
590       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
591 
592   /**
593    * GstAppSrc::enough-data:
594    * @appsrc: the appsrc element that emitted the signal
595    *
596    * Signal that the source has enough data. It is recommended that the
597    * application stops calling push-buffer until the need-data signal is
598    * emitted again to avoid excessive buffer queueing.
599    */
600   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
601       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
602       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
603       NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
604 
605   /**
606    * GstAppSrc::seek-data:
607    * @appsrc: the appsrc element that emitted the signal
608    * @offset: the offset to seek to
609    *
610    * Seek to the given offset. The next push-buffer should produce buffers from
611    * the new @offset.
612    * This callback is only called for seekable stream types.
613    *
614    * Returns: %TRUE if the seek succeeded.
615    */
616   gst_app_src_signals[SIGNAL_SEEK_DATA] =
617       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
618       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
619       NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
620 
621    /**
622     * GstAppSrc::push-buffer:
623     * @appsrc: the appsrc
624     * @buffer: (transfer none): a buffer to push
625     *
626     * Adds a buffer to the queue of buffers that the appsrc element will
627     * push to its source pad.
628     *
629     * This function does not take ownership of the buffer, but it takes a
630     * reference so the buffer can be unreffed at any time after calling this
631     * function.
632     *
633     * When the block property is TRUE, this function can block until free space
634     * becomes available in the queue.
635     */
636   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
637       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
638       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
639           push_buffer), NULL, NULL, NULL,
640       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
641 
642    /**
643     * GstAppSrc::push-buffer-list:
644     * @appsrc: the appsrc
645     * @buffer_list: (transfer none): a buffer list to push
646     *
647     * Adds a buffer list to the queue of buffers and buffer lists that the
648     * appsrc element will push to its source pad.
649     *
650     * This function does not take ownership of the buffer list, but it takes a
651     * reference so the buffer list can be unreffed at any time after calling
652     * this function.
653     *
654     * When the block property is TRUE, this function can block until free space
655     * becomes available in the queue.
656     *
657     * Since: 1.14
658     */
659   gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
660       g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
661       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
662           push_buffer_list), NULL, NULL, NULL,
663       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
664 
665   /**
666     * GstAppSrc::push-sample:
667     * @appsrc: the appsrc
668     * @sample: (transfer none): a sample from which extract buffer to push
669     *
670     * Extract a buffer from the provided sample and adds the extracted buffer
671     * to the queue of buffers that the appsrc element will
672     * push to its source pad. This function set the appsrc caps based on the caps
673     * in the sample and reset the caps if they change.
674     * Only the caps and the buffer of the provided sample are used and not
675     * for example the segment in the sample.
676     *
677     * This function does not take ownership of the sample, but it takes a
678     * reference so the sample can be unreffed at any time after calling this
679     * function.
680     *
681     * When the block property is TRUE, this function can block until free space
682     * becomes available in the queue.
683     *
684     * Since: 1.6
685     */
686   gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
687       g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
688       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
689           push_sample), NULL, NULL, NULL,
690       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
691 
692 
693    /**
694     * GstAppSrc::end-of-stream:
695     * @appsrc: the appsrc
696     *
697     * Notify @appsrc that no more buffer are available.
698     */
699   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
700       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
701       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
702           end_of_stream), NULL, NULL, NULL,
703       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
704 
705   gst_element_class_set_static_metadata (element_class, "AppSrc",
706       "Generic/Source", "Allow the application to feed buffers to a pipeline",
707       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
708 
709   gst_element_class_add_static_pad_template (element_class,
710       &gst_app_src_template);
711 
712   element_class->send_event = gst_app_src_send_event;
713 
714   basesrc_class->negotiate = gst_app_src_negotiate;
715   basesrc_class->get_caps = gst_app_src_internal_get_caps;
716   basesrc_class->create = gst_app_src_create;
717   basesrc_class->start = gst_app_src_start;
718   basesrc_class->stop = gst_app_src_stop;
719   basesrc_class->unlock = gst_app_src_unlock;
720   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
721   basesrc_class->do_seek = gst_app_src_do_seek;
722   basesrc_class->is_seekable = gst_app_src_is_seekable;
723   basesrc_class->get_size = gst_app_src_do_get_size;
724   basesrc_class->query = gst_app_src_query;
725   basesrc_class->event = gst_app_src_event;
726 
727   klass->push_buffer = gst_app_src_push_buffer_action;
728   klass->push_buffer_list = gst_app_src_push_buffer_list_action;
729   klass->push_sample = gst_app_src_push_sample_action;
730   klass->end_of_stream = gst_app_src_end_of_stream;
731 }
732 
733 static void
gst_app_src_init(GstAppSrc * appsrc)734 gst_app_src_init (GstAppSrc * appsrc)
735 {
736   GstAppSrcPrivate *priv;
737 
738   priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
739 
740   g_mutex_init (&priv->mutex);
741   g_cond_init (&priv->cond);
742   priv->queue = gst_queue_array_new (16);
743   priv->wait_status = NOONE_WAITING;
744 
745   priv->size = DEFAULT_PROP_SIZE;
746   priv->duration = DEFAULT_PROP_DURATION;
747   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
748   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
749   priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
750   priv->max_time = DEFAULT_PROP_MAX_TIME;
751   priv->format = DEFAULT_PROP_FORMAT;
752   priv->block = DEFAULT_PROP_BLOCK;
753   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
754   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
755   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
756   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
757   priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
758   priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
759 
760   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
761 }
762 
763 /* Must be called with priv->mutex */
764 static void
gst_app_src_flush_queued(GstAppSrc * src,gboolean retain_last_caps)765 gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
766 {
767   GstMiniObject *obj;
768   GstAppSrcPrivate *priv = src->priv;
769   GstCaps *requeue_caps = NULL;
770 
771   while (!gst_queue_array_is_empty (priv->queue)) {
772     obj = gst_queue_array_pop_head (priv->queue);
773     if (obj) {
774       if (GST_IS_CAPS (obj) && retain_last_caps) {
775         gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
776       }
777       gst_mini_object_unref (obj);
778     }
779   }
780 
781   if (requeue_caps) {
782     gst_queue_array_push_tail (priv->queue, requeue_caps);
783   }
784 
785   priv->queued_bytes = 0;
786   priv->queued_buffers = 0;
787   priv->queued_time = 0;
788   priv->last_in_running_time = GST_CLOCK_TIME_NONE;
789   priv->last_out_running_time = GST_CLOCK_TIME_NONE;
790   priv->need_discont_upstream = FALSE;
791   priv->need_discont_downstream = FALSE;
792 }
793 
794 static void
gst_app_src_dispose(GObject * obj)795 gst_app_src_dispose (GObject * obj)
796 {
797   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
798   GstAppSrcPrivate *priv = appsrc->priv;
799   Callbacks *callbacks = NULL;
800 
801   GST_OBJECT_LOCK (appsrc);
802   if (priv->current_caps) {
803     gst_caps_unref (priv->current_caps);
804     priv->current_caps = NULL;
805   }
806   if (priv->last_caps) {
807     gst_caps_unref (priv->last_caps);
808     priv->last_caps = NULL;
809   }
810   GST_OBJECT_UNLOCK (appsrc);
811 
812   g_mutex_lock (&priv->mutex);
813   if (priv->callbacks)
814     callbacks = g_steal_pointer (&priv->callbacks);
815   gst_app_src_flush_queued (appsrc, FALSE);
816   g_mutex_unlock (&priv->mutex);
817 
818   g_clear_pointer (&callbacks, callbacks_unref);
819 
820   G_OBJECT_CLASS (parent_class)->dispose (obj);
821 }
822 
823 static void
gst_app_src_finalize(GObject * obj)824 gst_app_src_finalize (GObject * obj)
825 {
826   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
827   GstAppSrcPrivate *priv = appsrc->priv;
828 
829   g_mutex_clear (&priv->mutex);
830   g_cond_clear (&priv->cond);
831   gst_queue_array_free (priv->queue);
832 
833   g_free (priv->uri);
834 
835   G_OBJECT_CLASS (parent_class)->finalize (obj);
836 }
837 
838 static GstCaps *
gst_app_src_internal_get_caps(GstBaseSrc * bsrc,GstCaps * filter)839 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
840 {
841   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
842   GstCaps *caps;
843 
844   GST_OBJECT_LOCK (appsrc);
845   if ((caps = appsrc->priv->current_caps))
846     gst_caps_ref (caps);
847   GST_OBJECT_UNLOCK (appsrc);
848 
849   if (filter) {
850     if (caps) {
851       GstCaps *intersection =
852           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
853       gst_caps_unref (caps);
854       caps = intersection;
855     } else {
856       caps = gst_caps_ref (filter);
857     }
858   }
859 
860   GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
861   return caps;
862 }
863 
864 static void
gst_app_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)865 gst_app_src_set_property (GObject * object, guint prop_id,
866     const GValue * value, GParamSpec * pspec)
867 {
868   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
869   GstAppSrcPrivate *priv = appsrc->priv;
870 
871   switch (prop_id) {
872     case PROP_CAPS:
873       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
874       break;
875     case PROP_SIZE:
876       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
877       break;
878     case PROP_STREAM_TYPE:
879       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
880       break;
881     case PROP_MAX_BYTES:
882       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
883       break;
884     case PROP_MAX_BUFFERS:
885       gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
886       break;
887     case PROP_MAX_TIME:
888       gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
889       break;
890     case PROP_FORMAT:
891       priv->format = g_value_get_enum (value);
892       break;
893     case PROP_BLOCK:
894       priv->block = g_value_get_boolean (value);
895       break;
896     case PROP_IS_LIVE:
897       gst_base_src_set_live (GST_BASE_SRC (appsrc),
898           g_value_get_boolean (value));
899       break;
900     case PROP_MIN_LATENCY:
901       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
902           FALSE, -1);
903       break;
904     case PROP_MAX_LATENCY:
905       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
906           g_value_get_int64 (value));
907       break;
908     case PROP_EMIT_SIGNALS:
909       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
910       break;
911     case PROP_MIN_PERCENT:
912       priv->min_percent = g_value_get_uint (value);
913       break;
914     case PROP_DURATION:
915       gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
916       break;
917     case PROP_HANDLE_SEGMENT_CHANGE:
918       priv->handle_segment_change = g_value_get_boolean (value);
919       break;
920     case PROP_LEAKY_TYPE:
921       priv->leaky_type = g_value_get_enum (value);
922       break;
923     default:
924       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
925       break;
926   }
927 }
928 
929 static void
gst_app_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)930 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
931     GParamSpec * pspec)
932 {
933   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
934   GstAppSrcPrivate *priv = appsrc->priv;
935 
936   switch (prop_id) {
937     case PROP_CAPS:
938       g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
939       break;
940     case PROP_SIZE:
941       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
942       break;
943     case PROP_STREAM_TYPE:
944       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
945       break;
946     case PROP_MAX_BYTES:
947       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
948       break;
949     case PROP_MAX_BUFFERS:
950       g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
951       break;
952     case PROP_MAX_TIME:
953       g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
954       break;
955     case PROP_FORMAT:
956       g_value_set_enum (value, priv->format);
957       break;
958     case PROP_BLOCK:
959       g_value_set_boolean (value, priv->block);
960       break;
961     case PROP_IS_LIVE:
962       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
963       break;
964     case PROP_MIN_LATENCY:
965     {
966       guint64 min = 0;
967 
968       gst_app_src_get_latency (appsrc, &min, NULL);
969       g_value_set_int64 (value, min);
970       break;
971     }
972     case PROP_MAX_LATENCY:
973     {
974       guint64 max = 0;
975 
976       gst_app_src_get_latency (appsrc, NULL, &max);
977       g_value_set_int64 (value, max);
978       break;
979     }
980     case PROP_EMIT_SIGNALS:
981       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
982       break;
983     case PROP_MIN_PERCENT:
984       g_value_set_uint (value, priv->min_percent);
985       break;
986     case PROP_CURRENT_LEVEL_BYTES:
987       g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
988       break;
989     case PROP_CURRENT_LEVEL_BUFFERS:
990       g_value_set_uint64 (value,
991           gst_app_src_get_current_level_buffers (appsrc));
992       break;
993     case PROP_CURRENT_LEVEL_TIME:
994       g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
995       break;
996     case PROP_DURATION:
997       g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
998       break;
999     case PROP_HANDLE_SEGMENT_CHANGE:
1000       g_value_set_boolean (value, priv->handle_segment_change);
1001       break;
1002     case PROP_LEAKY_TYPE:
1003       g_value_set_enum (value, priv->leaky_type);
1004       break;
1005     default:
1006       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1007       break;
1008   }
1009 }
1010 
1011 static gboolean
gst_app_src_send_event(GstElement * element,GstEvent * event)1012 gst_app_src_send_event (GstElement * element, GstEvent * event)
1013 {
1014   GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
1015   GstAppSrcPrivate *priv = appsrc->priv;
1016 
1017   switch (GST_EVENT_TYPE (event)) {
1018     case GST_EVENT_FLUSH_STOP:
1019       g_mutex_lock (&priv->mutex);
1020       gst_app_src_flush_queued (appsrc, TRUE);
1021       g_mutex_unlock (&priv->mutex);
1022       break;
1023     default:
1024       if (GST_EVENT_IS_SERIALIZED (event)) {
1025         GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
1026         g_mutex_lock (&priv->mutex);
1027         gst_queue_array_push_tail (priv->queue, event);
1028 
1029         if ((priv->wait_status & STREAM_WAITING))
1030           g_cond_broadcast (&priv->cond);
1031 
1032         g_mutex_unlock (&priv->mutex);
1033         return TRUE;
1034       }
1035       break;
1036   }
1037 
1038   return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
1039           event), FALSE);
1040 }
1041 
1042 static gboolean
gst_app_src_unlock(GstBaseSrc * bsrc)1043 gst_app_src_unlock (GstBaseSrc * bsrc)
1044 {
1045   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1046   GstAppSrcPrivate *priv = appsrc->priv;
1047 
1048   g_mutex_lock (&priv->mutex);
1049   GST_DEBUG_OBJECT (appsrc, "unlock start");
1050   priv->flushing = TRUE;
1051   g_cond_broadcast (&priv->cond);
1052   g_mutex_unlock (&priv->mutex);
1053 
1054   return TRUE;
1055 }
1056 
1057 static gboolean
gst_app_src_unlock_stop(GstBaseSrc * bsrc)1058 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
1059 {
1060   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1061   GstAppSrcPrivate *priv = appsrc->priv;
1062 
1063   g_mutex_lock (&priv->mutex);
1064   GST_DEBUG_OBJECT (appsrc, "unlock stop");
1065   priv->flushing = FALSE;
1066   g_cond_broadcast (&priv->cond);
1067   g_mutex_unlock (&priv->mutex);
1068 
1069   return TRUE;
1070 }
1071 
1072 static gboolean
gst_app_src_start(GstBaseSrc * bsrc)1073 gst_app_src_start (GstBaseSrc * bsrc)
1074 {
1075   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1076   GstAppSrcPrivate *priv = appsrc->priv;
1077 
1078   g_mutex_lock (&priv->mutex);
1079   GST_DEBUG_OBJECT (appsrc, "starting");
1080   priv->started = TRUE;
1081   /* set the offset to -1 so that we always do a first seek. This is only used
1082    * in random-access mode. */
1083   priv->offset = -1;
1084   priv->flushing = FALSE;
1085   g_mutex_unlock (&priv->mutex);
1086 
1087   gst_base_src_set_format (bsrc, priv->format);
1088   gst_segment_init (&priv->last_segment, priv->format);
1089   gst_segment_init (&priv->current_segment, priv->format);
1090   priv->pending_custom_segment = FALSE;
1091 
1092   return TRUE;
1093 }
1094 
1095 static gboolean
gst_app_src_stop(GstBaseSrc * bsrc)1096 gst_app_src_stop (GstBaseSrc * bsrc)
1097 {
1098   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1099   GstAppSrcPrivate *priv = appsrc->priv;
1100 
1101   g_mutex_lock (&priv->mutex);
1102   GST_DEBUG_OBJECT (appsrc, "stopping");
1103   priv->is_eos = FALSE;
1104   priv->flushing = TRUE;
1105   priv->started = FALSE;
1106   gst_app_src_flush_queued (appsrc, TRUE);
1107   g_cond_broadcast (&priv->cond);
1108   g_mutex_unlock (&priv->mutex);
1109 
1110   return TRUE;
1111 }
1112 
1113 static gboolean
gst_app_src_is_seekable(GstBaseSrc * src)1114 gst_app_src_is_seekable (GstBaseSrc * src)
1115 {
1116   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1117   GstAppSrcPrivate *priv = appsrc->priv;
1118   gboolean res = FALSE;
1119 
1120   switch (priv->stream_type) {
1121     case GST_APP_STREAM_TYPE_STREAM:
1122       break;
1123     case GST_APP_STREAM_TYPE_SEEKABLE:
1124     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1125       res = TRUE;
1126       break;
1127   }
1128   return res;
1129 }
1130 
1131 static gboolean
gst_app_src_do_get_size(GstBaseSrc * src,guint64 * size)1132 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
1133 {
1134   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1135 
1136   *size = gst_app_src_get_size (appsrc);
1137 
1138   return TRUE;
1139 }
1140 
1141 static gboolean
gst_app_src_query(GstBaseSrc * src,GstQuery * query)1142 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
1143 {
1144   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1145   GstAppSrcPrivate *priv = appsrc->priv;
1146   gboolean res;
1147 
1148   switch (GST_QUERY_TYPE (query)) {
1149     case GST_QUERY_LATENCY:
1150     {
1151       GstClockTime min, max;
1152       gboolean live;
1153 
1154       /* Query the parent class for the defaults */
1155       res = gst_base_src_query_latency (src, &live, &min, &max);
1156 
1157       /* overwrite with our values when we need to */
1158       g_mutex_lock (&priv->mutex);
1159       if (priv->min_latency != -1) {
1160         min = priv->min_latency;
1161         max = priv->max_latency;
1162       }
1163       g_mutex_unlock (&priv->mutex);
1164 
1165       gst_query_set_latency (query, live, min, max);
1166       break;
1167     }
1168     case GST_QUERY_SCHEDULING:
1169     {
1170       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1171       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1172 
1173       switch (priv->stream_type) {
1174         case GST_APP_STREAM_TYPE_STREAM:
1175         case GST_APP_STREAM_TYPE_SEEKABLE:
1176           break;
1177         case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1178           gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1179           break;
1180       }
1181       res = TRUE;
1182       break;
1183     }
1184     case GST_QUERY_DURATION:
1185     {
1186       GstFormat format;
1187       gst_query_parse_duration (query, &format, NULL);
1188       if (format == GST_FORMAT_BYTES) {
1189         gst_query_set_duration (query, format, priv->size);
1190         res = TRUE;
1191       } else if (format == GST_FORMAT_TIME) {
1192         if (priv->duration != GST_CLOCK_TIME_NONE) {
1193           gst_query_set_duration (query, format, priv->duration);
1194           res = TRUE;
1195         } else {
1196           res = FALSE;
1197         }
1198       } else {
1199         res = FALSE;
1200       }
1201       break;
1202     }
1203     default:
1204       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
1205       break;
1206   }
1207 
1208   return res;
1209 }
1210 
1211 /* will be called in push mode */
1212 static gboolean
gst_app_src_do_seek(GstBaseSrc * src,GstSegment * segment)1213 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1214 {
1215   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1216   GstAppSrcPrivate *priv = appsrc->priv;
1217   gint64 desired_position;
1218   gboolean res = FALSE;
1219   gboolean emit;
1220   Callbacks *callbacks = NULL;
1221 
1222   desired_position = segment->position;
1223 
1224   /* no need to try to seek in streaming mode */
1225   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
1226     return TRUE;
1227 
1228   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
1229       desired_position, gst_format_get_name (segment->format));
1230 
1231   g_mutex_lock (&priv->mutex);
1232   emit = priv->emit_signals;
1233   if (priv->callbacks)
1234     callbacks = callbacks_ref (priv->callbacks);
1235   g_mutex_unlock (&priv->mutex);
1236 
1237   if (callbacks && callbacks->callbacks.seek_data) {
1238     res =
1239         callbacks->callbacks.seek_data (appsrc, desired_position,
1240         callbacks->user_data);
1241   } else if (emit) {
1242     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1243         desired_position, &res);
1244   }
1245 
1246   g_clear_pointer (&callbacks, callbacks_unref);
1247 
1248   if (res) {
1249     GST_DEBUG_OBJECT (appsrc, "flushing queue");
1250     g_mutex_lock (&priv->mutex);
1251     gst_app_src_flush_queued (appsrc, TRUE);
1252     gst_segment_copy_into (segment, &priv->last_segment);
1253     gst_segment_copy_into (segment, &priv->current_segment);
1254     priv->pending_custom_segment = FALSE;
1255     g_mutex_unlock (&priv->mutex);
1256     priv->is_eos = FALSE;
1257   } else {
1258     GST_WARNING_OBJECT (appsrc, "seek failed");
1259   }
1260 
1261   return res;
1262 }
1263 
1264 /* must be called with the appsrc mutex */
1265 static gboolean
gst_app_src_emit_seek(GstAppSrc * appsrc,guint64 offset)1266 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
1267 {
1268   gboolean res = FALSE;
1269   gboolean emit;
1270   GstAppSrcPrivate *priv = appsrc->priv;
1271   Callbacks *callbacks = NULL;
1272 
1273   emit = priv->emit_signals;
1274   if (priv->callbacks)
1275     callbacks = callbacks_ref (priv->callbacks);
1276   g_mutex_unlock (&priv->mutex);
1277 
1278   GST_DEBUG_OBJECT (appsrc,
1279       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
1280       priv->offset, offset);
1281 
1282   if (callbacks && callbacks->callbacks.seek_data)
1283     res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
1284   else if (emit)
1285     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1286         offset, &res);
1287 
1288   g_clear_pointer (&callbacks, callbacks_unref);
1289 
1290   g_mutex_lock (&priv->mutex);
1291 
1292   return res;
1293 }
1294 
1295 /* must be called with the appsrc mutex. After this call things can be
1296  * flushing */
1297 static void
gst_app_src_emit_need_data(GstAppSrc * appsrc,guint size)1298 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
1299 {
1300   gboolean emit;
1301   GstAppSrcPrivate *priv = appsrc->priv;
1302   Callbacks *callbacks = NULL;
1303 
1304   emit = priv->emit_signals;
1305   if (priv->callbacks)
1306     callbacks = callbacks_ref (priv->callbacks);
1307   g_mutex_unlock (&priv->mutex);
1308 
1309   /* we have no data, we need some. We fire the signal with the size hint. */
1310   if (callbacks && callbacks->callbacks.need_data)
1311     callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
1312   else if (emit)
1313     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
1314         NULL);
1315 
1316   g_clear_pointer (&callbacks, callbacks_unref);
1317 
1318   g_mutex_lock (&priv->mutex);
1319   /* we can be flushing now because we released the lock */
1320 }
1321 
1322 /* must be called with the appsrc mutex */
1323 static gboolean
gst_app_src_do_negotiate(GstBaseSrc * basesrc)1324 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
1325 {
1326   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1327   GstAppSrcPrivate *priv = appsrc->priv;
1328   gboolean result;
1329   GstCaps *caps;
1330 
1331   GST_OBJECT_LOCK (basesrc);
1332   caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
1333   GST_OBJECT_UNLOCK (basesrc);
1334 
1335   /* Avoid deadlock by unlocking mutex
1336    * otherwise we get deadlock between this and stream lock */
1337   g_mutex_unlock (&priv->mutex);
1338   if (caps) {
1339     result = gst_base_src_set_caps (basesrc, caps);
1340     gst_caps_unref (caps);
1341   } else {
1342     result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
1343   }
1344   g_mutex_lock (&priv->mutex);
1345 
1346   return result;
1347 }
1348 
1349 static gboolean
gst_app_src_negotiate(GstBaseSrc * basesrc)1350 gst_app_src_negotiate (GstBaseSrc * basesrc)
1351 {
1352   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1353   GstAppSrcPrivate *priv = appsrc->priv;
1354   gboolean result;
1355 
1356   g_mutex_lock (&priv->mutex);
1357   result = gst_app_src_do_negotiate (basesrc);
1358   g_mutex_unlock (&priv->mutex);
1359   return result;
1360 }
1361 
1362 /* Update the currently queued bytes/buffers/time information for the item
1363  * that was just removed from the queue.
1364  *
1365  * If update_offset is set, additionally the offset of the source will be
1366  * moved forward accordingly as if that many bytes were output.
1367  */
1368 static void
gst_app_src_update_queued_pop(GstAppSrc * appsrc,GstMiniObject * item,gboolean update_offset)1369 gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
1370     gboolean update_offset)
1371 {
1372   GstAppSrcPrivate *priv = appsrc->priv;
1373   guint buf_size = 0;
1374   guint n_buffers = 0;
1375   GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1376 
1377   if (GST_IS_BUFFER (item)) {
1378     GstBuffer *buf = GST_BUFFER_CAST (item);
1379     buf_size = gst_buffer_get_size (buf);
1380     n_buffers = 1;
1381 
1382     end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1383     if (end_buffer_ts != GST_CLOCK_TIME_NONE
1384         && GST_BUFFER_DURATION_IS_VALID (buf))
1385       end_buffer_ts += GST_BUFFER_DURATION (buf);
1386 
1387     GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
1388   } else if (GST_IS_BUFFER_LIST (item)) {
1389     GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1390     guint i;
1391 
1392     n_buffers = gst_buffer_list_length (buffer_list);
1393 
1394     for (i = 0; i < n_buffers; i++) {
1395       GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1396       GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1397 
1398       buf_size += gst_buffer_get_size (tmp);
1399       /* Update to the last buffer's timestamp that is known */
1400       if (ts != GST_CLOCK_TIME_NONE) {
1401         end_buffer_ts = ts;
1402         if (GST_BUFFER_DURATION_IS_VALID (tmp))
1403           end_buffer_ts += GST_BUFFER_DURATION (tmp);
1404       }
1405     }
1406   }
1407 
1408   priv->queued_bytes -= buf_size;
1409   priv->queued_buffers -= n_buffers;
1410 
1411   /* Update time level if working on a TIME segment */
1412   if ((priv->current_segment.format == GST_FORMAT_TIME
1413           || (priv->current_segment.format == GST_FORMAT_UNDEFINED
1414               && priv->last_segment.format == GST_FORMAT_TIME))
1415       && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1416     const GstSegment *segment =
1417         priv->current_segment.format ==
1418         GST_FORMAT_TIME ? &priv->current_segment : &priv->last_segment;
1419 
1420     /* Clip to the current segment boundaries */
1421     if (segment->stop != -1 && end_buffer_ts > segment->stop)
1422       end_buffer_ts = segment->stop;
1423     else if (segment->start > end_buffer_ts)
1424       end_buffer_ts = segment->start;
1425 
1426     priv->last_out_running_time =
1427         gst_segment_to_running_time (segment, GST_FORMAT_TIME, end_buffer_ts);
1428 
1429     GST_TRACE_OBJECT (appsrc,
1430         "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1431         GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1432         GST_TIME_ARGS (priv->last_out_running_time));
1433 
1434     /* If timestamps on both sides are known, calculate the current
1435      * fill level in time and consider the queue empty if the output
1436      * running time is lower than the input one (i.e. some kind of reset
1437      * has happened).
1438      */
1439     if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1440         && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1441       if (priv->last_out_running_time > priv->last_in_running_time) {
1442         priv->queued_time = 0;
1443       } else {
1444         priv->queued_time =
1445             priv->last_in_running_time - priv->last_out_running_time;
1446       }
1447     }
1448   }
1449 
1450   GST_DEBUG_OBJECT (appsrc,
1451       "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1452       " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
1453       priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
1454 
1455   /* only update the offset when in random_access mode and when requested by
1456    * the caller, i.e. not when just dropping the item */
1457   if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1458     priv->offset += buf_size;
1459 }
1460 
1461 /* Update the currently queued bytes/buffers/time information for the item
1462  * that was just added to the queue.
1463  */
1464 static void
gst_app_src_update_queued_push(GstAppSrc * appsrc,GstMiniObject * item)1465 gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
1466 {
1467   GstAppSrcPrivate *priv = appsrc->priv;
1468   GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
1469   GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1470   guint buf_size = 0;
1471   guint n_buffers = 0;
1472 
1473   if (GST_IS_BUFFER (item)) {
1474     GstBuffer *buf = GST_BUFFER_CAST (item);
1475 
1476     buf_size = gst_buffer_get_size (buf);
1477     n_buffers = 1;
1478 
1479     start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1480     if (end_buffer_ts != GST_CLOCK_TIME_NONE
1481         && GST_BUFFER_DURATION_IS_VALID (buf))
1482       end_buffer_ts += GST_BUFFER_DURATION (buf);
1483   } else if (GST_IS_BUFFER_LIST (item)) {
1484     GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1485     guint i;
1486 
1487     n_buffers = gst_buffer_list_length (buffer_list);
1488 
1489     for (i = 0; i < n_buffers; i++) {
1490       GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1491       GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1492 
1493       buf_size += gst_buffer_get_size (tmp);
1494 
1495       if (ts != GST_CLOCK_TIME_NONE) {
1496         if (start_buffer_ts == GST_CLOCK_TIME_NONE)
1497           start_buffer_ts = ts;
1498         end_buffer_ts = ts;
1499         if (GST_BUFFER_DURATION_IS_VALID (tmp))
1500           end_buffer_ts += GST_BUFFER_DURATION (tmp);
1501       }
1502     }
1503   }
1504 
1505   priv->queued_bytes += buf_size;
1506   priv->queued_buffers += n_buffers;
1507 
1508   /* Update time level if working on a TIME segment */
1509   if (priv->last_segment.format == GST_FORMAT_TIME
1510       && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1511     /* Clip to the last segment boundaries */
1512     if (priv->last_segment.stop != -1
1513         && end_buffer_ts > priv->last_segment.stop)
1514       end_buffer_ts = priv->last_segment.stop;
1515     else if (priv->last_segment.start > end_buffer_ts)
1516       end_buffer_ts = priv->last_segment.start;
1517 
1518     priv->last_in_running_time =
1519         gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1520         end_buffer_ts);
1521 
1522     /* If this is the only buffer then we can directly update the queued time
1523      * here. This is especially useful if this was the first buffer because
1524      * otherwise we would have to wait until it is actually unqueued to know
1525      * the queued duration */
1526     if (priv->queued_buffers == 1) {
1527       if (priv->last_segment.stop != -1
1528           && start_buffer_ts > priv->last_segment.stop)
1529         start_buffer_ts = priv->last_segment.stop;
1530       else if (priv->last_segment.start > start_buffer_ts)
1531         start_buffer_ts = priv->last_segment.start;
1532 
1533       priv->last_out_running_time =
1534           gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1535           start_buffer_ts);
1536     }
1537 
1538     GST_TRACE_OBJECT (appsrc,
1539         "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1540         GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1541         GST_TIME_ARGS (priv->last_out_running_time));
1542 
1543     if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1544         && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1545       if (priv->last_out_running_time > priv->last_in_running_time) {
1546         priv->queued_time = 0;
1547       } else {
1548         priv->queued_time =
1549             priv->last_in_running_time - priv->last_out_running_time;
1550       }
1551     }
1552   }
1553 
1554   GST_DEBUG_OBJECT (appsrc,
1555       "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1556       " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
1557       GST_TIME_ARGS (priv->queued_time));
1558 }
1559 
1560 static GstFlowReturn
gst_app_src_create(GstBaseSrc * bsrc,guint64 offset,guint size,GstBuffer ** buf)1561 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
1562     GstBuffer ** buf)
1563 {
1564   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1565   GstAppSrcPrivate *priv = appsrc->priv;
1566   GstFlowReturn ret;
1567 
1568   GST_OBJECT_LOCK (appsrc);
1569   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
1570           bsrc->segment.format == GST_FORMAT_BYTES)) {
1571     GST_DEBUG_OBJECT (appsrc,
1572         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
1573         bsrc->segment.duration, priv->size);
1574     bsrc->segment.duration = priv->size;
1575     GST_OBJECT_UNLOCK (appsrc);
1576 
1577     gst_element_post_message (GST_ELEMENT (appsrc),
1578         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1579   } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
1580           bsrc->segment.format == GST_FORMAT_TIME)) {
1581     GST_DEBUG_OBJECT (appsrc,
1582         "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
1583         GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
1584     bsrc->segment.duration = priv->duration;
1585     GST_OBJECT_UNLOCK (appsrc);
1586 
1587     gst_element_post_message (GST_ELEMENT (appsrc),
1588         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1589   } else {
1590     GST_OBJECT_UNLOCK (appsrc);
1591   }
1592 
1593   g_mutex_lock (&priv->mutex);
1594   /* check flushing first */
1595   if (G_UNLIKELY (priv->flushing))
1596     goto flushing;
1597 
1598   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
1599     /* if we are dealing with a random-access stream, issue a seek if the offset
1600      * changed. */
1601     if (G_UNLIKELY (priv->offset != offset)) {
1602       gboolean res;
1603 
1604       /* do the seek */
1605       res = gst_app_src_emit_seek (appsrc, offset);
1606 
1607       if (G_UNLIKELY (!res))
1608         /* failing to seek is fatal */
1609         goto seek_error;
1610 
1611       priv->offset = offset;
1612       priv->is_eos = FALSE;
1613     }
1614   }
1615 
1616   while (TRUE) {
1617     /* Our lock may have been release to push events or caps, check out
1618      * state in case we are now flushing. */
1619     if (G_UNLIKELY (priv->flushing))
1620       goto flushing;
1621 
1622     /* return data as long as we have some */
1623     if (!gst_queue_array_is_empty (priv->queue)) {
1624       GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
1625 
1626       if (GST_IS_CAPS (obj)) {
1627         GstCaps *next_caps = GST_CAPS (obj);
1628         gboolean caps_changed = TRUE;
1629 
1630         if (next_caps && priv->current_caps)
1631           caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
1632         else
1633           caps_changed = (next_caps != priv->current_caps);
1634 
1635         gst_caps_replace (&priv->current_caps, next_caps);
1636 
1637         if (next_caps) {
1638           gst_caps_unref (next_caps);
1639         }
1640 
1641         if (caps_changed)
1642           gst_app_src_do_negotiate (bsrc);
1643 
1644         /* Continue checks caps and queue */
1645         continue;
1646       }
1647 
1648       if (GST_IS_BUFFER (obj)) {
1649         GstBuffer *buffer = GST_BUFFER (obj);
1650 
1651         /* Mark the buffer as DISCONT if we previously dropped a buffer
1652          * instead of outputting it */
1653         if (priv->need_discont_downstream) {
1654           buffer = gst_buffer_make_writable (buffer);
1655           GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1656           priv->need_discont_downstream = FALSE;
1657         }
1658 
1659         *buf = buffer;
1660       } else if (GST_IS_BUFFER_LIST (obj)) {
1661         GstBufferList *buffer_list;
1662 
1663         buffer_list = GST_BUFFER_LIST (obj);
1664 
1665         /* Mark the first buffer of the buffer list as DISCONT if we
1666          * previously dropped a buffer instead of outputting it */
1667         if (priv->need_discont_downstream) {
1668           GstBuffer *buffer;
1669 
1670           buffer_list = gst_buffer_list_make_writable (buffer_list);
1671           buffer = gst_buffer_list_get_writable (buffer_list, 0);
1672           GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1673           priv->need_discont_downstream = FALSE;
1674         }
1675 
1676         gst_base_src_submit_buffer_list (bsrc, buffer_list);
1677         *buf = NULL;
1678       } else if (GST_IS_EVENT (obj)) {
1679         GstEvent *event = GST_EVENT (obj);
1680 
1681         GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
1682 
1683         if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1684           const GstSegment *segment = NULL;
1685 
1686           gst_event_parse_segment (event, &segment);
1687           g_assert (segment != NULL);
1688 
1689           if (!gst_segment_is_equal (&priv->current_segment, segment)) {
1690             GST_DEBUG_OBJECT (appsrc,
1691                 "Update new segment %" GST_PTR_FORMAT, event);
1692             if (!gst_base_src_new_segment (bsrc, segment)) {
1693               GST_ERROR_OBJECT (appsrc,
1694                   "Couldn't set new segment %" GST_PTR_FORMAT, event);
1695               gst_event_unref (event);
1696               goto invalid_segment;
1697             }
1698             gst_segment_copy_into (segment, &priv->current_segment);
1699           }
1700 
1701           gst_event_unref (event);
1702         } else {
1703           GstEvent *seg_event;
1704           GstSegment last_segment = priv->last_segment;
1705 
1706           /* event is serialized with the buffers flow */
1707 
1708           /* We are about to push an event, release out lock */
1709           g_mutex_unlock (&priv->mutex);
1710 
1711           seg_event =
1712               gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
1713               GST_EVENT_SEGMENT, 0);
1714           if (!seg_event) {
1715             seg_event = gst_event_new_segment (&last_segment);
1716 
1717             GST_DEBUG_OBJECT (appsrc,
1718                 "received serialized event before first buffer, push default segment %"
1719                 GST_PTR_FORMAT, seg_event);
1720 
1721             gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
1722           } else {
1723             gst_event_unref (seg_event);
1724           }
1725 
1726           gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
1727 
1728           g_mutex_lock (&priv->mutex);
1729         }
1730         continue;
1731       } else {
1732         g_assert_not_reached ();
1733       }
1734 
1735       gst_app_src_update_queued_pop (appsrc, obj, TRUE);
1736 
1737       /* signal that we removed an item */
1738       if ((priv->wait_status & APP_WAITING))
1739         g_cond_broadcast (&priv->cond);
1740 
1741       /* see if we go lower than the min-percent */
1742       if (priv->min_percent) {
1743         if ((priv->max_bytes
1744                 && priv->queued_bytes * 100 / priv->max_bytes <=
1745                 priv->min_percent) || (priv->max_buffers
1746                 && priv->queued_buffers * 100 / priv->max_buffers <=
1747                 priv->min_percent) || (priv->max_time
1748                 && priv->queued_time * 100 / priv->max_time <=
1749                 priv->min_percent)) {
1750           /* ignore flushing state, we got a buffer and we will return it now.
1751            * Errors will be handled in the next round */
1752           gst_app_src_emit_need_data (appsrc, size);
1753         }
1754       }
1755       ret = GST_FLOW_OK;
1756       break;
1757     } else {
1758       gst_app_src_emit_need_data (appsrc, size);
1759 
1760       /* we can be flushing now because we released the lock above */
1761       if (G_UNLIKELY (priv->flushing))
1762         goto flushing;
1763 
1764       /* if we have a buffer now, continue the loop and try to return it. In
1765        * random-access mode (where a buffer is normally pushed in the above
1766        * signal) we can still be empty because the pushed buffer got flushed or
1767        * when the application pushes the requested buffer later, we support both
1768        * possibilities. */
1769       if (!gst_queue_array_is_empty (priv->queue))
1770         continue;
1771 
1772       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1773     }
1774 
1775     /* check EOS */
1776     if (G_UNLIKELY (priv->is_eos))
1777       goto eos;
1778 
1779     /* nothing to return, wait a while for new data or flushing. */
1780     priv->wait_status |= STREAM_WAITING;
1781     g_cond_wait (&priv->cond, &priv->mutex);
1782     priv->wait_status &= ~STREAM_WAITING;
1783   }
1784   g_mutex_unlock (&priv->mutex);
1785   return ret;
1786 
1787   /* ERRORS */
1788 flushing:
1789   {
1790     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1791     g_mutex_unlock (&priv->mutex);
1792     return GST_FLOW_FLUSHING;
1793   }
1794 eos:
1795   {
1796     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1797     g_mutex_unlock (&priv->mutex);
1798     return GST_FLOW_EOS;
1799   }
1800 seek_error:
1801   {
1802     g_mutex_unlock (&priv->mutex);
1803     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1804         GST_ERROR_SYSTEM);
1805     return GST_FLOW_ERROR;
1806   }
1807 
1808 invalid_segment:
1809   {
1810     g_mutex_unlock (&priv->mutex);
1811     GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
1812         (NULL), ("Failed to configure the provided input segment."));
1813     return GST_FLOW_ERROR;
1814   }
1815 }
1816 
1817 /* external API */
1818 
1819 /**
1820  * gst_app_src_set_caps:
1821  * @appsrc: a #GstAppSrc
1822  * @caps: (nullable): caps to set
1823  *
1824  * Set the capabilities on the appsrc element.  This function takes
1825  * a copy of the caps structure. After calling this method, the source will
1826  * only produce caps that match @caps. @caps must be fixed and the caps on the
1827  * buffers must match the caps or left NULL.
1828  */
1829 void
gst_app_src_set_caps(GstAppSrc * appsrc,const GstCaps * caps)1830 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1831 {
1832   GstAppSrcPrivate *priv;
1833   gboolean caps_changed;
1834 
1835   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1836 
1837   priv = appsrc->priv;
1838 
1839   g_mutex_lock (&priv->mutex);
1840 
1841   GST_OBJECT_LOCK (appsrc);
1842   if (caps && priv->last_caps)
1843     caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
1844   else
1845     caps_changed = (caps != priv->last_caps);
1846 
1847   if (caps_changed) {
1848     GstCaps *new_caps;
1849     gpointer t;
1850 
1851     new_caps = caps ? gst_caps_copy (caps) : NULL;
1852     GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1853 
1854     while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
1855       gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
1856     }
1857     gst_queue_array_push_tail (priv->queue, new_caps);
1858     gst_caps_replace (&priv->last_caps, new_caps);
1859 
1860     if ((priv->wait_status & STREAM_WAITING))
1861       g_cond_broadcast (&priv->cond);
1862   }
1863 
1864   GST_OBJECT_UNLOCK (appsrc);
1865 
1866   g_mutex_unlock (&priv->mutex);
1867 }
1868 
1869 /**
1870  * gst_app_src_get_caps:
1871  * @appsrc: a #GstAppSrc
1872  *
1873  * Get the configured caps on @appsrc.
1874  *
1875  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1876  */
1877 GstCaps *
gst_app_src_get_caps(GstAppSrc * appsrc)1878 gst_app_src_get_caps (GstAppSrc * appsrc)
1879 {
1880 
1881   GstCaps *caps;
1882 
1883   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1884 
1885   GST_OBJECT_LOCK (appsrc);
1886   if ((caps = appsrc->priv->last_caps))
1887     gst_caps_ref (caps);
1888   GST_OBJECT_UNLOCK (appsrc);
1889 
1890   return caps;
1891 
1892 }
1893 
1894 /**
1895  * gst_app_src_set_size:
1896  * @appsrc: a #GstAppSrc
1897  * @size: the size to set
1898  *
1899  * Set the size of the stream in bytes. A value of -1 means that the size is
1900  * not known.
1901  */
1902 void
gst_app_src_set_size(GstAppSrc * appsrc,gint64 size)1903 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1904 {
1905   GstAppSrcPrivate *priv;
1906 
1907   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1908 
1909   priv = appsrc->priv;
1910 
1911   GST_OBJECT_LOCK (appsrc);
1912   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1913   priv->size = size;
1914   GST_OBJECT_UNLOCK (appsrc);
1915 }
1916 
1917 /**
1918  * gst_app_src_get_size:
1919  * @appsrc: a #GstAppSrc
1920  *
1921  * Get the size of the stream in bytes. A value of -1 means that the size is
1922  * not known.
1923  *
1924  * Returns: the size of the stream previously set with gst_app_src_set_size();
1925  */
1926 gint64
gst_app_src_get_size(GstAppSrc * appsrc)1927 gst_app_src_get_size (GstAppSrc * appsrc)
1928 {
1929   gint64 size;
1930   GstAppSrcPrivate *priv;
1931 
1932   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1933 
1934   priv = appsrc->priv;
1935 
1936   GST_OBJECT_LOCK (appsrc);
1937   size = priv->size;
1938   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1939   GST_OBJECT_UNLOCK (appsrc);
1940 
1941   return size;
1942 }
1943 
1944 /**
1945  * gst_app_src_set_duration:
1946  * @appsrc: a #GstAppSrc
1947  * @duration: the duration to set
1948  *
1949  * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1950  * not known.
1951  *
1952  * Since: 1.10
1953  */
1954 void
gst_app_src_set_duration(GstAppSrc * appsrc,GstClockTime duration)1955 gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
1956 {
1957   GstAppSrcPrivate *priv;
1958 
1959   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1960 
1961   priv = appsrc->priv;
1962 
1963   GST_OBJECT_LOCK (appsrc);
1964   GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
1965       GST_TIME_ARGS (duration));
1966   priv->duration = duration;
1967   GST_OBJECT_UNLOCK (appsrc);
1968 }
1969 
1970 /**
1971  * gst_app_src_get_duration:
1972  * @appsrc: a #GstAppSrc
1973  *
1974  * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1975  * not known.
1976  *
1977  * Returns: the duration of the stream previously set with gst_app_src_set_duration();
1978  *
1979  * Since: 1.10
1980  */
1981 GstClockTime
gst_app_src_get_duration(GstAppSrc * appsrc)1982 gst_app_src_get_duration (GstAppSrc * appsrc)
1983 {
1984   GstClockTime duration;
1985   GstAppSrcPrivate *priv;
1986 
1987   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
1988 
1989   priv = appsrc->priv;
1990 
1991   GST_OBJECT_LOCK (appsrc);
1992   duration = priv->duration;
1993   GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
1994       GST_TIME_ARGS (duration));
1995   GST_OBJECT_UNLOCK (appsrc);
1996 
1997   return duration;
1998 }
1999 
2000 /**
2001  * gst_app_src_set_stream_type:
2002  * @appsrc: a #GstAppSrc
2003  * @type: the new state
2004  *
2005  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
2006  * be connected to.
2007  *
2008  * A stream_type stream
2009  */
2010 void
gst_app_src_set_stream_type(GstAppSrc * appsrc,GstAppStreamType type)2011 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
2012 {
2013   GstAppSrcPrivate *priv;
2014 
2015   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2016 
2017   priv = appsrc->priv;
2018 
2019   GST_OBJECT_LOCK (appsrc);
2020   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
2021   priv->stream_type = type;
2022   GST_OBJECT_UNLOCK (appsrc);
2023 }
2024 
2025 /**
2026  * gst_app_src_get_stream_type:
2027  * @appsrc: a #GstAppSrc
2028  *
2029  * Get the stream type. Control the stream type of @appsrc
2030  * with gst_app_src_set_stream_type().
2031  *
2032  * Returns: the stream type.
2033  */
2034 GstAppStreamType
gst_app_src_get_stream_type(GstAppSrc * appsrc)2035 gst_app_src_get_stream_type (GstAppSrc * appsrc)
2036 {
2037   gboolean stream_type;
2038   GstAppSrcPrivate *priv;
2039 
2040   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2041 
2042   priv = appsrc->priv;
2043 
2044   GST_OBJECT_LOCK (appsrc);
2045   stream_type = priv->stream_type;
2046   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
2047   GST_OBJECT_UNLOCK (appsrc);
2048 
2049   return stream_type;
2050 }
2051 
2052 /**
2053  * gst_app_src_set_max_bytes:
2054  * @appsrc: a #GstAppSrc
2055  * @max: the maximum number of bytes to queue
2056  *
2057  * Set the maximum amount of bytes that can be queued in @appsrc.
2058  * After the maximum amount of bytes are queued, @appsrc will emit the
2059  * "enough-data" signal.
2060  */
2061 void
gst_app_src_set_max_bytes(GstAppSrc * appsrc,guint64 max)2062 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
2063 {
2064   GstAppSrcPrivate *priv;
2065 
2066   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2067 
2068   priv = appsrc->priv;
2069 
2070   g_mutex_lock (&priv->mutex);
2071   if (max != priv->max_bytes) {
2072     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
2073     priv->max_bytes = max;
2074     /* signal the change */
2075     g_cond_broadcast (&priv->cond);
2076   }
2077   g_mutex_unlock (&priv->mutex);
2078 }
2079 
2080 /**
2081  * gst_app_src_get_max_bytes:
2082  * @appsrc: a #GstAppSrc
2083  *
2084  * Get the maximum amount of bytes that can be queued in @appsrc.
2085  *
2086  * Returns: The maximum amount of bytes that can be queued.
2087  */
2088 guint64
gst_app_src_get_max_bytes(GstAppSrc * appsrc)2089 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
2090 {
2091   guint64 result;
2092   GstAppSrcPrivate *priv;
2093 
2094   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2095 
2096   priv = appsrc->priv;
2097 
2098   g_mutex_lock (&priv->mutex);
2099   result = priv->max_bytes;
2100   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
2101   g_mutex_unlock (&priv->mutex);
2102 
2103   return result;
2104 }
2105 
2106 /**
2107  * gst_app_src_get_current_level_bytes:
2108  * @appsrc: a #GstAppSrc
2109  *
2110  * Get the number of currently queued bytes inside @appsrc.
2111  *
2112  * Returns: The number of currently queued bytes.
2113  *
2114  * Since: 1.2
2115  */
2116 guint64
gst_app_src_get_current_level_bytes(GstAppSrc * appsrc)2117 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
2118 {
2119   guint64 queued;
2120   GstAppSrcPrivate *priv;
2121 
2122   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2123 
2124   priv = appsrc->priv;
2125 
2126   GST_OBJECT_LOCK (appsrc);
2127   queued = priv->queued_bytes;
2128   GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
2129       queued);
2130   GST_OBJECT_UNLOCK (appsrc);
2131 
2132   return queued;
2133 }
2134 
2135 /**
2136  * gst_app_src_set_max_buffers:
2137  * @appsrc: a #GstAppSrc
2138  * @max: the maximum number of buffers to queue
2139  *
2140  * Set the maximum amount of buffers that can be queued in @appsrc.
2141  * After the maximum amount of buffers are queued, @appsrc will emit the
2142  * "enough-data" signal.
2143  *
2144  * Since: 1.20
2145  */
2146 void
gst_app_src_set_max_buffers(GstAppSrc * appsrc,guint64 max)2147 gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
2148 {
2149   GstAppSrcPrivate *priv;
2150 
2151   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2152 
2153   priv = appsrc->priv;
2154 
2155   g_mutex_lock (&priv->mutex);
2156   if (max != priv->max_buffers) {
2157     GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
2158     priv->max_buffers = max;
2159     /* signal the change */
2160     g_cond_broadcast (&priv->cond);
2161   }
2162   g_mutex_unlock (&priv->mutex);
2163 }
2164 
2165 /**
2166  * gst_app_src_get_max_buffers:
2167  * @appsrc: a #GstAppSrc
2168  *
2169  * Get the maximum amount of buffers that can be queued in @appsrc.
2170  *
2171  * Returns: The maximum amount of buffers that can be queued.
2172  *
2173  * Since: 1.20
2174  */
2175 guint64
gst_app_src_get_max_buffers(GstAppSrc * appsrc)2176 gst_app_src_get_max_buffers (GstAppSrc * appsrc)
2177 {
2178   guint64 result;
2179   GstAppSrcPrivate *priv;
2180 
2181   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2182 
2183   priv = appsrc->priv;
2184 
2185   g_mutex_lock (&priv->mutex);
2186   result = priv->max_buffers;
2187   GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
2188       result);
2189   g_mutex_unlock (&priv->mutex);
2190 
2191   return result;
2192 }
2193 
2194 /**
2195  * gst_app_src_get_current_level_buffers:
2196  * @appsrc: a #GstAppSrc
2197  *
2198  * Get the number of currently queued buffers inside @appsrc.
2199  *
2200  * Returns: The number of currently queued buffers.
2201  *
2202  * Since: 1.20
2203  */
2204 guint64
gst_app_src_get_current_level_buffers(GstAppSrc * appsrc)2205 gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
2206 {
2207   guint64 queued;
2208   GstAppSrcPrivate *priv;
2209 
2210   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2211 
2212   priv = appsrc->priv;
2213 
2214   GST_OBJECT_LOCK (appsrc);
2215   queued = priv->queued_buffers;
2216   GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
2217       queued);
2218   GST_OBJECT_UNLOCK (appsrc);
2219 
2220   return queued;
2221 }
2222 
2223 /**
2224  * gst_app_src_set_max_time:
2225  * @appsrc: a #GstAppSrc
2226  * @max: the maximum amonut of time to queue
2227  *
2228  * Set the maximum amount of time that can be queued in @appsrc.
2229  * After the maximum amount of time are queued, @appsrc will emit the
2230  * "enough-data" signal.
2231  *
2232  * Since: 1.20
2233  */
2234 void
gst_app_src_set_max_time(GstAppSrc * appsrc,GstClockTime max)2235 gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
2236 {
2237   GstAppSrcPrivate *priv;
2238 
2239   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2240 
2241   priv = appsrc->priv;
2242 
2243   g_mutex_lock (&priv->mutex);
2244   if (max != priv->max_time) {
2245     GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
2246         GST_TIME_ARGS (max));
2247     priv->max_time = max;
2248     /* signal the change */
2249     g_cond_broadcast (&priv->cond);
2250   }
2251   g_mutex_unlock (&priv->mutex);
2252 }
2253 
2254 /**
2255  * gst_app_src_get_max_time:
2256  * @appsrc: a #GstAppSrc
2257  *
2258  * Get the maximum amount of time that can be queued in @appsrc.
2259  *
2260  * Returns: The maximum amount of time that can be queued.
2261  *
2262  * Since: 1.20
2263  */
2264 GstClockTime
gst_app_src_get_max_time(GstAppSrc * appsrc)2265 gst_app_src_get_max_time (GstAppSrc * appsrc)
2266 {
2267   GstClockTime result;
2268   GstAppSrcPrivate *priv;
2269 
2270   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2271 
2272   priv = appsrc->priv;
2273 
2274   g_mutex_lock (&priv->mutex);
2275   result = priv->max_time;
2276   GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
2277       GST_TIME_ARGS (result));
2278   g_mutex_unlock (&priv->mutex);
2279 
2280   return result;
2281 }
2282 
2283 /**
2284  * gst_app_src_get_current_level_time:
2285  * @appsrc: a #GstAppSrc
2286  *
2287  * Get the amount of currently queued time inside @appsrc.
2288  *
2289  * Returns: The amount of currently queued time.
2290  *
2291  * Since: 1.20
2292  */
2293 GstClockTime
gst_app_src_get_current_level_time(GstAppSrc * appsrc)2294 gst_app_src_get_current_level_time (GstAppSrc * appsrc)
2295 {
2296   gint64 queued;
2297   GstAppSrcPrivate *priv;
2298 
2299   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
2300 
2301   priv = appsrc->priv;
2302 
2303   GST_OBJECT_LOCK (appsrc);
2304   queued = priv->queued_time;
2305   GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
2306       GST_TIME_ARGS (queued));
2307   GST_OBJECT_UNLOCK (appsrc);
2308 
2309   return queued;
2310 }
2311 
2312 static void
gst_app_src_set_latencies(GstAppSrc * appsrc,gboolean do_min,guint64 min,gboolean do_max,guint64 max)2313 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
2314     gboolean do_max, guint64 max)
2315 {
2316   GstAppSrcPrivate *priv = appsrc->priv;
2317   gboolean changed = FALSE;
2318 
2319   g_mutex_lock (&priv->mutex);
2320   if (do_min && priv->min_latency != min) {
2321     priv->min_latency = min;
2322     changed = TRUE;
2323   }
2324   if (do_max && priv->max_latency != max) {
2325     priv->max_latency = max;
2326     changed = TRUE;
2327   }
2328   g_mutex_unlock (&priv->mutex);
2329 
2330   if (changed) {
2331     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
2332     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
2333         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
2334   }
2335 }
2336 
2337 /**
2338  * gst_app_src_set_leaky_type:
2339  * @appsrc: a #GstAppSrc
2340  * @leaky: the #GstAppLeakyType
2341  *
2342  * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
2343  * will drop any buffers that are pushed into it once its internal queue is
2344  * full. The selected type defines whether to drop the oldest or new
2345  * buffers.
2346  *
2347  * Since: 1.20
2348  */
2349 void
gst_app_src_set_leaky_type(GstAppSrc * appsrc,GstAppLeakyType leaky)2350 gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
2351 {
2352   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2353 
2354   appsrc->priv->leaky_type = leaky;
2355 }
2356 
2357 /**
2358  * gst_app_src_get_leaky_type:
2359  * @appsrc: a #GstAppSrc
2360  *
2361  * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
2362  * for more details.
2363  *
2364  * Returns: The currently set #GstAppLeakyType.
2365  *
2366  * Since: 1.20
2367  */
2368 GstAppLeakyType
gst_app_src_get_leaky_type(GstAppSrc * appsrc)2369 gst_app_src_get_leaky_type (GstAppSrc * appsrc)
2370 {
2371   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
2372 
2373   return appsrc->priv->leaky_type;
2374 }
2375 
2376 /**
2377  * gst_app_src_set_latency:
2378  * @appsrc: a #GstAppSrc
2379  * @min: the min latency
2380  * @max: the max latency
2381  *
2382  * Configure the @min and @max latency in @src. If @min is set to -1, the
2383  * default latency calculations for pseudo-live sources will be used.
2384  */
2385 void
gst_app_src_set_latency(GstAppSrc * appsrc,guint64 min,guint64 max)2386 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
2387 {
2388   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
2389 }
2390 
2391 /**
2392  * gst_app_src_get_latency:
2393  * @appsrc: a #GstAppSrc
2394  * @min: (out): the min latency
2395  * @max: (out): the max latency
2396  *
2397  * Retrieve the min and max latencies in @min and @max respectively.
2398  */
2399 void
gst_app_src_get_latency(GstAppSrc * appsrc,guint64 * min,guint64 * max)2400 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
2401 {
2402   GstAppSrcPrivate *priv;
2403 
2404   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2405 
2406   priv = appsrc->priv;
2407 
2408   g_mutex_lock (&priv->mutex);
2409   if (min)
2410     *min = priv->min_latency;
2411   if (max)
2412     *max = priv->max_latency;
2413   g_mutex_unlock (&priv->mutex);
2414 }
2415 
2416 /**
2417  * gst_app_src_set_emit_signals:
2418  * @appsrc: a #GstAppSrc
2419  * @emit: the new state
2420  *
2421  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
2422  * by default disabled because signal emission is expensive and unneeded when
2423  * the application prefers to operate in pull mode.
2424  */
2425 void
gst_app_src_set_emit_signals(GstAppSrc * appsrc,gboolean emit)2426 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
2427 {
2428   GstAppSrcPrivate *priv;
2429 
2430   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2431 
2432   priv = appsrc->priv;
2433 
2434   g_mutex_lock (&priv->mutex);
2435   priv->emit_signals = emit;
2436   g_mutex_unlock (&priv->mutex);
2437 }
2438 
2439 /**
2440  * gst_app_src_get_emit_signals:
2441  * @appsrc: a #GstAppSrc
2442  *
2443  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
2444  *
2445  * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
2446  * signals.
2447  */
2448 gboolean
gst_app_src_get_emit_signals(GstAppSrc * appsrc)2449 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
2450 {
2451   gboolean result;
2452   GstAppSrcPrivate *priv;
2453 
2454   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2455 
2456   priv = appsrc->priv;
2457 
2458   g_mutex_lock (&priv->mutex);
2459   result = priv->emit_signals;
2460   g_mutex_unlock (&priv->mutex);
2461 
2462   return result;
2463 }
2464 
2465 static GstFlowReturn
gst_app_src_push_internal(GstAppSrc * appsrc,GstBuffer * buffer,GstBufferList * buflist,gboolean steal_ref)2466 gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
2467     GstBufferList * buflist, gboolean steal_ref)
2468 {
2469   gboolean first = TRUE;
2470   GstAppSrcPrivate *priv;
2471 
2472   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2473 
2474   priv = appsrc->priv;
2475 
2476   if (buffer != NULL)
2477     g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2478   else
2479     g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
2480 
2481   if (buflist != NULL) {
2482     if (gst_buffer_list_length (buflist) == 0)
2483       return GST_FLOW_OK;
2484 
2485     buffer = gst_buffer_list_get (buflist, 0);
2486   }
2487 
2488   if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
2489       GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
2490       gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
2491     GstClock *clock;
2492 
2493     clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
2494     if (clock) {
2495       GstClockTime now;
2496       GstClockTime base_time =
2497           gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
2498 
2499       now = gst_clock_get_time (clock);
2500       if (now > base_time)
2501         now -= base_time;
2502       else
2503         now = 0;
2504       gst_object_unref (clock);
2505 
2506       if (buflist == NULL) {
2507         if (!steal_ref) {
2508           buffer = gst_buffer_copy (buffer);
2509           steal_ref = TRUE;
2510         } else {
2511           buffer = gst_buffer_make_writable (buffer);
2512         }
2513       } else {
2514         if (!steal_ref) {
2515           buflist = gst_buffer_list_copy (buflist);
2516           steal_ref = TRUE;
2517         } else {
2518           buflist = gst_buffer_list_make_writable (buflist);
2519         }
2520         buffer = gst_buffer_list_get_writable (buflist, 0);
2521       }
2522 
2523       GST_BUFFER_PTS (buffer) = now;
2524       GST_BUFFER_DTS (buffer) = now;
2525     } else {
2526       GST_WARNING_OBJECT (appsrc,
2527           "do-timestamp=TRUE but buffers are provided before "
2528           "reaching the PLAYING state and having a clock. Timestamps will "
2529           "not be accurate!");
2530     }
2531   }
2532 
2533   g_mutex_lock (&priv->mutex);
2534 
2535   while (TRUE) {
2536     /* can't accept buffers when we are flushing or EOS */
2537     if (priv->flushing)
2538       goto flushing;
2539 
2540     if (priv->is_eos)
2541       goto eos;
2542 
2543     if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
2544         (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
2545         (priv->max_time && priv->queued_time >= priv->max_time)) {
2546       GST_DEBUG_OBJECT (appsrc,
2547           "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
2548           G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
2549           " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
2550           GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
2551           priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
2552           priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
2553           GST_TIME_ARGS (priv->max_time));
2554 
2555       if (first) {
2556         Callbacks *callbacks = NULL;
2557         gboolean emit;
2558 
2559         emit = priv->emit_signals;
2560         if (priv->callbacks)
2561           callbacks = callbacks_ref (priv->callbacks);
2562         /* only signal on the first push */
2563         g_mutex_unlock (&priv->mutex);
2564 
2565         if (callbacks && callbacks->callbacks.enough_data)
2566           callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
2567         else if (emit)
2568           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
2569               NULL);
2570 
2571         g_clear_pointer (&callbacks, callbacks_unref);
2572 
2573         g_mutex_lock (&priv->mutex);
2574       }
2575 
2576       if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
2577         priv->need_discont_upstream = TRUE;
2578         goto dropped;
2579       } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
2580         guint i, length = gst_queue_array_get_length (priv->queue);
2581         GstMiniObject *item = NULL;
2582 
2583         /* Find the oldest buffer or buffer list and drop it, then update the
2584          * limits. Dropping one is sufficient to go below the limits again.
2585          */
2586         for (i = 0; i < length; i++) {
2587           item = gst_queue_array_peek_nth (priv->queue, i);
2588           if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
2589             gst_queue_array_drop_element (priv->queue, i);
2590             break;
2591           }
2592           /* To not accidentally have an event after the loop */
2593           item = NULL;
2594         }
2595 
2596         if (!item) {
2597           GST_FIXME_OBJECT (appsrc,
2598               "No buffer or buffer list queued but queue is full");
2599           /* This shouldn't really happen but in this case we can't really do
2600            * anything apart from accepting the buffer / bufferlist */
2601           break;
2602         }
2603 
2604         GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
2605 
2606         gst_app_src_update_queued_pop (appsrc, item, FALSE);
2607         gst_mini_object_unref (item);
2608 
2609         priv->need_discont_downstream = TRUE;
2610         continue;
2611       }
2612 
2613       if (first) {
2614         /* continue to check for flushing/eos after releasing the lock */
2615         first = FALSE;
2616         continue;
2617       }
2618       if (priv->block) {
2619         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
2620         /* we are filled, wait until a buffer gets popped or when we
2621          * flush. */
2622         priv->wait_status |= APP_WAITING;
2623         g_cond_wait (&priv->cond, &priv->mutex);
2624         priv->wait_status &= ~APP_WAITING;
2625       } else {
2626         /* no need to wait for free space, we just pump more data into the
2627          * queue hoping that the caller reacts to the enough-data signal and
2628          * stops pushing buffers. */
2629         break;
2630       }
2631     } else {
2632       break;
2633     }
2634   }
2635 
2636   if (priv->pending_custom_segment) {
2637     GstEvent *event = gst_event_new_segment (&priv->last_segment);
2638 
2639     GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event);
2640     gst_queue_array_push_tail (priv->queue, event);
2641     priv->pending_custom_segment = FALSE;
2642   }
2643 
2644   if (buflist != NULL) {
2645     /* Mark the first buffer of the buffer list as DISCONT if we previously
2646      * dropped a buffer instead of queueing it */
2647     if (priv->need_discont_upstream) {
2648       if (!steal_ref) {
2649         buflist = gst_buffer_list_copy (buflist);
2650         steal_ref = TRUE;
2651       } else {
2652         buflist = gst_buffer_list_make_writable (buflist);
2653       }
2654       buffer = gst_buffer_list_get_writable (buflist, 0);
2655       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2656       priv->need_discont_upstream = FALSE;
2657     }
2658 
2659     GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
2660 
2661     if (!steal_ref)
2662       gst_buffer_list_ref (buflist);
2663     gst_queue_array_push_tail (priv->queue, buflist);
2664   } else {
2665     /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
2666      * queueing it */
2667     if (priv->need_discont_upstream) {
2668       if (!steal_ref) {
2669         buffer = gst_buffer_copy (buffer);
2670         steal_ref = TRUE;
2671       } else {
2672         buffer = gst_buffer_make_writable (buffer);
2673       }
2674       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2675       priv->need_discont_upstream = FALSE;
2676     }
2677 
2678     GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
2679     if (!steal_ref)
2680       gst_buffer_ref (buffer);
2681     gst_queue_array_push_tail (priv->queue, buffer);
2682   }
2683 
2684   gst_app_src_update_queued_push (appsrc,
2685       buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
2686 
2687   if ((priv->wait_status & STREAM_WAITING))
2688     g_cond_broadcast (&priv->cond);
2689 
2690   g_mutex_unlock (&priv->mutex);
2691 
2692   return GST_FLOW_OK;
2693 
2694   /* ERRORS */
2695 flushing:
2696   {
2697     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
2698     if (steal_ref) {
2699       if (buflist)
2700         gst_buffer_list_unref (buflist);
2701       else
2702         gst_buffer_unref (buffer);
2703     }
2704     g_mutex_unlock (&priv->mutex);
2705     return GST_FLOW_FLUSHING;
2706   }
2707 eos:
2708   {
2709     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
2710     if (steal_ref) {
2711       if (buflist)
2712         gst_buffer_list_unref (buflist);
2713       else
2714         gst_buffer_unref (buffer);
2715     }
2716     g_mutex_unlock (&priv->mutex);
2717     return GST_FLOW_EOS;
2718   }
2719 dropped:
2720   {
2721     GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
2722     if (steal_ref) {
2723       if (buflist)
2724         gst_buffer_list_unref (buflist);
2725       else
2726         gst_buffer_unref (buffer);
2727     }
2728     g_mutex_unlock (&priv->mutex);
2729     return GST_FLOW_EOS;
2730   }
2731 }
2732 
2733 static GstFlowReturn
gst_app_src_push_buffer_full(GstAppSrc * appsrc,GstBuffer * buffer,gboolean steal_ref)2734 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
2735     gboolean steal_ref)
2736 {
2737   return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
2738 }
2739 
2740 static GstFlowReturn
gst_app_src_push_sample_internal(GstAppSrc * appsrc,GstSample * sample)2741 gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
2742 {
2743   GstAppSrcPrivate *priv = appsrc->priv;
2744   GstBufferList *buffer_list;
2745   GstBuffer *buffer;
2746   GstCaps *caps;
2747 
2748   g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
2749 
2750   caps = gst_sample_get_caps (sample);
2751   if (caps != NULL) {
2752     gst_app_src_set_caps (appsrc, caps);
2753   } else {
2754     GST_WARNING_OBJECT (appsrc, "received sample without caps");
2755   }
2756 
2757   if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) {
2758     GstSegment *segment = gst_sample_get_segment (sample);
2759 
2760     if (segment->format != GST_FORMAT_TIME) {
2761       GST_LOG_OBJECT (appsrc, "format %s is not supported",
2762           gst_format_get_name (segment->format));
2763       goto handle_buffer;
2764     }
2765 
2766     g_mutex_lock (&priv->mutex);
2767     if (gst_segment_is_equal (&priv->last_segment, segment)) {
2768       GST_LOG_OBJECT (appsrc, "segment wasn't changed");
2769       g_mutex_unlock (&priv->mutex);
2770       goto handle_buffer;
2771     } else {
2772       GST_LOG_OBJECT (appsrc,
2773           "segment changed %" GST_SEGMENT_FORMAT " -> %" GST_SEGMENT_FORMAT,
2774           &priv->last_segment, segment);
2775     }
2776 
2777     /* will be pushed to queue with next buffer/buffer-list */
2778     gst_segment_copy_into (segment, &priv->last_segment);
2779     priv->pending_custom_segment = TRUE;
2780     g_mutex_unlock (&priv->mutex);
2781   }
2782 
2783 handle_buffer:
2784 
2785   buffer = gst_sample_get_buffer (sample);
2786   if (buffer != NULL)
2787     return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2788 
2789   buffer_list = gst_sample_get_buffer_list (sample);
2790   if (buffer_list != NULL)
2791     return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2792 
2793   GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
2794   return GST_FLOW_OK;
2795 }
2796 
2797 /**
2798  * gst_app_src_push_buffer:
2799  * @appsrc: a #GstAppSrc
2800  * @buffer: (transfer full): a #GstBuffer to push
2801  *
2802  * Adds a buffer to the queue of buffers that the appsrc element will
2803  * push to its source pad.  This function takes ownership of the buffer.
2804  *
2805  * When the block property is TRUE, this function can block until free
2806  * space becomes available in the queue.
2807  *
2808  * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2809  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2810  * #GST_FLOW_EOS when EOS occurred.
2811  */
2812 GstFlowReturn
gst_app_src_push_buffer(GstAppSrc * appsrc,GstBuffer * buffer)2813 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
2814 {
2815   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
2816 }
2817 
2818 /**
2819  * gst_app_src_push_buffer_list:
2820  * @appsrc: a #GstAppSrc
2821  * @buffer_list: (transfer full): a #GstBufferList to push
2822  *
2823  * Adds a buffer list to the queue of buffers and buffer lists that the
2824  * appsrc element will push to its source pad.  This function takes ownership
2825  * of @buffer_list.
2826  *
2827  * When the block property is TRUE, this function can block until free
2828  * space becomes available in the queue.
2829  *
2830  * Returns: #GST_FLOW_OK when the buffer list was successfully queued.
2831  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2832  * #GST_FLOW_EOS when EOS occurred.
2833  *
2834  * Since: 1.14
2835  */
2836 GstFlowReturn
gst_app_src_push_buffer_list(GstAppSrc * appsrc,GstBufferList * buffer_list)2837 gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
2838 {
2839   return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
2840 }
2841 
2842 /**
2843  * gst_app_src_push_sample:
2844  * @appsrc: a #GstAppSrc
2845  * @sample: (transfer none): a #GstSample from which buffer and caps may be
2846  * extracted
2847  *
2848  * Extract a buffer from the provided sample and adds it to the queue of
2849  * buffers that the appsrc element will push to its source pad. Any
2850  * previous caps that were set on appsrc will be replaced by the caps
2851  * associated with the sample if not equal.
2852  *
2853  * This function does not take ownership of the
2854  * sample so the sample needs to be unreffed after calling this function.
2855  *
2856  * When the block property is TRUE, this function can block until free
2857  * space becomes available in the queue.
2858  *
2859  * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2860  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2861  * #GST_FLOW_EOS when EOS occurred.
2862  *
2863  * Since: 1.6
2864  *
2865  */
2866 GstFlowReturn
gst_app_src_push_sample(GstAppSrc * appsrc,GstSample * sample)2867 gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
2868 {
2869   return gst_app_src_push_sample_internal (appsrc, sample);
2870 }
2871 
2872 /* push a buffer without stealing the ref of the buffer. This is used for the
2873  * action signal. */
2874 static GstFlowReturn
gst_app_src_push_buffer_action(GstAppSrc * appsrc,GstBuffer * buffer)2875 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
2876 {
2877   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2878 }
2879 
2880 /* push a buffer list without stealing the ref of the buffer list. This is
2881  * used for the action signal. */
2882 static GstFlowReturn
gst_app_src_push_buffer_list_action(GstAppSrc * appsrc,GstBufferList * buffer_list)2883 gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
2884     GstBufferList * buffer_list)
2885 {
2886   return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2887 }
2888 
2889 /* push a sample without stealing the ref. This is used for the
2890  * action signal. */
2891 static GstFlowReturn
gst_app_src_push_sample_action(GstAppSrc * appsrc,GstSample * sample)2892 gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
2893 {
2894   return gst_app_src_push_sample_internal (appsrc, sample);
2895 }
2896 
2897 /**
2898  * gst_app_src_end_of_stream:
2899  * @appsrc: a #GstAppSrc
2900  *
2901  * Indicates to the appsrc element that the last buffer queued in the
2902  * element is the last buffer of the stream.
2903  *
2904  * Returns: #GST_FLOW_OK when the EOS was successfully queued.
2905  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2906  */
2907 GstFlowReturn
gst_app_src_end_of_stream(GstAppSrc * appsrc)2908 gst_app_src_end_of_stream (GstAppSrc * appsrc)
2909 {
2910   GstAppSrcPrivate *priv;
2911 
2912   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2913 
2914   priv = appsrc->priv;
2915 
2916   g_mutex_lock (&priv->mutex);
2917   /* can't accept buffers when we are flushing. We can accept them when we are
2918    * EOS although it will not do anything. */
2919   if (priv->flushing)
2920     goto flushing;
2921 
2922   GST_DEBUG_OBJECT (appsrc, "sending EOS");
2923   priv->is_eos = TRUE;
2924   g_cond_broadcast (&priv->cond);
2925   g_mutex_unlock (&priv->mutex);
2926 
2927   return GST_FLOW_OK;
2928 
2929   /* ERRORS */
2930 flushing:
2931   {
2932     g_mutex_unlock (&priv->mutex);
2933     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
2934     return GST_FLOW_FLUSHING;
2935   }
2936 }
2937 
2938 /**
2939  * gst_app_src_set_callbacks: (skip)
2940  * @appsrc: a #GstAppSrc
2941  * @callbacks: the callbacks
2942  * @user_data: a user_data argument for the callbacks
2943  * @notify: a destroy notify function
2944  *
2945  * Set callbacks which will be executed when data is needed, enough data has
2946  * been collected or when a seek should be performed.
2947  * This is an alternative to using the signals, it has lower overhead and is thus
2948  * less expensive, but also less flexible.
2949  *
2950  * If callbacks are installed, no signals will be emitted for performance
2951  * reasons.
2952  *
2953  * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
2954  * way.
2955  */
2956 void
gst_app_src_set_callbacks(GstAppSrc * appsrc,GstAppSrcCallbacks * callbacks,gpointer user_data,GDestroyNotify notify)2957 gst_app_src_set_callbacks (GstAppSrc * appsrc,
2958     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
2959 {
2960   Callbacks *old_callbacks, *new_callbacks = NULL;
2961   GstAppSrcPrivate *priv;
2962 
2963   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2964   g_return_if_fail (callbacks != NULL);
2965 
2966   priv = appsrc->priv;
2967 
2968   if (callbacks) {
2969     new_callbacks = g_new0 (Callbacks, 1);
2970     new_callbacks->callbacks = *callbacks;
2971     new_callbacks->user_data = user_data;
2972     new_callbacks->destroy_notify = notify;
2973     new_callbacks->ref_count = 1;
2974   }
2975 
2976   g_mutex_lock (&priv->mutex);
2977   old_callbacks = g_steal_pointer (&priv->callbacks);
2978   priv->callbacks = g_steal_pointer (&new_callbacks);
2979   g_mutex_unlock (&priv->mutex);
2980 
2981   g_clear_pointer (&old_callbacks, callbacks_unref);
2982 }
2983 
2984 /*** GSTURIHANDLER INTERFACE *************************************************/
2985 
2986 static GstURIType
gst_app_src_uri_get_type(GType type)2987 gst_app_src_uri_get_type (GType type)
2988 {
2989   return GST_URI_SRC;
2990 }
2991 
2992 static const gchar *const *
gst_app_src_uri_get_protocols(GType type)2993 gst_app_src_uri_get_protocols (GType type)
2994 {
2995   static const gchar *protocols[] = { "appsrc", NULL };
2996 
2997   return protocols;
2998 }
2999 
3000 static gchar *
gst_app_src_uri_get_uri(GstURIHandler * handler)3001 gst_app_src_uri_get_uri (GstURIHandler * handler)
3002 {
3003   GstAppSrc *appsrc = GST_APP_SRC (handler);
3004 
3005   return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
3006 }
3007 
3008 static gboolean
gst_app_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)3009 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
3010     GError ** error)
3011 {
3012   GstAppSrc *appsrc = GST_APP_SRC (handler);
3013 
3014   g_free (appsrc->priv->uri);
3015   appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
3016 
3017   return TRUE;
3018 }
3019 
3020 static void
gst_app_src_uri_handler_init(gpointer g_iface,gpointer iface_data)3021 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
3022 {
3023   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
3024 
3025   iface->get_type = gst_app_src_uri_get_type;
3026   iface->get_protocols = gst_app_src_uri_get_protocols;
3027   iface->get_uri = gst_app_src_uri_get_uri;
3028   iface->set_uri = gst_app_src_uri_set_uri;
3029 }
3030 
3031 static gboolean
gst_app_src_event(GstBaseSrc * src,GstEvent * event)3032 gst_app_src_event (GstBaseSrc * src, GstEvent * event)
3033 {
3034   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
3035   GstAppSrcPrivate *priv = appsrc->priv;
3036 
3037   switch (GST_EVENT_TYPE (event)) {
3038     case GST_EVENT_FLUSH_STOP:
3039       g_mutex_lock (&priv->mutex);
3040       priv->is_eos = FALSE;
3041       g_mutex_unlock (&priv->mutex);
3042       break;
3043     default:
3044       break;
3045   }
3046 
3047   return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
3048 }
3049