1 /* GStreamer
2 *
3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 /**
23 * SECTION:gstadaptivedemux
24 * @short_description: Base class for adaptive demuxers
25 *
26 * What is an adaptive demuxer?
27 * Adaptive demuxers are special demuxers in the sense that they don't
28 * actually demux data received from upstream but download the data
29 * themselves.
30 *
31 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32 * a set of fragments. The manifest describes the available media and
33 * the sequence of fragments to use. Each fragment contains a small
34 * part of the media (typically only a few seconds). It is possible for
35 * the manifest to have the same media available in different configurations
36 * (bitrates for example) so that the client can select the one that
37 * best suits its scenario (network fluctuation, hardware requirements...).
38 * It is possible to switch from one representation of the media to another
39 * during playback. That's why it is called 'adaptive', because it can be
40 * adapted to the client's needs.
41 *
42 * Architectural overview:
43 * The manifest is received by the demuxer in its sink pad and, upon receiving
44 * EOS, it parses the manifest and exposes the streams available in it. For
45 * each stream a source element will be created and will download the list
46 * of fragments one by one. Once a fragment is finished downloading, the next
47 * URI is set to the source element and it starts fetching it and pushing
48 * through the stream's pad. This implies that each stream is independent from
49 * each other as it runs on a separate thread.
50 *
51 * After downloading each fragment, the download rate of it is calculated and
52 * the demuxer has a chance to switch to a different bitrate if needed. The
53 * switch can be done by simply pushing a new caps before the next fragment
54 * when codecs are the same, or by exposing a new pad group if it needs
55 * a codec change.
56 *
57 * Extra features:
58 * - Not linked streams: Streams that are not-linked have their download threads
59 * interrupted to save network bandwidth. When they are
60 * relinked a reconfigure event is received and the
61 * stream is restarted.
62 *
63 * Subclasses:
64 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65 * about the intrinsics of the subclass formats, so the subclasses are
66 * responsible for maintaining the manifest data structures and stream
67 * information.
68 */
69
70 /*
71 MT safety.
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
74 writes to it, then all the accesses needs to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
77
78 Adaptive demux API can be called from several threads. More, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
85
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
92 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
103
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
115 */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
124
125 GST_DEBUG_CATEGORY (adaptivedemux_debug);
126 #define GST_CAT_DEFAULT adaptivedemux_debug
127
128 #define MAX_DOWNLOAD_ERROR_COUNT 3
129 #define DEFAULT_FAILED_COUNT 3
130 #define DEFAULT_CONNECTION_SPEED 0
131 #define DEFAULT_BITRATE_LIMIT 0.8f
132 #define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024 /* For safety. Large enough to hold a segment. */
133 #define NUM_LOOKBACK_FRAGMENTS 3
134
135 #ifdef OHOS_EXT_FUNC
136 // ohos.ext.func.0013
137 #define DEFAULT_TIMEOUT 15
138 #endif
139
140 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
141 #define GST_MANIFEST_LOCK(d) G_STMT_START { \
142 GST_TRACE("Locking from thread %p", g_thread_self()); \
143 g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
144 GST_TRACE("Locked from thread %p", g_thread_self()); \
145 } G_STMT_END
146
147 #define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
148 GST_TRACE("Unlocking from thread %p", g_thread_self()); \
149 g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
150 } G_STMT_END
151
152 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
153 #define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
154 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
155
156 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
157 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
158 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
159
160 enum
161 {
162 PROP_0,
163 PROP_CONNECTION_SPEED,
164 PROP_BITRATE_LIMIT,
165 #ifdef OHOS_EXT_FUNC
166 // ohos.ext.func.0013
167 PROP_STATE_CHANGE,
168 PROP_TIMEOUT,
169 PROP_EXIT_BLOCK,
170 #endif
171 PROP_LAST
172 };
173
174 #ifdef OHOS_EXT_FUNC
175 // ohos.ext.func.0028
176 enum {
177 SIGNAL_BITRATE_PARSE_COMPLETE,
178 LAST_SIGNALS
179 };
180
181 static guint g_gst_adaptive_demux_signals[LAST_SIGNALS] = {0};
182 #endif
183
184 /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
185 #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
186
187 struct _GstAdaptiveDemuxPrivate
188 {
189 GstAdapter *input_adapter; /* protected by manifest_lock */
190 gint have_manifest; /* MT safe */
191
192 GList *old_streams; /* protected by manifest_lock */
193
194 GstTask *updates_task; /* MT safe */
195 GRecMutex updates_lock;
196 GMutex updates_timed_lock;
197 GCond updates_timed_cond; /* protected by updates_timed_lock */
198 gboolean stop_updates_task; /* protected by updates_timed_lock */
199
200 /* used only from updates_task, no need to protect it */
201 gint update_failed_count;
202
203 guint32 segment_seqnum; /* protected by manifest_lock */
204
205 /* main lock used to protect adaptive demux and all its streams.
206 * It serializes the adaptive demux public API.
207 */
208 GRecMutex manifest_lock;
209
210 /* condition to wait for manifest updates on a live stream.
211 * In order to signal the manifest_cond, the caller needs to hold both
212 * manifest_lock and manifest_update_lock (taken in this order)
213 */
214 GCond manifest_cond;
215 GMutex manifest_update_lock;
216
217 /* Lock and condition for prerolling streams before exposing */
218 GMutex preroll_lock;
219 GCond preroll_cond;
220 gint preroll_pending;
221
222 GMutex api_lock;
223
224 /* Protects demux and stream segment information
225 * Needed because seeks can update segment information
226 * without needing to stop tasks when they just want to
227 * update the segment boundaries */
228 GMutex segment_lock;
229
230 GstClockTime qos_earliest_time;
231 };
232
233 typedef struct _GstAdaptiveDemuxTimer
234 {
235 gint ref_count;
236 GCond *cond;
237 GMutex *mutex;
238 GstClockID clock_id;
239 gboolean fired;
240 } GstAdaptiveDemuxTimer;
241
242 static GstBinClass *parent_class = NULL;
243 static gint private_offset = 0;
244
245 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
246 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
247 GstAdaptiveDemuxClass * klass);
248 static void gst_adaptive_demux_finalize (GObject * object);
249 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
250 element, GstStateChange transition);
251
252 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
253
254 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
255 GstEvent * event);
256 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
257 GstObject * parent, GstBuffer * buffer);
258 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
259 GstQuery * query);
260 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
261 GstEvent * event);
262
263 static gboolean
264 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
265
266 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
267 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
268 stream);
269 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
270 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
271 gboolean first_and_live);
272 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
273 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
274 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
275 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
276 GstClockTime ts, GstClockTime * final_ts);
277 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
278 demux, GstAdaptiveDemuxStream * stream);
279 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
280 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
281 static GstFlowReturn
282 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
283 GstAdaptiveDemuxStream * stream);
284 static gint64
285 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
286 GstAdaptiveDemuxStream * stream);
287 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
288 demux);
289 static GstFlowReturn
290 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
291 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
292 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
293
294 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
295 static GstFlowReturn
296 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
297 GstEvent * event);
298
299 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
300 demux);
301 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
302 demux);
303
304 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
305 gboolean start_preroll_streams);
306 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
307 gboolean stop_updates);
308 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
309 demux);
310 static void
311 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
312 stream, GstFlowReturn ret, GError * err);
313 static GstFlowReturn
314 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
315 GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
316 static GstFlowReturn
317 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
318 GstAdaptiveDemuxStream * stream);
319 static GstFlowReturn
320 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
321 GstAdaptiveDemuxStream * stream, GstClockTime duration);
322 static gboolean
323 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
324 GstClockTime end_time);
325 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
326 GstClockTime time, GstClockID id, gpointer user_data);
327 static gboolean
328 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
329 * demux);
330
331 #ifdef OHOS_EXT_FUNC
332 // ohos.ext.func.0013
333 static void set_property_to_element (GstObject *elem, guint property_id, const void *property_value);
334 static void set_property_to_src_element (const GList *stream_list, guint property_id, const void *value);
335 static void set_property_to_src_and_download (GstAdaptiveDemux *demux, guint property_id, const void *property_value);
336 #endif
337
338 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
339 * method to get to the padtemplates */
340 GType
gst_adaptive_demux_get_type(void)341 gst_adaptive_demux_get_type (void)
342 {
343 static gsize type = 0;
344
345 if (g_once_init_enter (&type)) {
346 GType _type;
347 static const GTypeInfo info = {
348 sizeof (GstAdaptiveDemuxClass),
349 NULL,
350 NULL,
351 (GClassInitFunc) gst_adaptive_demux_class_init,
352 NULL,
353 NULL,
354 sizeof (GstAdaptiveDemux),
355 0,
356 (GInstanceInitFunc) gst_adaptive_demux_init,
357 };
358
359 _type = g_type_register_static (GST_TYPE_BIN,
360 "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
361
362 private_offset =
363 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
364
365 g_once_init_leave (&type, _type);
366 }
367 return type;
368 }
369
370 static inline GstAdaptiveDemuxPrivate *
gst_adaptive_demux_get_instance_private(GstAdaptiveDemux * self)371 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
372 {
373 return (G_STRUCT_MEMBER_P (self, private_offset));
374 }
375
376 #ifdef OHOS_OPT_COMPAT
377 // ohos.ext.func.0029
378 static void
gst_adaptive_demux_set_exit_block(GstAdaptiveDemux * demux,const GValue * value)379 gst_adaptive_demux_set_exit_block (GstAdaptiveDemux *demux, const GValue *value)
380 {
381 gint exit_block = g_value_get_int (value);
382 GST_INFO ("adaptive exit_block :%d", exit_block);
383 if (demux->priv == NULL) {
384 GST_WARNING_OBJECT (demux, "adaptive exit_block :%d, demux->priv is null", exit_block);
385 return;
386 }
387 if (exit_block != 1) {
388 return;
389 }
390 GList *iter = NULL;
391 /* stop update task */
392 if (demux->priv->updates_task != NULL) {
393 gst_task_stop (demux->priv->updates_task);
394 demux->priv->updates_task = NULL;
395 }
396 g_mutex_lock (&demux->priv->updates_timed_lock);
397 demux->priv->stop_updates_task = TRUE;
398 g_cond_signal (&demux->priv->updates_timed_cond);
399 g_mutex_unlock (&demux->priv->updates_timed_lock);
400 if (demux->downloader != NULL) {
401 gst_uri_downloader_cancel (demux->downloader);
402 }
403 /* stop download tasks */
404 for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
405 GstAdaptiveDemuxStream *stream = iter->data;
406 if (stream == NULL) {
407 GST_WARNING_OBJECT (demux, "stream is null");
408 continue;
409 }
410
411 if (stream->download_task != NULL) {
412 gst_task_stop (stream->download_task);
413 }
414 g_mutex_lock (&stream->fragment_download_lock);
415 stream->cancelled = TRUE;
416 g_cond_signal (&stream->fragment_download_cond);
417 g_mutex_unlock (&stream->fragment_download_lock);
418 }
419 /* set exit_block to src element */
420 set_property_to_src_and_download (demux, PROP_EXIT_BLOCK, (void *) &exit_block);
421 }
422 #endif
423
424 static void
gst_adaptive_demux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)425 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
426 const GValue * value, GParamSpec * pspec)
427 {
428 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
429
430 #ifdef OHOS_OPT_COMPAT
431 // ohos.opt.compat.0029
432 if (prop_id != PROP_EXIT_BLOCK) {
433 #endif
434 GST_API_LOCK (demux);
435 GST_MANIFEST_LOCK (demux);
436 #ifdef OHOS_OPT_COMPAT
437 // ohos.opt.compat.0029
438 }
439 #endif
440
441 switch (prop_id) {
442 case PROP_CONNECTION_SPEED:
443 #ifdef OHOS_EXT_FUNC
444 // ohos.ext.func.0028
445 demux->connection_speed = g_value_get_uint (value);
446 #else
447 demux->connection_speed = g_value_get_uint (value) * 1000;
448 #endif
449 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
450 demux->connection_speed);
451 break;
452 case PROP_BITRATE_LIMIT:
453 demux->bitrate_limit = g_value_get_float (value);
454 break;
455 #ifdef OHOS_EXT_FUNC
456 // ohos.ext.func.0013
457 case PROP_TIMEOUT: {
458 guint timeout = g_value_get_uint (value);
459 set_property_to_src_and_download(demux, prop_id, (void *)&timeout);
460 break;
461 }
462 case PROP_STATE_CHANGE: {
463 gint state = g_value_get_int (value);
464 set_property_to_src_and_download(demux, prop_id, (void *)&state);
465 break;
466 }
467 case PROP_EXIT_BLOCK: {
468 // ohos.ext.func.0029
469 gst_adaptive_demux_set_exit_block(demux, value);
470 break;
471 }
472 #endif
473 default:
474 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
475 break;
476 }
477
478 #ifdef OHOS_OPT_COMPAT
479 // ohos.opt.compat.0029
480 if (prop_id != PROP_EXIT_BLOCK) {
481 #endif
482 GST_MANIFEST_UNLOCK (demux);
483 GST_API_UNLOCK (demux);
484 #ifdef OHOS_OPT_COMPAT
485 // ohos.opt.compat.0029
486 }
487 #endif
488 }
489
490 static void
gst_adaptive_demux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)491 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
492 GValue * value, GParamSpec * pspec)
493 {
494 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
495
496 GST_MANIFEST_LOCK (demux);
497
498 switch (prop_id) {
499 case PROP_CONNECTION_SPEED:
500 g_value_set_uint (value, demux->connection_speed / 1000);
501 break;
502 case PROP_BITRATE_LIMIT:
503 g_value_set_float (value, demux->bitrate_limit);
504 break;
505 default:
506 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
507 break;
508 }
509
510 GST_MANIFEST_UNLOCK (demux);
511 }
512
513 static void
gst_adaptive_demux_class_init(GstAdaptiveDemuxClass * klass)514 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
515 {
516 GObjectClass *gobject_class;
517 GstElementClass *gstelement_class;
518 GstBinClass *gstbin_class;
519
520 gobject_class = G_OBJECT_CLASS (klass);
521 gstelement_class = GST_ELEMENT_CLASS (klass);
522 gstbin_class = GST_BIN_CLASS (klass);
523
524 GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
525 "Base Adaptive Demux");
526
527 parent_class = g_type_class_peek_parent (klass);
528
529 if (private_offset != 0)
530 g_type_class_adjust_private_offset (klass, &private_offset);
531
532 gobject_class->set_property = gst_adaptive_demux_set_property;
533 gobject_class->get_property = gst_adaptive_demux_get_property;
534 gobject_class->finalize = gst_adaptive_demux_finalize;
535
536 #ifdef OHOS_EXT_FUNC
537 // ohos.ext.func.0028
538 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
539 g_param_spec_uint ("connection-speed", "Connection Speed",
540 "Network connection speed in kbps (0 = calculate from downloaded"
541 " fragments)", 0, G_MAXUINT, DEFAULT_CONNECTION_SPEED,
542 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
543 #else
544 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
545 g_param_spec_uint ("connection-speed", "Connection Speed",
546 "Network connection speed in kbps (0 = calculate from downloaded"
547 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
548 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549 #endif
550
551 /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
552 g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
553 g_param_spec_float ("bitrate-limit",
554 "Bitrate limit in %",
555 "Limit of the available bitrate to use when switching to alternates.",
556 0, 1, DEFAULT_BITRATE_LIMIT,
557 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
558
559 #ifdef OHOS_EXT_FUNC
560 // ohos.ext.func.0013
561 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
562 g_param_spec_uint ("timeout", "timeout",
563 "Value in seconds to timeout a blocking I/O (0 = No timeout).", 0,
564 3600, DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
565
566 g_object_class_install_property (gobject_class, PROP_STATE_CHANGE,
567 g_param_spec_int ("state-change", "state-change from adaptive-demux",
568 "state-change from adaptive-demux", 0, (gint) (G_MAXINT32), 0,
569 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
570
571 g_object_class_install_property (gobject_class, PROP_EXIT_BLOCK,
572 g_param_spec_int ("exit-block", "EXIT BLOCK",
573 "souphttpsrc exit block", 0, (gint) (G_MAXINT32), 0,
574 G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
575 #endif
576
577 #ifdef OHOS_EXT_FUNC
578 // ohos.ext.func.0028
579 g_gst_adaptive_demux_signals[SIGNAL_BITRATE_PARSE_COMPLETE] =
580 g_signal_new("bitrate-parse-complete",
581 G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
582 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_POINTER, G_TYPE_UINT);
583 #endif
584
585 gstelement_class->change_state = gst_adaptive_demux_change_state;
586
587 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
588
589 klass->data_received = gst_adaptive_demux_stream_data_received_default;
590 klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
591 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
592 klass->requires_periodical_playlist_update =
593 gst_adaptive_demux_requires_periodical_playlist_update_default;
594
595 }
596
597 static void
gst_adaptive_demux_init(GstAdaptiveDemux * demux,GstAdaptiveDemuxClass * klass)598 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
599 GstAdaptiveDemuxClass * klass)
600 {
601 GstPadTemplate *pad_template;
602 GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
603 GObjectClass *gobject_class;
604
605 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
606
607 demux->priv = gst_adaptive_demux_get_instance_private (demux);
608 demux->priv->input_adapter = gst_adapter_new ();
609 demux->downloader = gst_uri_downloader_new ();
610 gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
611 demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
612 demux->priv->segment_seqnum = gst_util_seqnum_next ();
613 demux->have_group_id = FALSE;
614 demux->group_id = G_MAXUINT;
615
616 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
617
618 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
619 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
620
621 demux->realtime_clock = gst_system_clock_obtain ();
622 g_assert (demux->realtime_clock != NULL);
623 gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
624 if (g_object_class_find_property (gobject_class, "clock-type")) {
625 g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
626 } else {
627 GST_WARNING_OBJECT (demux,
628 "System clock does not have clock-type property");
629 }
630 if (clock_type == GST_CLOCK_TYPE_REALTIME) {
631 demux->clock_offset = 0;
632 } else {
633 GDateTime *utc_now;
634 GstClockTime rtc_now;
635
636 utc_now = g_date_time_new_now_utc ();
637 rtc_now = gst_clock_get_time (demux->realtime_clock);
638 demux->clock_offset =
639 g_date_time_to_unix (utc_now) * G_TIME_SPAN_SECOND +
640 g_date_time_get_microsecond (utc_now) - GST_TIME_AS_USECONDS (rtc_now);
641 g_date_time_unref (utc_now);
642 }
643 g_rec_mutex_init (&demux->priv->updates_lock);
644 demux->priv->updates_task =
645 gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
646 demux, NULL);
647 gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
648
649 g_mutex_init (&demux->priv->updates_timed_lock);
650 g_cond_init (&demux->priv->updates_timed_cond);
651
652 g_cond_init (&demux->priv->manifest_cond);
653 g_mutex_init (&demux->priv->manifest_update_lock);
654
655 g_rec_mutex_init (&demux->priv->manifest_lock);
656 g_mutex_init (&demux->priv->api_lock);
657 g_mutex_init (&demux->priv->segment_lock);
658
659 g_cond_init (&demux->priv->preroll_cond);
660 g_mutex_init (&demux->priv->preroll_lock);
661
662 pad_template =
663 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
664 g_return_if_fail (pad_template != NULL);
665
666 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
667 gst_pad_set_event_function (demux->sinkpad,
668 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
669 gst_pad_set_chain_function (demux->sinkpad,
670 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
671
672 /* Properties */
673 demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
674 demux->connection_speed = DEFAULT_CONNECTION_SPEED;
675
676 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
677 }
678
679 static void
gst_adaptive_demux_finalize(GObject * object)680 gst_adaptive_demux_finalize (GObject * object)
681 {
682 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
683 GstAdaptiveDemuxPrivate *priv = demux->priv;
684
685 GST_DEBUG_OBJECT (object, "finalize");
686
687 g_object_unref (priv->input_adapter);
688 g_object_unref (demux->downloader);
689
690 g_mutex_clear (&priv->updates_timed_lock);
691 g_cond_clear (&priv->updates_timed_cond);
692 g_mutex_clear (&demux->priv->manifest_update_lock);
693 g_cond_clear (&demux->priv->manifest_cond);
694 g_object_unref (priv->updates_task);
695 g_rec_mutex_clear (&priv->updates_lock);
696 g_rec_mutex_clear (&demux->priv->manifest_lock);
697 g_mutex_clear (&demux->priv->api_lock);
698 g_mutex_clear (&demux->priv->segment_lock);
699 if (demux->realtime_clock) {
700 gst_object_unref (demux->realtime_clock);
701 demux->realtime_clock = NULL;
702 }
703
704 g_cond_clear (&demux->priv->preroll_cond);
705 g_mutex_clear (&demux->priv->preroll_lock);
706
707 G_OBJECT_CLASS (parent_class)->finalize (object);
708 }
709
710 #ifdef OHOS_EXT_FUNC
711 // ohos.ext.func.0013
712 static void
set_property_to_element(GstObject * elem,guint property_id,const void * property_value)713 set_property_to_element (GstObject *elem, guint property_id, const void *property_value)
714 {
715 if ((elem == NULL) || (property_value == NULL)) {
716 return;
717 }
718
719 if (property_id == PROP_STATE_CHANGE) {
720 const gint *state_change = (const gint *) property_value;
721 g_object_set (elem, "state-change", *state_change, NULL);
722 } else if (property_id == PROP_TIMEOUT) {
723 const guint *timeout = (const guint *) property_value;
724 g_object_set (elem, "timeout", *timeout, NULL);
725 } else if (property_id == PROP_EXIT_BLOCK) {
726 const gint *exit_block = (const gint *) property_value;
727 g_object_set (elem, "exit-block", *exit_block, NULL);
728 }
729 }
730
731 static void
set_property_to_src_element(const GList * stream_list,guint property_id,const void * property_value)732 set_property_to_src_element (const GList *stream_list, guint property_id, const void *property_value)
733 {
734 GstAdaptiveDemuxStream *stream = NULL;
735 GstIterator *iter = NULL;
736
737 if (property_value == NULL) {
738 GST_WARNING ("value is NULL, set_property_to_src_element failed!");
739 return;
740 }
741
742 for (; stream_list != NULL; stream_list = g_list_next (stream_list)) {
743 GValue data = { 0, };
744
745 stream = stream_list->data;
746 if ((stream == NULL) || (stream->src == NULL)) {
747 continue;
748 }
749
750 iter = gst_bin_iterate_sources ((GstBin *) stream->src);
751 if (iter == NULL) {
752 continue;
753 }
754 if (gst_iterator_next (iter, &data) == GST_ITERATOR_OK) {
755 GstElement *uri_src = g_value_get_object (&data);
756 if (uri_src != NULL) {
757 set_property_to_element ((GstObject *) uri_src, property_id, property_value);
758 }
759 }
760 if (G_IS_VALUE (&data)) {
761 g_value_unset (&data);
762 }
763 gst_iterator_free (iter);
764 iter = NULL;
765 }
766 }
767
768 static void
set_property_to_src_and_download(GstAdaptiveDemux * demux,guint property_id,const void * property_value)769 set_property_to_src_and_download (GstAdaptiveDemux *demux, guint property_id, const void *property_value)
770 {
771 if ((demux == NULL) || (property_value == NULL)) {
772 GST_WARNING ("input parameter is error");
773 return;
774 }
775
776 set_property_to_src_element (demux->streams, property_id, property_value);
777 //set_property_to_element ((GstObject *) demux->downloader, property_id, property_value);
778 }
779 #endif
780
781 static GstStateChangeReturn
gst_adaptive_demux_change_state(GstElement * element,GstStateChange transition)782 gst_adaptive_demux_change_state (GstElement * element,
783 GstStateChange transition)
784 {
785 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
786 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
787
788 switch (transition) {
789 case GST_STATE_CHANGE_PAUSED_TO_READY:
790 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
791 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
792 gst_uri_downloader_cancel (demux->downloader);
793
794 GST_API_LOCK (demux);
795 GST_MANIFEST_LOCK (demux);
796 gst_adaptive_demux_reset (demux);
797 GST_MANIFEST_UNLOCK (demux);
798 GST_API_UNLOCK (demux);
799 break;
800 case GST_STATE_CHANGE_READY_TO_PAUSED:
801 GST_API_LOCK (demux);
802 GST_MANIFEST_LOCK (demux);
803 gst_adaptive_demux_reset (demux);
804 /* Clear "cancelled" flag in uridownloader since subclass might want to
805 * use uridownloader to fetch another manifest */
806 gst_uri_downloader_reset (demux->downloader);
807 if (g_atomic_int_get (&demux->priv->have_manifest))
808 gst_adaptive_demux_start_manifest_update_task (demux);
809 GST_MANIFEST_UNLOCK (demux);
810 GST_API_UNLOCK (demux);
811 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
812 GST_DEBUG_OBJECT (demux, "demuxer has started running");
813 break;
814 default:
815 break;
816 }
817
818 /* this must be run without MANIFEST_LOCK taken.
819 * For PLAYING to PLAYING state changes, it will want to take a lock in
820 * src element and that lock is held while the streaming thread is running.
821 * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
822 */
823 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
824
825 return result;
826 }
827
828 #ifdef OHOS_EXT_FUNC
829 // ohos.ext.func.0028
830 static void
gst_adaptive_demux_update_bitrate(GstAdaptiveDemux * demux)831 gst_adaptive_demux_update_bitrate (GstAdaptiveDemux *demux)
832 {
833 GstAdaptiveDemuxBitrateInfo stream_bitrate_info;
834 stream_bitrate_info.bitrate_list = NULL;
835 stream_bitrate_info.bitrate_num = 0;
836 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
837 if ((demux_class->get_bitrate_info) == NULL) {
838 return;
839 }
840
841 gboolean ret = demux_class->get_bitrate_info(demux, &stream_bitrate_info);
842 if (!ret) {
843 return;
844 }
845
846 GST_INFO_OBJECT (demux, "Send to user, bitrate num = %u", stream_bitrate_info.bitrate_num);
847 g_signal_emit (GST_ELEMENT(demux), g_gst_adaptive_demux_signals[SIGNAL_BITRATE_PARSE_COMPLETE],
848 0, stream_bitrate_info.bitrate_list, stream_bitrate_info.bitrate_num);
849
850 if (stream_bitrate_info.bitrate_list != NULL) {
851 g_free (stream_bitrate_info.bitrate_list);
852 stream_bitrate_info.bitrate_list = NULL;
853 stream_bitrate_info.bitrate_num = 0;
854 }
855 }
856 #endif
857
858 static gboolean
gst_adaptive_demux_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)859 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
860 GstEvent * event)
861 {
862 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
863 gboolean ret;
864
865 switch (event->type) {
866 case GST_EVENT_FLUSH_STOP:{
867 GST_API_LOCK (demux);
868 GST_MANIFEST_LOCK (demux);
869
870 gst_adaptive_demux_reset (demux);
871
872 ret = gst_pad_event_default (pad, parent, event);
873
874 GST_MANIFEST_UNLOCK (demux);
875 GST_API_UNLOCK (demux);
876
877 return ret;
878 }
879 case GST_EVENT_EOS:{
880 GstAdaptiveDemuxClass *demux_class;
881 GstQuery *query;
882 gboolean query_res;
883 gboolean ret = TRUE;
884 gsize available;
885 GstBuffer *manifest_buffer;
886
887 GST_API_LOCK (demux);
888 GST_MANIFEST_LOCK (demux);
889
890 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
891
892 available = gst_adapter_available (demux->priv->input_adapter);
893
894 if (available == 0) {
895 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
896 ret = gst_pad_event_default (pad, parent, event);
897
898 GST_MANIFEST_UNLOCK (demux);
899 GST_API_UNLOCK (demux);
900
901 return ret;
902 }
903
904 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
905
906 /* Need to get the URI to use it as a base to generate the fragment's
907 * uris */
908 query = gst_query_new_uri ();
909 query_res = gst_pad_peer_query (pad, query);
910 if (query_res) {
911 gchar *uri, *redirect_uri;
912 gboolean permanent;
913
914 gst_query_parse_uri (query, &uri);
915 gst_query_parse_uri_redirection (query, &redirect_uri);
916 gst_query_parse_uri_redirection_permanent (query, &permanent);
917
918 if (permanent && redirect_uri) {
919 demux->manifest_uri = redirect_uri;
920 demux->manifest_base_uri = NULL;
921 g_free (uri);
922 } else {
923 demux->manifest_uri = uri;
924 demux->manifest_base_uri = redirect_uri;
925 }
926
927 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
928 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
929 } else {
930 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
931 }
932 gst_query_unref (query);
933
934 /* Let the subclass parse the manifest */
935 manifest_buffer =
936 gst_adapter_take_buffer (demux->priv->input_adapter, available);
937 if (!demux_class->process_manifest (demux, manifest_buffer)) {
938 /* In most cases, this will happen if we set a wrong url in the
939 * source element and we have received the 404 HTML response instead of
940 * the manifest */
941 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
942 (NULL));
943 ret = FALSE;
944 } else {
945 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
946 }
947
948 #ifdef OHOS_EXT_FUNC
949 // ohos.ext.func.0028
950 if (ret) {
951 gst_adaptive_demux_update_bitrate(demux);
952 }
953 #endif
954 gst_buffer_unref (manifest_buffer);
955
956 gst_element_post_message (GST_ELEMENT_CAST (demux),
957 gst_message_new_element (GST_OBJECT_CAST (demux),
958 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
959 "manifest-uri", G_TYPE_STRING,
960 demux->manifest_uri, "uri", G_TYPE_STRING,
961 demux->manifest_uri,
962 "manifest-download-start", GST_TYPE_CLOCK_TIME,
963 GST_CLOCK_TIME_NONE,
964 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
965 gst_util_get_timestamp (), NULL)));
966
967 if (ret) {
968 /* Send duration message */
969 if (!gst_adaptive_demux_is_live (demux)) {
970 GstClockTime duration = demux_class->get_duration (demux);
971
972 if (duration != GST_CLOCK_TIME_NONE) {
973 GST_DEBUG_OBJECT (demux,
974 "Sending duration message : %" GST_TIME_FORMAT,
975 GST_TIME_ARGS (duration));
976 gst_element_post_message (GST_ELEMENT (demux),
977 gst_message_new_duration_changed (GST_OBJECT (demux)));
978 } else {
979 GST_DEBUG_OBJECT (demux,
980 "media duration unknown, can not send the duration message");
981 }
982 }
983
984 if (demux->next_streams) {
985 gst_adaptive_demux_prepare_streams (demux,
986 gst_adaptive_demux_is_live (demux));
987 gst_adaptive_demux_start_tasks (demux, TRUE);
988 gst_adaptive_demux_start_manifest_update_task (demux);
989 } else {
990 /* no streams */
991 GST_WARNING_OBJECT (demux, "No streams created from manifest");
992 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
993 (_("This file contains no playable streams.")),
994 ("No known stream formats found at the Manifest"));
995 ret = FALSE;
996 }
997
998 }
999 GST_MANIFEST_UNLOCK (demux);
1000 GST_API_UNLOCK (demux);
1001
1002 gst_event_unref (event);
1003 return ret;
1004 }
1005 case GST_EVENT_SEGMENT:
1006 /* Swallow newsegments, we'll push our own */
1007 gst_event_unref (event);
1008 return TRUE;
1009 default:
1010 break;
1011 }
1012
1013 return gst_pad_event_default (pad, parent, event);
1014 }
1015
1016 static GstFlowReturn
gst_adaptive_demux_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1017 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1018 GstBuffer * buffer)
1019 {
1020 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1021
1022 GST_MANIFEST_LOCK (demux);
1023
1024 gst_adapter_push (demux->priv->input_adapter, buffer);
1025
1026 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1027 (gint) gst_adapter_available (demux->priv->input_adapter));
1028
1029 GST_MANIFEST_UNLOCK (demux);
1030 return GST_FLOW_OK;
1031 }
1032
1033 /* must be called with manifest_lock taken */
1034 static void
gst_adaptive_demux_reset(GstAdaptiveDemux * demux)1035 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1036 {
1037 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1038 GList *iter;
1039 GList *old_streams;
1040 GstEvent *eos;
1041
1042 /* take ownership of old_streams before releasing the manifest_lock in
1043 * gst_adaptive_demux_stop_tasks
1044 */
1045 old_streams = demux->priv->old_streams;
1046 demux->priv->old_streams = NULL;
1047
1048 gst_adaptive_demux_stop_tasks (demux, TRUE);
1049
1050 if (klass->reset)
1051 klass->reset (demux);
1052
1053 eos = gst_event_new_eos ();
1054 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1055 GstAdaptiveDemuxStream *stream = iter->data;
1056 if (stream->pad) {
1057 gst_pad_push_event (stream->pad, gst_event_ref (eos));
1058 gst_pad_set_active (stream->pad, FALSE);
1059
1060 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
1061 }
1062 gst_adaptive_demux_stream_free (stream);
1063 }
1064 gst_event_unref (eos);
1065 g_list_free (demux->streams);
1066 demux->streams = NULL;
1067 if (demux->prepared_streams) {
1068 g_list_free_full (demux->prepared_streams,
1069 (GDestroyNotify) gst_adaptive_demux_stream_free);
1070 demux->prepared_streams = NULL;
1071 }
1072 if (demux->next_streams) {
1073 g_list_free_full (demux->next_streams,
1074 (GDestroyNotify) gst_adaptive_demux_stream_free);
1075 demux->next_streams = NULL;
1076 }
1077
1078 if (old_streams) {
1079 g_list_free_full (old_streams,
1080 (GDestroyNotify) gst_adaptive_demux_stream_free);
1081 }
1082
1083 if (demux->priv->old_streams) {
1084 g_list_free_full (demux->priv->old_streams,
1085 (GDestroyNotify) gst_adaptive_demux_stream_free);
1086 demux->priv->old_streams = NULL;
1087 }
1088
1089 g_free (demux->manifest_uri);
1090 g_free (demux->manifest_base_uri);
1091 demux->manifest_uri = NULL;
1092 demux->manifest_base_uri = NULL;
1093
1094 gst_adapter_clear (demux->priv->input_adapter);
1095 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1096
1097 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1098
1099 demux->have_group_id = FALSE;
1100 demux->group_id = G_MAXUINT;
1101 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1102 }
1103
1104 static void
gst_adaptive_demux_handle_message(GstBin * bin,GstMessage * msg)1105 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1106 {
1107 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1108
1109 switch (GST_MESSAGE_TYPE (msg)) {
1110 case GST_MESSAGE_ERROR:{
1111 GList *iter;
1112 GstAdaptiveDemuxStream *stream = NULL;
1113 GError *err = NULL;
1114 gchar *debug = NULL;
1115 gchar *new_error = NULL;
1116 const GstStructure *details = NULL;
1117
1118 GST_MANIFEST_LOCK (demux);
1119
1120 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1121 GstAdaptiveDemuxStream *cur = iter->data;
1122 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
1123 GST_OBJECT_CAST (cur->src))) {
1124 stream = cur;
1125 break;
1126 }
1127 }
1128 if (stream == NULL) {
1129 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1130 GstAdaptiveDemuxStream *cur = iter->data;
1131 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
1132 GST_OBJECT_CAST (cur->src))) {
1133 stream = cur;
1134 break;
1135 }
1136 }
1137 if (stream == NULL) {
1138 GST_WARNING_OBJECT (demux,
1139 "Failed to locate stream for errored element");
1140 break;
1141 }
1142 }
1143
1144 gst_message_parse_error (msg, &err, &debug);
1145
1146 GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1147 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1148 err->message, debug);
1149
1150 if (debug)
1151 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1152 if (new_error) {
1153 g_free (err->message);
1154 err->message = new_error;
1155 }
1156
1157 gst_message_parse_error_details (msg, &details);
1158 if (details) {
1159 gst_structure_get_uint (details, "http-status-code",
1160 &stream->last_status_code);
1161 }
1162
1163 /* error, but ask to retry */
1164 gst_adaptive_demux_stream_fragment_download_finish (stream,
1165 GST_FLOW_CUSTOM_ERROR, err);
1166
1167 g_error_free (err);
1168 g_free (debug);
1169
1170 GST_MANIFEST_UNLOCK (demux);
1171
1172 gst_message_unref (msg);
1173 msg = NULL;
1174 }
1175 break;
1176 default:
1177 break;
1178 }
1179
1180 if (msg)
1181 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1182 }
1183
1184 void
gst_adaptive_demux_set_stream_struct_size(GstAdaptiveDemux * demux,gsize struct_size)1185 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
1186 gsize struct_size)
1187 {
1188 GST_API_LOCK (demux);
1189 GST_MANIFEST_LOCK (demux);
1190 demux->stream_struct_size = struct_size;
1191 GST_MANIFEST_UNLOCK (demux);
1192 GST_API_UNLOCK (demux);
1193 }
1194
1195 /* must be called with manifest_lock taken */
1196 static gboolean
gst_adaptive_demux_prepare_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1197 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
1198 GstAdaptiveDemuxStream * stream)
1199 {
1200 GstPad *pad = stream->pad;
1201 gchar *name = gst_pad_get_name (pad);
1202 GstEvent *event;
1203 gchar *stream_id;
1204
1205 gst_pad_set_active (pad, TRUE);
1206 stream->need_header = TRUE;
1207
1208 stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
1209
1210 event =
1211 gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
1212 GST_EVENT_STREAM_START, 0);
1213 if (event) {
1214 if (gst_event_parse_group_id (event, &demux->group_id))
1215 demux->have_group_id = TRUE;
1216 else
1217 demux->have_group_id = FALSE;
1218 gst_event_unref (event);
1219 } else if (!demux->have_group_id) {
1220 demux->have_group_id = TRUE;
1221 demux->group_id = gst_util_group_id_next ();
1222 }
1223 event = gst_event_new_stream_start (stream_id);
1224 if (demux->have_group_id)
1225 gst_event_set_group_id (event, demux->group_id);
1226
1227 gst_pad_push_event (pad, event);
1228 g_free (stream_id);
1229 g_free (name);
1230
1231 GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
1232
1233 stream->discont = TRUE;
1234
1235 return TRUE;
1236 }
1237
1238 static gboolean
gst_adaptive_demux_expose_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1239 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
1240 GstAdaptiveDemuxStream * stream)
1241 {
1242 gboolean ret;
1243 GstPad *pad = stream->pad;
1244 GstCaps *caps;
1245
1246 if (stream->pending_caps) {
1247 gst_pad_set_caps (pad, stream->pending_caps);
1248 caps = stream->pending_caps;
1249 stream->pending_caps = NULL;
1250 } else {
1251 caps = gst_pad_get_current_caps (pad);
1252 }
1253
1254 GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
1255 GST_DEBUG_PAD_NAME (pad), caps);
1256 if (caps)
1257 gst_caps_unref (caps);
1258
1259 gst_object_ref (pad);
1260
1261 /* Don't hold the manifest lock while exposing a pad */
1262 GST_MANIFEST_UNLOCK (demux);
1263 ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1264 GST_MANIFEST_LOCK (demux);
1265
1266 return ret;
1267 }
1268
1269 /* must be called with manifest_lock taken */
1270 static GstClockTime
gst_adaptive_demux_stream_get_presentation_offset(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1271 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1272 GstAdaptiveDemuxStream * stream)
1273 {
1274 GstAdaptiveDemuxClass *klass;
1275
1276 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1277
1278 if (klass->get_presentation_offset == NULL)
1279 return 0;
1280
1281 return klass->get_presentation_offset (demux, stream);
1282 }
1283
1284 /* must be called with manifest_lock taken */
1285 static GstClockTime
gst_adaptive_demux_get_period_start_time(GstAdaptiveDemux * demux)1286 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1287 {
1288 GstAdaptiveDemuxClass *klass;
1289
1290 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1291
1292 if (klass->get_period_start_time == NULL)
1293 return 0;
1294
1295 return klass->get_period_start_time (demux);
1296 }
1297
1298 /* must be called with manifest_lock taken */
1299 static gboolean
gst_adaptive_demux_prepare_streams(GstAdaptiveDemux * demux,gboolean first_and_live)1300 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1301 gboolean first_and_live)
1302 {
1303 GList *iter;
1304 GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1305
1306 g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1307 if (demux->prepared_streams != NULL) {
1308 /* Old streams that were never exposed, due to a seek or so */
1309 GST_FIXME_OBJECT (demux,
1310 "Preparing new streams without cleaning up old ones!");
1311 return FALSE;
1312 }
1313
1314 demux->prepared_streams = demux->next_streams;
1315 demux->next_streams = NULL;
1316
1317 if (!gst_adaptive_demux_is_running (demux)) {
1318 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1319 return TRUE;
1320 }
1321
1322 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1323 GstAdaptiveDemuxStream *stream = iter->data;
1324
1325 stream->do_block = TRUE;
1326
1327 if (!gst_adaptive_demux_prepare_stream (demux,
1328 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1329 /* TODO act on error */
1330 GST_FIXME_OBJECT (stream->pad,
1331 "Do something on failure to expose stream");
1332 }
1333
1334 if (first_and_live) {
1335 /* TODO we only need the first timestamp, maybe create a simple function to
1336 * get the current PTS of a fragment ? */
1337 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1338 gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1339
1340 if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1341 min_pts = MIN (min_pts, stream->fragment.timestamp);
1342 } else {
1343 min_pts = stream->fragment.timestamp;
1344 }
1345 }
1346 }
1347
1348 period_start = gst_adaptive_demux_get_period_start_time (demux);
1349
1350 /* For live streams, the subclass is supposed to seek to the current
1351 * fragment and then tell us its timestamp in stream->fragment.timestamp.
1352 * We now also have to seek our demuxer segment to reflect this.
1353 *
1354 * FIXME: This needs some refactoring at some point.
1355 */
1356 if (first_and_live) {
1357 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1358 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1359 GST_SEEK_TYPE_NONE, -1, NULL);
1360 }
1361
1362 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1363 GstAdaptiveDemuxStream *stream = iter->data;
1364 GstClockTime offset;
1365
1366 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1367 stream->segment = demux->segment;
1368
1369 /* The demuxer segment is just built from seek events, but for each stream
1370 * we have to adjust segments according to the current period and the
1371 * stream specific presentation time offset.
1372 *
1373 * For each period, buffer timestamps start again from 0. Additionally the
1374 * buffer timestamps are shifted by the stream specific presentation time
1375 * offset, so the first buffer timestamp of a period is 0 + presentation
1376 * time offset. If the stream contains timestamps itself, this is also
1377 * supposed to be the presentation time stored inside the stream.
1378 *
1379 * The stream time over periods is supposed to be continuous, that is the
1380 * buffer timestamp 0 + presentation time offset should map to the start
1381 * time of the current period.
1382 *
1383 *
1384 * The adjustment of the stream segments as such works the following.
1385 *
1386 * If the demuxer segment start is bigger than the period start, this
1387 * means that we have to drop some media at the beginning of the current
1388 * period, e.g. because a seek into the middle of the period has
1389 * happened. The amount of media to drop is the difference between the
1390 * period start and the demuxer segment start, and as each period starts
1391 * again from 0, this difference is going to be the actual stream's
1392 * segment start. As all timestamps of the stream are shifted by the
1393 * presentation time offset, we will also have to move the segment start
1394 * by that offset.
1395 *
1396 * Likewise, the demuxer segment stop value is adjusted in the same
1397 * fashion.
1398 *
1399 * Now the running time and stream time at the stream's segment start has
1400 * to be the one that is stored inside the demuxer's segment, which means
1401 * that segment.base and segment.time have to be copied over (done just
1402 * above)
1403 *
1404 *
1405 * If the demuxer segment start is smaller than the period start time,
1406 * this means that the whole period is inside the segment. As each period
1407 * starts timestamps from 0, and additionally timestamps are shifted by
1408 * the presentation time offset, the stream's first timestamp (and as such
1409 * the stream's segment start) has to be the presentation time offset.
1410 * The stream time at the segment start is supposed to be the stream time
1411 * of the period start according to the demuxer segment, so the stream
1412 * segment's time would be set to that. The same goes for the stream
1413 * segment's base, which is supposed to be the running time of the period
1414 * start according to the demuxer's segment.
1415 *
1416 * The same logic applies for negative rates with the segment stop and
1417 * the period stop time (which gets clamped).
1418 *
1419 *
1420 * For the first case where not the complete period is inside the segment,
1421 * the segment time and base as calculated by the second case would be
1422 * equivalent.
1423 */
1424 GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1425 &demux->segment);
1426 GST_DEBUG_OBJECT (demux,
1427 "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1428 GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1429 /* note for readers:
1430 * Since stream->segment is initially a copy of demux->segment,
1431 * only the values that need updating are modified below. */
1432 if (first_and_live) {
1433 /* If first and live, demuxer did seek to the current position already */
1434 stream->segment.start = demux->segment.start - period_start + offset;
1435 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1436 stream->segment.stop = demux->segment.stop - period_start + offset;
1437 /* FIXME : Do we need to handle negative rates for this ? */
1438 stream->segment.position = stream->segment.start;
1439 } else if (demux->segment.start > period_start) {
1440 /* seek within a period */
1441 stream->segment.start = demux->segment.start - period_start + offset;
1442 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1443 stream->segment.stop = demux->segment.stop - period_start + offset;
1444 if (stream->segment.rate >= 0)
1445 stream->segment.position = offset;
1446 else
1447 stream->segment.position = stream->segment.stop;
1448 } else {
1449 stream->segment.start = offset;
1450 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1451 stream->segment.stop = demux->segment.stop - period_start + offset;
1452 if (stream->segment.rate >= 0)
1453 stream->segment.position = offset;
1454 else
1455 stream->segment.position = stream->segment.stop;
1456 stream->segment.time =
1457 gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1458 period_start);
1459 stream->segment.base =
1460 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1461 period_start);
1462 }
1463
1464 stream->pending_segment = gst_event_new_segment (&stream->segment);
1465 gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1466
1467 GST_DEBUG_OBJECT (demux,
1468 "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1469 &stream->segment, stream);
1470 }
1471 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1472
1473 return TRUE;
1474 }
1475
1476 static gboolean
gst_adaptive_demux_expose_streams(GstAdaptiveDemux * demux)1477 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1478 {
1479 GList *iter;
1480 GList *old_streams;
1481
1482 g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1483
1484 old_streams = demux->streams;
1485 demux->streams = demux->prepared_streams;
1486 demux->prepared_streams = NULL;
1487
1488 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1489 GstAdaptiveDemuxStream *stream = iter->data;
1490
1491 if (!gst_adaptive_demux_expose_stream (demux,
1492 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1493 /* TODO act on error */
1494 }
1495 }
1496 demux->priv->preroll_pending = 0;
1497
1498 GST_MANIFEST_UNLOCK (demux);
1499 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1500 GST_MANIFEST_LOCK (demux);
1501
1502 if (old_streams) {
1503 GstEvent *eos = gst_event_new_eos ();
1504
1505 /* before we put streams in the demux->priv->old_streams list,
1506 * we ask the download task to stop. In this way, it will no longer be
1507 * allowed to change the demux object.
1508 */
1509 for (iter = old_streams; iter; iter = g_list_next (iter)) {
1510 GstAdaptiveDemuxStream *stream = iter->data;
1511 GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1512
1513 GST_MANIFEST_UNLOCK (demux);
1514
1515 GST_DEBUG_OBJECT (pad, "Pushing EOS");
1516 gst_pad_push_event (pad, gst_event_ref (eos));
1517 gst_pad_set_active (pad, FALSE);
1518
1519 GST_LOG_OBJECT (pad, "Removing stream");
1520 gst_element_remove_pad (GST_ELEMENT (demux), pad);
1521 GST_MANIFEST_LOCK (demux);
1522
1523 gst_object_unref (GST_OBJECT (pad));
1524
1525 /* ask the download task to stop.
1526 * We will not join it now, because our thread can be one of these tasks.
1527 * We will do the joining later, from another stream download task or
1528 * from gst_adaptive_demux_stop_tasks.
1529 * We also cannot change the state of the stream->src element, because
1530 * that will wait on the streaming thread (which could be this thread)
1531 * to stop first.
1532 * Because we sent an EOS to the downstream element, the stream->src
1533 * element should detect this in its streaming task and stop.
1534 * Even if it doesn't do that, we will change its state later in
1535 * gst_adaptive_demux_stop_tasks.
1536 */
1537 GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1538 "Marking stream as cancelled");
1539 gst_task_stop (stream->download_task);
1540 g_mutex_lock (&stream->fragment_download_lock);
1541 stream->cancelled = TRUE;
1542 stream->replaced = TRUE;
1543 g_cond_signal (&stream->fragment_download_cond);
1544 g_mutex_unlock (&stream->fragment_download_lock);
1545 }
1546 gst_event_unref (eos);
1547
1548 /* The list should be freed from another thread as we can't properly
1549 * cleanup a GstTask from itself */
1550 demux->priv->old_streams =
1551 g_list_concat (demux->priv->old_streams, old_streams);
1552 }
1553
1554 /* Unblock after removing oldstreams */
1555 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1556 GstAdaptiveDemuxStream *stream = iter->data;
1557 stream->do_block = FALSE;
1558 }
1559
1560 GST_DEBUG_OBJECT (demux, "All streams are exposed");
1561
1562 return TRUE;
1563 }
1564
1565 /* must be called with manifest_lock taken */
1566 GstAdaptiveDemuxStream *
gst_adaptive_demux_stream_new(GstAdaptiveDemux * demux,GstPad * pad)1567 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1568 {
1569 GstAdaptiveDemuxStream *stream;
1570
1571 stream = g_malloc0 (demux->stream_struct_size);
1572
1573 /* Downloading task */
1574 g_rec_mutex_init (&stream->download_lock);
1575 stream->download_task =
1576 gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1577 stream, NULL);
1578 gst_task_set_lock (stream->download_task, &stream->download_lock);
1579
1580 stream->pad = pad;
1581 stream->demux = demux;
1582 stream->fragment_bitrates =
1583 g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1584 gst_pad_set_element_private (pad, stream);
1585 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1586
1587 g_mutex_lock (&demux->priv->preroll_lock);
1588 stream->do_block = TRUE;
1589 demux->priv->preroll_pending++;
1590 g_mutex_unlock (&demux->priv->preroll_lock);
1591
1592 gst_pad_set_query_function (pad,
1593 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1594 gst_pad_set_event_function (pad,
1595 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1596
1597 gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1598 g_cond_init (&stream->fragment_download_cond);
1599 g_mutex_init (&stream->fragment_download_lock);
1600
1601 demux->next_streams = g_list_append (demux->next_streams, stream);
1602
1603 return stream;
1604 }
1605
1606 GstAdaptiveDemuxStream *
gst_adaptive_demux_find_stream_for_pad(GstAdaptiveDemux * demux,GstPad * pad)1607 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1608 {
1609 GList *iter;
1610
1611 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1612 GstAdaptiveDemuxStream *stream = iter->data;
1613 if (stream->pad == pad) {
1614 return stream;
1615 }
1616 }
1617
1618 return NULL;
1619 }
1620
1621 /* must be called with manifest_lock taken.
1622 * It will temporarily drop the manifest_lock in order to join the task.
1623 * It will join only the old_streams (the demux->streams are joined by
1624 * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1625 * called)
1626 */
1627 static void
gst_adaptive_demux_stream_free(GstAdaptiveDemuxStream * stream)1628 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1629 {
1630 GstAdaptiveDemux *demux = stream->demux;
1631 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1632
1633 if (klass->stream_free)
1634 klass->stream_free (stream);
1635
1636 g_clear_error (&stream->last_error);
1637 if (stream->download_task) {
1638 if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1639 GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1640 GST_DEBUG_PAD_NAME (stream->pad));
1641
1642 gst_task_stop (stream->download_task);
1643
1644 g_mutex_lock (&stream->fragment_download_lock);
1645 stream->cancelled = TRUE;
1646 g_cond_signal (&stream->fragment_download_cond);
1647 g_mutex_unlock (&stream->fragment_download_lock);
1648 }
1649 GST_LOG_OBJECT (demux, "Waiting for task to finish");
1650
1651 /* temporarily drop the manifest lock to join the task */
1652 GST_MANIFEST_UNLOCK (demux);
1653
1654 gst_task_join (stream->download_task);
1655
1656 GST_MANIFEST_LOCK (demux);
1657
1658 GST_LOG_OBJECT (demux, "Finished");
1659 gst_object_unref (stream->download_task);
1660 g_rec_mutex_clear (&stream->download_lock);
1661 stream->download_task = NULL;
1662 }
1663
1664 gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1665
1666 if (stream->pending_segment) {
1667 gst_event_unref (stream->pending_segment);
1668 stream->pending_segment = NULL;
1669 }
1670
1671 if (stream->pending_events) {
1672 g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1673 stream->pending_events = NULL;
1674 }
1675
1676 if (stream->internal_pad) {
1677 gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1678 }
1679
1680 if (stream->src_srcpad) {
1681 gst_object_unref (stream->src_srcpad);
1682 stream->src_srcpad = NULL;
1683 }
1684
1685 if (stream->src) {
1686 GstElement *src = stream->src;
1687
1688 stream->src = NULL;
1689
1690 GST_MANIFEST_UNLOCK (demux);
1691 gst_element_set_locked_state (src, TRUE);
1692 gst_element_set_state (src, GST_STATE_NULL);
1693 gst_bin_remove (GST_BIN_CAST (demux), src);
1694 GST_MANIFEST_LOCK (demux);
1695 }
1696
1697 g_cond_clear (&stream->fragment_download_cond);
1698 g_mutex_clear (&stream->fragment_download_lock);
1699 g_free (stream->fragment_bitrates);
1700
1701 if (stream->pad) {
1702 gst_object_unref (stream->pad);
1703 stream->pad = NULL;
1704 }
1705 if (stream->pending_caps)
1706 gst_caps_unref (stream->pending_caps);
1707
1708 g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1709
1710 g_free (stream);
1711 }
1712
1713 /* must be called with manifest_lock taken */
1714 static gboolean
gst_adaptive_demux_get_live_seek_range(GstAdaptiveDemux * demux,gint64 * range_start,gint64 * range_stop)1715 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1716 gint64 * range_start, gint64 * range_stop)
1717 {
1718 GstAdaptiveDemuxClass *klass;
1719
1720 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1721
1722 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1723
1724 return klass->get_live_seek_range (demux, range_start, range_stop);
1725 }
1726
1727 /* must be called with manifest_lock taken */
1728 static gboolean
gst_adaptive_demux_stream_in_live_seek_range(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1729 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1730 GstAdaptiveDemuxStream * stream)
1731 {
1732 gint64 range_start, range_stop;
1733 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1734 GST_LOG_OBJECT (stream->pad,
1735 "stream position %" GST_TIME_FORMAT " live seek range %"
1736 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1737 GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1738 GST_STIME_ARGS (range_stop));
1739 return (stream->segment.position >= range_start
1740 && stream->segment.position <= range_stop);
1741 }
1742
1743 return FALSE;
1744 }
1745
1746 /* must be called with manifest_lock taken */
1747 static gboolean
gst_adaptive_demux_can_seek(GstAdaptiveDemux * demux)1748 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1749 {
1750 GstAdaptiveDemuxClass *klass;
1751
1752 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1753 if (gst_adaptive_demux_is_live (demux)) {
1754 return klass->get_live_seek_range != NULL;
1755 }
1756
1757 return klass->seek != NULL;
1758 }
1759
1760 static void
gst_adaptive_demux_update_streams_segment(GstAdaptiveDemux * demux,GList * streams,gint64 period_start,GstSeekType start_type,GstSeekType stop_type)1761 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1762 GList * streams, gint64 period_start, GstSeekType start_type,
1763 GstSeekType stop_type)
1764 {
1765 GList *iter;
1766 for (iter = streams; iter; iter = g_list_next (iter)) {
1767 GstAdaptiveDemuxStream *stream = iter->data;
1768 GstEvent *seg_evt;
1769 GstClockTime offset;
1770
1771 /* See comments in gst_adaptive_demux_get_period_start_time() for
1772 * an explanation of the segment modifications */
1773 stream->segment = demux->segment;
1774 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1775 stream->segment.start += offset - period_start;
1776 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1777 stream->segment.stop += offset - period_start;
1778 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1779 stream->segment.position = stream->segment.start;
1780 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1781 stream->segment.position = stream->segment.stop;
1782 seg_evt = gst_event_new_segment (&stream->segment);
1783 gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1784 gst_event_replace (&stream->pending_segment, seg_evt);
1785 GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1786 stream->pending_segment);
1787 gst_event_unref (seg_evt);
1788 /* Make sure the first buffer after a seek has the discont flag */
1789 stream->discont = TRUE;
1790 }
1791 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1792 }
1793
1794 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE | \
1795 GST_SEEK_FLAG_SNAP_AFTER | \
1796 GST_SEEK_FLAG_SNAP_NEAREST | \
1797 GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1798 GST_SEEK_FLAG_KEY_UNIT))
1799 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1800 GST_SEEK_FLAG_SNAP_AFTER | \
1801 GST_SEEK_FLAG_SNAP_NEAREST))
1802
1803 static gboolean
gst_adaptive_demux_handle_seek_event(GstAdaptiveDemux * demux,GstPad * pad,GstEvent * event)1804 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1805 GstEvent * event)
1806 {
1807 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1808 gdouble rate;
1809 GstFormat format;
1810 GstSeekFlags flags;
1811 GstSeekType start_type, stop_type;
1812 gint64 start, stop;
1813 guint32 seqnum;
1814 gboolean update;
1815 gboolean ret;
1816 GstSegment oldsegment;
1817 GstAdaptiveDemuxStream *stream = NULL;
1818
1819 GST_INFO_OBJECT (demux, "Received seek event");
1820
1821 GST_API_LOCK (demux);
1822 GST_MANIFEST_LOCK (demux);
1823
1824 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1825 &stop_type, &stop);
1826
1827 if (format != GST_FORMAT_TIME) {
1828 GST_MANIFEST_UNLOCK (demux);
1829 GST_API_UNLOCK (demux);
1830 GST_WARNING_OBJECT (demux,
1831 "Adaptive demuxers only support TIME-based seeking");
1832 gst_event_unref (event);
1833 return FALSE;
1834 }
1835
1836 if (flags & GST_SEEK_FLAG_SEGMENT) {
1837 GST_FIXME_OBJECT (demux, "Handle segment seeks");
1838 GST_MANIFEST_UNLOCK (demux);
1839 GST_API_UNLOCK (demux);
1840 gst_event_unref (event);
1841 return FALSE;
1842 }
1843
1844 seqnum = gst_event_get_seqnum (event);
1845
1846 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
1847 /* For instant rate seeks, reply directly and update
1848 * our segment so the new rate is reflected in any future
1849 * fragments */
1850 GstEvent *ev;
1851
1852 /* instant rate change only supported if direction does not change. All
1853 * other requirements are already checked before creating the seek event
1854 * but let's double-check here to be sure */
1855 if ((demux->segment.rate > 0 && rate < 0) ||
1856 (demux->segment.rate < 0 && rate > 0) ||
1857 start_type != GST_SEEK_TYPE_NONE ||
1858 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
1859 GST_ERROR_OBJECT (demux,
1860 "Instant rate change seeks only supported in the "
1861 "same direction, without flushing and position change");
1862 GST_MANIFEST_UNLOCK (demux);
1863 GST_API_UNLOCK (demux);
1864 return FALSE;
1865 }
1866
1867 ev = gst_event_new_instant_rate_change (rate / demux->segment.rate,
1868 (GstSegmentFlags) flags);
1869 gst_event_set_seqnum (ev, seqnum);
1870
1871 GST_MANIFEST_UNLOCK (demux);
1872
1873 ret = gst_adaptive_demux_push_src_event (demux, ev);
1874
1875 GST_API_UNLOCK (demux);
1876 gst_event_unref (event);
1877
1878 return ret;
1879 }
1880
1881 if (!gst_adaptive_demux_can_seek (demux)) {
1882 GST_MANIFEST_UNLOCK (demux);
1883 GST_API_UNLOCK (demux);
1884 gst_event_unref (event);
1885 return FALSE;
1886 }
1887
1888 if (gst_adaptive_demux_is_live (demux)) {
1889 gint64 range_start, range_stop;
1890 gboolean changed = FALSE;
1891 gboolean start_valid = TRUE, stop_valid = TRUE;
1892
1893 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1894 &range_stop)) {
1895 GST_MANIFEST_UNLOCK (demux);
1896 GST_API_UNLOCK (demux);
1897 gst_event_unref (event);
1898 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1899 return FALSE;
1900 }
1901
1902 GST_DEBUG_OBJECT (demux,
1903 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1904 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1905
1906 /* Handle relative positioning for live streams (relative to the range_stop) */
1907 if (start_type == GST_SEEK_TYPE_END) {
1908 start = range_stop + start;
1909 start_type = GST_SEEK_TYPE_SET;
1910 changed = TRUE;
1911 }
1912 if (stop_type == GST_SEEK_TYPE_END) {
1913 stop = range_stop + stop;
1914 stop_type = GST_SEEK_TYPE_SET;
1915 changed = TRUE;
1916 }
1917
1918 /* Adjust the requested start/stop position if it falls beyond the live
1919 * seek range.
1920 * The only case where we don't adjust is for the starting point of
1921 * an accurate seek (start if forward and stop if backwards)
1922 */
1923 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1924 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1925 GST_DEBUG_OBJECT (demux,
1926 "seek before live stream start, setting to range start: %"
1927 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1928 start = range_start;
1929 changed = TRUE;
1930 }
1931 /* truncate stop position also if set */
1932 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1933 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1934 GST_DEBUG_OBJECT (demux,
1935 "seek ending after live start, adjusting to: %"
1936 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1937 stop = range_stop;
1938 changed = TRUE;
1939 }
1940
1941 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1942 (start < range_start || start > range_stop)) {
1943 GST_WARNING_OBJECT (demux,
1944 "Seek to invalid position start:%" GST_STIME_FORMAT
1945 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1946 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1947 GST_STIME_ARGS (range_stop));
1948 start_valid = FALSE;
1949 }
1950 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1951 (stop < range_start || stop > range_stop)) {
1952 GST_WARNING_OBJECT (demux,
1953 "Seek to invalid position stop:%" GST_STIME_FORMAT
1954 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1955 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1956 GST_STIME_ARGS (range_stop));
1957 stop_valid = FALSE;
1958 }
1959
1960 /* If the seek position is still outside of the seekable range, refuse the seek */
1961 if (!start_valid || !stop_valid) {
1962 GST_MANIFEST_UNLOCK (demux);
1963 GST_API_UNLOCK (demux);
1964 gst_event_unref (event);
1965 return FALSE;
1966 }
1967
1968 /* Re-create seek event with changed/updated values */
1969 if (changed) {
1970 gst_event_unref (event);
1971 event =
1972 gst_event_new_seek (rate, format, flags,
1973 start_type, start, stop_type, stop);
1974 gst_event_set_seqnum (event, seqnum);
1975 }
1976 }
1977
1978 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1979
1980 /* have a backup in case seek fails */
1981 gst_segment_copy_into (&demux->segment, &oldsegment);
1982
1983 if (flags & GST_SEEK_FLAG_FLUSH) {
1984 GstEvent *fevent;
1985
1986 GST_DEBUG_OBJECT (demux, "sending flush start");
1987 fevent = gst_event_new_flush_start ();
1988 gst_event_set_seqnum (fevent, seqnum);
1989 GST_MANIFEST_UNLOCK (demux);
1990 gst_adaptive_demux_push_src_event (demux, fevent);
1991 GST_MANIFEST_LOCK (demux);
1992
1993 gst_adaptive_demux_stop_tasks (demux, FALSE);
1994 } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1995 (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1996
1997 gst_adaptive_demux_stop_tasks (demux, FALSE);
1998 }
1999
2000 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2001
2002 /*
2003 * Handle snap seeks as follows:
2004 * 1) do the snap seeking on the stream that received
2005 * the event
2006 * 2) use the final position on this stream to seek
2007 * on the other streams to the same position
2008 *
2009 * We can't snap at all streams at the same time as
2010 * they might end in different positions, so just
2011 * use the one that received the event as the 'leading'
2012 * one to do the snap seek.
2013 */
2014 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
2015 gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
2016 GstClockTime ts;
2017 GstSeekFlags stream_seek_flags = flags;
2018
2019 /* snap-seek on the stream that received the event and then
2020 * use the resulting position to seek on all streams */
2021
2022 if (rate >= 0) {
2023 if (start_type != GST_SEEK_TYPE_NONE)
2024 ts = start;
2025 else {
2026 ts = stream->segment.position;
2027 start_type = GST_SEEK_TYPE_SET;
2028 }
2029 } else {
2030 if (stop_type != GST_SEEK_TYPE_NONE)
2031 ts = stop;
2032 else {
2033 stop_type = GST_SEEK_TYPE_SET;
2034 ts = stream->segment.position;
2035 }
2036 }
2037
2038 if (stream) {
2039 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2040 }
2041
2042 /* replace event with a new one without snapping to seek on all streams */
2043 gst_event_unref (event);
2044 if (rate >= 0) {
2045 start = ts;
2046 } else {
2047 stop = ts;
2048 }
2049 event =
2050 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2051 start_type, start, stop_type, stop);
2052 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2053 }
2054 stream = NULL;
2055
2056 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2057 start, stop_type, stop, &update);
2058
2059 if (ret) {
2060 /* FIXME - this seems unatural, do_seek() is updating base when we
2061 * only want the start/stop position to change, maybe do_seek() needs
2062 * some fixing? */
2063 if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
2064 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
2065 && stop_type == GST_SEEK_TYPE_NONE))) {
2066 demux->segment.base = oldsegment.base;
2067 }
2068
2069 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2070
2071 ret = demux_class->seek (demux, event);
2072 }
2073
2074 if (!ret) {
2075 /* Is there anything else we can do if it fails? */
2076 gst_segment_copy_into (&oldsegment, &demux->segment);
2077 } else {
2078 demux->priv->segment_seqnum = seqnum;
2079 }
2080 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2081
2082 if (flags & GST_SEEK_FLAG_FLUSH) {
2083 GstEvent *fevent;
2084
2085 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2086 fevent = gst_event_new_flush_stop (TRUE);
2087 gst_event_set_seqnum (fevent, seqnum);
2088 #ifdef OHOS_OPT_COMPAT
2089 /**
2090 * ohos.opt.compat.0037
2091 * Fixed deadlock when seek and reconfigure are concurrent
2092 */
2093 GST_MANIFEST_UNLOCK (demux);
2094 #endif
2095 gst_adaptive_demux_push_src_event (demux, fevent);
2096 #ifdef OHOS_OPT_COMPAT
2097 // ohos.opt.compat.0037
2098 GST_MANIFEST_LOCK (demux);
2099 #endif
2100 }
2101
2102 if (demux->next_streams) {
2103 /* If the seek generated new streams, get them
2104 * to preroll */
2105 gst_adaptive_demux_prepare_streams (demux, FALSE);
2106 gst_adaptive_demux_start_tasks (demux, TRUE);
2107 } else {
2108 GstClockTime period_start =
2109 gst_adaptive_demux_get_period_start_time (demux);
2110
2111 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2112 gst_adaptive_demux_update_streams_segment (demux, demux->streams,
2113 period_start, start_type, stop_type);
2114 gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
2115 period_start, start_type, stop_type);
2116 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2117
2118 #ifdef OHOS_OPT_COMPAT
2119 /* ohos.opt.compat.0028
2120 * In the variable resolution, if the changing path is still in the prepare stream state,
2121 * if seek continues to play at a certain location, the cancel of the prepare stream in will be set to true.
2122 * When switching to the main thread of the prepare stream, it is determined that when cancel is true,
2123 * no data will be pulled from the server, the prepare stream will be released first
2124 */
2125 if (demux->streams && demux->prepared_streams) {
2126 g_list_free_full (demux->prepared_streams,
2127 (GDestroyNotify) gst_adaptive_demux_stream_free);
2128 demux->prepared_streams = NULL;
2129 /* ohos.opt.compat.0043
2130 * Preroll_pending needs to reset when prepared_streams are released. The reason is
2131 * gst_adaptive_demux_stream_new() will increase the value, while after the streams were
2132 * prepared and released the value is not changed.
2133 */
2134 GST_MANIFEST_UNLOCK (demux);
2135 g_mutex_lock (&demux->priv->preroll_lock);
2136 demux->priv->preroll_pending = 0;
2137 g_mutex_unlock (&demux->priv->preroll_lock);
2138 GST_MANIFEST_LOCK (demux);
2139 }
2140 #endif
2141
2142 /* Restart the demux */
2143 gst_adaptive_demux_start_tasks (demux, FALSE);
2144 }
2145
2146 GST_MANIFEST_UNLOCK (demux);
2147 GST_API_UNLOCK (demux);
2148 gst_event_unref (event);
2149
2150 return ret;
2151 }
2152
2153 static gboolean
gst_adaptive_demux_src_event(GstPad * pad,GstObject * parent,GstEvent * event)2154 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2155 GstEvent * event)
2156 {
2157 GstAdaptiveDemux *demux;
2158
2159 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2160
2161 /* FIXME handle events received on pads that are to be removed */
2162
2163 switch (event->type) {
2164 case GST_EVENT_SEEK:
2165 {
2166 guint32 seqnum = gst_event_get_seqnum (event);
2167 if (seqnum == demux->priv->segment_seqnum) {
2168 GST_LOG_OBJECT (pad,
2169 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2170 gst_event_unref (event);
2171 return TRUE;
2172 }
2173 return gst_adaptive_demux_handle_seek_event (demux, pad, event);
2174 }
2175 case GST_EVENT_RECONFIGURE:{
2176 GstAdaptiveDemuxStream *stream;
2177
2178 GST_MANIFEST_LOCK (demux);
2179 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
2180
2181 if (stream) {
2182 if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
2183 stream->last_ret == GST_FLOW_NOT_LINKED) {
2184 stream->last_ret = GST_FLOW_OK;
2185 stream->restart_download = TRUE;
2186 stream->need_header = TRUE;
2187 stream->discont = TRUE;
2188 GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
2189 gst_task_start (stream->download_task);
2190 }
2191 gst_event_unref (event);
2192 GST_MANIFEST_UNLOCK (demux);
2193 return TRUE;
2194 }
2195 GST_MANIFEST_UNLOCK (demux);
2196 }
2197 break;
2198 case GST_EVENT_LATENCY:{
2199 /* Upstream and our internal source are irrelevant
2200 * for latency, and we should not fail here to
2201 * configure the latency */
2202 gst_event_unref (event);
2203 return TRUE;
2204 }
2205 case GST_EVENT_QOS:{
2206 GstClockTimeDiff diff;
2207 GstClockTime timestamp;
2208 GstClockTime earliest_time;
2209
2210 gst_event_parse_qos (event, NULL, NULL, &diff, ×tamp);
2211 /* Only take into account lateness if late */
2212 if (diff > 0)
2213 earliest_time = timestamp + 2 * diff;
2214 else
2215 earliest_time = timestamp;
2216
2217 GST_OBJECT_LOCK (demux);
2218 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2219 earliest_time > demux->priv->qos_earliest_time) {
2220 demux->priv->qos_earliest_time = earliest_time;
2221 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2222 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2223 }
2224 GST_OBJECT_UNLOCK (demux);
2225 break;
2226 }
2227 default:
2228 break;
2229 }
2230
2231 return gst_pad_event_default (pad, parent, event);
2232 }
2233
2234 static gboolean
gst_adaptive_demux_src_query(GstPad * pad,GstObject * parent,GstQuery * query)2235 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2236 GstQuery * query)
2237 {
2238 GstAdaptiveDemux *demux;
2239 GstAdaptiveDemuxClass *demux_class;
2240 gboolean ret = FALSE;
2241
2242 if (query == NULL)
2243 return FALSE;
2244
2245 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2246 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2247
2248 switch (query->type) {
2249 case GST_QUERY_DURATION:{
2250 GstClockTime duration = -1;
2251 GstFormat fmt;
2252
2253 gst_query_parse_duration (query, &fmt, NULL);
2254 #ifdef OHOS_OPT_COMPAT
2255 /**
2256 * ohos.opt.compat.0040 Fixed query duration failed.
2257 * gst_hls_demux_process_manifest() function hold a manifest lock to update playlist.
2258 * During this period, the value obtained by judgment(gst_adaptive_demux_is_live() function) is wrong.
2259 * Thus, use a manifest lock to block until the attribute endlist of m3u8 is updated.
2260 */
2261 GST_MANIFEST_LOCK (demux);
2262 #endif
2263 if (gst_adaptive_demux_is_live (demux)) {
2264 /* We are able to answer this query: the duration is unknown */
2265 gst_query_set_duration (query, fmt, -1);
2266 ret = TRUE;
2267 #ifdef OHOS_OPT_COMPAT
2268 // ohos.opt.compat.0040
2269 GST_MANIFEST_UNLOCK (demux);
2270 #endif
2271 break;
2272 }
2273 #ifdef OHOS_OPT_COMPAT
2274 // ohos.opt.compat.0040
2275 GST_MANIFEST_UNLOCK (demux);
2276 #endif
2277
2278 if (fmt == GST_FORMAT_TIME
2279 && g_atomic_int_get (&demux->priv->have_manifest)) {
2280 duration = demux_class->get_duration (demux);
2281
2282 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2283 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2284 ret = TRUE;
2285 }
2286 }
2287
2288 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2289 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2290 break;
2291 }
2292 case GST_QUERY_LATENCY:{
2293 gst_query_set_latency (query, FALSE, 0, -1);
2294 ret = TRUE;
2295 break;
2296 }
2297 case GST_QUERY_SEEKING:{
2298 GstFormat fmt;
2299 gint64 stop = -1;
2300 gint64 start = 0;
2301
2302 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2303 GST_INFO_OBJECT (demux,
2304 "Don't have manifest yet, can't answer seeking query");
2305 return FALSE; /* can't answer without manifest */
2306 }
2307
2308 GST_MANIFEST_LOCK (demux);
2309
2310 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2311 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2312 if (fmt == GST_FORMAT_TIME) {
2313 GstClockTime duration;
2314 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2315
2316 ret = TRUE;
2317 if (can_seek) {
2318 if (gst_adaptive_demux_is_live (demux)) {
2319 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2320 if (!ret) {
2321 GST_MANIFEST_UNLOCK (demux);
2322 GST_INFO_OBJECT (demux, "can't answer seeking query");
2323 return FALSE;
2324 }
2325 } else {
2326 duration = demux_class->get_duration (demux);
2327 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2328 stop = duration;
2329 }
2330 }
2331 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2332 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2333 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2334 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2335 }
2336 GST_MANIFEST_UNLOCK (demux);
2337 break;
2338 }
2339 case GST_QUERY_URI:
2340
2341 GST_MANIFEST_LOCK (demux);
2342
2343 /* TODO HLS can answer this differently it seems */
2344 if (demux->manifest_uri) {
2345 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2346 * playlist or the the uri of the last downlowaded fragment? */
2347 gst_query_set_uri (query, demux->manifest_uri);
2348 ret = TRUE;
2349 }
2350
2351 GST_MANIFEST_UNLOCK (demux);
2352 break;
2353 default:
2354 /* Don't forward queries upstream because of the special nature of this
2355 * "demuxer", which relies on the upstream element only to be fed
2356 * the Manifest
2357 */
2358 break;
2359 }
2360
2361 return ret;
2362 }
2363
2364 /* must be called with manifest_lock taken */
2365 static void
gst_adaptive_demux_start_tasks(GstAdaptiveDemux * demux,gboolean start_preroll_streams)2366 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2367 gboolean start_preroll_streams)
2368 {
2369 GList *iter;
2370
2371 if (!gst_adaptive_demux_is_running (demux)) {
2372 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2373 return;
2374 }
2375
2376 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2377
2378 iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2379
2380 for (; iter; iter = g_list_next (iter)) {
2381 GstAdaptiveDemuxStream *stream = iter->data;
2382
2383 if (!start_preroll_streams) {
2384 g_mutex_lock (&stream->fragment_download_lock);
2385 stream->cancelled = FALSE;
2386 stream->replaced = FALSE;
2387 g_mutex_unlock (&stream->fragment_download_lock);
2388 }
2389
2390 stream->last_ret = GST_FLOW_OK;
2391 gst_task_start (stream->download_task);
2392 }
2393 }
2394
2395 /* must be called with manifest_lock taken */
2396 static void
gst_adaptive_demux_stop_manifest_update_task(GstAdaptiveDemux * demux)2397 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2398 {
2399 gst_uri_downloader_cancel (demux->downloader);
2400
2401 gst_task_stop (demux->priv->updates_task);
2402
2403 g_mutex_lock (&demux->priv->updates_timed_lock);
2404 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2405 demux->priv->stop_updates_task = TRUE;
2406 g_cond_signal (&demux->priv->updates_timed_cond);
2407 g_mutex_unlock (&demux->priv->updates_timed_lock);
2408 }
2409
2410 /* must be called with manifest_lock taken */
2411 static void
gst_adaptive_demux_start_manifest_update_task(GstAdaptiveDemux * demux)2412 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2413 {
2414 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2415
2416 if (gst_adaptive_demux_is_live (demux)) {
2417 gst_uri_downloader_reset (demux->downloader);
2418 g_mutex_lock (&demux->priv->updates_timed_lock);
2419 demux->priv->stop_updates_task = FALSE;
2420 g_mutex_unlock (&demux->priv->updates_timed_lock);
2421 /* Task to periodically update the manifest */
2422 if (demux_class->requires_periodical_playlist_update (demux)) {
2423 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2424 gst_task_start (demux->priv->updates_task);
2425 }
2426 }
2427 }
2428
2429 /* must be called with manifest_lock taken
2430 * This function will temporarily release manifest_lock in order to join the
2431 * download threads.
2432 * The api_lock will still protect it against other threads trying to modify
2433 * the demux element.
2434 */
2435 static void
gst_adaptive_demux_stop_tasks(GstAdaptiveDemux * demux,gboolean stop_updates)2436 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2437 {
2438 int i;
2439 GList *iter;
2440 GList *list_to_process;
2441
2442 GST_LOG_OBJECT (demux, "Stopping tasks");
2443
2444 if (stop_updates)
2445 gst_adaptive_demux_stop_manifest_update_task (demux);
2446
2447 list_to_process = demux->streams;
2448 for (i = 0; i < 2; ++i) {
2449 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2450 GstAdaptiveDemuxStream *stream = iter->data;
2451
2452 g_mutex_lock (&stream->fragment_download_lock);
2453 stream->cancelled = TRUE;
2454 gst_task_stop (stream->download_task);
2455 g_cond_signal (&stream->fragment_download_cond);
2456 g_mutex_unlock (&stream->fragment_download_lock);
2457 }
2458 list_to_process = demux->prepared_streams;
2459 }
2460
2461 GST_MANIFEST_UNLOCK (demux);
2462 g_mutex_lock (&demux->priv->preroll_lock);
2463 g_cond_broadcast (&demux->priv->preroll_cond);
2464 g_mutex_unlock (&demux->priv->preroll_lock);
2465 GST_MANIFEST_LOCK (demux);
2466
2467 g_mutex_lock (&demux->priv->manifest_update_lock);
2468 g_cond_broadcast (&demux->priv->manifest_cond);
2469 g_mutex_unlock (&demux->priv->manifest_update_lock);
2470
2471 /* need to release manifest_lock before stopping the src element.
2472 * The streams were asked to cancel, so they will not make any writes to demux
2473 * object. Even if we temporarily release manifest_lock, the demux->streams
2474 * cannot change and iter cannot be invalidated.
2475 */
2476 list_to_process = demux->streams;
2477 for (i = 0; i < 2; ++i) {
2478 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2479 GstAdaptiveDemuxStream *stream = iter->data;
2480 GstElement *src = stream->src;
2481
2482 GST_MANIFEST_UNLOCK (demux);
2483
2484 if (src) {
2485 gst_element_set_locked_state (src, TRUE);
2486 gst_element_set_state (src, GST_STATE_READY);
2487 }
2488
2489 /* stream->download_task value never changes, so it is safe to read it
2490 * outside critical section
2491 */
2492 gst_task_join (stream->download_task);
2493
2494 GST_MANIFEST_LOCK (demux);
2495 }
2496 list_to_process = demux->prepared_streams;
2497 }
2498
2499 GST_MANIFEST_UNLOCK (demux);
2500 if (stop_updates)
2501 gst_task_join (demux->priv->updates_task);
2502
2503 GST_MANIFEST_LOCK (demux);
2504
2505 list_to_process = demux->streams;
2506 for (i = 0; i < 2; ++i) {
2507 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2508 GstAdaptiveDemuxStream *stream = iter->data;
2509
2510 stream->download_error_count = 0;
2511 stream->need_header = TRUE;
2512 }
2513 list_to_process = demux->prepared_streams;
2514 }
2515 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2516 }
2517
2518 /* must be called with manifest_lock taken */
2519 static gboolean
gst_adaptive_demux_push_src_event(GstAdaptiveDemux * demux,GstEvent * event)2520 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2521 {
2522 GList *iter;
2523 gboolean ret = TRUE;
2524
2525 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2526 GstAdaptiveDemuxStream *stream = iter->data;
2527 gst_event_ref (event);
2528 ret = ret & gst_pad_push_event (stream->pad, event);
2529 }
2530 gst_event_unref (event);
2531 return ret;
2532 }
2533
2534 /* must be called with manifest_lock taken */
2535 void
gst_adaptive_demux_stream_set_caps(GstAdaptiveDemuxStream * stream,GstCaps * caps)2536 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2537 GstCaps * caps)
2538 {
2539 GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2540 caps);
2541 gst_caps_replace (&stream->pending_caps, caps);
2542 gst_caps_unref (caps);
2543 }
2544
2545 /* must be called with manifest_lock taken */
2546 void
gst_adaptive_demux_stream_set_tags(GstAdaptiveDemuxStream * stream,GstTagList * tags)2547 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2548 GstTagList * tags)
2549 {
2550 GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2551 tags);
2552 if (stream->pending_tags) {
2553 gst_tag_list_unref (stream->pending_tags);
2554 }
2555 stream->pending_tags = tags;
2556 }
2557
2558 /* must be called with manifest_lock taken */
2559 void
gst_adaptive_demux_stream_queue_event(GstAdaptiveDemuxStream * stream,GstEvent * event)2560 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2561 GstEvent * event)
2562 {
2563 stream->pending_events = g_list_append (stream->pending_events, event);
2564 }
2565
2566 /* must be called with manifest_lock taken */
2567 static guint64
_update_average_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 new_bitrate)2568 _update_average_bitrate (GstAdaptiveDemux * demux,
2569 GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2570 {
2571 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2572
2573 stream->moving_bitrate -= stream->fragment_bitrates[index];
2574 stream->fragment_bitrates[index] = new_bitrate;
2575 stream->moving_bitrate += new_bitrate;
2576
2577 stream->moving_index += 1;
2578
2579 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2580 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2581 return stream->moving_bitrate / stream->moving_index;
2582 }
2583
2584 /* must be called with manifest_lock taken */
2585 static guint64
gst_adaptive_demux_stream_update_current_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2586 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2587 GstAdaptiveDemuxStream * stream)
2588 {
2589 guint64 average_bitrate;
2590 guint64 fragment_bitrate;
2591
2592 if (demux->connection_speed) {
2593 GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2594 demux->connection_speed / 1000);
2595 stream->current_download_rate = demux->connection_speed;
2596 return demux->connection_speed;
2597 }
2598
2599 fragment_bitrate = stream->last_bitrate;
2600 GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2601 fragment_bitrate);
2602
2603 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2604
2605 GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2606 "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
2607 GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2608 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2609 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2610
2611 /* Conservative approach, make sure we don't upgrade too fast */
2612 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2613
2614 stream->current_download_rate *= demux->bitrate_limit;
2615 GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2616 G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2617
2618 #if 0
2619 /* Debugging code, modulate the bitrate every few fragments */
2620 {
2621 static guint ctr = 0;
2622 if (ctr % 3 == 0) {
2623 GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2624 stream->current_download_rate /= 2;
2625 }
2626 ctr++;
2627 }
2628 #endif
2629
2630 return stream->current_download_rate;
2631 }
2632
2633 /* must be called with manifest_lock taken */
2634 static GstFlowReturn
gst_adaptive_demux_combine_flows(GstAdaptiveDemux * demux)2635 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2636 {
2637 gboolean all_notlinked = TRUE;
2638 gboolean all_eos = TRUE;
2639 GList *iter;
2640
2641 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2642 GstAdaptiveDemuxStream *stream = iter->data;
2643
2644 if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2645 all_notlinked = FALSE;
2646 if (stream->last_ret != GST_FLOW_EOS)
2647 all_eos = FALSE;
2648 }
2649
2650 if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2651 || stream->last_ret == GST_FLOW_FLUSHING) {
2652 return stream->last_ret;
2653 }
2654 }
2655 if (all_notlinked)
2656 return GST_FLOW_NOT_LINKED;
2657 else if (all_eos)
2658 return GST_FLOW_EOS;
2659 return GST_FLOW_OK;
2660 }
2661
2662 /* Called with preroll_lock */
2663 static void
gst_adaptive_demux_handle_preroll(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2664 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2665 GstAdaptiveDemuxStream * stream)
2666 {
2667 demux->priv->preroll_pending--;
2668 if (demux->priv->preroll_pending == 0) {
2669 /* That was the last one, time to release all streams
2670 * and expose them */
2671 GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2672 gst_adaptive_demux_expose_streams (demux);
2673 g_cond_broadcast (&demux->priv->preroll_cond);
2674 }
2675 }
2676
2677 /* must be called with manifest_lock taken.
2678 * Temporarily releases manifest_lock
2679 */
2680 GstFlowReturn
gst_adaptive_demux_stream_push_buffer(GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2681 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2682 GstBuffer * buffer)
2683 {
2684 GstAdaptiveDemux *demux = stream->demux;
2685 GstFlowReturn ret = GST_FLOW_OK;
2686 gboolean discont = FALSE;
2687 /* Pending events */
2688 GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2689 GList *pending_events = NULL;
2690
2691 /* FIXME :
2692 * This is duplicating *exactly* the same thing as what is done at the beginning
2693 * of _src_chain if starting_fragment is TRUE */
2694 if (stream->first_fragment_buffer) {
2695 GstClockTime offset =
2696 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2697 GstClockTime period_start =
2698 gst_adaptive_demux_get_period_start_time (demux);
2699
2700 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2701 if (demux->segment.rate < 0)
2702 /* Set DISCONT flag for every first buffer in reverse playback mode
2703 * as each fragment for its own has to be reversed */
2704 discont = TRUE;
2705
2706 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2707 if (GST_BUFFER_PTS_IS_VALID (buffer))
2708 GST_BUFFER_PTS (buffer) += offset;
2709
2710 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2711 stream->segment.position = GST_BUFFER_PTS (buffer);
2712
2713 /* Convert from position inside the stream's segment to the demuxer's
2714 * segment, they are not necessarily the same */
2715 if (stream->segment.position - offset + period_start >
2716 demux->segment.position)
2717 demux->segment.position =
2718 stream->segment.position - offset + period_start;
2719 }
2720 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2721
2722 GST_LOG_OBJECT (stream->pad,
2723 "Going to push buffer with PTS %" GST_TIME_FORMAT,
2724 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2725 } else {
2726 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2727 }
2728
2729 if (stream->discont) {
2730 discont = TRUE;
2731 stream->discont = FALSE;
2732 }
2733
2734 if (discont) {
2735 GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2736 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2737 } else {
2738 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2739 }
2740
2741 stream->first_fragment_buffer = FALSE;
2742
2743 GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2744 GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2745 if (G_UNLIKELY (stream->pending_caps)) {
2746 pending_caps = gst_event_new_caps (stream->pending_caps);
2747 gst_caps_unref (stream->pending_caps);
2748 stream->pending_caps = NULL;
2749 }
2750
2751 if (stream->do_block) {
2752
2753 g_mutex_lock (&demux->priv->preroll_lock);
2754
2755 /* If we are preroll state, set caps in here */
2756 if (pending_caps) {
2757 gst_pad_push_event (stream->pad, pending_caps);
2758 pending_caps = NULL;
2759 }
2760
2761 gst_adaptive_demux_handle_preroll (demux, stream);
2762 GST_MANIFEST_UNLOCK (demux);
2763
2764 while (stream->do_block && !stream->cancelled) {
2765 GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2766 g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2767 }
2768 if (stream->cancelled) {
2769 GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2770 gst_buffer_unref (buffer);
2771 g_mutex_unlock (&demux->priv->preroll_lock);
2772 return GST_FLOW_FLUSHING;
2773 }
2774
2775 g_mutex_unlock (&demux->priv->preroll_lock);
2776 GST_MANIFEST_LOCK (demux);
2777 }
2778
2779 if (G_UNLIKELY (stream->pending_segment)) {
2780 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2781 pending_segment = stream->pending_segment;
2782 stream->pending_segment = NULL;
2783 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2784 }
2785 if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2786 GstTagList *tags = stream->pending_tags;
2787
2788 stream->pending_tags = NULL;
2789 stream->bitrate_changed = 0;
2790
2791 if (stream->fragment.bitrate != 0) {
2792 if (tags)
2793 tags = gst_tag_list_make_writable (tags);
2794 else
2795 tags = gst_tag_list_new_empty ();
2796
2797 gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2798 GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2799 }
2800 if (tags)
2801 pending_tags = gst_event_new_tag (tags);
2802 }
2803 if (G_UNLIKELY (stream->pending_events)) {
2804 pending_events = stream->pending_events;
2805 stream->pending_events = NULL;
2806 }
2807
2808 GST_MANIFEST_UNLOCK (demux);
2809
2810 /* Do not push events or buffers holding the manifest lock */
2811 if (G_UNLIKELY (pending_caps)) {
2812 GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2813 pending_caps);
2814 gst_pad_push_event (stream->pad, pending_caps);
2815 }
2816 if (G_UNLIKELY (pending_segment)) {
2817 GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2818 pending_segment);
2819 gst_pad_push_event (stream->pad, pending_segment);
2820 }
2821 if (G_UNLIKELY (pending_tags)) {
2822 GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2823 pending_tags);
2824 gst_pad_push_event (stream->pad, pending_tags);
2825 }
2826 while (pending_events != NULL) {
2827 GstEvent *event = pending_events->data;
2828
2829 if (!gst_pad_push_event (stream->pad, event))
2830 GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2831
2832 pending_events = g_list_delete_link (pending_events, pending_events);
2833 }
2834
2835 /* Wait for preroll if blocking */
2836 GST_DEBUG_OBJECT (stream->pad,
2837 "About to push buffer of size %" G_GSIZE_FORMAT,
2838 gst_buffer_get_size (buffer));
2839
2840 ret = gst_pad_push (stream->pad, buffer);
2841
2842 GST_MANIFEST_LOCK (demux);
2843
2844 g_mutex_lock (&stream->fragment_download_lock);
2845 if (G_UNLIKELY (stream->cancelled)) {
2846 GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2847 "Stream was cancelled");
2848 ret = stream->last_ret = GST_FLOW_FLUSHING;
2849 g_mutex_unlock (&stream->fragment_download_lock);
2850 return ret;
2851 }
2852 g_mutex_unlock (&stream->fragment_download_lock);
2853
2854 GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2855 gst_flow_get_name (ret));
2856
2857 return ret;
2858 }
2859
2860 /* must be called with manifest_lock taken */
2861 static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2862 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2863 GstAdaptiveDemuxStream * stream)
2864 {
2865 /* No need to advance, this isn't a real fragment */
2866 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2867 return GST_FLOW_OK;
2868
2869 return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2870 stream->fragment.duration);
2871 }
2872
2873 /* must be called with manifest_lock taken.
2874 * Can temporarily release manifest_lock
2875 */
2876 static GstFlowReturn
gst_adaptive_demux_stream_data_received_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2877 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2878 GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2879 {
2880 return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2881 }
2882
2883 static gboolean
gst_adaptive_demux_requires_periodical_playlist_update_default(GstAdaptiveDemux * demux)2884 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2885 * demux)
2886 {
2887 return TRUE;
2888 }
2889
2890 static GstFlowReturn
_src_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2891 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2892 {
2893 GstAdaptiveDemuxStream *stream;
2894 GstAdaptiveDemux *demux;
2895 GstAdaptiveDemuxClass *klass;
2896 GstFlowReturn ret = GST_FLOW_OK;
2897
2898 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2899 stream = gst_pad_get_element_private (pad);
2900 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2901
2902 GST_MANIFEST_LOCK (demux);
2903
2904 /* do not make any changes if the stream is cancelled */
2905 g_mutex_lock (&stream->fragment_download_lock);
2906 if (G_UNLIKELY (stream->cancelled)) {
2907 g_mutex_unlock (&stream->fragment_download_lock);
2908 gst_buffer_unref (buffer);
2909 ret = stream->last_ret = GST_FLOW_FLUSHING;
2910 GST_MANIFEST_UNLOCK (demux);
2911 return ret;
2912 }
2913 g_mutex_unlock (&stream->fragment_download_lock);
2914
2915 /* starting_fragment is set to TRUE at the beginning of
2916 * _stream_download_fragment()
2917 * /!\ If there is a header/index being downloaded, then this will
2918 * be TRUE for the first one ... but FALSE for the remaining ones,
2919 * including the *actual* fragment ! */
2920 if (stream->starting_fragment) {
2921 GstClockTime offset =
2922 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2923 GstClockTime period_start =
2924 gst_adaptive_demux_get_period_start_time (demux);
2925
2926 stream->starting_fragment = FALSE;
2927 if (klass->start_fragment) {
2928 if (!klass->start_fragment (demux, stream)) {
2929 ret = GST_FLOW_ERROR;
2930 goto error;
2931 }
2932 }
2933
2934 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2935 if (GST_BUFFER_PTS_IS_VALID (buffer))
2936 GST_BUFFER_PTS (buffer) += offset;
2937
2938 GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2939 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2940
2941 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2942 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2943 stream->segment.position = GST_BUFFER_PTS (buffer);
2944
2945 /* Convert from position inside the stream's segment to the demuxer's
2946 * segment, they are not necessarily the same */
2947 if (stream->segment.position - offset + period_start >
2948 demux->segment.position)
2949 demux->segment.position =
2950 stream->segment.position - offset + period_start;
2951 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2952 }
2953
2954 } else {
2955 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2956 }
2957
2958 /* downloading_first_buffer is set to TRUE in download_uri() just before
2959 * activating the source (i.e. requesting a given URI)
2960 *
2961 * The difference with starting_fragment is that this will be called
2962 * for *all* first buffers (of index, and header, and fragment)
2963 *
2964 * ... to then only do something useful (in this block) for actual
2965 * fragments... */
2966 if (stream->downloading_first_buffer) {
2967 gint64 chunk_size = 0;
2968
2969 stream->downloading_first_buffer = FALSE;
2970
2971 if (!stream->downloading_header && !stream->downloading_index) {
2972 /* If this is the first buffer of a fragment (not the headers or index)
2973 * and we don't have a birate from the sub-class, then see if we
2974 * can work it out from the fragment size and duration */
2975 if (stream->fragment.bitrate == 0 &&
2976 stream->fragment.duration != 0 &&
2977 gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2978 &chunk_size) && chunk_size != -1) {
2979 guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2980 8 * GST_SECOND, stream->fragment.duration));
2981 GST_LOG_OBJECT (demux,
2982 "Fragment has size %" G_GINT64_FORMAT " duration %" GST_TIME_FORMAT
2983 " = bitrate %u", chunk_size,
2984 GST_TIME_ARGS (stream->fragment.duration), bitrate);
2985 stream->fragment.bitrate = bitrate;
2986 }
2987 if (stream->fragment.bitrate) {
2988 stream->bitrate_changed = TRUE;
2989 } else {
2990 GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2991 }
2992 }
2993 }
2994
2995 stream->download_total_bytes += gst_buffer_get_size (buffer);
2996
2997 GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2998 gst_buffer_get_size (buffer));
2999
3000 ret = klass->data_received (demux, stream, buffer);
3001
3002 if (ret == GST_FLOW_FLUSHING) {
3003 /* do not make any changes if the stream is cancelled */
3004 g_mutex_lock (&stream->fragment_download_lock);
3005 if (G_UNLIKELY (stream->cancelled)) {
3006 g_mutex_unlock (&stream->fragment_download_lock);
3007 GST_MANIFEST_UNLOCK (demux);
3008 return ret;
3009 }
3010 g_mutex_unlock (&stream->fragment_download_lock);
3011 }
3012
3013 if (ret != GST_FLOW_OK) {
3014 gboolean finished = FALSE;
3015
3016 if (ret < GST_FLOW_EOS) {
3017 GST_ELEMENT_FLOW_ERROR (demux, ret);
3018
3019 /* TODO push this on all pads */
3020 gst_pad_push_event (stream->pad, gst_event_new_eos ());
3021 } else {
3022 GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
3023 gst_flow_get_name (ret));
3024 }
3025
3026 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
3027 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
3028 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
3029 /* Behaves like an EOS event from upstream */
3030 stream->fragment.finished = TRUE;
3031 ret = klass->finish_fragment (demux, stream);
3032 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
3033 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
3034 } else if (ret != GST_FLOW_OK) {
3035 goto error;
3036 }
3037 finished = TRUE;
3038 }
3039
3040 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
3041 if (finished)
3042 ret = GST_FLOW_EOS;
3043 }
3044
3045 error:
3046
3047 GST_MANIFEST_UNLOCK (demux);
3048
3049 return ret;
3050 }
3051
3052 /* must be called with manifest_lock taken */
3053 static void
gst_adaptive_demux_stream_fragment_download_finish(GstAdaptiveDemuxStream * stream,GstFlowReturn ret,GError * err)3054 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
3055 stream, GstFlowReturn ret, GError * err)
3056 {
3057 GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
3058 gst_flow_get_name (ret), err);
3059
3060 /* if we have an error, only replace last_ret if it was OK before to avoid
3061 * overwriting the first error we got */
3062 if (stream->last_ret == GST_FLOW_OK) {
3063 stream->last_ret = ret;
3064 if (err) {
3065 g_clear_error (&stream->last_error);
3066 stream->last_error = g_error_copy (err);
3067 }
3068 }
3069 g_mutex_lock (&stream->fragment_download_lock);
3070 stream->download_finished = TRUE;
3071 g_cond_signal (&stream->fragment_download_cond);
3072 g_mutex_unlock (&stream->fragment_download_lock);
3073 }
3074
3075 static GstFlowReturn
gst_adaptive_demux_eos_handling(GstAdaptiveDemuxStream * stream)3076 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
3077 {
3078 GstFlowReturn ret = GST_FLOW_OK;
3079 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
3080
3081 if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
3082 || !klass->need_another_chunk (stream)
3083 || stream->fragment.chunk_size == 0) {
3084 stream->fragment.finished = TRUE;
3085
3086 /* Last chance to figure out a fallback nominal bitrate if neither baseclass
3087 nor the HTTP Content-Length implementation worked. */
3088 if (stream->fragment.bitrate == 0 && stream->fragment.duration != 0 &&
3089 stream->fragment_bytes_downloaded != 0 && !stream->downloading_index &&
3090 !stream->downloading_header) {
3091 guint bitrate = MIN (G_MAXUINT,
3092 gst_util_uint64_scale (stream->fragment_bytes_downloaded,
3093 8 * GST_SECOND, stream->fragment.duration));
3094 GST_LOG_OBJECT (stream->pad,
3095 "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
3096 " = bitrate %u", stream->fragment_bytes_downloaded,
3097 GST_TIME_ARGS (stream->fragment.duration), bitrate);
3098 stream->fragment.bitrate = bitrate;
3099 stream->bitrate_changed = TRUE;
3100 }
3101 ret = klass->finish_fragment (stream->demux, stream);
3102 }
3103 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
3104
3105 return ret;
3106 }
3107
3108 static gboolean
_src_event(GstPad * pad,GstObject * parent,GstEvent * event)3109 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3110 {
3111 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
3112 GstAdaptiveDemux *demux = stream->demux;
3113
3114 switch (GST_EVENT_TYPE (event)) {
3115 case GST_EVENT_EOS:{
3116 GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
3117 GST_MANIFEST_LOCK (demux);
3118
3119 gst_adaptive_demux_eos_handling (stream);
3120
3121 /* FIXME ?
3122 * _eos_handling() calls fragment_download_finish() which does the
3123 * same thing as below.
3124 * Could this cause races ? */
3125 g_mutex_lock (&stream->fragment_download_lock);
3126 stream->download_finished = TRUE;
3127 g_cond_signal (&stream->fragment_download_cond);
3128 g_mutex_unlock (&stream->fragment_download_lock);
3129
3130 GST_MANIFEST_UNLOCK (demux);
3131 break;
3132 }
3133 default:
3134 break;
3135 }
3136
3137 gst_event_unref (event);
3138
3139 return TRUE;
3140 }
3141
3142 static gboolean
_src_query(GstPad * pad,GstObject * parent,GstQuery * query)3143 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3144 {
3145 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
3146
3147 switch (GST_QUERY_TYPE (query)) {
3148 case GST_QUERY_ALLOCATION:
3149 return FALSE;
3150 break;
3151 default:
3152 break;
3153 }
3154
3155 return gst_pad_peer_query (stream->pad, query);
3156 }
3157
3158 static GstPadProbeReturn
_uri_handler_probe(GstPad * pad,GstPadProbeInfo * info,GstAdaptiveDemuxStream * stream)3159 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
3160 GstAdaptiveDemuxStream * stream)
3161 {
3162 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
3163
3164 if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
3165 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
3166 if (stream->fragment_bytes_downloaded == 0) {
3167 stream->last_latency =
3168 gst_adaptive_demux_get_monotonic_time (stream->demux) -
3169 (stream->download_start_time * GST_USECOND);
3170 GST_DEBUG_OBJECT (pad,
3171 "FIRST BYTE since download_start %" GST_TIME_FORMAT,
3172 GST_TIME_ARGS (stream->last_latency));
3173 }
3174 stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
3175 GST_LOG_OBJECT (pad,
3176 "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
3177 gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
3178 } else if (GST_PAD_PROBE_INFO_TYPE (info) &
3179 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
3180 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
3181 GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
3182 GST_EVENT_TYPE_NAME (ev), ev);
3183 switch (GST_EVENT_TYPE (ev)) {
3184 case GST_EVENT_SEGMENT:
3185 stream->fragment_bytes_downloaded = 0;
3186 break;
3187 case GST_EVENT_EOS:
3188 {
3189 stream->last_download_time =
3190 gst_adaptive_demux_get_monotonic_time (stream->demux) -
3191 (stream->download_start_time * GST_USECOND);
3192 stream->last_bitrate =
3193 gst_util_uint64_scale (stream->fragment_bytes_downloaded,
3194 8 * GST_SECOND, stream->last_download_time);
3195 GST_DEBUG_OBJECT (pad,
3196 "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
3197 G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
3198 stream->last_bitrate);
3199 /* Calculate bitrate since URI request */
3200 }
3201 break;
3202 default:
3203 break;
3204 }
3205 }
3206
3207 return ret;
3208 }
3209
3210 /* must be called with manifest_lock taken.
3211 * Can temporarily release manifest_lock
3212 */
3213 static gboolean
gst_adaptive_demux_stream_wait_manifest_update(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)3214 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
3215 GstAdaptiveDemuxStream * stream)
3216 {
3217 gboolean ret = TRUE;
3218
3219 /* Wait until we're cancelled or there's something for
3220 * us to download in the playlist or the playlist
3221 * became non-live */
3222 while (TRUE) {
3223 GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
3224
3225 /* get the manifest_update_lock while still holding the manifest_lock.
3226 * This will prevent other threads to signal the condition (they will need
3227 * both manifest_lock and manifest_update_lock in order to signal).
3228 * It cannot deadlock because all threads always get the manifest_lock first
3229 * and manifest_update_lock second.
3230 */
3231 g_mutex_lock (&demux->priv->manifest_update_lock);
3232
3233 GST_MANIFEST_UNLOCK (demux);
3234
3235 g_cond_wait (&demux->priv->manifest_cond,
3236 &demux->priv->manifest_update_lock);
3237 g_mutex_unlock (&demux->priv->manifest_update_lock);
3238
3239 GST_MANIFEST_LOCK (demux);
3240
3241 /* check for cancelled every time we get the manifest_lock */
3242 g_mutex_lock (&stream->fragment_download_lock);
3243 if (G_UNLIKELY (stream->cancelled)) {
3244 ret = FALSE;
3245 stream->last_ret = GST_FLOW_FLUSHING;
3246 g_mutex_unlock (&stream->fragment_download_lock);
3247 break;
3248 }
3249 g_mutex_unlock (&stream->fragment_download_lock);
3250
3251 /* Got a new fragment or not live anymore? */
3252 if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
3253 GST_FLOW_OK) {
3254 GST_DEBUG_OBJECT (demux, "new fragment available, "
3255 "not waiting for manifest update");
3256 ret = TRUE;
3257 break;
3258 }
3259
3260 if (!gst_adaptive_demux_is_live (demux)) {
3261 GST_DEBUG_OBJECT (demux, "Not live anymore, "
3262 "not waiting for manifest update");
3263 ret = FALSE;
3264 break;
3265 }
3266 }
3267 GST_DEBUG_OBJECT (demux, "Retrying now");
3268 return ret;
3269 }
3270
3271 /* must be called with manifest_lock taken */
3272 static gboolean
gst_adaptive_demux_stream_update_source(GstAdaptiveDemuxStream * stream,const gchar * uri,const gchar * referer,gboolean refresh,gboolean allow_cache)3273 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
3274 const gchar * uri, const gchar * referer, gboolean refresh,
3275 gboolean allow_cache)
3276 {
3277 GstAdaptiveDemux *demux = stream->demux;
3278
3279 if (!gst_uri_is_valid (uri)) {
3280 GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
3281 return FALSE;
3282 }
3283
3284 /* Try to re-use existing source element */
3285 if (stream->src != NULL) {
3286 gchar *old_protocol, *new_protocol;
3287 gchar *old_uri;
3288
3289 old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
3290 old_protocol = gst_uri_get_protocol (old_uri);
3291 new_protocol = gst_uri_get_protocol (uri);
3292
3293 if (!g_str_equal (old_protocol, new_protocol)) {
3294 GstElement *src = stream->src;
3295
3296 stream->src = NULL;
3297 gst_object_unref (stream->src_srcpad);
3298 stream->src_srcpad = NULL;
3299 GST_MANIFEST_UNLOCK (demux);
3300 gst_element_set_locked_state (src, TRUE);
3301 gst_element_set_state (src, GST_STATE_NULL);
3302 gst_bin_remove (GST_BIN_CAST (demux), src);
3303 GST_MANIFEST_LOCK (demux);
3304 GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
3305 } else {
3306 GError *err = NULL;
3307
3308 GST_DEBUG_OBJECT (demux, "Re-using old source element");
3309 if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
3310 &err)) {
3311 GstElement *src = stream->src;
3312
3313 stream->src = NULL;
3314 GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
3315 err ? err->message : "Unknown error");
3316 g_clear_error (&err);
3317 gst_object_unref (stream->src_srcpad);
3318 stream->src_srcpad = NULL;
3319 GST_MANIFEST_UNLOCK (demux);
3320 gst_element_set_locked_state (src, TRUE);
3321 gst_element_set_state (src, GST_STATE_NULL);
3322 gst_bin_remove (GST_BIN_CAST (demux), src);
3323 GST_MANIFEST_LOCK (demux);
3324 }
3325 }
3326 g_free (old_uri);
3327 g_free (old_protocol);
3328 g_free (new_protocol);
3329 }
3330
3331 if (stream->src == NULL) {
3332 GstPad *uri_handler_src;
3333 GstPad *queue_sink;
3334 GstPad *queue_src;
3335 GstElement *uri_handler;
3336 GstElement *queue;
3337 GstPadLinkReturn pad_link_ret;
3338 GObjectClass *gobject_class;
3339 gchar *internal_name, *bin_name;
3340
3341 /* Our src consists of a bin containing uri_handler -> queue . The
3342 * purpose of the queue is to allow the uri_handler to download an
3343 * entire fragment without blocking, so we can accurately measure the
3344 * download bitrate. */
3345
3346 queue = gst_element_factory_make ("queue", NULL);
3347 if (queue == NULL)
3348 return FALSE;
3349
3350 g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
3351 g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
3352 g_object_set (queue, "max-size-time", (guint64) 0, NULL);
3353
3354 uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3355 if (uri_handler == NULL) {
3356 GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
3357 ("Missing plugin to handle URI: '%s'", uri), (NULL));
3358 gst_object_unref (queue);
3359 return FALSE;
3360 }
3361
3362 gobject_class = G_OBJECT_GET_CLASS (uri_handler);
3363
3364 if (g_object_class_find_property (gobject_class, "compress"))
3365 g_object_set (uri_handler, "compress", FALSE, NULL);
3366 if (g_object_class_find_property (gobject_class, "keep-alive"))
3367 g_object_set (uri_handler, "keep-alive", TRUE, NULL);
3368 if (g_object_class_find_property (gobject_class, "extra-headers")) {
3369 if (referer || refresh || !allow_cache) {
3370 GstStructure *extra_headers = gst_structure_new_empty ("headers");
3371
3372 if (referer)
3373 gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3374 NULL);
3375
3376 if (!allow_cache)
3377 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3378 "no-cache", NULL);
3379 else if (refresh)
3380 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3381 "max-age=0", NULL);
3382
3383 g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3384
3385 gst_structure_free (extra_headers);
3386 } else {
3387 g_object_set (uri_handler, "extra-headers", NULL, NULL);
3388 }
3389 }
3390
3391 /* Source bin creation */
3392 bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3393 stream->src = gst_bin_new (bin_name);
3394 g_free (bin_name);
3395 if (stream->src == NULL) {
3396 gst_object_unref (queue);
3397 gst_object_unref (uri_handler);
3398 return FALSE;
3399 }
3400
3401 gst_bin_add (GST_BIN_CAST (stream->src), queue);
3402 gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3403
3404 uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3405 queue_sink = gst_element_get_static_pad (queue, "sink");
3406
3407 pad_link_ret =
3408 gst_pad_link_full (uri_handler_src, queue_sink,
3409 GST_PAD_LINK_CHECK_NOTHING);
3410 if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3411 GST_WARNING_OBJECT (demux,
3412 "Could not link pads %s:%s to %s:%s for reason %d",
3413 GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3414 pad_link_ret);
3415 g_object_unref (queue_sink);
3416 g_object_unref (uri_handler_src);
3417 gst_object_unref (stream->src);
3418 stream->src = NULL;
3419 return FALSE;
3420 }
3421
3422 /* Add a downstream event and data probe */
3423 gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3424 (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3425
3426 g_object_unref (queue_sink);
3427 g_object_unref (uri_handler_src);
3428 queue_src = gst_element_get_static_pad (queue, "src");
3429 stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3430 g_object_unref (queue_src);
3431 gst_element_add_pad (stream->src, stream->src_srcpad);
3432
3433 gst_element_set_locked_state (stream->src, TRUE);
3434 gst_bin_add (GST_BIN_CAST (demux), stream->src);
3435 stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3436
3437 /* set up our internal floating pad to drop all events from
3438 * the http src we don't care about. On the chain function
3439 * we just push the buffer forward */
3440 internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3441 stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3442 g_free (internal_name);
3443 gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3444 GST_OBJECT_CAST (demux));
3445 GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3446 gst_pad_set_element_private (stream->internal_pad, stream);
3447 gst_pad_set_active (stream->internal_pad, TRUE);
3448 gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3449 gst_pad_set_event_function (stream->internal_pad, _src_event);
3450 gst_pad_set_query_function (stream->internal_pad, _src_query);
3451
3452 if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3453 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3454 GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3455 return FALSE;
3456 }
3457
3458 stream->uri_handler = uri_handler;
3459 stream->queue = queue;
3460
3461 stream->last_status_code = 200; /* default to OK */
3462 }
3463 return TRUE;
3464 }
3465
3466 static GstPadProbeReturn
gst_ad_stream_src_to_ready_cb(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3467 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3468 gpointer user_data)
3469 {
3470 GstAdaptiveDemuxStream *stream = user_data;
3471
3472 /* The source's src pad is IDLE so now set the state to READY */
3473 g_mutex_lock (&stream->fragment_download_lock);
3474 stream->src_at_ready = TRUE;
3475 g_cond_signal (&stream->fragment_download_cond);
3476 g_mutex_unlock (&stream->fragment_download_lock);
3477
3478 return GST_PAD_PROBE_REMOVE;
3479 }
3480
3481 #ifndef GST_DISABLE_GST_DEBUG
3482 static const char *
uritype(GstAdaptiveDemuxStream * s)3483 uritype (GstAdaptiveDemuxStream * s)
3484 {
3485 if (s->downloading_header)
3486 return "header";
3487 if (s->downloading_index)
3488 return "index";
3489 return "fragment";
3490 }
3491 #endif
3492
3493 /* must be called with manifest_lock taken.
3494 * Can temporarily release manifest_lock
3495 *
3496 * Will return when URI is fully downloaded (or aborted/errored)
3497 */
3498 static GstFlowReturn
gst_adaptive_demux_stream_download_uri(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,const gchar * uri,gint64 start,gint64 end,guint * http_status)3499 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3500 GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3501 gint64 end, guint * http_status)
3502 {
3503 GstFlowReturn ret = GST_FLOW_OK;
3504 GST_DEBUG_OBJECT (stream->pad,
3505 "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3506 uritype (stream), uri, start, end);
3507
3508 if (http_status)
3509 *http_status = 200; /* default to ok if no further information */
3510
3511 if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3512 ret = stream->last_ret = GST_FLOW_ERROR;
3513 return ret;
3514 }
3515
3516 gst_element_set_locked_state (stream->src, TRUE);
3517
3518 GST_MANIFEST_UNLOCK (demux);
3519 if (gst_element_set_state (stream->src,
3520 GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3521 /* If ranges are specified, seek to it */
3522 if (start != 0 || end != -1) {
3523 /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3524 * stop position */
3525 if (end != -1)
3526 end += 1;
3527 /* Send the seek event to the uri_handler, as the other pipeline elements
3528 * can't handle it when READY. */
3529 if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3530 GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3531 GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3532
3533 GST_MANIFEST_LOCK (demux);
3534 /* looks like the source can't handle seeks in READY */
3535 g_clear_error (&stream->last_error);
3536 stream->last_error = g_error_new (GST_CORE_ERROR,
3537 GST_CORE_ERROR_NOT_IMPLEMENTED,
3538 "Source element can't handle range requests");
3539 stream->last_ret = GST_FLOW_ERROR;
3540 } else {
3541 GST_MANIFEST_LOCK (demux);
3542 }
3543 } else {
3544 GST_MANIFEST_LOCK (demux);
3545 }
3546
3547 if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3548 stream->download_start_time =
3549 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3550
3551 /* src element is in state READY. Before we start it, we reset
3552 * download_finished
3553 */
3554 g_mutex_lock (&stream->fragment_download_lock);
3555 stream->download_finished = FALSE;
3556 stream->downloading_first_buffer = TRUE;
3557 g_mutex_unlock (&stream->fragment_download_lock);
3558
3559 GST_MANIFEST_UNLOCK (demux);
3560
3561 if (!gst_element_sync_state_with_parent (stream->src)) {
3562 GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3563 GST_MANIFEST_LOCK (demux);
3564 ret = stream->last_ret = GST_FLOW_ERROR;
3565 return ret;
3566 }
3567
3568 /* wait for the fragment to be completely downloaded */
3569 GST_DEBUG_OBJECT (stream->pad,
3570 "Waiting for %s download to finish: %s", uritype (stream), uri);
3571
3572 g_mutex_lock (&stream->fragment_download_lock);
3573 stream->src_at_ready = FALSE;
3574 if (G_UNLIKELY (stream->cancelled)) {
3575 g_mutex_unlock (&stream->fragment_download_lock);
3576 GST_MANIFEST_LOCK (demux);
3577 ret = stream->last_ret = GST_FLOW_FLUSHING;
3578 return ret;
3579 }
3580 /* download_finished is only set:
3581 * * in ::fragment_download_finish()
3582 * * if EOS is received on the _src pad
3583 * */
3584 while (!stream->cancelled && !stream->download_finished) {
3585 g_cond_wait (&stream->fragment_download_cond,
3586 &stream->fragment_download_lock);
3587 }
3588 g_mutex_unlock (&stream->fragment_download_lock);
3589
3590 GST_DEBUG_OBJECT (stream->pad,
3591 "Finished Waiting for %s download: %s", uritype (stream), uri);
3592
3593 GST_MANIFEST_LOCK (demux);
3594 g_mutex_lock (&stream->fragment_download_lock);
3595 if (G_UNLIKELY (stream->cancelled)) {
3596 ret = stream->last_ret = GST_FLOW_FLUSHING;
3597 g_mutex_unlock (&stream->fragment_download_lock);
3598 return ret;
3599 }
3600 g_mutex_unlock (&stream->fragment_download_lock);
3601
3602 ret = stream->last_ret;
3603
3604 GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3605 uritype (stream), uri, stream->last_ret,
3606 gst_flow_get_name (stream->last_ret));
3607 if (stream->last_ret != GST_FLOW_OK && http_status) {
3608 *http_status = stream->last_status_code;
3609 }
3610 }
3611
3612 /* changing src element state might try to join the streaming thread, so
3613 * we must not hold the manifest lock.
3614 */
3615 GST_MANIFEST_UNLOCK (demux);
3616 } else {
3617 GST_MANIFEST_UNLOCK (demux);
3618 if (stream->last_ret == GST_FLOW_OK)
3619 stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3620 ret = GST_FLOW_CUSTOM_ERROR;
3621 }
3622
3623 stream->src_at_ready = FALSE;
3624
3625 gst_element_set_locked_state (stream->src, TRUE);
3626 gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3627 gst_ad_stream_src_to_ready_cb, stream, NULL);
3628
3629 g_mutex_lock (&stream->fragment_download_lock);
3630 while (!stream->src_at_ready) {
3631 g_cond_wait (&stream->fragment_download_cond,
3632 &stream->fragment_download_lock);
3633 }
3634 g_mutex_unlock (&stream->fragment_download_lock);
3635
3636 gst_element_set_state (stream->src, GST_STATE_READY);
3637
3638 /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3639 GST_MANIFEST_LOCK (demux);
3640 g_mutex_lock (&stream->fragment_download_lock);
3641 if (G_UNLIKELY (stream->cancelled)) {
3642 ret = stream->last_ret = GST_FLOW_FLUSHING;
3643 g_mutex_unlock (&stream->fragment_download_lock);
3644 return ret;
3645 }
3646 g_mutex_unlock (&stream->fragment_download_lock);
3647
3648 /* deactivate and reactivate our ghostpad to make it fresh for a new
3649 * stream */
3650 gst_pad_set_active (stream->internal_pad, FALSE);
3651 gst_pad_set_active (stream->internal_pad, TRUE);
3652
3653 return ret;
3654 }
3655
3656 /* must be called with manifest_lock taken.
3657 * Can temporarily release manifest_lock
3658 */
3659 static GstFlowReturn
gst_adaptive_demux_stream_download_header_fragment(GstAdaptiveDemuxStream * stream)3660 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3661 stream)
3662 {
3663 GstAdaptiveDemux *demux = stream->demux;
3664 GstFlowReturn ret = GST_FLOW_OK;
3665
3666 if (stream->fragment.header_uri != NULL) {
3667 GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3668 G_GINT64_FORMAT, stream->fragment.header_uri,
3669 stream->fragment.header_range_start, stream->fragment.header_range_end);
3670
3671 stream->downloading_header = TRUE;
3672 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3673 stream->fragment.header_uri, stream->fragment.header_range_start,
3674 stream->fragment.header_range_end, NULL);
3675 stream->downloading_header = FALSE;
3676 }
3677
3678 /* check if we have an index */
3679 if (ret == GST_FLOW_OK) { /* TODO check for other valid types */
3680
3681 if (stream->fragment.index_uri != NULL) {
3682 GST_DEBUG_OBJECT (demux,
3683 "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3684 stream->fragment.index_uri,
3685 stream->fragment.index_range_start, stream->fragment.index_range_end);
3686 stream->downloading_index = TRUE;
3687 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3688 stream->fragment.index_uri, stream->fragment.index_range_start,
3689 stream->fragment.index_range_end, NULL);
3690 stream->downloading_index = FALSE;
3691 }
3692 }
3693
3694 return ret;
3695 }
3696
3697 /* must be called with manifest_lock taken.
3698 * Can temporarily release manifest_lock
3699 */
3700 static GstFlowReturn
gst_adaptive_demux_stream_download_fragment(GstAdaptiveDemuxStream * stream)3701 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3702 {
3703 GstAdaptiveDemux *demux = stream->demux;
3704 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3705 gchar *url = NULL;
3706 GstFlowReturn ret;
3707 gboolean retried_once = FALSE, live;
3708 guint http_status;
3709 guint last_status_code;
3710
3711 /* FIXME : */
3712 /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3713 stream->starting_fragment = TRUE;
3714 stream->last_ret = GST_FLOW_OK;
3715 stream->first_fragment_buffer = TRUE;
3716
3717 GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3718 stream->fragment.uri ? "FRAGMENT " : "",
3719 stream->fragment.header_uri ? "HEADER " : "",
3720 stream->fragment.index_uri ? "INDEX" : "");
3721
3722 if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3723 stream->fragment.index_uri == NULL)
3724 goto no_url_error;
3725
3726 if (stream->need_header) {
3727 ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3728 if (ret != GST_FLOW_OK) {
3729 return ret;
3730 }
3731 stream->need_header = FALSE;
3732 }
3733
3734 again:
3735 ret = GST_FLOW_OK;
3736 url = stream->fragment.uri;
3737 GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3738 if (!url)
3739 return ret;
3740
3741 stream->last_ret = GST_FLOW_OK;
3742 http_status = 200;
3743
3744 /* Download the actual fragment, either in fragments or in one go */
3745 if (klass->need_another_chunk && klass->need_another_chunk (stream)
3746 && stream->fragment.chunk_size != 0) {
3747 /* Handle chunk downloading */
3748 gint64 range_start, range_end, chunk_start, chunk_end;
3749 guint64 download_total_bytes;
3750 gint chunk_size = stream->fragment.chunk_size;
3751
3752 range_start = chunk_start = stream->fragment.range_start;
3753 range_end = stream->fragment.range_end;
3754 /* HTTP ranges are inclusive for the end */
3755 if (chunk_size != -1)
3756 chunk_end = range_start + chunk_size - 1;
3757 else
3758 chunk_end = range_end;
3759
3760 if (range_end != -1)
3761 chunk_end = MIN (chunk_end, range_end);
3762
3763 while (!stream->fragment.finished && (chunk_start <= range_end
3764 || range_end == -1)) {
3765 download_total_bytes = stream->download_total_bytes;
3766
3767 ret =
3768 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3769 chunk_start, chunk_end, &http_status);
3770
3771 GST_DEBUG_OBJECT (stream->pad,
3772 "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3773 http_status, gst_flow_get_name (stream->last_ret));
3774
3775 /* Don't retry for any chunks except the first. We would have sent
3776 * data downstream already otherwise and it's difficult to recover
3777 * from that in a meaningful way */
3778 if (chunk_start > range_start)
3779 retried_once = TRUE;
3780
3781 /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3782 * downloading up to -1. We don't know the full duration.
3783 * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3784 if (ret != GST_FLOW_OK && chunk_end == -1) {
3785 break;
3786 } else if (ret != GST_FLOW_OK) {
3787 chunk_end = -1;
3788 stream->last_ret = GST_FLOW_OK;
3789 continue;
3790 }
3791
3792 if (chunk_end == -1)
3793 break;
3794
3795 /* Short read, we're at the end now */
3796 if (stream->download_total_bytes - download_total_bytes <
3797 chunk_end + 1 - chunk_start)
3798 break;
3799
3800 if (!klass->need_another_chunk (stream))
3801 break;
3802
3803 /* HTTP ranges are inclusive for the end */
3804 chunk_start += chunk_size;
3805 chunk_size = stream->fragment.chunk_size;
3806 if (chunk_size != -1)
3807 chunk_end = chunk_start + chunk_size - 1;
3808 else
3809 chunk_end = range_end;
3810
3811 if (range_end != -1)
3812 chunk_end = MIN (chunk_end, range_end);
3813 }
3814 } else {
3815 ret =
3816 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3817 stream->fragment.range_start, stream->fragment.range_end, &http_status);
3818 GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3819 stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3820 }
3821 if (ret == GST_FLOW_OK)
3822 goto beach;
3823
3824 g_mutex_lock (&stream->fragment_download_lock);
3825 if (G_UNLIKELY (stream->cancelled)) {
3826 g_mutex_unlock (&stream->fragment_download_lock);
3827 return ret;
3828 }
3829 g_mutex_unlock (&stream->fragment_download_lock);
3830
3831 /* TODO check if we are truly stopping */
3832 if (ret != GST_FLOW_CUSTOM_ERROR)
3833 goto beach;
3834
3835 last_status_code = stream->last_status_code;
3836 GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3837 last_status_code, stream->download_error_count);
3838
3839 live = gst_adaptive_demux_is_live (demux);
3840
3841 #ifdef OHOS_EXT_FUNC
3842 // ohos.ext.func.0013
3843 if (last_status_code == 408) {
3844 GST_WARNING_OBJECT (stream->pad, "Receive error of souphttpsrc, status_code is %u", last_status_code);
3845 stream->download_error_count = MAX_DOWNLOAD_ERROR_COUNT;
3846 return GST_FLOW_ERROR;
3847 }
3848 #endif
3849
3850 if (!retried_once && ((last_status_code / 100 == 4 && live)
3851 || last_status_code / 100 == 5)) {
3852 /* 4xx/5xx */
3853 /* if current position is before available start, switch to next */
3854 if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3855 goto flushing;
3856
3857 if (live) {
3858 gint64 range_start, range_stop;
3859
3860 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3861 &range_stop))
3862 goto flushing;
3863
3864 if (demux->segment.position < range_start) {
3865 GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3866 stream->last_ret = GST_FLOW_OK;
3867 ret = gst_adaptive_demux_eos_handling (stream);
3868 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3869 gst_flow_get_name (ret));
3870 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3871 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3872 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3873 gst_flow_get_name (ret));
3874 if (ret == GST_FLOW_OK) {
3875 retried_once = TRUE;
3876 goto again;
3877 }
3878 } else if (demux->segment.position > range_stop) {
3879 /* wait a bit to be in range, we don't have any locks at that point */
3880 gint64 wait_time =
3881 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3882 if (wait_time > 0) {
3883 gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3884
3885 GST_DEBUG_OBJECT (stream->pad,
3886 "Download waiting for %" GST_TIME_FORMAT,
3887 GST_TIME_ARGS (wait_time));
3888
3889 GST_MANIFEST_UNLOCK (demux);
3890 g_mutex_lock (&stream->fragment_download_lock);
3891 if (G_UNLIKELY (stream->cancelled)) {
3892 g_mutex_unlock (&stream->fragment_download_lock);
3893 GST_MANIFEST_LOCK (demux);
3894 stream->last_ret = GST_FLOW_FLUSHING;
3895 goto flushing;
3896 }
3897 do {
3898 g_cond_wait_until (&stream->fragment_download_cond,
3899 &stream->fragment_download_lock, end_time);
3900 if (G_UNLIKELY (stream->cancelled)) {
3901 g_mutex_unlock (&stream->fragment_download_lock);
3902 GST_MANIFEST_LOCK (demux);
3903 stream->last_ret = GST_FLOW_FLUSHING;
3904 goto flushing;
3905 }
3906 } while (!stream->download_finished);
3907 g_mutex_unlock (&stream->fragment_download_lock);
3908
3909 GST_MANIFEST_LOCK (demux);
3910 }
3911 }
3912 }
3913
3914 flushing:
3915 if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3916 /* looks like there is no way of knowing when a live stream has ended
3917 * Have to assume we are falling behind and cause a manifest reload */
3918 GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3919 return GST_FLOW_EOS;
3920 }
3921 } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3922 /* If this is the last fragment, consider failures EOS and not actual
3923 * errors. Due to rounding errors in the durations, the last fragment
3924 * might not actually exist */
3925 GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3926 return GST_FLOW_EOS;
3927 } else {
3928 /* retry once (same segment) for 5xx (server errors) */
3929 if (!retried_once) {
3930 retried_once = TRUE;
3931 /* wait a short time in case the server needs a bit to recover, we don't
3932 * care if we get woken up before end time. We can use sleep here since
3933 * we're already blocking and just want to wait some time. */
3934 g_usleep (100000); /* a tenth of a second */
3935 goto again;
3936 }
3937 }
3938
3939 beach:
3940 return ret;
3941
3942 no_url_error:
3943 {
3944 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3945 (_("Failed to get fragment URL.")),
3946 ("An error happened when getting fragment URL"));
3947 gst_task_stop (stream->download_task);
3948 return GST_FLOW_ERROR;
3949 }
3950 }
3951
3952 /* this function will take the manifest_lock and will keep it until the end.
3953 * It will release it temporarily only when going to sleep.
3954 * Every time it takes the manifest_lock, it will check for cancelled condition
3955 */
3956 static void
gst_adaptive_demux_stream_download_loop(GstAdaptiveDemuxStream * stream)3957 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
3958 {
3959 GstAdaptiveDemux *demux = stream->demux;
3960 GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
3961 GstFlowReturn ret;
3962 gboolean live;
3963
3964 GST_LOG_OBJECT (stream->pad, "download loop start");
3965
3966 GST_MANIFEST_LOCK (demux);
3967
3968 g_mutex_lock (&stream->fragment_download_lock);
3969 if (G_UNLIKELY (stream->cancelled)) {
3970 stream->last_ret = GST_FLOW_FLUSHING;
3971 g_mutex_unlock (&stream->fragment_download_lock);
3972 goto cancelled;
3973 }
3974 g_mutex_unlock (&stream->fragment_download_lock);
3975
3976 /* Check if we're done with our segment */
3977 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3978 if (demux->segment.rate > 0) {
3979 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
3980 && stream->segment.position >= stream->segment.stop) {
3981 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3982 ret = GST_FLOW_EOS;
3983 gst_task_stop (stream->download_task);
3984 goto end_of_manifest;
3985 }
3986 } else {
3987 if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
3988 && stream->segment.position <= stream->segment.start) {
3989 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3990 ret = GST_FLOW_EOS;
3991 gst_task_stop (stream->download_task);
3992 goto end_of_manifest;
3993 }
3994 }
3995 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3996
3997 /* Cleanup old streams if any */
3998 if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
3999 GList *old_streams = demux->priv->old_streams;
4000 demux->priv->old_streams = NULL;
4001
4002 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
4003 g_list_free_full (old_streams,
4004 (GDestroyNotify) gst_adaptive_demux_stream_free);
4005 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
4006
4007 /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
4008 * Recheck the cancelled flag.
4009 */
4010 g_mutex_lock (&stream->fragment_download_lock);
4011 if (G_UNLIKELY (stream->cancelled)) {
4012 stream->last_ret = GST_FLOW_FLUSHING;
4013 g_mutex_unlock (&stream->fragment_download_lock);
4014 goto cancelled;
4015 }
4016 g_mutex_unlock (&stream->fragment_download_lock);
4017 }
4018
4019 /* Restarting download, figure out new position
4020 * FIXME : Move this to a separate function ? */
4021 if (G_UNLIKELY (stream->restart_download)) {
4022 GstEvent *seg_event;
4023 GstClockTime cur, ts = 0;
4024 gint64 pos;
4025
4026 GST_DEBUG_OBJECT (stream->pad,
4027 "Activating stream due to reconfigure event");
4028
4029 if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
4030 ts = (GstClockTime) pos;
4031 GST_DEBUG_OBJECT (demux, "Downstream position: %"
4032 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4033 } else {
4034 /* query other pads as some faulty element in the pad's branch might
4035 * reject position queries. This should be better than using the
4036 * demux segment position that can be much ahead */
4037 GList *iter;
4038
4039 for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
4040 GstAdaptiveDemuxStream *cur_stream =
4041 (GstAdaptiveDemuxStream *) iter->data;
4042
4043 if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
4044 &pos)) {
4045 ts = (GstClockTime) pos;
4046 GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
4047 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4048 break;
4049 }
4050 }
4051 }
4052
4053 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4054 cur =
4055 gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
4056 stream->segment.position);
4057
4058 /* we might have already pushed this data */
4059 ts = MAX (ts, cur);
4060
4061 GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
4062 "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4063
4064 if (GST_CLOCK_TIME_IS_VALID (ts)) {
4065 GstClockTime offset, period_start;
4066
4067 offset =
4068 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4069 period_start = gst_adaptive_demux_get_period_start_time (demux);
4070
4071 /* TODO check return */
4072 gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
4073 0, ts, &ts);
4074
4075 stream->segment.position = ts - period_start + offset;
4076 }
4077
4078 /* The stream's segment is still correct except for
4079 * the position, so let's send a new one with the
4080 * updated position */
4081 seg_event = gst_event_new_segment (&stream->segment);
4082 gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
4083 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4084
4085 GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
4086 GST_PTR_FORMAT, seg_event);
4087 gst_pad_push_event (stream->pad, seg_event);
4088
4089 stream->discont = TRUE;
4090 stream->restart_download = FALSE;
4091 }
4092
4093 live = gst_adaptive_demux_is_live (demux);
4094
4095 /* Get information about the fragment to download */
4096 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
4097 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
4098 GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
4099 ret, gst_flow_get_name (ret));
4100 if (ret == GST_FLOW_OK) {
4101
4102 /* wait for live fragments to be available */
4103 if (live) {
4104 gint64 wait_time =
4105 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
4106 if (wait_time > 0) {
4107 GstClockTime end_time =
4108 gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
4109
4110 GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
4111 GST_TIME_ARGS (wait_time));
4112
4113 GST_MANIFEST_UNLOCK (demux);
4114
4115 g_mutex_lock (&stream->fragment_download_lock);
4116 if (G_UNLIKELY (stream->cancelled)) {
4117 g_mutex_unlock (&stream->fragment_download_lock);
4118 GST_MANIFEST_LOCK (demux);
4119 stream->last_ret = GST_FLOW_FLUSHING;
4120 goto cancelled;
4121 }
4122 gst_adaptive_demux_wait_until (demux->realtime_clock,
4123 &stream->fragment_download_cond, &stream->fragment_download_lock,
4124 end_time);
4125 g_mutex_unlock (&stream->fragment_download_lock);
4126
4127 GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
4128
4129 GST_MANIFEST_LOCK (demux);
4130
4131 g_mutex_lock (&stream->fragment_download_lock);
4132 if (G_UNLIKELY (stream->cancelled)) {
4133 stream->last_ret = GST_FLOW_FLUSHING;
4134 g_mutex_unlock (&stream->fragment_download_lock);
4135 goto cancelled;
4136 }
4137 g_mutex_unlock (&stream->fragment_download_lock);
4138 }
4139 }
4140
4141 stream->last_ret = GST_FLOW_OK;
4142
4143 next_download = gst_adaptive_demux_get_monotonic_time (demux);
4144 ret = gst_adaptive_demux_stream_download_fragment (stream);
4145
4146 if (ret == GST_FLOW_FLUSHING) {
4147 g_mutex_lock (&stream->fragment_download_lock);
4148 if (G_UNLIKELY (stream->cancelled)) {
4149 stream->last_ret = GST_FLOW_FLUSHING;
4150 g_mutex_unlock (&stream->fragment_download_lock);
4151 goto cancelled;
4152 }
4153 g_mutex_unlock (&stream->fragment_download_lock);
4154 }
4155
4156 } else {
4157 stream->last_ret = ret;
4158 }
4159
4160 switch (ret) {
4161 case GST_FLOW_OK:
4162 break; /* all is good, let's go */
4163 case GST_FLOW_EOS:
4164 GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
4165
4166 /* we push the EOS after releasing the object lock */
4167 if (gst_adaptive_demux_is_live (demux)
4168 && (demux->segment.rate == 1.0
4169 || gst_adaptive_demux_stream_in_live_seek_range (demux,
4170 stream))) {
4171 GstAdaptiveDemuxClass *demux_class =
4172 GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4173
4174 /* this might be a fragment download error, refresh the manifest, just in case */
4175 if (!demux_class->requires_periodical_playlist_update (demux)) {
4176 ret = gst_adaptive_demux_update_manifest (demux);
4177 break;
4178 /* Wait only if we can ensure current manifest has been expired.
4179 * The meaning "we have next period" *WITH* EOS is that, current
4180 * period has been ended but we can continue to the next period */
4181 } else if (!gst_adaptive_demux_has_next_period (demux) &&
4182 gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
4183 goto end;
4184 }
4185 gst_task_stop (stream->download_task);
4186 if (stream->replaced) {
4187 goto end;
4188 }
4189 } else {
4190 gst_task_stop (stream->download_task);
4191 }
4192
4193 if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
4194 if (gst_adaptive_demux_has_next_period (demux)) {
4195 GST_DEBUG_OBJECT (stream->pad,
4196 "Next period available, not sending EOS");
4197 gst_adaptive_demux_advance_period (demux);
4198 ret = GST_FLOW_OK;
4199 }
4200 }
4201 break;
4202
4203 case GST_FLOW_NOT_LINKED:
4204 {
4205 GstFlowReturn ret;
4206 gst_task_stop (stream->download_task);
4207
4208 ret = gst_adaptive_demux_combine_flows (demux);
4209 if (ret == GST_FLOW_NOT_LINKED) {
4210 GST_ELEMENT_FLOW_ERROR (demux, ret);
4211 }
4212 }
4213 break;
4214
4215 case GST_FLOW_FLUSHING:{
4216 GList *iter;
4217
4218 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4219 GstAdaptiveDemuxStream *other;
4220
4221 other = iter->data;
4222 gst_task_stop (other->download_task);
4223 }
4224 }
4225 break;
4226
4227 default:
4228 if (ret <= GST_FLOW_ERROR) {
4229 gboolean is_live = gst_adaptive_demux_is_live (demux);
4230 GST_WARNING_OBJECT (demux, "Error while downloading fragment");
4231 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
4232 goto download_error;
4233 }
4234
4235 g_clear_error (&stream->last_error);
4236
4237 /* First try to update the playlist for non-live playlists
4238 * in case the URIs have changed in the meantime. But only
4239 * try it the first time, after that we're going to wait a
4240 * a bit to not flood the server */
4241 if (stream->download_error_count == 1 && !is_live) {
4242 /* TODO hlsdemux had more options to this function (boolean and err) */
4243
4244 if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
4245 /* Retry immediately, the playlist actually has changed */
4246 GST_DEBUG_OBJECT (demux, "Updated the playlist");
4247 goto end;
4248 }
4249 }
4250
4251 /* Wait half the fragment duration before retrying */
4252 next_download += stream->fragment.duration / 2;
4253
4254 GST_MANIFEST_UNLOCK (demux);
4255
4256 g_mutex_lock (&stream->fragment_download_lock);
4257 if (G_UNLIKELY (stream->cancelled)) {
4258 g_mutex_unlock (&stream->fragment_download_lock);
4259 GST_MANIFEST_LOCK (demux);
4260 stream->last_ret = GST_FLOW_FLUSHING;
4261 goto cancelled;
4262 }
4263 gst_adaptive_demux_wait_until (demux->realtime_clock,
4264 &stream->fragment_download_cond, &stream->fragment_download_lock,
4265 next_download);
4266 g_mutex_unlock (&stream->fragment_download_lock);
4267
4268 GST_DEBUG_OBJECT (demux, "Retrying now");
4269
4270 GST_MANIFEST_LOCK (demux);
4271
4272 g_mutex_lock (&stream->fragment_download_lock);
4273 if (G_UNLIKELY (stream->cancelled)) {
4274 stream->last_ret = GST_FLOW_FLUSHING;
4275 g_mutex_unlock (&stream->fragment_download_lock);
4276 goto cancelled;
4277 }
4278 g_mutex_unlock (&stream->fragment_download_lock);
4279
4280 /* Refetch the playlist now after we waited */
4281 if (!is_live
4282 && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
4283 GST_DEBUG_OBJECT (demux, "Updated the playlist");
4284 }
4285 goto end;
4286 }
4287 break;
4288 }
4289
4290 end_of_manifest:
4291 if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
4292 if (GST_OBJECT_PARENT (stream->pad) != NULL) {
4293 if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
4294 GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
4295 gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
4296 } else {
4297 GST_DEBUG_OBJECT (stream->src,
4298 "Stream is EOS, but we're switching fragments. Not sending.");
4299 }
4300 } else {
4301 GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
4302 goto download_error;
4303 }
4304 }
4305
4306 end:
4307 GST_MANIFEST_UNLOCK (demux);
4308 GST_LOG_OBJECT (stream->pad, "download loop end");
4309 return;
4310
4311 cancelled:
4312 {
4313 GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
4314 goto end;
4315 }
4316 download_error:
4317 {
4318 GstMessage *msg;
4319
4320 if (stream->last_error) {
4321 gchar *debug = g_strdup_printf ("Error on stream %s:%s",
4322 GST_DEBUG_PAD_NAME (stream->pad));
4323 msg =
4324 gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
4325 debug);
4326 GST_ERROR_OBJECT (stream->pad, "Download error: %s",
4327 stream->last_error->message);
4328 g_free (debug);
4329 } else {
4330 GError *err =
4331 g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
4332 _("Couldn't download fragments"));
4333 msg =
4334 gst_message_new_error (GST_OBJECT_CAST (demux), err,
4335 "Fragment downloading has failed consecutive times");
4336 g_error_free (err);
4337 GST_ERROR_OBJECT (stream->pad,
4338 "Download error: Couldn't download fragments, too many failures");
4339 }
4340
4341 gst_task_stop (stream->download_task);
4342 if (stream->src) {
4343 GstElement *src = stream->src;
4344
4345 stream->src = NULL;
4346 GST_MANIFEST_UNLOCK (demux);
4347 gst_element_set_locked_state (src, TRUE);
4348 gst_element_set_state (src, GST_STATE_NULL);
4349 gst_bin_remove (GST_BIN_CAST (demux), src);
4350 GST_MANIFEST_LOCK (demux);
4351 }
4352
4353 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
4354
4355 goto end;
4356 }
4357 }
4358
4359 static void
gst_adaptive_demux_updates_loop(GstAdaptiveDemux * demux)4360 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
4361 {
4362 GstClockTime next_update;
4363 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4364
4365 /* Loop for updating of the playlist. This periodically checks if
4366 * the playlist is updated and does so, then signals the streaming
4367 * thread in case it can continue downloading now. */
4368
4369 /* block until the next scheduled update or the signal to quit this thread */
4370 GST_DEBUG_OBJECT (demux, "Started updates task");
4371
4372 GST_MANIFEST_LOCK (demux);
4373
4374 next_update =
4375 gst_adaptive_demux_get_monotonic_time (demux) +
4376 klass->get_manifest_update_interval (demux) * GST_USECOND;
4377
4378 /* Updating playlist only needed for live playlists */
4379 while (gst_adaptive_demux_is_live (demux)) {
4380 GstFlowReturn ret = GST_FLOW_OK;
4381
4382 /* Wait here until we should do the next update or we're cancelled */
4383 GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4384
4385 GST_MANIFEST_UNLOCK (demux);
4386
4387 g_mutex_lock (&demux->priv->updates_timed_lock);
4388 if (demux->priv->stop_updates_task) {
4389 g_mutex_unlock (&demux->priv->updates_timed_lock);
4390 goto quit;
4391 }
4392 gst_adaptive_demux_wait_until (demux->realtime_clock,
4393 &demux->priv->updates_timed_cond,
4394 &demux->priv->updates_timed_lock, next_update);
4395 g_mutex_unlock (&demux->priv->updates_timed_lock);
4396
4397 g_mutex_lock (&demux->priv->updates_timed_lock);
4398 if (demux->priv->stop_updates_task) {
4399 g_mutex_unlock (&demux->priv->updates_timed_lock);
4400 goto quit;
4401 }
4402 g_mutex_unlock (&demux->priv->updates_timed_lock);
4403
4404 GST_MANIFEST_LOCK (demux);
4405
4406 GST_DEBUG_OBJECT (demux, "Updating playlist");
4407
4408 ret = gst_adaptive_demux_update_manifest (demux);
4409
4410 if (ret == GST_FLOW_EOS) {
4411 } else if (ret != GST_FLOW_OK) {
4412 /* update_failed_count is used only here, no need to protect it */
4413 demux->priv->update_failed_count++;
4414 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4415 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4416 gst_flow_get_name (ret));
4417 next_update = gst_adaptive_demux_get_monotonic_time (demux)
4418 + klass->get_manifest_update_interval (demux) * GST_USECOND;
4419 } else {
4420 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4421 (_("Internal data stream error.")), ("Could not update playlist"));
4422 GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4423 gst_task_stop (demux->priv->updates_task);
4424 GST_MANIFEST_UNLOCK (demux);
4425 goto end;
4426 }
4427 } else {
4428 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4429 demux->priv->update_failed_count = 0;
4430 next_update =
4431 gst_adaptive_demux_get_monotonic_time (demux) +
4432 klass->get_manifest_update_interval (demux) * GST_USECOND;
4433
4434 /* Wake up download tasks */
4435 g_mutex_lock (&demux->priv->manifest_update_lock);
4436 g_cond_broadcast (&demux->priv->manifest_cond);
4437 g_mutex_unlock (&demux->priv->manifest_update_lock);
4438 }
4439 }
4440
4441 GST_MANIFEST_UNLOCK (demux);
4442
4443 quit:
4444 {
4445 GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4446 }
4447
4448 end:
4449 {
4450 return;
4451 }
4452 }
4453
4454 /* must be called with manifest_lock taken */
4455 static gboolean
gst_adaptive_demux_stream_push_event(GstAdaptiveDemuxStream * stream,GstEvent * event)4456 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4457 GstEvent * event)
4458 {
4459 gboolean ret;
4460 GstPad *pad;
4461 GstAdaptiveDemux *demux = stream->demux;
4462
4463 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4464 stream->eos = TRUE;
4465 }
4466
4467 pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4468
4469 /* Can't push events holding the manifest lock */
4470 GST_MANIFEST_UNLOCK (demux);
4471
4472 GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4473 "Pushing event %" GST_PTR_FORMAT, event);
4474
4475 ret = gst_pad_push_event (pad, event);
4476
4477 gst_object_unref (pad);
4478
4479 GST_MANIFEST_LOCK (demux);
4480
4481 return ret;
4482 }
4483
4484 /* must be called with manifest_lock taken */
4485 static gboolean
gst_adaptive_demux_is_live(GstAdaptiveDemux * demux)4486 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4487 {
4488 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4489
4490 if (klass->is_live)
4491 return klass->is_live (demux);
4492 return FALSE;
4493 }
4494
4495 /* must be called with manifest_lock taken */
4496 static GstFlowReturn
gst_adaptive_demux_stream_seek(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,gboolean forward,GstSeekFlags flags,GstClockTime ts,GstClockTime * final_ts)4497 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4498 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4499 GstClockTime ts, GstClockTime * final_ts)
4500 {
4501 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4502
4503 if (klass->stream_seek)
4504 return klass->stream_seek (stream, forward, flags, ts, final_ts);
4505 return GST_FLOW_ERROR;
4506 }
4507
4508 /* must be called with manifest_lock taken */
4509 static gboolean
gst_adaptive_demux_stream_has_next_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4510 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4511 GstAdaptiveDemuxStream * stream)
4512 {
4513 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4514 gboolean ret = TRUE;
4515
4516 if (klass->stream_has_next_fragment)
4517 ret = klass->stream_has_next_fragment (stream);
4518
4519 return ret;
4520 }
4521
4522 /* must be called with manifest_lock taken */
4523 /* Called from:
4524 * the ::finish_fragment() handlers when an *actual* fragment is done
4525 * */
4526 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4527 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4528 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4529 {
4530 GstFlowReturn ret;
4531
4532 if (stream->last_ret == GST_FLOW_OK) {
4533 stream->last_ret =
4534 gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4535 duration);
4536 }
4537 ret = stream->last_ret;
4538
4539 return ret;
4540 }
4541
4542 /* must be called with manifest_lock taken */
4543 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4544 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4545 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4546 {
4547 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4548 GstFlowReturn ret;
4549
4550 g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4551
4552 GST_LOG_OBJECT (stream->pad,
4553 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4554 GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4555
4556 stream->download_error_count = 0;
4557 g_clear_error (&stream->last_error);
4558
4559 /* FIXME - url has no indication of byte ranges for subsegments */
4560 /* FIXME : All those time statistics are biased, since they are calculated
4561 * *AFTER* the queue2, which might be blocking. They should ideally be
4562 * calculated *before* queue2 in the uri_handler_probe */
4563 gst_element_post_message (GST_ELEMENT_CAST (demux),
4564 gst_message_new_element (GST_OBJECT_CAST (demux),
4565 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4566 "manifest-uri", G_TYPE_STRING,
4567 demux->manifest_uri, "uri", G_TYPE_STRING,
4568 stream->fragment.uri, "fragment-start-time",
4569 GST_TYPE_CLOCK_TIME, stream->download_start_time,
4570 "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4571 gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4572 stream->download_total_bytes, "fragment-download-time",
4573 GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4574
4575 /* Don't update to the end of the segment if in reverse playback */
4576 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4577 if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4578 GstClockTime offset =
4579 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4580 GstClockTime period_start =
4581 gst_adaptive_demux_get_period_start_time (demux);
4582
4583 stream->segment.position += duration;
4584
4585 /* Convert from position inside the stream's segment to the demuxer's
4586 * segment, they are not necessarily the same */
4587 if (stream->segment.position - offset + period_start >
4588 demux->segment.position)
4589 demux->segment.position =
4590 stream->segment.position - offset + period_start;
4591 }
4592 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4593
4594 /* When advancing with a non 1.0 rate on live streams, we need to check
4595 * the live seeking range again to make sure we can still advance to
4596 * that position */
4597 if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4598 if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4599 ret = GST_FLOW_EOS;
4600 else
4601 ret = klass->stream_advance_fragment (stream);
4602 } else if (gst_adaptive_demux_is_live (demux)
4603 || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4604 ret = klass->stream_advance_fragment (stream);
4605 } else {
4606 ret = GST_FLOW_EOS;
4607 }
4608
4609 stream->download_start_time =
4610 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4611
4612 if (ret == GST_FLOW_OK) {
4613 if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4614 gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4615 stream->need_header = TRUE;
4616 ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4617 }
4618
4619 /* the subclass might want to switch pads */
4620 if (G_UNLIKELY (demux->next_streams)) {
4621 GList *iter;
4622 gboolean can_expose = TRUE;
4623
4624 gst_task_stop (stream->download_task);
4625
4626 ret = GST_FLOW_EOS;
4627
4628 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4629 /* Only expose if all streams are now cancelled or finished downloading */
4630 GstAdaptiveDemuxStream *other = iter->data;
4631 if (other != stream) {
4632 g_mutex_lock (&other->fragment_download_lock);
4633 can_expose &= (other->cancelled == TRUE
4634 || other->download_finished == TRUE);
4635 g_mutex_unlock (&other->fragment_download_lock);
4636 }
4637 }
4638
4639 if (can_expose) {
4640 GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4641 "to do bitrate switching");
4642 gst_adaptive_demux_prepare_streams (demux, FALSE);
4643 gst_adaptive_demux_start_tasks (demux, TRUE);
4644 } else {
4645 GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4646 }
4647 }
4648 }
4649
4650 return ret;
4651 }
4652
4653 /* must be called with manifest_lock taken */
4654 static gboolean
gst_adaptive_demux_stream_select_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 bitrate)4655 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4656 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4657 {
4658 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4659
4660 if (klass->stream_select_bitrate)
4661 return klass->stream_select_bitrate (stream, bitrate);
4662 return FALSE;
4663 }
4664
4665 /* must be called with manifest_lock taken */
4666 static GstFlowReturn
gst_adaptive_demux_stream_update_fragment_info(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4667 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4668 GstAdaptiveDemuxStream * stream)
4669 {
4670 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4671 GstFlowReturn ret;
4672
4673 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4674 GST_FLOW_ERROR);
4675
4676 /* Make sure the sub-class will update bitrate, or else
4677 * we will later */
4678 stream->fragment.bitrate = 0;
4679 stream->fragment.finished = FALSE;
4680
4681 GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4682 GST_TIME_ARGS (stream->segment.position));
4683
4684 ret = klass->stream_update_fragment_info (stream);
4685
4686 GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4687 stream->fragment.uri);
4688 if (ret == GST_FLOW_OK) {
4689 GST_LOG_OBJECT (stream->pad,
4690 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4691 GST_TIME_ARGS (stream->fragment.timestamp),
4692 GST_TIME_ARGS (stream->fragment.duration));
4693 GST_LOG_OBJECT (stream->pad,
4694 "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4695 stream->fragment.range_start, stream->fragment.range_end);
4696 }
4697
4698 return ret;
4699 }
4700
4701 /* must be called with manifest_lock taken */
4702 static gint64
gst_adaptive_demux_stream_get_fragment_waiting_time(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4703 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4704 demux, GstAdaptiveDemuxStream * stream)
4705 {
4706 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4707
4708 if (klass->stream_get_fragment_waiting_time)
4709 return klass->stream_get_fragment_waiting_time (stream);
4710 return 0;
4711 }
4712
4713 /* must be called with manifest_lock taken */
4714 static GstFlowReturn
gst_adaptive_demux_update_manifest_default(GstAdaptiveDemux * demux)4715 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4716 {
4717 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4718 GstFragment *download;
4719 GstBuffer *buffer;
4720 GstFlowReturn ret;
4721 GError *error = NULL;
4722
4723 download = gst_uri_downloader_fetch_uri (demux->downloader,
4724 demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4725 if (download) {
4726 g_free (demux->manifest_uri);
4727 g_free (demux->manifest_base_uri);
4728 if (download->redirect_permanent && download->redirect_uri) {
4729 demux->manifest_uri = g_strdup (download->redirect_uri);
4730 demux->manifest_base_uri = NULL;
4731 } else {
4732 demux->manifest_uri = g_strdup (download->uri);
4733 demux->manifest_base_uri = g_strdup (download->redirect_uri);
4734 }
4735
4736 buffer = gst_fragment_get_buffer (download);
4737 g_object_unref (download);
4738 ret = klass->update_manifest_data (demux, buffer);
4739 gst_buffer_unref (buffer);
4740 /* FIXME: Should the manifest uri vars be reverted to original
4741 * values if updating fails? */
4742 } else {
4743 GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4744 error->message);
4745 ret = GST_FLOW_NOT_LINKED;
4746 }
4747 g_clear_error (&error);
4748
4749 return ret;
4750 }
4751
4752 /* must be called with manifest_lock taken */
4753 static GstFlowReturn
gst_adaptive_demux_update_manifest(GstAdaptiveDemux * demux)4754 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4755 {
4756 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4757 GstFlowReturn ret;
4758
4759 ret = klass->update_manifest (demux);
4760
4761 if (ret == GST_FLOW_OK) {
4762 GstClockTime duration;
4763 /* Send an updated duration message */
4764 duration = klass->get_duration (demux);
4765 if (duration != GST_CLOCK_TIME_NONE) {
4766 GST_DEBUG_OBJECT (demux,
4767 "Sending duration message : %" GST_TIME_FORMAT,
4768 GST_TIME_ARGS (duration));
4769 gst_element_post_message (GST_ELEMENT (demux),
4770 gst_message_new_duration_changed (GST_OBJECT (demux)));
4771 } else {
4772 GST_DEBUG_OBJECT (demux,
4773 "Duration unknown, can not send the duration message");
4774 }
4775
4776 /* If a manifest changes it's liveness or periodic updateness, we need
4777 * to start/stop the manifest update task appropriately */
4778 /* Keep this condition in sync with the one in
4779 * gst_adaptive_demux_start_manifest_update_task()
4780 */
4781 if (gst_adaptive_demux_is_live (demux) &&
4782 klass->requires_periodical_playlist_update (demux)) {
4783 gst_adaptive_demux_start_manifest_update_task (demux);
4784 } else {
4785 gst_adaptive_demux_stop_manifest_update_task (demux);
4786 }
4787 }
4788
4789 return ret;
4790 }
4791
4792 void
gst_adaptive_demux_stream_fragment_clear(GstAdaptiveDemuxStreamFragment * f)4793 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4794 {
4795 g_free (f->uri);
4796 f->uri = NULL;
4797 f->range_start = 0;
4798 f->range_end = -1;
4799
4800 g_free (f->header_uri);
4801 f->header_uri = NULL;
4802 f->header_range_start = 0;
4803 f->header_range_end = -1;
4804
4805 g_free (f->index_uri);
4806 f->index_uri = NULL;
4807 f->index_range_start = 0;
4808 f->index_range_end = -1;
4809
4810 f->finished = FALSE;
4811 }
4812
4813 /* must be called with manifest_lock taken */
4814 static gboolean
gst_adaptive_demux_has_next_period(GstAdaptiveDemux * demux)4815 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4816 {
4817 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4818 gboolean ret = FALSE;
4819
4820 if (klass->has_next_period)
4821 ret = klass->has_next_period (demux);
4822 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4823 return ret;
4824 }
4825
4826 /* must be called with manifest_lock taken */
4827 static void
gst_adaptive_demux_advance_period(GstAdaptiveDemux * demux)4828 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4829 {
4830 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4831
4832 g_return_if_fail (klass->advance_period != NULL);
4833
4834 GST_DEBUG_OBJECT (demux, "Advancing to next period");
4835 klass->advance_period (demux);
4836 gst_adaptive_demux_prepare_streams (demux, FALSE);
4837 gst_adaptive_demux_start_tasks (demux, TRUE);
4838 }
4839
4840 /**
4841 * gst_adaptive_demux_get_monotonic_time:
4842 * Returns: a monotonically increasing time, using the system realtime clock
4843 */
4844 GstClockTime
gst_adaptive_demux_get_monotonic_time(GstAdaptiveDemux * demux)4845 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4846 {
4847 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4848 return gst_clock_get_time (demux->realtime_clock);
4849 }
4850
4851 /**
4852 * gst_adaptive_demux_get_client_now_utc:
4853 * @demux: #GstAdaptiveDemux
4854 * Returns: the client's estimate of UTC
4855 *
4856 * Used to find the client's estimate of UTC, using the system realtime clock.
4857 */
4858 GDateTime *
gst_adaptive_demux_get_client_now_utc(GstAdaptiveDemux * demux)4859 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4860 {
4861 GstClockTime rtc_now;
4862 GDateTime *unix_datetime;
4863 GDateTime *result_datetime;
4864 gint64 utc_now_in_us;
4865
4866 rtc_now = gst_clock_get_time (demux->realtime_clock);
4867 utc_now_in_us = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4868 unix_datetime =
4869 g_date_time_new_from_unix_utc (utc_now_in_us / G_TIME_SPAN_SECOND);
4870 result_datetime =
4871 g_date_time_add (unix_datetime, utc_now_in_us % G_TIME_SPAN_SECOND);
4872 g_date_time_unref (unix_datetime);
4873 return result_datetime;
4874 }
4875
4876 /**
4877 * gst_adaptive_demux_is_running
4878 * @demux: #GstAdaptiveDemux
4879 * Returns: whether the demuxer is processing data
4880 *
4881 * Returns FALSE if shutdown has started (transitioning down from
4882 * PAUSED), otherwise TRUE.
4883 */
4884 gboolean
gst_adaptive_demux_is_running(GstAdaptiveDemux * demux)4885 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4886 {
4887 return g_atomic_int_get (&demux->running);
4888 }
4889
4890 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_new(GCond * cond,GMutex * mutex)4891 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4892 {
4893 GstAdaptiveDemuxTimer *timer;
4894
4895 timer = g_slice_new (GstAdaptiveDemuxTimer);
4896 timer->fired = FALSE;
4897 timer->cond = cond;
4898 timer->mutex = mutex;
4899 g_atomic_int_set (&timer->ref_count, 1);
4900 return timer;
4901 }
4902
4903 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_ref(GstAdaptiveDemuxTimer * timer)4904 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4905 {
4906 g_return_val_if_fail (timer != NULL, NULL);
4907 g_atomic_int_inc (&timer->ref_count);
4908 return timer;
4909 }
4910
4911 static void
gst_adaptive_demux_timer_unref(GstAdaptiveDemuxTimer * timer)4912 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4913 {
4914 g_return_if_fail (timer != NULL);
4915 if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4916 g_slice_free (GstAdaptiveDemuxTimer, timer);
4917 }
4918 }
4919
4920 /* gst_adaptive_demux_wait_until:
4921 * A replacement for g_cond_wait_until that uses the clock rather
4922 * than system time to control the duration of the sleep. Typically
4923 * clock is actually a #GstSystemClock, in which case this function
4924 * behaves exactly like g_cond_wait_until. Inside unit tests,
4925 * the clock is typically a #GstTestClock, which allows tests to run
4926 * in non-realtime.
4927 * This function must be called with mutex held.
4928 */
4929 static gboolean
gst_adaptive_demux_wait_until(GstClock * clock,GCond * cond,GMutex * mutex,GstClockTime end_time)4930 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4931 GstClockTime end_time)
4932 {
4933 GstAdaptiveDemuxTimer *timer;
4934 gboolean fired;
4935 GstClockReturn res;
4936
4937 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4938 /* for an invalid time, gst_clock_id_wait_async will try to call
4939 * gst_adaptive_demux_clock_callback from the current thread.
4940 * It still holds the mutex while doing that, so it will deadlock.
4941 * g_cond_wait_until would return immediately with false, so we'll do the same.
4942 */
4943 return FALSE;
4944 }
4945 timer = gst_adaptive_demux_timer_new (cond, mutex);
4946 timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4947 res =
4948 gst_clock_id_wait_async (timer->clock_id,
4949 gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4950 (GDestroyNotify) gst_adaptive_demux_timer_unref);
4951 /* clock does not support asynchronously wait. Assert and return */
4952 if (res == GST_CLOCK_UNSUPPORTED) {
4953 gst_clock_id_unref (timer->clock_id);
4954 gst_adaptive_demux_timer_unref (timer);
4955 g_return_val_if_reached (TRUE);
4956 }
4957 g_assert (!timer->fired);
4958 /* the gst_adaptive_demux_clock_callback() will signal the
4959 * cond when the clock's single shot timer fires, or the cond will be
4960 * signalled by another thread that wants to cause this wait to finish
4961 * early (e.g. to terminate the waiting thread).
4962 * There is no need for a while loop here, because that logic is
4963 * implemented by the function calling gst_adaptive_demux_wait_until() */
4964 g_cond_wait (cond, mutex);
4965 fired = timer->fired;
4966 if (!fired)
4967 gst_clock_id_unschedule (timer->clock_id);
4968 gst_clock_id_unref (timer->clock_id);
4969 gst_adaptive_demux_timer_unref (timer);
4970 return !fired;
4971 }
4972
4973 static gboolean
gst_adaptive_demux_clock_callback(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)4974 gst_adaptive_demux_clock_callback (GstClock * clock,
4975 GstClockTime time, GstClockID id, gpointer user_data)
4976 {
4977 GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4978 g_return_val_if_fail (timer != NULL, FALSE);
4979 g_mutex_lock (timer->mutex);
4980 timer->fired = TRUE;
4981 g_cond_signal (timer->cond);
4982 g_mutex_unlock (timer->mutex);
4983 return TRUE;
4984 }
4985
4986 /**
4987 * gst_adaptive_demux_get_qos_earliest_time:
4988 *
4989 * Returns: The QOS earliest time
4990 *
4991 * Since: 1.20
4992 */
4993 GstClockTime
gst_adaptive_demux_get_qos_earliest_time(GstAdaptiveDemux * demux)4994 gst_adaptive_demux_get_qos_earliest_time (GstAdaptiveDemux * demux)
4995 {
4996 GstClockTime earliest;
4997
4998 GST_OBJECT_LOCK (demux);
4999 earliest = demux->priv->qos_earliest_time;
5000 GST_OBJECT_UNLOCK (demux);
5001
5002 return earliest;
5003 }
5004