1 /* GStreamer
2 * Copyright (C) 2007 David Schleef <ds@schleef.org>
3 * (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20 /**
21 * SECTION:gstappsrc
22 * @title: GstAppSrc
23 * @short_description: Easy way for applications to inject buffers into a
24 * pipeline
25 * @see_also: #GstBaseSrc, appsink
26 *
27 * The appsrc element can be used by applications to insert data into a
28 * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
29 * external API functions.
30 *
31 * appsrc can be used by linking with the libgstapp library to access the
32 * methods directly or by using the appsrc action signals.
33 *
34 * Before operating appsrc, the caps property must be set to fixed caps
35 * describing the format of the data that will be pushed with appsrc. An
36 * exception to this is when pushing buffers with unknown caps, in which case no
37 * caps should be set. This is typically true of file-like sources that push raw
 * byte buffers. If you don't want to explicitly set the caps, you can use
 * gst_app_src_push_sample(). This method gets the caps associated with the
 * sample and sets them on the appsrc, replacing any previously set caps (if
 * different from the sample's caps).
42 *
43 * The main way of handing data to the appsrc element is by calling the
44 * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
 * This will put the buffer onto a queue from which appsrc will read in its
 * streaming thread. It is important to note that data transport will not happen
 * from the thread that performed the push-buffer call.
48 *
49 * The "max-bytes", "max-buffers" and "max-time" properties control how much
50 * data can be queued in appsrc before appsrc considers the queue full. A
51 * filled internal queue will always signal the "enough-data" signal, which
52 * signals the application that it should stop pushing data into appsrc. The
 * "block" property will cause appsrc to block the push-buffer method until
 * free space becomes available again.
55 *
56 * When the internal queue is running out of data, the "need-data" signal is
57 * emitted, which signals the application that it should start pushing more data
58 * into appsrc.
59 *
60 * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
61 * "seek-data" signal when the "stream-mode" property is set to "seekable" or
62 * "random-access". The signal argument will contain the new desired position in
 * the stream expressed in the unit set with the "format" property. After
 * receiving the seek-data signal, the application should push buffers starting
 * from the new position.
66 *
67 * These signals allow the application to operate the appsrc in two different
68 * ways:
69 *
70 * The push mode, in which the application repeatedly calls the push-buffer/push-sample
71 * method with a new buffer/sample. Optionally, the queue size in the appsrc
72 * can be controlled with the enough-data and need-data signals by respectively
73 * stopping/starting the push-buffer/push-sample calls. This is a typical
74 * mode of operation for the stream-type "stream" and "seekable". Use this
75 * mode when implementing various network protocols or hardware devices.
76 *
77 * The pull mode, in which the need-data signal triggers the next push-buffer call.
78 * This mode is typically used in the "random-access" stream-type. Use this
79 * mode for file access or other randomly accessible sources. In this mode, a
80 * buffer of exactly the amount of bytes given by the need-data signal should be
81 * pushed into appsrc.
82 *
83 * In all modes, the size property on appsrc should contain the total stream
84 * size in bytes. Setting this property is mandatory in the random-access mode.
85 * For the stream and seekable modes, setting this property is optional but
86 * recommended.
87 *
88 * When the application has finished pushing data into appsrc, it should call
89 * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
90 * this call, no more buffers can be pushed into appsrc until a flushing seek
91 * occurs or the state of the appsrc has gone through READY.
92 */
93
94 #ifdef HAVE_CONFIG_H
95 #include "config.h"
96 #endif
97
98 #include <gst/gst.h>
99 #include <gst/base/base.h>
100
101 #include <string.h>
102
103 #include "gstappsrc.h"
104
/* Bit flags recording which side is currently blocked waiting on the other. */
typedef enum
{
  /* neither thread is waiting */
  NOONE_WAITING = 0,
  /* streaming thread is waiting for application thread */
  STREAM_WAITING = (1 << 0),
  /* application thread is waiting for streaming thread */
  APP_WAITING = (1 << 1)
} GstAppSrcWaitStatus;
111
/* Reference-counted holder for an application-supplied callback table.
 * The count is manipulated atomically by callbacks_ref()/callbacks_unref(). */
typedef struct
{
  /* the callback function table */
  GstAppSrcCallbacks callbacks;
  /* opaque pointer handed to destroy_notify when the holder is freed */
  gpointer user_data;
  /* optional; invoked with user_data when the last reference is dropped */
  GDestroyNotify destroy_notify;
  /* atomic reference count */
  gint ref_count;
} Callbacks;
119
120 static Callbacks *
callbacks_ref(Callbacks * callbacks)121 callbacks_ref (Callbacks * callbacks)
122 {
123 g_atomic_int_inc (&callbacks->ref_count);
124
125 return callbacks;
126 }
127
128 static void
callbacks_unref(Callbacks * callbacks)129 callbacks_unref (Callbacks * callbacks)
130 {
131 if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
132 return;
133
134 if (callbacks->destroy_notify)
135 callbacks->destroy_notify (callbacks->user_data);
136
137 g_free (callbacks);
138 }
139
140
/* Private per-instance state of GstAppSrc. The instance initializer fills in
 * the property defaults; helpers that touch the queue state document that
 * they must be called with @mutex held. */
struct _GstAppSrcPrivate
{
  /* condition variable initialised together with @mutex
   * (NOTE(review): presumably used for the wait_status handshake — the
   * waiting code is not visible in this part of the file) */
  GCond cond;
  /* held while manipulating the queue and the callbacks pointer */
  GMutex mutex;
  /* queued mini objects; the flush helper checks entries for caps */
  GstQueueArray *queue;
  GstAppSrcWaitStatus wait_status;

  /* both caps pointers are released under the object lock on dispose;
   * current_caps is what the get_caps vfunc reports */
  GstCaps *last_caps;
  GstCaps *current_caps;
  /* last segment received on the input */
  GstSegment last_segment;
  /* currently configured segment for the output */
  GstSegment current_segment;
  /* queue up a segment event based on last_segment before
   * the next buffer of buffer list */
  gboolean pending_custom_segment;

  /* the next buffer that will be queued needs a discont flag
   * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
  gboolean need_discont_upstream;
  /* the next buffer that will be dequeued needs a discont flag
   * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
  gboolean need_discont_downstream;

  /* values below are initialised from the DEFAULT_PROP_* constants */
  gint64 size;
  GstClockTime duration;
  GstAppStreamType stream_type;
  guint64 max_bytes, max_buffers, max_time;
  GstFormat format;
  gboolean block;
  /* freed with g_free() on finalize */
  gchar *uri;

  gboolean flushing;
  gboolean started;
  gboolean is_eos;
  /* fill level counters; reset to 0 when the queue is flushed */
  guint64 queued_bytes, queued_buffers;
  /* Used to calculate the current time level */
  GstClockTime last_in_running_time, last_out_running_time;
  /* Updated based on the above whenever they change */
  GstClockTime queued_time;
  guint64 offset;
  GstAppStreamType current_type;

  guint64 min_latency;
  guint64 max_latency;
  gboolean emit_signals;
  guint min_percent;
  gboolean handle_segment_change;

  GstAppLeakyType leaky_type;

  /* ref-counted callback holder; stolen and unreffed on dispose */
  Callbacks *callbacks;
#ifdef OHOS_OPT_COMPAT
  // ohos.opt.compat.0060
  /* only present in OHOS_OPT_COMPAT builds
   * (presumably backs the "datasrc-mode" property — confirm in the
   * property accessors) */
  gboolean datasrc_mode;
#endif
};
198
/* File-local debug category; it is initialised under the name "appsrc" in
 * gst_app_src_class_init() and made the default for the GST_* log macros. */
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
#define GST_CAT_DEFAULT app_src_debug
201
/* Indices into gst_app_src_signals[]; LAST_SIGNAL is the table size. */
enum
{
  /* signals emitted by appsrc */
  SIGNAL_NEED_DATA = 0,
  SIGNAL_ENOUGH_DATA,
  SIGNAL_SEEK_DATA,

  /* action signals invoked by the application */
  SIGNAL_PUSH_BUFFER,
  SIGNAL_END_OF_STREAM,
  SIGNAL_PUSH_SAMPLE,
  SIGNAL_PUSH_BUFFER_LIST,

  /* number of entries above, not a signal */
  LAST_SIGNAL
};
217
/* Default values for the GObject properties installed in class_init. */

/* stream description */
#define DEFAULT_PROP_SIZE                   (-1)
#define DEFAULT_PROP_STREAM_TYPE            GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_FORMAT                 GST_FORMAT_BYTES
#define DEFAULT_PROP_DURATION               GST_CLOCK_TIME_NONE

/* queue limits (0 = unlimited for buffers and time) */
#define DEFAULT_PROP_MAX_BYTES              200000
#define DEFAULT_PROP_MAX_BUFFERS            0
#define DEFAULT_PROP_MAX_TIME               (0 * GST_SECOND)
#define DEFAULT_PROP_MIN_PERCENT            0
#define DEFAULT_PROP_LEAKY_TYPE             GST_APP_LEAKY_TYPE_NONE

/* behaviour switches */
#define DEFAULT_PROP_BLOCK                  FALSE
#define DEFAULT_PROP_IS_LIVE                FALSE
#define DEFAULT_PROP_EMIT_SIGNALS           TRUE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE  FALSE

/* latency reporting (-1 = default / unlimited) */
#define DEFAULT_PROP_MIN_LATENCY            (-1)
#define DEFAULT_PROP_MAX_LATENCY            (-1)

/* read-only fill level properties */
#define DEFAULT_PROP_CURRENT_LEVEL_BYTES    0
#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS  0
#define DEFAULT_PROP_CURRENT_LEVEL_TIME     0
236
/* GObject property identifiers; PROP_0 is the reserved zero id and
 * PROP_LAST marks the end of the list. */
enum
{
  PROP_0 = 0,

  /* stream description */
  PROP_CAPS,
  PROP_SIZE,
  PROP_STREAM_TYPE,

  /* queue limits */
  PROP_MAX_BYTES,
  PROP_MAX_BUFFERS,
  PROP_MAX_TIME,

  PROP_FORMAT,
  PROP_BLOCK,
  PROP_IS_LIVE,
  PROP_MIN_LATENCY,
  PROP_MAX_LATENCY,
  PROP_EMIT_SIGNALS,
  PROP_MIN_PERCENT,

  /* read-only fill levels */
  PROP_CURRENT_LEVEL_BYTES,
  PROP_CURRENT_LEVEL_BUFFERS,
  PROP_CURRENT_LEVEL_TIME,

  PROP_DURATION,
  PROP_HANDLE_SEGMENT_CHANGE,
  PROP_LEAKY_TYPE,
#ifdef OHOS_OPT_COMPAT
  // ohos.opt.compat.0060
  PROP_DATASRC_MODE,
#endif
  PROP_LAST
};
265
/* Template for the single, always-present "src" pad. It advertises ANY caps;
 * the template is attached to the element class in gst_app_src_class_init(). */
static GstStaticPadTemplate gst_app_src_template =
GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
271
/* --- forward declarations --- */

/* GstURIHandler interface initialiser (passed to G_IMPLEMENT_INTERFACE) */
static void gst_app_src_uri_handler_init (gpointer g_iface,
    gpointer iface_data);

/* GObject vfuncs */
static void gst_app_src_dispose (GObject * object);
static void gst_app_src_finalize (GObject * object);

static void gst_app_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_app_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* GstElement vfunc */
static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);

/* internal helper used by the min-latency / max-latency property setters */
static void gst_app_src_set_latencies (GstAppSrc * appsrc,
    gboolean do_min, guint64 min, gboolean do_max, guint64 max);

/* GstBaseSrc vfuncs */
static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
    GstCaps * filter);
static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
    guint size, GstBuffer ** buf);
static gboolean gst_app_src_start (GstBaseSrc * bsrc);
static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);

/* default handlers for the push-* action signals */
static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
    GstBuffer * buffer);
static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
    GstBufferList * buffer_list);
static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
    GstSample * sample);

/* signal ids, indexed by the SIGNAL_* enum and filled in by class_init */
static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
311
/* Define the GstAppSrc type as a GstBaseSrc subclass with private instance
 * data and the GstURIHandler interface. The macro below makes the generated
 * gst_app_src_parent_class variable available under the short name
 * parent_class used by dispose/finalize. */
#define gst_app_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
    G_ADD_PRIVATE (GstAppSrc)
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
316
/* Class initializer for GstAppSrc.
 *
 * In order, this: initialises the debug category, hooks up the GObject
 * dispose/finalize and property accessors, installs all properties,
 * registers the notification and action signals (storing their ids in
 * gst_app_src_signals[]), sets the element metadata and pad template, and
 * finally wires the GstElement, GstBaseSrc and GstAppSrcClass virtual
 * methods to the implementations in this file. */
