1 /* GStreamer
2 *
3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 /**
23 * SECTION:gstadaptivedemux
24 * @short_description: Base class for adaptive demuxers
25 *
26 * What is an adaptive demuxer?
27 * Adaptive demuxers are special demuxers in the sense that they don't
28 * actually demux data received from upstream but download the data
29 * themselves.
30 *
31 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32 * a set of fragments. The manifest describes the available media and
33 * the sequence of fragments to use. Each fragment contains a small
34 * part of the media (typically only a few seconds). It is possible for
35 * the manifest to have the same media available in different configurations
36 * (bitrates for example) so that the client can select the one that
37 * best suits its scenario (network fluctuation, hardware requirements...).
38 * It is possible to switch from one representation of the media to another
39 * during playback. That's why it is called 'adaptive', because it can be
40 * adapted to the client's needs.
41 *
42 * Architectural overview:
43 * The manifest is received by the demuxer in its sink pad and, upon receiving
44 * EOS, it parses the manifest and exposes the streams available in it. For
45 * each stream a source element will be created and will download the list
46 * of fragments one by one. Once a fragment is finished downloading, the next
47 * URI is set to the source element and it starts fetching it and pushing
48 * through the stream's pad. This implies that each stream is independent from
49 * each other as it runs on a separate thread.
50 *
51 * After downloading each fragment, the download rate of it is calculated and
52 * the demuxer has a chance to switch to a different bitrate if needed. The
53 * switch can be done by simply pushing a new caps before the next fragment
54 * when codecs are the same, or by exposing a new pad group if it needs
55 * a codec change.
56 *
57 * Extra features:
58 * - Not linked streams: Streams that are not-linked have their download threads
59 * interrupted to save network bandwidth. When they are
60 * relinked a reconfigure event is received and the
61 * stream is restarted.
62 *
63 * Subclasses:
64 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65 * about the intrinsics of the subclass formats, so the subclasses are
66 * responsible for maintaining the manifest data structures and stream
67 * information.
68 */
69
70 /*
71 MT safety.
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
writes to it, then all the accesses need to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
77
Adaptive demux API can be called from several threads. Moreover, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
85
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
103
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
115 */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
124
/* Debug category for this file; it is initialised under the name
 * "adaptivedemux" in gst_adaptive_demux_class_init() and selected as the
 * default category for the GST_* logging macros via GST_CAT_DEFAULT. */
GST_DEBUG_CATEGORY (adaptivedemux_debug);
#define GST_CAT_DEFAULT adaptivedemux_debug
127
/* Tunables and default property values used by the adaptive demuxer. */
#define MAX_DOWNLOAD_ERROR_COUNT 3
#define DEFAULT_FAILED_COUNT 3
#define DEFAULT_CONNECTION_SPEED 0
#define DEFAULT_BITRATE_LIMIT 0.8f
/* For safety. Large enough to hold a segment.
 * Parenthesized so the expansion stays a single operand when the macro is
 * used inside a larger expression (e.g. next to '/', '%' or a cast). */
#define SRC_QUEUE_MAX_BYTES (20 * 1024 * 1024)
#define NUM_LOOKBACK_FRAGMENTS 3

#ifdef OHOS_EXT_FUNC
// ohos.ext.func.0033
#define DEFAULT_RECONNECTION_TIMEOUT 3000000 // 3s
#endif
139
/* manifest_lock helpers: recursive lock protecting the demuxer and all of
 * its streams. Lock/unlock are traced to help debugging lock ordering. */
#define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
#define GST_MANIFEST_LOCK(d) G_STMT_START { \
    GST_TRACE("Locking from thread %p", g_thread_self()); \
    g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
    GST_TRACE("Locked from thread %p", g_thread_self()); \
 } G_STMT_END

#define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
    GST_TRACE("Unlocking from thread %p", g_thread_self()); \
    g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
 } G_STMT_END

/* api_lock helpers: serialise external API calls. The expansions do not end
 * in ';' so that "GST_API_LOCK (d);" is exactly one statement and can be
 * used as the unbraced body of an if/else. */
#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d))
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d))

/* segment_lock helpers: protect demux and stream segment information. */
#define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
#define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
#define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
159
/* GObject property identifiers. Each id (except PROP_0 and PROP_LAST) is
 * installed in gst_adaptive_demux_class_init() under the property name
 * given next to it. The OHOS-specific ids only exist when OHOS_EXT_FUNC is
 * defined. */
enum
{
  PROP_0,
  PROP_CONNECTION_SPEED,        /* "connection-speed" */
  PROP_BITRATE_LIMIT,           /* "bitrate-limit" */
#ifdef OHOS_EXT_FUNC
  // ohos.ext.func.0013
  PROP_STATE_CHANGE,            /* "state-change" */
  PROP_EXIT_BLOCK,              /* "exit-block" */
#endif
#ifdef OHOS_EXT_FUNC
  // ohos.ext.func.0033
  PROP_RECONNECTION_TIMEOUT,    /* "reconnection-timeout" */
#endif
#ifdef OHOS_EXT_FUNC
  // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
  PROP_SLICE_POSITION,          /* "slice-position" */
  PROP_DOWNLOAD_LOOP_START,     /* "download-loop-start" */
#endif
  PROP_LAST
};

#ifdef OHOS_EXT_FUNC
// ohos.ext.func.0028
/* Indices into g_gst_adaptive_demux_signals[]; the signals themselves are
 * registered in gst_adaptive_demux_class_init(). */
enum {
  SIGNAL_BITRATE_PARSE_COMPLETE,        /* "bitrate-parse-complete" */
  // ohos.ext.func.0034
  SIGNAL_IS_LIVE_SCENE,                 /* "is-live-scene" */
  LAST_SIGNALS
};

/* Signal ids returned by g_signal_new(), indexed by the enum above. */
static guint g_gst_adaptive_demux_signals[LAST_SIGNALS] = {0};
#endif

/* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
#define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
196
/* Instance-private state of GstAdaptiveDemux. The storage is reserved with
 * g_type_add_instance_private() and reached through demux->priv. */
struct _GstAdaptiveDemuxPrivate
{
  GstAdapter *input_adapter;    /* protected by manifest_lock */
  gint have_manifest;           /* MT safe */

  GList *old_streams;           /* protected by manifest_lock */

  GstTask *updates_task;        /* MT safe */
  /* recursive lock handed to updates_task with gst_task_set_lock() */
  GRecMutex updates_lock;
  /* guards updates_timed_cond and stop_updates_task */
  GMutex updates_timed_lock;
  GCond updates_timed_cond;     /* protected by updates_timed_lock */
  gboolean stop_updates_task;   /* protected by updates_timed_lock */

  /* used only from updates_task, no need to protect it */
  gint update_failed_count;

  guint32 segment_seqnum;       /* protected by manifest_lock */

  /* main lock used to protect adaptive demux and all its streams.
   * It serializes the adaptive demux public API.
   */
  GRecMutex manifest_lock;

  /* condition to wait for manifest updates on a live stream.
   * In order to signal the manifest_cond, the caller needs to hold both
   * manifest_lock and manifest_update_lock (taken in this order)
   */
  GCond manifest_cond;
  GMutex manifest_update_lock;

  /* Lock and condition for prerolling streams before exposing */
  GMutex preroll_lock;
  GCond preroll_cond;
  /* NOTE(review): presumably the number of streams still prerolling and
   * guarded by preroll_lock - not visible in this part of the file, confirm */
  gint preroll_pending;

  /* serialises external API calls; when both locks are needed it is taken
   * before manifest_lock (see the MT safety notes at the top of the file) */
  GMutex api_lock;

  /* Protects demux and stream segment information
   * Needed because seeks can update segment information
   * without needing to stop tasks when they just want to
   * update the segment boundaries */
  GMutex segment_lock;

  /* NOTE(review): presumably the earliest time reported by QoS events;
   * its users are outside this part of the file - confirm */
  GstClockTime qos_earliest_time;
#ifdef OHOS_EXT_FUNC
  // ohos.ext.func.0036
  /* set to FALSE in gst_adaptive_demux_init() */
  gboolean set_auto_bitrate_first;
#endif
};
246
/* Bookkeeping for a clock-based timed wait.
 * NOTE(review): the code using this struct is outside this part of the file
 * (see the gst_adaptive_demux_wait_until() and
 * gst_adaptive_demux_clock_callback() prototypes); the per-field notes below
 * are derived from the field names and types only - confirm. */
typedef struct _GstAdaptiveDemuxTimer
{
  gint ref_count;               /* reference count of the timer */
  GCond *cond;                  /* condition associated with the wait */
  GMutex *mutex;                /* mutex associated with cond */
  GstClockID clock_id;          /* clock entry id of the timer */
  gboolean fired;               /* presumably set once the timer triggered */
} GstAdaptiveDemuxTimer;
255
/* Parent class, filled in by gst_adaptive_demux_class_init() with
 * g_type_class_peek_parent(). */
static GstBinClass *parent_class = NULL;
/* Offset of the instance-private data: set in gst_adaptive_demux_get_type(),
 * adjusted in class_init and consumed by
 * gst_adaptive_demux_get_instance_private(). */
