• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 /**
23  * SECTION:gstadaptivedemux
24  * @short_description: Base class for adaptive demuxers
25  *
26  * What is an adaptive demuxer?
27  * Adaptive demuxers are special demuxers in the sense that they don't
28  * actually demux data received from upstream but download the data
29  * themselves.
30  *
31  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32  * a set of fragments. The manifest describes the available media and
33  * the sequence of fragments to use. Each fragment contains a small
34  * part of the media (typically only a few seconds). It is possible for
35  * the manifest to have the same media available in different configurations
36  * (bitrates for example) so that the client can select the one that
37  * best suits its scenario (network fluctuation, hardware requirements...).
38  * It is possible to switch from one representation of the media to another
39  * during playback. That's why it is called 'adaptive', because it can be
40  * adapted to the client's needs.
41  *
42  * Architectural overview:
43  * The manifest is received by the demuxer in its sink pad and, upon receiving
44  * EOS, it parses the manifest and exposes the streams available in it. For
45  * each stream a source element will be created and will download the list
46  * of fragments one by one. Once a fragment is finished downloading, the next
47  * URI is set to the source element and it starts fetching it and pushing
48  * through the stream's pad. This implies that each stream is independent from
49  * each other as it runs on a separate thread.
50  *
51  * After downloading each fragment, the download rate of it is calculated and
52  * the demuxer has a chance to switch to a different bitrate if needed. The
53  * switch can be done by simply pushing a new caps before the next fragment
54  * when codecs are the same, or by exposing a new pad group if it needs
55  * a codec change.
56  *
57  * Extra features:
58  * - Not linked streams: Streams that are not-linked have their download threads
59  *                       interrupted to save network bandwidth. When they are
60  *                       relinked a reconfigure event is received and the
61  *                       stream is restarted.
62  *
63  * Subclasses:
64  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65  * about the intrinsics of the subclass formats, so the subclasses are
66  * responsible for maintaining the manifest data structures and stream
67  * information.
68  */
69 
70 /*
71 MT safety.
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
74 writes to it, then all the accesses need to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
77 
78 Adaptive demux API can be called from several threads. Moreover, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
85 
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
92 For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
103 
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
115 */
116 
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120 
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
124 
125 GST_DEBUG_CATEGORY (adaptivedemux_debug);
126 #define GST_CAT_DEFAULT adaptivedemux_debug
127 
#define MAX_DOWNLOAD_ERROR_COUNT 3
#define DEFAULT_FAILED_COUNT 3
/* Defaults of the "connection-speed" and "bitrate-limit" properties. */
#define DEFAULT_CONNECTION_SPEED 0
#define DEFAULT_BITRATE_LIMIT 0.8f
/* For safety. Large enough to hold a segment.
 * The expansion is parenthesized so that it keeps its value when it is used
 * inside a larger expression (division, modulo, ...). */
#define SRC_QUEUE_MAX_BYTES (20 * 1024 * 1024)
#define NUM_LOOKBACK_FRAGMENTS 3
134 
135 #ifdef OHOS_EXT_FUNC
136 // ohos.ext.func.0013
137 #define DEFAULT_TIMEOUT              15
138 #endif
139 
/* manifest_lock: recursive mutex stored in the private instance data.
 * Taking and releasing it is traced together with the calling thread. */
#define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
#define GST_MANIFEST_LOCK(d) G_STMT_START { \
    GST_TRACE("Locking from thread %p", g_thread_self()); \
    g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
    GST_TRACE("Locked from thread %p", g_thread_self()); \
 } G_STMT_END

#define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
    GST_TRACE("Unlocking from thread %p", g_thread_self()); \
    g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
 } G_STMT_END

/* api_lock: plain mutex stored in the private instance data.
 * The expansions deliberately carry no trailing ';' - the call site supplies
 * it - so that the macros can be used as the body of an unbraced if/else. */
#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d))
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d))

/* segment_lock: plain mutex stored in the private instance data. */
#define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
#define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
#define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
159 
/* GObject property identifiers passed to g_object_class_install_property()
 * and dispatched on in the set_property/get_property handlers. */
160 enum
161 {
162   PROP_0,
163   PROP_CONNECTION_SPEED,        /* "connection-speed" */
164   PROP_BITRATE_LIMIT,           /* "bitrate-limit" */
165 #ifdef OHOS_EXT_FUNC
166 // ohos.ext.func.0013
167   PROP_STATE_CHANGE,            /* "state-change" */
168   PROP_TIMEOUT,                 /* "timeout" */
169   PROP_EXIT_BLOCK,              /* "exit-block" (installed write-only) */
170 #endif
171   PROP_LAST
172 };
173 
174 #ifdef OHOS_EXT_FUNC
175 // ohos.ext.func.0028
/* Indices into g_gst_adaptive_demux_signals[]. */
176 enum {
177   SIGNAL_BITRATE_PARSE_COMPLETE,        /* "bitrate-parse-complete" */
178   LAST_SIGNALS
179 };
180 
/* Signal ids returned by g_signal_new(); filled in by the class initializer. */
181 static guint g_gst_adaptive_demux_signals[LAST_SIGNALS] = {0};
182 #endif
183 
184 /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
185 #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
186 
/* Private instance data of GstAdaptiveDemux, reachable through demux->priv.
 * The locks and conditions below are initialised in the instance initializer
 * and cleared in finalize. */
187 struct _GstAdaptiveDemuxPrivate
188 {
189   GstAdapter *input_adapter;    /* protected by manifest_lock */
190   gint have_manifest;           /* MT safe */
191 
192   GList *old_streams;           /* protected by manifest_lock */
193 
194   GstTask *updates_task;        /* MT safe */
195   GRecMutex updates_lock;       /* installed on updates_task with gst_task_set_lock() */
196   GMutex updates_timed_lock;
197   GCond updates_timed_cond;     /* protected by updates_timed_lock */
198   gboolean stop_updates_task;   /* protected by updates_timed_lock */
199 
200   /* used only from updates_task, no need to protect it */
201   gint update_failed_count;
202 
203   guint32 segment_seqnum;       /* protected by manifest_lock */
204 
205   /* main lock used to protect adaptive demux and all its streams.
206    * It serializes the adaptive demux public API.
207    */
208   GRecMutex manifest_lock;
209 
210   /* condition to wait for manifest updates on a live stream.
211    * In order to signal the manifest_cond, the caller needs to hold both
212    * manifest_lock and manifest_update_lock (taken in this order)
213    */
214   GCond manifest_cond;
215   GMutex manifest_update_lock;
216 
217   /* Lock and condition for prerolling streams before exposing */
218   GMutex preroll_lock;
219   GCond preroll_cond;
220   gint preroll_pending;         /* NOTE(review): presumably guarded by preroll_lock - confirm */
221 
  /* Serialises external API requests; when both locks are needed it is
   * taken before manifest_lock (see the "MT safety" notes in this file). */
222   GMutex api_lock;
223 
224   /* Protects demux and stream segment information
225    * Needed because seeks can update segment information
226    * without needing to stop tasks when they just want to
227    * update the segment boundaries */
228   GMutex segment_lock;
229 
230   GstClockTime qos_earliest_time;       /* NOTE(review): locking rule not visible here - confirm */
231 };
232 
/* Reference-counted timer record.
 * NOTE(review): presumably shared between gst_adaptive_demux_wait_until()
 * and gst_adaptive_demux_clock_callback(); their bodies are outside this
 * part of the file - confirm before relying on the field notes below. */