static void
gst_app_src_class_init (GstAppSrcClass * klass)
{
  GObjectClass *gobject_class = (GObjectClass *) klass;
  GstElementClass *element_class = (GstElementClass *) klass;
  GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;

  GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");

  gobject_class->dispose = gst_app_src_dispose;
  gobject_class->finalize = gst_app_src_finalize;

  gobject_class->set_property = gst_app_src_set_property;
  gobject_class->get_property = gst_app_src_get_property;

  /**
   * GstAppSrc:caps:
   *
   * The GstCaps that will be negotiated downstream and will be put
   * on outgoing buffers.
   */
  g_object_class_install_property (gobject_class, PROP_CAPS,
      g_param_spec_boxed ("caps", "Caps",
          "The allowed caps for the src pad", GST_TYPE_CAPS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:format:
   *
   * The format to use for segment events. When the source is producing
   * timestamped buffers this property should be set to GST_FORMAT_TIME.
   */
  g_object_class_install_property (gobject_class, PROP_FORMAT,
      g_param_spec_enum ("format", "Format",
          "The format of the segment events and seek", GST_TYPE_FORMAT,
          DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:size:
   *
   * The total size in bytes of the data stream. If the total size is known, it
   * is recommended to configure it with this property.
   */
  g_object_class_install_property (gobject_class, PROP_SIZE,
      g_param_spec_int64 ("size", "Size",
          "The size of the data stream in bytes (-1 if unknown)",
          -1, G_MAXINT64, DEFAULT_PROP_SIZE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:stream-type:
   *
   * The type of stream that this source is producing. For seekable streams the
   * application should connect to the seek-data signal.
   */
  g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
      g_param_spec_enum ("stream-type", "Stream Type",
          "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
          DEFAULT_PROP_STREAM_TYPE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:max-bytes:
   *
   * The maximum amount of bytes that can be queued internally.
   * After the maximum amount of bytes are queued, appsrc will emit the
   * "enough-data" signal.
   */
  g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
      g_param_spec_uint64 ("max-bytes", "Max bytes",
          "The maximum number of bytes to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:max-buffers:
   *
   * The maximum amount of buffers that can be queued internally.
   * After the maximum amount of buffers are queued, appsrc will emit the
   * "enough-data" signal.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
      g_param_spec_uint64 ("max-buffers", "Max buffers",
          "The maximum number of buffers to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:max-time:
   *
   * The maximum amount of time that can be queued internally.
   * After the maximum amount of time is queued, appsrc will emit the
   * "enough-data" signal.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_MAX_TIME,
      g_param_spec_uint64 ("max-time", "Max time",
          "The maximum amount of time to queue internally (0 = unlimited)",
          0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:block:
   *
   * When max-bytes are queued and after the enough-data signal has been emitted,
   * block any further push-buffer calls until the amount of queued bytes drops
   * below the max-bytes limit.
   */
  g_object_class_install_property (gobject_class, PROP_BLOCK,
      g_param_spec_boolean ("block", "Block",
          "Block push-buffer when max-bytes are queued",
          DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:is-live:
   *
   * Instruct the source to behave like a live source. This includes that it
   * will only push out buffers in the PLAYING state.
   */
  g_object_class_install_property (gobject_class, PROP_IS_LIVE,
      g_param_spec_boolean ("is-live", "Is Live",
          "Whether to act as a live source",
          DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:min-latency:
   *
   * The minimum latency of the source. A value of -1 will use the default
   * latency calculations of #GstBaseSrc.
   */
  g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
      g_param_spec_int64 ("min-latency", "Min Latency",
          "The minimum latency (-1 = default)",
          -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstAppSrc:max-latency:
   *
   * The maximum latency of the source. A value of -1 means an unlimited amount
   * of latency.
   */
  g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
      g_param_spec_int64 ("max-latency", "Max Latency",
          "The maximum latency (-1 = unlimited)",
          -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:emit-signals:
   *
   * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
   * This option is by default enabled for backwards compatibility reasons but
   * can be disabled when needed because signal emission is expensive.
   */
  g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
      g_param_spec_boolean ("emit-signals", "Emit signals",
          "Emit need-data, enough-data and seek-data signals",
          DEFAULT_PROP_EMIT_SIGNALS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:min-percent:
   *
   * Make appsrc emit the "need-data" signal when the amount of bytes in the
   * queue drops below this percentage of max-bytes.
   */
  g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
      g_param_spec_uint ("min-percent", "Min Percent",
          "Emit need-data when queued bytes drops below this percent of max-bytes",
          0, 100, DEFAULT_PROP_MIN_PERCENT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:current-level-bytes:
   *
   * The number of currently queued bytes inside appsrc.
   *
   * Since: 1.2
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
      g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
          "The number of currently queued bytes",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:current-level-buffers:
   *
   * The number of currently queued buffers inside appsrc.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
      g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
          "The number of currently queued buffers",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:current-level-time:
   *
   * The amount of currently queued time inside appsrc.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
      g_param_spec_uint64 ("current-level-time", "Current Level Time",
          "The amount of currently queued time",
          0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:duration:
   *
   * The total duration in nanoseconds of the data stream. If the total duration is known, it
   * is recommended to configure it with this property.
   *
   * Since: 1.10
   */
  g_object_class_install_property (gobject_class, PROP_DURATION,
      g_param_spec_uint64 ("duration", "Duration",
          "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
          0, G_MAXUINT64, DEFAULT_PROP_DURATION,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:handle-segment-change:
   *
   * When enabled, appsrc will check GstSegment in GstSample which was
   * pushed via gst_app_src_push_sample() or "push-sample" signal action.
   * If a GstSegment is changed, corresponding segment event will be followed
   * by next data flow.
   *
   * FIXME: currently only GST_FORMAT_TIME format is supported and therefore
   * GstAppSrc::format should be time. However, possibly #GstAppSrc can support
   * other formats.
   *
   * Since: 1.18
   */
  g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
      g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
          "Whether to detect and handle changed time format GstSegment in "
          "GstSample. User should set valid GstSegment in GstSample. "
          "Must set format property as \"time\" to enable this property",
          DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
          G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
          G_PARAM_STATIC_STRINGS));

  /**
   * GstAppSrc:leaky-type:
   *
   * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
   * will drop any buffers that are pushed into it once its internal queue is
   * full. The selected type defines whether to drop the oldest or new
   * buffers.
   *
   * Since: 1.20
   */
  g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
      g_param_spec_enum ("leaky-type", "Leaky Type",
          "Whether to drop buffers once the internal queue is full",
          GST_TYPE_APP_LEAKY_TYPE,
          DEFAULT_PROP_LEAKY_TYPE,
          G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
          G_PARAM_STATIC_STRINGS));

#ifdef OHOS_OPT_COMPAT
  // ohos.opt.compat.0060
  /* boolean property that is only installed in OHOS_OPT_COMPAT builds */
  g_object_class_install_property (gobject_class, PROP_DATASRC_MODE,
      g_param_spec_boolean ("datasrc-mode", "Datasrc-Mode", "Datasrc Mode",
          FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
#endif

  /**
   * GstAppSrc::need-data:
   * @appsrc: the appsrc element that emitted the signal
   * @length: the amount of bytes needed.
   *
   * Signal that the source needs more data. In the callback or from another
   * thread you should call push-buffer or end-of-stream.
   *
   * @length is just a hint and when it is set to -1, any number of bytes can be
   * pushed into @appsrc.
   *
   * You can call push-buffer multiple times until the enough-data signal is
   * fired.
   */
  gst_app_src_signals[SIGNAL_NEED_DATA] =
      g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, need_data),
      NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);

  /**
   * GstAppSrc::enough-data:
   * @appsrc: the appsrc element that emitted the signal
   *
   * Signal that the source has enough data. It is recommended that the
   * application stops calling push-buffer until the need-data signal is
   * emitted again to avoid excessive buffer queueing.
   */
  gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
      g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
      NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);

  /**
   * GstAppSrc::seek-data:
   * @appsrc: the appsrc element that emitted the signal
   * @offset: the offset to seek to
   *
   * Seek to the given offset. The next push-buffer should produce buffers from
   * the new @offset.
   * This callback is only called for seekable stream types.
   *
   * Returns: %TRUE if the seek succeeded.
   */
  gst_app_src_signals[SIGNAL_SEEK_DATA] =
      g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
      NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);

  /**
   * GstAppSrc::push-buffer:
   * @appsrc: the appsrc
   * @buffer: (transfer none): a buffer to push
   *
   * Adds a buffer to the queue of buffers that the appsrc element will
   * push to its source pad.
   *
   * This function does not take ownership of the buffer, but it takes a
   * reference so the buffer can be unreffed at any time after calling this
   * function.
   *
   * When the block property is TRUE, this function can block until free space
   * becomes available in the queue.
   */
  gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
      g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_buffer), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);

  /**
   * GstAppSrc::push-buffer-list:
   * @appsrc: the appsrc
   * @buffer_list: (transfer none): a buffer list to push
   *
   * Adds a buffer list to the queue of buffers and buffer lists that the
   * appsrc element will push to its source pad.
   *
   * This function does not take ownership of the buffer list, but it takes a
   * reference so the buffer list can be unreffed at any time after calling
   * this function.
   *
   * When the block property is TRUE, this function can block until free space
   * becomes available in the queue.
   *
   * Since: 1.14
   */
  gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
      g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_buffer_list), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);

  /**
   * GstAppSrc::push-sample:
   * @appsrc: the appsrc
   * @sample: (transfer none): a sample from which to extract the buffer to push
   *
   * Extracts a buffer from the provided sample and adds the extracted buffer
   * to the queue of buffers that the appsrc element will
   * push to its source pad. This function sets the appsrc caps based on the caps
   * in the sample and resets the caps if they change.
   * Only the caps and the buffer of the provided sample are used and not
   * for example the segment in the sample.
   *
   * This function does not take ownership of the sample, but it takes a
   * reference so the sample can be unreffed at any time after calling this
   * function.
   *
   * When the block property is TRUE, this function can block until free space
   * becomes available in the queue.
   *
   * Since: 1.6
   */
  gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
      g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          push_sample), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);


  /**
   * GstAppSrc::end-of-stream:
   * @appsrc: the appsrc
   *
   * Notify @appsrc that no more buffers are available.
   */
  gst_app_src_signals[SIGNAL_END_OF_STREAM] =
      g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
          end_of_stream), NULL, NULL, NULL,
      GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);

  gst_element_class_set_static_metadata (element_class, "AppSrc",
      "Generic/Source", "Allow the application to feed buffers to a pipeline",
      "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");

  gst_element_class_add_static_pad_template (element_class,
      &gst_app_src_template);

  /* GstElement vfunc */
  element_class->send_event = gst_app_src_send_event;

  /* GstBaseSrc vfuncs */
  basesrc_class->negotiate = gst_app_src_negotiate;
  basesrc_class->get_caps = gst_app_src_internal_get_caps;
  basesrc_class->create = gst_app_src_create;
  basesrc_class->start = gst_app_src_start;
  basesrc_class->stop = gst_app_src_stop;
  basesrc_class->unlock = gst_app_src_unlock;
  basesrc_class->unlock_stop = gst_app_src_unlock_stop;
  basesrc_class->do_seek = gst_app_src_do_seek;
  basesrc_class->is_seekable = gst_app_src_is_seekable;
  basesrc_class->get_size = gst_app_src_do_get_size;
  basesrc_class->query = gst_app_src_query;
  basesrc_class->event = gst_app_src_event;

  /* default handlers for the action signals registered above */
  klass->push_buffer = gst_app_src_push_buffer_action;
  klass->push_buffer_list = gst_app_src_push_buffer_list_action;
  klass->push_sample = gst_app_src_push_sample_action;
  klass->end_of_stream = gst_app_src_end_of_stream;
}
747
748 static void
gst_app_src_init(GstAppSrc * appsrc)749 gst_app_src_init (GstAppSrc * appsrc)
750 {
751 GstAppSrcPrivate *priv;
752
753 priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
754
755 g_mutex_init (&priv->mutex);
756 g_cond_init (&priv->cond);
757 priv->queue = gst_queue_array_new (16);
758 priv->wait_status = NOONE_WAITING;
759
760 priv->size = DEFAULT_PROP_SIZE;
761 priv->duration = DEFAULT_PROP_DURATION;
762 priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
763 priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
764 priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
765 priv->max_time = DEFAULT_PROP_MAX_TIME;
766 priv->format = DEFAULT_PROP_FORMAT;
767 priv->block = DEFAULT_PROP_BLOCK;
768 priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
769 priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
770 priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
771 priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
772 priv->datasrc_mode = FALSE;
773 priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
774 priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
775
776 gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
777 }
778
779 /* Must be called with priv->mutex */
780 static void
gst_app_src_flush_queued(GstAppSrc * src,gboolean retain_last_caps)781 gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
782 {
783 GstMiniObject *obj;
784 GstAppSrcPrivate *priv = src->priv;
785 GstCaps *requeue_caps = NULL;
786
787 while (!gst_queue_array_is_empty (priv->queue)) {
788 obj = gst_queue_array_pop_head (priv->queue);
789 if (obj) {
790 if (GST_IS_CAPS (obj) && retain_last_caps) {
791 gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
792 }
793 gst_mini_object_unref (obj);
794 }
795 }
796
797 if (requeue_caps) {
798 gst_queue_array_push_tail (priv->queue, requeue_caps);
799 }
800
801 priv->queued_bytes = 0;
802 priv->queued_buffers = 0;
803 priv->queued_time = 0;
804 priv->last_in_running_time = GST_CLOCK_TIME_NONE;
805 priv->last_out_running_time = GST_CLOCK_TIME_NONE;
806 priv->need_discont_upstream = FALSE;
807 priv->need_discont_downstream = FALSE;
808 }
809
810 static void
gst_app_src_dispose(GObject * obj)811 gst_app_src_dispose (GObject * obj)
812 {
813 GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
814 GstAppSrcPrivate *priv = appsrc->priv;
815 Callbacks *callbacks = NULL;
816
817 GST_OBJECT_LOCK (appsrc);
818 if (priv->current_caps) {
819 gst_caps_unref (priv->current_caps);
820 priv->current_caps = NULL;
821 }
822 if (priv->last_caps) {
823 gst_caps_unref (priv->last_caps);
824 priv->last_caps = NULL;
825 }
826 GST_OBJECT_UNLOCK (appsrc);
827
828 g_mutex_lock (&priv->mutex);
829 if (priv->callbacks)
830 callbacks = g_steal_pointer (&priv->callbacks);
831 gst_app_src_flush_queued (appsrc, FALSE);
832 g_mutex_unlock (&priv->mutex);
833
834 g_clear_pointer (&callbacks, callbacks_unref);
835
836 G_OBJECT_CLASS (parent_class)->dispose (obj);
837 }
838
839 static void
gst_app_src_finalize(GObject * obj)840 gst_app_src_finalize (GObject * obj)
841 {
842 GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
843 GstAppSrcPrivate *priv = appsrc->priv;
844
845 g_mutex_clear (&priv->mutex);
846 g_cond_clear (&priv->cond);
847 gst_queue_array_free (priv->queue);
848
849 g_free (priv->uri);
850
851 G_OBJECT_CLASS (parent_class)->finalize (obj);
852 }
853
854 static GstCaps *
gst_app_src_internal_get_caps(GstBaseSrc * bsrc,GstCaps * filter)855 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
856 {
857 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
858 GstCaps *caps;
859
860 GST_OBJECT_LOCK (appsrc);
861 if ((caps = appsrc->priv->current_caps))
862 gst_caps_ref (caps);
863 GST_OBJECT_UNLOCK (appsrc);
864
865 if (filter) {
866 if (caps) {
867 GstCaps *intersection =
868 gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
869 gst_caps_unref (caps);
870 caps = intersection;
871 } else {
872 caps = gst_caps_ref (filter);
873 }
874 }
875
876 GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
877 return caps;
878 }
879
880 static void
gst_app_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)881 gst_app_src_set_property (GObject * object, guint prop_id,
882 const GValue * value, GParamSpec * pspec)
883 {
884 GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
885 GstAppSrcPrivate *priv = appsrc->priv;
886
887 switch (prop_id) {
888 case PROP_CAPS:
889 gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
890 break;
891 case PROP_SIZE:
892 gst_app_src_set_size (appsrc, g_value_get_int64 (value));
893 break;
894 case PROP_STREAM_TYPE:
895 gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
896 break;
897 case PROP_MAX_BYTES:
898 gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
899 break;
900 case PROP_MAX_BUFFERS:
901 gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
902 break;
903 case PROP_MAX_TIME:
904 gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
905 break;
906 case PROP_FORMAT:
907 priv->format = g_value_get_enum (value);
908 break;
909 case PROP_BLOCK:
910 priv->block = g_value_get_boolean (value);
911 break;
912 case PROP_IS_LIVE:
913 gst_base_src_set_live (GST_BASE_SRC (appsrc),
914 g_value_get_boolean (value));
915 break;
916 case PROP_MIN_LATENCY:
917 gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
918 FALSE, -1);
919 break;
920 case PROP_MAX_LATENCY:
921 gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
922 g_value_get_int64 (value));
923 break;
924 case PROP_EMIT_SIGNALS:
925 gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
926 break;
927 case PROP_MIN_PERCENT:
928 priv->min_percent = g_value_get_uint (value);
929 break;
930 case PROP_DURATION:
931 gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
932 break;
933 case PROP_HANDLE_SEGMENT_CHANGE:
934 priv->handle_segment_change = g_value_get_boolean (value);
935 break;
936 case PROP_LEAKY_TYPE:
937 priv->leaky_type = g_value_get_enum (value);
938 break;
939 #ifdef OHOS_OPT_COMPAT
940 // ohos.opt.compat.0060
941 case PROP_DATASRC_MODE:
942 priv->datasrc_mode = g_value_get_boolean (value);
943 #endif
944 default:
945 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
946 break;
947 }
948 }
949
950 static void
gst_app_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)951 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
952 GParamSpec * pspec)
953 {
954 GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
955 GstAppSrcPrivate *priv = appsrc->priv;
956
957 switch (prop_id) {
958 case PROP_CAPS:
959 g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
960 break;
961 case PROP_SIZE:
962 g_value_set_int64 (value, gst_app_src_get_size (appsrc));
963 break;
964 case PROP_STREAM_TYPE:
965 g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
966 break;
967 case PROP_MAX_BYTES:
968 g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
969 break;
970 case PROP_MAX_BUFFERS:
971 g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
972 break;
973 case PROP_MAX_TIME:
974 g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
975 break;
976 case PROP_FORMAT:
977 g_value_set_enum (value, priv->format);
978 break;
979 case PROP_BLOCK:
980 g_value_set_boolean (value, priv->block);
981 break;
982 case PROP_IS_LIVE:
983 g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
984 break;
985 case PROP_MIN_LATENCY:
986 {
987 guint64 min = 0;
988
989 gst_app_src_get_latency (appsrc, &min, NULL);
990 g_value_set_int64 (value, min);
991 break;
992 }
993 case PROP_MAX_LATENCY:
994 {
995 guint64 max = 0;
996
997 gst_app_src_get_latency (appsrc, NULL, &max);
998 g_value_set_int64 (value, max);
999 break;
1000 }
1001 case PROP_EMIT_SIGNALS:
1002 g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
1003 break;
1004 case PROP_MIN_PERCENT:
1005 g_value_set_uint (value, priv->min_percent);
1006 break;
1007 case PROP_CURRENT_LEVEL_BYTES:
1008 g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
1009 break;
1010 case PROP_CURRENT_LEVEL_BUFFERS:
1011 g_value_set_uint64 (value,
1012 gst_app_src_get_current_level_buffers (appsrc));
1013 break;
1014 case PROP_CURRENT_LEVEL_TIME:
1015 g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
1016 break;
1017 case PROP_DURATION:
1018 g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
1019 break;
1020 case PROP_HANDLE_SEGMENT_CHANGE:
1021 g_value_set_boolean (value, priv->handle_segment_change);
1022 break;
1023 case PROP_LEAKY_TYPE:
1024 g_value_set_enum (value, priv->leaky_type);
1025 break;
1026 default:
1027 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1028 break;
1029 }
1030 }
1031
1032 static gboolean
gst_app_src_send_event(GstElement * element,GstEvent * event)1033 gst_app_src_send_event (GstElement * element, GstEvent * event)
1034 {
1035 GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
1036 GstAppSrcPrivate *priv = appsrc->priv;
1037
1038 switch (GST_EVENT_TYPE (event)) {
1039 case GST_EVENT_FLUSH_STOP:
1040 g_mutex_lock (&priv->mutex);
1041 gst_app_src_flush_queued (appsrc, TRUE);
1042 g_mutex_unlock (&priv->mutex);
1043 break;
1044 default:
1045 if (GST_EVENT_IS_SERIALIZED (event)) {
1046 GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
1047 g_mutex_lock (&priv->mutex);
1048 gst_queue_array_push_tail (priv->queue, event);
1049
1050 if ((priv->wait_status & STREAM_WAITING))
1051 g_cond_broadcast (&priv->cond);
1052
1053 g_mutex_unlock (&priv->mutex);
1054 return TRUE;
1055 }
1056 break;
1057 }
1058
1059 return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
1060 event), FALSE);
1061 }
1062
1063 static gboolean
gst_app_src_unlock(GstBaseSrc * bsrc)1064 gst_app_src_unlock (GstBaseSrc * bsrc)
1065 {
1066 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1067 GstAppSrcPrivate *priv = appsrc->priv;
1068
1069 g_mutex_lock (&priv->mutex);
1070 GST_DEBUG_OBJECT (appsrc, "unlock start");
1071 priv->flushing = TRUE;
1072 g_cond_broadcast (&priv->cond);
1073 g_mutex_unlock (&priv->mutex);
1074
1075 return TRUE;
1076 }
1077
1078 static gboolean
gst_app_src_unlock_stop(GstBaseSrc * bsrc)1079 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
1080 {
1081 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1082 GstAppSrcPrivate *priv = appsrc->priv;
1083
1084 g_mutex_lock (&priv->mutex);
1085 GST_DEBUG_OBJECT (appsrc, "unlock stop");
1086 priv->flushing = FALSE;
1087 g_cond_broadcast (&priv->cond);
1088 g_mutex_unlock (&priv->mutex);
1089
1090 return TRUE;
1091 }
1092
1093 static gboolean
gst_app_src_start(GstBaseSrc * bsrc)1094 gst_app_src_start (GstBaseSrc * bsrc)
1095 {
1096 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1097 GstAppSrcPrivate *priv = appsrc->priv;
1098
1099 g_mutex_lock (&priv->mutex);
1100 GST_DEBUG_OBJECT (appsrc, "starting");
1101 priv->started = TRUE;
1102 /* set the offset to -1 so that we always do a first seek. This is only used
1103 * in random-access mode. */
1104 priv->offset = -1;
1105 priv->flushing = FALSE;
1106 g_mutex_unlock (&priv->mutex);
1107
1108 gst_base_src_set_format (bsrc, priv->format);
1109 gst_segment_init (&priv->last_segment, priv->format);
1110 gst_segment_init (&priv->current_segment, priv->format);
1111 priv->pending_custom_segment = FALSE;
1112
1113 return TRUE;
1114 }
1115
1116 static gboolean
gst_app_src_stop(GstBaseSrc * bsrc)1117 gst_app_src_stop (GstBaseSrc * bsrc)
1118 {
1119 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1120 GstAppSrcPrivate *priv = appsrc->priv;
1121
1122 g_mutex_lock (&priv->mutex);
1123 GST_DEBUG_OBJECT (appsrc, "stopping");
1124 priv->is_eos = FALSE;
1125 priv->flushing = TRUE;
1126 priv->started = FALSE;
1127 gst_app_src_flush_queued (appsrc, TRUE);
1128 g_cond_broadcast (&priv->cond);
1129 g_mutex_unlock (&priv->mutex);
1130
1131 return TRUE;
1132 }
1133
1134 static gboolean
gst_app_src_is_seekable(GstBaseSrc * src)1135 gst_app_src_is_seekable (GstBaseSrc * src)
1136 {
1137 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1138 GstAppSrcPrivate *priv = appsrc->priv;
1139 gboolean res = FALSE;
1140
1141 switch (priv->stream_type) {
1142 case GST_APP_STREAM_TYPE_STREAM:
1143 break;
1144 case GST_APP_STREAM_TYPE_SEEKABLE:
1145 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1146 res = TRUE;
1147 break;
1148 }
1149 return res;
1150 }
1151
1152 static gboolean
gst_app_src_do_get_size(GstBaseSrc * src,guint64 * size)1153 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
1154 {
1155 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1156
1157 *size = gst_app_src_get_size (appsrc);
1158
1159 return TRUE;
1160 }
1161
1162 static gboolean
gst_app_src_query(GstBaseSrc * src,GstQuery * query)1163 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
1164 {
1165 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1166 GstAppSrcPrivate *priv = appsrc->priv;
1167 gboolean res;
1168
1169 switch (GST_QUERY_TYPE (query)) {
1170 case GST_QUERY_LATENCY:
1171 {
1172 GstClockTime min, max;
1173 gboolean live;
1174
1175 /* Query the parent class for the defaults */
1176 res = gst_base_src_query_latency (src, &live, &min, &max);
1177
1178 /* overwrite with our values when we need to */
1179 g_mutex_lock (&priv->mutex);
1180 if (priv->min_latency != -1) {
1181 min = priv->min_latency;
1182 max = priv->max_latency;
1183 }
1184 g_mutex_unlock (&priv->mutex);
1185
1186 gst_query_set_latency (query, live, min, max);
1187 break;
1188 }
1189 case GST_QUERY_SCHEDULING:
1190 {
1191 gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1192 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1193
1194 switch (priv->stream_type) {
1195 case GST_APP_STREAM_TYPE_STREAM:
1196 case GST_APP_STREAM_TYPE_SEEKABLE:
1197 break;
1198 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1199 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1200 break;
1201 }
1202 res = TRUE;
1203 break;
1204 }
1205 case GST_QUERY_DURATION:
1206 {
1207 GstFormat format;
1208 gst_query_parse_duration (query, &format, NULL);
1209 if (format == GST_FORMAT_BYTES) {
1210 gst_query_set_duration (query, format, priv->size);
1211 res = TRUE;
1212 } else if (format == GST_FORMAT_TIME) {
1213 if (priv->duration != GST_CLOCK_TIME_NONE) {
1214 gst_query_set_duration (query, format, priv->duration);
1215 res = TRUE;
1216 } else {
1217 res = FALSE;
1218 }
1219 } else {
1220 res = FALSE;
1221 }
1222 break;
1223 }
1224 default:
1225 res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
1226 break;
1227 }
1228
1229 return res;
1230 }
1231
1232 /* will be called in push mode */
1233 static gboolean
gst_app_src_do_seek(GstBaseSrc * src,GstSegment * segment)1234 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1235 {
1236 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1237 GstAppSrcPrivate *priv = appsrc->priv;
1238 gint64 desired_position;
1239 gboolean res = FALSE;
1240 gboolean emit;
1241 Callbacks *callbacks = NULL;
1242
1243 desired_position = segment->position;
1244
1245 /* no need to try to seek in streaming mode */
1246 if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
1247 return TRUE;
1248
1249 GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
1250 desired_position, gst_format_get_name (segment->format));
1251
1252 g_mutex_lock (&priv->mutex);
1253 emit = priv->emit_signals;
1254 if (priv->callbacks)
1255 callbacks = callbacks_ref (priv->callbacks);
1256 g_mutex_unlock (&priv->mutex);
1257
1258 if (callbacks && callbacks->callbacks.seek_data) {
1259 res =
1260 callbacks->callbacks.seek_data (appsrc, desired_position,
1261 callbacks->user_data);
1262 } else if (emit) {
1263 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1264 desired_position, &res);
1265 }
1266
1267 g_clear_pointer (&callbacks, callbacks_unref);
1268
1269 if (res) {
1270 GST_DEBUG_OBJECT (appsrc, "flushing queue");
1271 g_mutex_lock (&priv->mutex);
1272 gst_app_src_flush_queued (appsrc, TRUE);
1273 gst_segment_copy_into (segment, &priv->last_segment);
1274 gst_segment_copy_into (segment, &priv->current_segment);
1275 priv->pending_custom_segment = FALSE;
1276 g_mutex_unlock (&priv->mutex);
1277 priv->is_eos = FALSE;
1278 } else {
1279 GST_WARNING_OBJECT (appsrc, "seek failed");
1280 }
1281
1282 return res;
1283 }
1284
1285 /* must be called with the appsrc mutex */
1286 static gboolean
gst_app_src_emit_seek(GstAppSrc * appsrc,guint64 offset)1287 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
1288 {
1289 gboolean res = FALSE;
1290 gboolean emit;
1291 GstAppSrcPrivate *priv = appsrc->priv;
1292 Callbacks *callbacks = NULL;
1293
1294 emit = priv->emit_signals;
1295 if (priv->callbacks)
1296 callbacks = callbacks_ref (priv->callbacks);
1297 g_mutex_unlock (&priv->mutex);
1298
1299 GST_DEBUG_OBJECT (appsrc,
1300 "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
1301 priv->offset, offset);
1302
1303 if (callbacks && callbacks->callbacks.seek_data)
1304 res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
1305 else if (emit)
1306 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1307 offset, &res);
1308
1309 g_clear_pointer (&callbacks, callbacks_unref);
1310
1311 g_mutex_lock (&priv->mutex);
1312
1313 return res;
1314 }
1315
1316 /* must be called with the appsrc mutex. After this call things can be
1317 * flushing */
1318 static void
gst_app_src_emit_need_data(GstAppSrc * appsrc,guint size)1319 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
1320 {
1321 gboolean emit;
1322 GstAppSrcPrivate *priv = appsrc->priv;
1323 Callbacks *callbacks = NULL;
1324
1325 emit = priv->emit_signals;
1326 if (priv->callbacks)
1327 callbacks = callbacks_ref (priv->callbacks);
1328 g_mutex_unlock (&priv->mutex);
1329
1330 /* we have no data, we need some. We fire the signal with the size hint. */
1331 if (callbacks && callbacks->callbacks.need_data)
1332 callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
1333 else if (emit)
1334 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
1335 NULL);
1336
1337 g_clear_pointer (&callbacks, callbacks_unref);
1338
1339 g_mutex_lock (&priv->mutex);
1340 /* we can be flushing now because we released the lock */
1341 }
1342
1343 /* must be called with the appsrc mutex */
1344 static gboolean
gst_app_src_do_negotiate(GstBaseSrc * basesrc)1345 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
1346 {
1347 GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1348 GstAppSrcPrivate *priv = appsrc->priv;
1349 gboolean result;
1350 GstCaps *caps;
1351
1352 GST_OBJECT_LOCK (basesrc);
1353 caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
1354 GST_OBJECT_UNLOCK (basesrc);
1355
1356 /* Avoid deadlock by unlocking mutex
1357 * otherwise we get deadlock between this and stream lock */
1358 g_mutex_unlock (&priv->mutex);
1359 if (caps) {
1360 result = gst_base_src_set_caps (basesrc, caps);
1361 gst_caps_unref (caps);
1362 } else {
1363 result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
1364 }
1365 g_mutex_lock (&priv->mutex);
1366
1367 return result;
1368 }
1369
1370 static gboolean
gst_app_src_negotiate(GstBaseSrc * basesrc)1371 gst_app_src_negotiate (GstBaseSrc * basesrc)
1372 {
1373 GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1374 GstAppSrcPrivate *priv = appsrc->priv;
1375 gboolean result;
1376
1377 g_mutex_lock (&priv->mutex);
1378 result = gst_app_src_do_negotiate (basesrc);
1379 g_mutex_unlock (&priv->mutex);
1380 return result;
1381 }
1382
1383 /* Update the currently queued bytes/buffers/time information for the item
1384 * that was just removed from the queue.
1385 *
1386 * If update_offset is set, additionally the offset of the source will be
1387 * moved forward accordingly as if that many bytes were output.
1388 */
1389 static void
gst_app_src_update_queued_pop(GstAppSrc * appsrc,GstMiniObject * item,gboolean update_offset)1390 gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
1391 gboolean update_offset)
1392 {
1393 GstAppSrcPrivate *priv = appsrc->priv;
1394 guint buf_size = 0;
1395 guint n_buffers = 0;
1396 GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1397
1398 if (GST_IS_BUFFER (item)) {
1399 GstBuffer *buf = GST_BUFFER_CAST (item);
1400 buf_size = gst_buffer_get_size (buf);
1401 n_buffers = 1;
1402
1403 end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1404 if (end_buffer_ts != GST_CLOCK_TIME_NONE
1405 && GST_BUFFER_DURATION_IS_VALID (buf))
1406 end_buffer_ts += GST_BUFFER_DURATION (buf);
1407
1408 GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
1409 } else if (GST_IS_BUFFER_LIST (item)) {
1410 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1411 guint i;
1412
1413 n_buffers = gst_buffer_list_length (buffer_list);
1414
1415 for (i = 0; i < n_buffers; i++) {
1416 GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1417 GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1418
1419 buf_size += gst_buffer_get_size (tmp);
1420 /* Update to the last buffer's timestamp that is known */
1421 if (ts != GST_CLOCK_TIME_NONE) {
1422 end_buffer_ts = ts;
1423 if (GST_BUFFER_DURATION_IS_VALID (tmp))
1424 end_buffer_ts += GST_BUFFER_DURATION (tmp);
1425 }
1426 }
1427 }
1428
1429 priv->queued_bytes -= buf_size;
1430 priv->queued_buffers -= n_buffers;
1431
1432 /* Update time level if working on a TIME segment */
1433 if ((priv->current_segment.format == GST_FORMAT_TIME
1434 || (priv->current_segment.format == GST_FORMAT_UNDEFINED
1435 && priv->last_segment.format == GST_FORMAT_TIME))
1436 && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1437 const GstSegment *segment =
1438 priv->current_segment.format ==
1439 GST_FORMAT_TIME ? &priv->current_segment : &priv->last_segment;
1440
1441 /* Clip to the current segment boundaries */
1442 if (segment->stop != -1 && end_buffer_ts > segment->stop)
1443 end_buffer_ts = segment->stop;
1444 else if (segment->start > end_buffer_ts)
1445 end_buffer_ts = segment->start;
1446
1447 priv->last_out_running_time =
1448 gst_segment_to_running_time (segment, GST_FORMAT_TIME, end_buffer_ts);
1449
1450 GST_TRACE_OBJECT (appsrc,
1451 "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1452 GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1453 GST_TIME_ARGS (priv->last_out_running_time));
1454
1455 /* If timestamps on both sides are known, calculate the current
1456 * fill level in time and consider the queue empty if the output
1457 * running time is lower than the input one (i.e. some kind of reset
1458 * has happened).
1459 */
1460 if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1461 && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1462 if (priv->last_out_running_time > priv->last_in_running_time) {
1463 priv->queued_time = 0;
1464 } else {
1465 priv->queued_time =
1466 priv->last_in_running_time - priv->last_out_running_time;
1467 }
1468 }
1469 }
1470
1471 GST_DEBUG_OBJECT (appsrc,
1472 "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1473 " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
1474 priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
1475
1476 /* only update the offset when in random_access mode and when requested by
1477 * the caller, i.e. not when just dropping the item */
1478 if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1479 priv->offset += buf_size;
1480 }
1481
1482 /* Update the currently queued bytes/buffers/time information for the item
1483 * that was just added to the queue.
1484 */
1485 static void
gst_app_src_update_queued_push(GstAppSrc * appsrc,GstMiniObject * item)1486 gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
1487 {
1488 GstAppSrcPrivate *priv = appsrc->priv;
1489 GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
1490 GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1491 guint buf_size = 0;
1492 guint n_buffers = 0;
1493
1494 if (GST_IS_BUFFER (item)) {
1495 GstBuffer *buf = GST_BUFFER_CAST (item);
1496
1497 buf_size = gst_buffer_get_size (buf);
1498 n_buffers = 1;
1499
1500 start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1501 if (end_buffer_ts != GST_CLOCK_TIME_NONE
1502 && GST_BUFFER_DURATION_IS_VALID (buf))
1503 end_buffer_ts += GST_BUFFER_DURATION (buf);
1504 } else if (GST_IS_BUFFER_LIST (item)) {
1505 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1506 guint i;
1507
1508 n_buffers = gst_buffer_list_length (buffer_list);
1509
1510 for (i = 0; i < n_buffers; i++) {
1511 GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1512 GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1513
1514 buf_size += gst_buffer_get_size (tmp);
1515
1516 if (ts != GST_CLOCK_TIME_NONE) {
1517 if (start_buffer_ts == GST_CLOCK_TIME_NONE)
1518 start_buffer_ts = ts;
1519 end_buffer_ts = ts;
1520 if (GST_BUFFER_DURATION_IS_VALID (tmp))
1521 end_buffer_ts += GST_BUFFER_DURATION (tmp);
1522 }
1523 }
1524 }
1525
1526 priv->queued_bytes += buf_size;
1527 priv->queued_buffers += n_buffers;
1528
1529 /* Update time level if working on a TIME segment */
1530 if (priv->last_segment.format == GST_FORMAT_TIME
1531 && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1532 /* Clip to the last segment boundaries */
1533 if (priv->last_segment.stop != -1
1534 && end_buffer_ts > priv->last_segment.stop)
1535 end_buffer_ts = priv->last_segment.stop;
1536 else if (priv->last_segment.start > end_buffer_ts)
1537 end_buffer_ts = priv->last_segment.start;
1538
1539 priv->last_in_running_time =
1540 gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1541 end_buffer_ts);
1542
1543 /* If this is the only buffer then we can directly update the queued time
1544 * here. This is especially useful if this was the first buffer because
1545 * otherwise we would have to wait until it is actually unqueued to know
1546 * the queued duration */
1547 if (priv->queued_buffers == 1) {
1548 if (priv->last_segment.stop != -1
1549 && start_buffer_ts > priv->last_segment.stop)
1550 start_buffer_ts = priv->last_segment.stop;
1551 else if (priv->last_segment.start > start_buffer_ts)
1552 start_buffer_ts = priv->last_segment.start;
1553
1554 priv->last_out_running_time =
1555 gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1556 start_buffer_ts);
1557 }
1558
1559 GST_TRACE_OBJECT (appsrc,
1560 "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1561 GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1562 GST_TIME_ARGS (priv->last_out_running_time));
1563
1564 if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1565 && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1566 if (priv->last_out_running_time > priv->last_in_running_time) {
1567 priv->queued_time = 0;
1568 } else {
1569 priv->queued_time =
1570 priv->last_in_running_time - priv->last_out_running_time;
1571 }
1572 }
1573 }
1574
1575 GST_DEBUG_OBJECT (appsrc,
1576 "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1577 " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
1578 GST_TIME_ARGS (priv->queued_time));
1579 }
1580
/* GstBaseSrc create handler: produces the next buffer or buffer list from the
 * internal queue.
 *
 * The function first syncs the base class segment duration with the
 * configured size/duration, then (in random-access mode) asks the application
 * to seek if @offset differs from the tracked offset, and finally loops over
 * the queue:
 *  - caps entries become the current caps and trigger a renegotiation when
 *    they differ from the previous ones,
 *  - events are either applied (segment events) or pushed downstream,
 *  - a buffer is returned through @buf, a buffer list is submitted to the
 *    base class and @buf is set to NULL.
 * When the queue is empty the application is asked for more data and the
 * function waits on the condition variable until something arrives, EOS is
 * reached or the element starts flushing.
 *
 * @size is only used as the hint passed along with the need-data
 * notification.
 *
 * Returns: GST_FLOW_OK when data was produced, GST_FLOW_FLUSHING when
 * flushing, GST_FLOW_EOS at end of stream, GST_FLOW_ERROR when the seek
 * failed or a queued segment could not be configured.
 */