static gint private_offset = 0;
258
259 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
260 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
261 GstAdaptiveDemuxClass * klass);
262 static void gst_adaptive_demux_finalize (GObject * object);
263 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
264 element, GstStateChange transition);
265
266 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
267
268 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
269 GstEvent * event);
270 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
271 GstObject * parent, GstBuffer * buffer);
272 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
273 GstQuery * query);
274 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
275 GstEvent * event);
276
277 static gboolean
278 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
279
280 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
281 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
282 stream);
283 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
284 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
285 gboolean first_and_live);
286 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
287 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
288 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
289 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
290 GstClockTime ts, GstClockTime * final_ts);
291 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
292 demux, GstAdaptiveDemuxStream * stream);
293 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
294 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
295 static GstFlowReturn
296 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
297 GstAdaptiveDemuxStream * stream);
298 static gint64
299 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
300 GstAdaptiveDemuxStream * stream);
301 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
302 demux);
303 static GstFlowReturn
304 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
305 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
306 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
307
308 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
309 static GstFlowReturn
310 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
311 GstEvent * event);
312
313 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
314 demux);
315 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
316 demux);
317
318 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
319 gboolean start_preroll_streams);
320 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
321 gboolean stop_updates);
322 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
323 demux);
324 static void
325 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
326 stream, GstFlowReturn ret, GError * err);
327 static GstFlowReturn
328 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
329 GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
330 static GstFlowReturn
331 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
332 GstAdaptiveDemuxStream * stream);
333 static GstFlowReturn
334 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
335 GstAdaptiveDemuxStream * stream, GstClockTime duration);
336 static gboolean
337 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
338 GstClockTime end_time);
339 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
340 GstClockTime time, GstClockID id, gpointer user_data);
341 static gboolean
342 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
343 * demux);
344
345 #ifdef OHOS_EXT_FUNC
346 // ohos.ext.func.0013
347 static void set_property_to_element (GstObject *elem, guint property_id, const void *property_value);
348 static void set_property_to_src_element (const GList *stream_list, guint property_id, const void *value);
349 static void set_property_to_src_and_download (GstAdaptiveDemux *demux, guint property_id, const void *property_value);
350 #endif
351
352 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
353 * method to get to the padtemplates */
354 GType
gst_adaptive_demux_get_type(void)355 gst_adaptive_demux_get_type (void)
356 {
357 static gsize type = 0;
358
359 if (g_once_init_enter (&type)) {
360 GType _type;
361 static const GTypeInfo info = {
362 sizeof (GstAdaptiveDemuxClass),
363 NULL,
364 NULL,
365 (GClassInitFunc) gst_adaptive_demux_class_init,
366 NULL,
367 NULL,
368 sizeof (GstAdaptiveDemux),
369 0,
370 (GInstanceInitFunc) gst_adaptive_demux_init,
371 };
372
373 _type = g_type_register_static (GST_TYPE_BIN,
374 "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
375
376 private_offset =
377 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
378
379 g_once_init_leave (&type, _type);
380 }
381 return type;
382 }
383
384 static inline GstAdaptiveDemuxPrivate *
gst_adaptive_demux_get_instance_private(GstAdaptiveDemux * self)385 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
386 {
387 return (G_STRUCT_MEMBER_P (self, private_offset));
388 }
389
#ifdef OHOS_EXT_FUNC
// ohos.ext.func.0029
/*
 * Handles a write to the "exit-block" property.
 *
 * Nothing is done unless the value is 1. In that case the function:
 *  - asks the manifest updates task to stop and wakes up a pending timed
 *    wait (stop_updates_task + updates_timed_cond),
 *  - cancels the URI downloader,
 *  - asks every stream download task to stop, marks the stream as cancelled
 *    and signals its fragment_download_cond,
 *  - forwards "exit-block" to the source element of every stream.
 *
 * This function is guarded by OHOS_EXT_FUNC because both its only caller
 * (the PROP_EXIT_BLOCK case of set_property) and the symbols it uses
 * (PROP_EXIT_BLOCK, set_property_to_src_and_download) exist only under that
 * macro.
 *
 * The updates task is only stopped, never dropped: the object is created
 * once in gst_adaptive_demux_init() and unreffed in finalize, so clearing
 * the pointer here would leak the task and leave finalize with NULL.
 */
static void
gst_adaptive_demux_set_exit_block (GstAdaptiveDemux *demux, const GValue *value)
{
  GList *iter = NULL;
  gint exit_block = g_value_get_int (value);

  GST_INFO ("adaptive exit_block :%d", exit_block);
  if (demux->priv == NULL) {
    GST_WARNING_OBJECT (demux, "adaptive exit_block :%d, demux->priv is null", exit_block);
    return;
  }
  if (exit_block != 1) {
    return;
  }

  /* stop update task */
  if (demux->priv->updates_task != NULL) {
    gst_task_stop (demux->priv->updates_task);
  }
  g_mutex_lock (&demux->priv->updates_timed_lock);
  demux->priv->stop_updates_task = TRUE;
  g_cond_signal (&demux->priv->updates_timed_cond);
  g_mutex_unlock (&demux->priv->updates_timed_lock);

  if (demux->downloader != NULL) {
    gst_uri_downloader_cancel (demux->downloader);
  }

  /* stop download tasks */
  for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
    GstAdaptiveDemuxStream *stream = iter->data;

    if (stream == NULL) {
      GST_WARNING_OBJECT (demux, "stream is null");
      continue;
    }

    if (stream->download_task != NULL) {
      gst_task_stop (stream->download_task);
    }
    /* wake up a download loop waiting on the fragment download condition */
    g_mutex_lock (&stream->fragment_download_lock);
    stream->cancelled = TRUE;
    g_cond_signal (&stream->fragment_download_cond);
    g_mutex_unlock (&stream->fragment_download_lock);
  }

  /* set exit_block to src element */
  set_property_to_src_and_download (demux, PROP_EXIT_BLOCK, (void *) &exit_block);
}
#endif
437
438 static void
gst_adaptive_demux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)439 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
440 const GValue * value, GParamSpec * pspec)
441 {
442 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
443
444 #ifdef OHOS_OPT_COMPAT
445 // ohos.opt.compat.0029
446 if (prop_id != PROP_EXIT_BLOCK) {
447 #endif
448 GST_API_LOCK (demux);
449 GST_MANIFEST_LOCK (demux);
450 #ifdef OHOS_OPT_COMPAT
451 // ohos.opt.compat.0029
452 }
453 #endif
454
455 switch (prop_id) {
456 case PROP_CONNECTION_SPEED:
457 #ifdef OHOS_EXT_FUNC
458 // ohos.ext.func.0028
459 demux->connection_speed = g_value_get_uint (value);
460 #else
461 demux->connection_speed = g_value_get_uint (value) * 1000;
462 #endif
463 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
464 demux->connection_speed);
465 break;
466 case PROP_BITRATE_LIMIT:
467 demux->bitrate_limit = g_value_get_float (value);
468 break;
469 #ifdef OHOS_EXT_FUNC
470 // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
471 case PROP_SLICE_POSITION:
472 demux->slice_position = g_value_get_uint64 (value);
473 GST_WARNING_OBJECT (demux, "slice_position set to %" G_GUINT64_FORMAT, demux->slice_position);
474 break;
475 #endif
476 #ifdef OHOS_EXT_FUNC
477 // ohos.ext.func.0013
478 case PROP_STATE_CHANGE: {
479 gint state = g_value_get_int (value);
480 set_property_to_src_and_download(demux, prop_id, (void *)&state);
481 break;
482 }
483 case PROP_EXIT_BLOCK: {
484 // ohos.ext.func.0029
485 gst_adaptive_demux_set_exit_block(demux, value);
486 break;
487 }
488 #endif
489 #ifdef OHOS_EXT_FUNC
490 // ohos.ext.func.0033
491 case PROP_RECONNECTION_TIMEOUT: {
492 guint timeout = g_value_get_uint (value);
493 set_property_to_src_and_download(demux, prop_id, (void *)&timeout);
494 break;
495 }
496 #endif
497 default:
498 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
499 break;
500 }
501
502 #ifdef OHOS_OPT_COMPAT
503 // ohos.opt.compat.0029
504 if (prop_id != PROP_EXIT_BLOCK) {
505 #endif
506 GST_MANIFEST_UNLOCK (demux);
507 GST_API_UNLOCK (demux);
508 #ifdef OHOS_OPT_COMPAT
509 // ohos.opt.compat.0029
510 }
511 #endif
512 }
513
514 static void
gst_adaptive_demux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)515 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
516 GValue * value, GParamSpec * pspec)
517 {
518 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
519
520 GST_MANIFEST_LOCK (demux);
521
522 #ifdef OHOS_EXT_FUNC
523 // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
524 gboolean start = FALSE;
525 #endif
526 switch (prop_id) {
527 case PROP_CONNECTION_SPEED:
528 g_value_set_uint (value, demux->connection_speed / 1000);
529 break;
530 case PROP_BITRATE_LIMIT:
531 g_value_set_float (value, demux->bitrate_limit);
532 break;
533 #ifdef OHOS_EXT_FUNC
534 // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
535 case PROP_DOWNLOAD_LOOP_START:
536 for (GList *iter = demux->streams; iter; iter = g_list_next (iter)) {
537 GstAdaptiveDemuxStream *stream = iter->data;
538 if (GST_TASK_STATE (stream->download_task) == GST_TASK_STARTED) {
539 start = TRUE;
540 break;
541 }
542 }
543 GST_DEBUG_OBJECT (demux, "download loop is %d", start);
544 g_value_set_boolean(value, start);
545 break;
546 #endif
547 default:
548 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
549 break;
550 }
551
552 GST_MANIFEST_UNLOCK (demux);
553 }
554
555 static void
gst_adaptive_demux_class_init(GstAdaptiveDemuxClass * klass)556 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
557 {
558 GObjectClass *gobject_class;
559 GstElementClass *gstelement_class;
560 GstBinClass *gstbin_class;
561
562 gobject_class = G_OBJECT_CLASS (klass);
563 gstelement_class = GST_ELEMENT_CLASS (klass);
564 gstbin_class = GST_BIN_CLASS (klass);
565
566 GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
567 "Base Adaptive Demux");
568
569 parent_class = g_type_class_peek_parent (klass);
570
571 if (private_offset != 0)
572 g_type_class_adjust_private_offset (klass, &private_offset);
573
574 gobject_class->set_property = gst_adaptive_demux_set_property;
575 gobject_class->get_property = gst_adaptive_demux_get_property;
576 gobject_class->finalize = gst_adaptive_demux_finalize;
577
578 #ifdef OHOS_EXT_FUNC
579 // ohos.ext.func.0028
580 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
581 g_param_spec_uint ("connection-speed", "Connection Speed",
582 "Network connection speed in kbps (0 = calculate from downloaded"
583 " fragments)", 0, G_MAXUINT, DEFAULT_CONNECTION_SPEED,
584 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
585 #else
586 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
587 g_param_spec_uint ("connection-speed", "Connection Speed",
588 "Network connection speed in kbps (0 = calculate from downloaded"
589 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
590 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
591 #endif
592
593 #ifdef OHOS_EXT_FUNC
594 // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
595 g_object_class_install_property (gobject_class, PROP_SLICE_POSITION,
596 g_param_spec_uint64 ("slice-position", "slice-position",
597 "slice-position", 0, G_MAXUINT64, 0,
598 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
599
600 g_object_class_install_property (gobject_class, PROP_DOWNLOAD_LOOP_START,
601 g_param_spec_boolean ("download-loop-start", "download-loop-start",
602 "download-loop-start", FALSE,
603 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
604 #endif
605
606 /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
607 g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
608 g_param_spec_float ("bitrate-limit",
609 "Bitrate limit in %",
610 "Limit of the available bitrate to use when switching to alternates.",
611 0, 1, DEFAULT_BITRATE_LIMIT,
612 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
613
614 #ifdef OHOS_EXT_FUNC
615 // ohos.ext.func.0013
616 g_object_class_install_property (gobject_class, PROP_STATE_CHANGE,
617 g_param_spec_int ("state-change", "state-change from adaptive-demux",
618 "state-change from adaptive-demux", 0, (gint) (G_MAXINT32), 0,
619 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
620
621 g_object_class_install_property (gobject_class, PROP_EXIT_BLOCK,
622 g_param_spec_int ("exit-block", "EXIT BLOCK",
623 "souphttpsrc exit block", 0, (gint) (G_MAXINT32), 0,
624 G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
625 #endif
626
627 #ifdef OHOS_EXT_FUNC
628 // ohos.ext.func.0033
629 g_object_class_install_property (gobject_class, PROP_RECONNECTION_TIMEOUT,
630 g_param_spec_uint ("reconnection-timeout", "Reconnection-timeout",
631 "Value in seconds to timeout reconnection", 0, 3600000000, DEFAULT_RECONNECTION_TIMEOUT,
632 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
633 #endif
634
635 #ifdef OHOS_EXT_FUNC
636 // ohos.ext.func.0028
637 g_gst_adaptive_demux_signals[SIGNAL_BITRATE_PARSE_COMPLETE] =
638 g_signal_new("bitrate-parse-complete",
639 G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
640 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_POINTER, G_TYPE_UINT);
641 #endif
642
643 #ifdef OHOS_EXT_FUNC
644 // ohos.ext.func.0034
645 g_gst_adaptive_demux_signals[SIGNAL_IS_LIVE_SCENE] =
646 g_signal_new("is-live-scene", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
647 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
648 #endif
649
650 gstelement_class->change_state = gst_adaptive_demux_change_state;
651
652 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
653
654 klass->data_received = gst_adaptive_demux_stream_data_received_default;
655 klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
656 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
657 klass->requires_periodical_playlist_update =
658 gst_adaptive_demux_requires_periodical_playlist_update_default;
659
660 }
661
662 static void
gst_adaptive_demux_init(GstAdaptiveDemux * demux,GstAdaptiveDemuxClass * klass)663 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
664 GstAdaptiveDemuxClass * klass)
665 {
666 GstPadTemplate *pad_template;
667 GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
668 GObjectClass *gobject_class;
669
670 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
671
672 demux->priv = gst_adaptive_demux_get_instance_private (demux);
673 demux->priv->input_adapter = gst_adapter_new ();
674 demux->downloader = gst_uri_downloader_new ();
675 gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
676 demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
677 demux->priv->segment_seqnum = gst_util_seqnum_next ();
678 demux->have_group_id = FALSE;
679 demux->group_id = G_MAXUINT;
680
681 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
682
683 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
684 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
685
686 demux->realtime_clock = gst_system_clock_obtain ();
687 g_assert (demux->realtime_clock != NULL);
688 gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
689 if (g_object_class_find_property (gobject_class, "clock-type")) {
690 g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
691 } else {
692 GST_WARNING_OBJECT (demux,
693 "System clock does not have clock-type property");
694 }
695 if (clock_type == GST_CLOCK_TYPE_REALTIME) {
696 demux->clock_offset = 0;
697 } else {
698 GDateTime *utc_now;
699 GstClockTime rtc_now;
700
701 utc_now = g_date_time_new_now_utc ();
702 rtc_now = gst_clock_get_time (demux->realtime_clock);
703 demux->clock_offset =
704 g_date_time_to_unix (utc_now) * G_TIME_SPAN_SECOND +
705 g_date_time_get_microsecond (utc_now) - GST_TIME_AS_USECONDS (rtc_now);
706 g_date_time_unref (utc_now);
707 }
708 g_rec_mutex_init (&demux->priv->updates_lock);
709 demux->priv->updates_task =
710 gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
711 demux, NULL);
712 gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
713
714 g_mutex_init (&demux->priv->updates_timed_lock);
715 g_cond_init (&demux->priv->updates_timed_cond);
716
717 g_cond_init (&demux->priv->manifest_cond);
718 g_mutex_init (&demux->priv->manifest_update_lock);
719
720 g_rec_mutex_init (&demux->priv->manifest_lock);
721 g_mutex_init (&demux->priv->api_lock);
722 g_mutex_init (&demux->priv->segment_lock);
723
724 g_cond_init (&demux->priv->preroll_cond);
725 g_mutex_init (&demux->priv->preroll_lock);
726
727 pad_template =
728 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
729 g_return_if_fail (pad_template != NULL);
730
731 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
732 gst_pad_set_event_function (demux->sinkpad,
733 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
734 gst_pad_set_chain_function (demux->sinkpad,
735 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
736
737 /* Properties */
738 demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
739 demux->connection_speed = DEFAULT_CONNECTION_SPEED;
740 #ifdef OHOS_EXT_FUNC
741 // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
742 demux->slice_position = GST_CLOCK_TIME_NONE;
743 #endif
744 #ifdef OHOS_EXT_FUNC
745 // ohos.ext.func.0036
746 demux->priv->set_auto_bitrate_first = FALSE;
747 #endif
748
749 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
750 }
751
752 static void
gst_adaptive_demux_finalize(GObject * object)753 gst_adaptive_demux_finalize (GObject * object)
754 {
755 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
756 GstAdaptiveDemuxPrivate *priv = demux->priv;
757
758 GST_DEBUG_OBJECT (object, "finalize");
759
760 g_object_unref (priv->input_adapter);
761 g_object_unref (demux->downloader);
762
763 g_mutex_clear (&priv->updates_timed_lock);
764 g_cond_clear (&priv->updates_timed_cond);
765 g_mutex_clear (&demux->priv->manifest_update_lock);
766 g_cond_clear (&demux->priv->manifest_cond);
767 g_object_unref (priv->updates_task);
768 g_rec_mutex_clear (&priv->updates_lock);
769 g_rec_mutex_clear (&demux->priv->manifest_lock);
770 g_mutex_clear (&demux->priv->api_lock);
771 g_mutex_clear (&demux->priv->segment_lock);
772 if (demux->realtime_clock) {
773 gst_object_unref (demux->realtime_clock);
774 demux->realtime_clock = NULL;
775 }
776
777 g_cond_clear (&demux->priv->preroll_cond);
778 g_mutex_clear (&demux->priv->preroll_lock);
779
780 G_OBJECT_CLASS (parent_class)->finalize (object);
781 }
782
783 #ifdef OHOS_EXT_FUNC
784 // ohos.ext.func.0013
785 static void
set_property_to_element(GstObject * elem,guint property_id,const void * property_value)786 set_property_to_element (GstObject *elem, guint property_id, const void *property_value)
787 {
788 if ((elem == NULL) || (property_value == NULL)) {
789 return;
790 }
791
792 if (property_id == PROP_STATE_CHANGE) {
793 const gint *state_change = (const gint *) property_value;
794 g_object_set (elem, "state-change", *state_change, NULL);
795 } else if (property_id == PROP_RECONNECTION_TIMEOUT) { // ohos.ext.func.0033
796 const guint *timeout = (const guint *) property_value;
797 g_object_set (elem, "reconnection-timeout", *timeout, NULL);
798 } else if (property_id == PROP_EXIT_BLOCK) {
799 const gint *exit_block = (const gint *) property_value;
800 g_object_set (elem, "exit-block", *exit_block, NULL);
801 }
802 }
803
804 static void
set_property_to_src_element(const GList * stream_list,guint property_id,const void * property_value)805 set_property_to_src_element (const GList *stream_list, guint property_id, const void *property_value)
806 {
807 GstAdaptiveDemuxStream *stream = NULL;
808 GstIterator *iter = NULL;
809
810 if (property_value == NULL) {
811 GST_WARNING ("value is NULL, set_property_to_src_element failed!");
812 return;
813 }
814
815 for (; stream_list != NULL; stream_list = g_list_next (stream_list)) {
816 GValue data = { 0, };
817
818 stream = stream_list->data;
819 if ((stream == NULL) || (stream->src == NULL)) {
820 continue;
821 }
822
823 iter = gst_bin_iterate_sources ((GstBin *) stream->src);
824 if (iter == NULL) {
825 continue;
826 }
827 if (gst_iterator_next (iter, &data) == GST_ITERATOR_OK) {
828 GstElement *uri_src = g_value_get_object (&data);
829 if (uri_src != NULL) {
830 set_property_to_element ((GstObject *) uri_src, property_id, property_value);
831 }
832 }
833 if (G_IS_VALUE (&data)) {
834 g_value_unset (&data);
835 }
836 gst_iterator_free (iter);
837 iter = NULL;
838 }
839 }
840
841 static void
set_property_to_src_and_download(GstAdaptiveDemux * demux,guint property_id,const void * property_value)842 set_property_to_src_and_download (GstAdaptiveDemux *demux, guint property_id, const void *property_value)
843 {
844 if ((demux == NULL) || (property_value == NULL)) {
845 GST_WARNING ("input parameter is error");
846 return;
847 }
848
849 set_property_to_src_element (demux->streams, property_id, property_value);
850 //set_property_to_element ((GstObject *) demux->downloader, property_id, property_value);
851 }
852 #endif
853
854 static GstStateChangeReturn
gst_adaptive_demux_change_state(GstElement * element,GstStateChange transition)855 gst_adaptive_demux_change_state (GstElement * element,
856 GstStateChange transition)
857 {
858 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
859 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
860
861 switch (transition) {
862 case GST_STATE_CHANGE_PAUSED_TO_READY:
863 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
864 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
865 gst_uri_downloader_cancel (demux->downloader);
866
867 GST_API_LOCK (demux);
868 GST_MANIFEST_LOCK (demux);
869 gst_adaptive_demux_reset (demux);
870 GST_MANIFEST_UNLOCK (demux);
871 GST_API_UNLOCK (demux);
872 break;
873 case GST_STATE_CHANGE_READY_TO_PAUSED:
874 GST_API_LOCK (demux);
875 GST_MANIFEST_LOCK (demux);
876 gst_adaptive_demux_reset (demux);
877 /* Clear "cancelled" flag in uridownloader since subclass might want to
878 * use uridownloader to fetch another manifest */
879 gst_uri_downloader_reset (demux->downloader);
880 if (g_atomic_int_get (&demux->priv->have_manifest))
881 gst_adaptive_demux_start_manifest_update_task (demux);
882 GST_MANIFEST_UNLOCK (demux);
883 GST_API_UNLOCK (demux);
884 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
885 GST_DEBUG_OBJECT (demux, "demuxer has started running");
886 break;
887 default:
888 break;
889 }
890
891 /* this must be run without MANIFEST_LOCK taken.
892 * For PLAYING to PLAYING state changes, it will want to take a lock in
893 * src element and that lock is held while the streaming thread is running.
894 * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
895 */
896 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
897
898 return result;
899 }
900
901 #ifdef OHOS_EXT_FUNC
902 // ohos.ext.func.0028
903 static void
gst_adaptive_demux_update_bitrate(GstAdaptiveDemux * demux)904 gst_adaptive_demux_update_bitrate (GstAdaptiveDemux *demux)
905 {
906 GstAdaptiveDemuxBitrateInfo stream_bitrate_info;
907 stream_bitrate_info.bitrate_list = NULL;
908 stream_bitrate_info.bitrate_num = 0;
909 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
910 if ((demux_class->get_bitrate_info) == NULL) {
911 return;
912 }
913
914 gboolean ret = demux_class->get_bitrate_info(demux, &stream_bitrate_info);
915 if (!ret) {
916 return;
917 }
918
919 GST_INFO_OBJECT (demux, "Send to user, bitrate num = %u", stream_bitrate_info.bitrate_num);
920 g_signal_emit (GST_ELEMENT(demux), g_gst_adaptive_demux_signals[SIGNAL_BITRATE_PARSE_COMPLETE],
921 0, stream_bitrate_info.bitrate_list, stream_bitrate_info.bitrate_num);
922
923 if (stream_bitrate_info.bitrate_list != NULL) {
924 g_free (stream_bitrate_info.bitrate_list);
925 stream_bitrate_info.bitrate_list = NULL;
926 stream_bitrate_info.bitrate_num = 0;
927 }
928 }
929 #endif
930
931 static gboolean
gst_adaptive_demux_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)932 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
933 GstEvent * event)
934 {
935 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
936 gboolean ret;
937
938 switch (event->type) {
939 case GST_EVENT_FLUSH_STOP:{
940 GST_API_LOCK (demux);
941 GST_MANIFEST_LOCK (demux);
942
943 gst_adaptive_demux_reset (demux);
944
945 ret = gst_pad_event_default (pad, parent, event);
946
947 GST_MANIFEST_UNLOCK (demux);
948 GST_API_UNLOCK (demux);
949
950 return ret;
951 }
952 case GST_EVENT_EOS:{
953 GstAdaptiveDemuxClass *demux_class;
954 GstQuery *query;
955 gboolean query_res;
956 gboolean ret = TRUE;
957 gsize available;
958 GstBuffer *manifest_buffer;
959
960 GST_API_LOCK (demux);
961 GST_MANIFEST_LOCK (demux);
962
963 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
964
965 available = gst_adapter_available (demux->priv->input_adapter);
966
967 if (available == 0) {
968 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
969 ret = gst_pad_event_default (pad, parent, event);
970
971 GST_MANIFEST_UNLOCK (demux);
972 GST_API_UNLOCK (demux);
973
974 return ret;
975 }
976
977 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
978
979 /* Need to get the URI to use it as a base to generate the fragment's
980 * uris */
981 query = gst_query_new_uri ();
982 query_res = gst_pad_peer_query (pad, query);
983 if (query_res) {
984 gchar *uri, *redirect_uri;
985 gboolean permanent;
986
987 gst_query_parse_uri (query, &uri);
988 gst_query_parse_uri_redirection (query, &redirect_uri);
989 gst_query_parse_uri_redirection_permanent (query, &permanent);
990
991 if (permanent && redirect_uri) {
992 demux->manifest_uri = redirect_uri;
993 demux->manifest_base_uri = NULL;
994 g_free (uri);
995 } else {
996 demux->manifest_uri = uri;
997 demux->manifest_base_uri = redirect_uri;
998 }
999
1000 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
1001 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
1002 } else {
1003 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
1004 }
1005 gst_query_unref (query);
1006
1007 /* Let the subclass parse the manifest */
1008 manifest_buffer =
1009 gst_adapter_take_buffer (demux->priv->input_adapter, available);
1010 if (!demux_class->process_manifest (demux, manifest_buffer)) {
1011 /* In most cases, this will happen if we set a wrong url in the
1012 * source element and we have received the 404 HTML response instead of
1013 * the manifest */
1014 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
1015 (NULL));
1016 ret = FALSE;
1017 } else {
1018 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1019 }
1020
1021 #ifdef OHOS_EXT_FUNC
1022 // ohos.ext.func.0028
1023 if (ret) {
1024 gst_adaptive_demux_update_bitrate(demux);
1025 }
1026 #endif
1027 gst_buffer_unref (manifest_buffer);
1028
1029 gst_element_post_message (GST_ELEMENT_CAST (demux),
1030 gst_message_new_element (GST_OBJECT_CAST (demux),
1031 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
1032 "manifest-uri", G_TYPE_STRING,
1033 demux->manifest_uri, "uri", G_TYPE_STRING,
1034 demux->manifest_uri,
1035 "manifest-download-start", GST_TYPE_CLOCK_TIME,
1036 GST_CLOCK_TIME_NONE,
1037 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
1038 gst_util_get_timestamp (), NULL)));
1039
1040 if (ret) {
1041 /* Send duration message */
1042 if (!gst_adaptive_demux_is_live (demux)) {
1043 GstClockTime duration = demux_class->get_duration (demux);
1044
1045 if (duration != GST_CLOCK_TIME_NONE) {
1046 GST_DEBUG_OBJECT (demux,
1047 "Sending duration message : %" GST_TIME_FORMAT,
1048 GST_TIME_ARGS (duration));
1049 gst_element_post_message (GST_ELEMENT (demux),
1050 gst_message_new_duration_changed (GST_OBJECT (demux)));
1051 } else {
1052 GST_DEBUG_OBJECT (demux,
1053 "media duration unknown, can not send the duration message");
1054 }
1055 }
1056
1057 if (demux->next_streams) {
1058 gst_adaptive_demux_prepare_streams (demux,
1059 gst_adaptive_demux_is_live (demux));
1060 gst_adaptive_demux_start_tasks (demux, TRUE);
1061 gst_adaptive_demux_start_manifest_update_task (demux);
1062 } else {
1063 /* no streams */
1064 GST_WARNING_OBJECT (demux, "No streams created from manifest");
1065 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1066 (_("This file contains no playable streams.")),
1067 ("No known stream formats found at the Manifest"));
1068 ret = FALSE;
1069 }
1070
1071 }
1072 GST_MANIFEST_UNLOCK (demux);
1073 GST_API_UNLOCK (demux);
1074
1075 gst_event_unref (event);
1076 return ret;
1077 }
1078 case GST_EVENT_SEGMENT:
1079 /* Swallow newsegments, we'll push our own */
1080 gst_event_unref (event);
1081 return TRUE;
1082 default:
1083 break;
1084 }
1085
1086 return gst_pad_event_default (pad, parent, event);
1087 }
1088
1089 static GstFlowReturn
gst_adaptive_demux_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1090 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1091 GstBuffer * buffer)
1092 {
1093 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1094
1095 GST_MANIFEST_LOCK (demux);
1096
1097 gst_adapter_push (demux->priv->input_adapter, buffer);
1098
1099 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1100 (gint) gst_adapter_available (demux->priv->input_adapter));
1101
1102 GST_MANIFEST_UNLOCK (demux);
1103 return GST_FLOW_OK;
1104 }
1105
1106 /* must be called with manifest_lock taken */
1107 static void
gst_adaptive_demux_reset(GstAdaptiveDemux * demux)1108 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1109 {
1110 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1111 GList *iter;
1112 GList *old_streams;
1113 GstEvent *eos;
1114
1115 /* take ownership of old_streams before releasing the manifest_lock in
1116 * gst_adaptive_demux_stop_tasks
1117 */
1118 old_streams = demux->priv->old_streams;
1119 demux->priv->old_streams = NULL;
1120
1121 gst_adaptive_demux_stop_tasks (demux, TRUE);
1122
1123 if (klass->reset)
1124 klass->reset (demux);
1125
1126 eos = gst_event_new_eos ();
1127 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1128 GstAdaptiveDemuxStream *stream = iter->data;
1129 if (stream->pad) {
1130 gst_pad_push_event (stream->pad, gst_event_ref (eos));
1131 gst_pad_set_active (stream->pad, FALSE);
1132
1133 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
1134 }
1135 gst_adaptive_demux_stream_free (stream);
1136 }
1137 gst_event_unref (eos);
1138 g_list_free (demux->streams);
1139 demux->streams = NULL;
1140 if (demux->prepared_streams) {
1141 g_list_free_full (demux->prepared_streams,
1142 (GDestroyNotify) gst_adaptive_demux_stream_free);
1143 demux->prepared_streams = NULL;
1144 }
1145 if (demux->next_streams) {
1146 g_list_free_full (demux->next_streams,
1147 (GDestroyNotify) gst_adaptive_demux_stream_free);
1148 demux->next_streams = NULL;
1149 }
1150
1151 if (old_streams) {
1152 g_list_free_full (old_streams,
1153 (GDestroyNotify) gst_adaptive_demux_stream_free);
1154 }
1155
1156 if (demux->priv->old_streams) {
1157 g_list_free_full (demux->priv->old_streams,
1158 (GDestroyNotify) gst_adaptive_demux_stream_free);
1159 demux->priv->old_streams = NULL;
1160 }
1161
1162 g_free (demux->manifest_uri);
1163 g_free (demux->manifest_base_uri);
1164 demux->manifest_uri = NULL;
1165 demux->manifest_base_uri = NULL;
1166
1167 gst_adapter_clear (demux->priv->input_adapter);
1168 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1169
1170 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1171
1172 demux->have_group_id = FALSE;
1173 demux->group_id = G_MAXUINT;
1174 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1175 }
1176
1177 static void
gst_adaptive_demux_handle_message(GstBin * bin,GstMessage * msg)1178 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1179 {
1180 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1181
1182 switch (GST_MESSAGE_TYPE (msg)) {
1183 case GST_MESSAGE_ERROR:{
1184 GList *iter;
1185 GstAdaptiveDemuxStream *stream = NULL;
1186 GError *err = NULL;
1187 gchar *debug = NULL;
1188 gchar *new_error = NULL;
1189 const GstStructure *details = NULL;
1190
1191 GST_MANIFEST_LOCK (demux);
1192
1193 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1194 GstAdaptiveDemuxStream *cur = iter->data;
1195 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
1196 GST_OBJECT_CAST (cur->src))) {
1197 stream = cur;
1198 break;
1199 }
1200 }
1201 if (stream == NULL) {
1202 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1203 GstAdaptiveDemuxStream *cur = iter->data;
1204 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
1205 GST_OBJECT_CAST (cur->src))) {
1206 stream = cur;
1207 break;
1208 }
1209 }
1210 if (stream == NULL) {
1211 GST_WARNING_OBJECT (demux,
1212 "Failed to locate stream for errored element");
1213 break;
1214 }
1215 }
1216
1217 gst_message_parse_error (msg, &err, &debug);
1218
1219 GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1220 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1221 err->message, debug);
1222
1223 #ifdef OHOS_EXT_FUNC
1224 // ohos.ext.func.0033
1225 stream->errcode = err->code;
1226 #endif
1227
1228 if (debug)
1229 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1230 if (new_error) {
1231 g_free (err->message);
1232 err->message = new_error;
1233 }
1234
1235 gst_message_parse_error_details (msg, &details);
1236 if (details) {
1237 gst_structure_get_uint (details, "http-status-code",
1238 &stream->last_status_code);
1239 }
1240
1241 /* error, but ask to retry */
1242 gst_adaptive_demux_stream_fragment_download_finish (stream,
1243 GST_FLOW_CUSTOM_ERROR, err);
1244
1245 #ifdef OHOS_EXT_FUNC
1246 // ohos.ext.func.0037 timeout need notify parent_class
1247 gboolean need_msg = FALSE;
1248 if (err->domain == GST_RESOURCE_ERROR && err->code == GST_RESOURCE_ERROR_TIME_OUT) {
1249 need_msg = TRUE;
1250 }
1251 #endif
1252
1253 g_error_free (err);
1254 g_free (debug);
1255
1256 GST_MANIFEST_UNLOCK (demux);
1257
1258 #ifdef OHOS_EXT_FUNC
1259 // ohos.ext.func.0037 timeout need notify parent_class
1260 if (need_msg) {
1261 break;
1262 }
1263 #endif
1264
1265 gst_message_unref (msg);
1266 msg = NULL;
1267 }
1268 break;
1269 default:
1270 break;
1271 }
1272
1273 if (msg)
1274 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1275 }
1276
1277 void
gst_adaptive_demux_set_stream_struct_size(GstAdaptiveDemux * demux,gsize struct_size)1278 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
1279 gsize struct_size)
1280 {
1281 GST_API_LOCK (demux);
1282 GST_MANIFEST_LOCK (demux);
1283 demux->stream_struct_size = struct_size;
1284 GST_MANIFEST_UNLOCK (demux);
1285 GST_API_UNLOCK (demux);
1286 }
1287
1288 /* must be called with manifest_lock taken */
1289 static gboolean
gst_adaptive_demux_prepare_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1290 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
1291 GstAdaptiveDemuxStream * stream)
1292 {
1293 GstPad *pad = stream->pad;
1294 gchar *name = gst_pad_get_name (pad);
1295 GstEvent *event;
1296 gchar *stream_id;
1297
1298 gst_pad_set_active (pad, TRUE);
1299 stream->need_header = TRUE;
1300
1301 stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
1302
1303 event =
1304 gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
1305 GST_EVENT_STREAM_START, 0);
1306 if (event) {
1307 if (gst_event_parse_group_id (event, &demux->group_id))
1308 demux->have_group_id = TRUE;
1309 else
1310 demux->have_group_id = FALSE;
1311 gst_event_unref (event);
1312 } else if (!demux->have_group_id) {
1313 demux->have_group_id = TRUE;
1314 demux->group_id = gst_util_group_id_next ();
1315 }
1316 event = gst_event_new_stream_start (stream_id);
1317 if (demux->have_group_id)
1318 gst_event_set_group_id (event, demux->group_id);
1319
1320 gst_pad_push_event (pad, event);
1321 g_free (stream_id);
1322 g_free (name);
1323
1324 GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
1325
1326 stream->discont = TRUE;
1327
1328 return TRUE;
1329 }
1330
1331 static gboolean
gst_adaptive_demux_expose_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1332 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
1333 GstAdaptiveDemuxStream * stream)
1334 {
1335 gboolean ret;
1336 GstPad *pad = stream->pad;
1337 GstCaps *caps;
1338
1339 if (stream->pending_caps) {
1340 gst_pad_set_caps (pad, stream->pending_caps);
1341 caps = stream->pending_caps;
1342 stream->pending_caps = NULL;
1343 } else {
1344 caps = gst_pad_get_current_caps (pad);
1345 }
1346
1347 GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
1348 GST_DEBUG_PAD_NAME (pad), caps);
1349 if (caps)
1350 gst_caps_unref (caps);
1351
1352 gst_object_ref (pad);
1353
1354 /* Don't hold the manifest lock while exposing a pad */
1355 GST_MANIFEST_UNLOCK (demux);
1356 ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1357 GST_MANIFEST_LOCK (demux);
1358
1359 return ret;
1360 }
1361
1362 /* must be called with manifest_lock taken */
1363 static GstClockTime
gst_adaptive_demux_stream_get_presentation_offset(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1364 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1365 GstAdaptiveDemuxStream * stream)
1366 {
1367 GstAdaptiveDemuxClass *klass;
1368
1369 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1370
1371 if (klass->get_presentation_offset == NULL)
1372 return 0;
1373
1374 return klass->get_presentation_offset (demux, stream);
1375 }
1376
1377 /* must be called with manifest_lock taken */
1378 static GstClockTime
gst_adaptive_demux_get_period_start_time(GstAdaptiveDemux * demux)1379 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1380 {
1381 GstAdaptiveDemuxClass *klass;
1382
1383 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1384
1385 if (klass->get_period_start_time == NULL)
1386 return 0;
1387
1388 return klass->get_period_start_time (demux);
1389 }
1390
1391 /* must be called with manifest_lock taken */
1392 static gboolean
gst_adaptive_demux_prepare_streams(GstAdaptiveDemux * demux,gboolean first_and_live)1393 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1394 gboolean first_and_live)
1395 {
1396 GList *iter;
1397 GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1398
1399 g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1400 if (demux->prepared_streams != NULL) {
1401 /* Old streams that were never exposed, due to a seek or so */
1402 GST_FIXME_OBJECT (demux,
1403 "Preparing new streams without cleaning up old ones!");
1404 return FALSE;
1405 }
1406
1407 demux->prepared_streams = demux->next_streams;
1408 demux->next_streams = NULL;
1409
1410 if (!gst_adaptive_demux_is_running (demux)) {
1411 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1412 return TRUE;
1413 }
1414
1415 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1416 GstAdaptiveDemuxStream *stream = iter->data;
1417
1418 stream->do_block = TRUE;
1419
1420 if (!gst_adaptive_demux_prepare_stream (demux,
1421 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1422 /* TODO act on error */
1423 GST_FIXME_OBJECT (stream->pad,
1424 "Do something on failure to expose stream");
1425 }
1426
1427 if (first_and_live) {
1428 /* TODO we only need the first timestamp, maybe create a simple function to
1429 * get the current PTS of a fragment ? */
1430 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1431 gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1432
1433 if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1434 min_pts = MIN (min_pts, stream->fragment.timestamp);
1435 } else {
1436 min_pts = stream->fragment.timestamp;
1437 }
1438 }
1439 }
1440
1441 period_start = gst_adaptive_demux_get_period_start_time (demux);
1442
1443 /* For live streams, the subclass is supposed to seek to the current
1444 * fragment and then tell us its timestamp in stream->fragment.timestamp.
1445 * We now also have to seek our demuxer segment to reflect this.
1446 *
1447 * FIXME: This needs some refactoring at some point.
1448 */
1449 if (first_and_live) {
1450 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1451 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1452 GST_SEEK_TYPE_NONE, -1, NULL);
1453 }
1454
1455 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1456 GstAdaptiveDemuxStream *stream = iter->data;
1457 GstClockTime offset;
1458
1459 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1460 stream->segment = demux->segment;
1461
1462 /* The demuxer segment is just built from seek events, but for each stream
1463 * we have to adjust segments according to the current period and the
1464 * stream specific presentation time offset.
1465 *
1466 * For each period, buffer timestamps start again from 0. Additionally the
1467 * buffer timestamps are shifted by the stream specific presentation time
1468 * offset, so the first buffer timestamp of a period is 0 + presentation
1469 * time offset. If the stream contains timestamps itself, this is also
1470 * supposed to be the presentation time stored inside the stream.
1471 *
1472 * The stream time over periods is supposed to be continuous, that is the
1473 * buffer timestamp 0 + presentation time offset should map to the start
1474 * time of the current period.
1475 *
1476 *
1477 * The adjustment of the stream segments as such works the following.
1478 *
1479 * If the demuxer segment start is bigger than the period start, this
1480 * means that we have to drop some media at the beginning of the current
1481 * period, e.g. because a seek into the middle of the period has
1482 * happened. The amount of media to drop is the difference between the
1483 * period start and the demuxer segment start, and as each period starts
1484 * again from 0, this difference is going to be the actual stream's
1485 * segment start. As all timestamps of the stream are shifted by the
1486 * presentation time offset, we will also have to move the segment start
1487 * by that offset.
1488 *
1489 * Likewise, the demuxer segment stop value is adjusted in the same
1490 * fashion.
1491 *
1492 * Now the running time and stream time at the stream's segment start has
1493 * to be the one that is stored inside the demuxer's segment, which means
1494 * that segment.base and segment.time have to be copied over (done just
1495 * above)
1496 *
1497 *
1498 * If the demuxer segment start is smaller than the period start time,
1499 * this means that the whole period is inside the segment. As each period
1500 * starts timestamps from 0, and additionally timestamps are shifted by
1501 * the presentation time offset, the stream's first timestamp (and as such
1502 * the stream's segment start) has to be the presentation time offset.
1503 * The stream time at the segment start is supposed to be the stream time
1504 * of the period start according to the demuxer segment, so the stream
1505 * segment's time would be set to that. The same goes for the stream
1506 * segment's base, which is supposed to be the running time of the period
1507 * start according to the demuxer's segment.
1508 *
1509 * The same logic applies for negative rates with the segment stop and
1510 * the period stop time (which gets clamped).
1511 *
1512 *
1513 * For the first case where not the complete period is inside the segment,
1514 * the segment time and base as calculated by the second case would be
1515 * equivalent.
1516 */
1517 GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1518 &demux->segment);
1519 GST_DEBUG_OBJECT (demux,
1520 "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1521 GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1522 /* note for readers:
1523 * Since stream->segment is initially a copy of demux->segment,
1524 * only the values that need updating are modified below. */
1525 if (first_and_live) {
1526 /* If first and live, demuxer did seek to the current position already */
1527 stream->segment.start = demux->segment.start - period_start + offset;
1528 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1529 stream->segment.stop = demux->segment.stop - period_start + offset;
1530 /* FIXME : Do we need to handle negative rates for this ? */
1531 stream->segment.position = stream->segment.start;
1532 } else if (demux->segment.start > period_start) {
1533 /* seek within a period */
1534 stream->segment.start = demux->segment.start - period_start + offset;
1535 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1536 stream->segment.stop = demux->segment.stop - period_start + offset;
1537 if (stream->segment.rate >= 0)
1538 stream->segment.position = offset;
1539 else
1540 stream->segment.position = stream->segment.stop;
1541 } else {
1542 stream->segment.start = offset;
1543 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1544 stream->segment.stop = demux->segment.stop - period_start + offset;
1545 if (stream->segment.rate >= 0)
1546 stream->segment.position = offset;
1547 else
1548 stream->segment.position = stream->segment.stop;
1549 stream->segment.time =
1550 gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1551 period_start);
1552 stream->segment.base =
1553 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1554 period_start);
1555 }
1556
1557 stream->pending_segment = gst_event_new_segment (&stream->segment);
1558 gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1559
1560 GST_DEBUG_OBJECT (demux,
1561 "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1562 &stream->segment, stream);
1563 }
1564 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1565
1566 return TRUE;
1567 }
1568
1569 static gboolean
gst_adaptive_demux_expose_streams(GstAdaptiveDemux * demux)1570 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1571 {
1572 GList *iter;
1573 GList *old_streams;
1574
1575 g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1576
1577 old_streams = demux->streams;
1578 demux->streams = demux->prepared_streams;
1579 demux->prepared_streams = NULL;
1580
1581 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1582 GstAdaptiveDemuxStream *stream = iter->data;
1583
1584 if (!gst_adaptive_demux_expose_stream (demux,
1585 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1586 /* TODO act on error */
1587 }
1588 }
1589 demux->priv->preroll_pending = 0;
1590
1591 GST_MANIFEST_UNLOCK (demux);
1592 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1593 GST_MANIFEST_LOCK (demux);
1594
1595 if (old_streams) {
1596 GstEvent *eos = gst_event_new_eos ();
1597
1598 /* before we put streams in the demux->priv->old_streams list,
1599 * we ask the download task to stop. In this way, it will no longer be
1600 * allowed to change the demux object.
1601 */
1602 for (iter = old_streams; iter; iter = g_list_next (iter)) {
1603 GstAdaptiveDemuxStream *stream = iter->data;
1604 GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1605
1606 GST_MANIFEST_UNLOCK (demux);
1607
1608 GST_DEBUG_OBJECT (pad, "Pushing EOS");
1609 gst_pad_push_event (pad, gst_event_ref (eos));
1610 gst_pad_set_active (pad, FALSE);
1611
1612 GST_LOG_OBJECT (pad, "Removing stream");
1613 gst_element_remove_pad (GST_ELEMENT (demux), pad);
1614 GST_MANIFEST_LOCK (demux);
1615
1616 gst_object_unref (GST_OBJECT (pad));
1617
1618 /* ask the download task to stop.
1619 * We will not join it now, because our thread can be one of these tasks.
1620 * We will do the joining later, from another stream download task or
1621 * from gst_adaptive_demux_stop_tasks.
1622 * We also cannot change the state of the stream->src element, because
1623 * that will wait on the streaming thread (which could be this thread)
1624 * to stop first.
1625 * Because we sent an EOS to the downstream element, the stream->src
1626 * element should detect this in its streaming task and stop.
1627 * Even if it doesn't do that, we will change its state later in
1628 * gst_adaptive_demux_stop_tasks.
1629 */
1630 GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1631 "Marking stream as cancelled");
1632 gst_task_stop (stream->download_task);
1633 g_mutex_lock (&stream->fragment_download_lock);
1634 stream->cancelled = TRUE;
1635 stream->replaced = TRUE;
1636 g_cond_signal (&stream->fragment_download_cond);
1637 g_mutex_unlock (&stream->fragment_download_lock);
1638 }
1639 gst_event_unref (eos);
1640
1641 /* The list should be freed from another thread as we can't properly
1642 * cleanup a GstTask from itself */
1643 demux->priv->old_streams =
1644 g_list_concat (demux->priv->old_streams, old_streams);
1645 }
1646
1647 /* Unblock after removing oldstreams */
1648 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1649 GstAdaptiveDemuxStream *stream = iter->data;
1650 stream->do_block = FALSE;
1651 }
1652
1653 GST_DEBUG_OBJECT (demux, "All streams are exposed");
1654
1655 return TRUE;
1656 }
1657
1658 /* must be called with manifest_lock taken */
1659 GstAdaptiveDemuxStream *
gst_adaptive_demux_stream_new(GstAdaptiveDemux * demux,GstPad * pad)1660 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1661 {
1662 GstAdaptiveDemuxStream *stream;
1663
1664 stream = g_malloc0 (demux->stream_struct_size);
1665
1666 /* Downloading task */
1667 g_rec_mutex_init (&stream->download_lock);
1668 stream->download_task =
1669 gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1670 stream, NULL);
1671 gst_task_set_lock (stream->download_task, &stream->download_lock);
1672
1673 stream->pad = pad;
1674 stream->demux = demux;
1675 stream->fragment_bitrates =
1676 g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1677 gst_pad_set_element_private (pad, stream);
1678 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1679
1680 g_mutex_lock (&demux->priv->preroll_lock);
1681 stream->do_block = TRUE;
1682 demux->priv->preroll_pending++;
1683 g_mutex_unlock (&demux->priv->preroll_lock);
1684
1685 gst_pad_set_query_function (pad,
1686 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1687 gst_pad_set_event_function (pad,
1688 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1689
1690 gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1691 g_cond_init (&stream->fragment_download_cond);
1692 g_mutex_init (&stream->fragment_download_lock);
1693
1694 demux->next_streams = g_list_append (demux->next_streams, stream);
1695
1696 return stream;
1697 }
1698
1699 GstAdaptiveDemuxStream *
gst_adaptive_demux_find_stream_for_pad(GstAdaptiveDemux * demux,GstPad * pad)1700 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1701 {
1702 GList *iter;
1703
1704 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1705 GstAdaptiveDemuxStream *stream = iter->data;
1706 if (stream->pad == pad) {
1707 return stream;
1708 }
1709 }
1710
1711 return NULL;
1712 }
1713
1714 /* must be called with manifest_lock taken.
1715 * It will temporarily drop the manifest_lock in order to join the task.
1716 * It will join only the old_streams (the demux->streams are joined by
1717 * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1718 * called)
1719 */
1720 static void
gst_adaptive_demux_stream_free(GstAdaptiveDemuxStream * stream)1721 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1722 {
1723 GstAdaptiveDemux *demux = stream->demux;
1724 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1725
1726 if (klass->stream_free)
1727 klass->stream_free (stream);
1728
1729 g_clear_error (&stream->last_error);
1730 if (stream->download_task) {
1731 if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1732 GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1733 GST_DEBUG_PAD_NAME (stream->pad));
1734
1735 gst_task_stop (stream->download_task);
1736
1737 g_mutex_lock (&stream->fragment_download_lock);
1738 stream->cancelled = TRUE;
1739 g_cond_signal (&stream->fragment_download_cond);
1740 g_mutex_unlock (&stream->fragment_download_lock);
1741 }
1742 GST_LOG_OBJECT (demux, "Waiting for task to finish");
1743
1744 /* temporarily drop the manifest lock to join the task */
1745 GST_MANIFEST_UNLOCK (demux);
1746
1747 gst_task_join (stream->download_task);
1748
1749 GST_MANIFEST_LOCK (demux);
1750
1751 GST_LOG_OBJECT (demux, "Finished");
1752 gst_object_unref (stream->download_task);
1753 g_rec_mutex_clear (&stream->download_lock);
1754 stream->download_task = NULL;
1755 }
1756
1757 gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1758
1759 if (stream->pending_segment) {
1760 gst_event_unref (stream->pending_segment);
1761 stream->pending_segment = NULL;
1762 }
1763
1764 if (stream->pending_events) {
1765 g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1766 stream->pending_events = NULL;
1767 }
1768
1769 if (stream->internal_pad) {
1770 gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1771 }
1772
1773 if (stream->src_srcpad) {
1774 gst_object_unref (stream->src_srcpad);
1775 stream->src_srcpad = NULL;
1776 }
1777
1778 if (stream->src) {
1779 GstElement *src = stream->src;
1780
1781 stream->src = NULL;
1782
1783 GST_MANIFEST_UNLOCK (demux);
1784 gst_element_set_locked_state (src, TRUE);
1785 gst_element_set_state (src, GST_STATE_NULL);
1786 gst_bin_remove (GST_BIN_CAST (demux), src);
1787 GST_MANIFEST_LOCK (demux);
1788 }
1789
1790 g_cond_clear (&stream->fragment_download_cond);
1791 g_mutex_clear (&stream->fragment_download_lock);
1792 g_free (stream->fragment_bitrates);
1793
1794 if (stream->pad) {
1795 gst_object_unref (stream->pad);
1796 stream->pad = NULL;
1797 }
1798 if (stream->pending_caps)
1799 gst_caps_unref (stream->pending_caps);
1800
1801 g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1802
1803 g_free (stream);
1804 }
1805
1806 /* must be called with manifest_lock taken */
1807 static gboolean
gst_adaptive_demux_get_live_seek_range(GstAdaptiveDemux * demux,gint64 * range_start,gint64 * range_stop)1808 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1809 gint64 * range_start, gint64 * range_stop)
1810 {
1811 GstAdaptiveDemuxClass *klass;
1812
1813 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1814
1815 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1816
1817 return klass->get_live_seek_range (demux, range_start, range_stop);
1818 }
1819
1820 /* must be called with manifest_lock taken */
1821 static gboolean
gst_adaptive_demux_stream_in_live_seek_range(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1822 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1823 GstAdaptiveDemuxStream * stream)
1824 {
1825 gint64 range_start, range_stop;
1826 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1827 GST_LOG_OBJECT (stream->pad,
1828 "stream position %" GST_TIME_FORMAT " live seek range %"
1829 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1830 GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1831 GST_STIME_ARGS (range_stop));
1832 return (stream->segment.position >= range_start
1833 && stream->segment.position <= range_stop);
1834 }
1835
1836 return FALSE;
1837 }
1838
1839 /* must be called with manifest_lock taken */
1840 static gboolean
gst_adaptive_demux_can_seek(GstAdaptiveDemux * demux)1841 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1842 {
1843 GstAdaptiveDemuxClass *klass;
1844
1845 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1846 if (gst_adaptive_demux_is_live (demux)) {
1847 return klass->get_live_seek_range != NULL;
1848 }
1849
1850 return klass->seek != NULL;
1851 }
1852
1853 static void
gst_adaptive_demux_update_streams_segment(GstAdaptiveDemux * demux,GList * streams,gint64 period_start,GstSeekType start_type,GstSeekType stop_type)1854 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1855 GList * streams, gint64 period_start, GstSeekType start_type,
1856 GstSeekType stop_type)
1857 {
1858 GList *iter;
1859 for (iter = streams; iter; iter = g_list_next (iter)) {
1860 GstAdaptiveDemuxStream *stream = iter->data;
1861 GstEvent *seg_evt;
1862 GstClockTime offset;
1863
1864 /* See comments in gst_adaptive_demux_get_period_start_time() for
1865 * an explanation of the segment modifications */
1866 stream->segment = demux->segment;
1867 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1868 stream->segment.start += offset - period_start;
1869 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1870 stream->segment.stop += offset - period_start;
1871 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1872 stream->segment.position = stream->segment.start;
1873 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1874 stream->segment.position = stream->segment.stop;
1875 seg_evt = gst_event_new_segment (&stream->segment);
1876 gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1877 gst_event_replace (&stream->pending_segment, seg_evt);
1878 GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1879 stream->pending_segment);
1880 gst_event_unref (seg_evt);
1881 /* Make sure the first buffer after a seek has the discont flag */
1882 stream->discont = TRUE;
1883 }
1884 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1885 }
1886
/* IS_SNAP_SEEK(f): non-zero when the seek flags @f contain any of the
 * snap / key-unit flags listed below.
 * REMOVE_SNAP_FLAGS(f): @f with the three SNAP_* flags cleared.
 *
 * The argument is parenthesized so that compound expressions such as
 * IS_SNAP_SEEK (a | b) are masked as a whole ('&' binds tighter than '|').
 */
