• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 /**
21  * SECTION:gstappsrc
22  * @title: GstAppSrc
23  * @short_description: Easy way for applications to inject buffers into a
24  *     pipeline
25  * @see_also: #GstBaseSrc, appsink
26  *
27  * The appsrc element can be used by applications to insert data into a
28  * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
29  * external API functions.
30  *
31  * appsrc can be used by linking with the libgstapp library to access the
32  * methods directly or by using the appsrc action signals.
33  *
34  * Before operating appsrc, the caps property must be set to fixed caps
35  * describing the format of the data that will be pushed with appsrc. An
36  * exception to this is when pushing buffers with unknown caps, in which case no
37  * caps should be set. This is typically true of file-like sources that push raw
38  * byte buffers. If you don't want to explicitly set the caps, you can use
39  * gst_app_src_push_sample. This method gets the caps associated with the
40  * sample and sets them on the appsrc replacing any previously set caps (if
41  * different from sample's caps).
42  *
43  * The main way of handing data to the appsrc element is by calling the
44  * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
45  * This will put the buffer onto a queue from which appsrc will read from in its
46  * streaming thread. It is important to note that data transport will not happen
47  * from the thread that performed the push-buffer call.
48  *
49  * The "max-bytes", "max-buffers" and "max-time" properties control how much
50  * data can be queued in appsrc before appsrc considers the queue full. A
51  * filled internal queue will always signal the "enough-data" signal, which
 * signals the application that it should stop pushing data into appsrc. The
 * "block" property will cause appsrc to block the push-buffer method until
 * free space becomes available again.
55  *
56  * When the internal queue is running out of data, the "need-data" signal is
57  * emitted, which signals the application that it should start pushing more data
58  * into appsrc.
59  *
60  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
61  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
62  * "random-access". The signal argument will contain the new desired position in
63  * the stream expressed in the unit set with the "format" property. After
64  * receiving the seek-data signal, the application should push-buffers from the
65  * new position.
66  *
67  * These signals allow the application to operate the appsrc in two different
68  * ways:
69  *
70  * The push mode, in which the application repeatedly calls the push-buffer/push-sample
71  * method with a new buffer/sample. Optionally, the queue size in the appsrc
72  * can be controlled with the enough-data and need-data signals by respectively
73  * stopping/starting the push-buffer/push-sample calls. This is a typical
74  * mode of operation for the stream-type "stream" and "seekable". Use this
75  * mode when implementing various network protocols or hardware devices.
76  *
77  * The pull mode, in which the need-data signal triggers the next push-buffer call.
78  * This mode is typically used in the "random-access" stream-type. Use this
79  * mode for file access or other randomly accessible sources. In this mode, a
80  * buffer of exactly the amount of bytes given by the need-data signal should be
81  * pushed into appsrc.
82  *
83  * In all modes, the size property on appsrc should contain the total stream
84  * size in bytes. Setting this property is mandatory in the random-access mode.
85  * For the stream and seekable modes, setting this property is optional but
86  * recommended.
87  *
88  * When the application has finished pushing data into appsrc, it should call
89  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
90  * this call, no more buffers can be pushed into appsrc until a flushing seek
91  * occurs or the state of the appsrc has gone through READY.
92  */
93 
94 #ifdef HAVE_CONFIG_H
95 #include "config.h"
96 #endif
97 
98 #include <gst/gst.h>
99 #include <gst/base/base.h>
100 
101 #include <string.h>
102 
103 #include "gstappsrc.h"
104 
/* Flags describing which thread, if any, is currently waiting on the
 * other one. */
typedef enum
{
  NOONE_WAITING = 0,
  /* streaming thread is waiting for application thread */
  STREAM_WAITING = (1 << 0),
  /* application thread is waiting for streaming thread */
  APP_WAITING = (1 << 1),
} GstAppSrcWaitStatus;
111 
112 typedef struct
113 {
114   GstAppSrcCallbacks callbacks;
115   gpointer user_data;
116   GDestroyNotify destroy_notify;
117   gint ref_count;
118 } Callbacks;
119 
120 static Callbacks *
callbacks_ref(Callbacks * callbacks)121 callbacks_ref (Callbacks * callbacks)
122 {
123   g_atomic_int_inc (&callbacks->ref_count);
124 
125   return callbacks;
126 }
127 
128 static void
callbacks_unref(Callbacks * callbacks)129 callbacks_unref (Callbacks * callbacks)
130 {
131   if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
132     return;
133 
134   if (callbacks->destroy_notify)
135     callbacks->destroy_notify (callbacks->user_data);
136 
137   g_free (callbacks);
138 }
139 
140 
141 struct _GstAppSrcPrivate
142 {
143   GCond cond;
144   GMutex mutex;
145   GstQueueArray *queue;
146   GstAppSrcWaitStatus wait_status;
147 
148   GstCaps *last_caps;
149   GstCaps *current_caps;
150   /* last segment received on the input */
151   GstSegment last_segment;
152   /* currently configured segment for the output */
153   GstSegment current_segment;
154   /* queue up a segment event based on last_segment before
155    * the next buffer of buffer list */
156   gboolean pending_custom_segment;
157 
158   /* the next buffer that will be queued needs a discont flag
159    * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
160   gboolean need_discont_upstream;
161   /* the next buffer that will be dequeued needs a discont flag
162    * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
163   gboolean need_discont_downstream;
164 
165   gint64 size;
166   GstClockTime duration;
167   GstAppStreamType stream_type;
168   guint64 max_bytes, max_buffers, max_time;
169   GstFormat format;
170   gboolean block;
171   gchar *uri;
172 
173   gboolean flushing;
174   gboolean started;
175   gboolean is_eos;
176   guint64 queued_bytes, queued_buffers;
177   /* Used to calculate the current time level */
178   GstClockTime last_in_running_time, last_out_running_time;
179   /* Updated based on the above whenever they change */
180   GstClockTime queued_time;
181   guint64 offset;
182   GstAppStreamType current_type;
183 
184   guint64 min_latency;
185   guint64 max_latency;
186   gboolean emit_signals;
187   guint min_percent;
188   gboolean handle_segment_change;
189 
190   GstAppLeakyType leaky_type;
191 
192   Callbacks *callbacks;
193 #ifdef OHOS_OPT_COMPAT
194   // ohos.opt.compat.0060
195   gboolean datasrc_mode;
196 #endif
197 };
198 
/* File-local debug category; made the default one for the GST_* logging
 * macros used in this file. It is initialised in class_init. */
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
#define GST_CAT_DEFAULT app_src_debug
201 
/* Indices into gst_app_src_signals[]. */
enum
{
  /* signals */
  SIGNAL_NEED_DATA = 0,
  SIGNAL_ENOUGH_DATA,
  SIGNAL_SEEK_DATA,

  /* actions */
  SIGNAL_PUSH_BUFFER,
  SIGNAL_END_OF_STREAM,
  SIGNAL_PUSH_SAMPLE,
  SIGNAL_PUSH_BUFFER_LIST,

  /* number of entries, keep last */
  LAST_SIGNAL
};
217 
/* Default values of the properties. Negative values are parenthesized so
 * the macros expand safely inside any expression. */