static GstFlowReturn
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
    GstBuffer ** buf)
{
  GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
  GstAppSrcPrivate *priv = appsrc->priv;
  GstFlowReturn ret;

  /* Propagate a changed size (BYTES segment) or duration (TIME segment) to
   * the base class segment. The duration-changed message is posted after the
   * object lock has been released. */
  GST_OBJECT_LOCK (appsrc);
  if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
          bsrc->segment.format == GST_FORMAT_BYTES)) {
    GST_DEBUG_OBJECT (appsrc,
        "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
        bsrc->segment.duration, priv->size);
    bsrc->segment.duration = priv->size;
    GST_OBJECT_UNLOCK (appsrc);

    gst_element_post_message (GST_ELEMENT (appsrc),
        gst_message_new_duration_changed (GST_OBJECT (appsrc)));
  } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
          bsrc->segment.format == GST_FORMAT_TIME)) {
    GST_DEBUG_OBJECT (appsrc,
        "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
        GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
    bsrc->segment.duration = priv->duration;
    GST_OBJECT_UNLOCK (appsrc);

    gst_element_post_message (GST_ELEMENT (appsrc),
        gst_message_new_duration_changed (GST_OBJECT (appsrc)));
  } else {
    GST_OBJECT_UNLOCK (appsrc);
  }

  /* From here on the appsrc mutex is held, except where helpers explicitly
   * drop and retake it. Every exit path below unlocks it exactly once. */
  g_mutex_lock (&priv->mutex);
  /* check flushing first */
  if (G_UNLIKELY (priv->flushing))
    goto flushing;

  if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
    /* if we are dealing with a random-access stream, issue a seek if the offset
     * changed. */
    if (G_UNLIKELY (priv->offset != offset)) {
      gboolean res;

      /* do the seek; the mutex is released while the application is called */
      res = gst_app_src_emit_seek (appsrc, offset);
#ifdef OHOS_OPT_COMPAT
      /**
       * ohos.opt.compat.0060
       * Flush data after seek to avoid push old data.
       */
      /* Note that the flush happens regardless of whether the seek
       * succeeded, and does not retain the last caps. */
      if (priv->datasrc_mode) {
        GST_DEBUG_OBJECT(appsrc, "appsrc do seek, begin, flush");
        gst_app_src_flush_queued (appsrc, FALSE);
      }
#endif

      if (G_UNLIKELY (!res))
        /* failing to seek is fatal */
        goto seek_error;

      priv->offset = offset;
      priv->is_eos = FALSE;
    }
  }

  while (TRUE) {
    /* Our lock may have been released to push events or caps, check our
     * state in case we are now flushing. */
    if (G_UNLIKELY (priv->flushing))
      goto flushing;

    /* return data as long as we have some */
    if (!gst_queue_array_is_empty (priv->queue)) {
      GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);

      if (GST_IS_CAPS (obj)) {
        GstCaps *next_caps = GST_CAPS (obj);
        gboolean caps_changed = TRUE;

        if (next_caps && priv->current_caps)
          caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
        else
          caps_changed = (next_caps != priv->current_caps);

        gst_caps_replace (&priv->current_caps, next_caps);

        /* release the popped queue entry now that current_caps has been
         * updated */
        if (next_caps) {
          gst_caps_unref (next_caps);
        }

        /* gst_app_src_do_negotiate() temporarily releases the mutex; its
         * result is not checked here */
        if (caps_changed)
          gst_app_src_do_negotiate (bsrc);

        /* Continue checks caps and queue */
        continue;
      }

      if (GST_IS_BUFFER (obj)) {
        GstBuffer *buffer = GST_BUFFER (obj);

        /* Mark the buffer as DISCONT if we previously dropped a buffer
         * instead of outputting it */
        if (priv->need_discont_downstream) {
          buffer = gst_buffer_make_writable (buffer);
          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
          priv->need_discont_downstream = FALSE;
        }

        *buf = buffer;
      } else if (GST_IS_BUFFER_LIST (obj)) {
        GstBufferList *buffer_list;

        buffer_list = GST_BUFFER_LIST (obj);

        /* Mark the first buffer of the buffer list as DISCONT if we
         * previously dropped a buffer instead of outputting it */
        if (priv->need_discont_downstream) {
          GstBuffer *buffer;

          buffer_list = gst_buffer_list_make_writable (buffer_list);
          buffer = gst_buffer_list_get_writable (buffer_list, 0);
          GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
          priv->need_discont_downstream = FALSE;
        }

        /* the list goes to the base class; no single buffer is returned */
        gst_base_src_submit_buffer_list (bsrc, buffer_list);
        *buf = NULL;
      } else if (GST_IS_EVENT (obj)) {
        GstEvent *event = GST_EVENT (obj);

        GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);

        if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
          const GstSegment *segment = NULL;

          gst_event_parse_segment (event, &segment);
          g_assert (segment != NULL);

          /* Segment events are not pushed directly: a segment that differs
           * from the current one is handed to the base class and then
           * remembered as the current segment */
          if (!gst_segment_is_equal (&priv->current_segment, segment)) {
            GST_DEBUG_OBJECT (appsrc,
                "Update new segment %" GST_PTR_FORMAT, event);
            if (!gst_base_src_new_segment (bsrc, segment)) {
              GST_ERROR_OBJECT (appsrc,
                  "Couldn't set new segment %" GST_PTR_FORMAT, event);
              gst_event_unref (event);
              goto invalid_segment;
            }
            gst_segment_copy_into (segment, &priv->current_segment);
          }

          gst_event_unref (event);
        } else {
          GstEvent *seg_event;
          /* copy taken while the mutex is still held, used after unlocking */
          GstSegment last_segment = priv->last_segment;

          /* event is serialized with the buffers flow */

          /* We are about to push an event, release our lock */
          g_mutex_unlock (&priv->mutex);

          /* If the source pad has no sticky segment event yet, push one
           * built from the last segment before the event itself */
          seg_event =
              gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
              GST_EVENT_SEGMENT, 0);
          if (!seg_event) {
            seg_event = gst_event_new_segment (&last_segment);

            GST_DEBUG_OBJECT (appsrc,
                "received serialized event before first buffer, push default segment %"
                GST_PTR_FORMAT, seg_event);

            gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
          } else {
            gst_event_unref (seg_event);
          }

          gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);

          g_mutex_lock (&priv->mutex);
        }
        /* events produce no data, look at the next queue item */
        continue;
      } else {
        g_assert_not_reached ();
      }

      /* only buffers and buffer lists get here: update the fill levels and
       * (in random-access mode) the offset */
      gst_app_src_update_queued_pop (appsrc, obj, TRUE);

      /* signal that we removed an item */
      if ((priv->wait_status & APP_WAITING))
        g_cond_broadcast (&priv->cond);

      /* see if we go lower than the min-percent; each limit is only
       * considered when it is non-zero */
      if (priv->min_percent) {
        if ((priv->max_bytes
                && priv->queued_bytes * 100 / priv->max_bytes <=
                priv->min_percent) || (priv->max_buffers
                && priv->queued_buffers * 100 / priv->max_buffers <=
                priv->min_percent) || (priv->max_time
                && priv->queued_time * 100 / priv->max_time <=
                priv->min_percent)) {
          /* ignore flushing state, we got a buffer and we will return it now.
           * Errors will be handled in the next round */
          gst_app_src_emit_need_data (appsrc, size);
        }
      }
      ret = GST_FLOW_OK;
      break;
    } else {
      /* queue is empty: ask the application for data (drops the mutex
       * while doing so) */
      gst_app_src_emit_need_data (appsrc, size);

      /* we can be flushing now because we released the lock above */
      if (G_UNLIKELY (priv->flushing))
        goto flushing;

      /* if we have a buffer now, continue the loop and try to return it. In
       * random-access mode (where a buffer is normally pushed in the above
       * signal) we can still be empty because the pushed buffer got flushed or
       * when the application pushes the requested buffer later, we support both
       * possibilities. */
      if (!gst_queue_array_is_empty (priv->queue))
        continue;

      /* no buffer yet, maybe we are EOS, if not, block for more data. */
    }

    /* check EOS */
    if (G_UNLIKELY (priv->is_eos))
      goto eos;

    /* nothing to return, wait a while for new data or flushing. The
     * STREAM_WAITING bit tells the other side that a broadcast is needed. */
    priv->wait_status |= STREAM_WAITING;
    g_cond_wait (&priv->cond, &priv->mutex);
    priv->wait_status &= ~STREAM_WAITING;
  }
  g_mutex_unlock (&priv->mutex);
  return ret;

  /* ERRORS */