#define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE |        \
                                GST_SEEK_FLAG_SNAP_AFTER |         \
                                GST_SEEK_FLAG_SNAP_NEAREST |       \
                                GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
                                GST_SEEK_FLAG_KEY_UNIT))
#define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE |  \
                                      GST_SEEK_FLAG_SNAP_AFTER |   \
                                      GST_SEEK_FLAG_SNAP_NEAREST))
1895
1896 static gboolean
gst_adaptive_demux_handle_seek_event(GstAdaptiveDemux * demux,GstPad * pad,GstEvent * event)1897 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1898 GstEvent * event)
1899 {
1900 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1901 gdouble rate;
1902 GstFormat format;
1903 GstSeekFlags flags;
1904 GstSeekType start_type, stop_type;
1905 gint64 start, stop;
1906 guint32 seqnum;
1907 gboolean update;
1908 gboolean ret;
1909 GstSegment oldsegment;
1910 GstAdaptiveDemuxStream *stream = NULL;
1911
1912 GST_INFO_OBJECT (demux, "Received seek event");
1913
1914 GST_API_LOCK (demux);
1915 GST_MANIFEST_LOCK (demux);
1916
1917 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1918 &stop_type, &stop);
1919
1920 if (format != GST_FORMAT_TIME) {
1921 GST_MANIFEST_UNLOCK (demux);
1922 GST_API_UNLOCK (demux);
1923 GST_WARNING_OBJECT (demux,
1924 "Adaptive demuxers only support TIME-based seeking");
1925 gst_event_unref (event);
1926 return FALSE;
1927 }
1928
1929 if (flags & GST_SEEK_FLAG_SEGMENT) {
1930 GST_FIXME_OBJECT (demux, "Handle segment seeks");
1931 GST_MANIFEST_UNLOCK (demux);
1932 GST_API_UNLOCK (demux);
1933 gst_event_unref (event);
1934 return FALSE;
1935 }
1936
1937 seqnum = gst_event_get_seqnum (event);
1938
1939 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
1940 /* For instant rate seeks, reply directly and update
1941 * our segment so the new rate is reflected in any future
1942 * fragments */
1943 GstEvent *ev;
1944
1945 /* instant rate change only supported if direction does not change. All
1946 * other requirements are already checked before creating the seek event
1947 * but let's double-check here to be sure */
1948 if ((demux->segment.rate > 0 && rate < 0) ||
1949 (demux->segment.rate < 0 && rate > 0) ||
1950 start_type != GST_SEEK_TYPE_NONE ||
1951 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
1952 GST_ERROR_OBJECT (demux,
1953 "Instant rate change seeks only supported in the "
1954 "same direction, without flushing and position change");
1955 GST_MANIFEST_UNLOCK (demux);
1956 GST_API_UNLOCK (demux);
1957 return FALSE;
1958 }
1959
1960 ev = gst_event_new_instant_rate_change (rate / demux->segment.rate,
1961 (GstSegmentFlags) flags);
1962 gst_event_set_seqnum (ev, seqnum);
1963
1964 GST_MANIFEST_UNLOCK (demux);
1965
1966 ret = gst_adaptive_demux_push_src_event (demux, ev);
1967
1968 GST_API_UNLOCK (demux);
1969 gst_event_unref (event);
1970
1971 return ret;
1972 }
1973
1974 if (!gst_adaptive_demux_can_seek (demux)) {
1975 GST_MANIFEST_UNLOCK (demux);
1976 GST_API_UNLOCK (demux);
1977 gst_event_unref (event);
1978 return FALSE;
1979 }
1980
1981 if (gst_adaptive_demux_is_live (demux)) {
1982 gint64 range_start, range_stop;
1983 gboolean changed = FALSE;
1984 gboolean start_valid = TRUE, stop_valid = TRUE;
1985
1986 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1987 &range_stop)) {
1988 GST_MANIFEST_UNLOCK (demux);
1989 GST_API_UNLOCK (demux);
1990 gst_event_unref (event);
1991 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1992 return FALSE;
1993 }
1994
1995 GST_DEBUG_OBJECT (demux,
1996 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1997 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1998
1999 /* Handle relative positioning for live streams (relative to the range_stop) */
2000 if (start_type == GST_SEEK_TYPE_END) {
2001 start = range_stop + start;
2002 start_type = GST_SEEK_TYPE_SET;
2003 changed = TRUE;
2004 }
2005 if (stop_type == GST_SEEK_TYPE_END) {
2006 stop = range_stop + stop;
2007 stop_type = GST_SEEK_TYPE_SET;
2008 changed = TRUE;
2009 }
2010
2011 /* Adjust the requested start/stop position if it falls beyond the live
2012 * seek range.
2013 * The only case where we don't adjust is for the starting point of
2014 * an accurate seek (start if forward and stop if backwards)
2015 */
2016 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2017 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2018 GST_DEBUG_OBJECT (demux,
2019 "seek before live stream start, setting to range start: %"
2020 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2021 start = range_start;
2022 changed = TRUE;
2023 }
2024 /* truncate stop position also if set */
2025 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2026 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2027 GST_DEBUG_OBJECT (demux,
2028 "seek ending after live start, adjusting to: %"
2029 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2030 stop = range_stop;
2031 changed = TRUE;
2032 }
2033
2034 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2035 (start < range_start || start > range_stop)) {
2036 GST_WARNING_OBJECT (demux,
2037 "Seek to invalid position start:%" GST_STIME_FORMAT
2038 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2039 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2040 GST_STIME_ARGS (range_stop));
2041 start_valid = FALSE;
2042 }
2043 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2044 (stop < range_start || stop > range_stop)) {
2045 GST_WARNING_OBJECT (demux,
2046 "Seek to invalid position stop:%" GST_STIME_FORMAT
2047 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2048 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2049 GST_STIME_ARGS (range_stop));
2050 stop_valid = FALSE;
2051 }
2052
2053 /* If the seek position is still outside of the seekable range, refuse the seek */
2054 if (!start_valid || !stop_valid) {
2055 GST_MANIFEST_UNLOCK (demux);
2056 GST_API_UNLOCK (demux);
2057 gst_event_unref (event);
2058 return FALSE;
2059 }
2060
2061 /* Re-create seek event with changed/updated values */
2062 if (changed) {
2063 gst_event_unref (event);
2064 event =
2065 gst_event_new_seek (rate, format, flags,
2066 start_type, start, stop_type, stop);
2067 gst_event_set_seqnum (event, seqnum);
2068 }
2069 }
2070
2071 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2072
2073 /* have a backup in case seek fails */
2074 gst_segment_copy_into (&demux->segment, &oldsegment);
2075
2076 if (flags & GST_SEEK_FLAG_FLUSH) {
2077 GstEvent *fevent;
2078
2079 GST_DEBUG_OBJECT (demux, "sending flush start");
2080 fevent = gst_event_new_flush_start ();
2081 gst_event_set_seqnum (fevent, seqnum);
2082 GST_MANIFEST_UNLOCK (demux);
2083 gst_adaptive_demux_push_src_event (demux, fevent);
2084 GST_MANIFEST_LOCK (demux);
2085
2086 gst_adaptive_demux_stop_tasks (demux, FALSE);
2087 } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
2088 (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
2089
2090 gst_adaptive_demux_stop_tasks (demux, FALSE);
2091 }
2092
2093 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2094
2095 /*
2096 * Handle snap seeks as follows:
2097 * 1) do the snap seeking on the stream that received
2098 * the event
2099 * 2) use the final position on this stream to seek
2100 * on the other streams to the same position
2101 *
2102 * We can't snap at all streams at the same time as
2103 * they might end in different positions, so just
2104 * use the one that received the event as the 'leading'
2105 * one to do the snap seek.
2106 */
2107 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
2108 gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
2109 GstClockTime ts;
2110 GstSeekFlags stream_seek_flags = flags;
2111
2112 /* snap-seek on the stream that received the event and then
2113 * use the resulting position to seek on all streams */
2114
2115 if (rate >= 0) {
2116 if (start_type != GST_SEEK_TYPE_NONE)
2117 ts = start;
2118 else {
2119 ts = stream->segment.position;
2120 start_type = GST_SEEK_TYPE_SET;
2121 }
2122 } else {
2123 if (stop_type != GST_SEEK_TYPE_NONE)
2124 ts = stop;
2125 else {
2126 stop_type = GST_SEEK_TYPE_SET;
2127 ts = stream->segment.position;
2128 }
2129 }
2130
2131 if (stream) {
2132 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2133 }
2134
2135 /* replace event with a new one without snapping to seek on all streams */
2136 gst_event_unref (event);
2137 if (rate >= 0) {
2138 start = ts;
2139 } else {
2140 stop = ts;
2141 }
2142 event =
2143 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2144 start_type, start, stop_type, stop);
2145 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2146 }
2147 stream = NULL;
2148
2149 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2150 start, stop_type, stop, &update);
2151
2152 if (ret) {
2153 /* FIXME - this seems unatural, do_seek() is updating base when we
2154 * only want the start/stop position to change, maybe do_seek() needs
2155 * some fixing? */
2156 if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
2157 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
2158 && stop_type == GST_SEEK_TYPE_NONE))) {
2159 demux->segment.base = oldsegment.base;
2160 }
2161
2162 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2163
2164 ret = demux_class->seek (demux, event);
2165 }
2166
2167 if (!ret) {
2168 /* Is there anything else we can do if it fails? */
2169 gst_segment_copy_into (&oldsegment, &demux->segment);
2170 } else {
2171 demux->priv->segment_seqnum = seqnum;
2172 }
2173 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2174
2175 if (flags & GST_SEEK_FLAG_FLUSH) {
2176 GstEvent *fevent;
2177
2178 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2179 fevent = gst_event_new_flush_stop (TRUE);
2180 gst_event_set_seqnum (fevent, seqnum);
2181 #ifdef OHOS_OPT_COMPAT
2182 /**
2183 * ohos.opt.compat.0037
2184 * Fixed deadlock when seek and reconfigure are concurrent
2185 */
2186 GST_MANIFEST_UNLOCK (demux);
2187 #endif
2188 gst_adaptive_demux_push_src_event (demux, fevent);
2189 #ifdef OHOS_OPT_COMPAT
2190 // ohos.opt.compat.0037
2191 GST_MANIFEST_LOCK (demux);
2192 #endif
2193 }
2194
2195 if (demux->next_streams) {
2196 /* If the seek generated new streams, get them
2197 * to preroll */
2198 gst_adaptive_demux_prepare_streams (demux, FALSE);
2199 gst_adaptive_demux_start_tasks (demux, TRUE);
2200 } else {
2201 GstClockTime period_start =
2202 gst_adaptive_demux_get_period_start_time (demux);
2203
2204 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2205 gst_adaptive_demux_update_streams_segment (demux, demux->streams,
2206 period_start, start_type, stop_type);
2207 gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
2208 period_start, start_type, stop_type);
2209 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2210
2211 #ifdef OHOS_OPT_COMPAT
2212 /* ohos.opt.compat.0028
2213 * In the variable resolution, if the changing path is still in the prepare stream state,
2214 * if seek continues to play at a certain location, the cancel of the prepare stream in will be set to true.
2215 * When switching to the main thread of the prepare stream, it is determined that when cancel is true,
2216 * no data will be pulled from the server, the prepare stream will be released first
2217 */
2218 if (demux->streams && demux->prepared_streams) {
2219 g_list_free_full (demux->prepared_streams,
2220 (GDestroyNotify) gst_adaptive_demux_stream_free);
2221 demux->prepared_streams = NULL;
2222 /* ohos.opt.compat.0043
2223 * Preroll_pending needs to reset when prepared_streams are released. The reason is
2224 * gst_adaptive_demux_stream_new() will increase the value, while after the streams were
2225 * prepared and released the value is not changed.
2226 */
2227 GST_MANIFEST_UNLOCK (demux);
2228 g_mutex_lock (&demux->priv->preroll_lock);
2229 demux->priv->preroll_pending = 0;
2230 g_mutex_unlock (&demux->priv->preroll_lock);
2231 GST_MANIFEST_LOCK (demux);
2232 }
2233 #endif
2234
2235 /* Restart the demux */
2236 gst_adaptive_demux_start_tasks (demux, FALSE);
2237 }
2238
2239 GST_MANIFEST_UNLOCK (demux);
2240 GST_API_UNLOCK (demux);
2241 gst_event_unref (event);
2242
2243 return ret;
2244 }
2245
2246 static gboolean
gst_adaptive_demux_src_event(GstPad * pad,GstObject * parent,GstEvent * event)2247 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2248 GstEvent * event)
2249 {
2250 GstAdaptiveDemux *demux;
2251
2252 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2253
2254 /* FIXME handle events received on pads that are to be removed */
2255
2256 switch (event->type) {
2257 case GST_EVENT_SEEK:
2258 {
2259 guint32 seqnum = gst_event_get_seqnum (event);
2260 if (seqnum == demux->priv->segment_seqnum) {
2261 GST_LOG_OBJECT (pad,
2262 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2263 gst_event_unref (event);
2264 return TRUE;
2265 }
2266 return gst_adaptive_demux_handle_seek_event (demux, pad, event);
2267 }
2268 case GST_EVENT_RECONFIGURE:{
2269 GstAdaptiveDemuxStream *stream;
2270
2271 GST_MANIFEST_LOCK (demux);
2272 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
2273
2274 if (stream) {
2275 if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
2276 stream->last_ret == GST_FLOW_NOT_LINKED) {
2277 stream->last_ret = GST_FLOW_OK;
2278 stream->restart_download = TRUE;
2279 stream->need_header = TRUE;
2280 stream->discont = TRUE;
2281 GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
2282 gst_task_start (stream->download_task);
2283 }
2284 gst_event_unref (event);
2285 GST_MANIFEST_UNLOCK (demux);
2286 return TRUE;
2287 }
2288 GST_MANIFEST_UNLOCK (demux);
2289 }
2290 break;
2291 case GST_EVENT_LATENCY:{
2292 /* Upstream and our internal source are irrelevant
2293 * for latency, and we should not fail here to
2294 * configure the latency */
2295 gst_event_unref (event);
2296 return TRUE;
2297 }
2298 case GST_EVENT_QOS:{
2299 GstClockTimeDiff diff;
2300 GstClockTime timestamp;
2301 GstClockTime earliest_time;
2302
2303 gst_event_parse_qos (event, NULL, NULL, &diff, ×tamp);
2304 /* Only take into account lateness if late */
2305 if (diff > 0)
2306 earliest_time = timestamp + 2 * diff;
2307 else
2308 earliest_time = timestamp;
2309
2310 GST_OBJECT_LOCK (demux);
2311 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2312 earliest_time > demux->priv->qos_earliest_time) {
2313 demux->priv->qos_earliest_time = earliest_time;
2314 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2315 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2316 }
2317 GST_OBJECT_UNLOCK (demux);
2318 break;
2319 }
2320 default:
2321 break;
2322 }
2323
2324 return gst_pad_event_default (pad, parent, event);
2325 }
2326
2327 static gboolean
gst_adaptive_demux_src_query(GstPad * pad,GstObject * parent,GstQuery * query)2328 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2329 GstQuery * query)
2330 {
2331 GstAdaptiveDemux *demux;
2332 GstAdaptiveDemuxClass *demux_class;
2333 gboolean ret = FALSE;
2334
2335 if (query == NULL)
2336 return FALSE;
2337
2338 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2339 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2340
2341 switch (query->type) {
2342 case GST_QUERY_DURATION:{
2343 GstClockTime duration = -1;
2344 GstFormat fmt;
2345
2346 gst_query_parse_duration (query, &fmt, NULL);
2347 #ifdef OHOS_OPT_COMPAT
2348 /**
2349 * ohos.opt.compat.0040 Fixed query duration failed.
2350 * gst_hls_demux_process_manifest() function hold a manifest lock to update playlist.
2351 * During this period, the value obtained by judgment(gst_adaptive_demux_is_live() function) is wrong.
2352 * Thus, use a manifest lock to block until the attribute endlist of m3u8 is updated.
2353 */
2354 GST_MANIFEST_LOCK (demux);
2355 #endif
2356 if (gst_adaptive_demux_is_live (demux)) {
2357 /* We are able to answer this query: the duration is unknown */
2358 gst_query_set_duration (query, fmt, -1);
2359 ret = TRUE;
2360 #ifdef OHOS_OPT_COMPAT
2361 // ohos.opt.compat.0040
2362 GST_MANIFEST_UNLOCK (demux);
2363 #endif
2364 break;
2365 }
2366 #ifdef OHOS_OPT_COMPAT
2367 // ohos.opt.compat.0040
2368 GST_MANIFEST_UNLOCK (demux);
2369 #endif
2370
2371 if (fmt == GST_FORMAT_TIME
2372 && g_atomic_int_get (&demux->priv->have_manifest)) {
2373 duration = demux_class->get_duration (demux);
2374
2375 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2376 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2377 ret = TRUE;
2378 }
2379 }
2380
2381 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2382 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2383 break;
2384 }
2385 case GST_QUERY_LATENCY:{
2386 gst_query_set_latency (query, FALSE, 0, -1);
2387 ret = TRUE;
2388 break;
2389 }
2390 case GST_QUERY_SEEKING:{
2391 GstFormat fmt;
2392 gint64 stop = -1;
2393 gint64 start = 0;
2394
2395 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2396 GST_INFO_OBJECT (demux,
2397 "Don't have manifest yet, can't answer seeking query");
2398 return FALSE; /* can't answer without manifest */
2399 }
2400
2401 GST_MANIFEST_LOCK (demux);
2402
2403 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2404 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2405 if (fmt == GST_FORMAT_TIME) {
2406 GstClockTime duration;
2407 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2408
2409 ret = TRUE;
2410 if (can_seek) {
2411 if (gst_adaptive_demux_is_live (demux)) {
2412 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2413 if (!ret) {
2414 GST_MANIFEST_UNLOCK (demux);
2415 GST_INFO_OBJECT (demux, "can't answer seeking query");
2416 return FALSE;
2417 }
2418 } else {
2419 duration = demux_class->get_duration (demux);
2420 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2421 stop = duration;
2422 }
2423 }
2424 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2425 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2426 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2427 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2428 }
2429 GST_MANIFEST_UNLOCK (demux);
2430 break;
2431 }
2432 case GST_QUERY_URI:
2433
2434 GST_MANIFEST_LOCK (demux);
2435
2436 /* TODO HLS can answer this differently it seems */
2437 if (demux->manifest_uri) {
2438 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2439 * playlist or the the uri of the last downlowaded fragment? */
2440 gst_query_set_uri (query, demux->manifest_uri);
2441 ret = TRUE;
2442 }
2443
2444 GST_MANIFEST_UNLOCK (demux);
2445 break;
2446 default:
2447 /* Don't forward queries upstream because of the special nature of this
2448 * "demuxer", which relies on the upstream element only to be fed
2449 * the Manifest
2450 */
2451 break;
2452 }
2453
2454 return ret;
2455 }
2456
2457 /* must be called with manifest_lock taken */
2458 static void
gst_adaptive_demux_start_tasks(GstAdaptiveDemux * demux,gboolean start_preroll_streams)2459 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2460 gboolean start_preroll_streams)
2461 {
2462 GList *iter;
2463
2464 if (!gst_adaptive_demux_is_running (demux)) {
2465 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2466 return;
2467 }
2468
2469 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2470
2471 iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2472
2473 for (; iter; iter = g_list_next (iter)) {
2474 GstAdaptiveDemuxStream *stream = iter->data;
2475 #ifdef OHOS_EXT_FUNC
2476 // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
2477 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2478 if (klass->stream_select_bitrate)
2479 klass->stream_select_bitrate (stream, demux->connection_speed);
2480 #endif
2481
2482 if (!start_preroll_streams) {
2483 g_mutex_lock (&stream->fragment_download_lock);
2484 stream->cancelled = FALSE;
2485 stream->replaced = FALSE;
2486 g_mutex_unlock (&stream->fragment_download_lock);
2487 }
2488
2489 stream->last_ret = GST_FLOW_OK;
2490 gst_task_start (stream->download_task);
2491 }
2492 }
2493
2494 /* must be called with manifest_lock taken */
2495 static void
gst_adaptive_demux_stop_manifest_update_task(GstAdaptiveDemux * demux)2496 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2497 {
2498 gst_uri_downloader_cancel (demux->downloader);
2499
2500 gst_task_stop (demux->priv->updates_task);
2501
2502 g_mutex_lock (&demux->priv->updates_timed_lock);
2503 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2504 demux->priv->stop_updates_task = TRUE;
2505 g_cond_signal (&demux->priv->updates_timed_cond);
2506 g_mutex_unlock (&demux->priv->updates_timed_lock);
2507 }
2508
2509 /* must be called with manifest_lock taken */
2510 static void
gst_adaptive_demux_start_manifest_update_task(GstAdaptiveDemux * demux)2511 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2512 {
2513 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2514
2515 if (gst_adaptive_demux_is_live (demux)) {
2516 #ifdef OHOS_EXT_FUNC
2517 // ohos.ext.func.0034
2518 g_signal_emit (GST_ELEMENT(demux), g_gst_adaptive_demux_signals[SIGNAL_IS_LIVE_SCENE], 0, TRUE);
2519 // ohos.ext.func.0036
2520 if (demux->priv->set_auto_bitrate_first == FALSE) {
2521 demux->connection_speed = 0;
2522 demux->priv->set_auto_bitrate_first = TRUE;
2523 }
2524
2525 #endif
2526 gst_uri_downloader_reset (demux->downloader);
2527 g_mutex_lock (&demux->priv->updates_timed_lock);
2528 demux->priv->stop_updates_task = FALSE;
2529 g_mutex_unlock (&demux->priv->updates_timed_lock);
2530 /* Task to periodically update the manifest */
2531 if (demux_class->requires_periodical_playlist_update (demux)) {
2532 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2533 gst_task_start (demux->priv->updates_task);
2534 }
2535 }
2536 }
2537
2538 /* must be called with manifest_lock taken
2539 * This function will temporarily release manifest_lock in order to join the
2540 * download threads.
2541 * The api_lock will still protect it against other threads trying to modify
2542 * the demux element.
2543 */
2544 static void
gst_adaptive_demux_stop_tasks(GstAdaptiveDemux * demux,gboolean stop_updates)2545 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2546 {
2547 int i;
2548 GList *iter;
2549 GList *list_to_process;
2550
2551 GST_LOG_OBJECT (demux, "Stopping tasks");
2552
2553 if (stop_updates)
2554 gst_adaptive_demux_stop_manifest_update_task (demux);
2555
2556 list_to_process = demux->streams;
2557 for (i = 0; i < 2; ++i) {
2558 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2559 GstAdaptiveDemuxStream *stream = iter->data;
2560
2561 g_mutex_lock (&stream->fragment_download_lock);
2562 stream->cancelled = TRUE;
2563 gst_task_stop (stream->download_task);
2564 g_cond_signal (&stream->fragment_download_cond);
2565 g_mutex_unlock (&stream->fragment_download_lock);
2566 }
2567 list_to_process = demux->prepared_streams;
2568 }
2569
2570 GST_MANIFEST_UNLOCK (demux);
2571 g_mutex_lock (&demux->priv->preroll_lock);
2572 g_cond_broadcast (&demux->priv->preroll_cond);
2573 g_mutex_unlock (&demux->priv->preroll_lock);
2574 GST_MANIFEST_LOCK (demux);
2575
2576 g_mutex_lock (&demux->priv->manifest_update_lock);
2577 g_cond_broadcast (&demux->priv->manifest_cond);
2578 g_mutex_unlock (&demux->priv->manifest_update_lock);
2579
2580 /* need to release manifest_lock before stopping the src element.
2581 * The streams were asked to cancel, so they will not make any writes to demux
2582 * object. Even if we temporarily release manifest_lock, the demux->streams
2583 * cannot change and iter cannot be invalidated.
2584 */
2585 list_to_process = demux->streams;
2586 for (i = 0; i < 2; ++i) {
2587 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2588 GstAdaptiveDemuxStream *stream = iter->data;
2589 GstElement *src = stream->src;
2590
2591 GST_MANIFEST_UNLOCK (demux);
2592
2593 if (src) {
2594 gst_element_set_locked_state (src, TRUE);
2595 gst_element_set_state (src, GST_STATE_READY);
2596 }
2597
2598 /* stream->download_task value never changes, so it is safe to read it
2599 * outside critical section
2600 */
2601 gst_task_join (stream->download_task);
2602
2603 GST_MANIFEST_LOCK (demux);
2604 }
2605 list_to_process = demux->prepared_streams;
2606 }
2607
2608 GST_MANIFEST_UNLOCK (demux);
2609 if (stop_updates)
2610 gst_task_join (demux->priv->updates_task);
2611
2612 GST_MANIFEST_LOCK (demux);
2613
2614 list_to_process = demux->streams;
2615 for (i = 0; i < 2; ++i) {
2616 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2617 GstAdaptiveDemuxStream *stream = iter->data;
2618
2619 stream->download_error_count = 0;
2620 stream->need_header = TRUE;
2621 }
2622 list_to_process = demux->prepared_streams;
2623 }
2624 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2625 }
2626
2627 /* must be called with manifest_lock taken */
2628 static gboolean
gst_adaptive_demux_push_src_event(GstAdaptiveDemux * demux,GstEvent * event)2629 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2630 {
2631 GList *iter;
2632 gboolean ret = TRUE;
2633
2634 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2635 GstAdaptiveDemuxStream *stream = iter->data;
2636 gst_event_ref (event);
2637 ret = ret & gst_pad_push_event (stream->pad, event);
2638 }
2639 gst_event_unref (event);
2640 return ret;
2641 }
2642
2643 /* must be called with manifest_lock taken */
2644 void
gst_adaptive_demux_stream_set_caps(GstAdaptiveDemuxStream * stream,GstCaps * caps)2645 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2646 GstCaps * caps)
2647 {
2648 GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2649 caps);
2650 gst_caps_replace (&stream->pending_caps, caps);
2651 gst_caps_unref (caps);
2652 }
2653
2654 /* must be called with manifest_lock taken */
2655 void
gst_adaptive_demux_stream_set_tags(GstAdaptiveDemuxStream * stream,GstTagList * tags)2656 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2657 GstTagList * tags)
2658 {
2659 GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2660 tags);
2661 if (stream->pending_tags) {
2662 gst_tag_list_unref (stream->pending_tags);
2663 }
2664 stream->pending_tags = tags;
2665 }
2666
2667 /* must be called with manifest_lock taken */
2668 void
gst_adaptive_demux_stream_queue_event(GstAdaptiveDemuxStream * stream,GstEvent * event)2669 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2670 GstEvent * event)
2671 {
2672 stream->pending_events = g_list_append (stream->pending_events, event);
2673 }
2674
2675 /* must be called with manifest_lock taken */
2676 static guint64
_update_average_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 new_bitrate)2677 _update_average_bitrate (GstAdaptiveDemux * demux,
2678 GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2679 {
2680 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2681
2682 stream->moving_bitrate -= stream->fragment_bitrates[index];
2683 stream->fragment_bitrates[index] = new_bitrate;
2684 stream->moving_bitrate += new_bitrate;
2685
2686 stream->moving_index += 1;
2687
2688 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2689 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2690 return stream->moving_bitrate / stream->moving_index;
2691 }
2692
2693 /* must be called with manifest_lock taken */
2694 static guint64
gst_adaptive_demux_stream_update_current_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2695 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2696 GstAdaptiveDemuxStream * stream)
2697 {
2698 guint64 average_bitrate;
2699 guint64 fragment_bitrate;
2700
2701 if (demux->connection_speed) {
2702 GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2703 demux->connection_speed / 1000);
2704 stream->current_download_rate = demux->connection_speed;
2705 return demux->connection_speed;
2706 }
2707
2708 fragment_bitrate = stream->last_bitrate;
2709 GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2710 fragment_bitrate);
2711
2712 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2713
2714 GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2715 "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
2716 GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2717 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2718 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2719
2720 /* Conservative approach, make sure we don't upgrade too fast */
2721 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2722
2723 stream->current_download_rate *= demux->bitrate_limit;
2724 GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2725 G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2726
2727 #if 0
2728 /* Debugging code, modulate the bitrate every few fragments */
2729 {
2730 static guint ctr = 0;
2731 if (ctr % 3 == 0) {
2732 GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2733 stream->current_download_rate /= 2;
2734 }
2735 ctr++;
2736 }
2737 #endif
2738
2739 return stream->current_download_rate;
2740 }
2741
2742 /* must be called with manifest_lock taken */
2743 static GstFlowReturn
gst_adaptive_demux_combine_flows(GstAdaptiveDemux * demux)2744 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2745 {
2746 gboolean all_notlinked = TRUE;
2747 gboolean all_eos = TRUE;
2748 GList *iter;
2749
2750 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2751 GstAdaptiveDemuxStream *stream = iter->data;
2752
2753 if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2754 all_notlinked = FALSE;
2755 if (stream->last_ret != GST_FLOW_EOS)
2756 all_eos = FALSE;
2757 }
2758
2759 if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2760 || stream->last_ret == GST_FLOW_FLUSHING) {
2761 return stream->last_ret;
2762 }
2763 }
2764 if (all_notlinked)
2765 return GST_FLOW_NOT_LINKED;
2766 else if (all_eos)
2767 return GST_FLOW_EOS;
2768 return GST_FLOW_OK;
2769 }
2770
2771 /* Called with preroll_lock */
2772 static void
gst_adaptive_demux_handle_preroll(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2773 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2774 GstAdaptiveDemuxStream * stream)
2775 {
2776 demux->priv->preroll_pending--;
2777 if (demux->priv->preroll_pending == 0) {
2778 /* That was the last one, time to release all streams
2779 * and expose them */
2780 GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2781 gst_adaptive_demux_expose_streams (demux);
2782 g_cond_broadcast (&demux->priv->preroll_cond);
2783 }
2784 }
2785
2786 /* must be called with manifest_lock taken.
2787 * Temporarily releases manifest_lock
2788 */
2789 GstFlowReturn
gst_adaptive_demux_stream_push_buffer(GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2790 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2791 GstBuffer * buffer)
2792 {
2793 GstAdaptiveDemux *demux = stream->demux;
2794 GstFlowReturn ret = GST_FLOW_OK;
2795 gboolean discont = FALSE;
2796 /* Pending events */
2797 GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2798 GList *pending_events = NULL;
2799
2800 /* FIXME :
2801 * This is duplicating *exactly* the same thing as what is done at the beginning
2802 * of _src_chain if starting_fragment is TRUE */
2803 if (stream->first_fragment_buffer) {
2804 GstClockTime offset =
2805 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2806 GstClockTime period_start =
2807 gst_adaptive_demux_get_period_start_time (demux);
2808
2809 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2810 if (demux->segment.rate < 0)
2811 /* Set DISCONT flag for every first buffer in reverse playback mode
2812 * as each fragment for its own has to be reversed */
2813 discont = TRUE;
2814
2815 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2816 if (GST_BUFFER_PTS_IS_VALID (buffer))
2817 GST_BUFFER_PTS (buffer) += offset;
2818
2819 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2820 stream->segment.position = GST_BUFFER_PTS (buffer);
2821
2822 /* Convert from position inside the stream's segment to the demuxer's
2823 * segment, they are not necessarily the same */
2824 if (stream->segment.position - offset + period_start >
2825 demux->segment.position)
2826 demux->segment.position =
2827 stream->segment.position - offset + period_start;
2828 }
2829 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2830
2831 GST_LOG_OBJECT (stream->pad,
2832 "Going to push buffer with PTS %" GST_TIME_FORMAT,
2833 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2834 } else {
2835 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2836 }
2837
2838 if (stream->discont) {
2839 discont = TRUE;
2840 stream->discont = FALSE;
2841 }
2842
2843 if (discont) {
2844 GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2845 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2846 } else {
2847 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2848 }
2849
2850 stream->first_fragment_buffer = FALSE;
2851
2852 GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2853 GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2854 if (G_UNLIKELY (stream->pending_caps)) {
2855 pending_caps = gst_event_new_caps (stream->pending_caps);
2856 gst_caps_unref (stream->pending_caps);
2857 stream->pending_caps = NULL;
2858 }
2859
2860 if (stream->do_block) {
2861
2862 g_mutex_lock (&demux->priv->preroll_lock);
2863
2864 /* If we are preroll state, set caps in here */
2865 if (pending_caps) {
2866 gst_pad_push_event (stream->pad, pending_caps);
2867 pending_caps = NULL;
2868 }
2869
2870 gst_adaptive_demux_handle_preroll (demux, stream);
2871 GST_MANIFEST_UNLOCK (demux);
2872
2873 while (stream->do_block && !stream->cancelled) {
2874 GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2875 g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2876 }
2877 if (stream->cancelled) {
2878 GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2879 gst_buffer_unref (buffer);
2880 g_mutex_unlock (&demux->priv->preroll_lock);
2881 return GST_FLOW_FLUSHING;
2882 }
2883
2884 g_mutex_unlock (&demux->priv->preroll_lock);
2885 GST_MANIFEST_LOCK (demux);
2886 }
2887
2888 if (G_UNLIKELY (stream->pending_segment)) {
2889 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2890 pending_segment = stream->pending_segment;
2891 stream->pending_segment = NULL;
2892 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2893 }
2894 if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2895 GstTagList *tags = stream->pending_tags;
2896
2897 stream->pending_tags = NULL;
2898 stream->bitrate_changed = 0;
2899
2900 if (stream->fragment.bitrate != 0) {
2901 if (tags)
2902 tags = gst_tag_list_make_writable (tags);
2903 else
2904 tags = gst_tag_list_new_empty ();
2905
2906 #ifdef OHOS_EXT_FUNC
2907 // ohos.ext.func.0042 when change bitrate, ongly change playlist, don't add new src and new pipeline
2908 GST_DEBUG_OBJECT (demux, "set bandwidth");
2909 gint bandwidth = 0;
2910 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2911 if (klass->get_current_bandwidth) {
2912 bandwidth = klass->get_current_bandwidth(stream);
2913 }
2914 guint64 position = 0;
2915 if (klass->get_current_position) {
2916 position = klass->get_current_position(stream);
2917 }
2918 gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2919 GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, GST_TAG_BANDWIDTH, bandwidth, GST_TAG_SLICE_POSITION, position, NULL);
2920 #else
2921 gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2922 GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2923 #endif
2924 }
2925 if (tags)
2926 pending_tags = gst_event_new_tag (tags);
2927 }
2928 if (G_UNLIKELY (stream->pending_events)) {
2929 pending_events = stream->pending_events;
2930 stream->pending_events = NULL;
2931 }
2932
2933 GST_MANIFEST_UNLOCK (demux);
2934
2935 /* Do not push events or buffers holding the manifest lock */
2936 if (G_UNLIKELY (pending_caps)) {
2937 GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2938 pending_caps);
2939 gst_pad_push_event (stream->pad, pending_caps);
2940 }
2941 #ifdef OHOS_EXT_FUNC
2942 // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
2943 if (G_UNLIKELY (pending_tags)) {
2944 GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2945 pending_tags);
2946 gst_pad_push_event (stream->pad, pending_tags);
2947 }
2948 if (G_UNLIKELY (pending_segment)) {
2949 GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2950 pending_segment);
2951 gst_pad_push_event (stream->pad, pending_segment);
2952 }
2953 #else
2954 if (G_UNLIKELY (pending_segment)) {
2955 GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2956 pending_segment);
2957 gst_pad_push_event (stream->pad, pending_segment);
2958 }
2959 if (G_UNLIKELY (pending_tags)) {
2960 GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2961 pending_tags);
2962 gst_pad_push_event (stream->pad, pending_tags);
2963 }
2964 #endif
2965 while (pending_events != NULL) {
2966 GstEvent *event = pending_events->data;
2967
2968 if (!gst_pad_push_event (stream->pad, event))
2969 GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2970
2971 pending_events = g_list_delete_link (pending_events, pending_events);
2972 }
2973
2974 /* Wait for preroll if blocking */
2975 GST_DEBUG_OBJECT (stream->pad,
2976 "About to push buffer of size %" G_GSIZE_FORMAT,
2977 gst_buffer_get_size (buffer));
2978
2979 ret = gst_pad_push (stream->pad, buffer);
2980
2981 GST_MANIFEST_LOCK (demux);
2982
2983 g_mutex_lock (&stream->fragment_download_lock);
2984 if (G_UNLIKELY (stream->cancelled)) {
2985 GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2986 "Stream was cancelled");
2987 ret = stream->last_ret = GST_FLOW_FLUSHING;
2988 g_mutex_unlock (&stream->fragment_download_lock);
2989 return ret;
2990 }
2991 g_mutex_unlock (&stream->fragment_download_lock);
2992
2993 GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2994 gst_flow_get_name (ret));
2995
2996 return ret;
2997 }
2998
2999 /* must be called with manifest_lock taken */
3000 static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)3001 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
3002 GstAdaptiveDemuxStream * stream)
3003 {
3004 /* No need to advance, this isn't a real fragment */
3005 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
3006 return GST_FLOW_OK;
3007
3008 return gst_adaptive_demux_stream_advance_fragment (demux, stream,
3009 stream->fragment.duration);
3010 }
3011
3012 /* must be called with manifest_lock taken.
3013 * Can temporarily release manifest_lock
3014 */
3015 static GstFlowReturn
gst_adaptive_demux_stream_data_received_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstBuffer * buffer)3016 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
3017 GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
3018 {
3019 return gst_adaptive_demux_stream_push_buffer (stream, buffer);
3020 }
3021
3022 static gboolean
gst_adaptive_demux_requires_periodical_playlist_update_default(GstAdaptiveDemux * demux)3023 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
3024 * demux)
3025 {
3026 return TRUE;
3027 }
3028
3029 static GstFlowReturn
_src_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)3030 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
3031 {
3032 GstAdaptiveDemuxStream *stream;
3033 GstAdaptiveDemux *demux;
3034 GstAdaptiveDemuxClass *klass;
3035 GstFlowReturn ret = GST_FLOW_OK;
3036
3037 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
3038 stream = gst_pad_get_element_private (pad);
3039 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3040
3041 GST_MANIFEST_LOCK (demux);
3042
3043 /* do not make any changes if the stream is cancelled */
3044 g_mutex_lock (&stream->fragment_download_lock);
3045 if (G_UNLIKELY (stream->cancelled)) {
3046 g_mutex_unlock (&stream->fragment_download_lock);
3047 gst_buffer_unref (buffer);
3048 ret = stream->last_ret = GST_FLOW_FLUSHING;
3049 GST_MANIFEST_UNLOCK (demux);
3050 return ret;
3051 }
3052 g_mutex_unlock (&stream->fragment_download_lock);
3053
3054 /* starting_fragment is set to TRUE at the beginning of
3055 * _stream_download_fragment()
3056 * /!\ If there is a header/index being downloaded, then this will
3057 * be TRUE for the first one ... but FALSE for the remaining ones,
3058 * including the *actual* fragment ! */
3059 if (stream->starting_fragment) {
3060 GstClockTime offset =
3061 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3062 GstClockTime period_start =
3063 gst_adaptive_demux_get_period_start_time (demux);
3064
3065 stream->starting_fragment = FALSE;
3066 if (klass->start_fragment) {
3067 if (!klass->start_fragment (demux, stream)) {
3068 ret = GST_FLOW_ERROR;
3069 goto error;
3070 }
3071 }
3072
3073 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
3074 if (GST_BUFFER_PTS_IS_VALID (buffer))
3075 GST_BUFFER_PTS (buffer) += offset;
3076
3077 GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
3078 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
3079
3080 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
3081 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3082 stream->segment.position = GST_BUFFER_PTS (buffer);
3083
3084 /* Convert from position inside the stream's segment to the demuxer's
3085 * segment, they are not necessarily the same */
3086 if (stream->segment.position - offset + period_start >
3087 demux->segment.position)
3088 demux->segment.position =
3089 stream->segment.position - offset + period_start;
3090 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3091 }
3092
3093 } else {
3094 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
3095 }
3096
3097 /* downloading_first_buffer is set to TRUE in download_uri() just before
3098 * activating the source (i.e. requesting a given URI)
3099 *
3100 * The difference with starting_fragment is that this will be called
3101 * for *all* first buffers (of index, and header, and fragment)
3102 *
3103 * ... to then only do something useful (in this block) for actual
3104 * fragments... */
3105 if (stream->downloading_first_buffer) {
3106 gint64 chunk_size = 0;
3107
3108 stream->downloading_first_buffer = FALSE;
3109
3110 if (!stream->downloading_header && !stream->downloading_index) {
3111 /* If this is the first buffer of a fragment (not the headers or index)
3112 * and we don't have a birate from the sub-class, then see if we
3113 * can work it out from the fragment size and duration */
3114 if (stream->fragment.bitrate == 0 &&
3115 stream->fragment.duration != 0 &&
3116 gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
3117 &chunk_size) && chunk_size != -1) {
3118 guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
3119 8 * GST_SECOND, stream->fragment.duration));
3120 GST_LOG_OBJECT (demux,
3121 "Fragment has size %" G_GINT64_FORMAT " duration %" GST_TIME_FORMAT
3122 " = bitrate %u", chunk_size,
3123 GST_TIME_ARGS (stream->fragment.duration), bitrate);
3124 stream->fragment.bitrate = bitrate;
3125 }
3126 if (stream->fragment.bitrate) {
3127 stream->bitrate_changed = TRUE;
3128 } else {
3129 GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
3130 }
3131 }
3132 }
3133
3134 stream->download_total_bytes += gst_buffer_get_size (buffer);
3135
3136 GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
3137 gst_buffer_get_size (buffer));
3138
3139 ret = klass->data_received (demux, stream, buffer);
3140
3141 if (ret == GST_FLOW_FLUSHING) {
3142 /* do not make any changes if the stream is cancelled */
3143 g_mutex_lock (&stream->fragment_download_lock);
3144 if (G_UNLIKELY (stream->cancelled)) {
3145 g_mutex_unlock (&stream->fragment_download_lock);
3146 GST_MANIFEST_UNLOCK (demux);
3147 return ret;
3148 }
3149 g_mutex_unlock (&stream->fragment_download_lock);
3150 }
3151
3152 if (ret != GST_FLOW_OK) {
3153 gboolean finished = FALSE;
3154
3155 if (ret < GST_FLOW_EOS) {
3156 GST_ELEMENT_FLOW_ERROR (demux, ret);
3157
3158 /* TODO push this on all pads */
3159 gst_pad_push_event (stream->pad, gst_event_new_eos ());
3160 } else {
3161 GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
3162 gst_flow_get_name (ret));
3163 }
3164
3165 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
3166 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
3167 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
3168 /* Behaves like an EOS event from upstream */
3169 stream->fragment.finished = TRUE;
3170 ret = klass->finish_fragment (demux, stream);
3171 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
3172 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
3173 } else if (ret != GST_FLOW_OK) {
3174 goto error;
3175 }
3176 finished = TRUE;
3177 }
3178
3179 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
3180 if (finished)
3181 ret = GST_FLOW_EOS;
3182 }
3183
3184 error:
3185
3186 GST_MANIFEST_UNLOCK (demux);
3187
3188 return ret;
3189 }
3190
3191 /* must be called with manifest_lock taken */
3192 static void
gst_adaptive_demux_stream_fragment_download_finish(GstAdaptiveDemuxStream * stream,GstFlowReturn ret,GError * err)3193 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
3194 stream, GstFlowReturn ret, GError * err)
3195 {
3196 GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
3197 gst_flow_get_name (ret), err);
3198
3199 /* if we have an error, only replace last_ret if it was OK before to avoid
3200 * overwriting the first error we got */
3201 if (stream->last_ret == GST_FLOW_OK) {
3202 stream->last_ret = ret;
3203 if (err) {
3204 g_clear_error (&stream->last_error);
3205 stream->last_error = g_error_copy (err);
3206 }
3207 }
3208 g_mutex_lock (&stream->fragment_download_lock);
3209 stream->download_finished = TRUE;
3210 g_cond_signal (&stream->fragment_download_cond);
3211 g_mutex_unlock (&stream->fragment_download_lock);
3212 }
3213
3214 static GstFlowReturn
gst_adaptive_demux_eos_handling(GstAdaptiveDemuxStream * stream)3215 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
3216 {
3217 GstFlowReturn ret = GST_FLOW_OK;
3218 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
3219
3220 if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
3221 || !klass->need_another_chunk (stream)
3222 || stream->fragment.chunk_size == 0) {
3223 stream->fragment.finished = TRUE;
3224
3225 /* Last chance to figure out a fallback nominal bitrate if neither baseclass
3226 nor the HTTP Content-Length implementation worked. */
3227 if (stream->fragment.bitrate == 0 && stream->fragment.duration != 0 &&
3228 stream->fragment_bytes_downloaded != 0 && !stream->downloading_index &&
3229 !stream->downloading_header) {
3230 guint bitrate = MIN (G_MAXUINT,
3231 gst_util_uint64_scale (stream->fragment_bytes_downloaded,
3232 8 * GST_SECOND, stream->fragment.duration));
3233 GST_LOG_OBJECT (stream->pad,
3234 "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
3235 " = bitrate %u", stream->fragment_bytes_downloaded,
3236 GST_TIME_ARGS (stream->fragment.duration), bitrate);
3237 stream->fragment.bitrate = bitrate;
3238 stream->bitrate_changed = TRUE;
3239 }
3240 ret = klass->finish_fragment (stream->demux, stream);
3241 }
3242 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
3243
3244 return ret;
3245 }
3246
3247 static gboolean
_src_event(GstPad * pad,GstObject * parent,GstEvent * event)3248 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3249 {
3250 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
3251 GstAdaptiveDemux *demux = stream->demux;
3252
3253 switch (GST_EVENT_TYPE (event)) {
3254 case GST_EVENT_EOS:{
3255 GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
3256 GST_MANIFEST_LOCK (demux);
3257
3258 gst_adaptive_demux_eos_handling (stream);
3259
3260 /* FIXME ?
3261 * _eos_handling() calls fragment_download_finish() which does the
3262 * same thing as below.
3263 * Could this cause races ? */
3264 g_mutex_lock (&stream->fragment_download_lock);
3265 stream->download_finished = TRUE;
3266 g_cond_signal (&stream->fragment_download_cond);
3267 g_mutex_unlock (&stream->fragment_download_lock);
3268
3269 GST_MANIFEST_UNLOCK (demux);
3270 break;
3271 }
3272 default:
3273 break;
3274 }
3275
3276 gst_event_unref (event);
3277
3278 return TRUE;
3279 }
3280
3281 static gboolean
_src_query(GstPad * pad,GstObject * parent,GstQuery * query)3282 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3283 {
3284 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
3285
3286 switch (GST_QUERY_TYPE (query)) {
3287 case GST_QUERY_ALLOCATION:
3288 return FALSE;
3289 break;
3290 default:
3291 break;
3292 }
3293
3294 return gst_pad_peer_query (stream->pad, query);
3295 }
3296
3297 static GstPadProbeReturn
_uri_handler_probe(GstPad * pad,GstPadProbeInfo * info,GstAdaptiveDemuxStream * stream)3298 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
3299 GstAdaptiveDemuxStream * stream)
3300 {
3301 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
3302
3303 if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
3304 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
3305 if (stream->fragment_bytes_downloaded == 0) {
3306 stream->last_latency =
3307 gst_adaptive_demux_get_monotonic_time (stream->demux) -
3308 (stream->download_start_time * GST_USECOND);
3309 GST_DEBUG_OBJECT (pad,
3310 "FIRST BYTE since download_start %" GST_TIME_FORMAT,
3311 GST_TIME_ARGS (stream->last_latency));
3312 }
3313 stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
3314 GST_LOG_OBJECT (pad,
3315 "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
3316 gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
3317 } else if (GST_PAD_PROBE_INFO_TYPE (info) &
3318 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
3319 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
3320 GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
3321 GST_EVENT_TYPE_NAME (ev), ev);
3322 switch (GST_EVENT_TYPE (ev)) {
3323 case GST_EVENT_SEGMENT:
3324 stream->fragment_bytes_downloaded = 0;
3325 break;
3326 case GST_EVENT_EOS:
3327 {
3328 stream->last_download_time =
3329 gst_adaptive_demux_get_monotonic_time (stream->demux) -
3330 (stream->download_start_time * GST_USECOND);
3331 stream->last_bitrate =
3332 gst_util_uint64_scale (stream->fragment_bytes_downloaded,
3333 8 * GST_SECOND, stream->last_download_time);
3334 GST_DEBUG_OBJECT (pad,
3335 "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
3336 G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
3337 stream->last_bitrate);
3338 /* Calculate bitrate since URI request */
3339 }
3340 break;
3341 default:
3342 break;
3343 }
3344 }
3345
3346 return ret;
3347 }
3348
3349 /* must be called with manifest_lock taken.
3350 * Can temporarily release manifest_lock
3351 */
3352 static gboolean
gst_adaptive_demux_stream_wait_manifest_update(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)3353 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
3354 GstAdaptiveDemuxStream * stream)
3355 {
3356 gboolean ret = TRUE;
3357
3358 /* Wait until we're cancelled or there's something for
3359 * us to download in the playlist or the playlist
3360 * became non-live */
3361 while (TRUE) {
3362 GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
3363
3364 /* get the manifest_update_lock while still holding the manifest_lock.
3365 * This will prevent other threads to signal the condition (they will need
3366 * both manifest_lock and manifest_update_lock in order to signal).
3367 * It cannot deadlock because all threads always get the manifest_lock first
3368 * and manifest_update_lock second.
3369 */
3370 g_mutex_lock (&demux->priv->manifest_update_lock);
3371
3372 GST_MANIFEST_UNLOCK (demux);
3373
3374 g_cond_wait (&demux->priv->manifest_cond,
3375 &demux->priv->manifest_update_lock);
3376 g_mutex_unlock (&demux->priv->manifest_update_lock);
3377
3378 GST_MANIFEST_LOCK (demux);
3379
3380 /* check for cancelled every time we get the manifest_lock */
3381 g_mutex_lock (&stream->fragment_download_lock);
3382 if (G_UNLIKELY (stream->cancelled)) {
3383 ret = FALSE;
3384 stream->last_ret = GST_FLOW_FLUSHING;
3385 g_mutex_unlock (&stream->fragment_download_lock);
3386 break;
3387 }
3388 g_mutex_unlock (&stream->fragment_download_lock);
3389
3390 /* Got a new fragment or not live anymore? */
3391 if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
3392 GST_FLOW_OK) {
3393 GST_DEBUG_OBJECT (demux, "new fragment available, "
3394 "not waiting for manifest update");
3395 ret = TRUE;
3396 break;
3397 }
3398
3399 if (!gst_adaptive_demux_is_live (demux)) {
3400 GST_DEBUG_OBJECT (demux, "Not live anymore, "
3401 "not waiting for manifest update");
3402 ret = FALSE;
3403 break;
3404 }
3405 }
3406 GST_DEBUG_OBJECT (demux, "Retrying now");
3407 return ret;
3408 }
3409
3410 /* must be called with manifest_lock taken */
3411 static gboolean
gst_adaptive_demux_stream_update_source(GstAdaptiveDemuxStream * stream,const gchar * uri,const gchar * referer,gboolean refresh,gboolean allow_cache)3412 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
3413 const gchar * uri, const gchar * referer, gboolean refresh,
3414 gboolean allow_cache)
3415 {
3416 GstAdaptiveDemux *demux = stream->demux;
3417
3418 if (!gst_uri_is_valid (uri)) {
3419 GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
3420 return FALSE;
3421 }
3422
3423 /* Try to re-use existing source element */
3424 if (stream->src != NULL) {
3425 gchar *old_protocol, *new_protocol;
3426 gchar *old_uri;
3427
3428 old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
3429 old_protocol = gst_uri_get_protocol (old_uri);
3430 new_protocol = gst_uri_get_protocol (uri);
3431
3432 if (!g_str_equal (old_protocol, new_protocol)) {
3433 GstElement *src = stream->src;
3434
3435 stream->src = NULL;
3436 gst_object_unref (stream->src_srcpad);
3437 stream->src_srcpad = NULL;
3438 GST_MANIFEST_UNLOCK (demux);
3439 gst_element_set_locked_state (src, TRUE);
3440 gst_element_set_state (src, GST_STATE_NULL);
3441 gst_bin_remove (GST_BIN_CAST (demux), src);
3442 GST_MANIFEST_LOCK (demux);
3443 GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
3444 } else {
3445 GError *err = NULL;
3446
3447 GST_DEBUG_OBJECT (demux, "Re-using old source element");
3448 if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
3449 &err)) {
3450 GstElement *src = stream->src;
3451
3452 stream->src = NULL;
3453 GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
3454 err ? err->message : "Unknown error");
3455 g_clear_error (&err);
3456 gst_object_unref (stream->src_srcpad);
3457 stream->src_srcpad = NULL;
3458 GST_MANIFEST_UNLOCK (demux);
3459 gst_element_set_locked_state (src, TRUE);
3460 gst_element_set_state (src, GST_STATE_NULL);
3461 gst_bin_remove (GST_BIN_CAST (demux), src);
3462 GST_MANIFEST_LOCK (demux);
3463 }
3464 }
3465 g_free (old_uri);
3466 g_free (old_protocol);
3467 g_free (new_protocol);
3468 }
3469
3470 if (stream->src == NULL) {
3471 GstPad *uri_handler_src;
3472 GstPad *queue_sink;
3473 GstPad *queue_src;
3474 GstElement *uri_handler;
3475 GstElement *queue;
3476 GstPadLinkReturn pad_link_ret;
3477 GObjectClass *gobject_class;
3478 gchar *internal_name, *bin_name;
3479
3480 /* Our src consists of a bin containing uri_handler -> queue . The
3481 * purpose of the queue is to allow the uri_handler to download an
3482 * entire fragment without blocking, so we can accurately measure the
3483 * download bitrate. */
3484
3485 queue = gst_element_factory_make ("queue", NULL);
3486 if (queue == NULL)
3487 return FALSE;
3488
3489 g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
3490 g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
3491 g_object_set (queue, "max-size-time", (guint64) 0, NULL);
3492
3493 uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3494 if (uri_handler == NULL) {
3495 GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
3496 ("Missing plugin to handle URI: '%s'", uri), (NULL));
3497 gst_object_unref (queue);
3498 return FALSE;
3499 }
3500
3501 gobject_class = G_OBJECT_GET_CLASS (uri_handler);
3502
3503 if (g_object_class_find_property (gobject_class, "compress"))
3504 g_object_set (uri_handler, "compress", FALSE, NULL);
3505 if (g_object_class_find_property (gobject_class, "keep-alive"))
3506 g_object_set (uri_handler, "keep-alive", TRUE, NULL);
3507 if (g_object_class_find_property (gobject_class, "extra-headers")) {
3508 if (referer || refresh || !allow_cache) {
3509 GstStructure *extra_headers = gst_structure_new_empty ("headers");
3510
3511 if (referer)
3512 gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3513 NULL);
3514
3515 if (!allow_cache)
3516 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3517 "no-cache", NULL);
3518 else if (refresh)
3519 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3520 "max-age=0", NULL);
3521
3522 g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3523
3524 gst_structure_free (extra_headers);
3525 } else {
3526 g_object_set (uri_handler, "extra-headers", NULL, NULL);
3527 }
3528 }
3529
3530 /* Source bin creation */
3531 bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3532 stream->src = gst_bin_new (bin_name);
3533 g_free (bin_name);
3534 if (stream->src == NULL) {
3535 gst_object_unref (queue);
3536 gst_object_unref (uri_handler);
3537 return FALSE;
3538 }
3539
3540 gst_bin_add (GST_BIN_CAST (stream->src), queue);
3541 gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3542
3543 uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3544 queue_sink = gst_element_get_static_pad (queue, "sink");
3545
3546 pad_link_ret =
3547 gst_pad_link_full (uri_handler_src, queue_sink,
3548 GST_PAD_LINK_CHECK_NOTHING);
3549 if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3550 GST_WARNING_OBJECT (demux,
3551 "Could not link pads %s:%s to %s:%s for reason %d",
3552 GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3553 pad_link_ret);
3554 g_object_unref (queue_sink);
3555 g_object_unref (uri_handler_src);
3556 gst_object_unref (stream->src);
3557 stream->src = NULL;
3558 return FALSE;
3559 }
3560
3561 /* Add a downstream event and data probe */
3562 gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3563 (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3564
3565 g_object_unref (queue_sink);
3566 g_object_unref (uri_handler_src);
3567 queue_src = gst_element_get_static_pad (queue, "src");
3568 stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3569 g_object_unref (queue_src);
3570 gst_element_add_pad (stream->src, stream->src_srcpad);
3571
3572 gst_element_set_locked_state (stream->src, TRUE);
3573 gst_bin_add (GST_BIN_CAST (demux), stream->src);
3574 stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3575
3576 /* set up our internal floating pad to drop all events from
3577 * the http src we don't care about. On the chain function
3578 * we just push the buffer forward */
3579 internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3580 stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3581 g_free (internal_name);
3582 gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3583 GST_OBJECT_CAST (demux));
3584 GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3585 gst_pad_set_element_private (stream->internal_pad, stream);
3586 gst_pad_set_active (stream->internal_pad, TRUE);
3587 gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3588 gst_pad_set_event_function (stream->internal_pad, _src_event);
3589 gst_pad_set_query_function (stream->internal_pad, _src_query);
3590
3591 if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3592 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3593 GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3594 return FALSE;
3595 }
3596
3597 stream->uri_handler = uri_handler;
3598 stream->queue = queue;
3599
3600 stream->last_status_code = 200; /* default to OK */
3601 }
3602 return TRUE;
3603 }
3604
3605 static GstPadProbeReturn
gst_ad_stream_src_to_ready_cb(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3606 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3607 gpointer user_data)
3608 {
3609 GstAdaptiveDemuxStream *stream = user_data;
3610
3611 /* The source's src pad is IDLE so now set the state to READY */
3612 g_mutex_lock (&stream->fragment_download_lock);
3613 stream->src_at_ready = TRUE;
3614 g_cond_signal (&stream->fragment_download_cond);
3615 g_mutex_unlock (&stream->fragment_download_lock);
3616
3617 return GST_PAD_PROBE_REMOVE;
3618 }
3619
3620 #ifndef GST_DISABLE_GST_DEBUG
3621 static const char *
uritype(GstAdaptiveDemuxStream * s)3622 uritype (GstAdaptiveDemuxStream * s)
3623 {
3624 if (s->downloading_header)
3625 return "header";
3626 if (s->downloading_index)
3627 return "index";
3628 return "fragment";
3629 }
3630 #endif
3631
3632 /* must be called with manifest_lock taken.
3633 * Can temporarily release manifest_lock
3634 *
3635 * Will return when URI is fully downloaded (or aborted/errored)
3636 */
3637 static GstFlowReturn
gst_adaptive_demux_stream_download_uri(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,const gchar * uri,gint64 start,gint64 end,guint * http_status)3638 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3639 GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3640 gint64 end, guint * http_status)
3641 {
3642 GstFlowReturn ret = GST_FLOW_OK;
3643 GST_DEBUG_OBJECT (stream->pad,
3644 "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3645 uritype (stream), uri, start, end);
3646
3647 if (http_status)
3648 *http_status = 200; /* default to ok if no further information */
3649
3650 if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3651 ret = stream->last_ret = GST_FLOW_ERROR;
3652 return ret;
3653 }
3654
3655 gst_element_set_locked_state (stream->src, TRUE);
3656
3657 GST_MANIFEST_UNLOCK (demux);
3658 if (gst_element_set_state (stream->src,
3659 GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3660 /* If ranges are specified, seek to it */
3661 if (start != 0 || end != -1) {
3662 /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3663 * stop position */
3664 if (end != -1)
3665 end += 1;
3666 /* Send the seek event to the uri_handler, as the other pipeline elements
3667 * can't handle it when READY. */
3668 if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3669 GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3670 GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3671
3672 GST_MANIFEST_LOCK (demux);
3673 /* looks like the source can't handle seeks in READY */
3674 g_clear_error (&stream->last_error);
3675 stream->last_error = g_error_new (GST_CORE_ERROR,
3676 GST_CORE_ERROR_NOT_IMPLEMENTED,
3677 "Source element can't handle range requests");
3678 stream->last_ret = GST_FLOW_ERROR;
3679 } else {
3680 GST_MANIFEST_LOCK (demux);
3681 }
3682 } else {
3683 GST_MANIFEST_LOCK (demux);
3684 }
3685
3686 if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3687 stream->download_start_time =
3688 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3689
3690 /* src element is in state READY. Before we start it, we reset
3691 * download_finished
3692 */
3693 g_mutex_lock (&stream->fragment_download_lock);
3694 stream->download_finished = FALSE;
3695 stream->downloading_first_buffer = TRUE;
3696 g_mutex_unlock (&stream->fragment_download_lock);
3697
3698 GST_MANIFEST_UNLOCK (demux);
3699
3700 if (!gst_element_sync_state_with_parent (stream->src)) {
3701 GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3702 GST_MANIFEST_LOCK (demux);
3703 ret = stream->last_ret = GST_FLOW_ERROR;
3704 return ret;
3705 }
3706
3707 /* wait for the fragment to be completely downloaded */
3708 GST_DEBUG_OBJECT (stream->pad,
3709 "Waiting for %s download to finish: %s", uritype (stream), uri);
3710
3711 g_mutex_lock (&stream->fragment_download_lock);
3712 stream->src_at_ready = FALSE;
3713 if (G_UNLIKELY (stream->cancelled)) {
3714 g_mutex_unlock (&stream->fragment_download_lock);
3715 GST_MANIFEST_LOCK (demux);
3716 ret = stream->last_ret = GST_FLOW_FLUSHING;
3717 return ret;
3718 }
3719 /* download_finished is only set:
3720 * * in ::fragment_download_finish()
3721 * * if EOS is received on the _src pad
3722 * */
3723 while (!stream->cancelled && !stream->download_finished) {
3724 g_cond_wait (&stream->fragment_download_cond,
3725 &stream->fragment_download_lock);
3726 }
3727 g_mutex_unlock (&stream->fragment_download_lock);
3728
3729 GST_DEBUG_OBJECT (stream->pad,
3730 "Finished Waiting for %s download: %s", uritype (stream), uri);
3731
3732 GST_MANIFEST_LOCK (demux);
3733 g_mutex_lock (&stream->fragment_download_lock);
3734 if (G_UNLIKELY (stream->cancelled)) {
3735 ret = stream->last_ret = GST_FLOW_FLUSHING;
3736 g_mutex_unlock (&stream->fragment_download_lock);
3737 return ret;
3738 }
3739 g_mutex_unlock (&stream->fragment_download_lock);
3740
3741 ret = stream->last_ret;
3742
3743 GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3744 uritype (stream), uri, stream->last_ret,
3745 gst_flow_get_name (stream->last_ret));
3746 if (stream->last_ret != GST_FLOW_OK && http_status) {
3747 *http_status = stream->last_status_code;
3748 }
3749 }
3750
3751 /* changing src element state might try to join the streaming thread, so
3752 * we must not hold the manifest lock.
3753 */
3754 GST_MANIFEST_UNLOCK (demux);
3755 } else {
3756 GST_MANIFEST_UNLOCK (demux);
3757 if (stream->last_ret == GST_FLOW_OK)
3758 stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3759 ret = GST_FLOW_CUSTOM_ERROR;
3760 }
3761
3762 stream->src_at_ready = FALSE;
3763
3764 gst_element_set_locked_state (stream->src, TRUE);
3765 gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3766 gst_ad_stream_src_to_ready_cb, stream, NULL);
3767
3768 g_mutex_lock (&stream->fragment_download_lock);
3769 while (!stream->src_at_ready) {
3770 g_cond_wait (&stream->fragment_download_cond,
3771 &stream->fragment_download_lock);
3772 }
3773 g_mutex_unlock (&stream->fragment_download_lock);
3774
3775 gst_element_set_state (stream->src, GST_STATE_READY);
3776
3777 /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3778 GST_MANIFEST_LOCK (demux);
3779 g_mutex_lock (&stream->fragment_download_lock);
3780 if (G_UNLIKELY (stream->cancelled)) {
3781 ret = stream->last_ret = GST_FLOW_FLUSHING;
3782 g_mutex_unlock (&stream->fragment_download_lock);
3783 return ret;
3784 }
3785 g_mutex_unlock (&stream->fragment_download_lock);
3786
3787 /* deactivate and reactivate our ghostpad to make it fresh for a new
3788 * stream */
3789 gst_pad_set_active (stream->internal_pad, FALSE);
3790 gst_pad_set_active (stream->internal_pad, TRUE);
3791
3792 return ret;
3793 }
3794
3795 /* must be called with manifest_lock taken.
3796 * Can temporarily release manifest_lock
3797 */
3798 static GstFlowReturn
gst_adaptive_demux_stream_download_header_fragment(GstAdaptiveDemuxStream * stream)3799 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3800 stream)
3801 {
3802 GstAdaptiveDemux *demux = stream->demux;
3803 GstFlowReturn ret = GST_FLOW_OK;
3804
3805 if (stream->fragment.header_uri != NULL) {
3806 GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3807 G_GINT64_FORMAT, stream->fragment.header_uri,
3808 stream->fragment.header_range_start, stream->fragment.header_range_end);
3809
3810 stream->downloading_header = TRUE;
3811 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3812 stream->fragment.header_uri, stream->fragment.header_range_start,
3813 stream->fragment.header_range_end, NULL);
3814 stream->downloading_header = FALSE;
3815 }
3816
3817 /* check if we have an index */
3818 if (ret == GST_FLOW_OK) { /* TODO check for other valid types */
3819
3820 if (stream->fragment.index_uri != NULL) {
3821 GST_DEBUG_OBJECT (demux,
3822 "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3823 stream->fragment.index_uri,
3824 stream->fragment.index_range_start, stream->fragment.index_range_end);
3825 stream->downloading_index = TRUE;
3826 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3827 stream->fragment.index_uri, stream->fragment.index_range_start,
3828 stream->fragment.index_range_end, NULL);
3829 stream->downloading_index = FALSE;
3830 }
3831 }
3832
3833 return ret;
3834 }
3835
3836 /* must be called with manifest_lock taken.
3837 * Can temporarily release manifest_lock
3838 */
3839 static GstFlowReturn
gst_adaptive_demux_stream_download_fragment(GstAdaptiveDemuxStream * stream)3840 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3841 {
3842 GstAdaptiveDemux *demux = stream->demux;
3843 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3844 gchar *url = NULL;
3845 GstFlowReturn ret;
3846 gboolean retried_once = FALSE, live;
3847 guint http_status;
3848 guint last_status_code;
3849
3850 /* FIXME : */
3851 /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3852 stream->starting_fragment = TRUE;
3853 stream->last_ret = GST_FLOW_OK;
3854 stream->first_fragment_buffer = TRUE;
3855
3856 GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3857 stream->fragment.uri ? "FRAGMENT " : "",
3858 stream->fragment.header_uri ? "HEADER " : "",
3859 stream->fragment.index_uri ? "INDEX" : "");
3860
3861 if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3862 stream->fragment.index_uri == NULL)
3863 goto no_url_error;
3864
3865 if (stream->need_header) {
3866 ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3867 if (ret != GST_FLOW_OK) {
3868 return ret;
3869 }
3870 stream->need_header = FALSE;
3871 }
3872
3873 again:
3874 ret = GST_FLOW_OK;
3875 url = stream->fragment.uri;
3876 GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3877 if (!url)
3878 return ret;
3879
3880 stream->last_ret = GST_FLOW_OK;
3881 http_status = 200;
3882
3883 /* Download the actual fragment, either in fragments or in one go */
3884 if (klass->need_another_chunk && klass->need_another_chunk (stream)
3885 && stream->fragment.chunk_size != 0) {
3886 /* Handle chunk downloading */
3887 gint64 range_start, range_end, chunk_start, chunk_end;
3888 guint64 download_total_bytes;
3889 gint chunk_size = stream->fragment.chunk_size;
3890
3891 range_start = chunk_start = stream->fragment.range_start;
3892 range_end = stream->fragment.range_end;
3893 /* HTTP ranges are inclusive for the end */
3894 if (chunk_size != -1)
3895 chunk_end = range_start + chunk_size - 1;
3896 else
3897 chunk_end = range_end;
3898
3899 if (range_end != -1)
3900 chunk_end = MIN (chunk_end, range_end);
3901
3902 while (!stream->fragment.finished && (chunk_start <= range_end
3903 || range_end == -1)) {
3904 download_total_bytes = stream->download_total_bytes;
3905
3906 ret =
3907 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3908 chunk_start, chunk_end, &http_status);
3909
3910 GST_DEBUG_OBJECT (stream->pad,
3911 "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3912 http_status, gst_flow_get_name (stream->last_ret));
3913
3914 /* Don't retry for any chunks except the first. We would have sent
3915 * data downstream already otherwise and it's difficult to recover
3916 * from that in a meaningful way */
3917 if (chunk_start > range_start)
3918 retried_once = TRUE;
3919
3920 /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3921 * downloading up to -1. We don't know the full duration.
3922 * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3923 if (ret != GST_FLOW_OK && chunk_end == -1) {
3924 break;
3925 } else if (ret != GST_FLOW_OK) {
3926 chunk_end = -1;
3927 stream->last_ret = GST_FLOW_OK;
3928 continue;
3929 }
3930
3931 if (chunk_end == -1)
3932 break;
3933
3934 /* Short read, we're at the end now */
3935 if (stream->download_total_bytes - download_total_bytes <
3936 chunk_end + 1 - chunk_start)
3937 break;
3938
3939 if (!klass->need_another_chunk (stream))
3940 break;
3941
3942 /* HTTP ranges are inclusive for the end */
3943 chunk_start += chunk_size;
3944 chunk_size = stream->fragment.chunk_size;
3945 if (chunk_size != -1)
3946 chunk_end = chunk_start + chunk_size - 1;
3947 else
3948 chunk_end = range_end;
3949
3950 if (range_end != -1)
3951 chunk_end = MIN (chunk_end, range_end);
3952 }
3953 } else {
3954 ret =
3955 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3956 stream->fragment.range_start, stream->fragment.range_end, &http_status);
3957 GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3958 stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3959 }
3960 if (ret == GST_FLOW_OK)
3961 goto beach;
3962
3963 #ifdef OHOS_EXT_FUNC
3964 /**
3965 * ohos.ext.func.0033
3966 * Support reconnection after disconnection in gstcurl.
3967 */
3968 if (ret == GST_FLOW_CUSTOM_ERROR && stream->errcode == GST_RESOURCE_ERROR_TIME_OUT) {
3969 return GST_FLOW_ERROR;
3970 }
3971 #endif
3972
3973 g_mutex_lock (&stream->fragment_download_lock);
3974 if (G_UNLIKELY (stream->cancelled)) {
3975 g_mutex_unlock (&stream->fragment_download_lock);
3976 return ret;
3977 }
3978 g_mutex_unlock (&stream->fragment_download_lock);
3979
3980 /* TODO check if we are truly stopping */
3981 if (ret != GST_FLOW_CUSTOM_ERROR)
3982 goto beach;
3983
3984 last_status_code = stream->last_status_code;
3985 GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3986 last_status_code, stream->download_error_count);
3987
3988 live = gst_adaptive_demux_is_live (demux);
3989
3990 #ifdef OHOS_EXT_FUNC
3991 // ohos.ext.func.0013
3992 if (last_status_code == 408) {
3993 GST_WARNING_OBJECT (stream->pad, "Receive error of souphttpsrc, status_code is %u", last_status_code);
3994 stream->download_error_count = MAX_DOWNLOAD_ERROR_COUNT;
3995 return GST_FLOW_ERROR;
3996 }
3997 #endif
3998
3999 if (!retried_once && ((last_status_code / 100 == 4 && live)
4000 || last_status_code / 100 == 5)) {
4001 /* 4xx/5xx */
4002 /* if current position is before available start, switch to next */
4003 if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
4004 goto flushing;
4005
4006 if (live) {
4007 gint64 range_start, range_stop;
4008
4009 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
4010 &range_stop))
4011 goto flushing;
4012
4013 if (demux->segment.position < range_start) {
4014 GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
4015 stream->last_ret = GST_FLOW_OK;
4016 ret = gst_adaptive_demux_eos_handling (stream);
4017 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
4018 gst_flow_get_name (ret));
4019 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
4020 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
4021 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
4022 gst_flow_get_name (ret));
4023 if (ret == GST_FLOW_OK) {
4024 retried_once = TRUE;
4025 goto again;
4026 }
4027 } else if (demux->segment.position > range_stop) {
4028 /* wait a bit to be in range, we don't have any locks at that point */
4029 gint64 wait_time =
4030 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
4031 if (wait_time > 0) {
4032 gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
4033
4034 GST_DEBUG_OBJECT (stream->pad,
4035 "Download waiting for %" GST_TIME_FORMAT,
4036 GST_TIME_ARGS (wait_time));
4037
4038 GST_MANIFEST_UNLOCK (demux);
4039 g_mutex_lock (&stream->fragment_download_lock);
4040 if (G_UNLIKELY (stream->cancelled)) {
4041 g_mutex_unlock (&stream->fragment_download_lock);
4042 GST_MANIFEST_LOCK (demux);
4043 stream->last_ret = GST_FLOW_FLUSHING;
4044 goto flushing;
4045 }
4046 do {
4047 g_cond_wait_until (&stream->fragment_download_cond,
4048 &stream->fragment_download_lock, end_time);
4049 if (G_UNLIKELY (stream->cancelled)) {
4050 g_mutex_unlock (&stream->fragment_download_lock);
4051 GST_MANIFEST_LOCK (demux);
4052 stream->last_ret = GST_FLOW_FLUSHING;
4053 goto flushing;
4054 }
4055 } while (!stream->download_finished);
4056 g_mutex_unlock (&stream->fragment_download_lock);
4057
4058 GST_MANIFEST_LOCK (demux);
4059 }
4060 }
4061 }
4062
4063 flushing:
4064 if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
4065 /* looks like there is no way of knowing when a live stream has ended
4066 * Have to assume we are falling behind and cause a manifest reload */
4067 GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
4068 return GST_FLOW_EOS;
4069 }
4070 } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4071 /* If this is the last fragment, consider failures EOS and not actual
4072 * errors. Due to rounding errors in the durations, the last fragment
4073 * might not actually exist */
4074 GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
4075 return GST_FLOW_EOS;
4076 } else {
4077 /* retry once (same segment) for 5xx (server errors) */
4078 if (!retried_once) {
4079 retried_once = TRUE;
4080 /* wait a short time in case the server needs a bit to recover, we don't
4081 * care if we get woken up before end time. We can use sleep here since
4082 * we're already blocking and just want to wait some time. */
4083 g_usleep (100000); /* a tenth of a second */
4084 goto again;
4085 }
4086 }
4087
4088 beach:
4089 return ret;
4090
4091 no_url_error:
4092 {
4093 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
4094 (_("Failed to get fragment URL.")),
4095 ("An error happened when getting fragment URL"));
4096 gst_task_stop (stream->download_task);
4097 return GST_FLOW_ERROR;
4098 }
4099 }
4100
4101 /* this function will take the manifest_lock and will keep it until the end.
4102 * It will release it temporarily only when going to sleep.
4103 * Every time it takes the manifest_lock, it will check for cancelled condition
4104 */
4105 static void
gst_adaptive_demux_stream_download_loop(GstAdaptiveDemuxStream * stream)4106 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
4107 {
4108 GstAdaptiveDemux *demux = stream->demux;
4109 GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
4110 GstFlowReturn ret;
4111 gboolean live;
4112
4113 GST_LOG_OBJECT (stream->pad, "download loop start");
4114
4115 GST_MANIFEST_LOCK (demux);
4116
4117 g_mutex_lock (&stream->fragment_download_lock);
4118 if (G_UNLIKELY (stream->cancelled)) {
4119 stream->last_ret = GST_FLOW_FLUSHING;
4120 g_mutex_unlock (&stream->fragment_download_lock);
4121 goto cancelled;
4122 }
4123 g_mutex_unlock (&stream->fragment_download_lock);
4124
4125 /* Check if we're done with our segment */
4126 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4127 if (demux->segment.rate > 0) {
4128 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
4129 && stream->segment.position >= stream->segment.stop) {
4130 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4131 ret = GST_FLOW_EOS;
4132 gst_task_stop (stream->download_task);
4133 goto end_of_manifest;
4134 }
4135 } else {
4136 if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
4137 && stream->segment.position <= stream->segment.start) {
4138 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4139 ret = GST_FLOW_EOS;
4140 gst_task_stop (stream->download_task);
4141 goto end_of_manifest;
4142 }
4143 }
4144 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4145
4146 /* Cleanup old streams if any */
4147 if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
4148 GList *old_streams = demux->priv->old_streams;
4149 demux->priv->old_streams = NULL;
4150
4151 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
4152 g_list_free_full (old_streams,
4153 (GDestroyNotify) gst_adaptive_demux_stream_free);
4154 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
4155
4156 /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
4157 * Recheck the cancelled flag.
4158 */
4159 g_mutex_lock (&stream->fragment_download_lock);
4160 if (G_UNLIKELY (stream->cancelled)) {
4161 stream->last_ret = GST_FLOW_FLUSHING;
4162 g_mutex_unlock (&stream->fragment_download_lock);
4163 goto cancelled;
4164 }
4165 g_mutex_unlock (&stream->fragment_download_lock);
4166 }
4167
4168 /* Restarting download, figure out new position
4169 * FIXME : Move this to a separate function ? */
4170 if (G_UNLIKELY (stream->restart_download)) {
4171 GstEvent *seg_event;
4172 GstClockTime cur, ts = 0;
4173 gint64 pos;
4174
4175 GST_DEBUG_OBJECT (stream->pad,
4176 "Activating stream due to reconfigure event");
4177
4178 if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
4179 ts = (GstClockTime) pos;
4180 GST_DEBUG_OBJECT (demux, "Downstream position: %"
4181 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4182 } else {
4183 /* query other pads as some faulty element in the pad's branch might
4184 * reject position queries. This should be better than using the
4185 * demux segment position that can be much ahead */
4186 GList *iter;
4187
4188 for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
4189 GstAdaptiveDemuxStream *cur_stream =
4190 (GstAdaptiveDemuxStream *) iter->data;
4191
4192 if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
4193 &pos)) {
4194 ts = (GstClockTime) pos;
4195 GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
4196 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4197 break;
4198 }
4199 }
4200 }
4201
4202 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4203 cur =
4204 gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
4205 stream->segment.position);
4206
4207 /* we might have already pushed this data */
4208 ts = MAX (ts, cur);
4209
4210 GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
4211 "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4212
4213 if (GST_CLOCK_TIME_IS_VALID (ts)) {
4214 GstClockTime offset, period_start;
4215
4216 offset =
4217 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4218 period_start = gst_adaptive_demux_get_period_start_time (demux);
4219
4220 /* TODO check return */
4221 gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
4222 0, ts, &ts);
4223
4224 stream->segment.position = ts - period_start + offset;
4225 }
4226
4227 /* The stream's segment is still correct except for
4228 * the position, so let's send a new one with the
4229 * updated position */
4230 seg_event = gst_event_new_segment (&stream->segment);
4231 gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
4232 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4233
4234 GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
4235 GST_PTR_FORMAT, seg_event);
4236 gst_pad_push_event (stream->pad, seg_event);
4237
4238 stream->discont = TRUE;
4239 stream->restart_download = FALSE;
4240 }
4241
4242 live = gst_adaptive_demux_is_live (demux);
4243
4244 /* Get information about the fragment to download */
4245 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
4246 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
4247 GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
4248 ret, gst_flow_get_name (ret));
4249 if (ret == GST_FLOW_OK) {
4250
4251 /* wait for live fragments to be available */
4252 if (live) {
4253 gint64 wait_time =
4254 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
4255 if (wait_time > 0) {
4256 GstClockTime end_time =
4257 gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
4258
4259 GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
4260 GST_TIME_ARGS (wait_time));
4261
4262 GST_MANIFEST_UNLOCK (demux);
4263
4264 g_mutex_lock (&stream->fragment_download_lock);
4265 if (G_UNLIKELY (stream->cancelled)) {
4266 g_mutex_unlock (&stream->fragment_download_lock);
4267 GST_MANIFEST_LOCK (demux);
4268 stream->last_ret = GST_FLOW_FLUSHING;
4269 goto cancelled;
4270 }
4271 gst_adaptive_demux_wait_until (demux->realtime_clock,
4272 &stream->fragment_download_cond, &stream->fragment_download_lock,
4273 end_time);
4274 g_mutex_unlock (&stream->fragment_download_lock);
4275
4276 GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
4277
4278 GST_MANIFEST_LOCK (demux);
4279
4280 g_mutex_lock (&stream->fragment_download_lock);
4281 if (G_UNLIKELY (stream->cancelled)) {
4282 stream->last_ret = GST_FLOW_FLUSHING;
4283 g_mutex_unlock (&stream->fragment_download_lock);
4284 goto cancelled;
4285 }
4286 g_mutex_unlock (&stream->fragment_download_lock);
4287 }
4288 }
4289
4290 stream->last_ret = GST_FLOW_OK;
4291
4292 next_download = gst_adaptive_demux_get_monotonic_time (demux);
4293 ret = gst_adaptive_demux_stream_download_fragment (stream);
4294
4295 #ifdef OHOS_EXT_FUNC
4296 /**
4297 * ohos.ext.func.0033
4298 * Support reconnection after disconnection in gstcurl.
4299 * Stop download task when reconnection timeout.
4300 */
4301 if (ret == GST_FLOW_ERROR && stream->errcode == GST_RESOURCE_ERROR_TIME_OUT) {
4302 GST_WARNING_OBJECT (stream->pad, "download failed since connect time out, stop download task");
4303 gst_task_stop (stream->download_task);
4304 goto end;
4305 }
4306 #endif
4307
4308 if (ret == GST_FLOW_FLUSHING) {
4309 g_mutex_lock (&stream->fragment_download_lock);
4310 if (G_UNLIKELY (stream->cancelled)) {
4311 stream->last_ret = GST_FLOW_FLUSHING;
4312 g_mutex_unlock (&stream->fragment_download_lock);
4313 goto cancelled;
4314 }
4315 g_mutex_unlock (&stream->fragment_download_lock);
4316 }
4317
4318 } else {
4319 stream->last_ret = ret;
4320 }
4321
4322 switch (ret) {
4323 case GST_FLOW_OK:
4324 break; /* all is good, let's go */
4325 case GST_FLOW_EOS:
4326 GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
4327
4328 /* we push the EOS after releasing the object lock */
4329 if (gst_adaptive_demux_is_live (demux)
4330 && (demux->segment.rate == 1.0
4331 || gst_adaptive_demux_stream_in_live_seek_range (demux,
4332 stream))) {
4333 GstAdaptiveDemuxClass *demux_class =
4334 GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4335
4336 /* this might be a fragment download error, refresh the manifest, just in case */
4337 if (!demux_class->requires_periodical_playlist_update (demux)) {
4338 ret = gst_adaptive_demux_update_manifest (demux);
4339 break;
4340 /* Wait only if we can ensure current manifest has been expired.
4341 * The meaning "we have next period" *WITH* EOS is that, current
4342 * period has been ended but we can continue to the next period */
4343 } else if (!gst_adaptive_demux_has_next_period (demux) &&
4344 gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
4345 goto end;
4346 }
4347 gst_task_stop (stream->download_task);
4348 if (stream->replaced) {
4349 goto end;
4350 }
4351 } else {
4352 gst_task_stop (stream->download_task);
4353 }
4354
4355 if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
4356 if (gst_adaptive_demux_has_next_period (demux)) {
4357 GST_DEBUG_OBJECT (stream->pad,
4358 "Next period available, not sending EOS");
4359 gst_adaptive_demux_advance_period (demux);
4360 ret = GST_FLOW_OK;
4361 }
4362 }
4363 break;
4364
4365 case GST_FLOW_NOT_LINKED:
4366 {
4367 GstFlowReturn ret;
4368 gst_task_stop (stream->download_task);
4369
4370 ret = gst_adaptive_demux_combine_flows (demux);
4371 if (ret == GST_FLOW_NOT_LINKED) {
4372 GST_ELEMENT_FLOW_ERROR (demux, ret);
4373 }
4374 }
4375 break;
4376
4377 case GST_FLOW_FLUSHING:{
4378 GList *iter;
4379
4380 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4381 GstAdaptiveDemuxStream *other;
4382
4383 other = iter->data;
4384 gst_task_stop (other->download_task);
4385 }
4386 }
4387 break;
4388
4389 default:
4390 if (ret <= GST_FLOW_ERROR) {
4391 gboolean is_live = gst_adaptive_demux_is_live (demux);
4392 GST_WARNING_OBJECT (demux, "Error while downloading fragment");
4393 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
4394 goto download_error;
4395 }
4396
4397 g_clear_error (&stream->last_error);
4398
4399 /* First try to update the playlist for non-live playlists
4400 * in case the URIs have changed in the meantime. But only
4401 * try it the first time, after that we're going to wait a
4402 * a bit to not flood the server */
4403 if (stream->download_error_count == 1 && !is_live) {
4404 /* TODO hlsdemux had more options to this function (boolean and err) */
4405
4406 if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
4407 /* Retry immediately, the playlist actually has changed */
4408 GST_DEBUG_OBJECT (demux, "Updated the playlist");
4409 goto end;
4410 }
4411 }
4412
4413 /* Wait half the fragment duration before retrying */
4414 next_download += stream->fragment.duration / 2;
4415
4416 GST_MANIFEST_UNLOCK (demux);
4417
4418 g_mutex_lock (&stream->fragment_download_lock);
4419 if (G_UNLIKELY (stream->cancelled)) {
4420 g_mutex_unlock (&stream->fragment_download_lock);
4421 GST_MANIFEST_LOCK (demux);
4422 stream->last_ret = GST_FLOW_FLUSHING;
4423 goto cancelled;
4424 }
4425 gst_adaptive_demux_wait_until (demux->realtime_clock,
4426 &stream->fragment_download_cond, &stream->fragment_download_lock,
4427 next_download);
4428 g_mutex_unlock (&stream->fragment_download_lock);
4429
4430 GST_DEBUG_OBJECT (demux, "Retrying now");
4431
4432 GST_MANIFEST_LOCK (demux);
4433
4434 g_mutex_lock (&stream->fragment_download_lock);
4435 if (G_UNLIKELY (stream->cancelled)) {
4436 stream->last_ret = GST_FLOW_FLUSHING;
4437 g_mutex_unlock (&stream->fragment_download_lock);
4438 goto cancelled;
4439 }
4440 g_mutex_unlock (&stream->fragment_download_lock);
4441
4442 /* Refetch the playlist now after we waited */
4443 if (!is_live
4444 && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
4445 GST_DEBUG_OBJECT (demux, "Updated the playlist");
4446 }
4447 goto end;
4448 }
4449 break;
4450 }
4451
4452 end_of_manifest:
4453 if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
4454 if (GST_OBJECT_PARENT (stream->pad) != NULL) {
4455 if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
4456 GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
4457 gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
4458 } else {
4459 GST_DEBUG_OBJECT (stream->src,
4460 "Stream is EOS, but we're switching fragments. Not sending.");
4461 }
4462 } else {
4463 GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
4464 goto download_error;
4465 }
4466 }
4467
4468 end:
4469 GST_MANIFEST_UNLOCK (demux);
4470 GST_LOG_OBJECT (stream->pad, "download loop end");
4471 return;
4472
4473 cancelled:
4474 {
4475 GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
4476 goto end;
4477 }
4478 download_error:
4479 {
4480 GstMessage *msg;
4481
4482 if (stream->last_error) {
4483 gchar *debug = g_strdup_printf ("Error on stream %s:%s",
4484 GST_DEBUG_PAD_NAME (stream->pad));
4485 msg =
4486 gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
4487 debug);
4488 GST_ERROR_OBJECT (stream->pad, "Download error: %s",
4489 stream->last_error->message);
4490 g_free (debug);
4491 } else {
4492 GError *err =
4493 g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
4494 _("Couldn't download fragments"));
4495 msg =
4496 gst_message_new_error (GST_OBJECT_CAST (demux), err,
4497 "Fragment downloading has failed consecutive times");
4498 g_error_free (err);
4499 GST_ERROR_OBJECT (stream->pad,
4500 "Download error: Couldn't download fragments, too many failures");
4501 }
4502
4503 gst_task_stop (stream->download_task);
4504 if (stream->src) {
4505 GstElement *src = stream->src;
4506
4507 stream->src = NULL;
4508 GST_MANIFEST_UNLOCK (demux);
4509 gst_element_set_locked_state (src, TRUE);
4510 gst_element_set_state (src, GST_STATE_NULL);
4511 gst_bin_remove (GST_BIN_CAST (demux), src);
4512 GST_MANIFEST_LOCK (demux);
4513 }
4514
4515 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
4516
4517 goto end;
4518 }
4519 }
4520
/* Task function that periodically refreshes the manifest.
 *
 * Runs for as long as gst_adaptive_demux_is_live() reports TRUE. Every
 * iteration releases the manifest lock, sleeps on updates_timed_cond until
 * next_update (or until it is woken up), re-takes the manifest lock and calls
 * gst_adaptive_demux_update_manifest().
 *
 * The function returns when priv->stop_updates_task is set, when the manifest
 * is no longer live, or once update_failed_count exceeds DEFAULT_FAILED_COUNT;
 * in the latter case an element error is posted and the updates task is
 * stopped. The manifest lock is never held when the function returns. */