233 typedef struct _GstAdaptiveDemuxTimer
234 {
235   gint ref_count;
236   GCond *cond;                  /* not owned here as far as this view shows - TODO confirm */
237   GMutex *mutex;                /* not owned here as far as this view shows - TODO confirm */
238   GstClockID clock_id;
239   gboolean fired;
240 } GstAdaptiveDemuxTimer;
241 
242 static GstBinClass *parent_class = NULL;
243 static gint private_offset = 0;
244 
245 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
246 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
247     GstAdaptiveDemuxClass * klass);
248 static void gst_adaptive_demux_finalize (GObject * object);
249 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
250     element, GstStateChange transition);
251 
252 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
253 
254 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
255     GstEvent * event);
256 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
257     GstObject * parent, GstBuffer * buffer);
258 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
259     GstQuery * query);
260 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
261     GstEvent * event);
262 
263 static gboolean
264 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
265 
266 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
267 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
268     stream);
269 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
270 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
271     gboolean first_and_live);
272 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
273 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
274 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
275     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
276     GstClockTime ts, GstClockTime * final_ts);
277 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
278     demux, GstAdaptiveDemuxStream * stream);
279 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
280     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
281 static GstFlowReturn
282 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
283     GstAdaptiveDemuxStream * stream);
284 static gint64
285 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
286     GstAdaptiveDemuxStream * stream);
287 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
288     demux);
289 static GstFlowReturn
290 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
291 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
292 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
293 
294 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
295 static GstFlowReturn
296 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
297     GstEvent * event);
298 
299 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
300     demux);
301 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
302     demux);
303 
304 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
305     gboolean start_preroll_streams);
306 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
307     gboolean stop_updates);
308 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
309     demux);
310 static void
311 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
312     stream, GstFlowReturn ret, GError * err);
313 static GstFlowReturn
314 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
315     GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
316 static GstFlowReturn
317 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
318     GstAdaptiveDemuxStream * stream);
319 static GstFlowReturn
320 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
321     GstAdaptiveDemuxStream * stream, GstClockTime duration);
322 static gboolean
323 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
324     GstClockTime end_time);
325 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
326     GstClockTime time, GstClockID id, gpointer user_data);
327 static gboolean
328 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
329     * demux);
330 
331 #ifdef OHOS_EXT_FUNC
332 // ohos.ext.func.0013
333 static void set_property_to_element (GstObject *elem, guint property_id, const void *property_value);
334 static void set_property_to_src_element (const GList *stream_list, guint property_id, const void *value);
335 static void set_property_to_src_and_download (GstAdaptiveDemux *demux, guint property_id, const void *property_value);
336 #endif
337 
338 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
339  * method to get to the padtemplates */
340 GType
gst_adaptive_demux_get_type(void)341 gst_adaptive_demux_get_type (void)
342 {
343   static gsize type = 0;
344 
345   if (g_once_init_enter (&type)) {
346     GType _type;
347     static const GTypeInfo info = {
348       sizeof (GstAdaptiveDemuxClass),
349       NULL,
350       NULL,
351       (GClassInitFunc) gst_adaptive_demux_class_init,
352       NULL,
353       NULL,
354       sizeof (GstAdaptiveDemux),
355       0,
356       (GInstanceInitFunc) gst_adaptive_demux_init,
357     };
358 
359     _type = g_type_register_static (GST_TYPE_BIN,
360         "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
361 
362     private_offset =
363         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
364 
365     g_once_init_leave (&type, _type);
366   }
367   return type;
368 }
369 
370 static inline GstAdaptiveDemuxPrivate *
gst_adaptive_demux_get_instance_private(GstAdaptiveDemux * self)371 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
372 {
373   return (G_STRUCT_MEMBER_P (self, private_offset));
374 }
375 
376 #ifdef OHOS_OPT_COMPAT
377 // ohos.ext.func.0029
378 static void
gst_adaptive_demux_set_exit_block(GstAdaptiveDemux * demux,const GValue * value)379 gst_adaptive_demux_set_exit_block (GstAdaptiveDemux *demux, const GValue *value)
380 {
381   gint exit_block = g_value_get_int (value);
382   GST_INFO ("adaptive exit_block :%d", exit_block);
383   if (demux->priv == NULL) {
384     GST_WARNING_OBJECT (demux, "adaptive exit_block :%d, demux->priv is null", exit_block);
385     return;
386   }
387   if (exit_block != 1) {
388     return;
389   }
390   GList *iter = NULL;
391   /* stop update task */
392   if (demux->priv->updates_task != NULL) {
393     gst_task_stop (demux->priv->updates_task);
394     demux->priv->updates_task = NULL;
395   }
396   g_mutex_lock (&demux->priv->updates_timed_lock);
397   demux->priv->stop_updates_task = TRUE;
398   g_cond_signal (&demux->priv->updates_timed_cond);
399   g_mutex_unlock (&demux->priv->updates_timed_lock);
400   if (demux->downloader != NULL) {
401     gst_uri_downloader_cancel (demux->downloader);
402   }
403   /* stop download tasks */
404   for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
405     GstAdaptiveDemuxStream *stream = iter->data;
406     if (stream == NULL) {
407       GST_WARNING_OBJECT (demux, "stream is null");
408       continue;
409     }
410 
411     if (stream->download_task != NULL) {
412       gst_task_stop (stream->download_task);
413     }
414     g_mutex_lock (&stream->fragment_download_lock);
415     stream->cancelled = TRUE;
416     g_cond_signal (&stream->fragment_download_cond);
417     g_mutex_unlock (&stream->fragment_download_lock);
418   }
419   /* set exit_block to src element */
420   set_property_to_src_and_download (demux, PROP_EXIT_BLOCK, (void *) &exit_block);
421 }
422 #endif
423 
424 static void
gst_adaptive_demux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)425 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
426     const GValue * value, GParamSpec * pspec)
427 {
428   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
429 
430 #ifdef OHOS_OPT_COMPAT
431   // ohos.opt.compat.0029
432   if (prop_id != PROP_EXIT_BLOCK) {
433 #endif
434     GST_API_LOCK (demux);
435     GST_MANIFEST_LOCK (demux);
436 #ifdef OHOS_OPT_COMPAT
437   // ohos.opt.compat.0029
438   }
439 #endif
440 
441   switch (prop_id) {
442     case PROP_CONNECTION_SPEED:
443 #ifdef OHOS_EXT_FUNC
444       // ohos.ext.func.0028
445       demux->connection_speed = g_value_get_uint (value);
446 #else
447       demux->connection_speed = g_value_get_uint (value) * 1000;
448 #endif
449       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
450           demux->connection_speed);
451       break;
452     case PROP_BITRATE_LIMIT:
453       demux->bitrate_limit = g_value_get_float (value);
454       break;
455 #ifdef OHOS_EXT_FUNC
456     // ohos.ext.func.0013
457     case PROP_TIMEOUT: {
458       guint timeout = g_value_get_uint (value);
459       set_property_to_src_and_download(demux, prop_id, (void *)&timeout);
460       break;
461     }
462     case PROP_STATE_CHANGE: {
463       gint state = g_value_get_int (value);
464       set_property_to_src_and_download(demux, prop_id, (void *)&state);
465       break;
466     }
467     case PROP_EXIT_BLOCK: {
468       // ohos.ext.func.0029
469       gst_adaptive_demux_set_exit_block(demux, value);
470       break;
471     }
472 #endif
473     default:
474       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
475       break;
476   }
477 
478 #ifdef OHOS_OPT_COMPAT
479   // ohos.opt.compat.0029
480   if (prop_id != PROP_EXIT_BLOCK) {
481 #endif
482     GST_MANIFEST_UNLOCK (demux);
483     GST_API_UNLOCK (demux);
484 #ifdef OHOS_OPT_COMPAT
485   // ohos.opt.compat.0029
486   }
487 #endif
488 }
489 
490 static void
gst_adaptive_demux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)491 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
492     GValue * value, GParamSpec * pspec)
493 {
494   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
495 
496   GST_MANIFEST_LOCK (demux);
497 
498   switch (prop_id) {
499     case PROP_CONNECTION_SPEED:
500       g_value_set_uint (value, demux->connection_speed / 1000);
501       break;
502     case PROP_BITRATE_LIMIT:
503       g_value_set_float (value, demux->bitrate_limit);
504       break;
505     default:
506       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
507       break;
508   }
509 
510   GST_MANIFEST_UNLOCK (demux);
511 }
512 
513 static void
gst_adaptive_demux_class_init(GstAdaptiveDemuxClass * klass)514 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
515 {
516   GObjectClass *gobject_class;
517   GstElementClass *gstelement_class;
518   GstBinClass *gstbin_class;
519 
520   gobject_class = G_OBJECT_CLASS (klass);
521   gstelement_class = GST_ELEMENT_CLASS (klass);
522   gstbin_class = GST_BIN_CLASS (klass);
523 
524   GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
525       "Base Adaptive Demux");
526 
527   parent_class = g_type_class_peek_parent (klass);
528 
529   if (private_offset != 0)
530     g_type_class_adjust_private_offset (klass, &private_offset);
531 
532   gobject_class->set_property = gst_adaptive_demux_set_property;
533   gobject_class->get_property = gst_adaptive_demux_get_property;
534   gobject_class->finalize = gst_adaptive_demux_finalize;
535 
536 #ifdef OHOS_EXT_FUNC
537   // ohos.ext.func.0028
538   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
539       g_param_spec_uint ("connection-speed", "Connection Speed",
540           "Network connection speed in kbps (0 = calculate from downloaded"
541           " fragments)", 0, G_MAXUINT, DEFAULT_CONNECTION_SPEED,
542           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
543 #else
544   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
545       g_param_spec_uint ("connection-speed", "Connection Speed",
546           "Network connection speed in kbps (0 = calculate from downloaded"
547           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
548           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549 #endif
550 
551   /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
552   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
553       g_param_spec_float ("bitrate-limit",
554           "Bitrate limit in %",
555           "Limit of the available bitrate to use when switching to alternates.",
556           0, 1, DEFAULT_BITRATE_LIMIT,
557           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
558 
559 #ifdef OHOS_EXT_FUNC
560   // ohos.ext.func.0013
561   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
562       g_param_spec_uint ("timeout", "timeout",
563           "Value in seconds to timeout a blocking I/O (0 = No timeout).", 0,
564           3600, DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
565 
566   g_object_class_install_property (gobject_class, PROP_STATE_CHANGE,
567       g_param_spec_int ("state-change", "state-change from adaptive-demux",
568           "state-change from adaptive-demux", 0, (gint) (G_MAXINT32), 0,
569           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
570 
571   g_object_class_install_property (gobject_class, PROP_EXIT_BLOCK,
572       g_param_spec_int ("exit-block", "EXIT BLOCK",
573           "souphttpsrc exit block", 0, (gint) (G_MAXINT32), 0,
574           G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
575 #endif
576 
577 #ifdef OHOS_EXT_FUNC
578   // ohos.ext.func.0028
579   g_gst_adaptive_demux_signals[SIGNAL_BITRATE_PARSE_COMPLETE] =
580         g_signal_new("bitrate-parse-complete",
581             G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
582             0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_POINTER, G_TYPE_UINT);
583 #endif
584 
585   gstelement_class->change_state = gst_adaptive_demux_change_state;
586 
587   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
588 
589   klass->data_received = gst_adaptive_demux_stream_data_received_default;
590   klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
591   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
592   klass->requires_periodical_playlist_update =
593       gst_adaptive_demux_requires_periodical_playlist_update_default;
594 
595 }
596 
597 static void
gst_adaptive_demux_init(GstAdaptiveDemux * demux,GstAdaptiveDemuxClass * klass)598 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
599     GstAdaptiveDemuxClass * klass)
600 {
601   GstPadTemplate *pad_template;
602   GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
603   GObjectClass *gobject_class;
604 
605   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
606 
607   demux->priv = gst_adaptive_demux_get_instance_private (demux);
608   demux->priv->input_adapter = gst_adapter_new ();
609   demux->downloader = gst_uri_downloader_new ();
610   gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
611   demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
612   demux->priv->segment_seqnum = gst_util_seqnum_next ();
613   demux->have_group_id = FALSE;
614   demux->group_id = G_MAXUINT;
615 
616   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
617 
618   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
619       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
620 
621   demux->realtime_clock = gst_system_clock_obtain ();
622   g_assert (demux->realtime_clock != NULL);
623   gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
624   if (g_object_class_find_property (gobject_class, "clock-type")) {
625     g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
626   } else {
627     GST_WARNING_OBJECT (demux,
628         "System clock does not have clock-type property");
629   }
630   if (clock_type == GST_CLOCK_TYPE_REALTIME) {
631     demux->clock_offset = 0;
632   } else {
633     GDateTime *utc_now;
634     GstClockTime rtc_now;
635 
636     utc_now = g_date_time_new_now_utc ();
637     rtc_now = gst_clock_get_time (demux->realtime_clock);
638     demux->clock_offset =
639         g_date_time_to_unix (utc_now) * G_TIME_SPAN_SECOND +
640         g_date_time_get_microsecond (utc_now) - GST_TIME_AS_USECONDS (rtc_now);
641     g_date_time_unref (utc_now);
642   }
643   g_rec_mutex_init (&demux->priv->updates_lock);
644   demux->priv->updates_task =
645       gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
646       demux, NULL);
647   gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
648 
649   g_mutex_init (&demux->priv->updates_timed_lock);
650   g_cond_init (&demux->priv->updates_timed_cond);
651 
652   g_cond_init (&demux->priv->manifest_cond);
653   g_mutex_init (&demux->priv->manifest_update_lock);
654 
655   g_rec_mutex_init (&demux->priv->manifest_lock);
656   g_mutex_init (&demux->priv->api_lock);
657   g_mutex_init (&demux->priv->segment_lock);
658 
659   g_cond_init (&demux->priv->preroll_cond);
660   g_mutex_init (&demux->priv->preroll_lock);
661 
662   pad_template =
663       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
664   g_return_if_fail (pad_template != NULL);
665 
666   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
667   gst_pad_set_event_function (demux->sinkpad,
668       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
669   gst_pad_set_chain_function (demux->sinkpad,
670       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
671 
672   /* Properties */
673   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
674   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
675 
676   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
677 }
678 
679 static void
gst_adaptive_demux_finalize(GObject * object)680 gst_adaptive_demux_finalize (GObject * object)
681 {
682   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
683   GstAdaptiveDemuxPrivate *priv = demux->priv;
684 
685   GST_DEBUG_OBJECT (object, "finalize");
686 
687   g_object_unref (priv->input_adapter);
688   g_object_unref (demux->downloader);
689 
690   g_mutex_clear (&priv->updates_timed_lock);
691   g_cond_clear (&priv->updates_timed_cond);
692   g_mutex_clear (&demux->priv->manifest_update_lock);
693   g_cond_clear (&demux->priv->manifest_cond);
694   g_object_unref (priv->updates_task);
695   g_rec_mutex_clear (&priv->updates_lock);
696   g_rec_mutex_clear (&demux->priv->manifest_lock);
697   g_mutex_clear (&demux->priv->api_lock);
698   g_mutex_clear (&demux->priv->segment_lock);
699   if (demux->realtime_clock) {
700     gst_object_unref (demux->realtime_clock);
701     demux->realtime_clock = NULL;
702   }
703 
704   g_cond_clear (&demux->priv->preroll_cond);
705   g_mutex_clear (&demux->priv->preroll_lock);
706 
707   G_OBJECT_CLASS (parent_class)->finalize (object);
708 }
709 
710 #ifdef OHOS_EXT_FUNC
711 // ohos.ext.func.0013
712 static void
set_property_to_element(GstObject * elem,guint property_id,const void * property_value)713 set_property_to_element (GstObject *elem, guint property_id, const void *property_value)
714 {
715   if ((elem == NULL) || (property_value == NULL)) {
716     return;
717   }
718 
719   if (property_id == PROP_STATE_CHANGE) {
720     const gint *state_change = (const gint *) property_value;
721     g_object_set (elem, "state-change", *state_change, NULL);
722   } else if (property_id == PROP_TIMEOUT) {
723     const guint *timeout = (const guint *) property_value;
724     g_object_set (elem, "timeout", *timeout, NULL);
725   } else if (property_id == PROP_EXIT_BLOCK) {
726     const gint *exit_block = (const gint *) property_value;
727     g_object_set (elem, "exit-block", *exit_block, NULL);
728    }
729 }
730 
731 static void
set_property_to_src_element(const GList * stream_list,guint property_id,const void * property_value)732 set_property_to_src_element (const GList *stream_list, guint property_id, const void *property_value)
733 {
734   GstAdaptiveDemuxStream *stream = NULL;
735   GstIterator *iter = NULL;
736 
737   if (property_value == NULL) {
738     GST_WARNING ("value is NULL, set_property_to_src_element failed!");
739     return;
740   }
741 
742   for (; stream_list != NULL; stream_list = g_list_next (stream_list)) {
743     GValue data = { 0, };
744 
745     stream = stream_list->data;
746     if ((stream == NULL) || (stream->src == NULL)) {
747       continue;
748     }
749 
750     iter = gst_bin_iterate_sources ((GstBin *) stream->src);
751     if (iter == NULL) {
752       continue;
753     }
754     if (gst_iterator_next (iter, &data) == GST_ITERATOR_OK) {
755       GstElement *uri_src = g_value_get_object (&data);
756       if (uri_src != NULL) {
757         set_property_to_element ((GstObject *) uri_src, property_id, property_value);
758       }
759     }
760     if (G_IS_VALUE (&data)) {
761       g_value_unset (&data);
762     }
763     gst_iterator_free (iter);
764     iter = NULL;
765   }
766 }
767 
768 static void
set_property_to_src_and_download(GstAdaptiveDemux * demux,guint property_id,const void * property_value)769 set_property_to_src_and_download (GstAdaptiveDemux *demux, guint property_id, const void *property_value)
770 {
771   if ((demux == NULL) || (property_value == NULL)) {
772     GST_WARNING ("input parameter is error");
773     return;
774   }
775 
776   set_property_to_src_element (demux->streams, property_id, property_value);
777   //set_property_to_element ((GstObject *) demux->downloader, property_id, property_value);
778 }
779 #endif
780 
781 static GstStateChangeReturn
gst_adaptive_demux_change_state(GstElement * element,GstStateChange transition)782 gst_adaptive_demux_change_state (GstElement * element,
783     GstStateChange transition)
784 {
785   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
786   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
787 
788   switch (transition) {
789     case GST_STATE_CHANGE_PAUSED_TO_READY:
790       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
791         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
792       gst_uri_downloader_cancel (demux->downloader);
793 
794       GST_API_LOCK (demux);
795       GST_MANIFEST_LOCK (demux);
796       gst_adaptive_demux_reset (demux);
797       GST_MANIFEST_UNLOCK (demux);
798       GST_API_UNLOCK (demux);
799       break;
800     case GST_STATE_CHANGE_READY_TO_PAUSED:
801       GST_API_LOCK (demux);
802       GST_MANIFEST_LOCK (demux);
803       gst_adaptive_demux_reset (demux);
804       /* Clear "cancelled" flag in uridownloader since subclass might want to
805        * use uridownloader to fetch another manifest */
806       gst_uri_downloader_reset (demux->downloader);
807       if (g_atomic_int_get (&demux->priv->have_manifest))
808         gst_adaptive_demux_start_manifest_update_task (demux);
809       GST_MANIFEST_UNLOCK (demux);
810       GST_API_UNLOCK (demux);
811       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
812         GST_DEBUG_OBJECT (demux, "demuxer has started running");
813       break;
814     default:
815       break;
816   }
817 
818   /* this must be run without MANIFEST_LOCK taken.
819    * For PLAYING to PLAYING state changes, it will want to take a lock in
820    * src element and that lock is held while the streaming thread is running.
821    * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
822    */
823   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
824 
825   return result;
826 }
827 
828 #ifdef OHOS_EXT_FUNC
829 // ohos.ext.func.0028
830 static void
gst_adaptive_demux_update_bitrate(GstAdaptiveDemux * demux)831 gst_adaptive_demux_update_bitrate (GstAdaptiveDemux *demux)
832 {
833   GstAdaptiveDemuxBitrateInfo stream_bitrate_info;
834   stream_bitrate_info.bitrate_list = NULL;
835   stream_bitrate_info.bitrate_num = 0;
836   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
837   if ((demux_class->get_bitrate_info) == NULL) {
838     return;
839   }
840 
841   gboolean ret = demux_class->get_bitrate_info(demux, &stream_bitrate_info);
842   if (!ret) {
843     return;
844   }
845 
846   GST_INFO_OBJECT (demux, "Send to user, bitrate num = %u", stream_bitrate_info.bitrate_num);
847   g_signal_emit (GST_ELEMENT(demux), g_gst_adaptive_demux_signals[SIGNAL_BITRATE_PARSE_COMPLETE],
848     0, stream_bitrate_info.bitrate_list, stream_bitrate_info.bitrate_num);
849 
850   if (stream_bitrate_info.bitrate_list != NULL) {
851     g_free (stream_bitrate_info.bitrate_list);
852     stream_bitrate_info.bitrate_list = NULL;
853     stream_bitrate_info.bitrate_num = 0;
854   }
855 }
856 #endif
857 
858 static gboolean
gst_adaptive_demux_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)859 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
860     GstEvent * event)
861 {
862   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
863   gboolean ret;
864 
865   switch (event->type) {
866     case GST_EVENT_FLUSH_STOP:{
867       GST_API_LOCK (demux);
868       GST_MANIFEST_LOCK (demux);
869 
870       gst_adaptive_demux_reset (demux);
871 
872       ret = gst_pad_event_default (pad, parent, event);
873 
874       GST_MANIFEST_UNLOCK (demux);
875       GST_API_UNLOCK (demux);
876 
877       return ret;
878     }
879     case GST_EVENT_EOS:{
880       GstAdaptiveDemuxClass *demux_class;
881       GstQuery *query;
882       gboolean query_res;
883       gboolean ret = TRUE;
884       gsize available;
885       GstBuffer *manifest_buffer;
886 
887       GST_API_LOCK (demux);
888       GST_MANIFEST_LOCK (demux);
889 
890       demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
891 
892       available = gst_adapter_available (demux->priv->input_adapter);
893 
894       if (available == 0) {
895         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
896         ret = gst_pad_event_default (pad, parent, event);
897 
898         GST_MANIFEST_UNLOCK (demux);
899         GST_API_UNLOCK (demux);
900 
901         return ret;
902       }
903 
904       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
905 
906       /* Need to get the URI to use it as a base to generate the fragment's
907        * uris */
908       query = gst_query_new_uri ();
909       query_res = gst_pad_peer_query (pad, query);
910       if (query_res) {
911         gchar *uri, *redirect_uri;
912         gboolean permanent;
913 
914         gst_query_parse_uri (query, &uri);
915         gst_query_parse_uri_redirection (query, &redirect_uri);
916         gst_query_parse_uri_redirection_permanent (query, &permanent);
917 
918         if (permanent && redirect_uri) {
919           demux->manifest_uri = redirect_uri;
920           demux->manifest_base_uri = NULL;
921           g_free (uri);
922         } else {
923           demux->manifest_uri = uri;
924           demux->manifest_base_uri = redirect_uri;
925         }
926 
927         GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
928             demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
929       } else {
930         GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
931       }
932       gst_query_unref (query);
933 
934       /* Let the subclass parse the manifest */
935       manifest_buffer =
936           gst_adapter_take_buffer (demux->priv->input_adapter, available);
937       if (!demux_class->process_manifest (demux, manifest_buffer)) {
938         /* In most cases, this will happen if we set a wrong url in the
939          * source element and we have received the 404 HTML response instead of
940          * the manifest */
941         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
942             (NULL));
943         ret = FALSE;
944       } else {
945         g_atomic_int_set (&demux->priv->have_manifest, TRUE);
946       }
947 
948 #ifdef OHOS_EXT_FUNC
949       // ohos.ext.func.0028
950       if (ret) {
951         gst_adaptive_demux_update_bitrate(demux);
952       }
953 #endif
954       gst_buffer_unref (manifest_buffer);
955 
956       gst_element_post_message (GST_ELEMENT_CAST (demux),
957           gst_message_new_element (GST_OBJECT_CAST (demux),
958               gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
959                   "manifest-uri", G_TYPE_STRING,
960                   demux->manifest_uri, "uri", G_TYPE_STRING,
961                   demux->manifest_uri,
962                   "manifest-download-start", GST_TYPE_CLOCK_TIME,
963                   GST_CLOCK_TIME_NONE,
964                   "manifest-download-stop", GST_TYPE_CLOCK_TIME,
965                   gst_util_get_timestamp (), NULL)));
966 
967       if (ret) {
968         /* Send duration message */
969         if (!gst_adaptive_demux_is_live (demux)) {
970           GstClockTime duration = demux_class->get_duration (demux);
971 
972           if (duration != GST_CLOCK_TIME_NONE) {
973             GST_DEBUG_OBJECT (demux,
974                 "Sending duration message : %" GST_TIME_FORMAT,
975                 GST_TIME_ARGS (duration));
976             gst_element_post_message (GST_ELEMENT (demux),
977                 gst_message_new_duration_changed (GST_OBJECT (demux)));
978           } else {
979             GST_DEBUG_OBJECT (demux,
980                 "media duration unknown, can not send the duration message");
981           }
982         }
983 
984         if (demux->next_streams) {
985           gst_adaptive_demux_prepare_streams (demux,
986               gst_adaptive_demux_is_live (demux));
987           gst_adaptive_demux_start_tasks (demux, TRUE);
988           gst_adaptive_demux_start_manifest_update_task (demux);
989         } else {
990           /* no streams */
991           GST_WARNING_OBJECT (demux, "No streams created from manifest");
992           GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
993               (_("This file contains no playable streams.")),
994               ("No known stream formats found at the Manifest"));
995           ret = FALSE;
996         }
997 
998       }
999       GST_MANIFEST_UNLOCK (demux);
1000       GST_API_UNLOCK (demux);
1001 
1002       gst_event_unref (event);
1003       return ret;
1004     }
1005     case GST_EVENT_SEGMENT:
1006       /* Swallow newsegments, we'll push our own */
1007       gst_event_unref (event);
1008       return TRUE;
1009     default:
1010       break;
1011   }
1012 
1013   return gst_pad_event_default (pad, parent, event);
1014 }
1015 
1016 static GstFlowReturn
gst_adaptive_demux_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1017 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1018     GstBuffer * buffer)
1019 {
1020   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1021 
1022   GST_MANIFEST_LOCK (demux);
1023 
1024   gst_adapter_push (demux->priv->input_adapter, buffer);
1025 
1026   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1027       (gint) gst_adapter_available (demux->priv->input_adapter));
1028 
1029   GST_MANIFEST_UNLOCK (demux);
1030   return GST_FLOW_OK;
1031 }
1032 
1033 /* must be called with manifest_lock taken */
1034 static void
gst_adaptive_demux_reset(GstAdaptiveDemux * demux)1035 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1036 {
1037   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1038   GList *iter;
1039   GList *old_streams;
1040   GstEvent *eos;
1041 
1042   /* take ownership of old_streams before releasing the manifest_lock in
1043    * gst_adaptive_demux_stop_tasks
1044    */
1045   old_streams = demux->priv->old_streams;
1046   demux->priv->old_streams = NULL;
1047 
1048   gst_adaptive_demux_stop_tasks (demux, TRUE);
1049 
1050   if (klass->reset)
1051     klass->reset (demux);
1052 
1053   eos = gst_event_new_eos ();
1054   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1055     GstAdaptiveDemuxStream *stream = iter->data;
1056     if (stream->pad) {
1057       gst_pad_push_event (stream->pad, gst_event_ref (eos));
1058       gst_pad_set_active (stream->pad, FALSE);
1059 
1060       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
1061     }
1062     gst_adaptive_demux_stream_free (stream);
1063   }
1064   gst_event_unref (eos);
1065   g_list_free (demux->streams);
1066   demux->streams = NULL;
1067   if (demux->prepared_streams) {
1068     g_list_free_full (demux->prepared_streams,
1069         (GDestroyNotify) gst_adaptive_demux_stream_free);
1070     demux->prepared_streams = NULL;
1071   }
1072   if (demux->next_streams) {
1073     g_list_free_full (demux->next_streams,
1074         (GDestroyNotify) gst_adaptive_demux_stream_free);
1075     demux->next_streams = NULL;
1076   }
1077 
1078   if (old_streams) {
1079     g_list_free_full (old_streams,
1080         (GDestroyNotify) gst_adaptive_demux_stream_free);
1081   }
1082 
1083   if (demux->priv->old_streams) {
1084     g_list_free_full (demux->priv->old_streams,
1085         (GDestroyNotify) gst_adaptive_demux_stream_free);
1086     demux->priv->old_streams = NULL;
1087   }
1088 
1089   g_free (demux->manifest_uri);
1090   g_free (demux->manifest_base_uri);
1091   demux->manifest_uri = NULL;
1092   demux->manifest_base_uri = NULL;
1093 
1094   gst_adapter_clear (demux->priv->input_adapter);
1095   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1096 
1097   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1098 
1099   demux->have_group_id = FALSE;
1100   demux->group_id = G_MAXUINT;
1101   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1102 }
1103 
1104 static void
gst_adaptive_demux_handle_message(GstBin * bin,GstMessage * msg)1105 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1106 {
1107   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1108 
1109   switch (GST_MESSAGE_TYPE (msg)) {
1110     case GST_MESSAGE_ERROR:{
1111       GList *iter;
1112       GstAdaptiveDemuxStream *stream = NULL;
1113       GError *err = NULL;
1114       gchar *debug = NULL;
1115       gchar *new_error = NULL;
1116       const GstStructure *details = NULL;
1117 
1118       GST_MANIFEST_LOCK (demux);
1119 
1120       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1121         GstAdaptiveDemuxStream *cur = iter->data;
1122         if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
1123                 GST_OBJECT_CAST (cur->src))) {
1124           stream = cur;
1125           break;
1126         }
1127       }
1128       if (stream == NULL) {
1129         for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1130           GstAdaptiveDemuxStream *cur = iter->data;
1131           if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
1132                   GST_OBJECT_CAST (cur->src))) {
1133             stream = cur;
1134             break;
1135           }
1136         }
1137         if (stream == NULL) {
1138           GST_WARNING_OBJECT (demux,
1139               "Failed to locate stream for errored element");
1140           break;
1141         }
1142       }
1143 
1144       gst_message_parse_error (msg, &err, &debug);
1145 
1146       GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1147           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1148           err->message, debug);
1149 
1150       if (debug)
1151         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1152       if (new_error) {
1153         g_free (err->message);
1154         err->message = new_error;
1155       }
1156 
1157       gst_message_parse_error_details (msg, &details);
1158       if (details) {
1159         gst_structure_get_uint (details, "http-status-code",
1160             &stream->last_status_code);
1161       }
1162 
1163       /* error, but ask to retry */
1164       gst_adaptive_demux_stream_fragment_download_finish (stream,
1165           GST_FLOW_CUSTOM_ERROR, err);
1166 
1167       g_error_free (err);
1168       g_free (debug);
1169 
1170       GST_MANIFEST_UNLOCK (demux);
1171 
1172       gst_message_unref (msg);
1173       msg = NULL;
1174     }
1175       break;
1176     default:
1177       break;
1178   }
1179 
1180   if (msg)
1181     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1182 }
1183 
1184 void
gst_adaptive_demux_set_stream_struct_size(GstAdaptiveDemux * demux,gsize struct_size)1185 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
1186     gsize struct_size)
1187 {
1188   GST_API_LOCK (demux);
1189   GST_MANIFEST_LOCK (demux);
1190   demux->stream_struct_size = struct_size;
1191   GST_MANIFEST_UNLOCK (demux);
1192   GST_API_UNLOCK (demux);
1193 }
1194 
1195 /* must be called with manifest_lock taken */
1196 static gboolean
gst_adaptive_demux_prepare_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1197 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
1198     GstAdaptiveDemuxStream * stream)
1199 {
1200   GstPad *pad = stream->pad;
1201   gchar *name = gst_pad_get_name (pad);
1202   GstEvent *event;
1203   gchar *stream_id;
1204 
1205   gst_pad_set_active (pad, TRUE);
1206   stream->need_header = TRUE;
1207 
1208   stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
1209 
1210   event =
1211       gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
1212       GST_EVENT_STREAM_START, 0);
1213   if (event) {
1214     if (gst_event_parse_group_id (event, &demux->group_id))
1215       demux->have_group_id = TRUE;
1216     else
1217       demux->have_group_id = FALSE;
1218     gst_event_unref (event);
1219   } else if (!demux->have_group_id) {
1220     demux->have_group_id = TRUE;
1221     demux->group_id = gst_util_group_id_next ();
1222   }
1223   event = gst_event_new_stream_start (stream_id);
1224   if (demux->have_group_id)
1225     gst_event_set_group_id (event, demux->group_id);
1226 
1227   gst_pad_push_event (pad, event);
1228   g_free (stream_id);
1229   g_free (name);
1230 
1231   GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
1232 
1233   stream->discont = TRUE;
1234 
1235   return TRUE;
1236 }
1237 
1238 static gboolean
gst_adaptive_demux_expose_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1239 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
1240     GstAdaptiveDemuxStream * stream)
1241 {
1242   gboolean ret;
1243   GstPad *pad = stream->pad;
1244   GstCaps *caps;
1245 
1246   if (stream->pending_caps) {
1247     gst_pad_set_caps (pad, stream->pending_caps);
1248     caps = stream->pending_caps;
1249     stream->pending_caps = NULL;
1250   } else {
1251     caps = gst_pad_get_current_caps (pad);
1252   }
1253 
1254   GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
1255       GST_DEBUG_PAD_NAME (pad), caps);
1256   if (caps)
1257     gst_caps_unref (caps);
1258 
1259   gst_object_ref (pad);
1260 
1261   /* Don't hold the manifest lock while exposing a pad */
1262   GST_MANIFEST_UNLOCK (demux);
1263   ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1264   GST_MANIFEST_LOCK (demux);
1265 
1266   return ret;
1267 }
1268 
1269 /* must be called with manifest_lock taken */
1270 static GstClockTime
gst_adaptive_demux_stream_get_presentation_offset(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1271 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1272     GstAdaptiveDemuxStream * stream)
1273 {
1274   GstAdaptiveDemuxClass *klass;
1275 
1276   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1277 
1278   if (klass->get_presentation_offset == NULL)
1279     return 0;
1280 
1281   return klass->get_presentation_offset (demux, stream);
1282 }
1283 
1284 /* must be called with manifest_lock taken */
1285 static GstClockTime
gst_adaptive_demux_get_period_start_time(GstAdaptiveDemux * demux)1286 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1287 {
1288   GstAdaptiveDemuxClass *klass;
1289 
1290   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1291 
1292   if (klass->get_period_start_time == NULL)
1293     return 0;
1294 
1295   return klass->get_period_start_time (demux);
1296 }
1297 
1298 /* must be called with manifest_lock taken */
1299 static gboolean
gst_adaptive_demux_prepare_streams(GstAdaptiveDemux * demux,gboolean first_and_live)1300 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1301     gboolean first_and_live)
1302 {
1303   GList *iter;
1304   GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1305 
1306   g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1307   if (demux->prepared_streams != NULL) {
1308     /* Old streams that were never exposed, due to a seek or so */
1309     GST_FIXME_OBJECT (demux,
1310         "Preparing new streams without cleaning up old ones!");
1311     return FALSE;
1312   }
1313 
1314   demux->prepared_streams = demux->next_streams;
1315   demux->next_streams = NULL;
1316 
1317   if (!gst_adaptive_demux_is_running (demux)) {
1318     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1319     return TRUE;
1320   }
1321 
1322   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1323     GstAdaptiveDemuxStream *stream = iter->data;
1324 
1325     stream->do_block = TRUE;
1326 
1327     if (!gst_adaptive_demux_prepare_stream (demux,
1328             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1329       /* TODO act on error */
1330       GST_FIXME_OBJECT (stream->pad,
1331           "Do something on failure to expose stream");
1332     }
1333 
1334     if (first_and_live) {
1335       /* TODO we only need the first timestamp, maybe create a simple function to
1336        * get the current PTS of a fragment ? */
1337       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1338       gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1339 
1340       if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1341         min_pts = MIN (min_pts, stream->fragment.timestamp);
1342       } else {
1343         min_pts = stream->fragment.timestamp;
1344       }
1345     }
1346   }
1347 
1348   period_start = gst_adaptive_demux_get_period_start_time (demux);
1349 
1350   /* For live streams, the subclass is supposed to seek to the current
1351    * fragment and then tell us its timestamp in stream->fragment.timestamp.
1352    * We now also have to seek our demuxer segment to reflect this.
1353    *
1354    * FIXME: This needs some refactoring at some point.
1355    */
1356   if (first_and_live) {
1357     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1358         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1359         GST_SEEK_TYPE_NONE, -1, NULL);
1360   }
1361 
1362   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1363     GstAdaptiveDemuxStream *stream = iter->data;
1364     GstClockTime offset;
1365 
1366     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1367     stream->segment = demux->segment;
1368 
1369     /* The demuxer segment is just built from seek events, but for each stream
1370      * we have to adjust segments according to the current period and the
1371      * stream specific presentation time offset.
1372      *
1373      * For each period, buffer timestamps start again from 0. Additionally the
1374      * buffer timestamps are shifted by the stream specific presentation time
1375      * offset, so the first buffer timestamp of a period is 0 + presentation
1376      * time offset. If the stream contains timestamps itself, this is also
1377      * supposed to be the presentation time stored inside the stream.
1378      *
1379      * The stream time over periods is supposed to be continuous, that is the
1380      * buffer timestamp 0 + presentation time offset should map to the start
1381      * time of the current period.
1382      *
1383      *
1384      * The adjustment of the stream segments as such works the following.
1385      *
1386      * If the demuxer segment start is bigger than the period start, this
1387      * means that we have to drop some media at the beginning of the current
1388      * period, e.g. because a seek into the middle of the period has
1389      * happened. The amount of media to drop is the difference between the
1390      * period start and the demuxer segment start, and as each period starts
1391      * again from 0, this difference is going to be the actual stream's
1392      * segment start. As all timestamps of the stream are shifted by the
1393      * presentation time offset, we will also have to move the segment start
1394      * by that offset.
1395      *
1396      * Likewise, the demuxer segment stop value is adjusted in the same
1397      * fashion.
1398      *
1399      * Now the running time and stream time at the stream's segment start has
1400      * to be the one that is stored inside the demuxer's segment, which means
1401      * that segment.base and segment.time have to be copied over (done just
1402      * above)
1403      *
1404      *
1405      * If the demuxer segment start is smaller than the period start time,
1406      * this means that the whole period is inside the segment. As each period
1407      * starts timestamps from 0, and additionally timestamps are shifted by
1408      * the presentation time offset, the stream's first timestamp (and as such
1409      * the stream's segment start) has to be the presentation time offset.
1410      * The stream time at the segment start is supposed to be the stream time
1411      * of the period start according to the demuxer segment, so the stream
1412      * segment's time would be set to that. The same goes for the stream
1413      * segment's base, which is supposed to be the running time of the period
1414      * start according to the demuxer's segment.
1415      *
1416      * The same logic applies for negative rates with the segment stop and
1417      * the period stop time (which gets clamped).
1418      *
1419      *
1420      * For the first case where not the complete period is inside the segment,
1421      * the segment time and base as calculated by the second case would be
1422      * equivalent.
1423      */
1424     GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1425         &demux->segment);
1426     GST_DEBUG_OBJECT (demux,
1427         "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1428         GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1429     /* note for readers:
1430      * Since stream->segment is initially a copy of demux->segment,
1431      * only the values that need updating are modified below. */
1432     if (first_and_live) {
1433       /* If first and live, demuxer did seek to the current position already */
1434       stream->segment.start = demux->segment.start - period_start + offset;
1435       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1436         stream->segment.stop = demux->segment.stop - period_start + offset;
1437       /* FIXME : Do we need to handle negative rates for this ? */
1438       stream->segment.position = stream->segment.start;
1439     } else if (demux->segment.start > period_start) {
1440       /* seek within a period */
1441       stream->segment.start = demux->segment.start - period_start + offset;
1442       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1443         stream->segment.stop = demux->segment.stop - period_start + offset;
1444       if (stream->segment.rate >= 0)
1445         stream->segment.position = offset;
1446       else
1447         stream->segment.position = stream->segment.stop;
1448     } else {
1449       stream->segment.start = offset;
1450       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1451         stream->segment.stop = demux->segment.stop - period_start + offset;
1452       if (stream->segment.rate >= 0)
1453         stream->segment.position = offset;
1454       else
1455         stream->segment.position = stream->segment.stop;
1456       stream->segment.time =
1457           gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1458           period_start);
1459       stream->segment.base =
1460           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1461           period_start);
1462     }
1463 
1464     stream->pending_segment = gst_event_new_segment (&stream->segment);
1465     gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1466 
1467     GST_DEBUG_OBJECT (demux,
1468         "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1469         &stream->segment, stream);
1470   }
1471   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1472 
1473   return TRUE;
1474 }
1475 
1476 static gboolean
gst_adaptive_demux_expose_streams(GstAdaptiveDemux * demux)1477 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1478 {
1479   GList *iter;
1480   GList *old_streams;
1481 
1482   g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1483 
1484   old_streams = demux->streams;
1485   demux->streams = demux->prepared_streams;
1486   demux->prepared_streams = NULL;
1487 
1488   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1489     GstAdaptiveDemuxStream *stream = iter->data;
1490 
1491     if (!gst_adaptive_demux_expose_stream (demux,
1492             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1493       /* TODO act on error */
1494     }
1495   }
1496   demux->priv->preroll_pending = 0;
1497 
1498   GST_MANIFEST_UNLOCK (demux);
1499   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1500   GST_MANIFEST_LOCK (demux);
1501 
1502   if (old_streams) {
1503     GstEvent *eos = gst_event_new_eos ();
1504 
1505     /* before we put streams in the demux->priv->old_streams list,
1506      * we ask the download task to stop. In this way, it will no longer be
1507      * allowed to change the demux object.
1508      */
1509     for (iter = old_streams; iter; iter = g_list_next (iter)) {
1510       GstAdaptiveDemuxStream *stream = iter->data;
1511       GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1512 
1513       GST_MANIFEST_UNLOCK (demux);
1514 
1515       GST_DEBUG_OBJECT (pad, "Pushing EOS");
1516       gst_pad_push_event (pad, gst_event_ref (eos));
1517       gst_pad_set_active (pad, FALSE);
1518 
1519       GST_LOG_OBJECT (pad, "Removing stream");
1520       gst_element_remove_pad (GST_ELEMENT (demux), pad);
1521       GST_MANIFEST_LOCK (demux);
1522 
1523       gst_object_unref (GST_OBJECT (pad));
1524 
1525       /* ask the download task to stop.
1526        * We will not join it now, because our thread can be one of these tasks.
1527        * We will do the joining later, from another stream download task or
1528        * from gst_adaptive_demux_stop_tasks.
1529        * We also cannot change the state of the stream->src element, because
1530        * that will wait on the streaming thread (which could be this thread)
1531        * to stop first.
1532        * Because we sent an EOS to the downstream element, the stream->src
1533        * element should detect this in its streaming task and stop.
1534        * Even if it doesn't do that, we will change its state later in
1535        * gst_adaptive_demux_stop_tasks.
1536        */
1537       GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1538           "Marking stream as cancelled");
1539       gst_task_stop (stream->download_task);
1540       g_mutex_lock (&stream->fragment_download_lock);
1541       stream->cancelled = TRUE;
1542       stream->replaced = TRUE;
1543       g_cond_signal (&stream->fragment_download_cond);
1544       g_mutex_unlock (&stream->fragment_download_lock);
1545     }
1546     gst_event_unref (eos);
1547 
1548     /* The list should be freed from another thread as we can't properly
1549      * cleanup a GstTask from itself */
1550     demux->priv->old_streams =
1551         g_list_concat (demux->priv->old_streams, old_streams);
1552   }
1553 
1554   /* Unblock after removing oldstreams */
1555   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1556     GstAdaptiveDemuxStream *stream = iter->data;
1557     stream->do_block = FALSE;
1558   }
1559 
1560   GST_DEBUG_OBJECT (demux, "All streams are exposed");
1561 
1562   return TRUE;
1563 }
1564 
1565 /* must be called with manifest_lock taken */
1566 GstAdaptiveDemuxStream *
gst_adaptive_demux_stream_new(GstAdaptiveDemux * demux,GstPad * pad)1567 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1568 {
1569   GstAdaptiveDemuxStream *stream;
1570 
1571   stream = g_malloc0 (demux->stream_struct_size);
1572 
1573   /* Downloading task */
1574   g_rec_mutex_init (&stream->download_lock);
1575   stream->download_task =
1576       gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1577       stream, NULL);
1578   gst_task_set_lock (stream->download_task, &stream->download_lock);
1579 
1580   stream->pad = pad;
1581   stream->demux = demux;
1582   stream->fragment_bitrates =
1583       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1584   gst_pad_set_element_private (pad, stream);
1585   stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1586 
1587   g_mutex_lock (&demux->priv->preroll_lock);
1588   stream->do_block = TRUE;
1589   demux->priv->preroll_pending++;
1590   g_mutex_unlock (&demux->priv->preroll_lock);
1591 
1592   gst_pad_set_query_function (pad,
1593       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1594   gst_pad_set_event_function (pad,
1595       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1596 
1597   gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1598   g_cond_init (&stream->fragment_download_cond);
1599   g_mutex_init (&stream->fragment_download_lock);
1600 
1601   demux->next_streams = g_list_append (demux->next_streams, stream);
1602 
1603   return stream;
1604 }
1605 
1606 GstAdaptiveDemuxStream *
gst_adaptive_demux_find_stream_for_pad(GstAdaptiveDemux * demux,GstPad * pad)1607 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1608 {
1609   GList *iter;
1610 
1611   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1612     GstAdaptiveDemuxStream *stream = iter->data;
1613     if (stream->pad == pad) {
1614       return stream;
1615     }
1616   }
1617 
1618   return NULL;
1619 }
1620 
1621 /* must be called with manifest_lock taken.
1622  * It will temporarily drop the manifest_lock in order to join the task.
1623  * It will join only the old_streams (the demux->streams are joined by
1624  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1625  * called)
1626  */
1627 static void
gst_adaptive_demux_stream_free(GstAdaptiveDemuxStream * stream)1628 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1629 {
1630   GstAdaptiveDemux *demux = stream->demux;
1631   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1632 
1633   if (klass->stream_free)
1634     klass->stream_free (stream);
1635 
1636   g_clear_error (&stream->last_error);
1637   if (stream->download_task) {
1638     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1639       GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1640           GST_DEBUG_PAD_NAME (stream->pad));
1641 
1642       gst_task_stop (stream->download_task);
1643 
1644       g_mutex_lock (&stream->fragment_download_lock);
1645       stream->cancelled = TRUE;
1646       g_cond_signal (&stream->fragment_download_cond);
1647       g_mutex_unlock (&stream->fragment_download_lock);
1648     }
1649     GST_LOG_OBJECT (demux, "Waiting for task to finish");
1650 
1651     /* temporarily drop the manifest lock to join the task */
1652     GST_MANIFEST_UNLOCK (demux);
1653 
1654     gst_task_join (stream->download_task);
1655 
1656     GST_MANIFEST_LOCK (demux);
1657 
1658     GST_LOG_OBJECT (demux, "Finished");
1659     gst_object_unref (stream->download_task);
1660     g_rec_mutex_clear (&stream->download_lock);
1661     stream->download_task = NULL;
1662   }
1663 
1664   gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1665 
1666   if (stream->pending_segment) {
1667     gst_event_unref (stream->pending_segment);
1668     stream->pending_segment = NULL;
1669   }
1670 
1671   if (stream->pending_events) {
1672     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1673     stream->pending_events = NULL;
1674   }
1675 
1676   if (stream->internal_pad) {
1677     gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1678   }
1679 
1680   if (stream->src_srcpad) {
1681     gst_object_unref (stream->src_srcpad);
1682     stream->src_srcpad = NULL;
1683   }
1684 
1685   if (stream->src) {
1686     GstElement *src = stream->src;
1687 
1688     stream->src = NULL;
1689 
1690     GST_MANIFEST_UNLOCK (demux);
1691     gst_element_set_locked_state (src, TRUE);
1692     gst_element_set_state (src, GST_STATE_NULL);
1693     gst_bin_remove (GST_BIN_CAST (demux), src);
1694     GST_MANIFEST_LOCK (demux);
1695   }
1696 
1697   g_cond_clear (&stream->fragment_download_cond);
1698   g_mutex_clear (&stream->fragment_download_lock);
1699   g_free (stream->fragment_bitrates);
1700 
1701   if (stream->pad) {
1702     gst_object_unref (stream->pad);
1703     stream->pad = NULL;
1704   }
1705   if (stream->pending_caps)
1706     gst_caps_unref (stream->pending_caps);
1707 
1708   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1709 
1710   g_free (stream);
1711 }
1712 
1713 /* must be called with manifest_lock taken */
1714 static gboolean
gst_adaptive_demux_get_live_seek_range(GstAdaptiveDemux * demux,gint64 * range_start,gint64 * range_stop)1715 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1716     gint64 * range_start, gint64 * range_stop)
1717 {
1718   GstAdaptiveDemuxClass *klass;
1719 
1720   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1721 
1722   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1723 
1724   return klass->get_live_seek_range (demux, range_start, range_stop);
1725 }
1726 
1727 /* must be called with manifest_lock taken */
1728 static gboolean
gst_adaptive_demux_stream_in_live_seek_range(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1729 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1730     GstAdaptiveDemuxStream * stream)
1731 {
1732   gint64 range_start, range_stop;
1733   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1734     GST_LOG_OBJECT (stream->pad,
1735         "stream position %" GST_TIME_FORMAT "  live seek range %"
1736         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1737         GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1738         GST_STIME_ARGS (range_stop));
1739     return (stream->segment.position >= range_start
1740         && stream->segment.position <= range_stop);
1741   }
1742 
1743   return FALSE;
1744 }
1745 
1746 /* must be called with manifest_lock taken */
1747 static gboolean
gst_adaptive_demux_can_seek(GstAdaptiveDemux * demux)1748 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1749 {
1750   GstAdaptiveDemuxClass *klass;
1751 
1752   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1753   if (gst_adaptive_demux_is_live (demux)) {
1754     return klass->get_live_seek_range != NULL;
1755   }
1756 
1757   return klass->seek != NULL;
1758 }
1759 
1760 static void
gst_adaptive_demux_update_streams_segment(GstAdaptiveDemux * demux,GList * streams,gint64 period_start,GstSeekType start_type,GstSeekType stop_type)1761 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1762     GList * streams, gint64 period_start, GstSeekType start_type,
1763     GstSeekType stop_type)
1764 {
1765   GList *iter;
1766   for (iter = streams; iter; iter = g_list_next (iter)) {
1767     GstAdaptiveDemuxStream *stream = iter->data;
1768     GstEvent *seg_evt;
1769     GstClockTime offset;
1770 
1771     /* See comments in gst_adaptive_demux_get_period_start_time() for
1772      * an explanation of the segment modifications */
1773     stream->segment = demux->segment;
1774     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1775     stream->segment.start += offset - period_start;
1776     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1777       stream->segment.stop += offset - period_start;
1778     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1779       stream->segment.position = stream->segment.start;
1780     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1781       stream->segment.position = stream->segment.stop;
1782     seg_evt = gst_event_new_segment (&stream->segment);
1783     gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1784     gst_event_replace (&stream->pending_segment, seg_evt);
1785     GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1786         stream->pending_segment);
1787     gst_event_unref (seg_evt);
1788     /* Make sure the first buffer after a seek has the discont flag */
1789     stream->discont = TRUE;
1790   }
1791   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1792 }
1793 
/* IS_SNAP_SEEK: non-zero when the seek flags @f contain any of the snap,
 * key-unit or trickmode-key-units flags.
 * REMOVE_SNAP_FLAGS: @f with the three SNAP_* flags cleared (the key-unit
 * flags are left untouched).
 *
 * The argument is parenthesized so that compound expressions such as
 * `a | b` are masked as a whole instead of only their last operand. */