flushing:
  {
    GST_DEBUG_OBJECT (appsrc, "we are flushing");
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_FLUSHING;
  }
eos:
  {
    GST_DEBUG_OBJECT (appsrc, "we are EOS");
    g_mutex_unlock (&priv->mutex);
    return GST_FLOW_EOS;
  }
seek_error:
  {
    /* the mutex is released before the error message is posted */
    g_mutex_unlock (&priv->mutex);
    GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
        GST_ERROR_SYSTEM);
    return GST_FLOW_ERROR;
  }

invalid_segment:
  {
    /* the mutex is released before the error message is posted */
    g_mutex_unlock (&priv->mutex);
    GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
        (NULL), ("Failed to configure the provided input segment."));
    return GST_FLOW_ERROR;
  }
}
1847
1848 /* external API */
1849
1850 /**
1851 * gst_app_src_set_caps:
1852 * @appsrc: a #GstAppSrc
1853 * @caps: (nullable): caps to set
1854 *
1855 * Set the capabilities on the appsrc element. This function takes
1856 * a copy of the caps structure. After calling this method, the source will
1857 * only produce caps that match @caps. @caps must be fixed and the caps on the
1858 * buffers must match the caps or left NULL.
1859 */
1860 void
gst_app_src_set_caps(GstAppSrc * appsrc,const GstCaps * caps)1861 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1862 {
1863 GstAppSrcPrivate *priv;
1864 gboolean caps_changed;
1865
1866 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1867
1868 priv = appsrc->priv;
1869
1870 g_mutex_lock (&priv->mutex);
1871
1872 GST_OBJECT_LOCK (appsrc);
1873 if (caps && priv->last_caps)
1874 caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
1875 else
1876 caps_changed = (caps != priv->last_caps);
1877
1878 if (caps_changed) {
1879 GstCaps *new_caps;
1880 gpointer t;
1881
1882 new_caps = caps ? gst_caps_copy (caps) : NULL;
1883 GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1884
1885 while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
1886 gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
1887 }
1888 gst_queue_array_push_tail (priv->queue, new_caps);
1889 gst_caps_replace (&priv->last_caps, new_caps);
1890
1891 if ((priv->wait_status & STREAM_WAITING))
1892 g_cond_broadcast (&priv->cond);
1893 }
1894
1895 GST_OBJECT_UNLOCK (appsrc);
1896
1897 g_mutex_unlock (&priv->mutex);
1898 }
1899
1900 /**
1901 * gst_app_src_get_caps:
1902 * @appsrc: a #GstAppSrc
1903 *
1904 * Get the configured caps on @appsrc.
1905 *
1906 * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1907 */
1908 GstCaps *
gst_app_src_get_caps(GstAppSrc * appsrc)1909 gst_app_src_get_caps (GstAppSrc * appsrc)
1910 {
1911
1912 GstCaps *caps;
1913
1914 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1915
1916 GST_OBJECT_LOCK (appsrc);
1917 if ((caps = appsrc->priv->last_caps))
1918 gst_caps_ref (caps);
1919 GST_OBJECT_UNLOCK (appsrc);
1920
1921 return caps;
1922
1923 }
1924
1925 /**
1926 * gst_app_src_set_size:
1927 * @appsrc: a #GstAppSrc
1928 * @size: the size to set
1929 *
1930 * Set the size of the stream in bytes. A value of -1 means that the size is
1931 * not known.
1932 */
1933 void
gst_app_src_set_size(GstAppSrc * appsrc,gint64 size)1934 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1935 {
1936 GstAppSrcPrivate *priv;
1937
1938 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1939
1940 priv = appsrc->priv;
1941
1942 GST_OBJECT_LOCK (appsrc);
1943 GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1944 priv->size = size;
1945 GST_OBJECT_UNLOCK (appsrc);
1946 }
1947
1948 /**
1949 * gst_app_src_get_size:
1950 * @appsrc: a #GstAppSrc
1951 *
1952 * Get the size of the stream in bytes. A value of -1 means that the size is
1953 * not known.
1954 *
1955 * Returns: the size of the stream previously set with gst_app_src_set_size();
1956 */
1957 gint64
gst_app_src_get_size(GstAppSrc * appsrc)1958 gst_app_src_get_size (GstAppSrc * appsrc)
1959 {
1960 gint64 size;
1961 GstAppSrcPrivate *priv;
1962
1963 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1964
1965 priv = appsrc->priv;
1966
1967 GST_OBJECT_LOCK (appsrc);
1968 size = priv->size;
1969 GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1970 GST_OBJECT_UNLOCK (appsrc);
1971
1972 return size;
1973 }
1974
1975 /**
1976 * gst_app_src_set_duration:
1977 * @appsrc: a #GstAppSrc
1978 * @duration: the duration to set
1979 *
1980 * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1981 * not known.
1982 *
1983 * Since: 1.10
1984 */
1985 void
gst_app_src_set_duration(GstAppSrc * appsrc,GstClockTime duration)1986 gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
1987 {
1988 GstAppSrcPrivate *priv;
1989
1990 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1991
1992 priv = appsrc->priv;
1993
1994 GST_OBJECT_LOCK (appsrc);
1995 GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
1996 GST_TIME_ARGS (duration));
1997 priv->duration = duration;
1998 GST_OBJECT_UNLOCK (appsrc);
1999 }
2000
2001 /**
2002 * gst_app_src_get_duration:
2003 * @appsrc: a #GstAppSrc
2004 *
2005 * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
2006 * not known.
2007 *
2008 * Returns: the duration of the stream previously set with gst_app_src_set_duration();
2009 *
2010 * Since: 1.10
2011 */
2012 GstClockTime
gst_app_src_get_duration(GstAppSrc * appsrc)2013 gst_app_src_get_duration (GstAppSrc * appsrc)
2014 {
2015 GstClockTime duration;
2016 GstAppSrcPrivate *priv;
2017
2018 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
2019
2020 priv = appsrc->priv;
2021
2022 GST_OBJECT_LOCK (appsrc);
2023 duration = priv->duration;
2024 GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
2025 GST_TIME_ARGS (duration));
2026 GST_OBJECT_UNLOCK (appsrc);
2027
2028 return duration;
2029 }
2030
2031 /**
2032 * gst_app_src_set_stream_type:
2033 * @appsrc: a #GstAppSrc
2034 * @type: the new state
2035 *
2036 * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
2037 * be connected to.
2038 *
2039 * A stream_type stream
2040 */
2041 void
gst_app_src_set_stream_type(GstAppSrc * appsrc,GstAppStreamType type)2042 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
2043 {
2044 GstAppSrcPrivate *priv;
2045
2046 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2047
2048 priv = appsrc->priv;
2049
2050 GST_OBJECT_LOCK (appsrc);
2051 GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
2052 priv->stream_type = type;
2053 GST_OBJECT_UNLOCK (appsrc);
2054 }
2055
2056 /**
2057 * gst_app_src_get_stream_type:
2058 * @appsrc: a #GstAppSrc
2059 *
2060 * Get the stream type. Control the stream type of @appsrc
2061 * with gst_app_src_set_stream_type().
2062 *
2063 * Returns: the stream type.
2064 */
2065 GstAppStreamType
gst_app_src_get_stream_type(GstAppSrc * appsrc)2066 gst_app_src_get_stream_type (GstAppSrc * appsrc)
2067 {
2068 gboolean stream_type;
2069 GstAppSrcPrivate *priv;
2070
2071 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2072
2073 priv = appsrc->priv;
2074
2075 GST_OBJECT_LOCK (appsrc);
2076 stream_type = priv->stream_type;
2077 GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
2078 GST_OBJECT_UNLOCK (appsrc);
2079
2080 return stream_type;
2081 }
2082
2083 /**
2084 * gst_app_src_set_max_bytes:
2085 * @appsrc: a #GstAppSrc
2086 * @max: the maximum number of bytes to queue
2087 *
2088 * Set the maximum amount of bytes that can be queued in @appsrc.
2089 * After the maximum amount of bytes are queued, @appsrc will emit the
2090 * "enough-data" signal.
2091 */
2092 void
gst_app_src_set_max_bytes(GstAppSrc * appsrc,guint64 max)2093 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
2094 {
2095 GstAppSrcPrivate *priv;
2096
2097 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2098
2099 priv = appsrc->priv;
2100
2101 g_mutex_lock (&priv->mutex);
2102 if (max != priv->max_bytes) {
2103 GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
2104 priv->max_bytes = max;
2105 /* signal the change */
2106 g_cond_broadcast (&priv->cond);
2107 }
2108 g_mutex_unlock (&priv->mutex);
2109 }
2110
2111 /**
2112 * gst_app_src_get_max_bytes:
2113 * @appsrc: a #GstAppSrc
2114 *
2115 * Get the maximum amount of bytes that can be queued in @appsrc.
2116 *
2117 * Returns: The maximum amount of bytes that can be queued.
2118 */
2119 guint64
gst_app_src_get_max_bytes(GstAppSrc * appsrc)2120 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
2121 {
2122 guint64 result;
2123 GstAppSrcPrivate *priv;
2124
2125 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2126
2127 priv = appsrc->priv;
2128
2129 g_mutex_lock (&priv->mutex);
2130 result = priv->max_bytes;
2131 GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
2132 g_mutex_unlock (&priv->mutex);
2133
2134 return result;
2135 }
2136
2137 /**
2138 * gst_app_src_get_current_level_bytes:
2139 * @appsrc: a #GstAppSrc
2140 *
2141 * Get the number of currently queued bytes inside @appsrc.
2142 *
2143 * Returns: The number of currently queued bytes.
2144 *
2145 * Since: 1.2
2146 */
2147 guint64
gst_app_src_get_current_level_bytes(GstAppSrc * appsrc)2148 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
2149 {
2150 guint64 queued;
2151 GstAppSrcPrivate *priv;
2152
2153 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2154
2155 priv = appsrc->priv;
2156
2157 GST_OBJECT_LOCK (appsrc);
2158 queued = priv->queued_bytes;
2159 GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
2160 queued);
2161 GST_OBJECT_UNLOCK (appsrc);
2162
2163 return queued;
2164 }
2165
2166 /**
2167 * gst_app_src_set_max_buffers:
2168 * @appsrc: a #GstAppSrc
2169 * @max: the maximum number of buffers to queue
2170 *
2171 * Set the maximum amount of buffers that can be queued in @appsrc.
2172 * After the maximum amount of buffers are queued, @appsrc will emit the
2173 * "enough-data" signal.
2174 *
2175 * Since: 1.20
2176 */
2177 void
gst_app_src_set_max_buffers(GstAppSrc * appsrc,guint64 max)2178 gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
2179 {
2180 GstAppSrcPrivate *priv;
2181
2182 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2183
2184 priv = appsrc->priv;
2185
2186 g_mutex_lock (&priv->mutex);
2187 if (max != priv->max_buffers) {
2188 GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
2189 priv->max_buffers = max;
2190 /* signal the change */
2191 g_cond_broadcast (&priv->cond);
2192 }
2193 g_mutex_unlock (&priv->mutex);
2194 }
2195
2196 /**
2197 * gst_app_src_get_max_buffers:
2198 * @appsrc: a #GstAppSrc
2199 *
2200 * Get the maximum amount of buffers that can be queued in @appsrc.
2201 *
2202 * Returns: The maximum amount of buffers that can be queued.
2203 *
2204 * Since: 1.20
2205 */
2206 guint64
gst_app_src_get_max_buffers(GstAppSrc * appsrc)2207 gst_app_src_get_max_buffers (GstAppSrc * appsrc)
2208 {
2209 guint64 result;
2210 GstAppSrcPrivate *priv;
2211
2212 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2213
2214 priv = appsrc->priv;
2215
2216 g_mutex_lock (&priv->mutex);
2217 result = priv->max_buffers;
2218 GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
2219 result);
2220 g_mutex_unlock (&priv->mutex);
2221
2222 return result;
2223 }
2224
2225 /**
2226 * gst_app_src_get_current_level_buffers:
2227 * @appsrc: a #GstAppSrc
2228 *
2229 * Get the number of currently queued buffers inside @appsrc.
2230 *
2231 * Returns: The number of currently queued buffers.
2232 *
2233 * Since: 1.20
2234 */
2235 guint64
gst_app_src_get_current_level_buffers(GstAppSrc * appsrc)2236 gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
2237 {
2238 guint64 queued;
2239 GstAppSrcPrivate *priv;
2240
2241 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2242
2243 priv = appsrc->priv;
2244
2245 GST_OBJECT_LOCK (appsrc);
2246 queued = priv->queued_buffers;
2247 GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
2248 queued);
2249 GST_OBJECT_UNLOCK (appsrc);
2250
2251 return queued;
2252 }
2253
2254 /**
2255 * gst_app_src_set_max_time:
2256 * @appsrc: a #GstAppSrc
2257 * @max: the maximum amonut of time to queue
2258 *
2259 * Set the maximum amount of time that can be queued in @appsrc.
2260 * After the maximum amount of time are queued, @appsrc will emit the
2261 * "enough-data" signal.
2262 *
2263 * Since: 1.20
2264 */
2265 void
gst_app_src_set_max_time(GstAppSrc * appsrc,GstClockTime max)2266 gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
2267 {
2268 GstAppSrcPrivate *priv;
2269
2270 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2271
2272 priv = appsrc->priv;
2273
2274 g_mutex_lock (&priv->mutex);
2275 if (max != priv->max_time) {
2276 GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
2277 GST_TIME_ARGS (max));
2278 priv->max_time = max;
2279 /* signal the change */
2280 g_cond_broadcast (&priv->cond);
2281 }
2282 g_mutex_unlock (&priv->mutex);
2283 }
2284
2285 /**
2286 * gst_app_src_get_max_time:
2287 * @appsrc: a #GstAppSrc
2288 *
2289 * Get the maximum amount of time that can be queued in @appsrc.
2290 *
2291 * Returns: The maximum amount of time that can be queued.
2292 *
2293 * Since: 1.20
2294 */
2295 GstClockTime
gst_app_src_get_max_time(GstAppSrc * appsrc)2296 gst_app_src_get_max_time (GstAppSrc * appsrc)
2297 {
2298 GstClockTime result;
2299 GstAppSrcPrivate *priv;
2300
2301 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2302
2303 priv = appsrc->priv;
2304
2305 g_mutex_lock (&priv->mutex);
2306 result = priv->max_time;
2307 GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
2308 GST_TIME_ARGS (result));
2309 g_mutex_unlock (&priv->mutex);
2310
2311 return result;
2312 }
2313
2314 /**
2315 * gst_app_src_get_current_level_time:
2316 * @appsrc: a #GstAppSrc
2317 *
2318 * Get the amount of currently queued time inside @appsrc.
2319 *
2320 * Returns: The amount of currently queued time.
2321 *
2322 * Since: 1.20
2323 */
2324 GstClockTime
gst_app_src_get_current_level_time(GstAppSrc * appsrc)2325 gst_app_src_get_current_level_time (GstAppSrc * appsrc)
2326 {
2327 gint64 queued;
2328 GstAppSrcPrivate *priv;
2329
2330 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
2331
2332 priv = appsrc->priv;
2333
2334 GST_OBJECT_LOCK (appsrc);
2335 queued = priv->queued_time;
2336 GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
2337 GST_TIME_ARGS (queued));
2338 GST_OBJECT_UNLOCK (appsrc);
2339
2340 return queued;
2341 }
2342
2343 static void
gst_app_src_set_latencies(GstAppSrc * appsrc,gboolean do_min,guint64 min,gboolean do_max,guint64 max)2344 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
2345 gboolean do_max, guint64 max)
2346 {
2347 GstAppSrcPrivate *priv = appsrc->priv;
2348 gboolean changed = FALSE;
2349
2350 g_mutex_lock (&priv->mutex);
2351 if (do_min && priv->min_latency != min) {
2352 priv->min_latency = min;
2353 changed = TRUE;
2354 }
2355 if (do_max && priv->max_latency != max) {
2356 priv->max_latency = max;
2357 changed = TRUE;
2358 }
2359 g_mutex_unlock (&priv->mutex);
2360
2361 if (changed) {
2362 GST_DEBUG_OBJECT (appsrc, "posting latency changed");
2363 gst_element_post_message (GST_ELEMENT_CAST (appsrc),
2364 gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
2365 }
2366 }
2367
2368 /**
2369 * gst_app_src_set_leaky_type:
2370 * @appsrc: a #GstAppSrc
2371 * @leaky: the #GstAppLeakyType
2372 *
2373 * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
2374 * will drop any buffers that are pushed into it once its internal queue is
2375 * full. The selected type defines whether to drop the oldest or new
2376 * buffers.
2377 *
2378 * Since: 1.20
2379 */
2380 void
gst_app_src_set_leaky_type(GstAppSrc * appsrc,GstAppLeakyType leaky)2381 gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
2382 {
2383 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2384
2385 appsrc->priv->leaky_type = leaky;
2386 }
2387
2388 /**
2389 * gst_app_src_get_leaky_type:
2390 * @appsrc: a #GstAppSrc
2391 *
2392 * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
2393 * for more details.
2394 *
2395 * Returns: The currently set #GstAppLeakyType.
2396 *
2397 * Since: 1.20
2398 */
2399 GstAppLeakyType
gst_app_src_get_leaky_type(GstAppSrc * appsrc)2400 gst_app_src_get_leaky_type (GstAppSrc * appsrc)
2401 {
2402 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
2403
2404 return appsrc->priv->leaky_type;
2405 }
2406
2407 /**
2408 * gst_app_src_set_latency:
2409 * @appsrc: a #GstAppSrc
2410 * @min: the min latency
2411 * @max: the max latency
2412 *
2413 * Configure the @min and @max latency in @src. If @min is set to -1, the
2414 * default latency calculations for pseudo-live sources will be used.
2415 */
2416 void
gst_app_src_set_latency(GstAppSrc * appsrc,guint64 min,guint64 max)2417 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
2418 {
2419 gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
2420 }
2421
2422 /**
2423 * gst_app_src_get_latency:
2424 * @appsrc: a #GstAppSrc
2425 * @min: (out): the min latency
2426 * @max: (out): the max latency
2427 *
2428 * Retrieve the min and max latencies in @min and @max respectively.
2429 */
2430 void
gst_app_src_get_latency(GstAppSrc * appsrc,guint64 * min,guint64 * max)2431 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
2432 {
2433 GstAppSrcPrivate *priv;
2434
2435 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2436
2437 priv = appsrc->priv;
2438
2439 g_mutex_lock (&priv->mutex);
2440 if (min)
2441 *min = priv->min_latency;
2442 if (max)
2443 *max = priv->max_latency;
2444 g_mutex_unlock (&priv->mutex);
2445 }
2446
2447 /**
2448 * gst_app_src_set_emit_signals:
2449 * @appsrc: a #GstAppSrc
2450 * @emit: the new state
2451 *
2452 * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
2453 * by default disabled because signal emission is expensive and unneeded when
2454 * the application prefers to operate in pull mode.
2455 */
2456 void
gst_app_src_set_emit_signals(GstAppSrc * appsrc,gboolean emit)2457 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
2458 {
2459 GstAppSrcPrivate *priv;
2460
2461 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2462
2463 priv = appsrc->priv;
2464
2465 g_mutex_lock (&priv->mutex);
2466 priv->emit_signals = emit;
2467 g_mutex_unlock (&priv->mutex);
2468 }
2469
2470 /**
2471 * gst_app_src_get_emit_signals:
2472 * @appsrc: a #GstAppSrc
2473 *
2474 * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
2475 *
2476 * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
2477 * signals.
2478 */
2479 gboolean
gst_app_src_get_emit_signals(GstAppSrc * appsrc)2480 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
2481 {
2482 gboolean result;
2483 GstAppSrcPrivate *priv;
2484
2485 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2486
2487 priv = appsrc->priv;
2488
2489 g_mutex_lock (&priv->mutex);
2490 result = priv->emit_signals;
2491 g_mutex_unlock (&priv->mutex);
2492
2493 return result;
2494 }
2495
2496 static GstFlowReturn
gst_app_src_push_internal(GstAppSrc * appsrc,GstBuffer * buffer,GstBufferList * buflist,gboolean steal_ref)2497 gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
2498 GstBufferList * buflist, gboolean steal_ref)
2499 {
2500 gboolean first = TRUE;
2501 GstAppSrcPrivate *priv;
2502
2503 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2504
2505 priv = appsrc->priv;
2506
2507 if (buffer != NULL)
2508 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2509 else
2510 g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
2511
2512 if (buflist != NULL) {
2513 if (gst_buffer_list_length (buflist) == 0)
2514 return GST_FLOW_OK;
2515
2516 buffer = gst_buffer_list_get (buflist, 0);
2517 }
2518
2519 if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
2520 GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
2521 gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
2522 GstClock *clock;
2523
2524 clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
2525 if (clock) {
2526 GstClockTime now;
2527 GstClockTime base_time =
2528 gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
2529
2530 now = gst_clock_get_time (clock);
2531 if (now > base_time)
2532 now -= base_time;
2533 else
2534 now = 0;
2535 gst_object_unref (clock);
2536
2537 if (buflist == NULL) {
2538 if (!steal_ref) {
2539 buffer = gst_buffer_copy (buffer);
2540 steal_ref = TRUE;
2541 } else {
2542 buffer = gst_buffer_make_writable (buffer);
2543 }
2544 } else {
2545 if (!steal_ref) {
2546 buflist = gst_buffer_list_copy (buflist);
2547 steal_ref = TRUE;
2548 } else {
2549 buflist = gst_buffer_list_make_writable (buflist);
2550 }
2551 buffer = gst_buffer_list_get_writable (buflist, 0);
2552 }
2553
2554 GST_BUFFER_PTS (buffer) = now;
2555 GST_BUFFER_DTS (buffer) = now;
2556 } else {
2557 GST_WARNING_OBJECT (appsrc,
2558 "do-timestamp=TRUE but buffers are provided before "
2559 "reaching the PLAYING state and having a clock. Timestamps will "
2560 "not be accurate!");
2561 }
2562 }
2563
2564 g_mutex_lock (&priv->mutex);
2565
2566 while (TRUE) {
2567 /* can't accept buffers when we are flushing or EOS */
2568 if (priv->flushing)
2569 goto flushing;
2570
2571 if (priv->is_eos)
2572 goto eos;
2573
2574 if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
2575 (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
2576 (priv->max_time && priv->queued_time >= priv->max_time)) {
2577 GST_DEBUG_OBJECT (appsrc,
2578 "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
2579 G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
2580 " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
2581 GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
2582 priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
2583 priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
2584 GST_TIME_ARGS (priv->max_time));
2585
2586 if (first) {
2587 Callbacks *callbacks = NULL;
2588 gboolean emit;
2589
2590 emit = priv->emit_signals;
2591 if (priv->callbacks)
2592 callbacks = callbacks_ref (priv->callbacks);
2593 /* only signal on the first push */
2594 g_mutex_unlock (&priv->mutex);
2595
2596 if (callbacks && callbacks->callbacks.enough_data)
2597 callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
2598 else if (emit)
2599 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
2600 NULL);
2601
2602 g_clear_pointer (&callbacks, callbacks_unref);
2603
2604 g_mutex_lock (&priv->mutex);
2605 }
2606
2607 if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
2608 priv->need_discont_upstream = TRUE;
2609 goto dropped;
2610 } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
2611 guint i, length = gst_queue_array_get_length (priv->queue);
2612 GstMiniObject *item = NULL;
2613
2614 /* Find the oldest buffer or buffer list and drop it, then update the
2615 * limits. Dropping one is sufficient to go below the limits again.
2616 */
2617 for (i = 0; i < length; i++) {
2618 item = gst_queue_array_peek_nth (priv->queue, i);
2619 if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
2620 gst_queue_array_drop_element (priv->queue, i);
2621 break;
2622 }
2623 /* To not accidentally have an event after the loop */
2624 item = NULL;
2625 }
2626
2627 if (!item) {
2628 GST_FIXME_OBJECT (appsrc,
2629 "No buffer or buffer list queued but queue is full");
2630 /* This shouldn't really happen but in this case we can't really do
2631 * anything apart from accepting the buffer / bufferlist */
2632 break;
2633 }
2634
2635 GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
2636
2637 gst_app_src_update_queued_pop (appsrc, item, FALSE);
2638 gst_mini_object_unref (item);
2639
2640 priv->need_discont_downstream = TRUE;
2641 continue;
2642 }
2643
2644 if (first) {
2645 /* continue to check for flushing/eos after releasing the lock */
2646 first = FALSE;
2647 continue;
2648 }
2649 if (priv->block) {
2650 GST_DEBUG_OBJECT (appsrc, "waiting for free space");
2651 /* we are filled, wait until a buffer gets popped or when we
2652 * flush. */
2653 priv->wait_status |= APP_WAITING;
2654 g_cond_wait (&priv->cond, &priv->mutex);
2655 priv->wait_status &= ~APP_WAITING;
2656 } else {
2657 /* no need to wait for free space, we just pump more data into the
2658 * queue hoping that the caller reacts to the enough-data signal and
2659 * stops pushing buffers. */
2660 break;
2661 }
2662 } else {
2663 break;
2664 }
2665 }
2666
2667 if (priv->pending_custom_segment) {
2668 GstEvent *event = gst_event_new_segment (&priv->last_segment);
2669
2670 GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event);
2671 gst_queue_array_push_tail (priv->queue, event);
2672 priv->pending_custom_segment = FALSE;
2673 }
2674
2675 if (buflist != NULL) {
2676 /* Mark the first buffer of the buffer list as DISCONT if we previously
2677 * dropped a buffer instead of queueing it */
2678 if (priv->need_discont_upstream) {
2679 if (!steal_ref) {
2680 buflist = gst_buffer_list_copy (buflist);
2681 steal_ref = TRUE;
2682 } else {
2683 buflist = gst_buffer_list_make_writable (buflist);
2684 }
2685 buffer = gst_buffer_list_get_writable (buflist, 0);
2686 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2687 priv->need_discont_upstream = FALSE;
2688 }
2689
2690 GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
2691
2692 if (!steal_ref)
2693 gst_buffer_list_ref (buflist);
2694 gst_queue_array_push_tail (priv->queue, buflist);
2695 } else {
2696 /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
2697 * queueing it */
2698 if (priv->need_discont_upstream) {
2699 if (!steal_ref) {
2700 buffer = gst_buffer_copy (buffer);
2701 steal_ref = TRUE;
2702 } else {
2703 buffer = gst_buffer_make_writable (buffer);
2704 }
2705 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2706 priv->need_discont_upstream = FALSE;
2707 }
2708
2709 GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
2710 if (!steal_ref)
2711 gst_buffer_ref (buffer);
2712 gst_queue_array_push_tail (priv->queue, buffer);
2713 }
2714
2715 gst_app_src_update_queued_push (appsrc,
2716 buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
2717
2718 if ((priv->wait_status & STREAM_WAITING))
2719 g_cond_broadcast (&priv->cond);
2720
2721 g_mutex_unlock (&priv->mutex);
2722
2723 return GST_FLOW_OK;
2724
2725 /* ERRORS */
2726 flushing:
2727 {
2728 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
2729 if (steal_ref) {
2730 if (buflist)
2731 gst_buffer_list_unref (buflist);
2732 else
2733 gst_buffer_unref (buffer);
2734 }
2735 g_mutex_unlock (&priv->mutex);
2736 return GST_FLOW_FLUSHING;
2737 }
2738 eos:
2739 {
2740 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
2741 if (steal_ref) {
2742 if (buflist)
2743 gst_buffer_list_unref (buflist);
2744 else
2745 gst_buffer_unref (buffer);
2746 }
2747 g_mutex_unlock (&priv->mutex);
2748 return GST_FLOW_EOS;
2749 }
2750 dropped:
2751 {
2752 GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
2753 if (steal_ref) {
2754 if (buflist)
2755 gst_buffer_list_unref (buflist);
2756 else
2757 gst_buffer_unref (buffer);
2758 }
2759 g_mutex_unlock (&priv->mutex);
2760 return GST_FLOW_EOS;
2761 }
2762 }
2763
2764 static GstFlowReturn
gst_app_src_push_buffer_full(GstAppSrc * appsrc,GstBuffer * buffer,gboolean steal_ref)2765 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
2766 gboolean steal_ref)
2767 {
2768 return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
2769 }
2770
2771 static GstFlowReturn
gst_app_src_push_sample_internal(GstAppSrc * appsrc,GstSample * sample)2772 gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
2773 {
2774 GstAppSrcPrivate *priv = appsrc->priv;
2775 GstBufferList *buffer_list;
2776 GstBuffer *buffer;
2777 GstCaps *caps;
2778
2779 g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
2780
2781 caps = gst_sample_get_caps (sample);
2782 if (caps != NULL) {
2783 gst_app_src_set_caps (appsrc, caps);
2784 } else {
2785 GST_WARNING_OBJECT (appsrc, "received sample without caps");
2786 }
2787
2788 if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) {
2789 GstSegment *segment = gst_sample_get_segment (sample);
2790
2791 if (segment->format != GST_FORMAT_TIME) {
2792 GST_LOG_OBJECT (appsrc, "format %s is not supported",
2793 gst_format_get_name (segment->format));
2794 goto handle_buffer;
2795 }
2796
2797 g_mutex_lock (&priv->mutex);
2798 if (gst_segment_is_equal (&priv->last_segment, segment)) {
2799 GST_LOG_OBJECT (appsrc, "segment wasn't changed");
2800 g_mutex_unlock (&priv->mutex);
2801 goto handle_buffer;
2802 } else {
2803 GST_LOG_OBJECT (appsrc,
2804 "segment changed %" GST_SEGMENT_FORMAT " -> %" GST_SEGMENT_FORMAT,
2805 &priv->last_segment, segment);
2806 }
2807
2808 /* will be pushed to queue with next buffer/buffer-list */
2809 gst_segment_copy_into (segment, &priv->last_segment);
2810 priv->pending_custom_segment = TRUE;
2811 g_mutex_unlock (&priv->mutex);
2812 }
2813
2814 handle_buffer:
2815
2816 buffer = gst_sample_get_buffer (sample);
2817 if (buffer != NULL)
2818 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2819
2820 buffer_list = gst_sample_get_buffer_list (sample);
2821 if (buffer_list != NULL)
2822 return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2823
2824 GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
2825 return GST_FLOW_OK;
2826 }
2827
2828 /**
2829 * gst_app_src_push_buffer:
2830 * @appsrc: a #GstAppSrc
2831 * @buffer: (transfer full): a #GstBuffer to push
2832 *
2833 * Adds a buffer to the queue of buffers that the appsrc element will
2834 * push to its source pad. This function takes ownership of the buffer.
2835 *
2836 * When the block property is TRUE, this function can block until free
2837 * space becomes available in the queue.
2838 *
2839 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2840 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2841 * #GST_FLOW_EOS when EOS occurred.
2842 */
2843 GstFlowReturn
gst_app_src_push_buffer(GstAppSrc * appsrc,GstBuffer * buffer)2844 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
2845 {
2846 return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
2847 }
2848
2849 /**
2850 * gst_app_src_push_buffer_list:
2851 * @appsrc: a #GstAppSrc
2852 * @buffer_list: (transfer full): a #GstBufferList to push
2853 *
2854 * Adds a buffer list to the queue of buffers and buffer lists that the
2855 * appsrc element will push to its source pad. This function takes ownership
2856 * of @buffer_list.
2857 *
2858 * When the block property is TRUE, this function can block until free
2859 * space becomes available in the queue.
2860 *
2861 * Returns: #GST_FLOW_OK when the buffer list was successfully queued.
2862 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2863 * #GST_FLOW_EOS when EOS occurred.
2864 *
2865 * Since: 1.14
2866 */
2867 GstFlowReturn
gst_app_src_push_buffer_list(GstAppSrc * appsrc,GstBufferList * buffer_list)2868 gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
2869 {
2870 return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
2871 }
2872
2873 /**
2874 * gst_app_src_push_sample:
2875 * @appsrc: a #GstAppSrc
2876 * @sample: (transfer none): a #GstSample from which buffer and caps may be
2877 * extracted
2878 *
2879 * Extract a buffer from the provided sample and adds it to the queue of
2880 * buffers that the appsrc element will push to its source pad. Any
2881 * previous caps that were set on appsrc will be replaced by the caps
2882 * associated with the sample if not equal.
2883 *
2884 * This function does not take ownership of the
2885 * sample so the sample needs to be unreffed after calling this function.
2886 *
2887 * When the block property is TRUE, this function can block until free
2888 * space becomes available in the queue.
2889 *
2890 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2891 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2892 * #GST_FLOW_EOS when EOS occurred.
2893 *
2894 * Since: 1.6
2895 *
2896 */
2897 GstFlowReturn
gst_app_src_push_sample(GstAppSrc * appsrc,GstSample * sample)2898 gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
2899 {
2900 return gst_app_src_push_sample_internal (appsrc, sample);
2901 }
2902
2903 /* push a buffer without stealing the ref of the buffer. This is used for the
2904 * action signal. */
2905 static GstFlowReturn
gst_app_src_push_buffer_action(GstAppSrc * appsrc,GstBuffer * buffer)2906 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
2907 {
2908 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2909 }
2910
2911 /* push a buffer list without stealing the ref of the buffer list. This is
2912 * used for the action signal. */
2913 static GstFlowReturn
gst_app_src_push_buffer_list_action(GstAppSrc * appsrc,GstBufferList * buffer_list)2914 gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
2915 GstBufferList * buffer_list)
2916 {
2917 return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2918 }
2919
2920 /* push a sample without stealing the ref. This is used for the
2921 * action signal. */
2922 static GstFlowReturn
gst_app_src_push_sample_action(GstAppSrc * appsrc,GstSample * sample)2923 gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
2924 {
2925 return gst_app_src_push_sample_internal (appsrc, sample);
2926 }
2927
2928 /**
2929 * gst_app_src_end_of_stream:
2930 * @appsrc: a #GstAppSrc
2931 *
2932 * Indicates to the appsrc element that the last buffer queued in the
2933 * element is the last buffer of the stream.
2934 *
2935 * Returns: #GST_FLOW_OK when the EOS was successfully queued.
2936 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2937 */
2938 GstFlowReturn
gst_app_src_end_of_stream(GstAppSrc * appsrc)2939 gst_app_src_end_of_stream (GstAppSrc * appsrc)
2940 {
2941 GstAppSrcPrivate *priv;
2942
2943 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2944
2945 priv = appsrc->priv;
2946
2947 g_mutex_lock (&priv->mutex);
2948 /* can't accept buffers when we are flushing. We can accept them when we are
2949 * EOS although it will not do anything. */
2950 if (priv->flushing)
2951 goto flushing;
2952
2953 GST_DEBUG_OBJECT (appsrc, "sending EOS");
2954 priv->is_eos = TRUE;
2955 g_cond_broadcast (&priv->cond);
2956 g_mutex_unlock (&priv->mutex);
2957
2958 return GST_FLOW_OK;
2959
2960 /* ERRORS */
2961 flushing:
2962 {
2963 g_mutex_unlock (&priv->mutex);
2964 GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
2965 return GST_FLOW_FLUSHING;
2966 }
2967 }
2968
2969 /**
2970 * gst_app_src_set_callbacks: (skip)
2971 * @appsrc: a #GstAppSrc
2972 * @callbacks: the callbacks
2973 * @user_data: a user_data argument for the callbacks
2974 * @notify: a destroy notify function
2975 *
2976 * Set callbacks which will be executed when data is needed, enough data has
2977 * been collected or when a seek should be performed.
2978 * This is an alternative to using the signals, it has lower overhead and is thus
2979 * less expensive, but also less flexible.
2980 *
2981 * If callbacks are installed, no signals will be emitted for performance
2982 * reasons.
2983 *
2984 * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
2985 * way.
2986 */
2987 void
gst_app_src_set_callbacks(GstAppSrc * appsrc,GstAppSrcCallbacks * callbacks,gpointer user_data,GDestroyNotify notify)2988 gst_app_src_set_callbacks (GstAppSrc * appsrc,
2989 GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
2990 {
2991 Callbacks *old_callbacks, *new_callbacks = NULL;
2992 GstAppSrcPrivate *priv;
2993
2994 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2995 g_return_if_fail (callbacks != NULL);
2996
2997 priv = appsrc->priv;
2998
2999 if (callbacks) {
3000 new_callbacks = g_new0 (Callbacks, 1);
3001 new_callbacks->callbacks = *callbacks;
3002 new_callbacks->user_data = user_data;
3003 new_callbacks->destroy_notify = notify;
3004 new_callbacks->ref_count = 1;
3005 }
3006
3007 g_mutex_lock (&priv->mutex);
3008 old_callbacks = g_steal_pointer (&priv->callbacks);
3009 priv->callbacks = g_steal_pointer (&new_callbacks);
3010 g_mutex_unlock (&priv->mutex);
3011
3012 g_clear_pointer (&old_callbacks, callbacks_unref);
3013 }
3014
3015 /*** GSTURIHANDLER INTERFACE *************************************************/
3016
3017 static GstURIType
gst_app_src_uri_get_type(GType type)3018 gst_app_src_uri_get_type (GType type)
3019 {
3020 return GST_URI_SRC;
3021 }
3022
3023 static const gchar *const *
gst_app_src_uri_get_protocols(GType type)3024 gst_app_src_uri_get_protocols (GType type)
3025 {
3026 static const gchar *protocols[] = { "appsrc", NULL };
3027
3028 return protocols;
3029 }
3030
3031 static gchar *
gst_app_src_uri_get_uri(GstURIHandler * handler)3032 gst_app_src_uri_get_uri (GstURIHandler * handler)
3033 {
3034 GstAppSrc *appsrc = GST_APP_SRC (handler);
3035
3036 return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
3037 }
3038
3039 static gboolean
gst_app_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)3040 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
3041 GError ** error)
3042 {
3043 GstAppSrc *appsrc = GST_APP_SRC (handler);
3044
3045 g_free (appsrc->priv->uri);
3046 appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
3047
3048 return TRUE;
3049 }
3050
3051 static void
gst_app_src_uri_handler_init(gpointer g_iface,gpointer iface_data)3052 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
3053 {
3054 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
3055
3056 iface->get_type = gst_app_src_uri_get_type;
3057 iface->get_protocols = gst_app_src_uri_get_protocols;
3058 iface->get_uri = gst_app_src_uri_get_uri;
3059 iface->set_uri = gst_app_src_uri_set_uri;
3060 }
3061
3062 static gboolean
gst_app_src_event(GstBaseSrc * src,GstEvent * event)3063 gst_app_src_event (GstBaseSrc * src, GstEvent * event)
3064 {
3065 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
3066 GstAppSrcPrivate *priv = appsrc->priv;
3067
3068 switch (GST_EVENT_TYPE (event)) {
3069 case GST_EVENT_FLUSH_STOP:
3070 g_mutex_lock (&priv->mutex);
3071 priv->is_eos = FALSE;
3072 g_mutex_unlock (&priv->mutex);
3073 break;
3074 default:
3075 break;
3076 }
3077
3078 return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
3079 }
3080