static void
gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
{
  GstClockTime next_update;
  GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);

  /* Loop for updating of the playlist. This periodically checks if
   * the playlist is updated and does so, then signals the streaming
   * thread in case it can continue downloading now. */

  /* block until the next scheduled update or the signal to quit this thread */
  GST_DEBUG_OBJECT (demux, "Started updates task");

  GST_MANIFEST_LOCK (demux);

  /* The subclass interval is scaled by GST_USECOND before being added to the
   * monotonic time (presumably the vfunc returns microseconds - confirm) */
  next_update =
      gst_adaptive_demux_get_monotonic_time (demux) +
      klass->get_manifest_update_interval (demux) * GST_USECOND;

  /* Updating playlist only needed for live playlists */
  while (gst_adaptive_demux_is_live (demux)) {
    GstFlowReturn ret = GST_FLOW_OK;

    /* Wait here until we should do the next update or we're cancelled */
    GST_DEBUG_OBJECT (demux, "Wait for next playlist update");

    /* The manifest lock is not held while sleeping */
    GST_MANIFEST_UNLOCK (demux);

    g_mutex_lock (&demux->priv->updates_timed_lock);
    /* Stop request seen before going to sleep */
    if (demux->priv->stop_updates_task) {
      g_mutex_unlock (&demux->priv->updates_timed_lock);
      goto quit;
    }
    gst_adaptive_demux_wait_until (demux->realtime_clock,
        &demux->priv->updates_timed_cond,
        &demux->priv->updates_timed_lock, next_update);
    g_mutex_unlock (&demux->priv->updates_timed_lock);

    /* Check again after waking up: the wait may have ended because a stop
     * was requested rather than because next_update was reached */
    g_mutex_lock (&demux->priv->updates_timed_lock);
    if (demux->priv->stop_updates_task) {
      g_mutex_unlock (&demux->priv->updates_timed_lock);
      goto quit;
    }
    g_mutex_unlock (&demux->priv->updates_timed_lock);

    GST_MANIFEST_LOCK (demux);

    GST_DEBUG_OBJECT (demux, "Updating playlist");

    ret = gst_adaptive_demux_update_manifest (demux);

    if (ret == GST_FLOW_EOS) {
      /* Nothing is done for EOS: the failure counter and next_update are
       * both left untouched */
    } else if (ret != GST_FLOW_OK) {
      /* update_failed_count is used only here, no need to protect it */
      demux->priv->update_failed_count++;
      if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
        /* Still within the tolerated number of failures: schedule a retry */
        GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
            gst_flow_get_name (ret));
        next_update = gst_adaptive_demux_get_monotonic_time (demux)
            + klass->get_manifest_update_interval (demux) * GST_USECOND;
      } else {
        /* Too many failures: report the error and stop this task. This path
         * releases the manifest lock itself and skips the quit label */
        GST_ELEMENT_ERROR (demux, STREAM, FAILED,
            (_("Internal data stream error.")), ("Could not update playlist"));
        GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
        gst_task_stop (demux->priv->updates_task);
        GST_MANIFEST_UNLOCK (demux);
        goto end;
      }
    } else {
      GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
      demux->priv->update_failed_count = 0;
      next_update =
          gst_adaptive_demux_get_monotonic_time (demux) +
          klass->get_manifest_update_interval (demux) * GST_USECOND;

      /* Wake up download tasks */
      g_mutex_lock (&demux->priv->manifest_update_lock);
      g_cond_broadcast (&demux->priv->manifest_cond);
      g_mutex_unlock (&demux->priv->manifest_update_lock);
    }
  }

  /* Reached when the manifest is no longer live */
  GST_MANIFEST_UNLOCK (demux);

  /* Both the stop requests above and the regular loop exit end up here, with
   * the manifest lock already released */