#define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE | \
                                GST_SEEK_FLAG_SNAP_AFTER | \
                                GST_SEEK_FLAG_SNAP_NEAREST | \
                                GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
                                GST_SEEK_FLAG_KEY_UNIT))
#define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
                                      GST_SEEK_FLAG_SNAP_AFTER | \
                                      GST_SEEK_FLAG_SNAP_NEAREST))
1802 
1803 static gboolean
gst_adaptive_demux_handle_seek_event(GstAdaptiveDemux * demux,GstPad * pad,GstEvent * event)1804 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1805     GstEvent * event)
1806 {
1807   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1808   gdouble rate;
1809   GstFormat format;
1810   GstSeekFlags flags;
1811   GstSeekType start_type, stop_type;
1812   gint64 start, stop;
1813   guint32 seqnum;
1814   gboolean update;
1815   gboolean ret;
1816   GstSegment oldsegment;
1817   GstAdaptiveDemuxStream *stream = NULL;
1818 
1819   GST_INFO_OBJECT (demux, "Received seek event");
1820 
1821   GST_API_LOCK (demux);
1822   GST_MANIFEST_LOCK (demux);
1823 
1824   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1825       &stop_type, &stop);
1826 
1827   if (format != GST_FORMAT_TIME) {
1828     GST_MANIFEST_UNLOCK (demux);
1829     GST_API_UNLOCK (demux);
1830     GST_WARNING_OBJECT (demux,
1831         "Adaptive demuxers only support TIME-based seeking");
1832     gst_event_unref (event);
1833     return FALSE;
1834   }
1835 
1836   if (flags & GST_SEEK_FLAG_SEGMENT) {
1837     GST_FIXME_OBJECT (demux, "Handle segment seeks");
1838     GST_MANIFEST_UNLOCK (demux);
1839     GST_API_UNLOCK (demux);
1840     gst_event_unref (event);
1841     return FALSE;
1842   }
1843 
1844   seqnum = gst_event_get_seqnum (event);
1845 
1846   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
1847     /* For instant rate seeks, reply directly and update
1848      * our segment so the new rate is reflected in any future
1849      * fragments */
1850     GstEvent *ev;
1851 
1852     /* instant rate change only supported if direction does not change. All
1853      * other requirements are already checked before creating the seek event
1854      * but let's double-check here to be sure */
1855     if ((demux->segment.rate > 0 && rate < 0) ||
1856         (demux->segment.rate < 0 && rate > 0) ||
1857         start_type != GST_SEEK_TYPE_NONE ||
1858         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
1859       GST_ERROR_OBJECT (demux,
1860           "Instant rate change seeks only supported in the "
1861           "same direction, without flushing and position change");
1862       GST_MANIFEST_UNLOCK (demux);
1863       GST_API_UNLOCK (demux);
1864       return FALSE;
1865     }
1866 
1867     ev = gst_event_new_instant_rate_change (rate / demux->segment.rate,
1868         (GstSegmentFlags) flags);
1869     gst_event_set_seqnum (ev, seqnum);
1870 
1871     GST_MANIFEST_UNLOCK (demux);
1872 
1873     ret = gst_adaptive_demux_push_src_event (demux, ev);
1874 
1875     GST_API_UNLOCK (demux);
1876     gst_event_unref (event);
1877 
1878     return ret;
1879   }
1880 
1881   if (!gst_adaptive_demux_can_seek (demux)) {
1882     GST_MANIFEST_UNLOCK (demux);
1883     GST_API_UNLOCK (demux);
1884     gst_event_unref (event);
1885     return FALSE;
1886   }
1887 
1888   if (gst_adaptive_demux_is_live (demux)) {
1889     gint64 range_start, range_stop;
1890     gboolean changed = FALSE;
1891     gboolean start_valid = TRUE, stop_valid = TRUE;
1892 
1893     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1894             &range_stop)) {
1895       GST_MANIFEST_UNLOCK (demux);
1896       GST_API_UNLOCK (demux);
1897       gst_event_unref (event);
1898       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1899       return FALSE;
1900     }
1901 
1902     GST_DEBUG_OBJECT (demux,
1903         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1904         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1905 
1906     /* Handle relative positioning for live streams (relative to the range_stop) */
1907     if (start_type == GST_SEEK_TYPE_END) {
1908       start = range_stop + start;
1909       start_type = GST_SEEK_TYPE_SET;
1910       changed = TRUE;
1911     }
1912     if (stop_type == GST_SEEK_TYPE_END) {
1913       stop = range_stop + stop;
1914       stop_type = GST_SEEK_TYPE_SET;
1915       changed = TRUE;
1916     }
1917 
1918     /* Adjust the requested start/stop position if it falls beyond the live
1919      * seek range.
1920      * The only case where we don't adjust is for the starting point of
1921      * an accurate seek (start if forward and stop if backwards)
1922      */
1923     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1924         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1925       GST_DEBUG_OBJECT (demux,
1926           "seek before live stream start, setting to range start: %"
1927           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1928       start = range_start;
1929       changed = TRUE;
1930     }
1931     /* truncate stop position also if set */
1932     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1933         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1934       GST_DEBUG_OBJECT (demux,
1935           "seek ending after live start, adjusting to: %"
1936           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1937       stop = range_stop;
1938       changed = TRUE;
1939     }
1940 
1941     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1942         (start < range_start || start > range_stop)) {
1943       GST_WARNING_OBJECT (demux,
1944           "Seek to invalid position start:%" GST_STIME_FORMAT
1945           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1946           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1947           GST_STIME_ARGS (range_stop));
1948       start_valid = FALSE;
1949     }
1950     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1951         (stop < range_start || stop > range_stop)) {
1952       GST_WARNING_OBJECT (demux,
1953           "Seek to invalid position stop:%" GST_STIME_FORMAT
1954           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1955           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1956           GST_STIME_ARGS (range_stop));
1957       stop_valid = FALSE;
1958     }
1959 
1960     /* If the seek position is still outside of the seekable range, refuse the seek */
1961     if (!start_valid || !stop_valid) {
1962       GST_MANIFEST_UNLOCK (demux);
1963       GST_API_UNLOCK (demux);
1964       gst_event_unref (event);
1965       return FALSE;
1966     }
1967 
1968     /* Re-create seek event with changed/updated values */
1969     if (changed) {
1970       gst_event_unref (event);
1971       event =
1972           gst_event_new_seek (rate, format, flags,
1973           start_type, start, stop_type, stop);
1974       gst_event_set_seqnum (event, seqnum);
1975     }
1976   }
1977 
1978   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1979 
1980   /* have a backup in case seek fails */
1981   gst_segment_copy_into (&demux->segment, &oldsegment);
1982 
1983   if (flags & GST_SEEK_FLAG_FLUSH) {
1984     GstEvent *fevent;
1985 
1986     GST_DEBUG_OBJECT (demux, "sending flush start");
1987     fevent = gst_event_new_flush_start ();
1988     gst_event_set_seqnum (fevent, seqnum);
1989     GST_MANIFEST_UNLOCK (demux);
1990     gst_adaptive_demux_push_src_event (demux, fevent);
1991     GST_MANIFEST_LOCK (demux);
1992 
1993     gst_adaptive_demux_stop_tasks (demux, FALSE);
1994   } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1995       (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1996 
1997     gst_adaptive_demux_stop_tasks (demux, FALSE);
1998   }
1999 
2000   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2001 
2002   /*
2003    * Handle snap seeks as follows:
2004    * 1) do the snap seeking on the stream that received
2005    *    the event
2006    * 2) use the final position on this stream to seek
2007    *    on the other streams to the same position
2008    *
2009    * We can't snap at all streams at the same time as
2010    * they might end in different positions, so just
2011    * use the one that received the event as the 'leading'
2012    * one to do the snap seek.
2013    */
2014   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
2015           gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
2016     GstClockTime ts;
2017     GstSeekFlags stream_seek_flags = flags;
2018 
2019     /* snap-seek on the stream that received the event and then
2020      * use the resulting position to seek on all streams */
2021 
2022     if (rate >= 0) {
2023       if (start_type != GST_SEEK_TYPE_NONE)
2024         ts = start;
2025       else {
2026         ts = stream->segment.position;
2027         start_type = GST_SEEK_TYPE_SET;
2028       }
2029     } else {
2030       if (stop_type != GST_SEEK_TYPE_NONE)
2031         ts = stop;
2032       else {
2033         stop_type = GST_SEEK_TYPE_SET;
2034         ts = stream->segment.position;
2035       }
2036     }
2037 
2038     if (stream) {
2039       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2040     }
2041 
2042     /* replace event with a new one without snapping to seek on all streams */
2043     gst_event_unref (event);
2044     if (rate >= 0) {
2045       start = ts;
2046     } else {
2047       stop = ts;
2048     }
2049     event =
2050         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2051         start_type, start, stop_type, stop);
2052     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2053   }
2054   stream = NULL;
2055 
2056   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2057       start, stop_type, stop, &update);
2058 
2059   if (ret) {
2060     /* FIXME - this seems unatural, do_seek() is updating base when we
2061      * only want the start/stop position to change, maybe do_seek() needs
2062      * some fixing? */
2063     if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
2064                 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
2065                 && stop_type == GST_SEEK_TYPE_NONE))) {
2066       demux->segment.base = oldsegment.base;
2067     }
2068 
2069     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2070 
2071     ret = demux_class->seek (demux, event);
2072   }
2073 
2074   if (!ret) {
2075     /* Is there anything else we can do if it fails? */
2076     gst_segment_copy_into (&oldsegment, &demux->segment);
2077   } else {
2078     demux->priv->segment_seqnum = seqnum;
2079   }
2080   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2081 
2082   if (flags & GST_SEEK_FLAG_FLUSH) {
2083     GstEvent *fevent;
2084 
2085     GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2086     fevent = gst_event_new_flush_stop (TRUE);
2087     gst_event_set_seqnum (fevent, seqnum);
2088 #ifdef OHOS_OPT_COMPAT
2089     /**
2090      * ohos.opt.compat.0037
2091      * Fixed deadlock when seek and reconfigure are concurrent
2092      */
2093     GST_MANIFEST_UNLOCK (demux);
2094 #endif
2095     gst_adaptive_demux_push_src_event (demux, fevent);
2096 #ifdef OHOS_OPT_COMPAT
2097     // ohos.opt.compat.0037
2098     GST_MANIFEST_LOCK (demux);
2099 #endif
2100   }
2101 
2102   if (demux->next_streams) {
2103     /* If the seek generated new streams, get them
2104      * to preroll */
2105     gst_adaptive_demux_prepare_streams (demux, FALSE);
2106     gst_adaptive_demux_start_tasks (demux, TRUE);
2107   } else {
2108     GstClockTime period_start =
2109         gst_adaptive_demux_get_period_start_time (demux);
2110 
2111     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2112     gst_adaptive_demux_update_streams_segment (demux, demux->streams,
2113         period_start, start_type, stop_type);
2114     gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
2115         period_start, start_type, stop_type);
2116     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2117 
2118 #ifdef OHOS_OPT_COMPAT
2119     /* ohos.opt.compat.0028
2120      * In the variable resolution, if the changing path is still in the prepare stream state,
2121      * if seek continues to play at a certain location, the cancel of the prepare stream in will be set to true.
2122      * When switching to the main thread of the prepare stream, it is determined that when cancel is true,
2123      * no data will be pulled from the server, the prepare stream will be released first
2124      */
2125     if (demux->streams && demux->prepared_streams) {
2126       g_list_free_full (demux->prepared_streams,
2127           (GDestroyNotify) gst_adaptive_demux_stream_free);
2128       demux->prepared_streams = NULL;
2129       /* ohos.opt.compat.0043
2130        * Preroll_pending needs to reset when prepared_streams are released. The reason is
2131        * gst_adaptive_demux_stream_new() will increase the value, while after the streams were
2132        * prepared and released the value is not changed.
2133        */
2134       GST_MANIFEST_UNLOCK (demux);
2135       g_mutex_lock (&demux->priv->preroll_lock);
2136       demux->priv->preroll_pending = 0;
2137       g_mutex_unlock (&demux->priv->preroll_lock);
2138       GST_MANIFEST_LOCK (demux);
2139     }
2140 #endif
2141 
2142     /* Restart the demux */
2143     gst_adaptive_demux_start_tasks (demux, FALSE);
2144   }
2145 
2146   GST_MANIFEST_UNLOCK (demux);
2147   GST_API_UNLOCK (demux);
2148   gst_event_unref (event);
2149 
2150   return ret;
2151 }
2152 
2153 static gboolean
gst_adaptive_demux_src_event(GstPad * pad,GstObject * parent,GstEvent * event)2154 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2155     GstEvent * event)
2156 {
2157   GstAdaptiveDemux *demux;
2158 
2159   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2160 
2161   /* FIXME handle events received on pads that are to be removed */
2162 
2163   switch (event->type) {
2164     case GST_EVENT_SEEK:
2165     {
2166       guint32 seqnum = gst_event_get_seqnum (event);
2167       if (seqnum == demux->priv->segment_seqnum) {
2168         GST_LOG_OBJECT (pad,
2169             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2170         gst_event_unref (event);
2171         return TRUE;
2172       }
2173       return gst_adaptive_demux_handle_seek_event (demux, pad, event);
2174     }
2175     case GST_EVENT_RECONFIGURE:{
2176       GstAdaptiveDemuxStream *stream;
2177 
2178       GST_MANIFEST_LOCK (demux);
2179       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
2180 
2181       if (stream) {
2182         if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
2183             stream->last_ret == GST_FLOW_NOT_LINKED) {
2184           stream->last_ret = GST_FLOW_OK;
2185           stream->restart_download = TRUE;
2186           stream->need_header = TRUE;
2187           stream->discont = TRUE;
2188           GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
2189           gst_task_start (stream->download_task);
2190         }
2191         gst_event_unref (event);
2192         GST_MANIFEST_UNLOCK (demux);
2193         return TRUE;
2194       }
2195       GST_MANIFEST_UNLOCK (demux);
2196     }
2197       break;
2198     case GST_EVENT_LATENCY:{
2199       /* Upstream and our internal source are irrelevant
2200        * for latency, and we should not fail here to
2201        * configure the latency */
2202       gst_event_unref (event);
2203       return TRUE;
2204     }
2205     case GST_EVENT_QOS:{
2206       GstClockTimeDiff diff;
2207       GstClockTime timestamp;
2208       GstClockTime earliest_time;
2209 
2210       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2211       /* Only take into account lateness if late */
2212       if (diff > 0)
2213         earliest_time = timestamp + 2 * diff;
2214       else
2215         earliest_time = timestamp;
2216 
2217       GST_OBJECT_LOCK (demux);
2218       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2219           earliest_time > demux->priv->qos_earliest_time) {
2220         demux->priv->qos_earliest_time = earliest_time;
2221         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2222             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2223       }
2224       GST_OBJECT_UNLOCK (demux);
2225       break;
2226     }
2227     default:
2228       break;
2229   }
2230 
2231   return gst_pad_event_default (pad, parent, event);
2232 }
2233 
2234 static gboolean
gst_adaptive_demux_src_query(GstPad * pad,GstObject * parent,GstQuery * query)2235 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2236     GstQuery * query)
2237 {
2238   GstAdaptiveDemux *demux;
2239   GstAdaptiveDemuxClass *demux_class;
2240   gboolean ret = FALSE;
2241 
2242   if (query == NULL)
2243     return FALSE;
2244 
2245   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2246   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2247 
2248   switch (query->type) {
2249     case GST_QUERY_DURATION:{
2250       GstClockTime duration = -1;
2251       GstFormat fmt;
2252 
2253       gst_query_parse_duration (query, &fmt, NULL);
2254 #ifdef OHOS_OPT_COMPAT
2255       /**
2256        * ohos.opt.compat.0040 Fixed query duration failed.
2257        * gst_hls_demux_process_manifest() function hold a manifest lock to update playlist.
2258        * During this period, the value obtained by judgment(gst_adaptive_demux_is_live() function) is wrong.
2259        * Thus, use a manifest lock to block until the attribute endlist of m3u8 is updated.
2260        */
2261       GST_MANIFEST_LOCK (demux);
2262 #endif
2263       if (gst_adaptive_demux_is_live (demux)) {
2264         /* We are able to answer this query: the duration is unknown */
2265         gst_query_set_duration (query, fmt, -1);
2266         ret = TRUE;
2267 #ifdef OHOS_OPT_COMPAT
2268         // ohos.opt.compat.0040
2269         GST_MANIFEST_UNLOCK (demux);
2270 #endif
2271         break;
2272       }
2273 #ifdef OHOS_OPT_COMPAT
2274       // ohos.opt.compat.0040
2275       GST_MANIFEST_UNLOCK (demux);
2276 #endif
2277 
2278       if (fmt == GST_FORMAT_TIME
2279           && g_atomic_int_get (&demux->priv->have_manifest)) {
2280         duration = demux_class->get_duration (demux);
2281 
2282         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2283           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2284           ret = TRUE;
2285         }
2286       }
2287 
2288       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2289           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2290       break;
2291     }
2292     case GST_QUERY_LATENCY:{
2293       gst_query_set_latency (query, FALSE, 0, -1);
2294       ret = TRUE;
2295       break;
2296     }
2297     case GST_QUERY_SEEKING:{
2298       GstFormat fmt;
2299       gint64 stop = -1;
2300       gint64 start = 0;
2301 
2302       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2303         GST_INFO_OBJECT (demux,
2304             "Don't have manifest yet, can't answer seeking query");
2305         return FALSE;           /* can't answer without manifest */
2306       }
2307 
2308       GST_MANIFEST_LOCK (demux);
2309 
2310       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2311       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2312       if (fmt == GST_FORMAT_TIME) {
2313         GstClockTime duration;
2314         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2315 
2316         ret = TRUE;
2317         if (can_seek) {
2318           if (gst_adaptive_demux_is_live (demux)) {
2319             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2320             if (!ret) {
2321               GST_MANIFEST_UNLOCK (demux);
2322               GST_INFO_OBJECT (demux, "can't answer seeking query");
2323               return FALSE;
2324             }
2325           } else {
2326             duration = demux_class->get_duration (demux);
2327             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2328               stop = duration;
2329           }
2330         }
2331         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2332         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2333             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2334             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2335       }
2336       GST_MANIFEST_UNLOCK (demux);
2337       break;
2338     }
2339     case GST_QUERY_URI:
2340 
2341       GST_MANIFEST_LOCK (demux);
2342 
2343       /* TODO HLS can answer this differently it seems */
2344       if (demux->manifest_uri) {
2345         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2346          * playlist or the the uri of the last downlowaded fragment? */
2347         gst_query_set_uri (query, demux->manifest_uri);
2348         ret = TRUE;
2349       }
2350 
2351       GST_MANIFEST_UNLOCK (demux);
2352       break;
2353     default:
2354       /* Don't forward queries upstream because of the special nature of this
2355        *  "demuxer", which relies on the upstream element only to be fed
2356        *  the Manifest
2357        */
2358       break;
2359   }
2360 
2361   return ret;
2362 }
2363 
2364 /* must be called with manifest_lock taken */
2365 static void
gst_adaptive_demux_start_tasks(GstAdaptiveDemux * demux,gboolean start_preroll_streams)2366 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2367     gboolean start_preroll_streams)
2368 {
2369   GList *iter;
2370 
2371   if (!gst_adaptive_demux_is_running (demux)) {
2372     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2373     return;
2374   }
2375 
2376   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2377 
2378   iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2379 
2380   for (; iter; iter = g_list_next (iter)) {
2381     GstAdaptiveDemuxStream *stream = iter->data;
2382 
2383     if (!start_preroll_streams) {
2384       g_mutex_lock (&stream->fragment_download_lock);
2385       stream->cancelled = FALSE;
2386       stream->replaced = FALSE;
2387       g_mutex_unlock (&stream->fragment_download_lock);
2388     }
2389 
2390     stream->last_ret = GST_FLOW_OK;
2391     gst_task_start (stream->download_task);
2392   }
2393 }
2394 
2395 /* must be called with manifest_lock taken */
2396 static void
gst_adaptive_demux_stop_manifest_update_task(GstAdaptiveDemux * demux)2397 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2398 {
2399   gst_uri_downloader_cancel (demux->downloader);
2400 
2401   gst_task_stop (demux->priv->updates_task);
2402 
2403   g_mutex_lock (&demux->priv->updates_timed_lock);
2404   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2405   demux->priv->stop_updates_task = TRUE;
2406   g_cond_signal (&demux->priv->updates_timed_cond);
2407   g_mutex_unlock (&demux->priv->updates_timed_lock);
2408 }
2409 
2410 /* must be called with manifest_lock taken */
2411 static void
gst_adaptive_demux_start_manifest_update_task(GstAdaptiveDemux * demux)2412 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2413 {
2414   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2415 
2416   if (gst_adaptive_demux_is_live (demux)) {
2417     gst_uri_downloader_reset (demux->downloader);
2418     g_mutex_lock (&demux->priv->updates_timed_lock);
2419     demux->priv->stop_updates_task = FALSE;
2420     g_mutex_unlock (&demux->priv->updates_timed_lock);
2421     /* Task to periodically update the manifest */
2422     if (demux_class->requires_periodical_playlist_update (demux)) {
2423       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2424       gst_task_start (demux->priv->updates_task);
2425     }
2426   }
2427 }
2428 
2429 /* must be called with manifest_lock taken
2430  * This function will temporarily release manifest_lock in order to join the
2431  * download threads.
2432  * The api_lock will still protect it against other threads trying to modify
2433  * the demux element.
2434  */
2435 static void
gst_adaptive_demux_stop_tasks(GstAdaptiveDemux * demux,gboolean stop_updates)2436 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2437 {
2438   int i;
2439   GList *iter;
2440   GList *list_to_process;
2441 
2442   GST_LOG_OBJECT (demux, "Stopping tasks");
2443 
2444   if (stop_updates)
2445     gst_adaptive_demux_stop_manifest_update_task (demux);
2446 
2447   list_to_process = demux->streams;
2448   for (i = 0; i < 2; ++i) {
2449     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2450       GstAdaptiveDemuxStream *stream = iter->data;
2451 
2452       g_mutex_lock (&stream->fragment_download_lock);
2453       stream->cancelled = TRUE;
2454       gst_task_stop (stream->download_task);
2455       g_cond_signal (&stream->fragment_download_cond);
2456       g_mutex_unlock (&stream->fragment_download_lock);
2457     }
2458     list_to_process = demux->prepared_streams;
2459   }
2460 
2461   GST_MANIFEST_UNLOCK (demux);
2462   g_mutex_lock (&demux->priv->preroll_lock);
2463   g_cond_broadcast (&demux->priv->preroll_cond);
2464   g_mutex_unlock (&demux->priv->preroll_lock);
2465   GST_MANIFEST_LOCK (demux);
2466 
2467   g_mutex_lock (&demux->priv->manifest_update_lock);
2468   g_cond_broadcast (&demux->priv->manifest_cond);
2469   g_mutex_unlock (&demux->priv->manifest_update_lock);
2470 
2471   /* need to release manifest_lock before stopping the src element.
2472    * The streams were asked to cancel, so they will not make any writes to demux
2473    * object. Even if we temporarily release manifest_lock, the demux->streams
2474    * cannot change and iter cannot be invalidated.
2475    */
2476   list_to_process = demux->streams;
2477   for (i = 0; i < 2; ++i) {
2478     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2479       GstAdaptiveDemuxStream *stream = iter->data;
2480       GstElement *src = stream->src;
2481 
2482       GST_MANIFEST_UNLOCK (demux);
2483 
2484       if (src) {
2485         gst_element_set_locked_state (src, TRUE);
2486         gst_element_set_state (src, GST_STATE_READY);
2487       }
2488 
2489       /* stream->download_task value never changes, so it is safe to read it
2490        * outside critical section
2491        */
2492       gst_task_join (stream->download_task);
2493 
2494       GST_MANIFEST_LOCK (demux);
2495     }
2496     list_to_process = demux->prepared_streams;
2497   }
2498 
2499   GST_MANIFEST_UNLOCK (demux);
2500   if (stop_updates)
2501     gst_task_join (demux->priv->updates_task);
2502 
2503   GST_MANIFEST_LOCK (demux);
2504 
2505   list_to_process = demux->streams;
2506   for (i = 0; i < 2; ++i) {
2507     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2508       GstAdaptiveDemuxStream *stream = iter->data;
2509 
2510       stream->download_error_count = 0;
2511       stream->need_header = TRUE;
2512     }
2513     list_to_process = demux->prepared_streams;
2514   }
2515   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2516 }
2517 
2518 /* must be called with manifest_lock taken */
2519 static gboolean
gst_adaptive_demux_push_src_event(GstAdaptiveDemux * demux,GstEvent * event)2520 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2521 {
2522   GList *iter;
2523   gboolean ret = TRUE;
2524 
2525   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2526     GstAdaptiveDemuxStream *stream = iter->data;
2527     gst_event_ref (event);
2528     ret = ret & gst_pad_push_event (stream->pad, event);
2529   }
2530   gst_event_unref (event);
2531   return ret;
2532 }
2533 
2534 /* must be called with manifest_lock taken */
2535 void
gst_adaptive_demux_stream_set_caps(GstAdaptiveDemuxStream * stream,GstCaps * caps)2536 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2537     GstCaps * caps)
2538 {
2539   GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2540       caps);
2541   gst_caps_replace (&stream->pending_caps, caps);
2542   gst_caps_unref (caps);
2543 }
2544 
2545 /* must be called with manifest_lock taken */
2546 void
gst_adaptive_demux_stream_set_tags(GstAdaptiveDemuxStream * stream,GstTagList * tags)2547 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2548     GstTagList * tags)
2549 {
2550   GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2551       tags);
2552   if (stream->pending_tags) {
2553     gst_tag_list_unref (stream->pending_tags);
2554   }
2555   stream->pending_tags = tags;
2556 }
2557 
2558 /* must be called with manifest_lock taken */
2559 void
gst_adaptive_demux_stream_queue_event(GstAdaptiveDemuxStream * stream,GstEvent * event)2560 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2561     GstEvent * event)
2562 {
2563   stream->pending_events = g_list_append (stream->pending_events, event);
2564 }
2565 
2566 /* must be called with manifest_lock taken */
2567 static guint64
_update_average_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 new_bitrate)2568 _update_average_bitrate (GstAdaptiveDemux * demux,
2569     GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2570 {
2571   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2572 
2573   stream->moving_bitrate -= stream->fragment_bitrates[index];
2574   stream->fragment_bitrates[index] = new_bitrate;
2575   stream->moving_bitrate += new_bitrate;
2576 
2577   stream->moving_index += 1;
2578 
2579   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2580     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2581   return stream->moving_bitrate / stream->moving_index;
2582 }
2583 
2584 /* must be called with manifest_lock taken */
2585 static guint64
gst_adaptive_demux_stream_update_current_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2586 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2587     GstAdaptiveDemuxStream * stream)
2588 {
2589   guint64 average_bitrate;
2590   guint64 fragment_bitrate;
2591 
2592   if (demux->connection_speed) {
2593     GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2594         demux->connection_speed / 1000);
2595     stream->current_download_rate = demux->connection_speed;
2596     return demux->connection_speed;
2597   }
2598 
2599   fragment_bitrate = stream->last_bitrate;
2600   GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2601       fragment_bitrate);
2602 
2603   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2604 
2605   GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2606       "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
2607   GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2608       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2609       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2610 
2611   /* Conservative approach, make sure we don't upgrade too fast */
2612   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2613 
2614   stream->current_download_rate *= demux->bitrate_limit;
2615   GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2616       G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2617 
2618 #if 0
2619   /* Debugging code, modulate the bitrate every few fragments */
2620   {
2621     static guint ctr = 0;
2622     if (ctr % 3 == 0) {
2623       GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2624       stream->current_download_rate /= 2;
2625     }
2626     ctr++;
2627   }
2628 #endif
2629 
2630   return stream->current_download_rate;
2631 }
2632 
2633 /* must be called with manifest_lock taken */
2634 static GstFlowReturn
gst_adaptive_demux_combine_flows(GstAdaptiveDemux * demux)2635 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2636 {
2637   gboolean all_notlinked = TRUE;
2638   gboolean all_eos = TRUE;
2639   GList *iter;
2640 
2641   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2642     GstAdaptiveDemuxStream *stream = iter->data;
2643 
2644     if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2645       all_notlinked = FALSE;
2646       if (stream->last_ret != GST_FLOW_EOS)
2647         all_eos = FALSE;
2648     }
2649 
2650     if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2651         || stream->last_ret == GST_FLOW_FLUSHING) {
2652       return stream->last_ret;
2653     }
2654   }
2655   if (all_notlinked)
2656     return GST_FLOW_NOT_LINKED;
2657   else if (all_eos)
2658     return GST_FLOW_EOS;
2659   return GST_FLOW_OK;
2660 }
2661 
2662 /* Called with preroll_lock */
2663 static void
gst_adaptive_demux_handle_preroll(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2664 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2665     GstAdaptiveDemuxStream * stream)
2666 {
2667   demux->priv->preroll_pending--;
2668   if (demux->priv->preroll_pending == 0) {
2669     /* That was the last one, time to release all streams
2670      * and expose them */
2671     GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2672     gst_adaptive_demux_expose_streams (demux);
2673     g_cond_broadcast (&demux->priv->preroll_cond);
2674   }
2675 }
2676 
2677 /* must be called with manifest_lock taken.
2678  * Temporarily releases manifest_lock
2679  */
2680 GstFlowReturn
gst_adaptive_demux_stream_push_buffer(GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2681 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2682     GstBuffer * buffer)
2683 {
2684   GstAdaptiveDemux *demux = stream->demux;
2685   GstFlowReturn ret = GST_FLOW_OK;
2686   gboolean discont = FALSE;
2687   /* Pending events */
2688   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2689   GList *pending_events = NULL;
2690 
2691   /* FIXME :
2692    * This is duplicating *exactly* the same thing as what is done at the beginning
2693    * of _src_chain if starting_fragment is TRUE */
2694   if (stream->first_fragment_buffer) {
2695     GstClockTime offset =
2696         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2697     GstClockTime period_start =
2698         gst_adaptive_demux_get_period_start_time (demux);
2699 
2700     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2701     if (demux->segment.rate < 0)
2702       /* Set DISCONT flag for every first buffer in reverse playback mode
2703        * as each fragment for its own has to be reversed */
2704       discont = TRUE;
2705 
2706     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2707     if (GST_BUFFER_PTS_IS_VALID (buffer))
2708       GST_BUFFER_PTS (buffer) += offset;
2709 
2710     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2711       stream->segment.position = GST_BUFFER_PTS (buffer);
2712 
2713       /* Convert from position inside the stream's segment to the demuxer's
2714        * segment, they are not necessarily the same */
2715       if (stream->segment.position - offset + period_start >
2716           demux->segment.position)
2717         demux->segment.position =
2718             stream->segment.position - offset + period_start;
2719     }
2720     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2721 
2722     GST_LOG_OBJECT (stream->pad,
2723         "Going to push buffer with PTS %" GST_TIME_FORMAT,
2724         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2725   } else {
2726     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2727   }
2728 
2729   if (stream->discont) {
2730     discont = TRUE;
2731     stream->discont = FALSE;
2732   }
2733 
2734   if (discont) {
2735     GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2736     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2737   } else {
2738     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2739   }
2740 
2741   stream->first_fragment_buffer = FALSE;
2742 
2743   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2744   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2745   if (G_UNLIKELY (stream->pending_caps)) {
2746     pending_caps = gst_event_new_caps (stream->pending_caps);
2747     gst_caps_unref (stream->pending_caps);
2748     stream->pending_caps = NULL;
2749   }
2750 
2751   if (stream->do_block) {
2752 
2753     g_mutex_lock (&demux->priv->preroll_lock);
2754 
2755     /* If we are preroll state, set caps in here */
2756     if (pending_caps) {
2757       gst_pad_push_event (stream->pad, pending_caps);
2758       pending_caps = NULL;
2759     }
2760 
2761     gst_adaptive_demux_handle_preroll (demux, stream);
2762     GST_MANIFEST_UNLOCK (demux);
2763 
2764     while (stream->do_block && !stream->cancelled) {
2765       GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2766       g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2767     }
2768     if (stream->cancelled) {
2769       GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2770       gst_buffer_unref (buffer);
2771       g_mutex_unlock (&demux->priv->preroll_lock);
2772       return GST_FLOW_FLUSHING;
2773     }
2774 
2775     g_mutex_unlock (&demux->priv->preroll_lock);
2776     GST_MANIFEST_LOCK (demux);
2777   }
2778 
2779   if (G_UNLIKELY (stream->pending_segment)) {
2780     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2781     pending_segment = stream->pending_segment;
2782     stream->pending_segment = NULL;
2783     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2784   }
2785   if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2786     GstTagList *tags = stream->pending_tags;
2787 
2788     stream->pending_tags = NULL;
2789     stream->bitrate_changed = 0;
2790 
2791     if (stream->fragment.bitrate != 0) {
2792       if (tags)
2793         tags = gst_tag_list_make_writable (tags);
2794       else
2795         tags = gst_tag_list_new_empty ();
2796 
2797       gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2798           GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2799     }
2800     if (tags)
2801       pending_tags = gst_event_new_tag (tags);
2802   }
2803   if (G_UNLIKELY (stream->pending_events)) {
2804     pending_events = stream->pending_events;
2805     stream->pending_events = NULL;
2806   }
2807 
2808   GST_MANIFEST_UNLOCK (demux);
2809 
2810   /* Do not push events or buffers holding the manifest lock */
2811   if (G_UNLIKELY (pending_caps)) {
2812     GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2813         pending_caps);
2814     gst_pad_push_event (stream->pad, pending_caps);
2815   }
2816   if (G_UNLIKELY (pending_segment)) {
2817     GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2818         pending_segment);
2819     gst_pad_push_event (stream->pad, pending_segment);
2820   }
2821   if (G_UNLIKELY (pending_tags)) {
2822     GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2823         pending_tags);
2824     gst_pad_push_event (stream->pad, pending_tags);
2825   }
2826   while (pending_events != NULL) {
2827     GstEvent *event = pending_events->data;
2828 
2829     if (!gst_pad_push_event (stream->pad, event))
2830       GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2831 
2832     pending_events = g_list_delete_link (pending_events, pending_events);
2833   }
2834 
2835   /* Wait for preroll if blocking */
2836   GST_DEBUG_OBJECT (stream->pad,
2837       "About to push buffer of size %" G_GSIZE_FORMAT,
2838       gst_buffer_get_size (buffer));
2839 
2840   ret = gst_pad_push (stream->pad, buffer);
2841 
2842   GST_MANIFEST_LOCK (demux);
2843 
2844   g_mutex_lock (&stream->fragment_download_lock);
2845   if (G_UNLIKELY (stream->cancelled)) {
2846     GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2847         "Stream was cancelled");
2848     ret = stream->last_ret = GST_FLOW_FLUSHING;
2849     g_mutex_unlock (&stream->fragment_download_lock);
2850     return ret;
2851   }
2852   g_mutex_unlock (&stream->fragment_download_lock);
2853 
2854   GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2855       gst_flow_get_name (ret));
2856 
2857   return ret;
2858 }
2859 
2860 /* must be called with manifest_lock taken */
2861 static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2862 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2863     GstAdaptiveDemuxStream * stream)
2864 {
2865   /* No need to advance, this isn't a real fragment */
2866   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2867     return GST_FLOW_OK;
2868 
2869   return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2870       stream->fragment.duration);
2871 }
2872 
2873 /* must be called with manifest_lock taken.
2874  * Can temporarily release manifest_lock
2875  */
2876 static GstFlowReturn
gst_adaptive_demux_stream_data_received_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2877 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2878     GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2879 {
2880   return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2881 }
2882 
2883 static gboolean
gst_adaptive_demux_requires_periodical_playlist_update_default(GstAdaptiveDemux * demux)2884 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2885     * demux)
2886 {
2887   return TRUE;
2888 }
2889 
2890 static GstFlowReturn
_src_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2891 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2892 {
2893   GstAdaptiveDemuxStream *stream;
2894   GstAdaptiveDemux *demux;
2895   GstAdaptiveDemuxClass *klass;
2896   GstFlowReturn ret = GST_FLOW_OK;
2897 
2898   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2899   stream = gst_pad_get_element_private (pad);
2900   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2901 
2902   GST_MANIFEST_LOCK (demux);
2903 
2904   /* do not make any changes if the stream is cancelled */
2905   g_mutex_lock (&stream->fragment_download_lock);
2906   if (G_UNLIKELY (stream->cancelled)) {
2907     g_mutex_unlock (&stream->fragment_download_lock);
2908     gst_buffer_unref (buffer);
2909     ret = stream->last_ret = GST_FLOW_FLUSHING;
2910     GST_MANIFEST_UNLOCK (demux);
2911     return ret;
2912   }
2913   g_mutex_unlock (&stream->fragment_download_lock);
2914 
2915   /* starting_fragment is set to TRUE at the beginning of
2916    * _stream_download_fragment()
2917    * /!\ If there is a header/index being downloaded, then this will
2918    * be TRUE for the first one ... but FALSE for the remaining ones,
2919    * including the *actual* fragment ! */
2920   if (stream->starting_fragment) {
2921     GstClockTime offset =
2922         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2923     GstClockTime period_start =
2924         gst_adaptive_demux_get_period_start_time (demux);
2925 
2926     stream->starting_fragment = FALSE;
2927     if (klass->start_fragment) {
2928       if (!klass->start_fragment (demux, stream)) {
2929         ret = GST_FLOW_ERROR;
2930         goto error;
2931       }
2932     }
2933 
2934     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2935     if (GST_BUFFER_PTS_IS_VALID (buffer))
2936       GST_BUFFER_PTS (buffer) += offset;
2937 
2938     GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2939         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2940 
2941     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2942       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2943       stream->segment.position = GST_BUFFER_PTS (buffer);
2944 
2945       /* Convert from position inside the stream's segment to the demuxer's
2946        * segment, they are not necessarily the same */
2947       if (stream->segment.position - offset + period_start >
2948           demux->segment.position)
2949         demux->segment.position =
2950             stream->segment.position - offset + period_start;
2951       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2952     }
2953 
2954   } else {
2955     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2956   }
2957 
2958   /* downloading_first_buffer is set to TRUE in download_uri() just before
2959    * activating the source (i.e. requesting a given URI)
2960    *
2961    * The difference with starting_fragment is that this will be called
2962    * for *all* first buffers (of index, and header, and fragment)
2963    *
2964    * ... to then only do something useful (in this block) for actual
2965    * fragments... */
2966   if (stream->downloading_first_buffer) {
2967     gint64 chunk_size = 0;
2968 
2969     stream->downloading_first_buffer = FALSE;
2970 
2971     if (!stream->downloading_header && !stream->downloading_index) {
2972       /* If this is the first buffer of a fragment (not the headers or index)
2973        * and we don't have a birate from the sub-class, then see if we
2974        * can work it out from the fragment size and duration */
2975       if (stream->fragment.bitrate == 0 &&
2976           stream->fragment.duration != 0 &&
2977           gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2978               &chunk_size) && chunk_size != -1) {
2979         guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2980                 8 * GST_SECOND, stream->fragment.duration));
2981         GST_LOG_OBJECT (demux,
2982             "Fragment has size %" G_GINT64_FORMAT " duration %" GST_TIME_FORMAT
2983             " = bitrate %u", chunk_size,
2984             GST_TIME_ARGS (stream->fragment.duration), bitrate);
2985         stream->fragment.bitrate = bitrate;
2986       }
2987       if (stream->fragment.bitrate) {
2988         stream->bitrate_changed = TRUE;
2989       } else {
2990         GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2991       }
2992     }
2993   }
2994 
2995   stream->download_total_bytes += gst_buffer_get_size (buffer);
2996 
2997   GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2998       gst_buffer_get_size (buffer));
2999 
3000   ret = klass->data_received (demux, stream, buffer);
3001 
3002   if (ret == GST_FLOW_FLUSHING) {
3003     /* do not make any changes if the stream is cancelled */
3004     g_mutex_lock (&stream->fragment_download_lock);
3005     if (G_UNLIKELY (stream->cancelled)) {
3006       g_mutex_unlock (&stream->fragment_download_lock);
3007       GST_MANIFEST_UNLOCK (demux);
3008       return ret;
3009     }
3010     g_mutex_unlock (&stream->fragment_download_lock);
3011   }
3012 
3013   if (ret != GST_FLOW_OK) {
3014     gboolean finished = FALSE;
3015 
3016     if (ret < GST_FLOW_EOS) {
3017       GST_ELEMENT_FLOW_ERROR (demux, ret);
3018 
3019       /* TODO push this on all pads */
3020       gst_pad_push_event (stream->pad, gst_event_new_eos ());
3021     } else {
3022       GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
3023           gst_flow_get_name (ret));
3024     }
3025 
3026     if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
3027       ret = GST_FLOW_EOS;       /* return EOS to make the source stop */
3028     } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
3029       /* Behaves like an EOS event from upstream */
3030       stream->fragment.finished = TRUE;
3031       ret = klass->finish_fragment (demux, stream);
3032       if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
3033         ret = GST_FLOW_EOS;     /* return EOS to make the source stop */
3034       } else if (ret != GST_FLOW_OK) {
3035         goto error;
3036       }
3037       finished = TRUE;
3038     }
3039 
3040     gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
3041     if (finished)
3042       ret = GST_FLOW_EOS;
3043   }
3044 
3045 error:
3046 
3047   GST_MANIFEST_UNLOCK (demux);
3048 
3049   return ret;
3050 }
3051 
3052 /* must be called with manifest_lock taken */
3053 static void
gst_adaptive_demux_stream_fragment_download_finish(GstAdaptiveDemuxStream * stream,GstFlowReturn ret,GError * err)3054 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
3055     stream, GstFlowReturn ret, GError * err)
3056 {
3057   GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
3058       gst_flow_get_name (ret), err);
3059 
3060   /* if we have an error, only replace last_ret if it was OK before to avoid
3061    * overwriting the first error we got */
3062   if (stream->last_ret == GST_FLOW_OK) {
3063     stream->last_ret = ret;
3064     if (err) {
3065       g_clear_error (&stream->last_error);
3066       stream->last_error = g_error_copy (err);
3067     }
3068   }
3069   g_mutex_lock (&stream->fragment_download_lock);
3070   stream->download_finished = TRUE;
3071   g_cond_signal (&stream->fragment_download_cond);
3072   g_mutex_unlock (&stream->fragment_download_lock);
3073 }
3074 
3075 static GstFlowReturn
gst_adaptive_demux_eos_handling(GstAdaptiveDemuxStream * stream)3076 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
3077 {
3078   GstFlowReturn ret = GST_FLOW_OK;
3079   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
3080 
3081   if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
3082       || !klass->need_another_chunk (stream)
3083       || stream->fragment.chunk_size == 0) {
3084     stream->fragment.finished = TRUE;
3085 
3086     /* Last chance to figure out a fallback nominal bitrate if neither baseclass
3087        nor the HTTP Content-Length implementation worked. */
3088     if (stream->fragment.bitrate == 0 && stream->fragment.duration != 0 &&
3089         stream->fragment_bytes_downloaded != 0 && !stream->downloading_index &&
3090         !stream->downloading_header) {
3091       guint bitrate = MIN (G_MAXUINT,
3092           gst_util_uint64_scale (stream->fragment_bytes_downloaded,
3093               8 * GST_SECOND, stream->fragment.duration));
3094       GST_LOG_OBJECT (stream->pad,
3095           "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
3096           " = bitrate %u", stream->fragment_bytes_downloaded,
3097           GST_TIME_ARGS (stream->fragment.duration), bitrate);
3098       stream->fragment.bitrate = bitrate;
3099       stream->bitrate_changed = TRUE;
3100     }
3101     ret = klass->finish_fragment (stream->demux, stream);
3102   }
3103   gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
3104 
3105   return ret;
3106 }
3107 
3108 static gboolean
_src_event(GstPad * pad,GstObject * parent,GstEvent * event)3109 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3110 {
3111   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
3112   GstAdaptiveDemux *demux = stream->demux;
3113 
3114   switch (GST_EVENT_TYPE (event)) {
3115     case GST_EVENT_EOS:{
3116       GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
3117       GST_MANIFEST_LOCK (demux);
3118 
3119       gst_adaptive_demux_eos_handling (stream);
3120 
3121       /* FIXME ?
3122        * _eos_handling() calls  fragment_download_finish() which does the
3123        * same thing as below.
3124        * Could this cause races ? */
3125       g_mutex_lock (&stream->fragment_download_lock);
3126       stream->download_finished = TRUE;
3127       g_cond_signal (&stream->fragment_download_cond);
3128       g_mutex_unlock (&stream->fragment_download_lock);
3129 
3130       GST_MANIFEST_UNLOCK (demux);
3131       break;
3132     }
3133     default:
3134       break;
3135   }
3136 
3137   gst_event_unref (event);
3138 
3139   return TRUE;
3140 }
3141 
3142 static gboolean
_src_query(GstPad * pad,GstObject * parent,GstQuery * query)3143 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3144 {
3145   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
3146 
3147   switch (GST_QUERY_TYPE (query)) {
3148     case GST_QUERY_ALLOCATION:
3149       return FALSE;
3150       break;
3151     default:
3152       break;
3153   }
3154 
3155   return gst_pad_peer_query (stream->pad, query);
3156 }
3157 
3158 static GstPadProbeReturn
_uri_handler_probe(GstPad * pad,GstPadProbeInfo * info,GstAdaptiveDemuxStream * stream)3159 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
3160     GstAdaptiveDemuxStream * stream)
3161 {
3162   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
3163 
3164   if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
3165     GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
3166     if (stream->fragment_bytes_downloaded == 0) {
3167       stream->last_latency =
3168           gst_adaptive_demux_get_monotonic_time (stream->demux) -
3169           (stream->download_start_time * GST_USECOND);
3170       GST_DEBUG_OBJECT (pad,
3171           "FIRST BYTE since download_start %" GST_TIME_FORMAT,
3172           GST_TIME_ARGS (stream->last_latency));
3173     }
3174     stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
3175     GST_LOG_OBJECT (pad,
3176         "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
3177         gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
3178   } else if (GST_PAD_PROBE_INFO_TYPE (info) &
3179       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
3180     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
3181     GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
3182         GST_EVENT_TYPE_NAME (ev), ev);
3183     switch (GST_EVENT_TYPE (ev)) {
3184       case GST_EVENT_SEGMENT:
3185         stream->fragment_bytes_downloaded = 0;
3186         break;
3187       case GST_EVENT_EOS:
3188       {
3189         stream->last_download_time =
3190             gst_adaptive_demux_get_monotonic_time (stream->demux) -
3191             (stream->download_start_time * GST_USECOND);
3192         stream->last_bitrate =
3193             gst_util_uint64_scale (stream->fragment_bytes_downloaded,
3194             8 * GST_SECOND, stream->last_download_time);
3195         GST_DEBUG_OBJECT (pad,
3196             "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
3197             G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
3198             stream->last_bitrate);
3199         /* Calculate bitrate since URI request */
3200       }
3201         break;
3202       default:
3203         break;
3204     }
3205   }
3206 
3207   return ret;
3208 }
3209 
3210 /* must be called with manifest_lock taken.
3211  * Can temporarily release manifest_lock
3212  */
3213 static gboolean
gst_adaptive_demux_stream_wait_manifest_update(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)3214 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
3215     GstAdaptiveDemuxStream * stream)
3216 {
3217   gboolean ret = TRUE;
3218 
3219   /* Wait until we're cancelled or there's something for
3220    * us to download in the playlist or the playlist
3221    * became non-live */
3222   while (TRUE) {
3223     GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
3224 
3225     /* get the manifest_update_lock while still holding the manifest_lock.
3226      * This will prevent other threads to signal the condition (they will need
3227      * both manifest_lock and manifest_update_lock in order to signal).
3228      * It cannot deadlock because all threads always get the manifest_lock first
3229      * and manifest_update_lock second.
3230      */
3231     g_mutex_lock (&demux->priv->manifest_update_lock);
3232 
3233     GST_MANIFEST_UNLOCK (demux);
3234 
3235     g_cond_wait (&demux->priv->manifest_cond,
3236         &demux->priv->manifest_update_lock);
3237     g_mutex_unlock (&demux->priv->manifest_update_lock);
3238 
3239     GST_MANIFEST_LOCK (demux);
3240 
3241     /* check for cancelled every time we get the manifest_lock */
3242     g_mutex_lock (&stream->fragment_download_lock);
3243     if (G_UNLIKELY (stream->cancelled)) {
3244       ret = FALSE;
3245       stream->last_ret = GST_FLOW_FLUSHING;
3246       g_mutex_unlock (&stream->fragment_download_lock);
3247       break;
3248     }
3249     g_mutex_unlock (&stream->fragment_download_lock);
3250 
3251     /* Got a new fragment or not live anymore? */
3252     if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
3253         GST_FLOW_OK) {
3254       GST_DEBUG_OBJECT (demux, "new fragment available, "
3255           "not waiting for manifest update");
3256       ret = TRUE;
3257       break;
3258     }
3259 
3260     if (!gst_adaptive_demux_is_live (demux)) {
3261       GST_DEBUG_OBJECT (demux, "Not live anymore, "
3262           "not waiting for manifest update");
3263       ret = FALSE;
3264       break;
3265     }
3266   }
3267   GST_DEBUG_OBJECT (demux, "Retrying now");
3268   return ret;
3269 }
3270 
3271 /* must be called with manifest_lock taken */
3272 static gboolean
gst_adaptive_demux_stream_update_source(GstAdaptiveDemuxStream * stream,const gchar * uri,const gchar * referer,gboolean refresh,gboolean allow_cache)3273 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
3274     const gchar * uri, const gchar * referer, gboolean refresh,
3275     gboolean allow_cache)
3276 {
3277   GstAdaptiveDemux *demux = stream->demux;
3278 
3279   if (!gst_uri_is_valid (uri)) {
3280     GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
3281     return FALSE;
3282   }
3283 
3284   /* Try to re-use existing source element */
3285   if (stream->src != NULL) {
3286     gchar *old_protocol, *new_protocol;
3287     gchar *old_uri;
3288 
3289     old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
3290     old_protocol = gst_uri_get_protocol (old_uri);
3291     new_protocol = gst_uri_get_protocol (uri);
3292 
3293     if (!g_str_equal (old_protocol, new_protocol)) {
3294       GstElement *src = stream->src;
3295 
3296       stream->src = NULL;
3297       gst_object_unref (stream->src_srcpad);
3298       stream->src_srcpad = NULL;
3299       GST_MANIFEST_UNLOCK (demux);
3300       gst_element_set_locked_state (src, TRUE);
3301       gst_element_set_state (src, GST_STATE_NULL);
3302       gst_bin_remove (GST_BIN_CAST (demux), src);
3303       GST_MANIFEST_LOCK (demux);
3304       GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
3305     } else {
3306       GError *err = NULL;
3307 
3308       GST_DEBUG_OBJECT (demux, "Re-using old source element");
3309       if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
3310               &err)) {
3311         GstElement *src = stream->src;
3312 
3313         stream->src = NULL;
3314         GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
3315             err ? err->message : "Unknown error");
3316         g_clear_error (&err);
3317         gst_object_unref (stream->src_srcpad);
3318         stream->src_srcpad = NULL;
3319         GST_MANIFEST_UNLOCK (demux);
3320         gst_element_set_locked_state (src, TRUE);
3321         gst_element_set_state (src, GST_STATE_NULL);
3322         gst_bin_remove (GST_BIN_CAST (demux), src);
3323         GST_MANIFEST_LOCK (demux);
3324       }
3325     }
3326     g_free (old_uri);
3327     g_free (old_protocol);
3328     g_free (new_protocol);
3329   }
3330 
3331   if (stream->src == NULL) {
3332     GstPad *uri_handler_src;
3333     GstPad *queue_sink;
3334     GstPad *queue_src;
3335     GstElement *uri_handler;
3336     GstElement *queue;
3337     GstPadLinkReturn pad_link_ret;
3338     GObjectClass *gobject_class;
3339     gchar *internal_name, *bin_name;
3340 
3341     /* Our src consists of a bin containing uri_handler -> queue . The
3342      * purpose of the queue is to allow the uri_handler to download an
3343      * entire fragment without blocking, so we can accurately measure the
3344      * download bitrate. */
3345 
3346     queue = gst_element_factory_make ("queue", NULL);
3347     if (queue == NULL)
3348       return FALSE;
3349 
3350     g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
3351     g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
3352     g_object_set (queue, "max-size-time", (guint64) 0, NULL);
3353 
3354     uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3355     if (uri_handler == NULL) {
3356       GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
3357           ("Missing plugin to handle URI: '%s'", uri), (NULL));
3358       gst_object_unref (queue);
3359       return FALSE;
3360     }
3361 
3362     gobject_class = G_OBJECT_GET_CLASS (uri_handler);
3363 
3364     if (g_object_class_find_property (gobject_class, "compress"))
3365       g_object_set (uri_handler, "compress", FALSE, NULL);
3366     if (g_object_class_find_property (gobject_class, "keep-alive"))
3367       g_object_set (uri_handler, "keep-alive", TRUE, NULL);
3368     if (g_object_class_find_property (gobject_class, "extra-headers")) {
3369       if (referer || refresh || !allow_cache) {
3370         GstStructure *extra_headers = gst_structure_new_empty ("headers");
3371 
3372         if (referer)
3373           gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3374               NULL);
3375 
3376         if (!allow_cache)
3377           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3378               "no-cache", NULL);
3379         else if (refresh)
3380           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3381               "max-age=0", NULL);
3382 
3383         g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3384 
3385         gst_structure_free (extra_headers);
3386       } else {
3387         g_object_set (uri_handler, "extra-headers", NULL, NULL);
3388       }
3389     }
3390 
3391     /* Source bin creation */
3392     bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3393     stream->src = gst_bin_new (bin_name);
3394     g_free (bin_name);
3395     if (stream->src == NULL) {
3396       gst_object_unref (queue);
3397       gst_object_unref (uri_handler);
3398       return FALSE;
3399     }
3400 
3401     gst_bin_add (GST_BIN_CAST (stream->src), queue);
3402     gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3403 
3404     uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3405     queue_sink = gst_element_get_static_pad (queue, "sink");
3406 
3407     pad_link_ret =
3408         gst_pad_link_full (uri_handler_src, queue_sink,
3409         GST_PAD_LINK_CHECK_NOTHING);
3410     if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3411       GST_WARNING_OBJECT (demux,
3412           "Could not link pads %s:%s to %s:%s for reason %d",
3413           GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3414           pad_link_ret);
3415       g_object_unref (queue_sink);
3416       g_object_unref (uri_handler_src);
3417       gst_object_unref (stream->src);
3418       stream->src = NULL;
3419       return FALSE;
3420     }
3421 
3422     /* Add a downstream event and data probe */
3423     gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3424         (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3425 
3426     g_object_unref (queue_sink);
3427     g_object_unref (uri_handler_src);
3428     queue_src = gst_element_get_static_pad (queue, "src");
3429     stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3430     g_object_unref (queue_src);
3431     gst_element_add_pad (stream->src, stream->src_srcpad);
3432 
3433     gst_element_set_locked_state (stream->src, TRUE);
3434     gst_bin_add (GST_BIN_CAST (demux), stream->src);
3435     stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3436 
3437     /* set up our internal floating pad to drop all events from
3438      * the http src we don't care about. On the chain function
3439      * we just push the buffer forward */
3440     internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3441     stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3442     g_free (internal_name);
3443     gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3444         GST_OBJECT_CAST (demux));
3445     GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3446     gst_pad_set_element_private (stream->internal_pad, stream);
3447     gst_pad_set_active (stream->internal_pad, TRUE);
3448     gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3449     gst_pad_set_event_function (stream->internal_pad, _src_event);
3450     gst_pad_set_query_function (stream->internal_pad, _src_query);
3451 
3452     if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3453             GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3454       GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3455       return FALSE;
3456     }
3457 
3458     stream->uri_handler = uri_handler;
3459     stream->queue = queue;
3460 
3461     stream->last_status_code = 200;     /* default to OK */
3462   }
3463   return TRUE;
3464 }
3465 
3466 static GstPadProbeReturn
gst_ad_stream_src_to_ready_cb(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3467 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3468     gpointer user_data)
3469 {
3470   GstAdaptiveDemuxStream *stream = user_data;
3471 
3472   /* The source's src pad is IDLE so now set the state to READY */
3473   g_mutex_lock (&stream->fragment_download_lock);
3474   stream->src_at_ready = TRUE;
3475   g_cond_signal (&stream->fragment_download_cond);
3476   g_mutex_unlock (&stream->fragment_download_lock);
3477 
3478   return GST_PAD_PROBE_REMOVE;
3479 }
3480 
3481 #ifndef GST_DISABLE_GST_DEBUG
3482 static const char *
uritype(GstAdaptiveDemuxStream * s)3483 uritype (GstAdaptiveDemuxStream * s)
3484 {
3485   if (s->downloading_header)
3486     return "header";
3487   if (s->downloading_index)
3488     return "index";
3489   return "fragment";
3490 }
3491 #endif
3492 
3493 /* must be called with manifest_lock taken.
3494  * Can temporarily release manifest_lock
3495  *
3496  * Will return when URI is fully downloaded (or aborted/errored)
3497  */
3498 static GstFlowReturn
gst_adaptive_demux_stream_download_uri(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,const gchar * uri,gint64 start,gint64 end,guint * http_status)3499 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3500     GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3501     gint64 end, guint * http_status)
3502 {
3503   GstFlowReturn ret = GST_FLOW_OK;
3504   GST_DEBUG_OBJECT (stream->pad,
3505       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3506       uritype (stream), uri, start, end);
3507 
3508   if (http_status)
3509     *http_status = 200;         /* default to ok if no further information */
3510 
3511   if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3512     ret = stream->last_ret = GST_FLOW_ERROR;
3513     return ret;
3514   }
3515 
3516   gst_element_set_locked_state (stream->src, TRUE);
3517 
3518   GST_MANIFEST_UNLOCK (demux);
3519   if (gst_element_set_state (stream->src,
3520           GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3521     /* If ranges are specified, seek to it */
3522     if (start != 0 || end != -1) {
3523       /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3524        * stop position */
3525       if (end != -1)
3526         end += 1;
3527       /* Send the seek event to the uri_handler, as the other pipeline elements
3528        * can't handle it when READY. */
3529       if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3530                   GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3531                   GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3532 
3533         GST_MANIFEST_LOCK (demux);
3534         /* looks like the source can't handle seeks in READY */
3535         g_clear_error (&stream->last_error);
3536         stream->last_error = g_error_new (GST_CORE_ERROR,
3537             GST_CORE_ERROR_NOT_IMPLEMENTED,
3538             "Source element can't handle range requests");
3539         stream->last_ret = GST_FLOW_ERROR;
3540       } else {
3541         GST_MANIFEST_LOCK (demux);
3542       }
3543     } else {
3544       GST_MANIFEST_LOCK (demux);
3545     }
3546 
3547     if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3548       stream->download_start_time =
3549           GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3550 
3551       /* src element is in state READY. Before we start it, we reset
3552        * download_finished
3553        */
3554       g_mutex_lock (&stream->fragment_download_lock);
3555       stream->download_finished = FALSE;
3556       stream->downloading_first_buffer = TRUE;
3557       g_mutex_unlock (&stream->fragment_download_lock);
3558 
3559       GST_MANIFEST_UNLOCK (demux);
3560 
3561       if (!gst_element_sync_state_with_parent (stream->src)) {
3562         GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3563         GST_MANIFEST_LOCK (demux);
3564         ret = stream->last_ret = GST_FLOW_ERROR;
3565         return ret;
3566       }
3567 
3568       /* wait for the fragment to be completely downloaded */
3569       GST_DEBUG_OBJECT (stream->pad,
3570           "Waiting for %s download to finish: %s", uritype (stream), uri);
3571 
3572       g_mutex_lock (&stream->fragment_download_lock);
3573       stream->src_at_ready = FALSE;
3574       if (G_UNLIKELY (stream->cancelled)) {
3575         g_mutex_unlock (&stream->fragment_download_lock);
3576         GST_MANIFEST_LOCK (demux);
3577         ret = stream->last_ret = GST_FLOW_FLUSHING;
3578         return ret;
3579       }
3580       /* download_finished is only set:
3581        * * in ::fragment_download_finish()
3582        * * if EOS is received on the _src pad
3583        * */
3584       while (!stream->cancelled && !stream->download_finished) {
3585         g_cond_wait (&stream->fragment_download_cond,
3586             &stream->fragment_download_lock);
3587       }
3588       g_mutex_unlock (&stream->fragment_download_lock);
3589 
3590       GST_DEBUG_OBJECT (stream->pad,
3591           "Finished Waiting for %s download: %s", uritype (stream), uri);
3592 
3593       GST_MANIFEST_LOCK (demux);
3594       g_mutex_lock (&stream->fragment_download_lock);
3595       if (G_UNLIKELY (stream->cancelled)) {
3596         ret = stream->last_ret = GST_FLOW_FLUSHING;
3597         g_mutex_unlock (&stream->fragment_download_lock);
3598         return ret;
3599       }
3600       g_mutex_unlock (&stream->fragment_download_lock);
3601 
3602       ret = stream->last_ret;
3603 
3604       GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3605           uritype (stream), uri, stream->last_ret,
3606           gst_flow_get_name (stream->last_ret));
3607       if (stream->last_ret != GST_FLOW_OK && http_status) {
3608         *http_status = stream->last_status_code;
3609       }
3610     }
3611 
3612     /* changing src element state might try to join the streaming thread, so
3613      * we must not hold the manifest lock.
3614      */
3615     GST_MANIFEST_UNLOCK (demux);
3616   } else {
3617     GST_MANIFEST_UNLOCK (demux);
3618     if (stream->last_ret == GST_FLOW_OK)
3619       stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3620     ret = GST_FLOW_CUSTOM_ERROR;
3621   }
3622 
3623   stream->src_at_ready = FALSE;
3624 
3625   gst_element_set_locked_state (stream->src, TRUE);
3626   gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3627       gst_ad_stream_src_to_ready_cb, stream, NULL);
3628 
3629   g_mutex_lock (&stream->fragment_download_lock);
3630   while (!stream->src_at_ready) {
3631     g_cond_wait (&stream->fragment_download_cond,
3632         &stream->fragment_download_lock);
3633   }
3634   g_mutex_unlock (&stream->fragment_download_lock);
3635 
3636   gst_element_set_state (stream->src, GST_STATE_READY);
3637 
3638   /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3639   GST_MANIFEST_LOCK (demux);
3640   g_mutex_lock (&stream->fragment_download_lock);
3641   if (G_UNLIKELY (stream->cancelled)) {
3642     ret = stream->last_ret = GST_FLOW_FLUSHING;
3643     g_mutex_unlock (&stream->fragment_download_lock);
3644     return ret;
3645   }
3646   g_mutex_unlock (&stream->fragment_download_lock);
3647 
3648   /* deactivate and reactivate our ghostpad to make it fresh for a new
3649    * stream */
3650   gst_pad_set_active (stream->internal_pad, FALSE);
3651   gst_pad_set_active (stream->internal_pad, TRUE);
3652 
3653   return ret;
3654 }
3655 
3656 /* must be called with manifest_lock taken.
3657  * Can temporarily release manifest_lock
3658  */
3659 static GstFlowReturn
gst_adaptive_demux_stream_download_header_fragment(GstAdaptiveDemuxStream * stream)3660 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3661     stream)
3662 {
3663   GstAdaptiveDemux *demux = stream->demux;
3664   GstFlowReturn ret = GST_FLOW_OK;
3665 
3666   if (stream->fragment.header_uri != NULL) {
3667     GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3668         G_GINT64_FORMAT, stream->fragment.header_uri,
3669         stream->fragment.header_range_start, stream->fragment.header_range_end);
3670 
3671     stream->downloading_header = TRUE;
3672     ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3673         stream->fragment.header_uri, stream->fragment.header_range_start,
3674         stream->fragment.header_range_end, NULL);
3675     stream->downloading_header = FALSE;
3676   }
3677 
3678   /* check if we have an index */
3679   if (ret == GST_FLOW_OK) {     /* TODO check for other valid types */
3680 
3681     if (stream->fragment.index_uri != NULL) {
3682       GST_DEBUG_OBJECT (demux,
3683           "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3684           stream->fragment.index_uri,
3685           stream->fragment.index_range_start, stream->fragment.index_range_end);
3686       stream->downloading_index = TRUE;
3687       ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3688           stream->fragment.index_uri, stream->fragment.index_range_start,
3689           stream->fragment.index_range_end, NULL);
3690       stream->downloading_index = FALSE;
3691     }
3692   }
3693 
3694   return ret;
3695 }
3696 
3697 /* must be called with manifest_lock taken.
3698  * Can temporarily release manifest_lock
3699  */
3700 static GstFlowReturn
gst_adaptive_demux_stream_download_fragment(GstAdaptiveDemuxStream * stream)3701 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3702 {
3703   GstAdaptiveDemux *demux = stream->demux;
3704   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3705   gchar *url = NULL;
3706   GstFlowReturn ret;
3707   gboolean retried_once = FALSE, live;
3708   guint http_status;
3709   guint last_status_code;
3710 
3711   /* FIXME :  */
3712   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3713   stream->starting_fragment = TRUE;
3714   stream->last_ret = GST_FLOW_OK;
3715   stream->first_fragment_buffer = TRUE;
3716 
3717   GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3718       stream->fragment.uri ? "FRAGMENT " : "",
3719       stream->fragment.header_uri ? "HEADER " : "",
3720       stream->fragment.index_uri ? "INDEX" : "");
3721 
3722   if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3723       stream->fragment.index_uri == NULL)
3724     goto no_url_error;
3725 
3726   if (stream->need_header) {
3727     ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3728     if (ret != GST_FLOW_OK) {
3729       return ret;
3730     }
3731     stream->need_header = FALSE;
3732   }
3733 
3734 again:
3735   ret = GST_FLOW_OK;
3736   url = stream->fragment.uri;
3737   GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3738   if (!url)
3739     return ret;
3740 
3741   stream->last_ret = GST_FLOW_OK;
3742   http_status = 200;
3743 
3744   /* Download the actual fragment, either in fragments or in one go */
3745   if (klass->need_another_chunk && klass->need_another_chunk (stream)
3746       && stream->fragment.chunk_size != 0) {
3747     /* Handle chunk downloading */
3748     gint64 range_start, range_end, chunk_start, chunk_end;
3749     guint64 download_total_bytes;
3750     gint chunk_size = stream->fragment.chunk_size;
3751 
3752     range_start = chunk_start = stream->fragment.range_start;
3753     range_end = stream->fragment.range_end;
3754     /* HTTP ranges are inclusive for the end */
3755     if (chunk_size != -1)
3756       chunk_end = range_start + chunk_size - 1;
3757     else
3758       chunk_end = range_end;
3759 
3760     if (range_end != -1)
3761       chunk_end = MIN (chunk_end, range_end);
3762 
3763     while (!stream->fragment.finished && (chunk_start <= range_end
3764             || range_end == -1)) {
3765       download_total_bytes = stream->download_total_bytes;
3766 
3767       ret =
3768           gst_adaptive_demux_stream_download_uri (demux, stream, url,
3769           chunk_start, chunk_end, &http_status);
3770 
3771       GST_DEBUG_OBJECT (stream->pad,
3772           "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3773           http_status, gst_flow_get_name (stream->last_ret));
3774 
3775       /* Don't retry for any chunks except the first. We would have sent
3776        * data downstream already otherwise and it's difficult to recover
3777        * from that in a meaningful way */
3778       if (chunk_start > range_start)
3779         retried_once = TRUE;
3780 
3781       /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3782        * downloading up to -1. We don't know the full duration.
3783        * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3784       if (ret != GST_FLOW_OK && chunk_end == -1) {
3785         break;
3786       } else if (ret != GST_FLOW_OK) {
3787         chunk_end = -1;
3788         stream->last_ret = GST_FLOW_OK;
3789         continue;
3790       }
3791 
3792       if (chunk_end == -1)
3793         break;
3794 
3795       /* Short read, we're at the end now */
3796       if (stream->download_total_bytes - download_total_bytes <
3797           chunk_end + 1 - chunk_start)
3798         break;
3799 
3800       if (!klass->need_another_chunk (stream))
3801         break;
3802 
3803       /* HTTP ranges are inclusive for the end */
3804       chunk_start += chunk_size;
3805       chunk_size = stream->fragment.chunk_size;
3806       if (chunk_size != -1)
3807         chunk_end = chunk_start + chunk_size - 1;
3808       else
3809         chunk_end = range_end;
3810 
3811       if (range_end != -1)
3812         chunk_end = MIN (chunk_end, range_end);
3813     }
3814   } else {
3815     ret =
3816         gst_adaptive_demux_stream_download_uri (demux, stream, url,
3817         stream->fragment.range_start, stream->fragment.range_end, &http_status);
3818     GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3819         stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3820   }
3821   if (ret == GST_FLOW_OK)
3822     goto beach;
3823 
3824   g_mutex_lock (&stream->fragment_download_lock);
3825   if (G_UNLIKELY (stream->cancelled)) {
3826     g_mutex_unlock (&stream->fragment_download_lock);
3827     return ret;
3828   }
3829   g_mutex_unlock (&stream->fragment_download_lock);
3830 
3831   /* TODO check if we are truly stopping */
3832   if (ret != GST_FLOW_CUSTOM_ERROR)
3833     goto beach;
3834 
3835   last_status_code = stream->last_status_code;
3836   GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3837       last_status_code, stream->download_error_count);
3838 
3839   live = gst_adaptive_demux_is_live (demux);
3840 
3841 #ifdef OHOS_EXT_FUNC
3842   // ohos.ext.func.0013
3843   if (last_status_code == 408) {
3844     GST_WARNING_OBJECT (stream->pad, "Receive error of souphttpsrc, status_code is %u", last_status_code);
3845     stream->download_error_count = MAX_DOWNLOAD_ERROR_COUNT;
3846     return GST_FLOW_ERROR;
3847   }
3848 #endif
3849 
3850   if (!retried_once && ((last_status_code / 100 == 4 && live)
3851           || last_status_code / 100 == 5)) {
3852     /* 4xx/5xx */
3853     /* if current position is before available start, switch to next */
3854     if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3855       goto flushing;
3856 
3857     if (live) {
3858       gint64 range_start, range_stop;
3859 
3860       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3861               &range_stop))
3862         goto flushing;
3863 
3864       if (demux->segment.position < range_start) {
3865         GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3866         stream->last_ret = GST_FLOW_OK;
3867         ret = gst_adaptive_demux_eos_handling (stream);
3868         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3869             gst_flow_get_name (ret));
3870         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3871         ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3872         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3873             gst_flow_get_name (ret));
3874         if (ret == GST_FLOW_OK) {
3875           retried_once = TRUE;
3876           goto again;
3877         }
3878       } else if (demux->segment.position > range_stop) {
3879         /* wait a bit to be in range, we don't have any locks at that point */
3880         gint64 wait_time =
3881             gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3882         if (wait_time > 0) {
3883           gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3884 
3885           GST_DEBUG_OBJECT (stream->pad,
3886               "Download waiting for %" GST_TIME_FORMAT,
3887               GST_TIME_ARGS (wait_time));
3888 
3889           GST_MANIFEST_UNLOCK (demux);
3890           g_mutex_lock (&stream->fragment_download_lock);
3891           if (G_UNLIKELY (stream->cancelled)) {
3892             g_mutex_unlock (&stream->fragment_download_lock);
3893             GST_MANIFEST_LOCK (demux);
3894             stream->last_ret = GST_FLOW_FLUSHING;
3895             goto flushing;
3896           }
3897           do {
3898             g_cond_wait_until (&stream->fragment_download_cond,
3899                 &stream->fragment_download_lock, end_time);
3900             if (G_UNLIKELY (stream->cancelled)) {
3901               g_mutex_unlock (&stream->fragment_download_lock);
3902               GST_MANIFEST_LOCK (demux);
3903               stream->last_ret = GST_FLOW_FLUSHING;
3904               goto flushing;
3905             }
3906           } while (!stream->download_finished);
3907           g_mutex_unlock (&stream->fragment_download_lock);
3908 
3909           GST_MANIFEST_LOCK (demux);
3910         }
3911       }
3912     }
3913 
3914   flushing:
3915     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3916       /* looks like there is no way of knowing when a live stream has ended
3917        * Have to assume we are falling behind and cause a manifest reload */
3918       GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3919       return GST_FLOW_EOS;
3920     }
3921   } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3922     /* If this is the last fragment, consider failures EOS and not actual
3923      * errors. Due to rounding errors in the durations, the last fragment
3924      * might not actually exist */
3925     GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3926     return GST_FLOW_EOS;
3927   } else {
3928     /* retry once (same segment) for 5xx (server errors) */
3929     if (!retried_once) {
3930       retried_once = TRUE;
3931       /* wait a short time in case the server needs a bit to recover, we don't
3932        * care if we get woken up before end time. We can use sleep here since
3933        * we're already blocking and just want to wait some time. */
3934       g_usleep (100000);        /* a tenth of a second */
3935       goto again;
3936     }
3937   }
3938 
3939 beach:
3940   return ret;
3941 
3942 no_url_error:
3943   {
3944     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3945         (_("Failed to get fragment URL.")),
3946         ("An error happened when getting fragment URL"));
3947     gst_task_stop (stream->download_task);
3948     return GST_FLOW_ERROR;
3949   }
3950 }
3951 
/* Body of the per-stream download loop: each call handles at most one
 * fragment of @stream. In order, it
 *   - bails out if the stream was cancelled,
 *   - finishes with EOS if the stream position reached the segment boundary,
 *   - frees streams queued in demux->priv->old_streams,
 *   - re-positions the stream and pushes a new segment event when
 *     stream->restart_download is set,
 *   - asks the subclass for the next fragment, waits for it when live, and
 *     downloads it,
 *   - reacts to the resulting flow return (EOS, not-linked, flushing, errors).
 *
 * this function will take the manifest_lock and will keep it until the end.
 * It will release it temporarily only when going to sleep.
 * Every time it takes the manifest_lock, it will check for cancelled condition
 */