#define DEFAULT_PROP_SIZE                  (-1)
#define DEFAULT_PROP_STREAM_TYPE           GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_MAX_BYTES             200000
#define DEFAULT_PROP_MAX_BUFFERS           0
#define DEFAULT_PROP_MAX_TIME              (0 * GST_SECOND)
#define DEFAULT_PROP_FORMAT                GST_FORMAT_BYTES
#define DEFAULT_PROP_BLOCK                 FALSE
#define DEFAULT_PROP_IS_LIVE               FALSE
#define DEFAULT_PROP_MIN_LATENCY           (-1)
#define DEFAULT_PROP_MAX_LATENCY           (-1)
#define DEFAULT_PROP_EMIT_SIGNALS          TRUE
#define DEFAULT_PROP_MIN_PERCENT           0
#define DEFAULT_PROP_CURRENT_LEVEL_BYTES   0
#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0
#define DEFAULT_PROP_CURRENT_LEVEL_TIME    0
#define DEFAULT_PROP_DURATION              GST_CLOCK_TIME_NONE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
#define DEFAULT_PROP_LEAKY_TYPE            GST_APP_LEAKY_TYPE_NONE
236 
/* Property identifiers used by set_property()/get_property(). */
enum
{
  PROP_0 = 0,
  PROP_CAPS,
  PROP_SIZE,
  PROP_STREAM_TYPE,
  PROP_MAX_BYTES,
  PROP_MAX_BUFFERS,
  PROP_MAX_TIME,
  PROP_FORMAT,
  PROP_BLOCK,
  PROP_IS_LIVE,
  PROP_MIN_LATENCY,
  PROP_MAX_LATENCY,
  PROP_EMIT_SIGNALS,
  PROP_MIN_PERCENT,
  PROP_CURRENT_LEVEL_BYTES,
  PROP_CURRENT_LEVEL_BUFFERS,
  PROP_CURRENT_LEVEL_TIME,
  PROP_DURATION,
  PROP_HANDLE_SEGMENT_CHANGE,
  PROP_LEAKY_TYPE,
#ifdef OHOS_OPT_COMPAT
  // ohos.opt.compat.0060
  PROP_DATASRC_MODE,
#endif
  /* keep last */
  PROP_LAST
};
265 
266 static GstStaticPadTemplate gst_app_src_template =
267 GST_STATIC_PAD_TEMPLATE ("src",
268     GST_PAD_SRC,
269     GST_PAD_ALWAYS,
270     GST_STATIC_CAPS_ANY);
271 
272 static void gst_app_src_uri_handler_init (gpointer g_iface,
273     gpointer iface_data);
274 
275 static void gst_app_src_dispose (GObject * object);
276 static void gst_app_src_finalize (GObject * object);
277 
278 static void gst_app_src_set_property (GObject * object, guint prop_id,
279     const GValue * value, GParamSpec * pspec);
280 static void gst_app_src_get_property (GObject * object, guint prop_id,
281     GValue * value, GParamSpec * pspec);
282 
283 static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
284 
285 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
286     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
287 
288 static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
289 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
290     GstCaps * filter);
291 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
292     guint size, GstBuffer ** buf);
293 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
294 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
295 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
296 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
297 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
298 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
299 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
300 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
301 static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
302 
303 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
304     GstBuffer * buffer);
305 static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
306     GstBufferList * buffer_list);
307 static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
308     GstSample * sample);
309 
310 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
311 
312 #define gst_app_src_parent_class parent_class
313 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
314     G_ADD_PRIVATE (GstAppSrc)
315     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
316 
317 static void
gst_app_src_class_init(GstAppSrcClass * klass)318 gst_app_src_class_init (GstAppSrcClass * klass)
319 {
320   GObjectClass *gobject_class = (GObjectClass *) klass;
321   GstElementClass *element_class = (GstElementClass *) klass;
322   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
323 
324   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
325 
326   gobject_class->dispose = gst_app_src_dispose;
327   gobject_class->finalize = gst_app_src_finalize;
328 
329   gobject_class->set_property = gst_app_src_set_property;
330   gobject_class->get_property = gst_app_src_get_property;
331 
332   /**
333    * GstAppSrc:caps:
334    *
335    * The GstCaps that will negotiated downstream and will be put
336    * on outgoing buffers.
337    */
338   g_object_class_install_property (gobject_class, PROP_CAPS,
339       g_param_spec_boxed ("caps", "Caps",
340           "The allowed caps for the src pad", GST_TYPE_CAPS,
341           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342   /**
343    * GstAppSrc:format:
344    *
345    * The format to use for segment events. When the source is producing
346    * timestamped buffers this property should be set to GST_FORMAT_TIME.
347    */
348   g_object_class_install_property (gobject_class, PROP_FORMAT,
349       g_param_spec_enum ("format", "Format",
350           "The format of the segment events and seek", GST_TYPE_FORMAT,
351           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352   /**
353    * GstAppSrc:size:
354    *
355    * The total size in bytes of the data stream. If the total size is known, it
356    * is recommended to configure it with this property.
357    */
358   g_object_class_install_property (gobject_class, PROP_SIZE,
359       g_param_spec_int64 ("size", "Size",
360           "The size of the data stream in bytes (-1 if unknown)",
361           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   /**
364    * GstAppSrc:stream-type:
365    *
366    * The type of stream that this source is producing.  For seekable streams the
367    * application should connect to the seek-data signal.
368    */
369   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
370       g_param_spec_enum ("stream-type", "Stream Type",
371           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
372           DEFAULT_PROP_STREAM_TYPE,
373           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374   /**
375    * GstAppSrc:max-bytes:
376    *
377    * The maximum amount of bytes that can be queued internally.
378    * After the maximum amount of bytes are queued, appsrc will emit the
379    * "enough-data" signal.
380    */
381   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
382       g_param_spec_uint64 ("max-bytes", "Max bytes",
383           "The maximum number of bytes to queue internally (0 = unlimited)",
384           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
385           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386 
387   /**
388    * GstAppSrc:max-buffers:
389    *
390    * The maximum amount of buffers that can be queued internally.
391    * After the maximum amount of buffers are queued, appsrc will emit the
392    * "enough-data" signal.
393    *
394    * Since: 1.20
395    */
396   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
397       g_param_spec_uint64 ("max-buffers", "Max buffers",
398           "The maximum number of buffers to queue internally (0 = unlimited)",
399           0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
400           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
401 
402   /**
403    * GstAppSrc:max-time:
404    *
405    * The maximum amount of time that can be queued internally.
406    * After the maximum amount of time are queued, appsrc will emit the
407    * "enough-data" signal.
408    *
409    * Since: 1.20
410    */
411   g_object_class_install_property (gobject_class, PROP_MAX_TIME,
412       g_param_spec_uint64 ("max-time", "Max time",
413           "The maximum amount of time to queue internally (0 = unlimited)",
414           0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
415           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
416 
417   /**
418    * GstAppSrc:block:
419    *
420    * When max-bytes are queued and after the enough-data signal has been emitted,
421    * block any further push-buffer calls until the amount of queued bytes drops
422    * below the max-bytes limit.
423    */
424   g_object_class_install_property (gobject_class, PROP_BLOCK,
425       g_param_spec_boolean ("block", "Block",
426           "Block push-buffer when max-bytes are queued",
427           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428 
429   /**
430    * GstAppSrc:is-live:
431    *
432    * Instruct the source to behave like a live source. This includes that it
433    * will only push out buffers in the PLAYING state.
434    */
435   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
436       g_param_spec_boolean ("is-live", "Is Live",
437           "Whether to act as a live source",
438           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439   /**
440    * GstAppSrc:min-latency:
441    *
442    * The minimum latency of the source. A value of -1 will use the default
443    * latency calculations of #GstBaseSrc.
444    */
445   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
446       g_param_spec_int64 ("min-latency", "Min Latency",
447           "The minimum latency (-1 = default)",
448           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450   /**
451    * GstAppSrc::max-latency:
452    *
453    * The maximum latency of the source. A value of -1 means an unlimited amount
454    * of latency.
455    */
456   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
457       g_param_spec_int64 ("max-latency", "Max Latency",
458           "The maximum latency (-1 = unlimited)",
459           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
460           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
461 
462   /**
463    * GstAppSrc:emit-signals:
464    *
465    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
466    * This option is by default enabled for backwards compatibility reasons but
467    * can disabled when needed because signal emission is expensive.
468    */
469   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
470       g_param_spec_boolean ("emit-signals", "Emit signals",
471           "Emit need-data, enough-data and seek-data signals",
472           DEFAULT_PROP_EMIT_SIGNALS,
473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474 
475   /**
476    * GstAppSrc:min-percent:
477    *
478    * Make appsrc emit the "need-data" signal when the amount of bytes in the
479    * queue drops below this percentage of max-bytes.
480    */
481   g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
482       g_param_spec_uint ("min-percent", "Min Percent",
483           "Emit need-data when queued bytes drops below this percent of max-bytes",
484           0, 100, DEFAULT_PROP_MIN_PERCENT,
485           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
486 
487   /**
488    * GstAppSrc:current-level-bytes:
489    *
490    * The number of currently queued bytes inside appsrc.
491    *
492    * Since: 1.2
493    */
494   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
495       g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
496           "The number of currently queued bytes",
497           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
498           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
499 
500   /**
501    * GstAppSrc:current-level-buffers:
502    *
503    * The number of currently queued buffers inside appsrc.
504    *
505    * Since: 1.20
506    */
507   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
508       g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
509           "The number of currently queued buffers",
510           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
511           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
512 
513   /**
514    * GstAppSrc:current-level-time:
515    *
516    * The amount of currently queued time inside appsrc.
517    *
518    * Since: 1.20
519    */
520   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
521       g_param_spec_uint64 ("current-level-time", "Current Level Time",
522           "The amount of currently queued time",
523           0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
524           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
525 
526   /**
527    * GstAppSrc:duration:
528    *
529    * The total duration in nanoseconds of the data stream. If the total duration is known, it
530    * is recommended to configure it with this property.
531    *
532    * Since: 1.10
533    */
534   g_object_class_install_property (gobject_class, PROP_DURATION,
535       g_param_spec_uint64 ("duration", "Duration",
536           "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
537           0, G_MAXUINT64, DEFAULT_PROP_DURATION,
538           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
539 
540   /**
541    * GstAppSrc:handle-segment-change:
542    *
543    * When enabled, appsrc will check GstSegment in GstSample which was
544    * pushed via gst_app_src_push_sample() or "push-sample" signal action.
545    * If a GstSegment is changed, corresponding segment event will be followed
546    * by next data flow.
547    *
548    * FIXME: currently only GST_FORMAT_TIME format is supported and therefore
549    * GstAppSrc::format should be time. However, possibly #GstAppSrc can support
550    * other formats.
551    *
552    * Since: 1.18
553    */
554   g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
555       g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
556           "Whether to detect and handle changed time format GstSegment in "
557           "GstSample. User should set valid GstSegment in GstSample. "
558           "Must set format property as \"time\" to enable this property",
559           DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
560           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
561           G_PARAM_STATIC_STRINGS));
562 
563   /**
564    * GstAppSrc:leaky-type:
565    *
566    * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
567    * will drop any buffers that are pushed into it once its internal queue is
568    * full. The selected type defines whether to drop the oldest or new
569    * buffers.
570    *
571    * Since: 1.20
572    */
573   g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
574       g_param_spec_enum ("leaky-type", "Leaky Type",
575           "Whether to drop buffers once the internal queue is full",
576           GST_TYPE_APP_LEAKY_TYPE,
577           DEFAULT_PROP_LEAKY_TYPE,
578           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
579           G_PARAM_STATIC_STRINGS));
580 
581 #ifdef OHOS_OPT_COMPAT
582 // ohos.opt.compat.0060
583   g_object_class_install_property (gobject_class, PROP_DATASRC_MODE,
584       g_param_spec_boolean ("datasrc-mode", "Datasrc-Mode", "Datasrc Mode",
585       FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
586 #endif
587 
588   /**
589    * GstAppSrc::need-data:
590    * @appsrc: the appsrc element that emitted the signal
591    * @length: the amount of bytes needed.
592    *
593    * Signal that the source needs more data. In the callback or from another
594    * thread you should call push-buffer or end-of-stream.
595    *
596    * @length is just a hint and when it is set to -1, any number of bytes can be
597    * pushed into @appsrc.
598    *
599    * You can call push-buffer multiple times until the enough-data signal is
600    * fired.
601    */
602   gst_app_src_signals[SIGNAL_NEED_DATA] =
603       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
604       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
605       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
606 
607   /**
608    * GstAppSrc::enough-data:
609    * @appsrc: the appsrc element that emitted the signal
610    *
611    * Signal that the source has enough data. It is recommended that the
612    * application stops calling push-buffer until the need-data signal is
613    * emitted again to avoid excessive buffer queueing.
614    */
615   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
616       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
617       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
618       NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
619 
620   /**
621    * GstAppSrc::seek-data:
622    * @appsrc: the appsrc element that emitted the signal
623    * @offset: the offset to seek to
624    *
625    * Seek to the given offset. The next push-buffer should produce buffers from
626    * the new @offset.
627    * This callback is only called for seekable stream types.
628    *
629    * Returns: %TRUE if the seek succeeded.
630    */
631   gst_app_src_signals[SIGNAL_SEEK_DATA] =
632       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
633       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
634       NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
635 
636    /**
637     * GstAppSrc::push-buffer:
638     * @appsrc: the appsrc
639     * @buffer: (transfer none): a buffer to push
640     *
641     * Adds a buffer to the queue of buffers that the appsrc element will
642     * push to its source pad.
643     *
644     * This function does not take ownership of the buffer, but it takes a
645     * reference so the buffer can be unreffed at any time after calling this
646     * function.
647     *
648     * When the block property is TRUE, this function can block until free space
649     * becomes available in the queue.
650     */
651   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
652       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
653       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
654           push_buffer), NULL, NULL, NULL,
655       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
656 
657    /**
658     * GstAppSrc::push-buffer-list:
659     * @appsrc: the appsrc
660     * @buffer_list: (transfer none): a buffer list to push
661     *
662     * Adds a buffer list to the queue of buffers and buffer lists that the
663     * appsrc element will push to its source pad.
664     *
665     * This function does not take ownership of the buffer list, but it takes a
666     * reference so the buffer list can be unreffed at any time after calling
667     * this function.
668     *
669     * When the block property is TRUE, this function can block until free space
670     * becomes available in the queue.
671     *
672     * Since: 1.14
673     */
674   gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
675       g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
676       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
677           push_buffer_list), NULL, NULL, NULL,
678       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
679 
680   /**
681     * GstAppSrc::push-sample:
682     * @appsrc: the appsrc
683     * @sample: (transfer none): a sample from which extract buffer to push
684     *
685     * Extract a buffer from the provided sample and adds the extracted buffer
686     * to the queue of buffers that the appsrc element will
687     * push to its source pad. This function set the appsrc caps based on the caps
688     * in the sample and reset the caps if they change.
689     * Only the caps and the buffer of the provided sample are used and not
690     * for example the segment in the sample.
691     *
692     * This function does not take ownership of the sample, but it takes a
693     * reference so the sample can be unreffed at any time after calling this
694     * function.
695     *
696     * When the block property is TRUE, this function can block until free space
697     * becomes available in the queue.
698     *
699     * Since: 1.6
700     */
701   gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
702       g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
703       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
704           push_sample), NULL, NULL, NULL,
705       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
706 
707 
708    /**
709     * GstAppSrc::end-of-stream:
710     * @appsrc: the appsrc
711     *
712     * Notify @appsrc that no more buffer are available.
713     */
714   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
715       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
716       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
717           end_of_stream), NULL, NULL, NULL,
718       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
719 
720   gst_element_class_set_static_metadata (element_class, "AppSrc",
721       "Generic/Source", "Allow the application to feed buffers to a pipeline",
722       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
723 
724   gst_element_class_add_static_pad_template (element_class,
725       &gst_app_src_template);
726 
727   element_class->send_event = gst_app_src_send_event;
728 
729   basesrc_class->negotiate = gst_app_src_negotiate;
730   basesrc_class->get_caps = gst_app_src_internal_get_caps;
731   basesrc_class->create = gst_app_src_create;
732   basesrc_class->start = gst_app_src_start;
733   basesrc_class->stop = gst_app_src_stop;
734   basesrc_class->unlock = gst_app_src_unlock;
735   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
736   basesrc_class->do_seek = gst_app_src_do_seek;
737   basesrc_class->is_seekable = gst_app_src_is_seekable;
738   basesrc_class->get_size = gst_app_src_do_get_size;
739   basesrc_class->query = gst_app_src_query;
740   basesrc_class->event = gst_app_src_event;
741 
742   klass->push_buffer = gst_app_src_push_buffer_action;
743   klass->push_buffer_list = gst_app_src_push_buffer_list_action;
744   klass->push_sample = gst_app_src_push_sample_action;
745   klass->end_of_stream = gst_app_src_end_of_stream;
746 }
747 
748 static void
gst_app_src_init(GstAppSrc * appsrc)749 gst_app_src_init (GstAppSrc * appsrc)
750 {
751   GstAppSrcPrivate *priv;
752 
753   priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
754 
755   g_mutex_init (&priv->mutex);
756   g_cond_init (&priv->cond);
757   priv->queue = gst_queue_array_new (16);
758   priv->wait_status = NOONE_WAITING;
759 
760   priv->size = DEFAULT_PROP_SIZE;
761   priv->duration = DEFAULT_PROP_DURATION;
762   priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
763   priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
764   priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
765   priv->max_time = DEFAULT_PROP_MAX_TIME;
766   priv->format = DEFAULT_PROP_FORMAT;
767   priv->block = DEFAULT_PROP_BLOCK;
768   priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
769   priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
770   priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
771   priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
772   priv->datasrc_mode = FALSE;
773   priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
774   priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
775 
776   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
777 }
778 
779 /* Must be called with priv->mutex */
780 static void
gst_app_src_flush_queued(GstAppSrc * src,gboolean retain_last_caps)781 gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
782 {
783   GstMiniObject *obj;
784   GstAppSrcPrivate *priv = src->priv;
785   GstCaps *requeue_caps = NULL;
786 
787   while (!gst_queue_array_is_empty (priv->queue)) {
788     obj = gst_queue_array_pop_head (priv->queue);
789     if (obj) {
790       if (GST_IS_CAPS (obj) && retain_last_caps) {
791         gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
792       }
793       gst_mini_object_unref (obj);
794     }
795   }
796 
797   if (requeue_caps) {
798     gst_queue_array_push_tail (priv->queue, requeue_caps);
799   }
800 
801   priv->queued_bytes = 0;
802   priv->queued_buffers = 0;
803   priv->queued_time = 0;
804   priv->last_in_running_time = GST_CLOCK_TIME_NONE;
805   priv->last_out_running_time = GST_CLOCK_TIME_NONE;
806   priv->need_discont_upstream = FALSE;
807   priv->need_discont_downstream = FALSE;
808 }
809 
810 static void
gst_app_src_dispose(GObject * obj)811 gst_app_src_dispose (GObject * obj)
812 {
813   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
814   GstAppSrcPrivate *priv = appsrc->priv;
815   Callbacks *callbacks = NULL;
816 
817   GST_OBJECT_LOCK (appsrc);
818   if (priv->current_caps) {
819     gst_caps_unref (priv->current_caps);
820     priv->current_caps = NULL;
821   }
822   if (priv->last_caps) {
823     gst_caps_unref (priv->last_caps);
824     priv->last_caps = NULL;
825   }
826   GST_OBJECT_UNLOCK (appsrc);
827 
828   g_mutex_lock (&priv->mutex);
829   if (priv->callbacks)
830     callbacks = g_steal_pointer (&priv->callbacks);
831   gst_app_src_flush_queued (appsrc, FALSE);
832   g_mutex_unlock (&priv->mutex);
833 
834   g_clear_pointer (&callbacks, callbacks_unref);
835 
836   G_OBJECT_CLASS (parent_class)->dispose (obj);
837 }
838 
839 static void
gst_app_src_finalize(GObject * obj)840 gst_app_src_finalize (GObject * obj)
841 {
842   GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
843   GstAppSrcPrivate *priv = appsrc->priv;
844 
845   g_mutex_clear (&priv->mutex);
846   g_cond_clear (&priv->cond);
847   gst_queue_array_free (priv->queue);
848 
849   g_free (priv->uri);
850 
851   G_OBJECT_CLASS (parent_class)->finalize (obj);
852 }
853 
854 static GstCaps *
gst_app_src_internal_get_caps(GstBaseSrc * bsrc,GstCaps * filter)855 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
856 {
857   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
858   GstCaps *caps;
859 
860   GST_OBJECT_LOCK (appsrc);
861   if ((caps = appsrc->priv->current_caps))
862     gst_caps_ref (caps);
863   GST_OBJECT_UNLOCK (appsrc);
864 
865   if (filter) {
866     if (caps) {
867       GstCaps *intersection =
868           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
869       gst_caps_unref (caps);
870       caps = intersection;
871     } else {
872       caps = gst_caps_ref (filter);
873     }
874   }
875 
876   GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
877   return caps;
878 }
879 
880 static void
gst_app_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)881 gst_app_src_set_property (GObject * object, guint prop_id,
882     const GValue * value, GParamSpec * pspec)
883 {
884   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
885   GstAppSrcPrivate *priv = appsrc->priv;
886 
887   switch (prop_id) {
888     case PROP_CAPS:
889       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
890       break;
891     case PROP_SIZE:
892       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
893       break;
894     case PROP_STREAM_TYPE:
895       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
896       break;
897     case PROP_MAX_BYTES:
898       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
899       break;
900     case PROP_MAX_BUFFERS:
901       gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
902       break;
903     case PROP_MAX_TIME:
904       gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
905       break;
906     case PROP_FORMAT:
907       priv->format = g_value_get_enum (value);
908       break;
909     case PROP_BLOCK:
910       priv->block = g_value_get_boolean (value);
911       break;
912     case PROP_IS_LIVE:
913       gst_base_src_set_live (GST_BASE_SRC (appsrc),
914           g_value_get_boolean (value));
915       break;
916     case PROP_MIN_LATENCY:
917       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
918           FALSE, -1);
919       break;
920     case PROP_MAX_LATENCY:
921       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
922           g_value_get_int64 (value));
923       break;
924     case PROP_EMIT_SIGNALS:
925       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
926       break;
927     case PROP_MIN_PERCENT:
928       priv->min_percent = g_value_get_uint (value);
929       break;
930     case PROP_DURATION:
931       gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
932       break;
933     case PROP_HANDLE_SEGMENT_CHANGE:
934       priv->handle_segment_change = g_value_get_boolean (value);
935       break;
936     case PROP_LEAKY_TYPE:
937       priv->leaky_type = g_value_get_enum (value);
938       break;
939 #ifdef OHOS_OPT_COMPAT
940 // ohos.opt.compat.0060
941     case PROP_DATASRC_MODE:
942       priv->datasrc_mode = g_value_get_boolean (value);
943 #endif
944     default:
945       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
946       break;
947   }
948 }
949 
950 static void
gst_app_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)951 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
952     GParamSpec * pspec)
953 {
954   GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
955   GstAppSrcPrivate *priv = appsrc->priv;
956 
957   switch (prop_id) {
958     case PROP_CAPS:
959       g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
960       break;
961     case PROP_SIZE:
962       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
963       break;
964     case PROP_STREAM_TYPE:
965       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
966       break;
967     case PROP_MAX_BYTES:
968       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
969       break;
970     case PROP_MAX_BUFFERS:
971       g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
972       break;
973     case PROP_MAX_TIME:
974       g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
975       break;
976     case PROP_FORMAT:
977       g_value_set_enum (value, priv->format);
978       break;
979     case PROP_BLOCK:
980       g_value_set_boolean (value, priv->block);
981       break;
982     case PROP_IS_LIVE:
983       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
984       break;
985     case PROP_MIN_LATENCY:
986     {
987       guint64 min = 0;
988 
989       gst_app_src_get_latency (appsrc, &min, NULL);
990       g_value_set_int64 (value, min);
991       break;
992     }
993     case PROP_MAX_LATENCY:
994     {
995       guint64 max = 0;
996 
997       gst_app_src_get_latency (appsrc, NULL, &max);
998       g_value_set_int64 (value, max);
999       break;
1000     }
1001     case PROP_EMIT_SIGNALS:
1002       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
1003       break;
1004     case PROP_MIN_PERCENT:
1005       g_value_set_uint (value, priv->min_percent);
1006       break;
1007     case PROP_CURRENT_LEVEL_BYTES:
1008       g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
1009       break;
1010     case PROP_CURRENT_LEVEL_BUFFERS:
1011       g_value_set_uint64 (value,
1012           gst_app_src_get_current_level_buffers (appsrc));
1013       break;
1014     case PROP_CURRENT_LEVEL_TIME:
1015       g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
1016       break;
1017     case PROP_DURATION:
1018       g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
1019       break;
1020     case PROP_HANDLE_SEGMENT_CHANGE:
1021       g_value_set_boolean (value, priv->handle_segment_change);
1022       break;
1023     case PROP_LEAKY_TYPE:
1024       g_value_set_enum (value, priv->leaky_type);
1025       break;
1026     default:
1027       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1028       break;
1029   }
1030 }
1031 
1032 static gboolean
gst_app_src_send_event(GstElement * element,GstEvent * event)1033 gst_app_src_send_event (GstElement * element, GstEvent * event)
1034 {
1035   GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
1036   GstAppSrcPrivate *priv = appsrc->priv;
1037 
1038   switch (GST_EVENT_TYPE (event)) {
1039     case GST_EVENT_FLUSH_STOP:
1040       g_mutex_lock (&priv->mutex);
1041       gst_app_src_flush_queued (appsrc, TRUE);
1042       g_mutex_unlock (&priv->mutex);
1043       break;
1044     default:
1045       if (GST_EVENT_IS_SERIALIZED (event)) {
1046         GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
1047         g_mutex_lock (&priv->mutex);
1048         gst_queue_array_push_tail (priv->queue, event);
1049 
1050         if ((priv->wait_status & STREAM_WAITING))
1051           g_cond_broadcast (&priv->cond);
1052 
1053         g_mutex_unlock (&priv->mutex);
1054         return TRUE;
1055       }
1056       break;
1057   }
1058 
1059   return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
1060           event), FALSE);
1061 }
1062 
1063 static gboolean
gst_app_src_unlock(GstBaseSrc * bsrc)1064 gst_app_src_unlock (GstBaseSrc * bsrc)
1065 {
1066   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1067   GstAppSrcPrivate *priv = appsrc->priv;
1068 
1069   g_mutex_lock (&priv->mutex);
1070   GST_DEBUG_OBJECT (appsrc, "unlock start");
1071   priv->flushing = TRUE;
1072   g_cond_broadcast (&priv->cond);
1073   g_mutex_unlock (&priv->mutex);
1074 
1075   return TRUE;
1076 }
1077 
1078 static gboolean
gst_app_src_unlock_stop(GstBaseSrc * bsrc)1079 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
1080 {
1081   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1082   GstAppSrcPrivate *priv = appsrc->priv;
1083 
1084   g_mutex_lock (&priv->mutex);
1085   GST_DEBUG_OBJECT (appsrc, "unlock stop");
1086   priv->flushing = FALSE;
1087   g_cond_broadcast (&priv->cond);
1088   g_mutex_unlock (&priv->mutex);
1089 
1090   return TRUE;
1091 }
1092 
1093 static gboolean
gst_app_src_start(GstBaseSrc * bsrc)1094 gst_app_src_start (GstBaseSrc * bsrc)
1095 {
1096   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1097   GstAppSrcPrivate *priv = appsrc->priv;
1098 
1099   g_mutex_lock (&priv->mutex);
1100   GST_DEBUG_OBJECT (appsrc, "starting");
1101   priv->started = TRUE;
1102   /* set the offset to -1 so that we always do a first seek. This is only used
1103    * in random-access mode. */
1104   priv->offset = -1;
1105   priv->flushing = FALSE;
1106   g_mutex_unlock (&priv->mutex);
1107 
1108   gst_base_src_set_format (bsrc, priv->format);
1109   gst_segment_init (&priv->last_segment, priv->format);
1110   gst_segment_init (&priv->current_segment, priv->format);
1111   priv->pending_custom_segment = FALSE;
1112 
1113   return TRUE;
1114 }
1115 
1116 static gboolean
gst_app_src_stop(GstBaseSrc * bsrc)1117 gst_app_src_stop (GstBaseSrc * bsrc)
1118 {
1119   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1120   GstAppSrcPrivate *priv = appsrc->priv;
1121 
1122   g_mutex_lock (&priv->mutex);
1123   GST_DEBUG_OBJECT (appsrc, "stopping");
1124   priv->is_eos = FALSE;
1125   priv->flushing = TRUE;
1126   priv->started = FALSE;
1127   gst_app_src_flush_queued (appsrc, TRUE);
1128   g_cond_broadcast (&priv->cond);
1129   g_mutex_unlock (&priv->mutex);
1130 
1131   return TRUE;
1132 }
1133 
1134 static gboolean
gst_app_src_is_seekable(GstBaseSrc * src)1135 gst_app_src_is_seekable (GstBaseSrc * src)
1136 {
1137   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1138   GstAppSrcPrivate *priv = appsrc->priv;
1139   gboolean res = FALSE;
1140 
1141   switch (priv->stream_type) {
1142     case GST_APP_STREAM_TYPE_STREAM:
1143       break;
1144     case GST_APP_STREAM_TYPE_SEEKABLE:
1145     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1146       res = TRUE;
1147       break;
1148   }
1149   return res;
1150 }
1151 
1152 static gboolean
gst_app_src_do_get_size(GstBaseSrc * src,guint64 * size)1153 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
1154 {
1155   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1156 
1157   *size = gst_app_src_get_size (appsrc);
1158 
1159   return TRUE;
1160 }
1161 
1162 static gboolean
gst_app_src_query(GstBaseSrc * src,GstQuery * query)1163 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
1164 {
1165   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1166   GstAppSrcPrivate *priv = appsrc->priv;
1167   gboolean res;
1168 
1169   switch (GST_QUERY_TYPE (query)) {
1170     case GST_QUERY_LATENCY:
1171     {
1172       GstClockTime min, max;
1173       gboolean live;
1174 
1175       /* Query the parent class for the defaults */
1176       res = gst_base_src_query_latency (src, &live, &min, &max);
1177 
1178       /* overwrite with our values when we need to */
1179       g_mutex_lock (&priv->mutex);
1180       if (priv->min_latency != -1) {
1181         min = priv->min_latency;
1182         max = priv->max_latency;
1183       }
1184       g_mutex_unlock (&priv->mutex);
1185 
1186       gst_query_set_latency (query, live, min, max);
1187       break;
1188     }
1189     case GST_QUERY_SCHEDULING:
1190     {
1191       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1192       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1193 
1194       switch (priv->stream_type) {
1195         case GST_APP_STREAM_TYPE_STREAM:
1196         case GST_APP_STREAM_TYPE_SEEKABLE:
1197           break;
1198         case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1199           gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1200           break;
1201       }
1202       res = TRUE;
1203       break;
1204     }
1205     case GST_QUERY_DURATION:
1206     {
1207       GstFormat format;
1208       gst_query_parse_duration (query, &format, NULL);
1209       if (format == GST_FORMAT_BYTES) {
1210         gst_query_set_duration (query, format, priv->size);
1211         res = TRUE;
1212       } else if (format == GST_FORMAT_TIME) {
1213         if (priv->duration != GST_CLOCK_TIME_NONE) {
1214           gst_query_set_duration (query, format, priv->duration);
1215           res = TRUE;
1216         } else {
1217           res = FALSE;
1218         }
1219       } else {
1220         res = FALSE;
1221       }
1222       break;
1223     }
1224     default:
1225       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
1226       break;
1227   }
1228 
1229   return res;
1230 }
1231 
1232 /* will be called in push mode */
1233 static gboolean
gst_app_src_do_seek(GstBaseSrc * src,GstSegment * segment)1234 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1235 {
1236   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1237   GstAppSrcPrivate *priv = appsrc->priv;
1238   gint64 desired_position;
1239   gboolean res = FALSE;
1240   gboolean emit;
1241   Callbacks *callbacks = NULL;
1242 
1243   desired_position = segment->position;
1244 
1245   /* no need to try to seek in streaming mode */
1246   if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
1247     return TRUE;
1248 
1249   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
1250       desired_position, gst_format_get_name (segment->format));
1251 
1252   g_mutex_lock (&priv->mutex);
1253   emit = priv->emit_signals;
1254   if (priv->callbacks)
1255     callbacks = callbacks_ref (priv->callbacks);
1256   g_mutex_unlock (&priv->mutex);
1257 
1258   if (callbacks && callbacks->callbacks.seek_data) {
1259     res =
1260         callbacks->callbacks.seek_data (appsrc, desired_position,
1261         callbacks->user_data);
1262   } else if (emit) {
1263     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1264         desired_position, &res);
1265   }
1266 
1267   g_clear_pointer (&callbacks, callbacks_unref);
1268 
1269   if (res) {
1270     GST_DEBUG_OBJECT (appsrc, "flushing queue");
1271     g_mutex_lock (&priv->mutex);
1272     gst_app_src_flush_queued (appsrc, TRUE);
1273     gst_segment_copy_into (segment, &priv->last_segment);
1274     gst_segment_copy_into (segment, &priv->current_segment);
1275     priv->pending_custom_segment = FALSE;
1276     g_mutex_unlock (&priv->mutex);
1277     priv->is_eos = FALSE;
1278   } else {
1279     GST_WARNING_OBJECT (appsrc, "seek failed");
1280   }
1281 
1282   return res;
1283 }
1284 
1285 /* must be called with the appsrc mutex */
1286 static gboolean
gst_app_src_emit_seek(GstAppSrc * appsrc,guint64 offset)1287 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
1288 {
1289   gboolean res = FALSE;
1290   gboolean emit;
1291   GstAppSrcPrivate *priv = appsrc->priv;
1292   Callbacks *callbacks = NULL;
1293 
1294   emit = priv->emit_signals;
1295   if (priv->callbacks)
1296     callbacks = callbacks_ref (priv->callbacks);
1297   g_mutex_unlock (&priv->mutex);
1298 
1299   GST_DEBUG_OBJECT (appsrc,
1300       "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
1301       priv->offset, offset);
1302 
1303   if (callbacks && callbacks->callbacks.seek_data)
1304     res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
1305   else if (emit)
1306     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1307         offset, &res);
1308 
1309   g_clear_pointer (&callbacks, callbacks_unref);
1310 
1311   g_mutex_lock (&priv->mutex);
1312 
1313   return res;
1314 }
1315 
1316 /* must be called with the appsrc mutex. After this call things can be
1317  * flushing */
1318 static void
gst_app_src_emit_need_data(GstAppSrc * appsrc,guint size)1319 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
1320 {
1321   gboolean emit;
1322   GstAppSrcPrivate *priv = appsrc->priv;
1323   Callbacks *callbacks = NULL;
1324 
1325   emit = priv->emit_signals;
1326   if (priv->callbacks)
1327     callbacks = callbacks_ref (priv->callbacks);
1328   g_mutex_unlock (&priv->mutex);
1329 
1330   /* we have no data, we need some. We fire the signal with the size hint. */
1331   if (callbacks && callbacks->callbacks.need_data)
1332     callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
1333   else if (emit)
1334     g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
1335         NULL);
1336 
1337   g_clear_pointer (&callbacks, callbacks_unref);
1338 
1339   g_mutex_lock (&priv->mutex);
1340   /* we can be flushing now because we released the lock */
1341 }
1342 
1343 /* must be called with the appsrc mutex */
1344 static gboolean
gst_app_src_do_negotiate(GstBaseSrc * basesrc)1345 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
1346 {
1347   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1348   GstAppSrcPrivate *priv = appsrc->priv;
1349   gboolean result;
1350   GstCaps *caps;
1351 
1352   GST_OBJECT_LOCK (basesrc);
1353   caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
1354   GST_OBJECT_UNLOCK (basesrc);
1355 
1356   /* Avoid deadlock by unlocking mutex
1357    * otherwise we get deadlock between this and stream lock */
1358   g_mutex_unlock (&priv->mutex);
1359   if (caps) {
1360     result = gst_base_src_set_caps (basesrc, caps);
1361     gst_caps_unref (caps);
1362   } else {
1363     result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
1364   }
1365   g_mutex_lock (&priv->mutex);
1366 
1367   return result;
1368 }
1369 
1370 static gboolean
gst_app_src_negotiate(GstBaseSrc * basesrc)1371 gst_app_src_negotiate (GstBaseSrc * basesrc)
1372 {
1373   GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1374   GstAppSrcPrivate *priv = appsrc->priv;
1375   gboolean result;
1376 
1377   g_mutex_lock (&priv->mutex);
1378   result = gst_app_src_do_negotiate (basesrc);
1379   g_mutex_unlock (&priv->mutex);
1380   return result;
1381 }
1382 
1383 /* Update the currently queued bytes/buffers/time information for the item
1384  * that was just removed from the queue.
1385  *
1386  * If update_offset is set, additionally the offset of the source will be
1387  * moved forward accordingly as if that many bytes were output.
1388  */
1389 static void
gst_app_src_update_queued_pop(GstAppSrc * appsrc,GstMiniObject * item,gboolean update_offset)1390 gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
1391     gboolean update_offset)
1392 {
1393   GstAppSrcPrivate *priv = appsrc->priv;
1394   guint buf_size = 0;
1395   guint n_buffers = 0;
1396   GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1397 
1398   if (GST_IS_BUFFER (item)) {
1399     GstBuffer *buf = GST_BUFFER_CAST (item);
1400     buf_size = gst_buffer_get_size (buf);
1401     n_buffers = 1;
1402 
1403     end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1404     if (end_buffer_ts != GST_CLOCK_TIME_NONE
1405         && GST_BUFFER_DURATION_IS_VALID (buf))
1406       end_buffer_ts += GST_BUFFER_DURATION (buf);
1407 
1408     GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
1409   } else if (GST_IS_BUFFER_LIST (item)) {
1410     GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1411     guint i;
1412 
1413     n_buffers = gst_buffer_list_length (buffer_list);
1414 
1415     for (i = 0; i < n_buffers; i++) {
1416       GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1417       GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1418 
1419       buf_size += gst_buffer_get_size (tmp);
1420       /* Update to the last buffer's timestamp that is known */
1421       if (ts != GST_CLOCK_TIME_NONE) {
1422         end_buffer_ts = ts;
1423         if (GST_BUFFER_DURATION_IS_VALID (tmp))
1424           end_buffer_ts += GST_BUFFER_DURATION (tmp);
1425       }
1426     }
1427   }
1428 
1429   priv->queued_bytes -= buf_size;
1430   priv->queued_buffers -= n_buffers;
1431 
1432   /* Update time level if working on a TIME segment */
1433   if ((priv->current_segment.format == GST_FORMAT_TIME
1434           || (priv->current_segment.format == GST_FORMAT_UNDEFINED
1435               && priv->last_segment.format == GST_FORMAT_TIME))
1436       && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1437     const GstSegment *segment =
1438         priv->current_segment.format ==
1439         GST_FORMAT_TIME ? &priv->current_segment : &priv->last_segment;
1440 
1441     /* Clip to the current segment boundaries */
1442     if (segment->stop != -1 && end_buffer_ts > segment->stop)
1443       end_buffer_ts = segment->stop;
1444     else if (segment->start > end_buffer_ts)
1445       end_buffer_ts = segment->start;
1446 
1447     priv->last_out_running_time =
1448         gst_segment_to_running_time (segment, GST_FORMAT_TIME, end_buffer_ts);
1449 
1450     GST_TRACE_OBJECT (appsrc,
1451         "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1452         GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1453         GST_TIME_ARGS (priv->last_out_running_time));
1454 
1455     /* If timestamps on both sides are known, calculate the current
1456      * fill level in time and consider the queue empty if the output
1457      * running time is lower than the input one (i.e. some kind of reset
1458      * has happened).
1459      */
1460     if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1461         && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1462       if (priv->last_out_running_time > priv->last_in_running_time) {
1463         priv->queued_time = 0;
1464       } else {
1465         priv->queued_time =
1466             priv->last_in_running_time - priv->last_out_running_time;
1467       }
1468     }
1469   }
1470 
1471   GST_DEBUG_OBJECT (appsrc,
1472       "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1473       " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
1474       priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
1475 
1476   /* only update the offset when in random_access mode and when requested by
1477    * the caller, i.e. not when just dropping the item */
1478   if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1479     priv->offset += buf_size;
1480 }
1481 
1482 /* Update the currently queued bytes/buffers/time information for the item
1483  * that was just added to the queue.
1484  */
1485 static void
gst_app_src_update_queued_push(GstAppSrc * appsrc,GstMiniObject * item)1486 gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
1487 {
1488   GstAppSrcPrivate *priv = appsrc->priv;
1489   GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
1490   GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1491   guint buf_size = 0;
1492   guint n_buffers = 0;
1493 
1494   if (GST_IS_BUFFER (item)) {
1495     GstBuffer *buf = GST_BUFFER_CAST (item);
1496 
1497     buf_size = gst_buffer_get_size (buf);
1498     n_buffers = 1;
1499 
1500     start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1501     if (end_buffer_ts != GST_CLOCK_TIME_NONE
1502         && GST_BUFFER_DURATION_IS_VALID (buf))
1503       end_buffer_ts += GST_BUFFER_DURATION (buf);
1504   } else if (GST_IS_BUFFER_LIST (item)) {
1505     GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1506     guint i;
1507 
1508     n_buffers = gst_buffer_list_length (buffer_list);
1509 
1510     for (i = 0; i < n_buffers; i++) {
1511       GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1512       GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1513 
1514       buf_size += gst_buffer_get_size (tmp);
1515 
1516       if (ts != GST_CLOCK_TIME_NONE) {
1517         if (start_buffer_ts == GST_CLOCK_TIME_NONE)
1518           start_buffer_ts = ts;
1519         end_buffer_ts = ts;
1520         if (GST_BUFFER_DURATION_IS_VALID (tmp))
1521           end_buffer_ts += GST_BUFFER_DURATION (tmp);
1522       }
1523     }
1524   }
1525 
1526   priv->queued_bytes += buf_size;
1527   priv->queued_buffers += n_buffers;
1528 
1529   /* Update time level if working on a TIME segment */
1530   if (priv->last_segment.format == GST_FORMAT_TIME
1531       && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1532     /* Clip to the last segment boundaries */
1533     if (priv->last_segment.stop != -1
1534         && end_buffer_ts > priv->last_segment.stop)
1535       end_buffer_ts = priv->last_segment.stop;
1536     else if (priv->last_segment.start > end_buffer_ts)
1537       end_buffer_ts = priv->last_segment.start;
1538 
1539     priv->last_in_running_time =
1540         gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1541         end_buffer_ts);
1542 
1543     /* If this is the only buffer then we can directly update the queued time
1544      * here. This is especially useful if this was the first buffer because
1545      * otherwise we would have to wait until it is actually unqueued to know
1546      * the queued duration */
1547     if (priv->queued_buffers == 1) {
1548       if (priv->last_segment.stop != -1
1549           && start_buffer_ts > priv->last_segment.stop)
1550         start_buffer_ts = priv->last_segment.stop;
1551       else if (priv->last_segment.start > start_buffer_ts)
1552         start_buffer_ts = priv->last_segment.start;
1553 
1554       priv->last_out_running_time =
1555           gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1556           start_buffer_ts);
1557     }
1558 
1559     GST_TRACE_OBJECT (appsrc,
1560         "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1561         GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1562         GST_TIME_ARGS (priv->last_out_running_time));
1563 
1564     if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1565         && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1566       if (priv->last_out_running_time > priv->last_in_running_time) {
1567         priv->queued_time = 0;
1568       } else {
1569         priv->queued_time =
1570             priv->last_in_running_time - priv->last_out_running_time;
1571       }
1572     }
1573   }
1574 
1575   GST_DEBUG_OBJECT (appsrc,
1576       "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1577       " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
1578       GST_TIME_ARGS (priv->queued_time));
1579 }
1580 
1581 static GstFlowReturn
gst_app_src_create(GstBaseSrc * bsrc,guint64 offset,guint size,GstBuffer ** buf)1582 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
1583     GstBuffer ** buf)
1584 {
1585   GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1586   GstAppSrcPrivate *priv = appsrc->priv;
1587   GstFlowReturn ret;
1588 
1589   GST_OBJECT_LOCK (appsrc);
1590   if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
1591           bsrc->segment.format == GST_FORMAT_BYTES)) {
1592     GST_DEBUG_OBJECT (appsrc,
1593         "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
1594         bsrc->segment.duration, priv->size);
1595     bsrc->segment.duration = priv->size;
1596     GST_OBJECT_UNLOCK (appsrc);
1597 
1598     gst_element_post_message (GST_ELEMENT (appsrc),
1599         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1600   } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
1601           bsrc->segment.format == GST_FORMAT_TIME)) {
1602     GST_DEBUG_OBJECT (appsrc,
1603         "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
1604         GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
1605     bsrc->segment.duration = priv->duration;
1606     GST_OBJECT_UNLOCK (appsrc);
1607 
1608     gst_element_post_message (GST_ELEMENT (appsrc),
1609         gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1610   } else {
1611     GST_OBJECT_UNLOCK (appsrc);
1612   }
1613 
1614   g_mutex_lock (&priv->mutex);
1615   /* check flushing first */
1616   if (G_UNLIKELY (priv->flushing))
1617     goto flushing;
1618 
1619   if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
1620     /* if we are dealing with a random-access stream, issue a seek if the offset
1621      * changed. */
1622     if (G_UNLIKELY (priv->offset != offset)) {
1623       gboolean res;
1624 
1625       /* do the seek */
1626       res = gst_app_src_emit_seek (appsrc, offset);
1627 #ifdef OHOS_OPT_COMPAT
1628 /**
1629  * ohos.opt.compat.0060
1630  * Flush data after seek to avoid push old data.
1631  */
1632       if (priv->datasrc_mode) {
1633         GST_DEBUG_OBJECT(appsrc, "appsrc do seek, begin, flush");
1634         gst_app_src_flush_queued (appsrc, FALSE);
1635       }
1636 #endif
1637 
1638       if (G_UNLIKELY (!res))
1639         /* failing to seek is fatal */
1640         goto seek_error;
1641 
1642       priv->offset = offset;
1643       priv->is_eos = FALSE;
1644     }
1645   }
1646 
1647   while (TRUE) {
1648     /* Our lock may have been release to push events or caps, check out
1649      * state in case we are now flushing. */
1650     if (G_UNLIKELY (priv->flushing))
1651       goto flushing;
1652 
1653     /* return data as long as we have some */
1654     if (!gst_queue_array_is_empty (priv->queue)) {
1655       GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
1656 
1657       if (GST_IS_CAPS (obj)) {
1658         GstCaps *next_caps = GST_CAPS (obj);
1659         gboolean caps_changed = TRUE;
1660 
1661         if (next_caps && priv->current_caps)
1662           caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
1663         else
1664           caps_changed = (next_caps != priv->current_caps);
1665 
1666         gst_caps_replace (&priv->current_caps, next_caps);
1667 
1668         if (next_caps) {
1669           gst_caps_unref (next_caps);
1670         }
1671 
1672         if (caps_changed)
1673           gst_app_src_do_negotiate (bsrc);
1674 
1675         /* Continue checks caps and queue */
1676         continue;
1677       }
1678 
1679       if (GST_IS_BUFFER (obj)) {
1680         GstBuffer *buffer = GST_BUFFER (obj);
1681 
1682         /* Mark the buffer as DISCONT if we previously dropped a buffer
1683          * instead of outputting it */
1684         if (priv->need_discont_downstream) {
1685           buffer = gst_buffer_make_writable (buffer);
1686           GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1687           priv->need_discont_downstream = FALSE;
1688         }
1689 
1690         *buf = buffer;
1691       } else if (GST_IS_BUFFER_LIST (obj)) {
1692         GstBufferList *buffer_list;
1693 
1694         buffer_list = GST_BUFFER_LIST (obj);
1695 
1696         /* Mark the first buffer of the buffer list as DISCONT if we
1697          * previously dropped a buffer instead of outputting it */
1698         if (priv->need_discont_downstream) {
1699           GstBuffer *buffer;
1700 
1701           buffer_list = gst_buffer_list_make_writable (buffer_list);
1702           buffer = gst_buffer_list_get_writable (buffer_list, 0);
1703           GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1704           priv->need_discont_downstream = FALSE;
1705         }
1706 
1707         gst_base_src_submit_buffer_list (bsrc, buffer_list);
1708         *buf = NULL;
1709       } else if (GST_IS_EVENT (obj)) {
1710         GstEvent *event = GST_EVENT (obj);
1711 
1712         GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
1713 
1714         if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1715           const GstSegment *segment = NULL;
1716 
1717           gst_event_parse_segment (event, &segment);
1718           g_assert (segment != NULL);
1719 
1720           if (!gst_segment_is_equal (&priv->current_segment, segment)) {
1721             GST_DEBUG_OBJECT (appsrc,
1722                 "Update new segment %" GST_PTR_FORMAT, event);
1723             if (!gst_base_src_new_segment (bsrc, segment)) {
1724               GST_ERROR_OBJECT (appsrc,
1725                   "Couldn't set new segment %" GST_PTR_FORMAT, event);
1726               gst_event_unref (event);
1727               goto invalid_segment;
1728             }
1729             gst_segment_copy_into (segment, &priv->current_segment);
1730           }
1731 
1732           gst_event_unref (event);
1733         } else {
1734           GstEvent *seg_event;
1735           GstSegment last_segment = priv->last_segment;
1736 
1737           /* event is serialized with the buffers flow */
1738 
1739           /* We are about to push an event, release out lock */
1740           g_mutex_unlock (&priv->mutex);
1741 
1742           seg_event =
1743               gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
1744               GST_EVENT_SEGMENT, 0);
1745           if (!seg_event) {
1746             seg_event = gst_event_new_segment (&last_segment);
1747 
1748             GST_DEBUG_OBJECT (appsrc,
1749                 "received serialized event before first buffer, push default segment %"
1750                 GST_PTR_FORMAT, seg_event);
1751 
1752             gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
1753           } else {
1754             gst_event_unref (seg_event);
1755           }
1756 
1757           gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
1758 
1759           g_mutex_lock (&priv->mutex);
1760         }
1761         continue;
1762       } else {
1763         g_assert_not_reached ();
1764       }
1765 
1766       gst_app_src_update_queued_pop (appsrc, obj, TRUE);
1767 
1768       /* signal that we removed an item */
1769       if ((priv->wait_status & APP_WAITING))
1770         g_cond_broadcast (&priv->cond);
1771 
1772       /* see if we go lower than the min-percent */
1773       if (priv->min_percent) {
1774         if ((priv->max_bytes
1775                 && priv->queued_bytes * 100 / priv->max_bytes <=
1776                 priv->min_percent) || (priv->max_buffers
1777                 && priv->queued_buffers * 100 / priv->max_buffers <=
1778                 priv->min_percent) || (priv->max_time
1779                 && priv->queued_time * 100 / priv->max_time <=
1780                 priv->min_percent)) {
1781           /* ignore flushing state, we got a buffer and we will return it now.
1782            * Errors will be handled in the next round */
1783           gst_app_src_emit_need_data (appsrc, size);
1784         }
1785       }
1786       ret = GST_FLOW_OK;
1787       break;
1788     } else {
1789       gst_app_src_emit_need_data (appsrc, size);
1790 
1791       /* we can be flushing now because we released the lock above */
1792       if (G_UNLIKELY (priv->flushing))
1793         goto flushing;
1794 
1795       /* if we have a buffer now, continue the loop and try to return it. In
1796        * random-access mode (where a buffer is normally pushed in the above
1797        * signal) we can still be empty because the pushed buffer got flushed or
1798        * when the application pushes the requested buffer later, we support both
1799        * possibilities. */
1800       if (!gst_queue_array_is_empty (priv->queue))
1801         continue;
1802 
1803       /* no buffer yet, maybe we are EOS, if not, block for more data. */
1804     }
1805 
1806     /* check EOS */
1807     if (G_UNLIKELY (priv->is_eos))
1808       goto eos;
1809 
1810     /* nothing to return, wait a while for new data or flushing. */
1811     priv->wait_status |= STREAM_WAITING;
1812     g_cond_wait (&priv->cond, &priv->mutex);
1813     priv->wait_status &= ~STREAM_WAITING;
1814   }
1815   g_mutex_unlock (&priv->mutex);
1816   return ret;
1817 
1818   /* ERRORS */
1819 flushing:
1820   {
1821     GST_DEBUG_OBJECT (appsrc, "we are flushing");
1822     g_mutex_unlock (&priv->mutex);
1823     return GST_FLOW_FLUSHING;
1824   }
1825 eos:
1826   {
1827     GST_DEBUG_OBJECT (appsrc, "we are EOS");
1828     g_mutex_unlock (&priv->mutex);
1829     return GST_FLOW_EOS;
1830   }
1831 seek_error:
1832   {
1833     g_mutex_unlock (&priv->mutex);
1834     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1835         GST_ERROR_SYSTEM);
1836     return GST_FLOW_ERROR;
1837   }
1838 
1839 invalid_segment:
1840   {
1841     g_mutex_unlock (&priv->mutex);
1842     GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
1843         (NULL), ("Failed to configure the provided input segment."));
1844     return GST_FLOW_ERROR;
1845   }
1846 }
1847 
1848 /* external API */
1849 
1850 /**
1851  * gst_app_src_set_caps:
1852  * @appsrc: a #GstAppSrc
1853  * @caps: (nullable): caps to set
1854  *
1855  * Set the capabilities on the appsrc element.  This function takes
1856  * a copy of the caps structure. After calling this method, the source will
1857  * only produce caps that match @caps. @caps must be fixed and the caps on the
1858  * buffers must match the caps or left NULL.
1859  */
1860 void
gst_app_src_set_caps(GstAppSrc * appsrc,const GstCaps * caps)1861 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1862 {
1863   GstAppSrcPrivate *priv;
1864   gboolean caps_changed;
1865 
1866   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1867 
1868   priv = appsrc->priv;
1869 
1870   g_mutex_lock (&priv->mutex);
1871 
1872   GST_OBJECT_LOCK (appsrc);
1873   if (caps && priv->last_caps)
1874     caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
1875   else
1876     caps_changed = (caps != priv->last_caps);
1877 
1878   if (caps_changed) {
1879     GstCaps *new_caps;
1880     gpointer t;
1881 
1882     new_caps = caps ? gst_caps_copy (caps) : NULL;
1883     GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1884 
1885     while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
1886       gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
1887     }
1888     gst_queue_array_push_tail (priv->queue, new_caps);
1889     gst_caps_replace (&priv->last_caps, new_caps);
1890 
1891     if ((priv->wait_status & STREAM_WAITING))
1892       g_cond_broadcast (&priv->cond);
1893   }
1894 
1895   GST_OBJECT_UNLOCK (appsrc);
1896 
1897   g_mutex_unlock (&priv->mutex);
1898 }
1899 
1900 /**
1901  * gst_app_src_get_caps:
1902  * @appsrc: a #GstAppSrc
1903  *
1904  * Get the configured caps on @appsrc.
1905  *
1906  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1907  */
1908 GstCaps *
gst_app_src_get_caps(GstAppSrc * appsrc)1909 gst_app_src_get_caps (GstAppSrc * appsrc)
1910 {
1911 
1912   GstCaps *caps;
1913 
1914   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1915 
1916   GST_OBJECT_LOCK (appsrc);
1917   if ((caps = appsrc->priv->last_caps))
1918     gst_caps_ref (caps);
1919   GST_OBJECT_UNLOCK (appsrc);
1920 
1921   return caps;
1922 
1923 }
1924 
1925 /**
1926  * gst_app_src_set_size:
1927  * @appsrc: a #GstAppSrc
1928  * @size: the size to set
1929  *
1930  * Set the size of the stream in bytes. A value of -1 means that the size is
1931  * not known.
1932  */
1933 void
gst_app_src_set_size(GstAppSrc * appsrc,gint64 size)1934 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1935 {
1936   GstAppSrcPrivate *priv;
1937 
1938   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1939 
1940   priv = appsrc->priv;
1941 
1942   GST_OBJECT_LOCK (appsrc);
1943   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1944   priv->size = size;
1945   GST_OBJECT_UNLOCK (appsrc);
1946 }
1947 
1948 /**
1949  * gst_app_src_get_size:
1950  * @appsrc: a #GstAppSrc
1951  *
1952  * Get the size of the stream in bytes. A value of -1 means that the size is
1953  * not known.
1954  *
1955  * Returns: the size of the stream previously set with gst_app_src_set_size();
1956  */
1957 gint64
gst_app_src_get_size(GstAppSrc * appsrc)1958 gst_app_src_get_size (GstAppSrc * appsrc)
1959 {
1960   gint64 size;
1961   GstAppSrcPrivate *priv;
1962 
1963   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1964 
1965   priv = appsrc->priv;
1966 
1967   GST_OBJECT_LOCK (appsrc);
1968   size = priv->size;
1969   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1970   GST_OBJECT_UNLOCK (appsrc);
1971 
1972   return size;
1973 }
1974 
1975 /**
1976  * gst_app_src_set_duration:
1977  * @appsrc: a #GstAppSrc
1978  * @duration: the duration to set
1979  *
1980  * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1981  * not known.
1982  *
1983  * Since: 1.10
1984  */
1985 void
gst_app_src_set_duration(GstAppSrc * appsrc,GstClockTime duration)1986 gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
1987 {
1988   GstAppSrcPrivate *priv;
1989 
1990   g_return_if_fail (GST_IS_APP_SRC (appsrc));
1991 
1992   priv = appsrc->priv;
1993 
1994   GST_OBJECT_LOCK (appsrc);
1995   GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
1996       GST_TIME_ARGS (duration));
1997   priv->duration = duration;
1998   GST_OBJECT_UNLOCK (appsrc);
1999 }
2000 
2001 /**
2002  * gst_app_src_get_duration:
2003  * @appsrc: a #GstAppSrc
2004  *
2005  * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
2006  * not known.
2007  *
2008  * Returns: the duration of the stream previously set with gst_app_src_set_duration();
2009  *
2010  * Since: 1.10
2011  */
2012 GstClockTime
gst_app_src_get_duration(GstAppSrc * appsrc)2013 gst_app_src_get_duration (GstAppSrc * appsrc)
2014 {
2015   GstClockTime duration;
2016   GstAppSrcPrivate *priv;
2017 
2018   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
2019 
2020   priv = appsrc->priv;
2021 
2022   GST_OBJECT_LOCK (appsrc);
2023   duration = priv->duration;
2024   GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
2025       GST_TIME_ARGS (duration));
2026   GST_OBJECT_UNLOCK (appsrc);
2027 
2028   return duration;
2029 }
2030 
2031 /**
2032  * gst_app_src_set_stream_type:
2033  * @appsrc: a #GstAppSrc
2034  * @type: the new state
2035  *
2036  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
2037  * be connected to.
2038  *
2039  * A stream_type stream
2040  */
2041 void
gst_app_src_set_stream_type(GstAppSrc * appsrc,GstAppStreamType type)2042 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
2043 {
2044   GstAppSrcPrivate *priv;
2045 
2046   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2047 
2048   priv = appsrc->priv;
2049 
2050   GST_OBJECT_LOCK (appsrc);
2051   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
2052   priv->stream_type = type;
2053   GST_OBJECT_UNLOCK (appsrc);
2054 }
2055 
2056 /**
2057  * gst_app_src_get_stream_type:
2058  * @appsrc: a #GstAppSrc
2059  *
2060  * Get the stream type. Control the stream type of @appsrc
2061  * with gst_app_src_set_stream_type().
2062  *
2063  * Returns: the stream type.
2064  */
2065 GstAppStreamType
gst_app_src_get_stream_type(GstAppSrc * appsrc)2066 gst_app_src_get_stream_type (GstAppSrc * appsrc)
2067 {
2068   gboolean stream_type;
2069   GstAppSrcPrivate *priv;
2070 
2071   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2072 
2073   priv = appsrc->priv;
2074 
2075   GST_OBJECT_LOCK (appsrc);
2076   stream_type = priv->stream_type;
2077   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
2078   GST_OBJECT_UNLOCK (appsrc);
2079 
2080   return stream_type;
2081 }
2082 
2083 /**
2084  * gst_app_src_set_max_bytes:
2085  * @appsrc: a #GstAppSrc
2086  * @max: the maximum number of bytes to queue
2087  *
2088  * Set the maximum amount of bytes that can be queued in @appsrc.
2089  * After the maximum amount of bytes are queued, @appsrc will emit the
2090  * "enough-data" signal.
2091  */
2092 void
gst_app_src_set_max_bytes(GstAppSrc * appsrc,guint64 max)2093 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
2094 {
2095   GstAppSrcPrivate *priv;
2096 
2097   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2098 
2099   priv = appsrc->priv;
2100 
2101   g_mutex_lock (&priv->mutex);
2102   if (max != priv->max_bytes) {
2103     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
2104     priv->max_bytes = max;
2105     /* signal the change */
2106     g_cond_broadcast (&priv->cond);
2107   }
2108   g_mutex_unlock (&priv->mutex);
2109 }
2110 
2111 /**
2112  * gst_app_src_get_max_bytes:
2113  * @appsrc: a #GstAppSrc
2114  *
2115  * Get the maximum amount of bytes that can be queued in @appsrc.
2116  *
2117  * Returns: The maximum amount of bytes that can be queued.
2118  */
2119 guint64
gst_app_src_get_max_bytes(GstAppSrc * appsrc)2120 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
2121 {
2122   guint64 result;
2123   GstAppSrcPrivate *priv;
2124 
2125   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2126 
2127   priv = appsrc->priv;
2128 
2129   g_mutex_lock (&priv->mutex);
2130   result = priv->max_bytes;
2131   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
2132   g_mutex_unlock (&priv->mutex);
2133 
2134   return result;
2135 }
2136 
2137 /**
2138  * gst_app_src_get_current_level_bytes:
2139  * @appsrc: a #GstAppSrc
2140  *
2141  * Get the number of currently queued bytes inside @appsrc.
2142  *
2143  * Returns: The number of currently queued bytes.
2144  *
2145  * Since: 1.2
2146  */
2147 guint64
gst_app_src_get_current_level_bytes(GstAppSrc * appsrc)2148 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
2149 {
2150   guint64 queued;
2151   GstAppSrcPrivate *priv;
2152 
2153   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2154 
2155   priv = appsrc->priv;
2156 
2157   GST_OBJECT_LOCK (appsrc);
2158   queued = priv->queued_bytes;
2159   GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
2160       queued);
2161   GST_OBJECT_UNLOCK (appsrc);
2162 
2163   return queued;
2164 }
2165 
2166 /**
2167  * gst_app_src_set_max_buffers:
2168  * @appsrc: a #GstAppSrc
2169  * @max: the maximum number of buffers to queue
2170  *
2171  * Set the maximum amount of buffers that can be queued in @appsrc.
2172  * After the maximum amount of buffers are queued, @appsrc will emit the
2173  * "enough-data" signal.
2174  *
2175  * Since: 1.20
2176  */
2177 void
gst_app_src_set_max_buffers(GstAppSrc * appsrc,guint64 max)2178 gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
2179 {
2180   GstAppSrcPrivate *priv;
2181 
2182   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2183 
2184   priv = appsrc->priv;
2185 
2186   g_mutex_lock (&priv->mutex);
2187   if (max != priv->max_buffers) {
2188     GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
2189     priv->max_buffers = max;
2190     /* signal the change */
2191     g_cond_broadcast (&priv->cond);
2192   }
2193   g_mutex_unlock (&priv->mutex);
2194 }
2195 
2196 /**
2197  * gst_app_src_get_max_buffers:
2198  * @appsrc: a #GstAppSrc
2199  *
2200  * Get the maximum amount of buffers that can be queued in @appsrc.
2201  *
2202  * Returns: The maximum amount of buffers that can be queued.
2203  *
2204  * Since: 1.20
2205  */
2206 guint64
gst_app_src_get_max_buffers(GstAppSrc * appsrc)2207 gst_app_src_get_max_buffers (GstAppSrc * appsrc)
2208 {
2209   guint64 result;
2210   GstAppSrcPrivate *priv;
2211 
2212   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2213 
2214   priv = appsrc->priv;
2215 
2216   g_mutex_lock (&priv->mutex);
2217   result = priv->max_buffers;
2218   GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
2219       result);
2220   g_mutex_unlock (&priv->mutex);
2221 
2222   return result;
2223 }
2224 
2225 /**
2226  * gst_app_src_get_current_level_buffers:
2227  * @appsrc: a #GstAppSrc
2228  *
2229  * Get the number of currently queued buffers inside @appsrc.
2230  *
2231  * Returns: The number of currently queued buffers.
2232  *
2233  * Since: 1.20
2234  */
2235 guint64
gst_app_src_get_current_level_buffers(GstAppSrc * appsrc)2236 gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
2237 {
2238   guint64 queued;
2239   GstAppSrcPrivate *priv;
2240 
2241   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2242 
2243   priv = appsrc->priv;
2244 
2245   GST_OBJECT_LOCK (appsrc);
2246   queued = priv->queued_buffers;
2247   GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
2248       queued);
2249   GST_OBJECT_UNLOCK (appsrc);
2250 
2251   return queued;
2252 }
2253 
2254 /**
2255  * gst_app_src_set_max_time:
2256  * @appsrc: a #GstAppSrc
2257  * @max: the maximum amonut of time to queue
2258  *
2259  * Set the maximum amount of time that can be queued in @appsrc.
2260  * After the maximum amount of time are queued, @appsrc will emit the
2261  * "enough-data" signal.
2262  *
2263  * Since: 1.20
2264  */
2265 void
gst_app_src_set_max_time(GstAppSrc * appsrc,GstClockTime max)2266 gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
2267 {
2268   GstAppSrcPrivate *priv;
2269 
2270   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2271 
2272   priv = appsrc->priv;
2273 
2274   g_mutex_lock (&priv->mutex);
2275   if (max != priv->max_time) {
2276     GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
2277         GST_TIME_ARGS (max));
2278     priv->max_time = max;
2279     /* signal the change */
2280     g_cond_broadcast (&priv->cond);
2281   }
2282   g_mutex_unlock (&priv->mutex);
2283 }
2284 
2285 /**
2286  * gst_app_src_get_max_time:
2287  * @appsrc: a #GstAppSrc
2288  *
2289  * Get the maximum amount of time that can be queued in @appsrc.
2290  *
2291  * Returns: The maximum amount of time that can be queued.
2292  *
2293  * Since: 1.20
2294  */
2295 GstClockTime
gst_app_src_get_max_time(GstAppSrc * appsrc)2296 gst_app_src_get_max_time (GstAppSrc * appsrc)
2297 {
2298   GstClockTime result;
2299   GstAppSrcPrivate *priv;
2300 
2301   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2302 
2303   priv = appsrc->priv;
2304 
2305   g_mutex_lock (&priv->mutex);
2306   result = priv->max_time;
2307   GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
2308       GST_TIME_ARGS (result));
2309   g_mutex_unlock (&priv->mutex);
2310 
2311   return result;
2312 }
2313 
2314 /**
2315  * gst_app_src_get_current_level_time:
2316  * @appsrc: a #GstAppSrc
2317  *
2318  * Get the amount of currently queued time inside @appsrc.
2319  *
2320  * Returns: The amount of currently queued time.
2321  *
2322  * Since: 1.20
2323  */
2324 GstClockTime
gst_app_src_get_current_level_time(GstAppSrc * appsrc)2325 gst_app_src_get_current_level_time (GstAppSrc * appsrc)
2326 {
2327   gint64 queued;
2328   GstAppSrcPrivate *priv;
2329 
2330   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
2331 
2332   priv = appsrc->priv;
2333 
2334   GST_OBJECT_LOCK (appsrc);
2335   queued = priv->queued_time;
2336   GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
2337       GST_TIME_ARGS (queued));
2338   GST_OBJECT_UNLOCK (appsrc);
2339 
2340   return queued;
2341 }
2342 
2343 static void
gst_app_src_set_latencies(GstAppSrc * appsrc,gboolean do_min,guint64 min,gboolean do_max,guint64 max)2344 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
2345     gboolean do_max, guint64 max)
2346 {
2347   GstAppSrcPrivate *priv = appsrc->priv;
2348   gboolean changed = FALSE;
2349 
2350   g_mutex_lock (&priv->mutex);
2351   if (do_min && priv->min_latency != min) {
2352     priv->min_latency = min;
2353     changed = TRUE;
2354   }
2355   if (do_max && priv->max_latency != max) {
2356     priv->max_latency = max;
2357     changed = TRUE;
2358   }
2359   g_mutex_unlock (&priv->mutex);
2360 
2361   if (changed) {
2362     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
2363     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
2364         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
2365   }
2366 }
2367 
2368 /**
2369  * gst_app_src_set_leaky_type:
2370  * @appsrc: a #GstAppSrc
2371  * @leaky: the #GstAppLeakyType
2372  *
2373  * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
2374  * will drop any buffers that are pushed into it once its internal queue is
2375  * full. The selected type defines whether to drop the oldest or new
2376  * buffers.
2377  *
2378  * Since: 1.20
2379  */
2380 void
gst_app_src_set_leaky_type(GstAppSrc * appsrc,GstAppLeakyType leaky)2381 gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
2382 {
2383   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2384 
2385   appsrc->priv->leaky_type = leaky;
2386 }
2387 
2388 /**
2389  * gst_app_src_get_leaky_type:
2390  * @appsrc: a #GstAppSrc
2391  *
2392  * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
2393  * for more details.
2394  *
2395  * Returns: The currently set #GstAppLeakyType.
2396  *
2397  * Since: 1.20
2398  */
2399 GstAppLeakyType
gst_app_src_get_leaky_type(GstAppSrc * appsrc)2400 gst_app_src_get_leaky_type (GstAppSrc * appsrc)
2401 {
2402   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
2403 
2404   return appsrc->priv->leaky_type;
2405 }
2406 
2407 /**
2408  * gst_app_src_set_latency:
2409  * @appsrc: a #GstAppSrc
2410  * @min: the min latency
2411  * @max: the max latency
2412  *
2413  * Configure the @min and @max latency in @src. If @min is set to -1, the
2414  * default latency calculations for pseudo-live sources will be used.
2415  */
2416 void
gst_app_src_set_latency(GstAppSrc * appsrc,guint64 min,guint64 max)2417 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
2418 {
2419   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
2420 }
2421 
2422 /**
2423  * gst_app_src_get_latency:
2424  * @appsrc: a #GstAppSrc
2425  * @min: (out): the min latency
2426  * @max: (out): the max latency
2427  *
2428  * Retrieve the min and max latencies in @min and @max respectively.
2429  */
2430 void
gst_app_src_get_latency(GstAppSrc * appsrc,guint64 * min,guint64 * max)2431 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
2432 {
2433   GstAppSrcPrivate *priv;
2434 
2435   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2436 
2437   priv = appsrc->priv;
2438 
2439   g_mutex_lock (&priv->mutex);
2440   if (min)
2441     *min = priv->min_latency;
2442   if (max)
2443     *max = priv->max_latency;
2444   g_mutex_unlock (&priv->mutex);
2445 }
2446 
2447 /**
2448  * gst_app_src_set_emit_signals:
2449  * @appsrc: a #GstAppSrc
2450  * @emit: the new state
2451  *
2452  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
2453  * by default disabled because signal emission is expensive and unneeded when
2454  * the application prefers to operate in pull mode.
2455  */
2456 void
gst_app_src_set_emit_signals(GstAppSrc * appsrc,gboolean emit)2457 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
2458 {
2459   GstAppSrcPrivate *priv;
2460 
2461   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2462 
2463   priv = appsrc->priv;
2464 
2465   g_mutex_lock (&priv->mutex);
2466   priv->emit_signals = emit;
2467   g_mutex_unlock (&priv->mutex);
2468 }
2469 
2470 /**
2471  * gst_app_src_get_emit_signals:
2472  * @appsrc: a #GstAppSrc
2473  *
2474  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
2475  *
2476  * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
2477  * signals.
2478  */
2479 gboolean
gst_app_src_get_emit_signals(GstAppSrc * appsrc)2480 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
2481 {
2482   gboolean result;
2483   GstAppSrcPrivate *priv;
2484 
2485   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2486 
2487   priv = appsrc->priv;
2488 
2489   g_mutex_lock (&priv->mutex);
2490   result = priv->emit_signals;
2491   g_mutex_unlock (&priv->mutex);
2492 
2493   return result;
2494 }
2495 
2496 static GstFlowReturn
gst_app_src_push_internal(GstAppSrc * appsrc,GstBuffer * buffer,GstBufferList * buflist,gboolean steal_ref)2497 gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
2498     GstBufferList * buflist, gboolean steal_ref)
2499 {
2500   gboolean first = TRUE;
2501   GstAppSrcPrivate *priv;
2502 
2503   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2504 
2505   priv = appsrc->priv;
2506 
2507   if (buffer != NULL)
2508     g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2509   else
2510     g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
2511 
2512   if (buflist != NULL) {
2513     if (gst_buffer_list_length (buflist) == 0)
2514       return GST_FLOW_OK;
2515 
2516     buffer = gst_buffer_list_get (buflist, 0);
2517   }
2518 
2519   if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
2520       GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
2521       gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
2522     GstClock *clock;
2523 
2524     clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
2525     if (clock) {
2526       GstClockTime now;
2527       GstClockTime base_time =
2528           gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
2529 
2530       now = gst_clock_get_time (clock);
2531       if (now > base_time)
2532         now -= base_time;
2533       else
2534         now = 0;
2535       gst_object_unref (clock);
2536 
2537       if (buflist == NULL) {
2538         if (!steal_ref) {
2539           buffer = gst_buffer_copy (buffer);
2540           steal_ref = TRUE;
2541         } else {
2542           buffer = gst_buffer_make_writable (buffer);
2543         }
2544       } else {
2545         if (!steal_ref) {
2546           buflist = gst_buffer_list_copy (buflist);
2547           steal_ref = TRUE;
2548         } else {
2549           buflist = gst_buffer_list_make_writable (buflist);
2550         }
2551         buffer = gst_buffer_list_get_writable (buflist, 0);
2552       }
2553 
2554       GST_BUFFER_PTS (buffer) = now;
2555       GST_BUFFER_DTS (buffer) = now;
2556     } else {
2557       GST_WARNING_OBJECT (appsrc,
2558           "do-timestamp=TRUE but buffers are provided before "
2559           "reaching the PLAYING state and having a clock. Timestamps will "
2560           "not be accurate!");
2561     }
2562   }
2563 
2564   g_mutex_lock (&priv->mutex);
2565 
2566   while (TRUE) {
2567     /* can't accept buffers when we are flushing or EOS */
2568     if (priv->flushing)
2569       goto flushing;
2570 
2571     if (priv->is_eos)
2572       goto eos;
2573 
2574     if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
2575         (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
2576         (priv->max_time && priv->queued_time >= priv->max_time)) {
2577       GST_DEBUG_OBJECT (appsrc,
2578           "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
2579           G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
2580           " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
2581           GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
2582           priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
2583           priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
2584           GST_TIME_ARGS (priv->max_time));
2585 
2586       if (first) {
2587         Callbacks *callbacks = NULL;
2588         gboolean emit;
2589 
2590         emit = priv->emit_signals;
2591         if (priv->callbacks)
2592           callbacks = callbacks_ref (priv->callbacks);
2593         /* only signal on the first push */
2594         g_mutex_unlock (&priv->mutex);
2595 
2596         if (callbacks && callbacks->callbacks.enough_data)
2597           callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
2598         else if (emit)
2599           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
2600               NULL);
2601 
2602         g_clear_pointer (&callbacks, callbacks_unref);
2603 
2604         g_mutex_lock (&priv->mutex);
2605       }
2606 
2607       if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
2608         priv->need_discont_upstream = TRUE;
2609         goto dropped;
2610       } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
2611         guint i, length = gst_queue_array_get_length (priv->queue);
2612         GstMiniObject *item = NULL;
2613 
2614         /* Find the oldest buffer or buffer list and drop it, then update the
2615          * limits. Dropping one is sufficient to go below the limits again.
2616          */
2617         for (i = 0; i < length; i++) {
2618           item = gst_queue_array_peek_nth (priv->queue, i);
2619           if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
2620             gst_queue_array_drop_element (priv->queue, i);
2621             break;
2622           }
2623           /* To not accidentally have an event after the loop */
2624           item = NULL;
2625         }
2626 
2627         if (!item) {
2628           GST_FIXME_OBJECT (appsrc,
2629               "No buffer or buffer list queued but queue is full");
2630           /* This shouldn't really happen but in this case we can't really do
2631            * anything apart from accepting the buffer / bufferlist */
2632           break;
2633         }
2634 
2635         GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
2636 
2637         gst_app_src_update_queued_pop (appsrc, item, FALSE);
2638         gst_mini_object_unref (item);
2639 
2640         priv->need_discont_downstream = TRUE;
2641         continue;
2642       }
2643 
2644       if (first) {
2645         /* continue to check for flushing/eos after releasing the lock */
2646         first = FALSE;
2647         continue;
2648       }
2649       if (priv->block) {
2650         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
2651         /* we are filled, wait until a buffer gets popped or when we
2652          * flush. */
2653         priv->wait_status |= APP_WAITING;
2654         g_cond_wait (&priv->cond, &priv->mutex);
2655         priv->wait_status &= ~APP_WAITING;
2656       } else {
2657         /* no need to wait for free space, we just pump more data into the
2658          * queue hoping that the caller reacts to the enough-data signal and
2659          * stops pushing buffers. */
2660         break;
2661       }
2662     } else {
2663       break;
2664     }
2665   }
2666 
2667   if (priv->pending_custom_segment) {
2668     GstEvent *event = gst_event_new_segment (&priv->last_segment);
2669 
2670     GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event);
2671     gst_queue_array_push_tail (priv->queue, event);
2672     priv->pending_custom_segment = FALSE;
2673   }
2674 
2675   if (buflist != NULL) {
2676     /* Mark the first buffer of the buffer list as DISCONT if we previously
2677      * dropped a buffer instead of queueing it */
2678     if (priv->need_discont_upstream) {
2679       if (!steal_ref) {
2680         buflist = gst_buffer_list_copy (buflist);
2681         steal_ref = TRUE;
2682       } else {
2683         buflist = gst_buffer_list_make_writable (buflist);
2684       }
2685       buffer = gst_buffer_list_get_writable (buflist, 0);
2686       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2687       priv->need_discont_upstream = FALSE;
2688     }
2689 
2690     GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
2691 
2692     if (!steal_ref)
2693       gst_buffer_list_ref (buflist);
2694     gst_queue_array_push_tail (priv->queue, buflist);
2695   } else {
2696     /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
2697      * queueing it */
2698     if (priv->need_discont_upstream) {
2699       if (!steal_ref) {
2700         buffer = gst_buffer_copy (buffer);
2701         steal_ref = TRUE;
2702       } else {
2703         buffer = gst_buffer_make_writable (buffer);
2704       }
2705       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2706       priv->need_discont_upstream = FALSE;
2707     }
2708 
2709     GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
2710     if (!steal_ref)
2711       gst_buffer_ref (buffer);
2712     gst_queue_array_push_tail (priv->queue, buffer);
2713   }
2714 
2715   gst_app_src_update_queued_push (appsrc,
2716       buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
2717 
2718   if ((priv->wait_status & STREAM_WAITING))
2719     g_cond_broadcast (&priv->cond);
2720 
2721   g_mutex_unlock (&priv->mutex);
2722 
2723   return GST_FLOW_OK;
2724 
2725   /* ERRORS */
2726 flushing:
2727   {
2728     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
2729     if (steal_ref) {
2730       if (buflist)
2731         gst_buffer_list_unref (buflist);
2732       else
2733         gst_buffer_unref (buffer);
2734     }
2735     g_mutex_unlock (&priv->mutex);
2736     return GST_FLOW_FLUSHING;
2737   }
2738 eos:
2739   {
2740     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
2741     if (steal_ref) {
2742       if (buflist)
2743         gst_buffer_list_unref (buflist);
2744       else
2745         gst_buffer_unref (buffer);
2746     }
2747     g_mutex_unlock (&priv->mutex);
2748     return GST_FLOW_EOS;
2749   }
2750 dropped:
2751   {
2752     GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
2753     if (steal_ref) {
2754       if (buflist)
2755         gst_buffer_list_unref (buflist);
2756       else
2757         gst_buffer_unref (buffer);
2758     }
2759     g_mutex_unlock (&priv->mutex);
2760     return GST_FLOW_EOS;
2761   }
2762 }
2763 
2764 static GstFlowReturn
gst_app_src_push_buffer_full(GstAppSrc * appsrc,GstBuffer * buffer,gboolean steal_ref)2765 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
2766     gboolean steal_ref)
2767 {
2768   return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
2769 }
2770 
2771 static GstFlowReturn
gst_app_src_push_sample_internal(GstAppSrc * appsrc,GstSample * sample)2772 gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
2773 {
2774   GstAppSrcPrivate *priv = appsrc->priv;
2775   GstBufferList *buffer_list;
2776   GstBuffer *buffer;
2777   GstCaps *caps;
2778 
2779   g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
2780 
2781   caps = gst_sample_get_caps (sample);
2782   if (caps != NULL) {
2783     gst_app_src_set_caps (appsrc, caps);
2784   } else {
2785     GST_WARNING_OBJECT (appsrc, "received sample without caps");
2786   }
2787 
2788   if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) {
2789     GstSegment *segment = gst_sample_get_segment (sample);
2790 
2791     if (segment->format != GST_FORMAT_TIME) {
2792       GST_LOG_OBJECT (appsrc, "format %s is not supported",
2793           gst_format_get_name (segment->format));
2794       goto handle_buffer;
2795     }
2796 
2797     g_mutex_lock (&priv->mutex);
2798     if (gst_segment_is_equal (&priv->last_segment, segment)) {
2799       GST_LOG_OBJECT (appsrc, "segment wasn't changed");
2800       g_mutex_unlock (&priv->mutex);
2801       goto handle_buffer;
2802     } else {
2803       GST_LOG_OBJECT (appsrc,
2804           "segment changed %" GST_SEGMENT_FORMAT " -> %" GST_SEGMENT_FORMAT,
2805           &priv->last_segment, segment);
2806     }
2807 
2808     /* will be pushed to queue with next buffer/buffer-list */
2809     gst_segment_copy_into (segment, &priv->last_segment);
2810     priv->pending_custom_segment = TRUE;
2811     g_mutex_unlock (&priv->mutex);
2812   }
2813 
2814 handle_buffer:
2815 
2816   buffer = gst_sample_get_buffer (sample);
2817   if (buffer != NULL)
2818     return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2819 
2820   buffer_list = gst_sample_get_buffer_list (sample);
2821   if (buffer_list != NULL)
2822     return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2823 
2824   GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
2825   return GST_FLOW_OK;
2826 }
2827 
2828 /**
2829  * gst_app_src_push_buffer:
2830  * @appsrc: a #GstAppSrc
2831  * @buffer: (transfer full): a #GstBuffer to push
2832  *
2833  * Adds a buffer to the queue of buffers that the appsrc element will
2834  * push to its source pad.  This function takes ownership of the buffer.
2835  *
2836  * When the block property is TRUE, this function can block until free
2837  * space becomes available in the queue.
2838  *
2839  * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2840  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2841  * #GST_FLOW_EOS when EOS occurred.
2842  */
2843 GstFlowReturn
gst_app_src_push_buffer(GstAppSrc * appsrc,GstBuffer * buffer)2844 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
2845 {
2846   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
2847 }
2848 
2849 /**
2850  * gst_app_src_push_buffer_list:
2851  * @appsrc: a #GstAppSrc
2852  * @buffer_list: (transfer full): a #GstBufferList to push
2853  *
2854  * Adds a buffer list to the queue of buffers and buffer lists that the
2855  * appsrc element will push to its source pad.  This function takes ownership
2856  * of @buffer_list.
2857  *
2858  * When the block property is TRUE, this function can block until free
2859  * space becomes available in the queue.
2860  *
2861  * Returns: #GST_FLOW_OK when the buffer list was successfully queued.
2862  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2863  * #GST_FLOW_EOS when EOS occurred.
2864  *
2865  * Since: 1.14
2866  */
2867 GstFlowReturn
gst_app_src_push_buffer_list(GstAppSrc * appsrc,GstBufferList * buffer_list)2868 gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
2869 {
2870   return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
2871 }
2872 
2873 /**
2874  * gst_app_src_push_sample:
2875  * @appsrc: a #GstAppSrc
2876  * @sample: (transfer none): a #GstSample from which buffer and caps may be
2877  * extracted
2878  *
2879  * Extracts a buffer from the provided sample and adds it to the queue of
2880  * buffers that the appsrc element will push to its source pad. Any
2881  * previous caps that were set on appsrc will be replaced by the caps
2882  * associated with the sample if not equal.
2883  *
2884  * This function does not take ownership of the
2885  * sample so the sample needs to be unreffed after calling this function.
2886  *
2887  * When the block property is TRUE, this function can block until free
2888  * space becomes available in the queue.
2889  *
2890  * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2891  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2892  * #GST_FLOW_EOS when EOS occurred.
2893  *
2894  * Since: 1.6
2895  *
2896  */
2897 GstFlowReturn
gst_app_src_push_sample(GstAppSrc * appsrc,GstSample * sample)2898 gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
2899 {
2900   return gst_app_src_push_sample_internal (appsrc, sample);
2901 }
2902 
2903 /* push a buffer without stealing the ref of the buffer. This is used for the
2904  * action signal. */
2905 static GstFlowReturn
gst_app_src_push_buffer_action(GstAppSrc * appsrc,GstBuffer * buffer)2906 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
2907 {
2908   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2909 }
2910 
2911 /* push a buffer list without stealing the ref of the buffer list. This is
2912  * used for the action signal. */
2913 static GstFlowReturn
gst_app_src_push_buffer_list_action(GstAppSrc * appsrc,GstBufferList * buffer_list)2914 gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
2915     GstBufferList * buffer_list)
2916 {
2917   return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2918 }
2919 
2920 /* push a sample without stealing the ref. This is used for the
2921  * action signal. */
2922 static GstFlowReturn
gst_app_src_push_sample_action(GstAppSrc * appsrc,GstSample * sample)2923 gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
2924 {
2925   return gst_app_src_push_sample_internal (appsrc, sample);
2926 }
2927 
2928 /**
2929  * gst_app_src_end_of_stream:
2930  * @appsrc: a #GstAppSrc
2931  *
2932  * Indicates to the appsrc element that the last buffer queued in the
2933  * element is the last buffer of the stream.
2934  *
2935  * Returns: #GST_FLOW_OK when the EOS was successfully queued.
2936  * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2937  */
2938 GstFlowReturn
gst_app_src_end_of_stream(GstAppSrc * appsrc)2939 gst_app_src_end_of_stream (GstAppSrc * appsrc)
2940 {
2941   GstAppSrcPrivate *priv;
2942 
2943   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2944 
2945   priv = appsrc->priv;
2946 
2947   g_mutex_lock (&priv->mutex);
2948   /* can't accept buffers when we are flushing. We can accept them when we are
2949    * EOS although it will not do anything. */
2950   if (priv->flushing)
2951     goto flushing;
2952 
2953   GST_DEBUG_OBJECT (appsrc, "sending EOS");
2954   priv->is_eos = TRUE;
2955   g_cond_broadcast (&priv->cond);
2956   g_mutex_unlock (&priv->mutex);
2957 
2958   return GST_FLOW_OK;
2959 
2960   /* ERRORS */
2961 flushing:
2962   {
2963     g_mutex_unlock (&priv->mutex);
2964     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
2965     return GST_FLOW_FLUSHING;
2966   }
2967 }
2968 
2969 /**
2970  * gst_app_src_set_callbacks: (skip)
2971  * @appsrc: a #GstAppSrc
2972  * @callbacks: the callbacks
2973  * @user_data: a user_data argument for the callbacks
2974  * @notify: a destroy notify function
2975  *
2976  * Set callbacks which will be executed when data is needed, enough data has
2977  * been collected or when a seek should be performed.
2978  * This is an alternative to using the signals, it has lower overhead and is thus
2979  * less expensive, but also less flexible.
2980  *
2981  * If callbacks are installed, no signals will be emitted for performance
2982  * reasons.
2983  *
2984  * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
2985  * way.
2986  */
2987 void
gst_app_src_set_callbacks(GstAppSrc * appsrc,GstAppSrcCallbacks * callbacks,gpointer user_data,GDestroyNotify notify)2988 gst_app_src_set_callbacks (GstAppSrc * appsrc,
2989     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
2990 {
2991   Callbacks *old_callbacks, *new_callbacks = NULL;
2992   GstAppSrcPrivate *priv;
2993 
2994   g_return_if_fail (GST_IS_APP_SRC (appsrc));
2995   g_return_if_fail (callbacks != NULL);
2996 
2997   priv = appsrc->priv;
2998 
2999   if (callbacks) {
3000     new_callbacks = g_new0 (Callbacks, 1);
3001     new_callbacks->callbacks = *callbacks;
3002     new_callbacks->user_data = user_data;
3003     new_callbacks->destroy_notify = notify;
3004     new_callbacks->ref_count = 1;
3005   }
3006 
3007   g_mutex_lock (&priv->mutex);
3008   old_callbacks = g_steal_pointer (&priv->callbacks);
3009   priv->callbacks = g_steal_pointer (&new_callbacks);
3010   g_mutex_unlock (&priv->mutex);
3011 
3012   g_clear_pointer (&old_callbacks, callbacks_unref);
3013 }
3014 
3015 /*** GSTURIHANDLER INTERFACE *************************************************/
3016 
3017 static GstURIType
gst_app_src_uri_get_type(GType type)3018 gst_app_src_uri_get_type (GType type)
3019 {
3020   return GST_URI_SRC;
3021 }
3022 
3023 static const gchar *const *
gst_app_src_uri_get_protocols(GType type)3024 gst_app_src_uri_get_protocols (GType type)
3025 {
3026   static const gchar *protocols[] = { "appsrc", NULL };
3027 
3028   return protocols;
3029 }
3030 
3031 static gchar *
gst_app_src_uri_get_uri(GstURIHandler * handler)3032 gst_app_src_uri_get_uri (GstURIHandler * handler)
3033 {
3034   GstAppSrc *appsrc = GST_APP_SRC (handler);
3035 
3036   return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
3037 }
3038 
3039 static gboolean
gst_app_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)3040 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
3041     GError ** error)
3042 {
3043   GstAppSrc *appsrc = GST_APP_SRC (handler);
3044 
3045   g_free (appsrc->priv->uri);
3046   appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
3047 
3048   return TRUE;
3049 }
3050 
3051 static void
gst_app_src_uri_handler_init(gpointer g_iface,gpointer iface_data)3052 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
3053 {
3054   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
3055 
3056   iface->get_type = gst_app_src_uri_get_type;
3057   iface->get_protocols = gst_app_src_uri_get_protocols;
3058   iface->get_uri = gst_app_src_uri_get_uri;
3059   iface->set_uri = gst_app_src_uri_set_uri;
3060 }
3061 
3062 static gboolean
gst_app_src_event(GstBaseSrc * src,GstEvent * event)3063 gst_app_src_event (GstBaseSrc * src, GstEvent * event)
3064 {
3065   GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
3066   GstAppSrcPrivate *priv = appsrc->priv;
3067 
3068   switch (GST_EVENT_TYPE (event)) {
3069     case GST_EVENT_FLUSH_STOP:
3070       g_mutex_lock (&priv->mutex);
3071       priv->is_eos = FALSE;
3072       g_mutex_unlock (&priv->mutex);
3073       break;
3074     default:
3075       break;
3076   }
3077 
3078   return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
3079 }
3080