quit:
  {
    GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
  }

end:
  {
    return;
  }
}
4615
4616 /* must be called with manifest_lock taken */
4617 static gboolean
gst_adaptive_demux_stream_push_event(GstAdaptiveDemuxStream * stream,GstEvent * event)4618 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4619 GstEvent * event)
4620 {
4621 gboolean ret;
4622 GstPad *pad;
4623 GstAdaptiveDemux *demux = stream->demux;
4624
4625 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4626 stream->eos = TRUE;
4627 }
4628
4629 pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4630
4631 /* Can't push events holding the manifest lock */
4632 GST_MANIFEST_UNLOCK (demux);
4633
4634 GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4635 "Pushing event %" GST_PTR_FORMAT, event);
4636
4637 ret = gst_pad_push_event (pad, event);
4638
4639 gst_object_unref (pad);
4640
4641 GST_MANIFEST_LOCK (demux);
4642
4643 return ret;
4644 }
4645
4646 /* must be called with manifest_lock taken */
4647 static gboolean
gst_adaptive_demux_is_live(GstAdaptiveDemux * demux)4648 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4649 {
4650 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4651
4652 if (klass->is_live)
4653 return klass->is_live (demux);
4654 return FALSE;
4655 }
4656
4657 /* must be called with manifest_lock taken */
4658 static GstFlowReturn
gst_adaptive_demux_stream_seek(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,gboolean forward,GstSeekFlags flags,GstClockTime ts,GstClockTime * final_ts)4659 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4660 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4661 GstClockTime ts, GstClockTime * final_ts)
4662 {
4663 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4664
4665 if (klass->stream_seek)
4666 return klass->stream_seek (stream, forward, flags, ts, final_ts);
4667 return GST_FLOW_ERROR;
4668 }
4669
4670 /* must be called with manifest_lock taken */
4671 static gboolean
gst_adaptive_demux_stream_has_next_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4672 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4673 GstAdaptiveDemuxStream * stream)
4674 {
4675 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4676 gboolean ret = TRUE;
4677
4678 if (klass->stream_has_next_fragment)
4679 ret = klass->stream_has_next_fragment (stream);
4680
4681 return ret;
4682 }
4683
4684 /* must be called with manifest_lock taken */
4685 /* Called from:
4686 * the ::finish_fragment() handlers when an *actual* fragment is done
4687 * */
4688 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4689 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4690 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4691 {
4692 GstFlowReturn ret;
4693
4694 if (stream->last_ret == GST_FLOW_OK) {
4695 stream->last_ret =
4696 gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4697 duration);
4698 }
4699 ret = stream->last_ret;
4700
4701 return ret;
4702 }
4703
/* must be called with manifest_lock taken
 *
 * Moves @stream on to its next fragment:
 *  - resets the download error state of the stream,
 *  - posts a statistics element message about the fragment just finished,
 *  - for forward playback with a valid @duration, advances the stream
 *    position and, if it is now ahead, the demuxer position,
 *  - asks the subclass to advance, or decides on EOS,
 *  - on success, re-evaluates the bitrate and handles a pending switch to
 *    demux->next_streams.
 *
 * Returns the value of the subclass stream_advance_fragment vfunc,
 * GST_FLOW_EOS when there is nothing left to advance to or when new streams
 * are pending, or GST_ADAPTIVE_DEMUX_FLOW_SWITCH when the bitrate selection
 * reports a change. Returns GST_FLOW_ERROR if the subclass lacks the
 * stream_advance_fragment vfunc. */
GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream, GstClockTime duration)
{
  GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
  GstFlowReturn ret;

  g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);

  GST_LOG_OBJECT (stream->pad,
      "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
      GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));

  /* Getting here clears any error recorded for earlier download attempts */
  stream->download_error_count = 0;
  g_clear_error (&stream->last_error);

  /* FIXME - url has no indication of byte ranges for subsegments */
  /* FIXME : All those time statistics are biased, since they are calculated
   * *AFTER* the queue2, which might be blocking. They should ideally be
   * calculated *before* queue2 in the uri_handler_probe */
  /* NOTE(review): download_start_time is stored further down through
   * GST_TIME_AS_USECONDS() but is published here as GST_TYPE_CLOCK_TIME
   * next to gst_util_get_timestamp() - confirm the intended unit */
  gst_element_post_message (GST_ELEMENT_CAST (demux),
      gst_message_new_element (GST_OBJECT_CAST (demux),
          gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
              "manifest-uri", G_TYPE_STRING,
              demux->manifest_uri, "uri", G_TYPE_STRING,
              stream->fragment.uri, "fragment-start-time",
              GST_TYPE_CLOCK_TIME, stream->download_start_time,
              "fragment-stop-time", GST_TYPE_CLOCK_TIME,
              gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
              stream->download_total_bytes, "fragment-download-time",
              GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));

  /* Don't update to the end of the segment if in reverse playback */
  GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
  if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
    GstClockTime offset =
        gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
    GstClockTime period_start =
        gst_adaptive_demux_get_period_start_time (demux);

    stream->segment.position += duration;

    /* Convert from position inside the stream's segment to the demuxer's
     * segment, they are not necessarily the same */
    /* The demuxer position only ever moves forward here */
    if (stream->segment.position - offset + period_start >
        demux->segment.position)
      demux->segment.position =
          stream->segment.position - offset + period_start;
  }
  GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);

  /* When advancing with a non 1.0 rate on live streams, we need to check
   * the live seeking range again to make sure we can still advance to
   * that position */
  if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
    if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
      ret = GST_FLOW_EOS;
    else
      ret = klass->stream_advance_fragment (stream);
  } else if (gst_adaptive_demux_is_live (demux)
      || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
    /* Live streams always ask the subclass; non-live ones only when a next
     * fragment is reported */
    ret = klass->stream_advance_fragment (stream);
  } else {
    ret = GST_FLOW_EOS;
  }

  /* Restart the download timing for whatever is fetched next */
  stream->download_start_time =
      GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));

  if (ret == GST_FLOW_OK) {
    /* A TRUE result from the bitrate selection requests new headers and is
     * reported to the caller as a switch */
    if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
            gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
      stream->need_header = TRUE;
      ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
    }

    /* the subclass might want to switch pads */
    if (G_UNLIKELY (demux->next_streams)) {
      GList *iter;
      gboolean can_expose = TRUE;

      /* This stream stops downloading and reports EOS, overriding any
       * switch result set above */
      gst_task_stop (stream->download_task);

      ret = GST_FLOW_EOS;

      for (iter = demux->streams; iter; iter = g_list_next (iter)) {
        /* Only expose if all streams are now cancelled or finished downloading */
        GstAdaptiveDemuxStream *other = iter->data;
        if (other != stream) {
          g_mutex_lock (&other->fragment_download_lock);
          can_expose &= (other->cancelled == TRUE
              || other->download_finished == TRUE);
          g_mutex_unlock (&other->fragment_download_lock);
        }
      }

      if (can_expose) {
        GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
            "to do bitrate switching");
        gst_adaptive_demux_prepare_streams (demux, FALSE);
        gst_adaptive_demux_start_tasks (demux, TRUE);
      } else {
        GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
      }
    }
  }

  return ret;
}
4814
4815 /* must be called with manifest_lock taken */
4816 static gboolean
gst_adaptive_demux_stream_select_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 bitrate)4817 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4818 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4819 {
4820 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4821
4822 if (klass->stream_select_bitrate)
4823 return klass->stream_select_bitrate (stream, bitrate);
4824 return FALSE;
4825 }
4826
4827 /* must be called with manifest_lock taken */
4828 static GstFlowReturn
gst_adaptive_demux_stream_update_fragment_info(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4829 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4830 GstAdaptiveDemuxStream * stream)
4831 {
4832 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4833 GstFlowReturn ret;
4834
4835 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4836 GST_FLOW_ERROR);
4837
4838 /* Make sure the sub-class will update bitrate, or else
4839 * we will later */
4840 stream->fragment.bitrate = 0;
4841 stream->fragment.finished = FALSE;
4842
4843 GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4844 GST_TIME_ARGS (stream->segment.position));
4845
4846 ret = klass->stream_update_fragment_info (stream);
4847
4848 GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4849 stream->fragment.uri);
4850 if (ret == GST_FLOW_OK) {
4851 GST_LOG_OBJECT (stream->pad,
4852 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4853 GST_TIME_ARGS (stream->fragment.timestamp),
4854 GST_TIME_ARGS (stream->fragment.duration));
4855 GST_LOG_OBJECT (stream->pad,
4856 "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4857 stream->fragment.range_start, stream->fragment.range_end);
4858 }
4859
4860 return ret;
4861 }
4862
4863 /* must be called with manifest_lock taken */
4864 static gint64
gst_adaptive_demux_stream_get_fragment_waiting_time(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4865 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4866 demux, GstAdaptiveDemuxStream * stream)
4867 {
4868 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4869
4870 if (klass->stream_get_fragment_waiting_time)
4871 return klass->stream_get_fragment_waiting_time (stream);
4872 return 0;
4873 }
4874
4875 /* must be called with manifest_lock taken */
4876 static GstFlowReturn
gst_adaptive_demux_update_manifest_default(GstAdaptiveDemux * demux)4877 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4878 {
4879 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4880 GstFragment *download;
4881 GstBuffer *buffer;
4882 GstFlowReturn ret;
4883 GError *error = NULL;
4884
4885 download = gst_uri_downloader_fetch_uri (demux->downloader,
4886 demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4887 if (download) {
4888 g_free (demux->manifest_uri);
4889 g_free (demux->manifest_base_uri);
4890 if (download->redirect_permanent && download->redirect_uri) {
4891 demux->manifest_uri = g_strdup (download->redirect_uri);
4892 demux->manifest_base_uri = NULL;
4893 } else {
4894 demux->manifest_uri = g_strdup (download->uri);
4895 demux->manifest_base_uri = g_strdup (download->redirect_uri);
4896 }
4897
4898 buffer = gst_fragment_get_buffer (download);
4899 g_object_unref (download);
4900 ret = klass->update_manifest_data (demux, buffer);
4901 gst_buffer_unref (buffer);
4902 /* FIXME: Should the manifest uri vars be reverted to original
4903 * values if updating fails? */
4904 } else {
4905 GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4906 error->message);
4907 ret = GST_FLOW_NOT_LINKED;
4908 }
4909 g_clear_error (&error);
4910
4911 return ret;
4912 }
4913
4914 /* must be called with manifest_lock taken */
4915 static GstFlowReturn
gst_adaptive_demux_update_manifest(GstAdaptiveDemux * demux)4916 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4917 {
4918 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4919 GstFlowReturn ret;
4920
4921 ret = klass->update_manifest (demux);
4922
4923 if (ret == GST_FLOW_OK) {
4924 GstClockTime duration;
4925 /* Send an updated duration message */
4926 duration = klass->get_duration (demux);
4927 if (duration != GST_CLOCK_TIME_NONE) {
4928 GST_DEBUG_OBJECT (demux,
4929 "Sending duration message : %" GST_TIME_FORMAT,
4930 GST_TIME_ARGS (duration));
4931 gst_element_post_message (GST_ELEMENT (demux),
4932 gst_message_new_duration_changed (GST_OBJECT (demux)));
4933 } else {
4934 GST_DEBUG_OBJECT (demux,
4935 "Duration unknown, can not send the duration message");
4936 }
4937
4938 /* If a manifest changes it's liveness or periodic updateness, we need
4939 * to start/stop the manifest update task appropriately */
4940 /* Keep this condition in sync with the one in
4941 * gst_adaptive_demux_start_manifest_update_task()
4942 */
4943 if (gst_adaptive_demux_is_live (demux) &&
4944 klass->requires_periodical_playlist_update (demux)) {
4945 gst_adaptive_demux_start_manifest_update_task (demux);
4946 } else {
4947 gst_adaptive_demux_stop_manifest_update_task (demux);
4948 }
4949 }
4950
4951 return ret;
4952 }
4953
4954 void
gst_adaptive_demux_stream_fragment_clear(GstAdaptiveDemuxStreamFragment * f)4955 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4956 {
4957 g_free (f->uri);
4958 f->uri = NULL;
4959 f->range_start = 0;
4960 f->range_end = -1;
4961
4962 g_free (f->header_uri);
4963 f->header_uri = NULL;
4964 f->header_range_start = 0;
4965 f->header_range_end = -1;
4966
4967 g_free (f->index_uri);
4968 f->index_uri = NULL;
4969 f->index_range_start = 0;
4970 f->index_range_end = -1;
4971
4972 f->finished = FALSE;
4973 }
4974
4975 /* must be called with manifest_lock taken */
4976 static gboolean
gst_adaptive_demux_has_next_period(GstAdaptiveDemux * demux)4977 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4978 {
4979 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4980 gboolean ret = FALSE;
4981
4982 if (klass->has_next_period)
4983 ret = klass->has_next_period (demux);
4984 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4985 return ret;
4986 }
4987
4988 /* must be called with manifest_lock taken */
4989 static void
gst_adaptive_demux_advance_period(GstAdaptiveDemux * demux)4990 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4991 {
4992 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4993
4994 g_return_if_fail (klass->advance_period != NULL);
4995
4996 GST_DEBUG_OBJECT (demux, "Advancing to next period");
4997 klass->advance_period (demux);
4998 gst_adaptive_demux_prepare_streams (demux, FALSE);
4999 gst_adaptive_demux_start_tasks (demux, TRUE);
5000 }
5001
5002 /**
5003 * gst_adaptive_demux_get_monotonic_time:
5004 * Returns: a monotonically increasing time, using the system realtime clock
5005 */
5006 GstClockTime
gst_adaptive_demux_get_monotonic_time(GstAdaptiveDemux * demux)5007 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
5008 {
5009 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
5010 return gst_clock_get_time (demux->realtime_clock);
5011 }
5012
5013 /**
5014 * gst_adaptive_demux_get_client_now_utc:
5015 * @demux: #GstAdaptiveDemux
5016 * Returns: the client's estimate of UTC
5017 *
5018 * Used to find the client's estimate of UTC, using the system realtime clock.
5019 */
5020 GDateTime *
gst_adaptive_demux_get_client_now_utc(GstAdaptiveDemux * demux)5021 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
5022 {
5023 GstClockTime rtc_now;
5024 GDateTime *unix_datetime;
5025 GDateTime *result_datetime;
5026 gint64 utc_now_in_us;
5027
5028 rtc_now = gst_clock_get_time (demux->realtime_clock);
5029 utc_now_in_us = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
5030 unix_datetime =
5031 g_date_time_new_from_unix_utc (utc_now_in_us / G_TIME_SPAN_SECOND);
5032 result_datetime =
5033 g_date_time_add (unix_datetime, utc_now_in_us % G_TIME_SPAN_SECOND);
5034 g_date_time_unref (unix_datetime);
5035 return result_datetime;
5036 }
5037
5038 /**
5039 * gst_adaptive_demux_is_running
5040 * @demux: #GstAdaptiveDemux
5041 * Returns: whether the demuxer is processing data
5042 *
5043 * Returns FALSE if shutdown has started (transitioning down from
5044 * PAUSED), otherwise TRUE.
5045 */
5046 gboolean
gst_adaptive_demux_is_running(GstAdaptiveDemux * demux)5047 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
5048 {
5049 return g_atomic_int_get (&demux->running);
5050 }
5051
5052 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_new(GCond * cond,GMutex * mutex)5053 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
5054 {
5055 GstAdaptiveDemuxTimer *timer;
5056
5057 timer = g_slice_new (GstAdaptiveDemuxTimer);
5058 timer->fired = FALSE;
5059 timer->cond = cond;
5060 timer->mutex = mutex;
5061 g_atomic_int_set (&timer->ref_count, 1);
5062 return timer;
5063 }
5064
5065 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_ref(GstAdaptiveDemuxTimer * timer)5066 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
5067 {
5068 g_return_val_if_fail (timer != NULL, NULL);
5069 g_atomic_int_inc (&timer->ref_count);
5070 return timer;
5071 }
5072
5073 static void
gst_adaptive_demux_timer_unref(GstAdaptiveDemuxTimer * timer)5074 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
5075 {
5076 g_return_if_fail (timer != NULL);
5077 if (g_atomic_int_dec_and_test (&timer->ref_count)) {
5078 g_slice_free (GstAdaptiveDemuxTimer, timer);
5079 }
5080 }
5081
/* gst_adaptive_demux_wait_until:
 * A replacement for g_cond_wait_until that uses the clock rather
 * than system time to control the duration of the sleep. Typically
 * clock is actually a #GstSystemClock, in which case this function
 * behaves exactly like g_cond_wait_until. Inside unit tests,
 * the clock is typically a #GstTestClock, which allows tests to run
 * in non-realtime.
 * This function must be called with mutex held.
 *
 * Schedules a single-shot timer on @clock for @end_time and then waits once
 * on @cond. The timer callback marks the timer as fired and signals @cond.
 *
 * Returns FALSE if the timer had fired by the time the wait ended, or if
 * @end_time is invalid. Returns TRUE if the wait ended without the timer
 * having fired, or if @clock cannot wait asynchronously.
 */