static void
gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
{
  GstAdaptiveDemux *demux = stream->demux;
  /* Reference time for the retry delay in the error path; refreshed right
   * before the fragment download is started */
  GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
  GstFlowReturn ret;
  gboolean live;

  GST_LOG_OBJECT (stream->pad, "download loop start");

  GST_MANIFEST_LOCK (demux);

  /* stream->cancelled is read under fragment_download_lock */
  g_mutex_lock (&stream->fragment_download_lock);
  if (G_UNLIKELY (stream->cancelled)) {
    stream->last_ret = GST_FLOW_FLUSHING;
    g_mutex_unlock (&stream->fragment_download_lock);
    goto cancelled;
  }
  g_mutex_unlock (&stream->fragment_download_lock);

  /* Check if we're done with our segment */
  GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
  if (demux->segment.rate > 0) {
    /* Forward playback: done once the position reaches the stop time.
     * NOTE(review): validity is tested on the demuxer's segment while the
     * comparison uses the stream's segment - confirm this is intended */
    if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
        && stream->segment.position >= stream->segment.stop) {
      GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
      ret = GST_FLOW_EOS;
      gst_task_stop (stream->download_task);
      goto end_of_manifest;
    }
  } else {
    /* Reverse playback: done once the position reaches the start time */
    if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
        && stream->segment.position <= stream->segment.start) {
      GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
      ret = GST_FLOW_EOS;
      gst_task_stop (stream->download_task);
      goto end_of_manifest;
    }
  }
  GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);

  /* Cleanup old streams if any */
  if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
    /* Detach the list first so it is no longer reachable through demux->priv
     * while its elements are being freed */
    GList *old_streams = demux->priv->old_streams;
    demux->priv->old_streams = NULL;

    GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
    g_list_free_full (old_streams,
        (GDestroyNotify) gst_adaptive_demux_stream_free);
    GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");

    /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
     * Recheck the cancelled flag.
     */
    g_mutex_lock (&stream->fragment_download_lock);
    if (G_UNLIKELY (stream->cancelled)) {
      stream->last_ret = GST_FLOW_FLUSHING;
      g_mutex_unlock (&stream->fragment_download_lock);
      goto cancelled;
    }
    g_mutex_unlock (&stream->fragment_download_lock);
  }

  /* Restarting download, figure out new position
   * FIXME : Move this to a separate function ? */
  if (G_UNLIKELY (stream->restart_download)) {
    GstEvent *seg_event;
    /* ts stays 0 if no downstream position query succeeds */
    GstClockTime cur, ts = 0;
    gint64 pos;

    GST_DEBUG_OBJECT (stream->pad,
        "Activating stream due to reconfigure event");

    if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
      ts = (GstClockTime) pos;
      GST_DEBUG_OBJECT (demux, "Downstream position: %"
          GST_TIME_FORMAT, GST_TIME_ARGS (ts));
    } else {
      /* query other pads as some faulty element in the pad's branch might
       * reject position queries. This should be better than using the
       * demux segment position that can be much ahead */
      GList *iter;

      for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
        GstAdaptiveDemuxStream *cur_stream =
            (GstAdaptiveDemuxStream *) iter->data;

        /* The first pad that answers the query wins */
        if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
                &pos)) {
          ts = (GstClockTime) pos;
          GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
              GST_TIME_FORMAT, GST_TIME_ARGS (ts));
          break;
        }
      }
    }

    GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
    cur =
        gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
        stream->segment.position);

    /* we might have already pushed this data */
    ts = MAX (ts, cur);

    GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
        "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));

    if (GST_CLOCK_TIME_IS_VALID (ts)) {
      GstClockTime offset, period_start;

      offset =
          gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
      period_start = gst_adaptive_demux_get_period_start_time (demux);

      /* TODO check return */
      /* ts is passed both as the requested time and as the location that
       * receives the final time chosen by the seek */
      gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
          0, ts, &ts);

      stream->segment.position = ts - period_start + offset;
    }

    /* The stream's segment is still correct except for
     * the position, so let's send a new one with the
     * updated position */
    seg_event = gst_event_new_segment (&stream->segment);
    gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
    GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);

    GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
        GST_PTR_FORMAT, seg_event);
    gst_pad_push_event (stream->pad, seg_event);

    stream->discont = TRUE;
    stream->restart_download = FALSE;
  }

  live = gst_adaptive_demux_is_live (demux);

  /* Get information about the fragment to download */
  GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
  ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
  GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
      ret, gst_flow_get_name (ret));
  if (ret == GST_FLOW_OK) {

    /* wait for live fragments to be available */
    if (live) {
      gint64 wait_time =
          gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
      if (wait_time > 0) {
        GstClockTime end_time =
            gst_adaptive_demux_get_monotonic_time (demux) + wait_time;

        GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
            GST_TIME_ARGS (wait_time));

        /* Going to sleep: drop the manifest lock for the duration of the
         * wait */
        GST_MANIFEST_UNLOCK (demux);

        g_mutex_lock (&stream->fragment_download_lock);
        if (G_UNLIKELY (stream->cancelled)) {
          /* The manifest lock is re-taken before jumping because the exit
           * path at "end" releases it */
          g_mutex_unlock (&stream->fragment_download_lock);
          GST_MANIFEST_LOCK (demux);
          stream->last_ret = GST_FLOW_FLUSHING;
          goto cancelled;
        }
        gst_adaptive_demux_wait_until (demux->realtime_clock,
            &stream->fragment_download_cond, &stream->fragment_download_lock,
            end_time);
        g_mutex_unlock (&stream->fragment_download_lock);

        GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");

        GST_MANIFEST_LOCK (demux);

        /* Manifest lock was re-taken: recheck the cancelled flag */
        g_mutex_lock (&stream->fragment_download_lock);
        if (G_UNLIKELY (stream->cancelled)) {
          stream->last_ret = GST_FLOW_FLUSHING;
          g_mutex_unlock (&stream->fragment_download_lock);
          goto cancelled;
        }
        g_mutex_unlock (&stream->fragment_download_lock);
      }
    }

    stream->last_ret = GST_FLOW_OK;

    next_download = gst_adaptive_demux_get_monotonic_time (demux);
    ret = gst_adaptive_demux_stream_download_fragment (stream);

    /* A flushing result on a cancelled stream leaves through "cancelled";
     * otherwise it is handled by the GST_FLOW_FLUSHING case below */
    if (ret == GST_FLOW_FLUSHING) {
      g_mutex_lock (&stream->fragment_download_lock);
      if (G_UNLIKELY (stream->cancelled)) {
        stream->last_ret = GST_FLOW_FLUSHING;
        g_mutex_unlock (&stream->fragment_download_lock);
        goto cancelled;
      }
      g_mutex_unlock (&stream->fragment_download_lock);
    }

  } else {
    /* No fragment information available: remember why */
    stream->last_ret = ret;
  }

  switch (ret) {
    case GST_FLOW_OK:
      break;                    /* all is good, let's go */
    case GST_FLOW_EOS:
      GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");

      /* we push the EOS after releasing the object lock */
      if (gst_adaptive_demux_is_live (demux)
          && (demux->segment.rate == 1.0
              || gst_adaptive_demux_stream_in_live_seek_range (demux,
                  stream))) {
        GstAdaptiveDemuxClass *demux_class =
            GST_ADAPTIVE_DEMUX_GET_CLASS (demux);

        /* this might be a fragment download error, refresh the manifest, just in case */
        if (!demux_class->requires_periodical_playlist_update (demux)) {
          /* ret now carries the manifest update result into the checks
           * after the switch */
          ret = gst_adaptive_demux_update_manifest (demux);
          break;
          /* Wait only if we can ensure current manifest has been expired.
           * The meaning "we have next period" *WITH* EOS is that, current
           * period has been ended but we can continue to the next period */
        } else if (!gst_adaptive_demux_has_next_period (demux) &&
            gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
          goto end;
        }
        gst_task_stop (stream->download_task);
        if (stream->replaced) {
          goto end;
        }
      } else {
        gst_task_stop (stream->download_task);
      }

      /* Only move on to the next period when the combined flow of the
       * streams is EOS as well */
      if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
        if (gst_adaptive_demux_has_next_period (demux)) {
          GST_DEBUG_OBJECT (stream->pad,
              "Next period available, not sending EOS");
          gst_adaptive_demux_advance_period (demux);
          /* Clearing EOS here skips the EOS push under end_of_manifest */
          ret = GST_FLOW_OK;
        }
      }
      break;

    case GST_FLOW_NOT_LINKED:
    {
      /* NOTE: this local shadows the function-level ret, so the combined
       * flow computed here is not seen by the code after the switch */
      GstFlowReturn ret;
      gst_task_stop (stream->download_task);

      ret = gst_adaptive_demux_combine_flows (demux);
      if (ret == GST_FLOW_NOT_LINKED) {
        GST_ELEMENT_FLOW_ERROR (demux, ret);
      }
    }
      break;

    case GST_FLOW_FLUSHING:{
      GList *iter;

      /* Stop the download task of every stream in demux->streams, not only
       * this one */
      for (iter = demux->streams; iter; iter = g_list_next (iter)) {
        GstAdaptiveDemuxStream *other;

        other = iter->data;
        gst_task_stop (other->download_task);
      }
    }
      break;

    default:
      if (ret <= GST_FLOW_ERROR) {
        gboolean is_live = gst_adaptive_demux_is_live (demux);
        GST_WARNING_OBJECT (demux, "Error while downloading fragment");
        /* Give up once the error counter exceeds MAX_DOWNLOAD_ERROR_COUNT */
        if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
          goto download_error;
        }

        g_clear_error (&stream->last_error);

        /* First try to update the playlist for non-live playlists
         * in case the URIs have changed in the meantime. But only
         * try it the first time, after that we're going to wait a
         * a bit to not flood the server */
        if (stream->download_error_count == 1 && !is_live) {
          /* TODO hlsdemux had more options to this function (boolean and err) */

          if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
            /* Retry immediately, the playlist actually has changed */
            GST_DEBUG_OBJECT (demux, "Updated the playlist");
            goto end;
          }
        }

        /* Wait half the fragment duration before retrying */
        next_download += stream->fragment.duration / 2;

        /* Going to sleep: drop the manifest lock for the duration of the
         * wait */
        GST_MANIFEST_UNLOCK (demux);

        g_mutex_lock (&stream->fragment_download_lock);
        if (G_UNLIKELY (stream->cancelled)) {
          /* The manifest lock is re-taken before jumping because the exit
           * path at "end" releases it */
          g_mutex_unlock (&stream->fragment_download_lock);
          GST_MANIFEST_LOCK (demux);
          stream->last_ret = GST_FLOW_FLUSHING;
          goto cancelled;
        }
        gst_adaptive_demux_wait_until (demux->realtime_clock,
            &stream->fragment_download_cond, &stream->fragment_download_lock,
            next_download);
        g_mutex_unlock (&stream->fragment_download_lock);

        GST_DEBUG_OBJECT (demux, "Retrying now");

        GST_MANIFEST_LOCK (demux);

        /* Manifest lock was re-taken: recheck the cancelled flag */
        g_mutex_lock (&stream->fragment_download_lock);
        if (G_UNLIKELY (stream->cancelled)) {
          stream->last_ret = GST_FLOW_FLUSHING;
          g_mutex_unlock (&stream->fragment_download_lock);
          goto cancelled;
        }
        g_mutex_unlock (&stream->fragment_download_lock);

        /* Refetch the playlist now after we waited */
        if (!is_live
            && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
          GST_DEBUG_OBJECT (demux, "Updated the playlist");
        }
        goto end;
      }
      break;
  }

  /* Reached by falling out of the switch above and by the segment boundary
   * checks at the top of the function */
