1 /* GStreamer
2 * Copyright (C) 2007 David Schleef <ds@schleef.org>
3 * (C) 2008 Wim Taymans <wim.taymans@gmail.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20 /**
21 * SECTION:gstappsrc
22 * @title: GstAppSrc
23 * @short_description: Easy way for applications to inject buffers into a
24 * pipeline
25 * @see_also: #GstBaseSrc, appsink
26 *
27 * The appsrc element can be used by applications to insert data into a
28 * GStreamer pipeline. Unlike most GStreamer elements, appsrc provides
29 * external API functions.
30 *
31 * appsrc can be used by linking with the libgstapp library to access the
32 * methods directly or by using the appsrc action signals.
33 *
34 * Before operating appsrc, the caps property must be set to fixed caps
35 * describing the format of the data that will be pushed with appsrc. An
36 * exception to this is when pushing buffers with unknown caps, in which case no
37 * caps should be set. This is typically true of file-like sources that push raw
38 * byte buffers. If you don't want to explicitly set the caps, you can use
39 * gst_app_src_push_sample. This method gets the caps associated with the
40 * sample and sets them on the appsrc replacing any previously set caps (if
41 * different from sample's caps).
42 *
43 * The main way of handing data to the appsrc element is by calling the
44 * gst_app_src_push_buffer() method or by emitting the push-buffer action signal.
 * This will put the buffer onto a queue from which appsrc will read in its
46 * streaming thread. It is important to note that data transport will not happen
47 * from the thread that performed the push-buffer call.
48 *
49 * The "max-bytes", "max-buffers" and "max-time" properties control how much
50 * data can be queued in appsrc before appsrc considers the queue full. A
 * filled internal queue will always emit the "enough-data" signal, which
52 * signals the application that it should stop pushing data into appsrc. The
 * "block" property will cause appsrc to block the push-buffer method until
 * free space becomes available again.
55 *
56 * When the internal queue is running out of data, the "need-data" signal is
57 * emitted, which signals the application that it should start pushing more data
58 * into appsrc.
59 *
60 * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
61 * "seek-data" signal when the "stream-mode" property is set to "seekable" or
62 * "random-access". The signal argument will contain the new desired position in
63 * the stream expressed in the unit set with the "format" property. After
 * receiving the seek-data signal, the application should push buffers from the
65 * new position.
66 *
67 * These signals allow the application to operate the appsrc in two different
68 * ways:
69 *
70 * The push mode, in which the application repeatedly calls the push-buffer/push-sample
71 * method with a new buffer/sample. Optionally, the queue size in the appsrc
72 * can be controlled with the enough-data and need-data signals by respectively
73 * stopping/starting the push-buffer/push-sample calls. This is a typical
74 * mode of operation for the stream-type "stream" and "seekable". Use this
75 * mode when implementing various network protocols or hardware devices.
76 *
77 * The pull mode, in which the need-data signal triggers the next push-buffer call.
78 * This mode is typically used in the "random-access" stream-type. Use this
79 * mode for file access or other randomly accessible sources. In this mode, a
80 * buffer of exactly the amount of bytes given by the need-data signal should be
81 * pushed into appsrc.
82 *
83 * In all modes, the size property on appsrc should contain the total stream
84 * size in bytes. Setting this property is mandatory in the random-access mode.
85 * For the stream and seekable modes, setting this property is optional but
86 * recommended.
87 *
88 * When the application has finished pushing data into appsrc, it should call
89 * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
90 * this call, no more buffers can be pushed into appsrc until a flushing seek
91 * occurs or the state of the appsrc has gone through READY.
92 */
93
94 #ifdef HAVE_CONFIG_H
95 #include "config.h"
96 #endif
97
98 #include <gst/gst.h>
99 #include <gst/base/base.h>
100
101 #include <string.h>
102
103 #include "gstappsrc.h"
104
/* Records which thread, if any, is currently waiting for the other one.
 * Each waiter has its own bit. */
typedef enum
{
  NOONE_WAITING = 0x0,
  /* streaming thread is waiting for application thread */
  STREAM_WAITING = 0x1,
  /* application thread is waiting for streaming thread */
  APP_WAITING = 0x2,
} GstAppSrcWaitStatus;
111
112 typedef struct
113 {
114 GstAppSrcCallbacks callbacks;
115 gpointer user_data;
116 GDestroyNotify destroy_notify;
117 gint ref_count;
118 } Callbacks;
119
120 static Callbacks *
callbacks_ref(Callbacks * callbacks)121 callbacks_ref (Callbacks * callbacks)
122 {
123 g_atomic_int_inc (&callbacks->ref_count);
124
125 return callbacks;
126 }
127
128 static void
callbacks_unref(Callbacks * callbacks)129 callbacks_unref (Callbacks * callbacks)
130 {
131 if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
132 return;
133
134 if (callbacks->destroy_notify)
135 callbacks->destroy_notify (callbacks->user_data);
136
137 g_free (callbacks);
138 }
139
140
141 struct _GstAppSrcPrivate
142 {
143 GCond cond;
144 GMutex mutex;
145 GstQueueArray *queue;
146 GstAppSrcWaitStatus wait_status;
147
148 GstCaps *last_caps;
149 GstCaps *current_caps;
150 /* last segment received on the input */
151 GstSegment last_segment;
152 /* currently configured segment for the output */
153 GstSegment current_segment;
154 /* queue up a segment event based on last_segment before
155 * the next buffer of buffer list */
156 gboolean pending_custom_segment;
157
158 /* the next buffer that will be queued needs a discont flag
159 * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
160 gboolean need_discont_upstream;
161 /* the next buffer that will be dequeued needs a discont flag
162 * because the previous one was dropped - GST_APP_LEAKY_TYPE_DOWNSTREAM */
163 gboolean need_discont_downstream;
164
165 gint64 size;
166 GstClockTime duration;
167 GstAppStreamType stream_type;
168 guint64 max_bytes, max_buffers, max_time;
169 GstFormat format;
170 gboolean block;
171 gchar *uri;
172
173 gboolean flushing;
174 gboolean started;
175 gboolean is_eos;
176 guint64 queued_bytes, queued_buffers;
177 /* Used to calculate the current time level */
178 GstClockTime last_in_running_time, last_out_running_time;
179 /* Updated based on the above whenever they change */
180 GstClockTime queued_time;
181 guint64 offset;
182 GstAppStreamType current_type;
183
184 guint64 min_latency;
185 guint64 max_latency;
186 gboolean emit_signals;
187 guint min_percent;
188 gboolean handle_segment_change;
189
190 GstAppLeakyType leaky_type;
191
192 Callbacks *callbacks;
193 };
194
/* Debug category used by every GST_* logging macro in this file */
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
#define GST_CAT_DEFAULT app_src_debug
197
/* Indices into the signal id table */
enum
{
  /* notifications emitted towards the application */
  SIGNAL_NEED_DATA = 0,
  SIGNAL_ENOUGH_DATA,
  SIGNAL_SEEK_DATA,

  /* action signals invoked by the application */
  SIGNAL_PUSH_BUFFER,
  SIGNAL_END_OF_STREAM,
  SIGNAL_PUSH_SAMPLE,
  SIGNAL_PUSH_BUFFER_LIST,