static gboolean
gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
    GstClockTime end_time)
{
  GstAdaptiveDemuxTimer *timer;
  gboolean fired;
  GstClockReturn res;

  if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
    /* for an invalid time, gst_clock_id_wait_async will try to call
     * gst_adaptive_demux_clock_callback from the current thread.
     * It still holds the mutex while doing that, so it will deadlock.
     * g_cond_wait_until would return immediately with false, so we'll do the same.
     */
    return FALSE;
  }
  timer = gst_adaptive_demux_timer_new (cond, mutex);
  timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
  /* The async wait gets its own timer reference, released through the
   * destroy notify; this function keeps the initial one */
  res =
      gst_clock_id_wait_async (timer->clock_id,
      gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
      (GDestroyNotify) gst_adaptive_demux_timer_unref);
  /* clock does not support asynchronously wait. Assert and return */
  if (res == GST_CLOCK_UNSUPPORTED) {
    gst_clock_id_unref (timer->clock_id);
    gst_adaptive_demux_timer_unref (timer);
    g_return_val_if_reached (TRUE);
  }
  /* The callback needs the mutex, which is still held by the caller */
  g_assert (!timer->fired);
  /* the gst_adaptive_demux_clock_callback() will signal the
   * cond when the clock's single shot timer fires, or the cond will be
   * signalled by another thread that wants to cause this wait to finish
   * early (e.g. to terminate the waiting thread).
   * There is no need for a while loop here, because that logic is
   * implemented by the function calling gst_adaptive_demux_wait_until() */
  g_cond_wait (cond, mutex);
  fired = timer->fired;
  /* Woken up by something other than the timer: cancel the pending timer */
  if (!fired)
    gst_clock_id_unschedule (timer->clock_id);
  gst_clock_id_unref (timer->clock_id);
  gst_adaptive_demux_timer_unref (timer);
  return !fired;
}
5134
5135 static gboolean
gst_adaptive_demux_clock_callback(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)5136 gst_adaptive_demux_clock_callback (GstClock * clock,
5137 GstClockTime time, GstClockID id, gpointer user_data)
5138 {
5139 GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
5140 g_return_val_if_fail (timer != NULL, FALSE);
5141 g_mutex_lock (timer->mutex);
5142 timer->fired = TRUE;
5143 g_cond_signal (timer->cond);
5144 g_mutex_unlock (timer->mutex);
5145 return TRUE;
5146 }
5147
5148 /**
5149 * gst_adaptive_demux_get_qos_earliest_time:
5150 *
5151 * Returns: The QOS earliest time
5152 *
5153 * Since: 1.20
5154 */
5155 GstClockTime
gst_adaptive_demux_get_qos_earliest_time(GstAdaptiveDemux * demux)5156 gst_adaptive_demux_get_qos_earliest_time (GstAdaptiveDemux * demux)
5157 {
5158 GstClockTime earliest;
5159
5160 GST_OBJECT_LOCK (demux);
5161 earliest = demux->priv->qos_earliest_time;
5162 GST_OBJECT_UNLOCK (demux);
5163
5164 return earliest;
5165 }
5166