end_of_manifest:
  if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
    if (GST_OBJECT_PARENT (stream->pad) != NULL) {
      /* EOS is only pushed when no other set of streams is queued up */
      if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
        GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
        gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
      } else {
        GST_DEBUG_OBJECT (stream->src,
            "Stream is EOS, but we're switching fragments. Not sending.");
      }
    } else {
      GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
      goto download_error;
    }
  }

  /* Common exit: every path arrives here holding the manifest lock */
end:
  GST_MANIFEST_UNLOCK (demux);
  GST_LOG_OBJECT (stream->pad, "download loop end");
  return;

cancelled:
  {
    GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
    goto end;
  }
download_error:
  {
    GstMessage *msg;

    /* Use the stored stream error when there is one, otherwise build a
     * generic "not found" resource error */
    if (stream->last_error) {
      gchar *debug = g_strdup_printf ("Error on stream %s:%s",
          GST_DEBUG_PAD_NAME (stream->pad));
      msg =
          gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
          debug);
      GST_ERROR_OBJECT (stream->pad, "Download error: %s",
          stream->last_error->message);
      g_free (debug);
    } else {
      GError *err =
          g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
          _("Couldn't download fragments"));
      msg =
          gst_message_new_error (GST_OBJECT_CAST (demux), err,
          "Fragment downloading has failed consecutive times");
      g_error_free (err);
      GST_ERROR_OBJECT (stream->pad,
          "Download error: Couldn't download fragments, too many failures");
    }

    gst_task_stop (stream->download_task);
    if (stream->src) {
      GstElement *src = stream->src;

      /* Detach the source from the stream first, then shut it down and
       * remove it from the bin with the manifest lock released */
      stream->src = NULL;
      GST_MANIFEST_UNLOCK (demux);
      gst_element_set_locked_state (src, TRUE);
      gst_element_set_state (src, GST_STATE_NULL);
      gst_bin_remove (GST_BIN_CAST (demux), src);
      GST_MANIFEST_LOCK (demux);
    }

    gst_element_post_message (GST_ELEMENT_CAST (demux), msg);

    goto end;
  }
}
4358 
4359 static void
gst_adaptive_demux_updates_loop(GstAdaptiveDemux * demux)4360 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
4361 {
4362   GstClockTime next_update;
4363   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4364 
4365   /* Loop for updating of the playlist. This periodically checks if
4366    * the playlist is updated and does so, then signals the streaming
4367    * thread in case it can continue downloading now. */
4368 
4369   /* block until the next scheduled update or the signal to quit this thread */
4370   GST_DEBUG_OBJECT (demux, "Started updates task");
4371 
4372   GST_MANIFEST_LOCK (demux);
4373 
4374   next_update =
4375       gst_adaptive_demux_get_monotonic_time (demux) +
4376       klass->get_manifest_update_interval (demux) * GST_USECOND;
4377 
4378   /* Updating playlist only needed for live playlists */
4379   while (gst_adaptive_demux_is_live (demux)) {
4380     GstFlowReturn ret = GST_FLOW_OK;
4381 
4382     /* Wait here until we should do the next update or we're cancelled */
4383     GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4384 
4385     GST_MANIFEST_UNLOCK (demux);
4386 
4387     g_mutex_lock (&demux->priv->updates_timed_lock);
4388     if (demux->priv->stop_updates_task) {
4389       g_mutex_unlock (&demux->priv->updates_timed_lock);
4390       goto quit;
4391     }
4392     gst_adaptive_demux_wait_until (demux->realtime_clock,
4393         &demux->priv->updates_timed_cond,
4394         &demux->priv->updates_timed_lock, next_update);
4395     g_mutex_unlock (&demux->priv->updates_timed_lock);
4396 
4397     g_mutex_lock (&demux->priv->updates_timed_lock);
4398     if (demux->priv->stop_updates_task) {
4399       g_mutex_unlock (&demux->priv->updates_timed_lock);
4400       goto quit;
4401     }
4402     g_mutex_unlock (&demux->priv->updates_timed_lock);
4403 
4404     GST_MANIFEST_LOCK (demux);
4405 
4406     GST_DEBUG_OBJECT (demux, "Updating playlist");
4407 
4408     ret = gst_adaptive_demux_update_manifest (demux);
4409 
4410     if (ret == GST_FLOW_EOS) {
4411     } else if (ret != GST_FLOW_OK) {
4412       /* update_failed_count is used only here, no need to protect it */
4413       demux->priv->update_failed_count++;
4414       if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4415         GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4416             gst_flow_get_name (ret));
4417         next_update = gst_adaptive_demux_get_monotonic_time (demux)
4418             + klass->get_manifest_update_interval (demux) * GST_USECOND;
4419       } else {
4420         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4421             (_("Internal data stream error.")), ("Could not update playlist"));
4422         GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4423         gst_task_stop (demux->priv->updates_task);
4424         GST_MANIFEST_UNLOCK (demux);
4425         goto end;
4426       }
4427     } else {
4428       GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4429       demux->priv->update_failed_count = 0;
4430       next_update =
4431           gst_adaptive_demux_get_monotonic_time (demux) +
4432           klass->get_manifest_update_interval (demux) * GST_USECOND;
4433 
4434       /* Wake up download tasks */
4435       g_mutex_lock (&demux->priv->manifest_update_lock);
4436       g_cond_broadcast (&demux->priv->manifest_cond);
4437       g_mutex_unlock (&demux->priv->manifest_update_lock);
4438     }
4439   }
4440 
4441   GST_MANIFEST_UNLOCK (demux);
4442 
4443 quit:
4444   {
4445     GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4446   }
4447 
4448 end:
4449   {
4450     return;
4451   }
4452 }
4453 
4454 /* must be called with manifest_lock taken */
4455 static gboolean
gst_adaptive_demux_stream_push_event(GstAdaptiveDemuxStream * stream,GstEvent * event)4456 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4457     GstEvent * event)
4458 {
4459   gboolean ret;
4460   GstPad *pad;
4461   GstAdaptiveDemux *demux = stream->demux;
4462 
4463   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4464     stream->eos = TRUE;
4465   }
4466 
4467   pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4468 
4469   /* Can't push events holding the manifest lock */
4470   GST_MANIFEST_UNLOCK (demux);
4471 
4472   GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4473       "Pushing event %" GST_PTR_FORMAT, event);
4474 
4475   ret = gst_pad_push_event (pad, event);
4476 
4477   gst_object_unref (pad);
4478 
4479   GST_MANIFEST_LOCK (demux);
4480 
4481   return ret;
4482 }
4483 
4484 /* must be called with manifest_lock taken */
4485 static gboolean
gst_adaptive_demux_is_live(GstAdaptiveDemux * demux)4486 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4487 {
4488   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4489 
4490   if (klass->is_live)
4491     return klass->is_live (demux);
4492   return FALSE;
4493 }
4494 
4495 /* must be called with manifest_lock taken */
4496 static GstFlowReturn
gst_adaptive_demux_stream_seek(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,gboolean forward,GstSeekFlags flags,GstClockTime ts,GstClockTime * final_ts)4497 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4498     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4499     GstClockTime ts, GstClockTime * final_ts)
4500 {
4501   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4502 
4503   if (klass->stream_seek)
4504     return klass->stream_seek (stream, forward, flags, ts, final_ts);
4505   return GST_FLOW_ERROR;
4506 }
4507 
4508 /* must be called with manifest_lock taken */
4509 static gboolean
gst_adaptive_demux_stream_has_next_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4510 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4511     GstAdaptiveDemuxStream * stream)
4512 {
4513   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4514   gboolean ret = TRUE;
4515 
4516   if (klass->stream_has_next_fragment)
4517     ret = klass->stream_has_next_fragment (stream);
4518 
4519   return ret;
4520 }
4521 
4522 /* must be called with manifest_lock taken */
4523 /* Called from:
4524  *  the ::finish_fragment() handlers when an *actual* fragment is done
4525  *   */
4526 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4527 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4528     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4529 {
4530   GstFlowReturn ret;
4531 
4532   if (stream->last_ret == GST_FLOW_OK) {
4533     stream->last_ret =
4534         gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4535         duration);
4536   }
4537   ret = stream->last_ret;
4538 
4539   return ret;
4540 }
4541 
4542 /* must be called with manifest_lock taken */
4543 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4544 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4545     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4546 {
4547   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4548   GstFlowReturn ret;
4549 
4550   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4551 
4552   GST_LOG_OBJECT (stream->pad,
4553       "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4554       GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4555 
4556   stream->download_error_count = 0;
4557   g_clear_error (&stream->last_error);
4558 
4559   /* FIXME - url has no indication of byte ranges for subsegments */
4560   /* FIXME : All those time statistics are biased, since they are calculated
4561    * *AFTER* the queue2, which might be blocking. They should ideally be
4562    * calculated *before* queue2 in the uri_handler_probe */
4563   gst_element_post_message (GST_ELEMENT_CAST (demux),
4564       gst_message_new_element (GST_OBJECT_CAST (demux),
4565           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4566               "manifest-uri", G_TYPE_STRING,
4567               demux->manifest_uri, "uri", G_TYPE_STRING,
4568               stream->fragment.uri, "fragment-start-time",
4569               GST_TYPE_CLOCK_TIME, stream->download_start_time,
4570               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4571               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4572               stream->download_total_bytes, "fragment-download-time",
4573               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4574 
4575   /* Don't update to the end of the segment if in reverse playback */
4576   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4577   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4578     GstClockTime offset =
4579         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4580     GstClockTime period_start =
4581         gst_adaptive_demux_get_period_start_time (demux);
4582 
4583     stream->segment.position += duration;
4584 
4585     /* Convert from position inside the stream's segment to the demuxer's
4586      * segment, they are not necessarily the same */
4587     if (stream->segment.position - offset + period_start >
4588         demux->segment.position)
4589       demux->segment.position =
4590           stream->segment.position - offset + period_start;
4591   }
4592   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4593 
4594   /* When advancing with a non 1.0 rate on live streams, we need to check
4595    * the live seeking range again to make sure we can still advance to
4596    * that position */
4597   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4598     if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4599       ret = GST_FLOW_EOS;
4600     else
4601       ret = klass->stream_advance_fragment (stream);
4602   } else if (gst_adaptive_demux_is_live (demux)
4603       || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4604     ret = klass->stream_advance_fragment (stream);
4605   } else {
4606     ret = GST_FLOW_EOS;
4607   }
4608 
4609   stream->download_start_time =
4610       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4611 
4612   if (ret == GST_FLOW_OK) {
4613     if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4614             gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4615       stream->need_header = TRUE;
4616       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4617     }
4618 
4619     /* the subclass might want to switch pads */
4620     if (G_UNLIKELY (demux->next_streams)) {
4621       GList *iter;
4622       gboolean can_expose = TRUE;
4623 
4624       gst_task_stop (stream->download_task);
4625 
4626       ret = GST_FLOW_EOS;
4627 
4628       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4629         /* Only expose if all streams are now cancelled or finished downloading */
4630         GstAdaptiveDemuxStream *other = iter->data;
4631         if (other != stream) {
4632           g_mutex_lock (&other->fragment_download_lock);
4633           can_expose &= (other->cancelled == TRUE
4634               || other->download_finished == TRUE);
4635           g_mutex_unlock (&other->fragment_download_lock);
4636         }
4637       }
4638 
4639       if (can_expose) {
4640         GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4641             "to do bitrate switching");
4642         gst_adaptive_demux_prepare_streams (demux, FALSE);
4643         gst_adaptive_demux_start_tasks (demux, TRUE);
4644       } else {
4645         GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4646       }
4647     }
4648   }
4649 
4650   return ret;
4651 }
4652 
4653 /* must be called with manifest_lock taken */
4654 static gboolean
gst_adaptive_demux_stream_select_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 bitrate)4655 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4656     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4657 {
4658   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4659 
4660   if (klass->stream_select_bitrate)
4661     return klass->stream_select_bitrate (stream, bitrate);
4662   return FALSE;
4663 }
4664 
4665 /* must be called with manifest_lock taken */
4666 static GstFlowReturn
gst_adaptive_demux_stream_update_fragment_info(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4667 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4668     GstAdaptiveDemuxStream * stream)
4669 {
4670   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4671   GstFlowReturn ret;
4672 
4673   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4674       GST_FLOW_ERROR);
4675 
4676   /* Make sure the sub-class will update bitrate, or else
4677    * we will later */
4678   stream->fragment.bitrate = 0;
4679   stream->fragment.finished = FALSE;
4680 
4681   GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4682       GST_TIME_ARGS (stream->segment.position));
4683 
4684   ret = klass->stream_update_fragment_info (stream);
4685 
4686   GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4687       stream->fragment.uri);
4688   if (ret == GST_FLOW_OK) {
4689     GST_LOG_OBJECT (stream->pad,
4690         "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4691         GST_TIME_ARGS (stream->fragment.timestamp),
4692         GST_TIME_ARGS (stream->fragment.duration));
4693     GST_LOG_OBJECT (stream->pad,
4694         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4695         stream->fragment.range_start, stream->fragment.range_end);
4696   }
4697 
4698   return ret;
4699 }
4700 
4701 /* must be called with manifest_lock taken */
4702 static gint64
gst_adaptive_demux_stream_get_fragment_waiting_time(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4703 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4704     demux, GstAdaptiveDemuxStream * stream)
4705 {
4706   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4707 
4708   if (klass->stream_get_fragment_waiting_time)
4709     return klass->stream_get_fragment_waiting_time (stream);
4710   return 0;
4711 }
4712 
4713 /* must be called with manifest_lock taken */
4714 static GstFlowReturn
gst_adaptive_demux_update_manifest_default(GstAdaptiveDemux * demux)4715 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4716 {
4717   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4718   GstFragment *download;
4719   GstBuffer *buffer;
4720   GstFlowReturn ret;
4721   GError *error = NULL;
4722 
4723   download = gst_uri_downloader_fetch_uri (demux->downloader,
4724       demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4725   if (download) {
4726     g_free (demux->manifest_uri);
4727     g_free (demux->manifest_base_uri);
4728     if (download->redirect_permanent && download->redirect_uri) {
4729       demux->manifest_uri = g_strdup (download->redirect_uri);
4730       demux->manifest_base_uri = NULL;
4731     } else {
4732       demux->manifest_uri = g_strdup (download->uri);
4733       demux->manifest_base_uri = g_strdup (download->redirect_uri);
4734     }
4735 
4736     buffer = gst_fragment_get_buffer (download);
4737     g_object_unref (download);
4738     ret = klass->update_manifest_data (demux, buffer);
4739     gst_buffer_unref (buffer);
4740     /* FIXME: Should the manifest uri vars be reverted to original
4741      * values if updating fails? */
4742   } else {
4743     GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4744         error->message);
4745     ret = GST_FLOW_NOT_LINKED;
4746   }
4747   g_clear_error (&error);
4748 
4749   return ret;
4750 }
4751 
4752 /* must be called with manifest_lock taken */
4753 static GstFlowReturn
gst_adaptive_demux_update_manifest(GstAdaptiveDemux * demux)4754 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4755 {
4756   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4757   GstFlowReturn ret;
4758 
4759   ret = klass->update_manifest (demux);
4760 
4761   if (ret == GST_FLOW_OK) {
4762     GstClockTime duration;
4763     /* Send an updated duration message */
4764     duration = klass->get_duration (demux);
4765     if (duration != GST_CLOCK_TIME_NONE) {
4766       GST_DEBUG_OBJECT (demux,
4767           "Sending duration message : %" GST_TIME_FORMAT,
4768           GST_TIME_ARGS (duration));
4769       gst_element_post_message (GST_ELEMENT (demux),
4770           gst_message_new_duration_changed (GST_OBJECT (demux)));
4771     } else {
4772       GST_DEBUG_OBJECT (demux,
4773           "Duration unknown, can not send the duration message");
4774     }
4775 
4776     /* If a manifest changes it's liveness or periodic updateness, we need
4777      * to start/stop the manifest update task appropriately */
4778     /* Keep this condition in sync with the one in
4779      * gst_adaptive_demux_start_manifest_update_task()
4780      */
4781     if (gst_adaptive_demux_is_live (demux) &&
4782         klass->requires_periodical_playlist_update (demux)) {
4783       gst_adaptive_demux_start_manifest_update_task (demux);
4784     } else {
4785       gst_adaptive_demux_stop_manifest_update_task (demux);
4786     }
4787   }
4788 
4789   return ret;
4790 }
4791 
4792 void
gst_adaptive_demux_stream_fragment_clear(GstAdaptiveDemuxStreamFragment * f)4793 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4794 {
4795   g_free (f->uri);
4796   f->uri = NULL;
4797   f->range_start = 0;
4798   f->range_end = -1;
4799 
4800   g_free (f->header_uri);
4801   f->header_uri = NULL;
4802   f->header_range_start = 0;
4803   f->header_range_end = -1;
4804 
4805   g_free (f->index_uri);
4806   f->index_uri = NULL;
4807   f->index_range_start = 0;
4808   f->index_range_end = -1;
4809 
4810   f->finished = FALSE;
4811 }
4812 
4813 /* must be called with manifest_lock taken */
4814 static gboolean
gst_adaptive_demux_has_next_period(GstAdaptiveDemux * demux)4815 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4816 {
4817   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4818   gboolean ret = FALSE;
4819 
4820   if (klass->has_next_period)
4821     ret = klass->has_next_period (demux);
4822   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4823   return ret;
4824 }
4825 
4826 /* must be called with manifest_lock taken */
4827 static void
gst_adaptive_demux_advance_period(GstAdaptiveDemux * demux)4828 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4829 {
4830   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4831 
4832   g_return_if_fail (klass->advance_period != NULL);
4833 
4834   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4835   klass->advance_period (demux);
4836   gst_adaptive_demux_prepare_streams (demux, FALSE);
4837   gst_adaptive_demux_start_tasks (demux, TRUE);
4838 }
4839 
4840 /**
4841  * gst_adaptive_demux_get_monotonic_time:
4842  * Returns: a monotonically increasing time, using the system realtime clock
4843  */
4844 GstClockTime
gst_adaptive_demux_get_monotonic_time(GstAdaptiveDemux * demux)4845 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4846 {
4847   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4848   return gst_clock_get_time (demux->realtime_clock);
4849 }
4850 
4851 /**
4852  * gst_adaptive_demux_get_client_now_utc:
4853  * @demux: #GstAdaptiveDemux
4854  * Returns: the client's estimate of UTC
4855  *
4856  * Used to find the client's estimate of UTC, using the system realtime clock.
4857  */
4858 GDateTime *
gst_adaptive_demux_get_client_now_utc(GstAdaptiveDemux * demux)4859 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4860 {
4861   GstClockTime rtc_now;
4862   GDateTime *unix_datetime;
4863   GDateTime *result_datetime;
4864   gint64 utc_now_in_us;
4865 
4866   rtc_now = gst_clock_get_time (demux->realtime_clock);
4867   utc_now_in_us = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4868   unix_datetime =
4869       g_date_time_new_from_unix_utc (utc_now_in_us / G_TIME_SPAN_SECOND);
4870   result_datetime =
4871       g_date_time_add (unix_datetime, utc_now_in_us % G_TIME_SPAN_SECOND);
4872   g_date_time_unref (unix_datetime);
4873   return result_datetime;
4874 }
4875 
4876 /**
4877  * gst_adaptive_demux_is_running
4878  * @demux: #GstAdaptiveDemux
4879  * Returns: whether the demuxer is processing data
4880  *
4881  * Returns FALSE if shutdown has started (transitioning down from
4882  * PAUSED), otherwise TRUE.
4883  */
4884 gboolean
gst_adaptive_demux_is_running(GstAdaptiveDemux * demux)4885 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4886 {
4887   return g_atomic_int_get (&demux->running);
4888 }
4889 
4890 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_new(GCond * cond,GMutex * mutex)4891 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4892 {
4893   GstAdaptiveDemuxTimer *timer;
4894 
4895   timer = g_slice_new (GstAdaptiveDemuxTimer);
4896   timer->fired = FALSE;
4897   timer->cond = cond;
4898   timer->mutex = mutex;
4899   g_atomic_int_set (&timer->ref_count, 1);
4900   return timer;
4901 }
4902 
4903 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_ref(GstAdaptiveDemuxTimer * timer)4904 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4905 {
4906   g_return_val_if_fail (timer != NULL, NULL);
4907   g_atomic_int_inc (&timer->ref_count);
4908   return timer;
4909 }
4910 
4911 static void
gst_adaptive_demux_timer_unref(GstAdaptiveDemuxTimer * timer)4912 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4913 {
4914   g_return_if_fail (timer != NULL);
4915   if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4916     g_slice_free (GstAdaptiveDemuxTimer, timer);
4917   }
4918 }
4919 
4920 /* gst_adaptive_demux_wait_until:
4921  * A replacement for g_cond_wait_until that uses the clock rather
4922  * than system time to control the duration of the sleep. Typically
4923  * clock is actually a #GstSystemClock, in which case this function
4924  * behaves exactly like g_cond_wait_until. Inside unit tests,
4925  * the clock is typically a #GstTestClock, which allows tests to run
4926  * in non-realtime.
4927  * This function must be called with mutex held.
4928  */
4929 static gboolean
gst_adaptive_demux_wait_until(GstClock * clock,GCond * cond,GMutex * mutex,GstClockTime end_time)4930 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4931     GstClockTime end_time)
4932 {
4933   GstAdaptiveDemuxTimer *timer;
4934   gboolean fired;
4935   GstClockReturn res;
4936 
4937   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4938     /* for an invalid time, gst_clock_id_wait_async will try to call
4939      * gst_adaptive_demux_clock_callback from the current thread.
4940      * It still holds the mutex while doing that, so it will deadlock.
4941      * g_cond_wait_until would return immediately with false, so we'll do the same.
4942      */
4943     return FALSE;
4944   }
4945   timer = gst_adaptive_demux_timer_new (cond, mutex);
4946   timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4947   res =
4948       gst_clock_id_wait_async (timer->clock_id,
4949       gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4950       (GDestroyNotify) gst_adaptive_demux_timer_unref);
4951   /* clock does not support asynchronously wait. Assert and return */
4952   if (res == GST_CLOCK_UNSUPPORTED) {
4953     gst_clock_id_unref (timer->clock_id);
4954     gst_adaptive_demux_timer_unref (timer);
4955     g_return_val_if_reached (TRUE);
4956   }
4957   g_assert (!timer->fired);
4958   /* the gst_adaptive_demux_clock_callback() will signal the
4959    * cond when the clock's single shot timer fires, or the cond will be
4960    * signalled by another thread that wants to cause this wait to finish
4961    * early (e.g. to terminate the waiting thread).
4962    * There is no need for a while loop here, because that logic is
4963    * implemented by the function calling gst_adaptive_demux_wait_until() */
4964   g_cond_wait (cond, mutex);
4965   fired = timer->fired;
4966   if (!fired)
4967     gst_clock_id_unschedule (timer->clock_id);
4968   gst_clock_id_unref (timer->clock_id);
4969   gst_adaptive_demux_timer_unref (timer);
4970   return !fired;
4971 }
4972 
4973 static gboolean
gst_adaptive_demux_clock_callback(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)4974 gst_adaptive_demux_clock_callback (GstClock * clock,
4975     GstClockTime time, GstClockID id, gpointer user_data)
4976 {
4977   GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4978   g_return_val_if_fail (timer != NULL, FALSE);
4979   g_mutex_lock (timer->mutex);
4980   timer->fired = TRUE;
4981   g_cond_signal (timer->cond);
4982   g_mutex_unlock (timer->mutex);
4983   return TRUE;
4984 }
4985 
4986 /**
4987  * gst_adaptive_demux_get_qos_earliest_time:
4988  *
4989  * Returns: The QOS earliest time
4990  *
4991  * Since: 1.20
4992  */
4993 GstClockTime
gst_adaptive_demux_get_qos_earliest_time(GstAdaptiveDemux * demux)4994 gst_adaptive_demux_get_qos_earliest_time (GstAdaptiveDemux * demux)
4995 {
4996   GstClockTime earliest;
4997 
4998   GST_OBJECT_LOCK (demux);
4999   earliest = demux->priv->qos_earliest_time;
5000   GST_OBJECT_UNLOCK (demux);
5001 
5002   return earliest;
5003 }
5004