  /* number of signals, keep last */
  LAST_SIGNAL
};
213
/* Default values of the GObject properties */
#define DEFAULT_PROP_SIZE                   (-1)
#define DEFAULT_PROP_STREAM_TYPE            GST_APP_STREAM_TYPE_STREAM
#define DEFAULT_PROP_MAX_BYTES              200000
#define DEFAULT_PROP_MAX_BUFFERS            0
#define DEFAULT_PROP_MAX_TIME               (0 * GST_SECOND)
#define DEFAULT_PROP_FORMAT                 GST_FORMAT_BYTES
#define DEFAULT_PROP_BLOCK                  FALSE
#define DEFAULT_PROP_IS_LIVE                FALSE
#define DEFAULT_PROP_MIN_LATENCY            (-1)
#define DEFAULT_PROP_MAX_LATENCY            (-1)
#define DEFAULT_PROP_EMIT_SIGNALS           TRUE
#define DEFAULT_PROP_MIN_PERCENT            0
#define DEFAULT_PROP_CURRENT_LEVEL_BYTES    0
#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS  0
#define DEFAULT_PROP_CURRENT_LEVEL_TIME     0
#define DEFAULT_PROP_DURATION               GST_CLOCK_TIME_NONE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE  FALSE
#define DEFAULT_PROP_LEAKY_TYPE             GST_APP_LEAKY_TYPE_NONE
232
/* GObject property ids; 0 is kept unused as PROP_0 */
enum
{
  PROP_0 = 0,
  PROP_CAPS,
  PROP_SIZE,
  PROP_STREAM_TYPE,
  /* queue limits */
  PROP_MAX_BYTES,
  PROP_MAX_BUFFERS,
  PROP_MAX_TIME,
  PROP_FORMAT,
  PROP_BLOCK,
  PROP_IS_LIVE,
  PROP_MIN_LATENCY,
  PROP_MAX_LATENCY,
  PROP_EMIT_SIGNALS,
  PROP_MIN_PERCENT,
  /* read-only queue levels */
  PROP_CURRENT_LEVEL_BYTES,
  PROP_CURRENT_LEVEL_BUFFERS,
  PROP_CURRENT_LEVEL_TIME,
  PROP_DURATION,
  PROP_HANDLE_SEGMENT_CHANGE,
  PROP_LEAKY_TYPE,
  /* keep last */
  PROP_LAST
};
257
258 static GstStaticPadTemplate gst_app_src_template =
259 GST_STATIC_PAD_TEMPLATE ("src",
260 GST_PAD_SRC,
261 GST_PAD_ALWAYS,
262 GST_STATIC_CAPS_ANY);
263
264 static void gst_app_src_uri_handler_init (gpointer g_iface,
265 gpointer iface_data);
266
267 static void gst_app_src_dispose (GObject * object);
268 static void gst_app_src_finalize (GObject * object);
269
270 static void gst_app_src_set_property (GObject * object, guint prop_id,
271 const GValue * value, GParamSpec * pspec);
272 static void gst_app_src_get_property (GObject * object, guint prop_id,
273 GValue * value, GParamSpec * pspec);
274
275 static gboolean gst_app_src_send_event (GstElement * element, GstEvent * event);
276
277 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
278 gboolean do_min, guint64 min, gboolean do_max, guint64 max);
279
280 static gboolean gst_app_src_negotiate (GstBaseSrc * basesrc);
281 static GstCaps *gst_app_src_internal_get_caps (GstBaseSrc * bsrc,
282 GstCaps * filter);
283 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset,
284 guint size, GstBuffer ** buf);
285 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
286 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
287 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
288 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
289 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
290 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
291 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
292 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
293 static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
294
295 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
296 GstBuffer * buffer);
297 static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
298 GstBufferList * buffer_list);
299 static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
300 GstSample * sample);
301
302 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
303
304 #define gst_app_src_parent_class parent_class
305 G_DEFINE_TYPE_WITH_CODE (GstAppSrc, gst_app_src, GST_TYPE_BASE_SRC,
306 G_ADD_PRIVATE (GstAppSrc)
307 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_app_src_uri_handler_init));
308
309 static void
gst_app_src_class_init(GstAppSrcClass * klass)310 gst_app_src_class_init (GstAppSrcClass * klass)
311 {
312 GObjectClass *gobject_class = (GObjectClass *) klass;
313 GstElementClass *element_class = (GstElementClass *) klass;
314 GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
315
316 GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
317
318 gobject_class->dispose = gst_app_src_dispose;
319 gobject_class->finalize = gst_app_src_finalize;
320
321 gobject_class->set_property = gst_app_src_set_property;
322 gobject_class->get_property = gst_app_src_get_property;
323
324 /**
325 * GstAppSrc:caps:
326 *
327 * The GstCaps that will negotiated downstream and will be put
328 * on outgoing buffers.
329 */
330 g_object_class_install_property (gobject_class, PROP_CAPS,
331 g_param_spec_boxed ("caps", "Caps",
332 "The allowed caps for the src pad", GST_TYPE_CAPS,
333 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334 /**
335 * GstAppSrc:format:
336 *
337 * The format to use for segment events. When the source is producing
338 * timestamped buffers this property should be set to GST_FORMAT_TIME.
339 */
340 g_object_class_install_property (gobject_class, PROP_FORMAT,
341 g_param_spec_enum ("format", "Format",
342 "The format of the segment events and seek", GST_TYPE_FORMAT,
343 DEFAULT_PROP_FORMAT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344 /**
345 * GstAppSrc:size:
346 *
347 * The total size in bytes of the data stream. If the total size is known, it
348 * is recommended to configure it with this property.
349 */
350 g_object_class_install_property (gobject_class, PROP_SIZE,
351 g_param_spec_int64 ("size", "Size",
352 "The size of the data stream in bytes (-1 if unknown)",
353 -1, G_MAXINT64, DEFAULT_PROP_SIZE,
354 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
355 /**
356 * GstAppSrc:stream-type:
357 *
358 * The type of stream that this source is producing. For seekable streams the
359 * application should connect to the seek-data signal.
360 */
361 g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
362 g_param_spec_enum ("stream-type", "Stream Type",
363 "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
364 DEFAULT_PROP_STREAM_TYPE,
365 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366 /**
367 * GstAppSrc:max-bytes:
368 *
369 * The maximum amount of bytes that can be queued internally.
370 * After the maximum amount of bytes are queued, appsrc will emit the
371 * "enough-data" signal.
372 */
373 g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
374 g_param_spec_uint64 ("max-bytes", "Max bytes",
375 "The maximum number of bytes to queue internally (0 = unlimited)",
376 0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
377 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378
379 /**
380 * GstAppSrc:max-buffers:
381 *
382 * The maximum amount of buffers that can be queued internally.
383 * After the maximum amount of buffers are queued, appsrc will emit the
384 * "enough-data" signal.
385 *
386 * Since: 1.20
387 */
388 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERS,
389 g_param_spec_uint64 ("max-buffers", "Max buffers",
390 "The maximum number of buffers to queue internally (0 = unlimited)",
391 0, G_MAXUINT64, DEFAULT_PROP_MAX_BUFFERS,
392 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393
394 /**
395 * GstAppSrc:max-time:
396 *
397 * The maximum amount of time that can be queued internally.
398 * After the maximum amount of time are queued, appsrc will emit the
399 * "enough-data" signal.
400 *
401 * Since: 1.20
402 */
403 g_object_class_install_property (gobject_class, PROP_MAX_TIME,
404 g_param_spec_uint64 ("max-time", "Max time",
405 "The maximum amount of time to queue internally (0 = unlimited)",
406 0, G_MAXUINT64, DEFAULT_PROP_MAX_TIME,
407 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
408
409 /**
410 * GstAppSrc:block:
411 *
412 * When max-bytes are queued and after the enough-data signal has been emitted,
413 * block any further push-buffer calls until the amount of queued bytes drops
414 * below the max-bytes limit.
415 */
416 g_object_class_install_property (gobject_class, PROP_BLOCK,
417 g_param_spec_boolean ("block", "Block",
418 "Block push-buffer when max-bytes are queued",
419 DEFAULT_PROP_BLOCK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
420
421 /**
422 * GstAppSrc:is-live:
423 *
424 * Instruct the source to behave like a live source. This includes that it
425 * will only push out buffers in the PLAYING state.
426 */
427 g_object_class_install_property (gobject_class, PROP_IS_LIVE,
428 g_param_spec_boolean ("is-live", "Is Live",
429 "Whether to act as a live source",
430 DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431 /**
432 * GstAppSrc:min-latency:
433 *
434 * The minimum latency of the source. A value of -1 will use the default
435 * latency calculations of #GstBaseSrc.
436 */
437 g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
438 g_param_spec_int64 ("min-latency", "Min Latency",
439 "The minimum latency (-1 = default)",
440 -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
441 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442 /**
443 * GstAppSrc::max-latency:
444 *
445 * The maximum latency of the source. A value of -1 means an unlimited amount
446 * of latency.
447 */
448 g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
449 g_param_spec_int64 ("max-latency", "Max Latency",
450 "The maximum latency (-1 = unlimited)",
451 -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
452 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
453
454 /**
455 * GstAppSrc:emit-signals:
456 *
457 * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
458 * This option is by default enabled for backwards compatibility reasons but
459 * can disabled when needed because signal emission is expensive.
460 */
461 g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
462 g_param_spec_boolean ("emit-signals", "Emit signals",
463 "Emit need-data, enough-data and seek-data signals",
464 DEFAULT_PROP_EMIT_SIGNALS,
465 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
466
467 /**
468 * GstAppSrc:min-percent:
469 *
470 * Make appsrc emit the "need-data" signal when the amount of bytes in the
471 * queue drops below this percentage of max-bytes.
472 */
473 g_object_class_install_property (gobject_class, PROP_MIN_PERCENT,
474 g_param_spec_uint ("min-percent", "Min Percent",
475 "Emit need-data when queued bytes drops below this percent of max-bytes",
476 0, 100, DEFAULT_PROP_MIN_PERCENT,
477 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478
479 /**
480 * GstAppSrc:current-level-bytes:
481 *
482 * The number of currently queued bytes inside appsrc.
483 *
484 * Since: 1.2
485 */
486 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BYTES,
487 g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
488 "The number of currently queued bytes",
489 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BYTES,
490 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
491
492 /**
493 * GstAppSrc:current-level-buffers:
494 *
495 * The number of currently queued buffers inside appsrc.
496 *
497 * Since: 1.20
498 */
499 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_BUFFERS,
500 g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
501 "The number of currently queued buffers",
502 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_BUFFERS,
503 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
504
505 /**
506 * GstAppSrc:current-level-time:
507 *
508 * The amount of currently queued time inside appsrc.
509 *
510 * Since: 1.20
511 */
512 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME,
513 g_param_spec_uint64 ("current-level-time", "Current Level Time",
514 "The amount of currently queued time",
515 0, G_MAXUINT64, DEFAULT_PROP_CURRENT_LEVEL_TIME,
516 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
517
518 /**
519 * GstAppSrc:duration:
520 *
521 * The total duration in nanoseconds of the data stream. If the total duration is known, it
522 * is recommended to configure it with this property.
523 *
524 * Since: 1.10
525 */
526 g_object_class_install_property (gobject_class, PROP_DURATION,
527 g_param_spec_uint64 ("duration", "Duration",
528 "The duration of the data stream in nanoseconds (GST_CLOCK_TIME_NONE if unknown)",
529 0, G_MAXUINT64, DEFAULT_PROP_DURATION,
530 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
531
532 /**
533 * GstAppSrc:handle-segment-change:
534 *
535 * When enabled, appsrc will check GstSegment in GstSample which was
536 * pushed via gst_app_src_push_sample() or "push-sample" signal action.
537 * If a GstSegment is changed, corresponding segment event will be followed
538 * by next data flow.
539 *
540 * FIXME: currently only GST_FORMAT_TIME format is supported and therefore
541 * GstAppSrc::format should be time. However, possibly #GstAppSrc can support
542 * other formats.
543 *
544 * Since: 1.18
545 */
546 g_object_class_install_property (gobject_class, PROP_HANDLE_SEGMENT_CHANGE,
547 g_param_spec_boolean ("handle-segment-change", "Handle Segment Change",
548 "Whether to detect and handle changed time format GstSegment in "
549 "GstSample. User should set valid GstSegment in GstSample. "
550 "Must set format property as \"time\" to enable this property",
551 DEFAULT_PROP_HANDLE_SEGMENT_CHANGE,
552 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
553 G_PARAM_STATIC_STRINGS));
554
555 /**
556 * GstAppSrc:leaky-type:
557 *
558 * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
559 * will drop any buffers that are pushed into it once its internal queue is
560 * full. The selected type defines whether to drop the oldest or new
561 * buffers.
562 *
563 * Since: 1.20
564 */
565 g_object_class_install_property (gobject_class, PROP_LEAKY_TYPE,
566 g_param_spec_enum ("leaky-type", "Leaky Type",
567 "Whether to drop buffers once the internal queue is full",
568 GST_TYPE_APP_LEAKY_TYPE,
569 DEFAULT_PROP_LEAKY_TYPE,
570 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
571 G_PARAM_STATIC_STRINGS));
572
573 /**
574 * GstAppSrc::need-data:
575 * @appsrc: the appsrc element that emitted the signal
576 * @length: the amount of bytes needed.
577 *
578 * Signal that the source needs more data. In the callback or from another
579 * thread you should call push-buffer or end-of-stream.
580 *
581 * @length is just a hint and when it is set to -1, any number of bytes can be
582 * pushed into @appsrc.
583 *
584 * You can call push-buffer multiple times until the enough-data signal is
585 * fired.
586 */
587 gst_app_src_signals[SIGNAL_NEED_DATA] =
588 g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
589 G_STRUCT_OFFSET (GstAppSrcClass, need_data),
590 NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
591
592 /**
593 * GstAppSrc::enough-data:
594 * @appsrc: the appsrc element that emitted the signal
595 *
596 * Signal that the source has enough data. It is recommended that the
597 * application stops calling push-buffer until the need-data signal is
598 * emitted again to avoid excessive buffer queueing.
599 */
600 gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
601 g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
602 G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
603 NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
604
605 /**
606 * GstAppSrc::seek-data:
607 * @appsrc: the appsrc element that emitted the signal
608 * @offset: the offset to seek to
609 *
610 * Seek to the given offset. The next push-buffer should produce buffers from
611 * the new @offset.
612 * This callback is only called for seekable stream types.
613 *
614 * Returns: %TRUE if the seek succeeded.
615 */
616 gst_app_src_signals[SIGNAL_SEEK_DATA] =
617 g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
618 G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
619 NULL, NULL, NULL, G_TYPE_BOOLEAN, 1, G_TYPE_UINT64);
620
621 /**
622 * GstAppSrc::push-buffer:
623 * @appsrc: the appsrc
624 * @buffer: (transfer none): a buffer to push
625 *
626 * Adds a buffer to the queue of buffers that the appsrc element will
627 * push to its source pad.
628 *
629 * This function does not take ownership of the buffer, but it takes a
630 * reference so the buffer can be unreffed at any time after calling this
631 * function.
632 *
633 * When the block property is TRUE, this function can block until free space
634 * becomes available in the queue.
635 */
636 gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
637 g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
638 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
639 push_buffer), NULL, NULL, NULL,
640 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
641
642 /**
643 * GstAppSrc::push-buffer-list:
644 * @appsrc: the appsrc
645 * @buffer_list: (transfer none): a buffer list to push
646 *
647 * Adds a buffer list to the queue of buffers and buffer lists that the
648 * appsrc element will push to its source pad.
649 *
650 * This function does not take ownership of the buffer list, but it takes a
651 * reference so the buffer list can be unreffed at any time after calling
652 * this function.
653 *
654 * When the block property is TRUE, this function can block until free space
655 * becomes available in the queue.
656 *
657 * Since: 1.14
658 */
659 gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
660 g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
661 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
662 push_buffer_list), NULL, NULL, NULL,
663 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
664
665 /**
666 * GstAppSrc::push-sample:
667 * @appsrc: the appsrc
668 * @sample: (transfer none): a sample from which extract buffer to push
669 *
670 * Extract a buffer from the provided sample and adds the extracted buffer
671 * to the queue of buffers that the appsrc element will
672 * push to its source pad. This function set the appsrc caps based on the caps
673 * in the sample and reset the caps if they change.
674 * Only the caps and the buffer of the provided sample are used and not
675 * for example the segment in the sample.
676 *
677 * This function does not take ownership of the sample, but it takes a
678 * reference so the sample can be unreffed at any time after calling this
679 * function.
680 *
681 * When the block property is TRUE, this function can block until free space
682 * becomes available in the queue.
683 *
684 * Since: 1.6
685 */
686 gst_app_src_signals[SIGNAL_PUSH_SAMPLE] =
687 g_signal_new ("push-sample", G_TYPE_FROM_CLASS (klass),
688 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
689 push_sample), NULL, NULL, NULL,
690 GST_TYPE_FLOW_RETURN, 1, GST_TYPE_SAMPLE);
691
692
693 /**
694 * GstAppSrc::end-of-stream:
695 * @appsrc: the appsrc
696 *
697 * Notify @appsrc that no more buffer are available.
698 */
699 gst_app_src_signals[SIGNAL_END_OF_STREAM] =
700 g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
701 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
702 end_of_stream), NULL, NULL, NULL,
703 GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
704
705 gst_element_class_set_static_metadata (element_class, "AppSrc",
706 "Generic/Source", "Allow the application to feed buffers to a pipeline",
707 "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
708
709 gst_element_class_add_static_pad_template (element_class,
710 &gst_app_src_template);
711
712 element_class->send_event = gst_app_src_send_event;
713
714 basesrc_class->negotiate = gst_app_src_negotiate;
715 basesrc_class->get_caps = gst_app_src_internal_get_caps;
716 basesrc_class->create = gst_app_src_create;
717 basesrc_class->start = gst_app_src_start;
718 basesrc_class->stop = gst_app_src_stop;
719 basesrc_class->unlock = gst_app_src_unlock;
720 basesrc_class->unlock_stop = gst_app_src_unlock_stop;
721 basesrc_class->do_seek = gst_app_src_do_seek;
722 basesrc_class->is_seekable = gst_app_src_is_seekable;
723 basesrc_class->get_size = gst_app_src_do_get_size;
724 basesrc_class->query = gst_app_src_query;
725 basesrc_class->event = gst_app_src_event;
726
727 klass->push_buffer = gst_app_src_push_buffer_action;
728 klass->push_buffer_list = gst_app_src_push_buffer_list_action;
729 klass->push_sample = gst_app_src_push_sample_action;
730 klass->end_of_stream = gst_app_src_end_of_stream;
731 }
732
733 static void
gst_app_src_init(GstAppSrc * appsrc)734 gst_app_src_init (GstAppSrc * appsrc)
735 {
736 GstAppSrcPrivate *priv;
737
738 priv = appsrc->priv = gst_app_src_get_instance_private (appsrc);
739
740 g_mutex_init (&priv->mutex);
741 g_cond_init (&priv->cond);
742 priv->queue = gst_queue_array_new (16);
743 priv->wait_status = NOONE_WAITING;
744
745 priv->size = DEFAULT_PROP_SIZE;
746 priv->duration = DEFAULT_PROP_DURATION;
747 priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
748 priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
749 priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
750 priv->max_time = DEFAULT_PROP_MAX_TIME;
751 priv->format = DEFAULT_PROP_FORMAT;
752 priv->block = DEFAULT_PROP_BLOCK;
753 priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
754 priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
755 priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
756 priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
757 priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
758 priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
759
760 gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
761 }
762
763 /* Must be called with priv->mutex */
764 static void
gst_app_src_flush_queued(GstAppSrc * src,gboolean retain_last_caps)765 gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
766 {
767 GstMiniObject *obj;
768 GstAppSrcPrivate *priv = src->priv;
769 GstCaps *requeue_caps = NULL;
770
771 while (!gst_queue_array_is_empty (priv->queue)) {
772 obj = gst_queue_array_pop_head (priv->queue);
773 if (obj) {
774 if (GST_IS_CAPS (obj) && retain_last_caps) {
775 gst_caps_replace (&requeue_caps, GST_CAPS_CAST (obj));
776 }
777 gst_mini_object_unref (obj);
778 }
779 }
780
781 if (requeue_caps) {
782 gst_queue_array_push_tail (priv->queue, requeue_caps);
783 }
784
785 priv->queued_bytes = 0;
786 priv->queued_buffers = 0;
787 priv->queued_time = 0;
788 priv->last_in_running_time = GST_CLOCK_TIME_NONE;
789 priv->last_out_running_time = GST_CLOCK_TIME_NONE;
790 priv->need_discont_upstream = FALSE;
791 priv->need_discont_downstream = FALSE;
792 }
793
794 static void
gst_app_src_dispose(GObject * obj)795 gst_app_src_dispose (GObject * obj)
796 {
797 GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
798 GstAppSrcPrivate *priv = appsrc->priv;
799 Callbacks *callbacks = NULL;
800
801 GST_OBJECT_LOCK (appsrc);
802 if (priv->current_caps) {
803 gst_caps_unref (priv->current_caps);
804 priv->current_caps = NULL;
805 }
806 if (priv->last_caps) {
807 gst_caps_unref (priv->last_caps);
808 priv->last_caps = NULL;
809 }
810 GST_OBJECT_UNLOCK (appsrc);
811
812 g_mutex_lock (&priv->mutex);
813 if (priv->callbacks)
814 callbacks = g_steal_pointer (&priv->callbacks);
815 gst_app_src_flush_queued (appsrc, FALSE);
816 g_mutex_unlock (&priv->mutex);
817
818 g_clear_pointer (&callbacks, callbacks_unref);
819
820 G_OBJECT_CLASS (parent_class)->dispose (obj);
821 }
822
823 static void
gst_app_src_finalize(GObject * obj)824 gst_app_src_finalize (GObject * obj)
825 {
826 GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
827 GstAppSrcPrivate *priv = appsrc->priv;
828
829 g_mutex_clear (&priv->mutex);
830 g_cond_clear (&priv->cond);
831 gst_queue_array_free (priv->queue);
832
833 g_free (priv->uri);
834
835 G_OBJECT_CLASS (parent_class)->finalize (obj);
836 }
837
838 static GstCaps *
gst_app_src_internal_get_caps(GstBaseSrc * bsrc,GstCaps * filter)839 gst_app_src_internal_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
840 {
841 GstAppSrc *appsrc = GST_APP_SRC (bsrc);
842 GstCaps *caps;
843
844 GST_OBJECT_LOCK (appsrc);
845 if ((caps = appsrc->priv->current_caps))
846 gst_caps_ref (caps);
847 GST_OBJECT_UNLOCK (appsrc);
848
849 if (filter) {
850 if (caps) {
851 GstCaps *intersection =
852 gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
853 gst_caps_unref (caps);
854 caps = intersection;
855 } else {
856 caps = gst_caps_ref (filter);
857 }
858 }
859
860 GST_DEBUG_OBJECT (appsrc, "caps: %" GST_PTR_FORMAT, caps);
861 return caps;
862 }
863
864 static void
gst_app_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)865 gst_app_src_set_property (GObject * object, guint prop_id,
866 const GValue * value, GParamSpec * pspec)
867 {
868 GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
869 GstAppSrcPrivate *priv = appsrc->priv;
870
871 switch (prop_id) {
872 case PROP_CAPS:
873 gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
874 break;
875 case PROP_SIZE:
876 gst_app_src_set_size (appsrc, g_value_get_int64 (value));
877 break;
878 case PROP_STREAM_TYPE:
879 gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
880 break;
881 case PROP_MAX_BYTES:
882 gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
883 break;
884 case PROP_MAX_BUFFERS:
885 gst_app_src_set_max_buffers (appsrc, g_value_get_uint64 (value));
886 break;
887 case PROP_MAX_TIME:
888 gst_app_src_set_max_time (appsrc, g_value_get_uint64 (value));
889 break;
890 case PROP_FORMAT:
891 priv->format = g_value_get_enum (value);
892 break;
893 case PROP_BLOCK:
894 priv->block = g_value_get_boolean (value);
895 break;
896 case PROP_IS_LIVE:
897 gst_base_src_set_live (GST_BASE_SRC (appsrc),
898 g_value_get_boolean (value));
899 break;
900 case PROP_MIN_LATENCY:
901 gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
902 FALSE, -1);
903 break;
904 case PROP_MAX_LATENCY:
905 gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
906 g_value_get_int64 (value));
907 break;
908 case PROP_EMIT_SIGNALS:
909 gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
910 break;
911 case PROP_MIN_PERCENT:
912 priv->min_percent = g_value_get_uint (value);
913 break;
914 case PROP_DURATION:
915 gst_app_src_set_duration (appsrc, g_value_get_uint64 (value));
916 break;
917 case PROP_HANDLE_SEGMENT_CHANGE:
918 priv->handle_segment_change = g_value_get_boolean (value);
919 break;
920 case PROP_LEAKY_TYPE:
921 priv->leaky_type = g_value_get_enum (value);
922 break;
923 default:
924 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
925 break;
926 }
927 }
928
929 static void
gst_app_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)930 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
931 GParamSpec * pspec)
932 {
933 GstAppSrc *appsrc = GST_APP_SRC_CAST (object);
934 GstAppSrcPrivate *priv = appsrc->priv;
935
936 switch (prop_id) {
937 case PROP_CAPS:
938 g_value_take_boxed (value, gst_app_src_get_caps (appsrc));
939 break;
940 case PROP_SIZE:
941 g_value_set_int64 (value, gst_app_src_get_size (appsrc));
942 break;
943 case PROP_STREAM_TYPE:
944 g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
945 break;
946 case PROP_MAX_BYTES:
947 g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
948 break;
949 case PROP_MAX_BUFFERS:
950 g_value_set_uint64 (value, gst_app_src_get_max_buffers (appsrc));
951 break;
952 case PROP_MAX_TIME:
953 g_value_set_uint64 (value, gst_app_src_get_max_time (appsrc));
954 break;
955 case PROP_FORMAT:
956 g_value_set_enum (value, priv->format);
957 break;
958 case PROP_BLOCK:
959 g_value_set_boolean (value, priv->block);
960 break;
961 case PROP_IS_LIVE:
962 g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
963 break;
964 case PROP_MIN_LATENCY:
965 {
966 guint64 min = 0;
967
968 gst_app_src_get_latency (appsrc, &min, NULL);
969 g_value_set_int64 (value, min);
970 break;
971 }
972 case PROP_MAX_LATENCY:
973 {
974 guint64 max = 0;
975
976 gst_app_src_get_latency (appsrc, NULL, &max);
977 g_value_set_int64 (value, max);
978 break;
979 }
980 case PROP_EMIT_SIGNALS:
981 g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
982 break;
983 case PROP_MIN_PERCENT:
984 g_value_set_uint (value, priv->min_percent);
985 break;
986 case PROP_CURRENT_LEVEL_BYTES:
987 g_value_set_uint64 (value, gst_app_src_get_current_level_bytes (appsrc));
988 break;
989 case PROP_CURRENT_LEVEL_BUFFERS:
990 g_value_set_uint64 (value,
991 gst_app_src_get_current_level_buffers (appsrc));
992 break;
993 case PROP_CURRENT_LEVEL_TIME:
994 g_value_set_uint64 (value, gst_app_src_get_current_level_time (appsrc));
995 break;
996 case PROP_DURATION:
997 g_value_set_uint64 (value, gst_app_src_get_duration (appsrc));
998 break;
999 case PROP_HANDLE_SEGMENT_CHANGE:
1000 g_value_set_boolean (value, priv->handle_segment_change);
1001 break;
1002 case PROP_LEAKY_TYPE:
1003 g_value_set_enum (value, priv->leaky_type);
1004 break;
1005 default:
1006 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1007 break;
1008 }
1009 }
1010
1011 static gboolean
gst_app_src_send_event(GstElement * element,GstEvent * event)1012 gst_app_src_send_event (GstElement * element, GstEvent * event)
1013 {
1014 GstAppSrc *appsrc = GST_APP_SRC_CAST (element);
1015 GstAppSrcPrivate *priv = appsrc->priv;
1016
1017 switch (GST_EVENT_TYPE (event)) {
1018 case GST_EVENT_FLUSH_STOP:
1019 g_mutex_lock (&priv->mutex);
1020 gst_app_src_flush_queued (appsrc, TRUE);
1021 g_mutex_unlock (&priv->mutex);
1022 break;
1023 default:
1024 if (GST_EVENT_IS_SERIALIZED (event)) {
1025 GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
1026 g_mutex_lock (&priv->mutex);
1027 gst_queue_array_push_tail (priv->queue, event);
1028
1029 if ((priv->wait_status & STREAM_WAITING))
1030 g_cond_broadcast (&priv->cond);
1031
1032 g_mutex_unlock (&priv->mutex);
1033 return TRUE;
1034 }
1035 break;
1036 }
1037
1038 return GST_CALL_PARENT_WITH_DEFAULT (GST_ELEMENT_CLASS, send_event, (element,
1039 event), FALSE);
1040 }
1041
1042 static gboolean
gst_app_src_unlock(GstBaseSrc * bsrc)1043 gst_app_src_unlock (GstBaseSrc * bsrc)
1044 {
1045 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1046 GstAppSrcPrivate *priv = appsrc->priv;
1047
1048 g_mutex_lock (&priv->mutex);
1049 GST_DEBUG_OBJECT (appsrc, "unlock start");
1050 priv->flushing = TRUE;
1051 g_cond_broadcast (&priv->cond);
1052 g_mutex_unlock (&priv->mutex);
1053
1054 return TRUE;
1055 }
1056
1057 static gboolean
gst_app_src_unlock_stop(GstBaseSrc * bsrc)1058 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
1059 {
1060 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1061 GstAppSrcPrivate *priv = appsrc->priv;
1062
1063 g_mutex_lock (&priv->mutex);
1064 GST_DEBUG_OBJECT (appsrc, "unlock stop");
1065 priv->flushing = FALSE;
1066 g_cond_broadcast (&priv->cond);
1067 g_mutex_unlock (&priv->mutex);
1068
1069 return TRUE;
1070 }
1071
1072 static gboolean
gst_app_src_start(GstBaseSrc * bsrc)1073 gst_app_src_start (GstBaseSrc * bsrc)
1074 {
1075 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1076 GstAppSrcPrivate *priv = appsrc->priv;
1077
1078 g_mutex_lock (&priv->mutex);
1079 GST_DEBUG_OBJECT (appsrc, "starting");
1080 priv->started = TRUE;
1081 /* set the offset to -1 so that we always do a first seek. This is only used
1082 * in random-access mode. */
1083 priv->offset = -1;
1084 priv->flushing = FALSE;
1085 g_mutex_unlock (&priv->mutex);
1086
1087 gst_base_src_set_format (bsrc, priv->format);
1088 gst_segment_init (&priv->last_segment, priv->format);
1089 gst_segment_init (&priv->current_segment, priv->format);
1090 priv->pending_custom_segment = FALSE;
1091
1092 return TRUE;
1093 }
1094
1095 static gboolean
gst_app_src_stop(GstBaseSrc * bsrc)1096 gst_app_src_stop (GstBaseSrc * bsrc)
1097 {
1098 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1099 GstAppSrcPrivate *priv = appsrc->priv;
1100
1101 g_mutex_lock (&priv->mutex);
1102 GST_DEBUG_OBJECT (appsrc, "stopping");
1103 priv->is_eos = FALSE;
1104 priv->flushing = TRUE;
1105 priv->started = FALSE;
1106 gst_app_src_flush_queued (appsrc, TRUE);
1107 g_cond_broadcast (&priv->cond);
1108 g_mutex_unlock (&priv->mutex);
1109
1110 return TRUE;
1111 }
1112
1113 static gboolean
gst_app_src_is_seekable(GstBaseSrc * src)1114 gst_app_src_is_seekable (GstBaseSrc * src)
1115 {
1116 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1117 GstAppSrcPrivate *priv = appsrc->priv;
1118 gboolean res = FALSE;
1119
1120 switch (priv->stream_type) {
1121 case GST_APP_STREAM_TYPE_STREAM:
1122 break;
1123 case GST_APP_STREAM_TYPE_SEEKABLE:
1124 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1125 res = TRUE;
1126 break;
1127 }
1128 return res;
1129 }
1130
1131 static gboolean
gst_app_src_do_get_size(GstBaseSrc * src,guint64 * size)1132 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
1133 {
1134 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1135
1136 *size = gst_app_src_get_size (appsrc);
1137
1138 return TRUE;
1139 }
1140
1141 static gboolean
gst_app_src_query(GstBaseSrc * src,GstQuery * query)1142 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
1143 {
1144 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1145 GstAppSrcPrivate *priv = appsrc->priv;
1146 gboolean res;
1147
1148 switch (GST_QUERY_TYPE (query)) {
1149 case GST_QUERY_LATENCY:
1150 {
1151 GstClockTime min, max;
1152 gboolean live;
1153
1154 /* Query the parent class for the defaults */
1155 res = gst_base_src_query_latency (src, &live, &min, &max);
1156
1157 /* overwrite with our values when we need to */
1158 g_mutex_lock (&priv->mutex);
1159 if (priv->min_latency != -1) {
1160 min = priv->min_latency;
1161 max = priv->max_latency;
1162 }
1163 g_mutex_unlock (&priv->mutex);
1164
1165 gst_query_set_latency (query, live, min, max);
1166 break;
1167 }
1168 case GST_QUERY_SCHEDULING:
1169 {
1170 gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1171 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1172
1173 switch (priv->stream_type) {
1174 case GST_APP_STREAM_TYPE_STREAM:
1175 case GST_APP_STREAM_TYPE_SEEKABLE:
1176 break;
1177 case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
1178 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1179 break;
1180 }
1181 res = TRUE;
1182 break;
1183 }
1184 case GST_QUERY_DURATION:
1185 {
1186 GstFormat format;
1187 gst_query_parse_duration (query, &format, NULL);
1188 if (format == GST_FORMAT_BYTES) {
1189 gst_query_set_duration (query, format, priv->size);
1190 res = TRUE;
1191 } else if (format == GST_FORMAT_TIME) {
1192 if (priv->duration != GST_CLOCK_TIME_NONE) {
1193 gst_query_set_duration (query, format, priv->duration);
1194 res = TRUE;
1195 } else {
1196 res = FALSE;
1197 }
1198 } else {
1199 res = FALSE;
1200 }
1201 break;
1202 }
1203 default:
1204 res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
1205 break;
1206 }
1207
1208 return res;
1209 }
1210
1211 /* will be called in push mode */
1212 static gboolean
gst_app_src_do_seek(GstBaseSrc * src,GstSegment * segment)1213 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1214 {
1215 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
1216 GstAppSrcPrivate *priv = appsrc->priv;
1217 gint64 desired_position;
1218 gboolean res = FALSE;
1219 gboolean emit;
1220 Callbacks *callbacks = NULL;
1221
1222 desired_position = segment->position;
1223
1224 /* no need to try to seek in streaming mode */
1225 if (priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
1226 return TRUE;
1227
1228 GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
1229 desired_position, gst_format_get_name (segment->format));
1230
1231 g_mutex_lock (&priv->mutex);
1232 emit = priv->emit_signals;
1233 if (priv->callbacks)
1234 callbacks = callbacks_ref (priv->callbacks);
1235 g_mutex_unlock (&priv->mutex);
1236
1237 if (callbacks && callbacks->callbacks.seek_data) {
1238 res =
1239 callbacks->callbacks.seek_data (appsrc, desired_position,
1240 callbacks->user_data);
1241 } else if (emit) {
1242 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1243 desired_position, &res);
1244 }
1245
1246 g_clear_pointer (&callbacks, callbacks_unref);
1247
1248 if (res) {
1249 GST_DEBUG_OBJECT (appsrc, "flushing queue");
1250 g_mutex_lock (&priv->mutex);
1251 gst_app_src_flush_queued (appsrc, TRUE);
1252 gst_segment_copy_into (segment, &priv->last_segment);
1253 gst_segment_copy_into (segment, &priv->current_segment);
1254 priv->pending_custom_segment = FALSE;
1255 g_mutex_unlock (&priv->mutex);
1256 priv->is_eos = FALSE;
1257 } else {
1258 GST_WARNING_OBJECT (appsrc, "seek failed");
1259 }
1260
1261 return res;
1262 }
1263
1264 /* must be called with the appsrc mutex */
1265 static gboolean
gst_app_src_emit_seek(GstAppSrc * appsrc,guint64 offset)1266 gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
1267 {
1268 gboolean res = FALSE;
1269 gboolean emit;
1270 GstAppSrcPrivate *priv = appsrc->priv;
1271 Callbacks *callbacks = NULL;
1272
1273 emit = priv->emit_signals;
1274 if (priv->callbacks)
1275 callbacks = callbacks_ref (priv->callbacks);
1276 g_mutex_unlock (&priv->mutex);
1277
1278 GST_DEBUG_OBJECT (appsrc,
1279 "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
1280 priv->offset, offset);
1281
1282 if (callbacks && callbacks->callbacks.seek_data)
1283 res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
1284 else if (emit)
1285 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
1286 offset, &res);
1287
1288 g_clear_pointer (&callbacks, callbacks_unref);
1289
1290 g_mutex_lock (&priv->mutex);
1291
1292 return res;
1293 }
1294
1295 /* must be called with the appsrc mutex. After this call things can be
1296 * flushing */
1297 static void
gst_app_src_emit_need_data(GstAppSrc * appsrc,guint size)1298 gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
1299 {
1300 gboolean emit;
1301 GstAppSrcPrivate *priv = appsrc->priv;
1302 Callbacks *callbacks = NULL;
1303
1304 emit = priv->emit_signals;
1305 if (priv->callbacks)
1306 callbacks = callbacks_ref (priv->callbacks);
1307 g_mutex_unlock (&priv->mutex);
1308
1309 /* we have no data, we need some. We fire the signal with the size hint. */
1310 if (callbacks && callbacks->callbacks.need_data)
1311 callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
1312 else if (emit)
1313 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
1314 NULL);
1315
1316 g_clear_pointer (&callbacks, callbacks_unref);
1317
1318 g_mutex_lock (&priv->mutex);
1319 /* we can be flushing now because we released the lock */
1320 }
1321
1322 /* must be called with the appsrc mutex */
1323 static gboolean
gst_app_src_do_negotiate(GstBaseSrc * basesrc)1324 gst_app_src_do_negotiate (GstBaseSrc * basesrc)
1325 {
1326 GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1327 GstAppSrcPrivate *priv = appsrc->priv;
1328 gboolean result;
1329 GstCaps *caps;
1330
1331 GST_OBJECT_LOCK (basesrc);
1332 caps = priv->current_caps ? gst_caps_ref (priv->current_caps) : NULL;
1333 GST_OBJECT_UNLOCK (basesrc);
1334
1335 /* Avoid deadlock by unlocking mutex
1336 * otherwise we get deadlock between this and stream lock */
1337 g_mutex_unlock (&priv->mutex);
1338 if (caps) {
1339 result = gst_base_src_set_caps (basesrc, caps);
1340 gst_caps_unref (caps);
1341 } else {
1342 result = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc);
1343 }
1344 g_mutex_lock (&priv->mutex);
1345
1346 return result;
1347 }
1348
1349 static gboolean
gst_app_src_negotiate(GstBaseSrc * basesrc)1350 gst_app_src_negotiate (GstBaseSrc * basesrc)
1351 {
1352 GstAppSrc *appsrc = GST_APP_SRC_CAST (basesrc);
1353 GstAppSrcPrivate *priv = appsrc->priv;
1354 gboolean result;
1355
1356 g_mutex_lock (&priv->mutex);
1357 result = gst_app_src_do_negotiate (basesrc);
1358 g_mutex_unlock (&priv->mutex);
1359 return result;
1360 }
1361
1362 /* Update the currently queued bytes/buffers/time information for the item
1363 * that was just removed from the queue.
1364 *
1365 * If update_offset is set, additionally the offset of the source will be
1366 * moved forward accordingly as if that many bytes were output.
1367 */
1368 static void
gst_app_src_update_queued_pop(GstAppSrc * appsrc,GstMiniObject * item,gboolean update_offset)1369 gst_app_src_update_queued_pop (GstAppSrc * appsrc, GstMiniObject * item,
1370 gboolean update_offset)
1371 {
1372 GstAppSrcPrivate *priv = appsrc->priv;
1373 guint buf_size = 0;
1374 guint n_buffers = 0;
1375 GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1376
1377 if (GST_IS_BUFFER (item)) {
1378 GstBuffer *buf = GST_BUFFER_CAST (item);
1379 buf_size = gst_buffer_get_size (buf);
1380 n_buffers = 1;
1381
1382 end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1383 if (end_buffer_ts != GST_CLOCK_TIME_NONE
1384 && GST_BUFFER_DURATION_IS_VALID (buf))
1385 end_buffer_ts += GST_BUFFER_DURATION (buf);
1386
1387 GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", buf, buf_size);
1388 } else if (GST_IS_BUFFER_LIST (item)) {
1389 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1390 guint i;
1391
1392 n_buffers = gst_buffer_list_length (buffer_list);
1393
1394 for (i = 0; i < n_buffers; i++) {
1395 GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1396 GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1397
1398 buf_size += gst_buffer_get_size (tmp);
1399 /* Update to the last buffer's timestamp that is known */
1400 if (ts != GST_CLOCK_TIME_NONE) {
1401 end_buffer_ts = ts;
1402 if (GST_BUFFER_DURATION_IS_VALID (tmp))
1403 end_buffer_ts += GST_BUFFER_DURATION (tmp);
1404 }
1405 }
1406 }
1407
1408 priv->queued_bytes -= buf_size;
1409 priv->queued_buffers -= n_buffers;
1410
1411 /* Update time level if working on a TIME segment */
1412 if ((priv->current_segment.format == GST_FORMAT_TIME
1413 || (priv->current_segment.format == GST_FORMAT_UNDEFINED
1414 && priv->last_segment.format == GST_FORMAT_TIME))
1415 && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1416 const GstSegment *segment =
1417 priv->current_segment.format ==
1418 GST_FORMAT_TIME ? &priv->current_segment : &priv->last_segment;
1419
1420 /* Clip to the current segment boundaries */
1421 if (segment->stop != -1 && end_buffer_ts > segment->stop)
1422 end_buffer_ts = segment->stop;
1423 else if (segment->start > end_buffer_ts)
1424 end_buffer_ts = segment->start;
1425
1426 priv->last_out_running_time =
1427 gst_segment_to_running_time (segment, GST_FORMAT_TIME, end_buffer_ts);
1428
1429 GST_TRACE_OBJECT (appsrc,
1430 "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1431 GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1432 GST_TIME_ARGS (priv->last_out_running_time));
1433
1434 /* If timestamps on both sides are known, calculate the current
1435 * fill level in time and consider the queue empty if the output
1436 * running time is lower than the input one (i.e. some kind of reset
1437 * has happened).
1438 */
1439 if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1440 && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1441 if (priv->last_out_running_time > priv->last_in_running_time) {
1442 priv->queued_time = 0;
1443 } else {
1444 priv->queued_time =
1445 priv->last_in_running_time - priv->last_out_running_time;
1446 }
1447 }
1448 }
1449
1450 GST_DEBUG_OBJECT (appsrc,
1451 "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1452 " buffers, %" GST_TIME_FORMAT, priv->queued_bytes,
1453 priv->queued_buffers, GST_TIME_ARGS (priv->queued_time));
1454
1455 /* only update the offset when in random_access mode and when requested by
1456 * the caller, i.e. not when just dropping the item */
1457 if (update_offset && priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS)
1458 priv->offset += buf_size;
1459 }
1460
1461 /* Update the currently queued bytes/buffers/time information for the item
1462 * that was just added to the queue.
1463 */
1464 static void
gst_app_src_update_queued_push(GstAppSrc * appsrc,GstMiniObject * item)1465 gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
1466 {
1467 GstAppSrcPrivate *priv = appsrc->priv;
1468 GstClockTime start_buffer_ts = GST_CLOCK_TIME_NONE;
1469 GstClockTime end_buffer_ts = GST_CLOCK_TIME_NONE;
1470 guint buf_size = 0;
1471 guint n_buffers = 0;
1472
1473 if (GST_IS_BUFFER (item)) {
1474 GstBuffer *buf = GST_BUFFER_CAST (item);
1475
1476 buf_size = gst_buffer_get_size (buf);
1477 n_buffers = 1;
1478
1479 start_buffer_ts = end_buffer_ts = GST_BUFFER_DTS_OR_PTS (buf);
1480 if (end_buffer_ts != GST_CLOCK_TIME_NONE
1481 && GST_BUFFER_DURATION_IS_VALID (buf))
1482 end_buffer_ts += GST_BUFFER_DURATION (buf);
1483 } else if (GST_IS_BUFFER_LIST (item)) {
1484 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
1485 guint i;
1486
1487 n_buffers = gst_buffer_list_length (buffer_list);
1488
1489 for (i = 0; i < n_buffers; i++) {
1490 GstBuffer *tmp = gst_buffer_list_get (buffer_list, i);
1491 GstClockTime ts = GST_BUFFER_DTS_OR_PTS (tmp);
1492
1493 buf_size += gst_buffer_get_size (tmp);
1494
1495 if (ts != GST_CLOCK_TIME_NONE) {
1496 if (start_buffer_ts == GST_CLOCK_TIME_NONE)
1497 start_buffer_ts = ts;
1498 end_buffer_ts = ts;
1499 if (GST_BUFFER_DURATION_IS_VALID (tmp))
1500 end_buffer_ts += GST_BUFFER_DURATION (tmp);
1501 }
1502 }
1503 }
1504
1505 priv->queued_bytes += buf_size;
1506 priv->queued_buffers += n_buffers;
1507
1508 /* Update time level if working on a TIME segment */
1509 if (priv->last_segment.format == GST_FORMAT_TIME
1510 && end_buffer_ts != GST_CLOCK_TIME_NONE) {
1511 /* Clip to the last segment boundaries */
1512 if (priv->last_segment.stop != -1
1513 && end_buffer_ts > priv->last_segment.stop)
1514 end_buffer_ts = priv->last_segment.stop;
1515 else if (priv->last_segment.start > end_buffer_ts)
1516 end_buffer_ts = priv->last_segment.start;
1517
1518 priv->last_in_running_time =
1519 gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1520 end_buffer_ts);
1521
1522 /* If this is the only buffer then we can directly update the queued time
1523 * here. This is especially useful if this was the first buffer because
1524 * otherwise we would have to wait until it is actually unqueued to know
1525 * the queued duration */
1526 if (priv->queued_buffers == 1) {
1527 if (priv->last_segment.stop != -1
1528 && start_buffer_ts > priv->last_segment.stop)
1529 start_buffer_ts = priv->last_segment.stop;
1530 else if (priv->last_segment.start > start_buffer_ts)
1531 start_buffer_ts = priv->last_segment.start;
1532
1533 priv->last_out_running_time =
1534 gst_segment_to_running_time (&priv->last_segment, GST_FORMAT_TIME,
1535 start_buffer_ts);
1536 }
1537
1538 GST_TRACE_OBJECT (appsrc,
1539 "Last in running time %" GST_TIME_FORMAT ", last out running time %"
1540 GST_TIME_FORMAT, GST_TIME_ARGS (priv->last_in_running_time),
1541 GST_TIME_ARGS (priv->last_out_running_time));
1542
1543 if (priv->last_out_running_time != GST_CLOCK_TIME_NONE
1544 && priv->last_in_running_time != GST_CLOCK_TIME_NONE) {
1545 if (priv->last_out_running_time > priv->last_in_running_time) {
1546 priv->queued_time = 0;
1547 } else {
1548 priv->queued_time =
1549 priv->last_in_running_time - priv->last_out_running_time;
1550 }
1551 }
1552 }
1553
1554 GST_DEBUG_OBJECT (appsrc,
1555 "Currently queued: %" G_GUINT64_FORMAT " bytes, %" G_GUINT64_FORMAT
1556 " buffers, %" GST_TIME_FORMAT, priv->queued_bytes, priv->queued_buffers,
1557 GST_TIME_ARGS (priv->queued_time));
1558 }
1559
1560 static GstFlowReturn
gst_app_src_create(GstBaseSrc * bsrc,guint64 offset,guint size,GstBuffer ** buf)1561 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
1562 GstBuffer ** buf)
1563 {
1564 GstAppSrc *appsrc = GST_APP_SRC_CAST (bsrc);
1565 GstAppSrcPrivate *priv = appsrc->priv;
1566 GstFlowReturn ret;
1567
1568 GST_OBJECT_LOCK (appsrc);
1569 if (G_UNLIKELY (priv->size != bsrc->segment.duration &&
1570 bsrc->segment.format == GST_FORMAT_BYTES)) {
1571 GST_DEBUG_OBJECT (appsrc,
1572 "Size changed from %" G_GINT64_FORMAT " to %" G_GINT64_FORMAT,
1573 bsrc->segment.duration, priv->size);
1574 bsrc->segment.duration = priv->size;
1575 GST_OBJECT_UNLOCK (appsrc);
1576
1577 gst_element_post_message (GST_ELEMENT (appsrc),
1578 gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1579 } else if (G_UNLIKELY (priv->duration != bsrc->segment.duration &&
1580 bsrc->segment.format == GST_FORMAT_TIME)) {
1581 GST_DEBUG_OBJECT (appsrc,
1582 "Duration changed from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
1583 GST_TIME_ARGS (bsrc->segment.duration), GST_TIME_ARGS (priv->duration));
1584 bsrc->segment.duration = priv->duration;
1585 GST_OBJECT_UNLOCK (appsrc);
1586
1587 gst_element_post_message (GST_ELEMENT (appsrc),
1588 gst_message_new_duration_changed (GST_OBJECT (appsrc)));
1589 } else {
1590 GST_OBJECT_UNLOCK (appsrc);
1591 }
1592
1593 g_mutex_lock (&priv->mutex);
1594 /* check flushing first */
1595 if (G_UNLIKELY (priv->flushing))
1596 goto flushing;
1597
1598 if (priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
1599 /* if we are dealing with a random-access stream, issue a seek if the offset
1600 * changed. */
1601 if (G_UNLIKELY (priv->offset != offset)) {
1602 gboolean res;
1603
1604 /* do the seek */
1605 res = gst_app_src_emit_seek (appsrc, offset);
1606
1607 if (G_UNLIKELY (!res))
1608 /* failing to seek is fatal */
1609 goto seek_error;
1610
1611 priv->offset = offset;
1612 priv->is_eos = FALSE;
1613 }
1614 }
1615
1616 while (TRUE) {
1617 /* Our lock may have been release to push events or caps, check out
1618 * state in case we are now flushing. */
1619 if (G_UNLIKELY (priv->flushing))
1620 goto flushing;
1621
1622 /* return data as long as we have some */
1623 if (!gst_queue_array_is_empty (priv->queue)) {
1624 GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
1625
1626 if (GST_IS_CAPS (obj)) {
1627 GstCaps *next_caps = GST_CAPS (obj);
1628 gboolean caps_changed = TRUE;
1629
1630 if (next_caps && priv->current_caps)
1631 caps_changed = !gst_caps_is_equal (next_caps, priv->current_caps);
1632 else
1633 caps_changed = (next_caps != priv->current_caps);
1634
1635 gst_caps_replace (&priv->current_caps, next_caps);
1636
1637 if (next_caps) {
1638 gst_caps_unref (next_caps);
1639 }
1640
1641 if (caps_changed)
1642 gst_app_src_do_negotiate (bsrc);
1643
1644 /* Continue checks caps and queue */
1645 continue;
1646 }
1647
1648 if (GST_IS_BUFFER (obj)) {
1649 GstBuffer *buffer = GST_BUFFER (obj);
1650
1651 /* Mark the buffer as DISCONT if we previously dropped a buffer
1652 * instead of outputting it */
1653 if (priv->need_discont_downstream) {
1654 buffer = gst_buffer_make_writable (buffer);
1655 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1656 priv->need_discont_downstream = FALSE;
1657 }
1658
1659 *buf = buffer;
1660 } else if (GST_IS_BUFFER_LIST (obj)) {
1661 GstBufferList *buffer_list;
1662
1663 buffer_list = GST_BUFFER_LIST (obj);
1664
1665 /* Mark the first buffer of the buffer list as DISCONT if we
1666 * previously dropped a buffer instead of outputting it */
1667 if (priv->need_discont_downstream) {
1668 GstBuffer *buffer;
1669
1670 buffer_list = gst_buffer_list_make_writable (buffer_list);
1671 buffer = gst_buffer_list_get_writable (buffer_list, 0);
1672 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1673 priv->need_discont_downstream = FALSE;
1674 }
1675
1676 gst_base_src_submit_buffer_list (bsrc, buffer_list);
1677 *buf = NULL;
1678 } else if (GST_IS_EVENT (obj)) {
1679 GstEvent *event = GST_EVENT (obj);
1680
1681 GST_DEBUG_OBJECT (appsrc, "pop event %" GST_PTR_FORMAT, event);
1682
1683 if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1684 const GstSegment *segment = NULL;
1685
1686 gst_event_parse_segment (event, &segment);
1687 g_assert (segment != NULL);
1688
1689 if (!gst_segment_is_equal (&priv->current_segment, segment)) {
1690 GST_DEBUG_OBJECT (appsrc,
1691 "Update new segment %" GST_PTR_FORMAT, event);
1692 if (!gst_base_src_new_segment (bsrc, segment)) {
1693 GST_ERROR_OBJECT (appsrc,
1694 "Couldn't set new segment %" GST_PTR_FORMAT, event);
1695 gst_event_unref (event);
1696 goto invalid_segment;
1697 }
1698 gst_segment_copy_into (segment, &priv->current_segment);
1699 }
1700
1701 gst_event_unref (event);
1702 } else {
1703 GstEvent *seg_event;
1704 GstSegment last_segment = priv->last_segment;
1705
1706 /* event is serialized with the buffers flow */
1707
1708 /* We are about to push an event, release out lock */
1709 g_mutex_unlock (&priv->mutex);
1710
1711 seg_event =
1712 gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
1713 GST_EVENT_SEGMENT, 0);
1714 if (!seg_event) {
1715 seg_event = gst_event_new_segment (&last_segment);
1716
1717 GST_DEBUG_OBJECT (appsrc,
1718 "received serialized event before first buffer, push default segment %"
1719 GST_PTR_FORMAT, seg_event);
1720
1721 gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
1722 } else {
1723 gst_event_unref (seg_event);
1724 }
1725
1726 gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
1727
1728 g_mutex_lock (&priv->mutex);
1729 }
1730 continue;
1731 } else {
1732 g_assert_not_reached ();
1733 }
1734
1735 gst_app_src_update_queued_pop (appsrc, obj, TRUE);
1736
1737 /* signal that we removed an item */
1738 if ((priv->wait_status & APP_WAITING))
1739 g_cond_broadcast (&priv->cond);
1740
1741 /* see if we go lower than the min-percent */
1742 if (priv->min_percent) {
1743 if ((priv->max_bytes
1744 && priv->queued_bytes * 100 / priv->max_bytes <=
1745 priv->min_percent) || (priv->max_buffers
1746 && priv->queued_buffers * 100 / priv->max_buffers <=
1747 priv->min_percent) || (priv->max_time
1748 && priv->queued_time * 100 / priv->max_time <=
1749 priv->min_percent)) {
1750 /* ignore flushing state, we got a buffer and we will return it now.
1751 * Errors will be handled in the next round */
1752 gst_app_src_emit_need_data (appsrc, size);
1753 }
1754 }
1755 ret = GST_FLOW_OK;
1756 break;
1757 } else {
1758 gst_app_src_emit_need_data (appsrc, size);
1759
1760 /* we can be flushing now because we released the lock above */
1761 if (G_UNLIKELY (priv->flushing))
1762 goto flushing;
1763
1764 /* if we have a buffer now, continue the loop and try to return it. In
1765 * random-access mode (where a buffer is normally pushed in the above
1766 * signal) we can still be empty because the pushed buffer got flushed or
1767 * when the application pushes the requested buffer later, we support both
1768 * possibilities. */
1769 if (!gst_queue_array_is_empty (priv->queue))
1770 continue;
1771
1772 /* no buffer yet, maybe we are EOS, if not, block for more data. */
1773 }
1774
1775 /* check EOS */
1776 if (G_UNLIKELY (priv->is_eos))
1777 goto eos;
1778
1779 /* nothing to return, wait a while for new data or flushing. */
1780 priv->wait_status |= STREAM_WAITING;
1781 g_cond_wait (&priv->cond, &priv->mutex);
1782 priv->wait_status &= ~STREAM_WAITING;
1783 }
1784 g_mutex_unlock (&priv->mutex);
1785 return ret;
1786
1787 /* ERRORS */
1788 flushing:
1789 {
1790 GST_DEBUG_OBJECT (appsrc, "we are flushing");
1791 g_mutex_unlock (&priv->mutex);
1792 return GST_FLOW_FLUSHING;
1793 }
1794 eos:
1795 {
1796 GST_DEBUG_OBJECT (appsrc, "we are EOS");
1797 g_mutex_unlock (&priv->mutex);
1798 return GST_FLOW_EOS;
1799 }
1800 seek_error:
1801 {
1802 g_mutex_unlock (&priv->mutex);
1803 GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
1804 GST_ERROR_SYSTEM);
1805 return GST_FLOW_ERROR;
1806 }
1807
1808 invalid_segment:
1809 {
1810 g_mutex_unlock (&priv->mutex);
1811 GST_ELEMENT_ERROR (appsrc, LIBRARY, SETTINGS,
1812 (NULL), ("Failed to configure the provided input segment."));
1813 return GST_FLOW_ERROR;
1814 }
1815 }
1816
1817 /* external API */
1818
1819 /**
1820 * gst_app_src_set_caps:
1821 * @appsrc: a #GstAppSrc
1822 * @caps: (nullable): caps to set
1823 *
1824 * Set the capabilities on the appsrc element. This function takes
1825 * a copy of the caps structure. After calling this method, the source will
1826 * only produce caps that match @caps. @caps must be fixed and the caps on the
1827 * buffers must match the caps or left NULL.
1828 */
1829 void
gst_app_src_set_caps(GstAppSrc * appsrc,const GstCaps * caps)1830 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
1831 {
1832 GstAppSrcPrivate *priv;
1833 gboolean caps_changed;
1834
1835 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1836
1837 priv = appsrc->priv;
1838
1839 g_mutex_lock (&priv->mutex);
1840
1841 GST_OBJECT_LOCK (appsrc);
1842 if (caps && priv->last_caps)
1843 caps_changed = !gst_caps_is_equal (caps, priv->last_caps);
1844 else
1845 caps_changed = (caps != priv->last_caps);
1846
1847 if (caps_changed) {
1848 GstCaps *new_caps;
1849 gpointer t;
1850
1851 new_caps = caps ? gst_caps_copy (caps) : NULL;
1852 GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
1853
1854 while ((t = gst_queue_array_peek_tail (priv->queue)) && GST_IS_CAPS (t)) {
1855 gst_caps_unref (gst_queue_array_pop_tail (priv->queue));
1856 }
1857 gst_queue_array_push_tail (priv->queue, new_caps);
1858 gst_caps_replace (&priv->last_caps, new_caps);
1859
1860 if ((priv->wait_status & STREAM_WAITING))
1861 g_cond_broadcast (&priv->cond);
1862 }
1863
1864 GST_OBJECT_UNLOCK (appsrc);
1865
1866 g_mutex_unlock (&priv->mutex);
1867 }
1868
1869 /**
1870 * gst_app_src_get_caps:
1871 * @appsrc: a #GstAppSrc
1872 *
1873 * Get the configured caps on @appsrc.
1874 *
1875 * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
1876 */
1877 GstCaps *
gst_app_src_get_caps(GstAppSrc * appsrc)1878 gst_app_src_get_caps (GstAppSrc * appsrc)
1879 {
1880
1881 GstCaps *caps;
1882
1883 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
1884
1885 GST_OBJECT_LOCK (appsrc);
1886 if ((caps = appsrc->priv->last_caps))
1887 gst_caps_ref (caps);
1888 GST_OBJECT_UNLOCK (appsrc);
1889
1890 return caps;
1891
1892 }
1893
1894 /**
1895 * gst_app_src_set_size:
1896 * @appsrc: a #GstAppSrc
1897 * @size: the size to set
1898 *
1899 * Set the size of the stream in bytes. A value of -1 means that the size is
1900 * not known.
1901 */
1902 void
gst_app_src_set_size(GstAppSrc * appsrc,gint64 size)1903 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
1904 {
1905 GstAppSrcPrivate *priv;
1906
1907 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1908
1909 priv = appsrc->priv;
1910
1911 GST_OBJECT_LOCK (appsrc);
1912 GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
1913 priv->size = size;
1914 GST_OBJECT_UNLOCK (appsrc);
1915 }
1916
1917 /**
1918 * gst_app_src_get_size:
1919 * @appsrc: a #GstAppSrc
1920 *
1921 * Get the size of the stream in bytes. A value of -1 means that the size is
1922 * not known.
1923 *
1924 * Returns: the size of the stream previously set with gst_app_src_set_size();
1925 */
1926 gint64
gst_app_src_get_size(GstAppSrc * appsrc)1927 gst_app_src_get_size (GstAppSrc * appsrc)
1928 {
1929 gint64 size;
1930 GstAppSrcPrivate *priv;
1931
1932 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
1933
1934 priv = appsrc->priv;
1935
1936 GST_OBJECT_LOCK (appsrc);
1937 size = priv->size;
1938 GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
1939 GST_OBJECT_UNLOCK (appsrc);
1940
1941 return size;
1942 }
1943
1944 /**
1945 * gst_app_src_set_duration:
1946 * @appsrc: a #GstAppSrc
1947 * @duration: the duration to set
1948 *
1949 * Set the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1950 * not known.
1951 *
1952 * Since: 1.10
1953 */
1954 void
gst_app_src_set_duration(GstAppSrc * appsrc,GstClockTime duration)1955 gst_app_src_set_duration (GstAppSrc * appsrc, GstClockTime duration)
1956 {
1957 GstAppSrcPrivate *priv;
1958
1959 g_return_if_fail (GST_IS_APP_SRC (appsrc));
1960
1961 priv = appsrc->priv;
1962
1963 GST_OBJECT_LOCK (appsrc);
1964 GST_DEBUG_OBJECT (appsrc, "setting duration of %" GST_TIME_FORMAT,
1965 GST_TIME_ARGS (duration));
1966 priv->duration = duration;
1967 GST_OBJECT_UNLOCK (appsrc);
1968 }
1969
1970 /**
1971 * gst_app_src_get_duration:
1972 * @appsrc: a #GstAppSrc
1973 *
1974 * Get the duration of the stream in nanoseconds. A value of GST_CLOCK_TIME_NONE means that the duration is
1975 * not known.
1976 *
1977 * Returns: the duration of the stream previously set with gst_app_src_set_duration();
1978 *
1979 * Since: 1.10
1980 */
1981 GstClockTime
gst_app_src_get_duration(GstAppSrc * appsrc)1982 gst_app_src_get_duration (GstAppSrc * appsrc)
1983 {
1984 GstClockTime duration;
1985 GstAppSrcPrivate *priv;
1986
1987 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
1988
1989 priv = appsrc->priv;
1990
1991 GST_OBJECT_LOCK (appsrc);
1992 duration = priv->duration;
1993 GST_DEBUG_OBJECT (appsrc, "getting duration of %" GST_TIME_FORMAT,
1994 GST_TIME_ARGS (duration));
1995 GST_OBJECT_UNLOCK (appsrc);
1996
1997 return duration;
1998 }
1999
2000 /**
2001 * gst_app_src_set_stream_type:
2002 * @appsrc: a #GstAppSrc
2003 * @type: the new state
2004 *
2005 * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
2006 * be connected to.
2007 *
2008 * A stream_type stream
2009 */
2010 void
gst_app_src_set_stream_type(GstAppSrc * appsrc,GstAppStreamType type)2011 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
2012 {
2013 GstAppSrcPrivate *priv;
2014
2015 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2016
2017 priv = appsrc->priv;
2018
2019 GST_OBJECT_LOCK (appsrc);
2020 GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
2021 priv->stream_type = type;
2022 GST_OBJECT_UNLOCK (appsrc);
2023 }
2024
2025 /**
2026 * gst_app_src_get_stream_type:
2027 * @appsrc: a #GstAppSrc
2028 *
2029 * Get the stream type. Control the stream type of @appsrc
2030 * with gst_app_src_set_stream_type().
2031 *
2032 * Returns: the stream type.
2033 */
2034 GstAppStreamType
gst_app_src_get_stream_type(GstAppSrc * appsrc)2035 gst_app_src_get_stream_type (GstAppSrc * appsrc)
2036 {
2037 gboolean stream_type;
2038 GstAppSrcPrivate *priv;
2039
2040 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2041
2042 priv = appsrc->priv;
2043
2044 GST_OBJECT_LOCK (appsrc);
2045 stream_type = priv->stream_type;
2046 GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
2047 GST_OBJECT_UNLOCK (appsrc);
2048
2049 return stream_type;
2050 }
2051
2052 /**
2053 * gst_app_src_set_max_bytes:
2054 * @appsrc: a #GstAppSrc
2055 * @max: the maximum number of bytes to queue
2056 *
2057 * Set the maximum amount of bytes that can be queued in @appsrc.
2058 * After the maximum amount of bytes are queued, @appsrc will emit the
2059 * "enough-data" signal.
2060 */
2061 void
gst_app_src_set_max_bytes(GstAppSrc * appsrc,guint64 max)2062 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
2063 {
2064 GstAppSrcPrivate *priv;
2065
2066 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2067
2068 priv = appsrc->priv;
2069
2070 g_mutex_lock (&priv->mutex);
2071 if (max != priv->max_bytes) {
2072 GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
2073 priv->max_bytes = max;
2074 /* signal the change */
2075 g_cond_broadcast (&priv->cond);
2076 }
2077 g_mutex_unlock (&priv->mutex);
2078 }
2079
2080 /**
2081 * gst_app_src_get_max_bytes:
2082 * @appsrc: a #GstAppSrc
2083 *
2084 * Get the maximum amount of bytes that can be queued in @appsrc.
2085 *
2086 * Returns: The maximum amount of bytes that can be queued.
2087 */
2088 guint64
gst_app_src_get_max_bytes(GstAppSrc * appsrc)2089 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
2090 {
2091 guint64 result;
2092 GstAppSrcPrivate *priv;
2093
2094 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2095
2096 priv = appsrc->priv;
2097
2098 g_mutex_lock (&priv->mutex);
2099 result = priv->max_bytes;
2100 GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
2101 g_mutex_unlock (&priv->mutex);
2102
2103 return result;
2104 }
2105
2106 /**
2107 * gst_app_src_get_current_level_bytes:
2108 * @appsrc: a #GstAppSrc
2109 *
2110 * Get the number of currently queued bytes inside @appsrc.
2111 *
2112 * Returns: The number of currently queued bytes.
2113 *
2114 * Since: 1.2
2115 */
2116 guint64
gst_app_src_get_current_level_bytes(GstAppSrc * appsrc)2117 gst_app_src_get_current_level_bytes (GstAppSrc * appsrc)
2118 {
2119 guint64 queued;
2120 GstAppSrcPrivate *priv;
2121
2122 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2123
2124 priv = appsrc->priv;
2125
2126 GST_OBJECT_LOCK (appsrc);
2127 queued = priv->queued_bytes;
2128 GST_DEBUG_OBJECT (appsrc, "current level bytes is %" G_GUINT64_FORMAT,
2129 queued);
2130 GST_OBJECT_UNLOCK (appsrc);
2131
2132 return queued;
2133 }
2134
2135 /**
2136 * gst_app_src_set_max_buffers:
2137 * @appsrc: a #GstAppSrc
2138 * @max: the maximum number of buffers to queue
2139 *
2140 * Set the maximum amount of buffers that can be queued in @appsrc.
2141 * After the maximum amount of buffers are queued, @appsrc will emit the
2142 * "enough-data" signal.
2143 *
2144 * Since: 1.20
2145 */
2146 void
gst_app_src_set_max_buffers(GstAppSrc * appsrc,guint64 max)2147 gst_app_src_set_max_buffers (GstAppSrc * appsrc, guint64 max)
2148 {
2149 GstAppSrcPrivate *priv;
2150
2151 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2152
2153 priv = appsrc->priv;
2154
2155 g_mutex_lock (&priv->mutex);
2156 if (max != priv->max_buffers) {
2157 GST_DEBUG_OBJECT (appsrc, "setting max-buffers to %" G_GUINT64_FORMAT, max);
2158 priv->max_buffers = max;
2159 /* signal the change */
2160 g_cond_broadcast (&priv->cond);
2161 }
2162 g_mutex_unlock (&priv->mutex);
2163 }
2164
2165 /**
2166 * gst_app_src_get_max_buffers:
2167 * @appsrc: a #GstAppSrc
2168 *
2169 * Get the maximum amount of buffers that can be queued in @appsrc.
2170 *
2171 * Returns: The maximum amount of buffers that can be queued.
2172 *
2173 * Since: 1.20
2174 */
2175 guint64
gst_app_src_get_max_buffers(GstAppSrc * appsrc)2176 gst_app_src_get_max_buffers (GstAppSrc * appsrc)
2177 {
2178 guint64 result;
2179 GstAppSrcPrivate *priv;
2180
2181 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2182
2183 priv = appsrc->priv;
2184
2185 g_mutex_lock (&priv->mutex);
2186 result = priv->max_buffers;
2187 GST_DEBUG_OBJECT (appsrc, "getting max-buffers of %" G_GUINT64_FORMAT,
2188 result);
2189 g_mutex_unlock (&priv->mutex);
2190
2191 return result;
2192 }
2193
2194 /**
2195 * gst_app_src_get_current_level_buffers:
2196 * @appsrc: a #GstAppSrc
2197 *
2198 * Get the number of currently queued buffers inside @appsrc.
2199 *
2200 * Returns: The number of currently queued buffers.
2201 *
2202 * Since: 1.20
2203 */
2204 guint64
gst_app_src_get_current_level_buffers(GstAppSrc * appsrc)2205 gst_app_src_get_current_level_buffers (GstAppSrc * appsrc)
2206 {
2207 guint64 queued;
2208 GstAppSrcPrivate *priv;
2209
2210 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
2211
2212 priv = appsrc->priv;
2213
2214 GST_OBJECT_LOCK (appsrc);
2215 queued = priv->queued_buffers;
2216 GST_DEBUG_OBJECT (appsrc, "current level buffers is %" G_GUINT64_FORMAT,
2217 queued);
2218 GST_OBJECT_UNLOCK (appsrc);
2219
2220 return queued;
2221 }
2222
2223 /**
2224 * gst_app_src_set_max_time:
2225 * @appsrc: a #GstAppSrc
2226 * @max: the maximum amonut of time to queue
2227 *
2228 * Set the maximum amount of time that can be queued in @appsrc.
2229 * After the maximum amount of time are queued, @appsrc will emit the
2230 * "enough-data" signal.
2231 *
2232 * Since: 1.20
2233 */
2234 void
gst_app_src_set_max_time(GstAppSrc * appsrc,GstClockTime max)2235 gst_app_src_set_max_time (GstAppSrc * appsrc, GstClockTime max)
2236 {
2237 GstAppSrcPrivate *priv;
2238
2239 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2240
2241 priv = appsrc->priv;
2242
2243 g_mutex_lock (&priv->mutex);
2244 if (max != priv->max_time) {
2245 GST_DEBUG_OBJECT (appsrc, "setting max-time to %" GST_TIME_FORMAT,
2246 GST_TIME_ARGS (max));
2247 priv->max_time = max;
2248 /* signal the change */
2249 g_cond_broadcast (&priv->cond);
2250 }
2251 g_mutex_unlock (&priv->mutex);
2252 }
2253
2254 /**
2255 * gst_app_src_get_max_time:
2256 * @appsrc: a #GstAppSrc
2257 *
2258 * Get the maximum amount of time that can be queued in @appsrc.
2259 *
2260 * Returns: The maximum amount of time that can be queued.
2261 *
2262 * Since: 1.20
2263 */
2264 GstClockTime
gst_app_src_get_max_time(GstAppSrc * appsrc)2265 gst_app_src_get_max_time (GstAppSrc * appsrc)
2266 {
2267 GstClockTime result;
2268 GstAppSrcPrivate *priv;
2269
2270 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
2271
2272 priv = appsrc->priv;
2273
2274 g_mutex_lock (&priv->mutex);
2275 result = priv->max_time;
2276 GST_DEBUG_OBJECT (appsrc, "getting max-time of %" GST_TIME_FORMAT,
2277 GST_TIME_ARGS (result));
2278 g_mutex_unlock (&priv->mutex);
2279
2280 return result;
2281 }
2282
2283 /**
2284 * gst_app_src_get_current_level_time:
2285 * @appsrc: a #GstAppSrc
2286 *
2287 * Get the amount of currently queued time inside @appsrc.
2288 *
2289 * Returns: The amount of currently queued time.
2290 *
2291 * Since: 1.20
2292 */
2293 GstClockTime
gst_app_src_get_current_level_time(GstAppSrc * appsrc)2294 gst_app_src_get_current_level_time (GstAppSrc * appsrc)
2295 {
2296 gint64 queued;
2297 GstAppSrcPrivate *priv;
2298
2299 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_CLOCK_TIME_NONE);
2300
2301 priv = appsrc->priv;
2302
2303 GST_OBJECT_LOCK (appsrc);
2304 queued = priv->queued_time;
2305 GST_DEBUG_OBJECT (appsrc, "current level time is %" GST_TIME_FORMAT,
2306 GST_TIME_ARGS (queued));
2307 GST_OBJECT_UNLOCK (appsrc);
2308
2309 return queued;
2310 }
2311
2312 static void
gst_app_src_set_latencies(GstAppSrc * appsrc,gboolean do_min,guint64 min,gboolean do_max,guint64 max)2313 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
2314 gboolean do_max, guint64 max)
2315 {
2316 GstAppSrcPrivate *priv = appsrc->priv;
2317 gboolean changed = FALSE;
2318
2319 g_mutex_lock (&priv->mutex);
2320 if (do_min && priv->min_latency != min) {
2321 priv->min_latency = min;
2322 changed = TRUE;
2323 }
2324 if (do_max && priv->max_latency != max) {
2325 priv->max_latency = max;
2326 changed = TRUE;
2327 }
2328 g_mutex_unlock (&priv->mutex);
2329
2330 if (changed) {
2331 GST_DEBUG_OBJECT (appsrc, "posting latency changed");
2332 gst_element_post_message (GST_ELEMENT_CAST (appsrc),
2333 gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
2334 }
2335 }
2336
2337 /**
2338 * gst_app_src_set_leaky_type:
2339 * @appsrc: a #GstAppSrc
2340 * @leaky: the #GstAppLeakyType
2341 *
2342 * When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsrc
2343 * will drop any buffers that are pushed into it once its internal queue is
2344 * full. The selected type defines whether to drop the oldest or new
2345 * buffers.
2346 *
2347 * Since: 1.20
2348 */
2349 void
gst_app_src_set_leaky_type(GstAppSrc * appsrc,GstAppLeakyType leaky)2350 gst_app_src_set_leaky_type (GstAppSrc * appsrc, GstAppLeakyType leaky)
2351 {
2352 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2353
2354 appsrc->priv->leaky_type = leaky;
2355 }
2356
2357 /**
2358 * gst_app_src_get_leaky_type:
2359 * @appsrc: a #GstAppSrc
2360 *
2361 * Returns the currently set #GstAppLeakyType. See gst_app_src_set_leaky_type()
2362 * for more details.
2363 *
2364 * Returns: The currently set #GstAppLeakyType.
2365 *
2366 * Since: 1.20
2367 */
2368 GstAppLeakyType
gst_app_src_get_leaky_type(GstAppSrc * appsrc)2369 gst_app_src_get_leaky_type (GstAppSrc * appsrc)
2370 {
2371 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_APP_LEAKY_TYPE_NONE);
2372
2373 return appsrc->priv->leaky_type;
2374 }
2375
2376 /**
2377 * gst_app_src_set_latency:
2378 * @appsrc: a #GstAppSrc
2379 * @min: the min latency
2380 * @max: the max latency
2381 *
2382 * Configure the @min and @max latency in @src. If @min is set to -1, the
2383 * default latency calculations for pseudo-live sources will be used.
2384 */
2385 void
gst_app_src_set_latency(GstAppSrc * appsrc,guint64 min,guint64 max)2386 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
2387 {
2388 gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
2389 }
2390
2391 /**
2392 * gst_app_src_get_latency:
2393 * @appsrc: a #GstAppSrc
2394 * @min: (out): the min latency
2395 * @max: (out): the max latency
2396 *
2397 * Retrieve the min and max latencies in @min and @max respectively.
2398 */
2399 void
gst_app_src_get_latency(GstAppSrc * appsrc,guint64 * min,guint64 * max)2400 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
2401 {
2402 GstAppSrcPrivate *priv;
2403
2404 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2405
2406 priv = appsrc->priv;
2407
2408 g_mutex_lock (&priv->mutex);
2409 if (min)
2410 *min = priv->min_latency;
2411 if (max)
2412 *max = priv->max_latency;
2413 g_mutex_unlock (&priv->mutex);
2414 }
2415
2416 /**
2417 * gst_app_src_set_emit_signals:
2418 * @appsrc: a #GstAppSrc
2419 * @emit: the new state
2420 *
2421 * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
2422 * by default disabled because signal emission is expensive and unneeded when
2423 * the application prefers to operate in pull mode.
2424 */
2425 void
gst_app_src_set_emit_signals(GstAppSrc * appsrc,gboolean emit)2426 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
2427 {
2428 GstAppSrcPrivate *priv;
2429
2430 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2431
2432 priv = appsrc->priv;
2433
2434 g_mutex_lock (&priv->mutex);
2435 priv->emit_signals = emit;
2436 g_mutex_unlock (&priv->mutex);
2437 }
2438
2439 /**
2440 * gst_app_src_get_emit_signals:
2441 * @appsrc: a #GstAppSrc
2442 *
2443 * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
2444 *
2445 * Returns: %TRUE if @appsrc is emitting the "new-preroll" and "new-buffer"
2446 * signals.
2447 */
2448 gboolean
gst_app_src_get_emit_signals(GstAppSrc * appsrc)2449 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
2450 {
2451 gboolean result;
2452 GstAppSrcPrivate *priv;
2453
2454 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
2455
2456 priv = appsrc->priv;
2457
2458 g_mutex_lock (&priv->mutex);
2459 result = priv->emit_signals;
2460 g_mutex_unlock (&priv->mutex);
2461
2462 return result;
2463 }
2464
2465 static GstFlowReturn
gst_app_src_push_internal(GstAppSrc * appsrc,GstBuffer * buffer,GstBufferList * buflist,gboolean steal_ref)2466 gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
2467 GstBufferList * buflist, gboolean steal_ref)
2468 {
2469 gboolean first = TRUE;
2470 GstAppSrcPrivate *priv;
2471
2472 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2473
2474 priv = appsrc->priv;
2475
2476 if (buffer != NULL)
2477 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2478 else
2479 g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
2480
2481 if (buflist != NULL) {
2482 if (gst_buffer_list_length (buflist) == 0)
2483 return GST_FLOW_OK;
2484
2485 buffer = gst_buffer_list_get (buflist, 0);
2486 }
2487
2488 if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
2489 GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
2490 gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
2491 GstClock *clock;
2492
2493 clock = gst_element_get_clock (GST_ELEMENT_CAST (appsrc));
2494 if (clock) {
2495 GstClockTime now;
2496 GstClockTime base_time =
2497 gst_element_get_base_time (GST_ELEMENT_CAST (appsrc));
2498
2499 now = gst_clock_get_time (clock);
2500 if (now > base_time)
2501 now -= base_time;
2502 else
2503 now = 0;
2504 gst_object_unref (clock);
2505
2506 if (buflist == NULL) {
2507 if (!steal_ref) {
2508 buffer = gst_buffer_copy (buffer);
2509 steal_ref = TRUE;
2510 } else {
2511 buffer = gst_buffer_make_writable (buffer);
2512 }
2513 } else {
2514 if (!steal_ref) {
2515 buflist = gst_buffer_list_copy (buflist);
2516 steal_ref = TRUE;
2517 } else {
2518 buflist = gst_buffer_list_make_writable (buflist);
2519 }
2520 buffer = gst_buffer_list_get_writable (buflist, 0);
2521 }
2522
2523 GST_BUFFER_PTS (buffer) = now;
2524 GST_BUFFER_DTS (buffer) = now;
2525 } else {
2526 GST_WARNING_OBJECT (appsrc,
2527 "do-timestamp=TRUE but buffers are provided before "
2528 "reaching the PLAYING state and having a clock. Timestamps will "
2529 "not be accurate!");
2530 }
2531 }
2532
2533 g_mutex_lock (&priv->mutex);
2534
2535 while (TRUE) {
2536 /* can't accept buffers when we are flushing or EOS */
2537 if (priv->flushing)
2538 goto flushing;
2539
2540 if (priv->is_eos)
2541 goto eos;
2542
2543 if ((priv->max_bytes && priv->queued_bytes >= priv->max_bytes) ||
2544 (priv->max_buffers && priv->queued_buffers >= priv->max_buffers) ||
2545 (priv->max_time && priv->queued_time >= priv->max_time)) {
2546 GST_DEBUG_OBJECT (appsrc,
2547 "queue filled (queued %" G_GUINT64_FORMAT " bytes, max %"
2548 G_GUINT64_FORMAT " bytes, " "queued %" G_GUINT64_FORMAT
2549 " buffers, max %" G_GUINT64_FORMAT " buffers, " "queued %"
2550 GST_TIME_FORMAT " time, max %" GST_TIME_FORMAT " time)",
2551 priv->queued_bytes, priv->max_bytes, priv->queued_buffers,
2552 priv->max_buffers, GST_TIME_ARGS (priv->queued_time),
2553 GST_TIME_ARGS (priv->max_time));
2554
2555 if (first) {
2556 Callbacks *callbacks = NULL;
2557 gboolean emit;
2558
2559 emit = priv->emit_signals;
2560 if (priv->callbacks)
2561 callbacks = callbacks_ref (priv->callbacks);
2562 /* only signal on the first push */
2563 g_mutex_unlock (&priv->mutex);
2564
2565 if (callbacks && callbacks->callbacks.enough_data)
2566 callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
2567 else if (emit)
2568 g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
2569 NULL);
2570
2571 g_clear_pointer (&callbacks, callbacks_unref);
2572
2573 g_mutex_lock (&priv->mutex);
2574 }
2575
2576 if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
2577 priv->need_discont_upstream = TRUE;
2578 goto dropped;
2579 } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_DOWNSTREAM) {
2580 guint i, length = gst_queue_array_get_length (priv->queue);
2581 GstMiniObject *item = NULL;
2582
2583 /* Find the oldest buffer or buffer list and drop it, then update the
2584 * limits. Dropping one is sufficient to go below the limits again.
2585 */
2586 for (i = 0; i < length; i++) {
2587 item = gst_queue_array_peek_nth (priv->queue, i);
2588 if (GST_IS_BUFFER (item) || GST_IS_BUFFER_LIST (item)) {
2589 gst_queue_array_drop_element (priv->queue, i);
2590 break;
2591 }
2592 /* To not accidentally have an event after the loop */
2593 item = NULL;
2594 }
2595
2596 if (!item) {
2597 GST_FIXME_OBJECT (appsrc,
2598 "No buffer or buffer list queued but queue is full");
2599 /* This shouldn't really happen but in this case we can't really do
2600 * anything apart from accepting the buffer / bufferlist */
2601 break;
2602 }
2603
2604 GST_WARNING_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
2605
2606 gst_app_src_update_queued_pop (appsrc, item, FALSE);
2607 gst_mini_object_unref (item);
2608
2609 priv->need_discont_downstream = TRUE;
2610 continue;
2611 }
2612
2613 if (first) {
2614 /* continue to check for flushing/eos after releasing the lock */
2615 first = FALSE;
2616 continue;
2617 }
2618 if (priv->block) {
2619 GST_DEBUG_OBJECT (appsrc, "waiting for free space");
2620 /* we are filled, wait until a buffer gets popped or when we
2621 * flush. */
2622 priv->wait_status |= APP_WAITING;
2623 g_cond_wait (&priv->cond, &priv->mutex);
2624 priv->wait_status &= ~APP_WAITING;
2625 } else {
2626 /* no need to wait for free space, we just pump more data into the
2627 * queue hoping that the caller reacts to the enough-data signal and
2628 * stops pushing buffers. */
2629 break;
2630 }
2631 } else {
2632 break;
2633 }
2634 }
2635
2636 if (priv->pending_custom_segment) {
2637 GstEvent *event = gst_event_new_segment (&priv->last_segment);
2638
2639 GST_DEBUG_OBJECT (appsrc, "enqueue new segment %" GST_PTR_FORMAT, event);
2640 gst_queue_array_push_tail (priv->queue, event);
2641 priv->pending_custom_segment = FALSE;
2642 }
2643
2644 if (buflist != NULL) {
2645 /* Mark the first buffer of the buffer list as DISCONT if we previously
2646 * dropped a buffer instead of queueing it */
2647 if (priv->need_discont_upstream) {
2648 if (!steal_ref) {
2649 buflist = gst_buffer_list_copy (buflist);
2650 steal_ref = TRUE;
2651 } else {
2652 buflist = gst_buffer_list_make_writable (buflist);
2653 }
2654 buffer = gst_buffer_list_get_writable (buflist, 0);
2655 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2656 priv->need_discont_upstream = FALSE;
2657 }
2658
2659 GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
2660
2661 if (!steal_ref)
2662 gst_buffer_list_ref (buflist);
2663 gst_queue_array_push_tail (priv->queue, buflist);
2664 } else {
2665 /* Mark the buffer as DISCONT if we previously dropped a buffer instead of
2666 * queueing it */
2667 if (priv->need_discont_upstream) {
2668 if (!steal_ref) {
2669 buffer = gst_buffer_copy (buffer);
2670 steal_ref = TRUE;
2671 } else {
2672 buffer = gst_buffer_make_writable (buffer);
2673 }
2674 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2675 priv->need_discont_upstream = FALSE;
2676 }
2677
2678 GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
2679 if (!steal_ref)
2680 gst_buffer_ref (buffer);
2681 gst_queue_array_push_tail (priv->queue, buffer);
2682 }
2683
2684 gst_app_src_update_queued_push (appsrc,
2685 buflist ? GST_MINI_OBJECT_CAST (buflist) : GST_MINI_OBJECT_CAST (buffer));
2686
2687 if ((priv->wait_status & STREAM_WAITING))
2688 g_cond_broadcast (&priv->cond);
2689
2690 g_mutex_unlock (&priv->mutex);
2691
2692 return GST_FLOW_OK;
2693
2694 /* ERRORS */
2695 flushing:
2696 {
2697 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
2698 if (steal_ref) {
2699 if (buflist)
2700 gst_buffer_list_unref (buflist);
2701 else
2702 gst_buffer_unref (buffer);
2703 }
2704 g_mutex_unlock (&priv->mutex);
2705 return GST_FLOW_FLUSHING;
2706 }
2707 eos:
2708 {
2709 GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
2710 if (steal_ref) {
2711 if (buflist)
2712 gst_buffer_list_unref (buflist);
2713 else
2714 gst_buffer_unref (buffer);
2715 }
2716 g_mutex_unlock (&priv->mutex);
2717 return GST_FLOW_EOS;
2718 }
2719 dropped:
2720 {
2721 GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
2722 if (steal_ref) {
2723 if (buflist)
2724 gst_buffer_list_unref (buflist);
2725 else
2726 gst_buffer_unref (buffer);
2727 }
2728 g_mutex_unlock (&priv->mutex);
2729 return GST_FLOW_EOS;
2730 }
2731 }
2732
2733 static GstFlowReturn
gst_app_src_push_buffer_full(GstAppSrc * appsrc,GstBuffer * buffer,gboolean steal_ref)2734 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
2735 gboolean steal_ref)
2736 {
2737 return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
2738 }
2739
2740 static GstFlowReturn
gst_app_src_push_sample_internal(GstAppSrc * appsrc,GstSample * sample)2741 gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
2742 {
2743 GstAppSrcPrivate *priv = appsrc->priv;
2744 GstBufferList *buffer_list;
2745 GstBuffer *buffer;
2746 GstCaps *caps;
2747
2748 g_return_val_if_fail (GST_IS_SAMPLE (sample), GST_FLOW_ERROR);
2749
2750 caps = gst_sample_get_caps (sample);
2751 if (caps != NULL) {
2752 gst_app_src_set_caps (appsrc, caps);
2753 } else {
2754 GST_WARNING_OBJECT (appsrc, "received sample without caps");
2755 }
2756
2757 if (priv->handle_segment_change && priv->format == GST_FORMAT_TIME) {
2758 GstSegment *segment = gst_sample_get_segment (sample);
2759
2760 if (segment->format != GST_FORMAT_TIME) {
2761 GST_LOG_OBJECT (appsrc, "format %s is not supported",
2762 gst_format_get_name (segment->format));
2763 goto handle_buffer;
2764 }
2765
2766 g_mutex_lock (&priv->mutex);
2767 if (gst_segment_is_equal (&priv->last_segment, segment)) {
2768 GST_LOG_OBJECT (appsrc, "segment wasn't changed");
2769 g_mutex_unlock (&priv->mutex);
2770 goto handle_buffer;
2771 } else {
2772 GST_LOG_OBJECT (appsrc,
2773 "segment changed %" GST_SEGMENT_FORMAT " -> %" GST_SEGMENT_FORMAT,
2774 &priv->last_segment, segment);
2775 }
2776
2777 /* will be pushed to queue with next buffer/buffer-list */
2778 gst_segment_copy_into (segment, &priv->last_segment);
2779 priv->pending_custom_segment = TRUE;
2780 g_mutex_unlock (&priv->mutex);
2781 }
2782
2783 handle_buffer:
2784
2785 buffer = gst_sample_get_buffer (sample);
2786 if (buffer != NULL)
2787 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2788
2789 buffer_list = gst_sample_get_buffer_list (sample);
2790 if (buffer_list != NULL)
2791 return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2792
2793 GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
2794 return GST_FLOW_OK;
2795 }
2796
2797 /**
2798 * gst_app_src_push_buffer:
2799 * @appsrc: a #GstAppSrc
2800 * @buffer: (transfer full): a #GstBuffer to push
2801 *
2802 * Adds a buffer to the queue of buffers that the appsrc element will
2803 * push to its source pad. This function takes ownership of the buffer.
2804 *
2805 * When the block property is TRUE, this function can block until free
2806 * space becomes available in the queue.
2807 *
2808 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2809 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2810 * #GST_FLOW_EOS when EOS occurred.
2811 */
2812 GstFlowReturn
gst_app_src_push_buffer(GstAppSrc * appsrc,GstBuffer * buffer)2813 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
2814 {
2815 return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
2816 }
2817
2818 /**
2819 * gst_app_src_push_buffer_list:
2820 * @appsrc: a #GstAppSrc
2821 * @buffer_list: (transfer full): a #GstBufferList to push
2822 *
2823 * Adds a buffer list to the queue of buffers and buffer lists that the
2824 * appsrc element will push to its source pad. This function takes ownership
2825 * of @buffer_list.
2826 *
2827 * When the block property is TRUE, this function can block until free
2828 * space becomes available in the queue.
2829 *
2830 * Returns: #GST_FLOW_OK when the buffer list was successfully queued.
2831 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2832 * #GST_FLOW_EOS when EOS occurred.
2833 *
2834 * Since: 1.14
2835 */
2836 GstFlowReturn
gst_app_src_push_buffer_list(GstAppSrc * appsrc,GstBufferList * buffer_list)2837 gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
2838 {
2839 return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
2840 }
2841
2842 /**
2843 * gst_app_src_push_sample:
2844 * @appsrc: a #GstAppSrc
2845 * @sample: (transfer none): a #GstSample from which buffer and caps may be
2846 * extracted
2847 *
2848 * Extract a buffer from the provided sample and adds it to the queue of
2849 * buffers that the appsrc element will push to its source pad. Any
2850 * previous caps that were set on appsrc will be replaced by the caps
2851 * associated with the sample if not equal.
2852 *
2853 * This function does not take ownership of the
2854 * sample so the sample needs to be unreffed after calling this function.
2855 *
2856 * When the block property is TRUE, this function can block until free
2857 * space becomes available in the queue.
2858 *
2859 * Returns: #GST_FLOW_OK when the buffer was successfully queued.
2860 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2861 * #GST_FLOW_EOS when EOS occurred.
2862 *
2863 * Since: 1.6
2864 *
2865 */
2866 GstFlowReturn
gst_app_src_push_sample(GstAppSrc * appsrc,GstSample * sample)2867 gst_app_src_push_sample (GstAppSrc * appsrc, GstSample * sample)
2868 {
2869 return gst_app_src_push_sample_internal (appsrc, sample);
2870 }
2871
2872 /* push a buffer without stealing the ref of the buffer. This is used for the
2873 * action signal. */
2874 static GstFlowReturn
gst_app_src_push_buffer_action(GstAppSrc * appsrc,GstBuffer * buffer)2875 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
2876 {
2877 return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
2878 }
2879
2880 /* push a buffer list without stealing the ref of the buffer list. This is
2881 * used for the action signal. */
2882 static GstFlowReturn
gst_app_src_push_buffer_list_action(GstAppSrc * appsrc,GstBufferList * buffer_list)2883 gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
2884 GstBufferList * buffer_list)
2885 {
2886 return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
2887 }
2888
2889 /* push a sample without stealing the ref. This is used for the
2890 * action signal. */
2891 static GstFlowReturn
gst_app_src_push_sample_action(GstAppSrc * appsrc,GstSample * sample)2892 gst_app_src_push_sample_action (GstAppSrc * appsrc, GstSample * sample)
2893 {
2894 return gst_app_src_push_sample_internal (appsrc, sample);
2895 }
2896
2897 /**
2898 * gst_app_src_end_of_stream:
2899 * @appsrc: a #GstAppSrc
2900 *
2901 * Indicates to the appsrc element that the last buffer queued in the
2902 * element is the last buffer of the stream.
2903 *
2904 * Returns: #GST_FLOW_OK when the EOS was successfully queued.
2905 * #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
2906 */
2907 GstFlowReturn
gst_app_src_end_of_stream(GstAppSrc * appsrc)2908 gst_app_src_end_of_stream (GstAppSrc * appsrc)
2909 {
2910 GstAppSrcPrivate *priv;
2911
2912 g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
2913
2914 priv = appsrc->priv;
2915
2916 g_mutex_lock (&priv->mutex);
2917 /* can't accept buffers when we are flushing. We can accept them when we are
2918 * EOS although it will not do anything. */
2919 if (priv->flushing)
2920 goto flushing;
2921
2922 GST_DEBUG_OBJECT (appsrc, "sending EOS");
2923 priv->is_eos = TRUE;
2924 g_cond_broadcast (&priv->cond);
2925 g_mutex_unlock (&priv->mutex);
2926
2927 return GST_FLOW_OK;
2928
2929 /* ERRORS */
2930 flushing:
2931 {
2932 g_mutex_unlock (&priv->mutex);
2933 GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
2934 return GST_FLOW_FLUSHING;
2935 }
2936 }
2937
2938 /**
2939 * gst_app_src_set_callbacks: (skip)
2940 * @appsrc: a #GstAppSrc
2941 * @callbacks: the callbacks
2942 * @user_data: a user_data argument for the callbacks
2943 * @notify: a destroy notify function
2944 *
2945 * Set callbacks which will be executed when data is needed, enough data has
2946 * been collected or when a seek should be performed.
2947 * This is an alternative to using the signals, it has lower overhead and is thus
2948 * less expensive, but also less flexible.
2949 *
2950 * If callbacks are installed, no signals will be emitted for performance
2951 * reasons.
2952 *
2953 * Before 1.16.3 it was not possible to change the callbacks in a thread-safe
2954 * way.
2955 */
2956 void
gst_app_src_set_callbacks(GstAppSrc * appsrc,GstAppSrcCallbacks * callbacks,gpointer user_data,GDestroyNotify notify)2957 gst_app_src_set_callbacks (GstAppSrc * appsrc,
2958 GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
2959 {
2960 Callbacks *old_callbacks, *new_callbacks = NULL;
2961 GstAppSrcPrivate *priv;
2962
2963 g_return_if_fail (GST_IS_APP_SRC (appsrc));
2964 g_return_if_fail (callbacks != NULL);
2965
2966 priv = appsrc->priv;
2967
2968 if (callbacks) {
2969 new_callbacks = g_new0 (Callbacks, 1);
2970 new_callbacks->callbacks = *callbacks;
2971 new_callbacks->user_data = user_data;
2972 new_callbacks->destroy_notify = notify;
2973 new_callbacks->ref_count = 1;
2974 }
2975
2976 g_mutex_lock (&priv->mutex);
2977 old_callbacks = g_steal_pointer (&priv->callbacks);
2978 priv->callbacks = g_steal_pointer (&new_callbacks);
2979 g_mutex_unlock (&priv->mutex);
2980
2981 g_clear_pointer (&old_callbacks, callbacks_unref);
2982 }
2983
2984 /*** GSTURIHANDLER INTERFACE *************************************************/
2985
2986 static GstURIType
gst_app_src_uri_get_type(GType type)2987 gst_app_src_uri_get_type (GType type)
2988 {
2989 return GST_URI_SRC;
2990 }
2991
2992 static const gchar *const *
gst_app_src_uri_get_protocols(GType type)2993 gst_app_src_uri_get_protocols (GType type)
2994 {
2995 static const gchar *protocols[] = { "appsrc", NULL };
2996
2997 return protocols;
2998 }
2999
3000 static gchar *
gst_app_src_uri_get_uri(GstURIHandler * handler)3001 gst_app_src_uri_get_uri (GstURIHandler * handler)
3002 {
3003 GstAppSrc *appsrc = GST_APP_SRC (handler);
3004
3005 return appsrc->priv->uri ? g_strdup (appsrc->priv->uri) : NULL;
3006 }
3007
3008 static gboolean
gst_app_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)3009 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
3010 GError ** error)
3011 {
3012 GstAppSrc *appsrc = GST_APP_SRC (handler);
3013
3014 g_free (appsrc->priv->uri);
3015 appsrc->priv->uri = uri ? g_strdup (uri) : NULL;
3016
3017 return TRUE;
3018 }
3019
3020 static void
gst_app_src_uri_handler_init(gpointer g_iface,gpointer iface_data)3021 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
3022 {
3023 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
3024
3025 iface->get_type = gst_app_src_uri_get_type;
3026 iface->get_protocols = gst_app_src_uri_get_protocols;
3027 iface->get_uri = gst_app_src_uri_get_uri;
3028 iface->set_uri = gst_app_src_uri_set_uri;
3029 }
3030
3031 static gboolean
gst_app_src_event(GstBaseSrc * src,GstEvent * event)3032 gst_app_src_event (GstBaseSrc * src, GstEvent * event)
3033 {
3034 GstAppSrc *appsrc = GST_APP_SRC_CAST (src);
3035 GstAppSrcPrivate *priv = appsrc->priv;
3036
3037 switch (GST_EVENT_TYPE (event)) {
3038 case GST_EVENT_FLUSH_STOP:
3039 g_mutex_lock (&priv->mutex);
3040 priv->is_eos = FALSE;
3041 g_mutex_unlock (&priv->mutex);
3042 break;
3043 default:
3044 break;
3045 }
3046
3047 return GST_BASE_SRC_CLASS (parent_class)->event (src, event);
3048 }
3049