• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 /**
23  * SECTION:gstadaptivedemux
24  * @short_description: Base class for adaptive demuxers
25  *
26  * What is an adaptive demuxer?
27  * Adaptive demuxers are special demuxers in the sense that they don't
28  * actually demux data received from upstream but download the data
29  * themselves.
30  *
31  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32  * a set of fragments. The manifest describes the available media and
33  * the sequence of fragments to use. Each fragment contains a small
34  * part of the media (typically only a few seconds). It is possible for
35  * the manifest to have the same media available in different configurations
36  * (bitrates for example) so that the client can select the one that
37  * best suits its scenario (network fluctuation, hardware requirements...).
38  * It is possible to switch from one representation of the media to another
39  * during playback. That's why it is called 'adaptive', because it can be
40  * adapted to the client's needs.
41  *
42  * Architectural overview:
43  * The manifest is received by the demuxer in its sink pad and, upon receiving
44  * EOS, it parses the manifest and exposes the streams available in it. For
45  * each stream a source element will be created and will download the list
46  * of fragments one by one. Once a fragment is finished downloading, the next
47  * URI is set to the source element and it starts fetching it and pushing
48  * through the stream's pad. This implies that each stream is independent from
49  * each other as it runs on a separate thread.
50  *
51  * After downloading each fragment, the download rate of it is calculated and
52  * the demuxer has a chance to switch to a different bitrate if needed. The
53  * switch can be done by simply pushing a new caps before the next fragment
54  * when codecs are the same, or by exposing a new pad group if it needs
55  * a codec change.
56  *
57  * Extra features:
58  * - Not linked streams: Streams that are not-linked have their download threads
59  *                       interrupted to save network bandwidth. When they are
60  *                       relinked a reconfigure event is received and the
61  *                       stream is restarted.
62  *
63  * Subclasses:
64  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65  * about the intrinsics of the subclass formats, so the subclasses are
66  * responsible for maintaining the manifest data structures and stream
67  * information.
68  */
69 
70 /*
71 MT safety.
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
74 writes to it, then all accesses need to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
77 
78 Adaptive demux API can be called from several threads. Moreover, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
85 
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
92 For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
103 
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
115 */
116 
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120 
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
124 
125 GST_DEBUG_CATEGORY (adaptivedemux_debug);
126 #define GST_CAT_DEFAULT adaptivedemux_debug
127 
/* Default values and limits used by the base class. */
#define MAX_DOWNLOAD_ERROR_COUNT 3
#define DEFAULT_FAILED_COUNT 3
#define DEFAULT_CONNECTION_SPEED 0
#define DEFAULT_BITRATE_LIMIT 0.8f
/* For safety. Large enough to hold a segment.
 * The expansion is parenthesized so that it behaves as a single operand
 * when used inside a larger expression (e.g. "x / SRC_QUEUE_MAX_BYTES"). */
#define SRC_QUEUE_MAX_BYTES (20 * 1024 * 1024)
#define NUM_LOOKBACK_FRAGMENTS 3

#ifdef OHOS_EXT_FUNC
// ohos.ext.func.0033
#define DEFAULT_RECONNECTION_TIMEOUT 3000000 // 3s
#endif
139 
/* manifest_lock: recursive mutex stored in the private data. The lock and
 * unlock helpers trace the calling thread around the operation. */
#define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
#define GST_MANIFEST_LOCK(d) G_STMT_START { \
    GST_TRACE("Locking from thread %p", g_thread_self()); \
    g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
    GST_TRACE("Locked from thread %p", g_thread_self()); \
 } G_STMT_END

#define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
    GST_TRACE("Unlocking from thread %p", g_thread_self()); \
    g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
 } G_STMT_END

/* api_lock: plain mutex stored in the private data.
 * These macros expand to a single expression WITHOUT a trailing semicolon,
 * so that "if (x) GST_API_LOCK (d); else ..." parses as intended; the caller
 * supplies the terminating semicolon. */
#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d))
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d))

/* segment_lock: plain mutex stored in the private data. */
#define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock))
#define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
#define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
159 
/* GObject property identifiers of the element. */
enum
{
  PROP_0,
  PROP_CONNECTION_SPEED,
  PROP_BITRATE_LIMIT,
#ifdef OHOS_EXT_FUNC
  /* ohos.ext.func.0013 */
  PROP_STATE_CHANGE,
  PROP_EXIT_BLOCK,
  /* ohos.ext.func.0033 */
  PROP_RECONNECTION_TIMEOUT,
  /* ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate */
  PROP_SLICE_POSITION,
  PROP_DOWNLOAD_LOOP_START,
#endif
  PROP_LAST
};
181 
#ifdef OHOS_EXT_FUNC
/* Indices into g_gst_adaptive_demux_signals. */
enum
{
  /* ohos.ext.func.0028 */
  SIGNAL_BITRATE_PARSE_COMPLETE,
  /* ohos.ext.func.0034 */
  SIGNAL_IS_LIVE_SCENE,
  LAST_SIGNALS
};

/* Signal ids, one slot per entry of the enum above. */
static guint g_gst_adaptive_demux_signals[LAST_SIGNALS] = { 0 };
#endif

/* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
#define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
196 
197 struct _GstAdaptiveDemuxPrivate
198 {
199   GstAdapter *input_adapter;    /* protected by manifest_lock */
200   gint have_manifest;           /* MT safe */
201 
202   GList *old_streams;           /* protected by manifest_lock */
203 
204   GstTask *updates_task;        /* MT safe */
205   GRecMutex updates_lock;
206   GMutex updates_timed_lock;
207   GCond updates_timed_cond;     /* protected by updates_timed_lock */
208   gboolean stop_updates_task;   /* protected by updates_timed_lock */
209 
210   /* used only from updates_task, no need to protect it */
211   gint update_failed_count;
212 
213   guint32 segment_seqnum;       /* protected by manifest_lock */
214 
215   /* main lock used to protect adaptive demux and all its streams.
216    * It serializes the adaptive demux public API.
217    */
218   GRecMutex manifest_lock;
219 
220   /* condition to wait for manifest updates on a live stream.
221    * In order to signal the manifest_cond, the caller needs to hold both
222    * manifest_lock and manifest_update_lock (taken in this order)
223    */
224   GCond manifest_cond;
225   GMutex manifest_update_lock;
226 
227   /* Lock and condition for prerolling streams before exposing */
228   GMutex preroll_lock;
229   GCond preroll_cond;
230   gint preroll_pending;
231 
232   GMutex api_lock;
233 
234   /* Protects demux and stream segment information
235    * Needed because seeks can update segment information
236    * without needing to stop tasks when they just want to
237    * update the segment boundaries */
238   GMutex segment_lock;
239 
240   GstClockTime qos_earliest_time;
241 #ifdef OHOS_EXT_FUNC
242 // ohos.ext.func.0036
243   gboolean set_auto_bitrate_first;
244 #endif
245 };
246 
247 typedef struct _GstAdaptiveDemuxTimer
248 {
249   gint ref_count;
250   GCond *cond;
251   GMutex *mutex;
252   GstClockID clock_id;
253   gboolean fired;
254 } GstAdaptiveDemuxTimer;
255 
256 static GstBinClass *parent_class = NULL;
257 static gint private_offset = 0;
258 
259 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
260 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
261     GstAdaptiveDemuxClass * klass);
262 static void gst_adaptive_demux_finalize (GObject * object);
263 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
264     element, GstStateChange transition);
265 
266 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
267 
268 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
269     GstEvent * event);
270 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
271     GstObject * parent, GstBuffer * buffer);
272 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
273     GstQuery * query);
274 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
275     GstEvent * event);
276 
277 static gboolean
278 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
279 
280 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
281 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
282     stream);
283 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
284 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
285     gboolean first_and_live);
286 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
287 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
288 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
289     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
290     GstClockTime ts, GstClockTime * final_ts);
291 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
292     demux, GstAdaptiveDemuxStream * stream);
293 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
294     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
295 static GstFlowReturn
296 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
297     GstAdaptiveDemuxStream * stream);
298 static gint64
299 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
300     GstAdaptiveDemuxStream * stream);
301 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
302     demux);
303 static GstFlowReturn
304 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
305 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
306 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
307 
308 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
309 static GstFlowReturn
310 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
311     GstEvent * event);
312 
313 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
314     demux);
315 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
316     demux);
317 
318 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
319     gboolean start_preroll_streams);
320 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
321     gboolean stop_updates);
322 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
323     demux);
324 static void
325 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
326     stream, GstFlowReturn ret, GError * err);
327 static GstFlowReturn
328 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
329     GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
330 static GstFlowReturn
331 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
332     GstAdaptiveDemuxStream * stream);
333 static GstFlowReturn
334 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
335     GstAdaptiveDemuxStream * stream, GstClockTime duration);
336 static gboolean
337 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
338     GstClockTime end_time);
339 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
340     GstClockTime time, GstClockID id, gpointer user_data);
341 static gboolean
342 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
343     * demux);
344 
345 #ifdef OHOS_EXT_FUNC
346 // ohos.ext.func.0013
347 static void set_property_to_element (GstObject *elem, guint property_id, const void *property_value);
348 static void set_property_to_src_element (const GList *stream_list, guint property_id, const void *value);
349 static void set_property_to_src_and_download (GstAdaptiveDemux *demux, guint property_id, const void *property_value);
350 #endif
351 
352 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
353  * method to get to the padtemplates */
354 GType
gst_adaptive_demux_get_type(void)355 gst_adaptive_demux_get_type (void)
356 {
357   static gsize type = 0;
358 
359   if (g_once_init_enter (&type)) {
360     GType _type;
361     static const GTypeInfo info = {
362       sizeof (GstAdaptiveDemuxClass),
363       NULL,
364       NULL,
365       (GClassInitFunc) gst_adaptive_demux_class_init,
366       NULL,
367       NULL,
368       sizeof (GstAdaptiveDemux),
369       0,
370       (GInstanceInitFunc) gst_adaptive_demux_init,
371     };
372 
373     _type = g_type_register_static (GST_TYPE_BIN,
374         "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
375 
376     private_offset =
377         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
378 
379     g_once_init_leave (&type, _type);
380   }
381   return type;
382 }
383 
384 static inline GstAdaptiveDemuxPrivate *
gst_adaptive_demux_get_instance_private(GstAdaptiveDemux * self)385 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
386 {
387   return (G_STRUCT_MEMBER_P (self, private_offset));
388 }
389 
#ifdef OHOS_OPT_COMPAT
// ohos.ext.func.0029
/* Handles a write to the "exit-block" property.
 *
 * Only the value 1 triggers any action. In that case the manifest updates
 * task and every stream's download task are asked to stop, their waiters
 * are woken up, the URI downloader is cancelled, and the value is forwarded
 * through set_property_to_src_and_download().
 *
 * This function takes neither the API lock nor the manifest lock itself;
 * gst_adaptive_demux_set_property() deliberately skips them for this
 * property.
 *
 * NOTE(review): updates_task is set to NULL without dropping the reference,
 * so the task object appears to be leaked and any later user of
 * priv->updates_task sees NULL - confirm this is intended. */
static void
gst_adaptive_demux_set_exit_block (GstAdaptiveDemux *demux, const GValue *value)
{
  GstAdaptiveDemuxPrivate *priv;
  GList *walk;
  gint exit_block = g_value_get_int (value);

  GST_INFO ("adaptive exit_block :%d", exit_block);

  priv = demux->priv;
  if (priv == NULL) {
    GST_WARNING_OBJECT (demux, "adaptive exit_block :%d, demux->priv is null", exit_block);
    return;
  }

  /* any value other than 1 is a no-op */
  if (exit_block != 1)
    return;

  /* stop update task */
  if (priv->updates_task != NULL) {
    gst_task_stop (priv->updates_task);
    priv->updates_task = NULL;
  }

  /* wake up a possibly sleeping updates loop and tell it to stop */
  g_mutex_lock (&priv->updates_timed_lock);
  priv->stop_updates_task = TRUE;
  g_cond_signal (&priv->updates_timed_cond);
  g_mutex_unlock (&priv->updates_timed_lock);

  if (demux->downloader != NULL)
    gst_uri_downloader_cancel (demux->downloader);

  /* stop download tasks */
  for (walk = demux->streams; walk != NULL; walk = g_list_next (walk)) {
    GstAdaptiveDemuxStream *stream = walk->data;

    if (stream != NULL) {
      if (stream->download_task != NULL)
        gst_task_stop (stream->download_task);

      /* mark the stream cancelled and wake up whoever waits on it */
      g_mutex_lock (&stream->fragment_download_lock);
      stream->cancelled = TRUE;
      g_cond_signal (&stream->fragment_download_cond);
      g_mutex_unlock (&stream->fragment_download_lock);
    } else {
      GST_WARNING_OBJECT (demux, "stream is null");
    }
  }

  /* set exit_block to src element */
  set_property_to_src_and_download (demux, PROP_EXIT_BLOCK, (void *) &exit_block);
}
#endif
437 
438 static void
gst_adaptive_demux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)439 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
440     const GValue * value, GParamSpec * pspec)
441 {
442   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
443 
444 #ifdef OHOS_OPT_COMPAT
445   // ohos.opt.compat.0029
446   if (prop_id != PROP_EXIT_BLOCK) {
447 #endif
448     GST_API_LOCK (demux);
449     GST_MANIFEST_LOCK (demux);
450 #ifdef OHOS_OPT_COMPAT
451   // ohos.opt.compat.0029
452   }
453 #endif
454 
455   switch (prop_id) {
456     case PROP_CONNECTION_SPEED:
457 #ifdef OHOS_EXT_FUNC
458       // ohos.ext.func.0028
459       demux->connection_speed = g_value_get_uint (value);
460 #else
461       demux->connection_speed = g_value_get_uint (value) * 1000;
462 #endif
463       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
464           demux->connection_speed);
465       break;
466     case PROP_BITRATE_LIMIT:
467       demux->bitrate_limit = g_value_get_float (value);
468       break;
469 #ifdef OHOS_EXT_FUNC
470     // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
471     case PROP_SLICE_POSITION:
472       demux->slice_position = g_value_get_uint64 (value);
473       GST_WARNING_OBJECT (demux, "slice_position set to %" G_GUINT64_FORMAT, demux->slice_position);
474       break;
475 #endif
476 #ifdef OHOS_EXT_FUNC
477     // ohos.ext.func.0013
478     case PROP_STATE_CHANGE: {
479       gint state = g_value_get_int (value);
480       set_property_to_src_and_download(demux, prop_id, (void *)&state);
481       break;
482     }
483     case PROP_EXIT_BLOCK: {
484       // ohos.ext.func.0029
485       gst_adaptive_demux_set_exit_block(demux, value);
486       break;
487     }
488 #endif
489 #ifdef OHOS_EXT_FUNC
490     // ohos.ext.func.0033
491     case PROP_RECONNECTION_TIMEOUT: {
492       guint timeout = g_value_get_uint (value);
493       set_property_to_src_and_download(demux, prop_id, (void *)&timeout);
494       break;
495     }
496 #endif
497     default:
498       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
499       break;
500   }
501 
502 #ifdef OHOS_OPT_COMPAT
503   // ohos.opt.compat.0029
504   if (prop_id != PROP_EXIT_BLOCK) {
505 #endif
506     GST_MANIFEST_UNLOCK (demux);
507     GST_API_UNLOCK (demux);
508 #ifdef OHOS_OPT_COMPAT
509   // ohos.opt.compat.0029
510   }
511 #endif
512 }
513 
514 static void
gst_adaptive_demux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)515 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
516     GValue * value, GParamSpec * pspec)
517 {
518   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
519 
520   GST_MANIFEST_LOCK (demux);
521 
522 #ifdef OHOS_EXT_FUNC
523     // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
524   gboolean start = FALSE;
525 #endif
526   switch (prop_id) {
527     case PROP_CONNECTION_SPEED:
528       g_value_set_uint (value, demux->connection_speed / 1000);
529       break;
530     case PROP_BITRATE_LIMIT:
531       g_value_set_float (value, demux->bitrate_limit);
532       break;
533 #ifdef OHOS_EXT_FUNC
534     // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
535     case PROP_DOWNLOAD_LOOP_START:
536       for (GList *iter = demux->streams; iter; iter = g_list_next (iter)) {
537         GstAdaptiveDemuxStream *stream = iter->data;
538         if (GST_TASK_STATE (stream->download_task) == GST_TASK_STARTED) {
539           start = TRUE;
540           break;
541         }
542       }
543       GST_DEBUG_OBJECT (demux, "download loop is %d", start);
544       g_value_set_boolean(value, start);
545       break;
546 #endif
547     default:
548       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
549       break;
550   }
551 
552   GST_MANIFEST_UNLOCK (demux);
553 }
554 
555 static void
gst_adaptive_demux_class_init(GstAdaptiveDemuxClass * klass)556 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
557 {
558   GObjectClass *gobject_class;
559   GstElementClass *gstelement_class;
560   GstBinClass *gstbin_class;
561 
562   gobject_class = G_OBJECT_CLASS (klass);
563   gstelement_class = GST_ELEMENT_CLASS (klass);
564   gstbin_class = GST_BIN_CLASS (klass);
565 
566   GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
567       "Base Adaptive Demux");
568 
569   parent_class = g_type_class_peek_parent (klass);
570 
571   if (private_offset != 0)
572     g_type_class_adjust_private_offset (klass, &private_offset);
573 
574   gobject_class->set_property = gst_adaptive_demux_set_property;
575   gobject_class->get_property = gst_adaptive_demux_get_property;
576   gobject_class->finalize = gst_adaptive_demux_finalize;
577 
578 #ifdef OHOS_EXT_FUNC
579   // ohos.ext.func.0028
580   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
581       g_param_spec_uint ("connection-speed", "Connection Speed",
582           "Network connection speed in kbps (0 = calculate from downloaded"
583           " fragments)", 0, G_MAXUINT, DEFAULT_CONNECTION_SPEED,
584           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
585 #else
586   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
587       g_param_spec_uint ("connection-speed", "Connection Speed",
588           "Network connection speed in kbps (0 = calculate from downloaded"
589           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
590           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
591 #endif
592 
593 #ifdef OHOS_EXT_FUNC
594   // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
595   g_object_class_install_property (gobject_class, PROP_SLICE_POSITION,
596       g_param_spec_uint64 ("slice-position", "slice-position",
597           "slice-position", 0, G_MAXUINT64, 0,
598           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
599 
600   g_object_class_install_property (gobject_class, PROP_DOWNLOAD_LOOP_START,
601       g_param_spec_boolean ("download-loop-start", "download-loop-start",
602           "download-loop-start", FALSE,
603           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
604 #endif
605 
606   /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
607   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
608       g_param_spec_float ("bitrate-limit",
609           "Bitrate limit in %",
610           "Limit of the available bitrate to use when switching to alternates.",
611           0, 1, DEFAULT_BITRATE_LIMIT,
612           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
613 
614 #ifdef OHOS_EXT_FUNC
615   // ohos.ext.func.0013
616   g_object_class_install_property (gobject_class, PROP_STATE_CHANGE,
617       g_param_spec_int ("state-change", "state-change from adaptive-demux",
618           "state-change from adaptive-demux", 0, (gint) (G_MAXINT32), 0,
619           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
620 
621   g_object_class_install_property (gobject_class, PROP_EXIT_BLOCK,
622       g_param_spec_int ("exit-block", "EXIT BLOCK",
623           "souphttpsrc exit block", 0, (gint) (G_MAXINT32), 0,
624           G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
625 #endif
626 
627 #ifdef OHOS_EXT_FUNC
628   // ohos.ext.func.0033
629   g_object_class_install_property (gobject_class, PROP_RECONNECTION_TIMEOUT,
630       g_param_spec_uint ("reconnection-timeout", "Reconnection-timeout",
631           "Value in seconds to timeout reconnection", 0, 3600000000, DEFAULT_RECONNECTION_TIMEOUT,
632           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
633 #endif
634 
635 #ifdef OHOS_EXT_FUNC
636   // ohos.ext.func.0028
637   g_gst_adaptive_demux_signals[SIGNAL_BITRATE_PARSE_COMPLETE] =
638         g_signal_new("bitrate-parse-complete",
639             G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
640             0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_POINTER, G_TYPE_UINT);
641 #endif
642 
643 #ifdef OHOS_EXT_FUNC
644   // ohos.ext.func.0034
645   g_gst_adaptive_demux_signals[SIGNAL_IS_LIVE_SCENE] =
646         g_signal_new("is-live-scene", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST,
647             0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
648 #endif
649 
650   gstelement_class->change_state = gst_adaptive_demux_change_state;
651 
652   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
653 
654   klass->data_received = gst_adaptive_demux_stream_data_received_default;
655   klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
656   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
657   klass->requires_periodical_playlist_update =
658       gst_adaptive_demux_requires_periodical_playlist_update_default;
659 
660 }
661 
662 static void
gst_adaptive_demux_init(GstAdaptiveDemux * demux,GstAdaptiveDemuxClass * klass)663 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
664     GstAdaptiveDemuxClass * klass)
665 {
666   GstPadTemplate *pad_template;
667   GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
668   GObjectClass *gobject_class;
669 
670   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
671 
672   demux->priv = gst_adaptive_demux_get_instance_private (demux);
673   demux->priv->input_adapter = gst_adapter_new ();
674   demux->downloader = gst_uri_downloader_new ();
675   gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
676   demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
677   demux->priv->segment_seqnum = gst_util_seqnum_next ();
678   demux->have_group_id = FALSE;
679   demux->group_id = G_MAXUINT;
680 
681   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
682 
683   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
684       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
685 
686   demux->realtime_clock = gst_system_clock_obtain ();
687   g_assert (demux->realtime_clock != NULL);
688   gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
689   if (g_object_class_find_property (gobject_class, "clock-type")) {
690     g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
691   } else {
692     GST_WARNING_OBJECT (demux,
693         "System clock does not have clock-type property");
694   }
695   if (clock_type == GST_CLOCK_TYPE_REALTIME) {
696     demux->clock_offset = 0;
697   } else {
698     GDateTime *utc_now;
699     GstClockTime rtc_now;
700 
701     utc_now = g_date_time_new_now_utc ();
702     rtc_now = gst_clock_get_time (demux->realtime_clock);
703     demux->clock_offset =
704         g_date_time_to_unix (utc_now) * G_TIME_SPAN_SECOND +
705         g_date_time_get_microsecond (utc_now) - GST_TIME_AS_USECONDS (rtc_now);
706     g_date_time_unref (utc_now);
707   }
708   g_rec_mutex_init (&demux->priv->updates_lock);
709   demux->priv->updates_task =
710       gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
711       demux, NULL);
712   gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
713 
714   g_mutex_init (&demux->priv->updates_timed_lock);
715   g_cond_init (&demux->priv->updates_timed_cond);
716 
717   g_cond_init (&demux->priv->manifest_cond);
718   g_mutex_init (&demux->priv->manifest_update_lock);
719 
720   g_rec_mutex_init (&demux->priv->manifest_lock);
721   g_mutex_init (&demux->priv->api_lock);
722   g_mutex_init (&demux->priv->segment_lock);
723 
724   g_cond_init (&demux->priv->preroll_cond);
725   g_mutex_init (&demux->priv->preroll_lock);
726 
727   pad_template =
728       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
729   g_return_if_fail (pad_template != NULL);
730 
731   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
732   gst_pad_set_event_function (demux->sinkpad,
733       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
734   gst_pad_set_chain_function (demux->sinkpad,
735       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
736 
737   /* Properties */
738   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
739   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
740 #ifdef OHOS_EXT_FUNC
741   // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
742   demux->slice_position = GST_CLOCK_TIME_NONE;
743 #endif
744 #ifdef OHOS_EXT_FUNC
745 // ohos.ext.func.0036
746   demux->priv->set_auto_bitrate_first = FALSE;
747 #endif
748 
749   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
750 }
751 
752 static void
gst_adaptive_demux_finalize(GObject * object)753 gst_adaptive_demux_finalize (GObject * object)
754 {
755   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
756   GstAdaptiveDemuxPrivate *priv = demux->priv;
757 
758   GST_DEBUG_OBJECT (object, "finalize");
759 
760   g_object_unref (priv->input_adapter);
761   g_object_unref (demux->downloader);
762 
763   g_mutex_clear (&priv->updates_timed_lock);
764   g_cond_clear (&priv->updates_timed_cond);
765   g_mutex_clear (&demux->priv->manifest_update_lock);
766   g_cond_clear (&demux->priv->manifest_cond);
767   g_object_unref (priv->updates_task);
768   g_rec_mutex_clear (&priv->updates_lock);
769   g_rec_mutex_clear (&demux->priv->manifest_lock);
770   g_mutex_clear (&demux->priv->api_lock);
771   g_mutex_clear (&demux->priv->segment_lock);
772   if (demux->realtime_clock) {
773     gst_object_unref (demux->realtime_clock);
774     demux->realtime_clock = NULL;
775   }
776 
777   g_cond_clear (&demux->priv->preroll_cond);
778   g_mutex_clear (&demux->priv->preroll_lock);
779 
780   G_OBJECT_CLASS (parent_class)->finalize (object);
781 }
782 
783 #ifdef OHOS_EXT_FUNC
784 // ohos.ext.func.0013
785 static void
set_property_to_element(GstObject * elem,guint property_id,const void * property_value)786 set_property_to_element (GstObject *elem, guint property_id, const void *property_value)
787 {
788   if ((elem == NULL) || (property_value == NULL)) {
789     return;
790   }
791 
792   if (property_id == PROP_STATE_CHANGE) {
793     const gint *state_change = (const gint *) property_value;
794     g_object_set (elem, "state-change", *state_change, NULL);
795   } else if (property_id == PROP_RECONNECTION_TIMEOUT) { // ohos.ext.func.0033
796     const guint *timeout = (const guint *) property_value;
797     g_object_set (elem, "reconnection-timeout", *timeout, NULL);
798   } else if (property_id == PROP_EXIT_BLOCK) {
799     const gint *exit_block = (const gint *) property_value;
800     g_object_set (elem, "exit-block", *exit_block, NULL);
801   }
802 }
803 
804 static void
set_property_to_src_element(const GList * stream_list,guint property_id,const void * property_value)805 set_property_to_src_element (const GList *stream_list, guint property_id, const void *property_value)
806 {
807   GstAdaptiveDemuxStream *stream = NULL;
808   GstIterator *iter = NULL;
809 
810   if (property_value == NULL) {
811     GST_WARNING ("value is NULL, set_property_to_src_element failed!");
812     return;
813   }
814 
815   for (; stream_list != NULL; stream_list = g_list_next (stream_list)) {
816     GValue data = { 0, };
817 
818     stream = stream_list->data;
819     if ((stream == NULL) || (stream->src == NULL)) {
820       continue;
821     }
822 
823     iter = gst_bin_iterate_sources ((GstBin *) stream->src);
824     if (iter == NULL) {
825       continue;
826     }
827     if (gst_iterator_next (iter, &data) == GST_ITERATOR_OK) {
828       GstElement *uri_src = g_value_get_object (&data);
829       if (uri_src != NULL) {
830         set_property_to_element ((GstObject *) uri_src, property_id, property_value);
831       }
832     }
833     if (G_IS_VALUE (&data)) {
834       g_value_unset (&data);
835     }
836     gst_iterator_free (iter);
837     iter = NULL;
838   }
839 }
840 
841 static void
set_property_to_src_and_download(GstAdaptiveDemux * demux,guint property_id,const void * property_value)842 set_property_to_src_and_download (GstAdaptiveDemux *demux, guint property_id, const void *property_value)
843 {
844   if ((demux == NULL) || (property_value == NULL)) {
845     GST_WARNING ("input parameter is error");
846     return;
847   }
848 
849   set_property_to_src_element (demux->streams, property_id, property_value);
850   //set_property_to_element ((GstObject *) demux->downloader, property_id, property_value);
851 }
852 #endif
853 
854 static GstStateChangeReturn
gst_adaptive_demux_change_state(GstElement * element,GstStateChange transition)855 gst_adaptive_demux_change_state (GstElement * element,
856     GstStateChange transition)
857 {
858   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
859   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
860 
861   switch (transition) {
862     case GST_STATE_CHANGE_PAUSED_TO_READY:
863       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
864         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
865       gst_uri_downloader_cancel (demux->downloader);
866 
867       GST_API_LOCK (demux);
868       GST_MANIFEST_LOCK (demux);
869       gst_adaptive_demux_reset (demux);
870       GST_MANIFEST_UNLOCK (demux);
871       GST_API_UNLOCK (demux);
872       break;
873     case GST_STATE_CHANGE_READY_TO_PAUSED:
874       GST_API_LOCK (demux);
875       GST_MANIFEST_LOCK (demux);
876       gst_adaptive_demux_reset (demux);
877       /* Clear "cancelled" flag in uridownloader since subclass might want to
878        * use uridownloader to fetch another manifest */
879       gst_uri_downloader_reset (demux->downloader);
880       if (g_atomic_int_get (&demux->priv->have_manifest))
881         gst_adaptive_demux_start_manifest_update_task (demux);
882       GST_MANIFEST_UNLOCK (demux);
883       GST_API_UNLOCK (demux);
884       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
885         GST_DEBUG_OBJECT (demux, "demuxer has started running");
886       break;
887     default:
888       break;
889   }
890 
891   /* this must be run without MANIFEST_LOCK taken.
892    * For PLAYING to PLAYING state changes, it will want to take a lock in
893    * src element and that lock is held while the streaming thread is running.
894    * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
895    */
896   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
897 
898   return result;
899 }
900 
901 #ifdef OHOS_EXT_FUNC
902 // ohos.ext.func.0028
903 static void
gst_adaptive_demux_update_bitrate(GstAdaptiveDemux * demux)904 gst_adaptive_demux_update_bitrate (GstAdaptiveDemux *demux)
905 {
906   GstAdaptiveDemuxBitrateInfo stream_bitrate_info;
907   stream_bitrate_info.bitrate_list = NULL;
908   stream_bitrate_info.bitrate_num = 0;
909   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
910   if ((demux_class->get_bitrate_info) == NULL) {
911     return;
912   }
913 
914   gboolean ret = demux_class->get_bitrate_info(demux, &stream_bitrate_info);
915   if (!ret) {
916     return;
917   }
918 
919   GST_INFO_OBJECT (demux, "Send to user, bitrate num = %u", stream_bitrate_info.bitrate_num);
920   g_signal_emit (GST_ELEMENT(demux), g_gst_adaptive_demux_signals[SIGNAL_BITRATE_PARSE_COMPLETE],
921     0, stream_bitrate_info.bitrate_list, stream_bitrate_info.bitrate_num);
922 
923   if (stream_bitrate_info.bitrate_list != NULL) {
924     g_free (stream_bitrate_info.bitrate_list);
925     stream_bitrate_info.bitrate_list = NULL;
926     stream_bitrate_info.bitrate_num = 0;
927   }
928 }
929 #endif
930 
931 static gboolean
gst_adaptive_demux_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)932 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
933     GstEvent * event)
934 {
935   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
936   gboolean ret;
937 
938   switch (event->type) {
939     case GST_EVENT_FLUSH_STOP:{
940       GST_API_LOCK (demux);
941       GST_MANIFEST_LOCK (demux);
942 
943       gst_adaptive_demux_reset (demux);
944 
945       ret = gst_pad_event_default (pad, parent, event);
946 
947       GST_MANIFEST_UNLOCK (demux);
948       GST_API_UNLOCK (demux);
949 
950       return ret;
951     }
952     case GST_EVENT_EOS:{
953       GstAdaptiveDemuxClass *demux_class;
954       GstQuery *query;
955       gboolean query_res;
956       gboolean ret = TRUE;
957       gsize available;
958       GstBuffer *manifest_buffer;
959 
960       GST_API_LOCK (demux);
961       GST_MANIFEST_LOCK (demux);
962 
963       demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
964 
965       available = gst_adapter_available (demux->priv->input_adapter);
966 
967       if (available == 0) {
968         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
969         ret = gst_pad_event_default (pad, parent, event);
970 
971         GST_MANIFEST_UNLOCK (demux);
972         GST_API_UNLOCK (demux);
973 
974         return ret;
975       }
976 
977       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
978 
979       /* Need to get the URI to use it as a base to generate the fragment's
980        * uris */
981       query = gst_query_new_uri ();
982       query_res = gst_pad_peer_query (pad, query);
983       if (query_res) {
984         gchar *uri, *redirect_uri;
985         gboolean permanent;
986 
987         gst_query_parse_uri (query, &uri);
988         gst_query_parse_uri_redirection (query, &redirect_uri);
989         gst_query_parse_uri_redirection_permanent (query, &permanent);
990 
991         if (permanent && redirect_uri) {
992           demux->manifest_uri = redirect_uri;
993           demux->manifest_base_uri = NULL;
994           g_free (uri);
995         } else {
996           demux->manifest_uri = uri;
997           demux->manifest_base_uri = redirect_uri;
998         }
999 
1000         GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
1001             demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
1002       } else {
1003         GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
1004       }
1005       gst_query_unref (query);
1006 
1007       /* Let the subclass parse the manifest */
1008       manifest_buffer =
1009           gst_adapter_take_buffer (demux->priv->input_adapter, available);
1010       if (!demux_class->process_manifest (demux, manifest_buffer)) {
1011         /* In most cases, this will happen if we set a wrong url in the
1012          * source element and we have received the 404 HTML response instead of
1013          * the manifest */
1014         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
1015             (NULL));
1016         ret = FALSE;
1017       } else {
1018         g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1019       }
1020 
1021 #ifdef OHOS_EXT_FUNC
1022       // ohos.ext.func.0028
1023       if (ret) {
1024         gst_adaptive_demux_update_bitrate(demux);
1025       }
1026 #endif
1027       gst_buffer_unref (manifest_buffer);
1028 
1029       gst_element_post_message (GST_ELEMENT_CAST (demux),
1030           gst_message_new_element (GST_OBJECT_CAST (demux),
1031               gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
1032                   "manifest-uri", G_TYPE_STRING,
1033                   demux->manifest_uri, "uri", G_TYPE_STRING,
1034                   demux->manifest_uri,
1035                   "manifest-download-start", GST_TYPE_CLOCK_TIME,
1036                   GST_CLOCK_TIME_NONE,
1037                   "manifest-download-stop", GST_TYPE_CLOCK_TIME,
1038                   gst_util_get_timestamp (), NULL)));
1039 
1040       if (ret) {
1041         /* Send duration message */
1042         if (!gst_adaptive_demux_is_live (demux)) {
1043           GstClockTime duration = demux_class->get_duration (demux);
1044 
1045           if (duration != GST_CLOCK_TIME_NONE) {
1046             GST_DEBUG_OBJECT (demux,
1047                 "Sending duration message : %" GST_TIME_FORMAT,
1048                 GST_TIME_ARGS (duration));
1049             gst_element_post_message (GST_ELEMENT (demux),
1050                 gst_message_new_duration_changed (GST_OBJECT (demux)));
1051           } else {
1052             GST_DEBUG_OBJECT (demux,
1053                 "media duration unknown, can not send the duration message");
1054           }
1055         }
1056 
1057         if (demux->next_streams) {
1058           gst_adaptive_demux_prepare_streams (demux,
1059               gst_adaptive_demux_is_live (demux));
1060           gst_adaptive_demux_start_tasks (demux, TRUE);
1061           gst_adaptive_demux_start_manifest_update_task (demux);
1062         } else {
1063           /* no streams */
1064           GST_WARNING_OBJECT (demux, "No streams created from manifest");
1065           GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1066               (_("This file contains no playable streams.")),
1067               ("No known stream formats found at the Manifest"));
1068           ret = FALSE;
1069         }
1070 
1071       }
1072       GST_MANIFEST_UNLOCK (demux);
1073       GST_API_UNLOCK (demux);
1074 
1075       gst_event_unref (event);
1076       return ret;
1077     }
1078     case GST_EVENT_SEGMENT:
1079       /* Swallow newsegments, we'll push our own */
1080       gst_event_unref (event);
1081       return TRUE;
1082     default:
1083       break;
1084   }
1085 
1086   return gst_pad_event_default (pad, parent, event);
1087 }
1088 
1089 static GstFlowReturn
gst_adaptive_demux_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1090 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1091     GstBuffer * buffer)
1092 {
1093   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1094 
1095   GST_MANIFEST_LOCK (demux);
1096 
1097   gst_adapter_push (demux->priv->input_adapter, buffer);
1098 
1099   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1100       (gint) gst_adapter_available (demux->priv->input_adapter));
1101 
1102   GST_MANIFEST_UNLOCK (demux);
1103   return GST_FLOW_OK;
1104 }
1105 
1106 /* must be called with manifest_lock taken */
1107 static void
gst_adaptive_demux_reset(GstAdaptiveDemux * demux)1108 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1109 {
1110   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1111   GList *iter;
1112   GList *old_streams;
1113   GstEvent *eos;
1114 
1115   /* take ownership of old_streams before releasing the manifest_lock in
1116    * gst_adaptive_demux_stop_tasks
1117    */
1118   old_streams = demux->priv->old_streams;
1119   demux->priv->old_streams = NULL;
1120 
1121   gst_adaptive_demux_stop_tasks (demux, TRUE);
1122 
1123   if (klass->reset)
1124     klass->reset (demux);
1125 
1126   eos = gst_event_new_eos ();
1127   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1128     GstAdaptiveDemuxStream *stream = iter->data;
1129     if (stream->pad) {
1130       gst_pad_push_event (stream->pad, gst_event_ref (eos));
1131       gst_pad_set_active (stream->pad, FALSE);
1132 
1133       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
1134     }
1135     gst_adaptive_demux_stream_free (stream);
1136   }
1137   gst_event_unref (eos);
1138   g_list_free (demux->streams);
1139   demux->streams = NULL;
1140   if (demux->prepared_streams) {
1141     g_list_free_full (demux->prepared_streams,
1142         (GDestroyNotify) gst_adaptive_demux_stream_free);
1143     demux->prepared_streams = NULL;
1144   }
1145   if (demux->next_streams) {
1146     g_list_free_full (demux->next_streams,
1147         (GDestroyNotify) gst_adaptive_demux_stream_free);
1148     demux->next_streams = NULL;
1149   }
1150 
1151   if (old_streams) {
1152     g_list_free_full (old_streams,
1153         (GDestroyNotify) gst_adaptive_demux_stream_free);
1154   }
1155 
1156   if (demux->priv->old_streams) {
1157     g_list_free_full (demux->priv->old_streams,
1158         (GDestroyNotify) gst_adaptive_demux_stream_free);
1159     demux->priv->old_streams = NULL;
1160   }
1161 
1162   g_free (demux->manifest_uri);
1163   g_free (demux->manifest_base_uri);
1164   demux->manifest_uri = NULL;
1165   demux->manifest_base_uri = NULL;
1166 
1167   gst_adapter_clear (demux->priv->input_adapter);
1168   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1169 
1170   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1171 
1172   demux->have_group_id = FALSE;
1173   demux->group_id = G_MAXUINT;
1174   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1175 }
1176 
1177 static void
gst_adaptive_demux_handle_message(GstBin * bin,GstMessage * msg)1178 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1179 {
1180   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1181 
1182   switch (GST_MESSAGE_TYPE (msg)) {
1183     case GST_MESSAGE_ERROR:{
1184       GList *iter;
1185       GstAdaptiveDemuxStream *stream = NULL;
1186       GError *err = NULL;
1187       gchar *debug = NULL;
1188       gchar *new_error = NULL;
1189       const GstStructure *details = NULL;
1190 
1191       GST_MANIFEST_LOCK (demux);
1192 
1193       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1194         GstAdaptiveDemuxStream *cur = iter->data;
1195         if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
1196                 GST_OBJECT_CAST (cur->src))) {
1197           stream = cur;
1198           break;
1199         }
1200       }
1201       if (stream == NULL) {
1202         for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1203           GstAdaptiveDemuxStream *cur = iter->data;
1204           if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
1205                   GST_OBJECT_CAST (cur->src))) {
1206             stream = cur;
1207             break;
1208           }
1209         }
1210         if (stream == NULL) {
1211           GST_WARNING_OBJECT (demux,
1212               "Failed to locate stream for errored element");
1213           break;
1214         }
1215       }
1216 
1217       gst_message_parse_error (msg, &err, &debug);
1218 
1219       GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1220           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1221           err->message, debug);
1222 
1223 #ifdef OHOS_EXT_FUNC
1224       // ohos.ext.func.0033
1225       stream->errcode = err->code;
1226 #endif
1227 
1228       if (debug)
1229         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1230       if (new_error) {
1231         g_free (err->message);
1232         err->message = new_error;
1233       }
1234 
1235       gst_message_parse_error_details (msg, &details);
1236       if (details) {
1237         gst_structure_get_uint (details, "http-status-code",
1238             &stream->last_status_code);
1239       }
1240 
1241       /* error, but ask to retry */
1242       gst_adaptive_demux_stream_fragment_download_finish (stream,
1243           GST_FLOW_CUSTOM_ERROR, err);
1244 
1245 #ifdef OHOS_EXT_FUNC
1246       // ohos.ext.func.0037 timeout need notify parent_class
1247       gboolean need_msg = FALSE;
1248       if (err->domain == GST_RESOURCE_ERROR && err->code == GST_RESOURCE_ERROR_TIME_OUT) {
1249         need_msg = TRUE;
1250       }
1251 #endif
1252 
1253       g_error_free (err);
1254       g_free (debug);
1255 
1256       GST_MANIFEST_UNLOCK (demux);
1257 
1258 #ifdef OHOS_EXT_FUNC
1259       // ohos.ext.func.0037 timeout need notify parent_class
1260       if (need_msg) {
1261         break;
1262       }
1263 #endif
1264 
1265       gst_message_unref (msg);
1266       msg = NULL;
1267     }
1268       break;
1269     default:
1270       break;
1271   }
1272 
1273   if (msg)
1274     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1275 }
1276 
1277 void
gst_adaptive_demux_set_stream_struct_size(GstAdaptiveDemux * demux,gsize struct_size)1278 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
1279     gsize struct_size)
1280 {
1281   GST_API_LOCK (demux);
1282   GST_MANIFEST_LOCK (demux);
1283   demux->stream_struct_size = struct_size;
1284   GST_MANIFEST_UNLOCK (demux);
1285   GST_API_UNLOCK (demux);
1286 }
1287 
1288 /* must be called with manifest_lock taken */
1289 static gboolean
gst_adaptive_demux_prepare_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1290 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
1291     GstAdaptiveDemuxStream * stream)
1292 {
1293   GstPad *pad = stream->pad;
1294   gchar *name = gst_pad_get_name (pad);
1295   GstEvent *event;
1296   gchar *stream_id;
1297 
1298   gst_pad_set_active (pad, TRUE);
1299   stream->need_header = TRUE;
1300 
1301   stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
1302 
1303   event =
1304       gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
1305       GST_EVENT_STREAM_START, 0);
1306   if (event) {
1307     if (gst_event_parse_group_id (event, &demux->group_id))
1308       demux->have_group_id = TRUE;
1309     else
1310       demux->have_group_id = FALSE;
1311     gst_event_unref (event);
1312   } else if (!demux->have_group_id) {
1313     demux->have_group_id = TRUE;
1314     demux->group_id = gst_util_group_id_next ();
1315   }
1316   event = gst_event_new_stream_start (stream_id);
1317   if (demux->have_group_id)
1318     gst_event_set_group_id (event, demux->group_id);
1319 
1320   gst_pad_push_event (pad, event);
1321   g_free (stream_id);
1322   g_free (name);
1323 
1324   GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
1325 
1326   stream->discont = TRUE;
1327 
1328   return TRUE;
1329 }
1330 
1331 static gboolean
gst_adaptive_demux_expose_stream(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1332 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
1333     GstAdaptiveDemuxStream * stream)
1334 {
1335   gboolean ret;
1336   GstPad *pad = stream->pad;
1337   GstCaps *caps;
1338 
1339   if (stream->pending_caps) {
1340     gst_pad_set_caps (pad, stream->pending_caps);
1341     caps = stream->pending_caps;
1342     stream->pending_caps = NULL;
1343   } else {
1344     caps = gst_pad_get_current_caps (pad);
1345   }
1346 
1347   GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
1348       GST_DEBUG_PAD_NAME (pad), caps);
1349   if (caps)
1350     gst_caps_unref (caps);
1351 
1352   gst_object_ref (pad);
1353 
1354   /* Don't hold the manifest lock while exposing a pad */
1355   GST_MANIFEST_UNLOCK (demux);
1356   ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1357   GST_MANIFEST_LOCK (demux);
1358 
1359   return ret;
1360 }
1361 
1362 /* must be called with manifest_lock taken */
1363 static GstClockTime
gst_adaptive_demux_stream_get_presentation_offset(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1364 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1365     GstAdaptiveDemuxStream * stream)
1366 {
1367   GstAdaptiveDemuxClass *klass;
1368 
1369   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1370 
1371   if (klass->get_presentation_offset == NULL)
1372     return 0;
1373 
1374   return klass->get_presentation_offset (demux, stream);
1375 }
1376 
1377 /* must be called with manifest_lock taken */
1378 static GstClockTime
gst_adaptive_demux_get_period_start_time(GstAdaptiveDemux * demux)1379 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1380 {
1381   GstAdaptiveDemuxClass *klass;
1382 
1383   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1384 
1385   if (klass->get_period_start_time == NULL)
1386     return 0;
1387 
1388   return klass->get_period_start_time (demux);
1389 }
1390 
1391 /* must be called with manifest_lock taken */
1392 static gboolean
gst_adaptive_demux_prepare_streams(GstAdaptiveDemux * demux,gboolean first_and_live)1393 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1394     gboolean first_and_live)
1395 {
1396   GList *iter;
1397   GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1398 
1399   g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1400   if (demux->prepared_streams != NULL) {
1401     /* Old streams that were never exposed, due to a seek or so */
1402     GST_FIXME_OBJECT (demux,
1403         "Preparing new streams without cleaning up old ones!");
1404     return FALSE;
1405   }
1406 
1407   demux->prepared_streams = demux->next_streams;
1408   demux->next_streams = NULL;
1409 
1410   if (!gst_adaptive_demux_is_running (demux)) {
1411     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1412     return TRUE;
1413   }
1414 
1415   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1416     GstAdaptiveDemuxStream *stream = iter->data;
1417 
1418     stream->do_block = TRUE;
1419 
1420     if (!gst_adaptive_demux_prepare_stream (demux,
1421             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1422       /* TODO act on error */
1423       GST_FIXME_OBJECT (stream->pad,
1424           "Do something on failure to expose stream");
1425     }
1426 
1427     if (first_and_live) {
1428       /* TODO we only need the first timestamp, maybe create a simple function to
1429        * get the current PTS of a fragment ? */
1430       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1431       gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1432 
1433       if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1434         min_pts = MIN (min_pts, stream->fragment.timestamp);
1435       } else {
1436         min_pts = stream->fragment.timestamp;
1437       }
1438     }
1439   }
1440 
1441   period_start = gst_adaptive_demux_get_period_start_time (demux);
1442 
1443   /* For live streams, the subclass is supposed to seek to the current
1444    * fragment and then tell us its timestamp in stream->fragment.timestamp.
1445    * We now also have to seek our demuxer segment to reflect this.
1446    *
1447    * FIXME: This needs some refactoring at some point.
1448    */
1449   if (first_and_live) {
1450     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1451         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1452         GST_SEEK_TYPE_NONE, -1, NULL);
1453   }
1454 
1455   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1456     GstAdaptiveDemuxStream *stream = iter->data;
1457     GstClockTime offset;
1458 
1459     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1460     stream->segment = demux->segment;
1461 
1462     /* The demuxer segment is just built from seek events, but for each stream
1463      * we have to adjust segments according to the current period and the
1464      * stream specific presentation time offset.
1465      *
1466      * For each period, buffer timestamps start again from 0. Additionally the
1467      * buffer timestamps are shifted by the stream specific presentation time
1468      * offset, so the first buffer timestamp of a period is 0 + presentation
1469      * time offset. If the stream contains timestamps itself, this is also
1470      * supposed to be the presentation time stored inside the stream.
1471      *
1472      * The stream time over periods is supposed to be continuous, that is the
1473      * buffer timestamp 0 + presentation time offset should map to the start
1474      * time of the current period.
1475      *
1476      *
1477      * The adjustment of the stream segments as such works the following.
1478      *
1479      * If the demuxer segment start is bigger than the period start, this
1480      * means that we have to drop some media at the beginning of the current
1481      * period, e.g. because a seek into the middle of the period has
1482      * happened. The amount of media to drop is the difference between the
1483      * period start and the demuxer segment start, and as each period starts
1484      * again from 0, this difference is going to be the actual stream's
1485      * segment start. As all timestamps of the stream are shifted by the
1486      * presentation time offset, we will also have to move the segment start
1487      * by that offset.
1488      *
1489      * Likewise, the demuxer segment stop value is adjusted in the same
1490      * fashion.
1491      *
1492      * Now the running time and stream time at the stream's segment start has
1493      * to be the one that is stored inside the demuxer's segment, which means
1494      * that segment.base and segment.time have to be copied over (done just
1495      * above)
1496      *
1497      *
1498      * If the demuxer segment start is smaller than the period start time,
1499      * this means that the whole period is inside the segment. As each period
1500      * starts timestamps from 0, and additionally timestamps are shifted by
1501      * the presentation time offset, the stream's first timestamp (and as such
1502      * the stream's segment start) has to be the presentation time offset.
1503      * The stream time at the segment start is supposed to be the stream time
1504      * of the period start according to the demuxer segment, so the stream
1505      * segment's time would be set to that. The same goes for the stream
1506      * segment's base, which is supposed to be the running time of the period
1507      * start according to the demuxer's segment.
1508      *
1509      * The same logic applies for negative rates with the segment stop and
1510      * the period stop time (which gets clamped).
1511      *
1512      *
1513      * For the first case where not the complete period is inside the segment,
1514      * the segment time and base as calculated by the second case would be
1515      * equivalent.
1516      */
1517     GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1518         &demux->segment);
1519     GST_DEBUG_OBJECT (demux,
1520         "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1521         GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1522     /* note for readers:
1523      * Since stream->segment is initially a copy of demux->segment,
1524      * only the values that need updating are modified below. */
1525     if (first_and_live) {
1526       /* If first and live, demuxer did seek to the current position already */
1527       stream->segment.start = demux->segment.start - period_start + offset;
1528       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1529         stream->segment.stop = demux->segment.stop - period_start + offset;
1530       /* FIXME : Do we need to handle negative rates for this ? */
1531       stream->segment.position = stream->segment.start;
1532     } else if (demux->segment.start > period_start) {
1533       /* seek within a period */
1534       stream->segment.start = demux->segment.start - period_start + offset;
1535       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1536         stream->segment.stop = demux->segment.stop - period_start + offset;
1537       if (stream->segment.rate >= 0)
1538         stream->segment.position = offset;
1539       else
1540         stream->segment.position = stream->segment.stop;
1541     } else {
1542       stream->segment.start = offset;
1543       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1544         stream->segment.stop = demux->segment.stop - period_start + offset;
1545       if (stream->segment.rate >= 0)
1546         stream->segment.position = offset;
1547       else
1548         stream->segment.position = stream->segment.stop;
1549       stream->segment.time =
1550           gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1551           period_start);
1552       stream->segment.base =
1553           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1554           period_start);
1555     }
1556 
1557     stream->pending_segment = gst_event_new_segment (&stream->segment);
1558     gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1559 
1560     GST_DEBUG_OBJECT (demux,
1561         "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1562         &stream->segment, stream);
1563   }
1564   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1565 
1566   return TRUE;
1567 }
1568 
1569 static gboolean
gst_adaptive_demux_expose_streams(GstAdaptiveDemux * demux)1570 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1571 {
1572   GList *iter;
1573   GList *old_streams;
1574 
1575   g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1576 
1577   old_streams = demux->streams;
1578   demux->streams = demux->prepared_streams;
1579   demux->prepared_streams = NULL;
1580 
1581   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1582     GstAdaptiveDemuxStream *stream = iter->data;
1583 
1584     if (!gst_adaptive_demux_expose_stream (demux,
1585             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1586       /* TODO act on error */
1587     }
1588   }
1589   demux->priv->preroll_pending = 0;
1590 
1591   GST_MANIFEST_UNLOCK (demux);
1592   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1593   GST_MANIFEST_LOCK (demux);
1594 
1595   if (old_streams) {
1596     GstEvent *eos = gst_event_new_eos ();
1597 
1598     /* before we put streams in the demux->priv->old_streams list,
1599      * we ask the download task to stop. In this way, it will no longer be
1600      * allowed to change the demux object.
1601      */
1602     for (iter = old_streams; iter; iter = g_list_next (iter)) {
1603       GstAdaptiveDemuxStream *stream = iter->data;
1604       GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1605 
1606       GST_MANIFEST_UNLOCK (demux);
1607 
1608       GST_DEBUG_OBJECT (pad, "Pushing EOS");
1609       gst_pad_push_event (pad, gst_event_ref (eos));
1610       gst_pad_set_active (pad, FALSE);
1611 
1612       GST_LOG_OBJECT (pad, "Removing stream");
1613       gst_element_remove_pad (GST_ELEMENT (demux), pad);
1614       GST_MANIFEST_LOCK (demux);
1615 
1616       gst_object_unref (GST_OBJECT (pad));
1617 
1618       /* ask the download task to stop.
1619        * We will not join it now, because our thread can be one of these tasks.
1620        * We will do the joining later, from another stream download task or
1621        * from gst_adaptive_demux_stop_tasks.
1622        * We also cannot change the state of the stream->src element, because
1623        * that will wait on the streaming thread (which could be this thread)
1624        * to stop first.
1625        * Because we sent an EOS to the downstream element, the stream->src
1626        * element should detect this in its streaming task and stop.
1627        * Even if it doesn't do that, we will change its state later in
1628        * gst_adaptive_demux_stop_tasks.
1629        */
1630       GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1631           "Marking stream as cancelled");
1632       gst_task_stop (stream->download_task);
1633       g_mutex_lock (&stream->fragment_download_lock);
1634       stream->cancelled = TRUE;
1635       stream->replaced = TRUE;
1636       g_cond_signal (&stream->fragment_download_cond);
1637       g_mutex_unlock (&stream->fragment_download_lock);
1638     }
1639     gst_event_unref (eos);
1640 
1641     /* The list should be freed from another thread as we can't properly
1642      * cleanup a GstTask from itself */
1643     demux->priv->old_streams =
1644         g_list_concat (demux->priv->old_streams, old_streams);
1645   }
1646 
1647   /* Unblock after removing oldstreams */
1648   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1649     GstAdaptiveDemuxStream *stream = iter->data;
1650     stream->do_block = FALSE;
1651   }
1652 
1653   GST_DEBUG_OBJECT (demux, "All streams are exposed");
1654 
1655   return TRUE;
1656 }
1657 
1658 /* must be called with manifest_lock taken */
1659 GstAdaptiveDemuxStream *
gst_adaptive_demux_stream_new(GstAdaptiveDemux * demux,GstPad * pad)1660 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1661 {
1662   GstAdaptiveDemuxStream *stream;
1663 
1664   stream = g_malloc0 (demux->stream_struct_size);
1665 
1666   /* Downloading task */
1667   g_rec_mutex_init (&stream->download_lock);
1668   stream->download_task =
1669       gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1670       stream, NULL);
1671   gst_task_set_lock (stream->download_task, &stream->download_lock);
1672 
1673   stream->pad = pad;
1674   stream->demux = demux;
1675   stream->fragment_bitrates =
1676       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1677   gst_pad_set_element_private (pad, stream);
1678   stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1679 
1680   g_mutex_lock (&demux->priv->preroll_lock);
1681   stream->do_block = TRUE;
1682   demux->priv->preroll_pending++;
1683   g_mutex_unlock (&demux->priv->preroll_lock);
1684 
1685   gst_pad_set_query_function (pad,
1686       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1687   gst_pad_set_event_function (pad,
1688       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1689 
1690   gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1691   g_cond_init (&stream->fragment_download_cond);
1692   g_mutex_init (&stream->fragment_download_lock);
1693 
1694   demux->next_streams = g_list_append (demux->next_streams, stream);
1695 
1696   return stream;
1697 }
1698 
1699 GstAdaptiveDemuxStream *
gst_adaptive_demux_find_stream_for_pad(GstAdaptiveDemux * demux,GstPad * pad)1700 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1701 {
1702   GList *iter;
1703 
1704   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1705     GstAdaptiveDemuxStream *stream = iter->data;
1706     if (stream->pad == pad) {
1707       return stream;
1708     }
1709   }
1710 
1711   return NULL;
1712 }
1713 
1714 /* must be called with manifest_lock taken.
1715  * It will temporarily drop the manifest_lock in order to join the task.
1716  * It will join only the old_streams (the demux->streams are joined by
1717  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1718  * called)
1719  */
1720 static void
gst_adaptive_demux_stream_free(GstAdaptiveDemuxStream * stream)1721 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1722 {
1723   GstAdaptiveDemux *demux = stream->demux;
1724   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1725 
1726   if (klass->stream_free)
1727     klass->stream_free (stream);
1728 
1729   g_clear_error (&stream->last_error);
1730   if (stream->download_task) {
1731     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1732       GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1733           GST_DEBUG_PAD_NAME (stream->pad));
1734 
1735       gst_task_stop (stream->download_task);
1736 
1737       g_mutex_lock (&stream->fragment_download_lock);
1738       stream->cancelled = TRUE;
1739       g_cond_signal (&stream->fragment_download_cond);
1740       g_mutex_unlock (&stream->fragment_download_lock);
1741     }
1742     GST_LOG_OBJECT (demux, "Waiting for task to finish");
1743 
1744     /* temporarily drop the manifest lock to join the task */
1745     GST_MANIFEST_UNLOCK (demux);
1746 
1747     gst_task_join (stream->download_task);
1748 
1749     GST_MANIFEST_LOCK (demux);
1750 
1751     GST_LOG_OBJECT (demux, "Finished");
1752     gst_object_unref (stream->download_task);
1753     g_rec_mutex_clear (&stream->download_lock);
1754     stream->download_task = NULL;
1755   }
1756 
1757   gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1758 
1759   if (stream->pending_segment) {
1760     gst_event_unref (stream->pending_segment);
1761     stream->pending_segment = NULL;
1762   }
1763 
1764   if (stream->pending_events) {
1765     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1766     stream->pending_events = NULL;
1767   }
1768 
1769   if (stream->internal_pad) {
1770     gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1771   }
1772 
1773   if (stream->src_srcpad) {
1774     gst_object_unref (stream->src_srcpad);
1775     stream->src_srcpad = NULL;
1776   }
1777 
1778   if (stream->src) {
1779     GstElement *src = stream->src;
1780 
1781     stream->src = NULL;
1782 
1783     GST_MANIFEST_UNLOCK (demux);
1784     gst_element_set_locked_state (src, TRUE);
1785     gst_element_set_state (src, GST_STATE_NULL);
1786     gst_bin_remove (GST_BIN_CAST (demux), src);
1787     GST_MANIFEST_LOCK (demux);
1788   }
1789 
1790   g_cond_clear (&stream->fragment_download_cond);
1791   g_mutex_clear (&stream->fragment_download_lock);
1792   g_free (stream->fragment_bitrates);
1793 
1794   if (stream->pad) {
1795     gst_object_unref (stream->pad);
1796     stream->pad = NULL;
1797   }
1798   if (stream->pending_caps)
1799     gst_caps_unref (stream->pending_caps);
1800 
1801   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1802 
1803   g_free (stream);
1804 }
1805 
1806 /* must be called with manifest_lock taken */
1807 static gboolean
gst_adaptive_demux_get_live_seek_range(GstAdaptiveDemux * demux,gint64 * range_start,gint64 * range_stop)1808 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1809     gint64 * range_start, gint64 * range_stop)
1810 {
1811   GstAdaptiveDemuxClass *klass;
1812 
1813   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1814 
1815   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1816 
1817   return klass->get_live_seek_range (demux, range_start, range_stop);
1818 }
1819 
1820 /* must be called with manifest_lock taken */
1821 static gboolean
gst_adaptive_demux_stream_in_live_seek_range(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)1822 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1823     GstAdaptiveDemuxStream * stream)
1824 {
1825   gint64 range_start, range_stop;
1826   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1827     GST_LOG_OBJECT (stream->pad,
1828         "stream position %" GST_TIME_FORMAT "  live seek range %"
1829         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1830         GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1831         GST_STIME_ARGS (range_stop));
1832     return (stream->segment.position >= range_start
1833         && stream->segment.position <= range_stop);
1834   }
1835 
1836   return FALSE;
1837 }
1838 
1839 /* must be called with manifest_lock taken */
1840 static gboolean
gst_adaptive_demux_can_seek(GstAdaptiveDemux * demux)1841 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1842 {
1843   GstAdaptiveDemuxClass *klass;
1844 
1845   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1846   if (gst_adaptive_demux_is_live (demux)) {
1847     return klass->get_live_seek_range != NULL;
1848   }
1849 
1850   return klass->seek != NULL;
1851 }
1852 
1853 static void
gst_adaptive_demux_update_streams_segment(GstAdaptiveDemux * demux,GList * streams,gint64 period_start,GstSeekType start_type,GstSeekType stop_type)1854 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1855     GList * streams, gint64 period_start, GstSeekType start_type,
1856     GstSeekType stop_type)
1857 {
1858   GList *iter;
1859   for (iter = streams; iter; iter = g_list_next (iter)) {
1860     GstAdaptiveDemuxStream *stream = iter->data;
1861     GstEvent *seg_evt;
1862     GstClockTime offset;
1863 
1864     /* See comments in gst_adaptive_demux_get_period_start_time() for
1865      * an explanation of the segment modifications */
1866     stream->segment = demux->segment;
1867     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1868     stream->segment.start += offset - period_start;
1869     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1870       stream->segment.stop += offset - period_start;
1871     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1872       stream->segment.position = stream->segment.start;
1873     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1874       stream->segment.position = stream->segment.stop;
1875     seg_evt = gst_event_new_segment (&stream->segment);
1876     gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1877     gst_event_replace (&stream->pending_segment, seg_evt);
1878     GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1879         stream->pending_segment);
1880     gst_event_unref (seg_evt);
1881     /* Make sure the first buffer after a seek has the discont flag */
1882     stream->discont = TRUE;
1883   }
1884   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1885 }
1886 
/* Non-zero when the seek flags @f contain any of the SNAP_BEFORE, SNAP_AFTER,
 * SNAP_NEAREST, TRICKMODE_KEY_UNITS or KEY_UNIT flags.
 * @f is parenthesised so that compound arguments such as (a | b) are masked
 * as a whole. */
#define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE |         \
                                GST_SEEK_FLAG_SNAP_AFTER |          \
                                GST_SEEK_FLAG_SNAP_NEAREST |        \
                                GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
                                GST_SEEK_FLAG_KEY_UNIT))
/* The seek flags @f with SNAP_BEFORE, SNAP_AFTER and SNAP_NEAREST cleared;
 * every other flag (including KEY_UNIT and TRICKMODE_KEY_UNITS) is kept. */
#define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
                                      GST_SEEK_FLAG_SNAP_AFTER |  \
                                      GST_SEEK_FLAG_SNAP_NEAREST))
1895 
1896 static gboolean
gst_adaptive_demux_handle_seek_event(GstAdaptiveDemux * demux,GstPad * pad,GstEvent * event)1897 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1898     GstEvent * event)
1899 {
1900   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1901   gdouble rate;
1902   GstFormat format;
1903   GstSeekFlags flags;
1904   GstSeekType start_type, stop_type;
1905   gint64 start, stop;
1906   guint32 seqnum;
1907   gboolean update;
1908   gboolean ret;
1909   GstSegment oldsegment;
1910   GstAdaptiveDemuxStream *stream = NULL;
1911 
1912   GST_INFO_OBJECT (demux, "Received seek event");
1913 
1914   GST_API_LOCK (demux);
1915   GST_MANIFEST_LOCK (demux);
1916 
1917   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1918       &stop_type, &stop);
1919 
1920   if (format != GST_FORMAT_TIME) {
1921     GST_MANIFEST_UNLOCK (demux);
1922     GST_API_UNLOCK (demux);
1923     GST_WARNING_OBJECT (demux,
1924         "Adaptive demuxers only support TIME-based seeking");
1925     gst_event_unref (event);
1926     return FALSE;
1927   }
1928 
1929   if (flags & GST_SEEK_FLAG_SEGMENT) {
1930     GST_FIXME_OBJECT (demux, "Handle segment seeks");
1931     GST_MANIFEST_UNLOCK (demux);
1932     GST_API_UNLOCK (demux);
1933     gst_event_unref (event);
1934     return FALSE;
1935   }
1936 
1937   seqnum = gst_event_get_seqnum (event);
1938 
1939   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
1940     /* For instant rate seeks, reply directly and update
1941      * our segment so the new rate is reflected in any future
1942      * fragments */
1943     GstEvent *ev;
1944 
1945     /* instant rate change only supported if direction does not change. All
1946      * other requirements are already checked before creating the seek event
1947      * but let's double-check here to be sure */
1948     if ((demux->segment.rate > 0 && rate < 0) ||
1949         (demux->segment.rate < 0 && rate > 0) ||
1950         start_type != GST_SEEK_TYPE_NONE ||
1951         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
1952       GST_ERROR_OBJECT (demux,
1953           "Instant rate change seeks only supported in the "
1954           "same direction, without flushing and position change");
1955       GST_MANIFEST_UNLOCK (demux);
1956       GST_API_UNLOCK (demux);
1957       return FALSE;
1958     }
1959 
1960     ev = gst_event_new_instant_rate_change (rate / demux->segment.rate,
1961         (GstSegmentFlags) flags);
1962     gst_event_set_seqnum (ev, seqnum);
1963 
1964     GST_MANIFEST_UNLOCK (demux);
1965 
1966     ret = gst_adaptive_demux_push_src_event (demux, ev);
1967 
1968     GST_API_UNLOCK (demux);
1969     gst_event_unref (event);
1970 
1971     return ret;
1972   }
1973 
1974   if (!gst_adaptive_demux_can_seek (demux)) {
1975     GST_MANIFEST_UNLOCK (demux);
1976     GST_API_UNLOCK (demux);
1977     gst_event_unref (event);
1978     return FALSE;
1979   }
1980 
1981   if (gst_adaptive_demux_is_live (demux)) {
1982     gint64 range_start, range_stop;
1983     gboolean changed = FALSE;
1984     gboolean start_valid = TRUE, stop_valid = TRUE;
1985 
1986     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1987             &range_stop)) {
1988       GST_MANIFEST_UNLOCK (demux);
1989       GST_API_UNLOCK (demux);
1990       gst_event_unref (event);
1991       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1992       return FALSE;
1993     }
1994 
1995     GST_DEBUG_OBJECT (demux,
1996         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1997         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1998 
1999     /* Handle relative positioning for live streams (relative to the range_stop) */
2000     if (start_type == GST_SEEK_TYPE_END) {
2001       start = range_stop + start;
2002       start_type = GST_SEEK_TYPE_SET;
2003       changed = TRUE;
2004     }
2005     if (stop_type == GST_SEEK_TYPE_END) {
2006       stop = range_stop + stop;
2007       stop_type = GST_SEEK_TYPE_SET;
2008       changed = TRUE;
2009     }
2010 
2011     /* Adjust the requested start/stop position if it falls beyond the live
2012      * seek range.
2013      * The only case where we don't adjust is for the starting point of
2014      * an accurate seek (start if forward and stop if backwards)
2015      */
2016     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2017         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2018       GST_DEBUG_OBJECT (demux,
2019           "seek before live stream start, setting to range start: %"
2020           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2021       start = range_start;
2022       changed = TRUE;
2023     }
2024     /* truncate stop position also if set */
2025     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2026         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2027       GST_DEBUG_OBJECT (demux,
2028           "seek ending after live start, adjusting to: %"
2029           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2030       stop = range_stop;
2031       changed = TRUE;
2032     }
2033 
2034     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2035         (start < range_start || start > range_stop)) {
2036       GST_WARNING_OBJECT (demux,
2037           "Seek to invalid position start:%" GST_STIME_FORMAT
2038           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2039           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2040           GST_STIME_ARGS (range_stop));
2041       start_valid = FALSE;
2042     }
2043     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2044         (stop < range_start || stop > range_stop)) {
2045       GST_WARNING_OBJECT (demux,
2046           "Seek to invalid position stop:%" GST_STIME_FORMAT
2047           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2048           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2049           GST_STIME_ARGS (range_stop));
2050       stop_valid = FALSE;
2051     }
2052 
2053     /* If the seek position is still outside of the seekable range, refuse the seek */
2054     if (!start_valid || !stop_valid) {
2055       GST_MANIFEST_UNLOCK (demux);
2056       GST_API_UNLOCK (demux);
2057       gst_event_unref (event);
2058       return FALSE;
2059     }
2060 
2061     /* Re-create seek event with changed/updated values */
2062     if (changed) {
2063       gst_event_unref (event);
2064       event =
2065           gst_event_new_seek (rate, format, flags,
2066           start_type, start, stop_type, stop);
2067       gst_event_set_seqnum (event, seqnum);
2068     }
2069   }
2070 
2071   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2072 
2073   /* have a backup in case seek fails */
2074   gst_segment_copy_into (&demux->segment, &oldsegment);
2075 
2076   if (flags & GST_SEEK_FLAG_FLUSH) {
2077     GstEvent *fevent;
2078 
2079     GST_DEBUG_OBJECT (demux, "sending flush start");
2080     fevent = gst_event_new_flush_start ();
2081     gst_event_set_seqnum (fevent, seqnum);
2082     GST_MANIFEST_UNLOCK (demux);
2083     gst_adaptive_demux_push_src_event (demux, fevent);
2084     GST_MANIFEST_LOCK (demux);
2085 
2086     gst_adaptive_demux_stop_tasks (demux, FALSE);
2087   } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
2088       (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
2089 
2090     gst_adaptive_demux_stop_tasks (demux, FALSE);
2091   }
2092 
2093   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2094 
2095   /*
2096    * Handle snap seeks as follows:
2097    * 1) do the snap seeking on the stream that received
2098    *    the event
2099    * 2) use the final position on this stream to seek
2100    *    on the other streams to the same position
2101    *
2102    * We can't snap at all streams at the same time as
2103    * they might end in different positions, so just
2104    * use the one that received the event as the 'leading'
2105    * one to do the snap seek.
2106    */
2107   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
2108           gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
2109     GstClockTime ts;
2110     GstSeekFlags stream_seek_flags = flags;
2111 
2112     /* snap-seek on the stream that received the event and then
2113      * use the resulting position to seek on all streams */
2114 
2115     if (rate >= 0) {
2116       if (start_type != GST_SEEK_TYPE_NONE)
2117         ts = start;
2118       else {
2119         ts = stream->segment.position;
2120         start_type = GST_SEEK_TYPE_SET;
2121       }
2122     } else {
2123       if (stop_type != GST_SEEK_TYPE_NONE)
2124         ts = stop;
2125       else {
2126         stop_type = GST_SEEK_TYPE_SET;
2127         ts = stream->segment.position;
2128       }
2129     }
2130 
2131     if (stream) {
2132       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2133     }
2134 
2135     /* replace event with a new one without snapping to seek on all streams */
2136     gst_event_unref (event);
2137     if (rate >= 0) {
2138       start = ts;
2139     } else {
2140       stop = ts;
2141     }
2142     event =
2143         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2144         start_type, start, stop_type, stop);
2145     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2146   }
2147   stream = NULL;
2148 
2149   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2150       start, stop_type, stop, &update);
2151 
2152   if (ret) {
2153     /* FIXME - this seems unatural, do_seek() is updating base when we
2154      * only want the start/stop position to change, maybe do_seek() needs
2155      * some fixing? */
2156     if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
2157                 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
2158                 && stop_type == GST_SEEK_TYPE_NONE))) {
2159       demux->segment.base = oldsegment.base;
2160     }
2161 
2162     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2163 
2164     ret = demux_class->seek (demux, event);
2165   }
2166 
2167   if (!ret) {
2168     /* Is there anything else we can do if it fails? */
2169     gst_segment_copy_into (&oldsegment, &demux->segment);
2170   } else {
2171     demux->priv->segment_seqnum = seqnum;
2172   }
2173   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2174 
2175   if (flags & GST_SEEK_FLAG_FLUSH) {
2176     GstEvent *fevent;
2177 
2178     GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2179     fevent = gst_event_new_flush_stop (TRUE);
2180     gst_event_set_seqnum (fevent, seqnum);
2181 #ifdef OHOS_OPT_COMPAT
2182     /**
2183      * ohos.opt.compat.0037
2184      * Fixed deadlock when seek and reconfigure are concurrent
2185      */
2186     GST_MANIFEST_UNLOCK (demux);
2187 #endif
2188     gst_adaptive_demux_push_src_event (demux, fevent);
2189 #ifdef OHOS_OPT_COMPAT
2190     // ohos.opt.compat.0037
2191     GST_MANIFEST_LOCK (demux);
2192 #endif
2193   }
2194 
2195   if (demux->next_streams) {
2196     /* If the seek generated new streams, get them
2197      * to preroll */
2198     gst_adaptive_demux_prepare_streams (demux, FALSE);
2199     gst_adaptive_demux_start_tasks (demux, TRUE);
2200   } else {
2201     GstClockTime period_start =
2202         gst_adaptive_demux_get_period_start_time (demux);
2203 
2204     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2205     gst_adaptive_demux_update_streams_segment (demux, demux->streams,
2206         period_start, start_type, stop_type);
2207     gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
2208         period_start, start_type, stop_type);
2209     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2210 
2211 #ifdef OHOS_OPT_COMPAT
2212     /* ohos.opt.compat.0028
2213      * In the variable resolution, if the changing path is still in the prepare stream state,
2214      * if seek continues to play at a certain location, the cancel of the prepare stream in will be set to true.
2215      * When switching to the main thread of the prepare stream, it is determined that when cancel is true,
2216      * no data will be pulled from the server, the prepare stream will be released first
2217      */
2218     if (demux->streams && demux->prepared_streams) {
2219       g_list_free_full (demux->prepared_streams,
2220           (GDestroyNotify) gst_adaptive_demux_stream_free);
2221       demux->prepared_streams = NULL;
2222       /* ohos.opt.compat.0043
2223        * Preroll_pending needs to reset when prepared_streams are released. The reason is
2224        * gst_adaptive_demux_stream_new() will increase the value, while after the streams were
2225        * prepared and released the value is not changed.
2226        */
2227       GST_MANIFEST_UNLOCK (demux);
2228       g_mutex_lock (&demux->priv->preroll_lock);
2229       demux->priv->preroll_pending = 0;
2230       g_mutex_unlock (&demux->priv->preroll_lock);
2231       GST_MANIFEST_LOCK (demux);
2232     }
2233 #endif
2234 
2235     /* Restart the demux */
2236     gst_adaptive_demux_start_tasks (demux, FALSE);
2237   }
2238 
2239   GST_MANIFEST_UNLOCK (demux);
2240   GST_API_UNLOCK (demux);
2241   gst_event_unref (event);
2242 
2243   return ret;
2244 }
2245 
2246 static gboolean
gst_adaptive_demux_src_event(GstPad * pad,GstObject * parent,GstEvent * event)2247 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2248     GstEvent * event)
2249 {
2250   GstAdaptiveDemux *demux;
2251 
2252   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2253 
2254   /* FIXME handle events received on pads that are to be removed */
2255 
2256   switch (event->type) {
2257     case GST_EVENT_SEEK:
2258     {
2259       guint32 seqnum = gst_event_get_seqnum (event);
2260       if (seqnum == demux->priv->segment_seqnum) {
2261         GST_LOG_OBJECT (pad,
2262             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2263         gst_event_unref (event);
2264         return TRUE;
2265       }
2266       return gst_adaptive_demux_handle_seek_event (demux, pad, event);
2267     }
2268     case GST_EVENT_RECONFIGURE:{
2269       GstAdaptiveDemuxStream *stream;
2270 
2271       GST_MANIFEST_LOCK (demux);
2272       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
2273 
2274       if (stream) {
2275         if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
2276             stream->last_ret == GST_FLOW_NOT_LINKED) {
2277           stream->last_ret = GST_FLOW_OK;
2278           stream->restart_download = TRUE;
2279           stream->need_header = TRUE;
2280           stream->discont = TRUE;
2281           GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
2282           gst_task_start (stream->download_task);
2283         }
2284         gst_event_unref (event);
2285         GST_MANIFEST_UNLOCK (demux);
2286         return TRUE;
2287       }
2288       GST_MANIFEST_UNLOCK (demux);
2289     }
2290       break;
2291     case GST_EVENT_LATENCY:{
2292       /* Upstream and our internal source are irrelevant
2293        * for latency, and we should not fail here to
2294        * configure the latency */
2295       gst_event_unref (event);
2296       return TRUE;
2297     }
2298     case GST_EVENT_QOS:{
2299       GstClockTimeDiff diff;
2300       GstClockTime timestamp;
2301       GstClockTime earliest_time;
2302 
2303       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2304       /* Only take into account lateness if late */
2305       if (diff > 0)
2306         earliest_time = timestamp + 2 * diff;
2307       else
2308         earliest_time = timestamp;
2309 
2310       GST_OBJECT_LOCK (demux);
2311       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2312           earliest_time > demux->priv->qos_earliest_time) {
2313         demux->priv->qos_earliest_time = earliest_time;
2314         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2315             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2316       }
2317       GST_OBJECT_UNLOCK (demux);
2318       break;
2319     }
2320     default:
2321       break;
2322   }
2323 
2324   return gst_pad_event_default (pad, parent, event);
2325 }
2326 
2327 static gboolean
gst_adaptive_demux_src_query(GstPad * pad,GstObject * parent,GstQuery * query)2328 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2329     GstQuery * query)
2330 {
2331   GstAdaptiveDemux *demux;
2332   GstAdaptiveDemuxClass *demux_class;
2333   gboolean ret = FALSE;
2334 
2335   if (query == NULL)
2336     return FALSE;
2337 
2338   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2339   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2340 
2341   switch (query->type) {
2342     case GST_QUERY_DURATION:{
2343       GstClockTime duration = -1;
2344       GstFormat fmt;
2345 
2346       gst_query_parse_duration (query, &fmt, NULL);
2347 #ifdef OHOS_OPT_COMPAT
2348       /**
2349        * ohos.opt.compat.0040 Fixed query duration failed.
2350        * gst_hls_demux_process_manifest() function hold a manifest lock to update playlist.
2351        * During this period, the value obtained by judgment(gst_adaptive_demux_is_live() function) is wrong.
2352        * Thus, use a manifest lock to block until the attribute endlist of m3u8 is updated.
2353        */
2354       GST_MANIFEST_LOCK (demux);
2355 #endif
2356       if (gst_adaptive_demux_is_live (demux)) {
2357         /* We are able to answer this query: the duration is unknown */
2358         gst_query_set_duration (query, fmt, -1);
2359         ret = TRUE;
2360 #ifdef OHOS_OPT_COMPAT
2361         // ohos.opt.compat.0040
2362         GST_MANIFEST_UNLOCK (demux);
2363 #endif
2364         break;
2365       }
2366 #ifdef OHOS_OPT_COMPAT
2367       // ohos.opt.compat.0040
2368       GST_MANIFEST_UNLOCK (demux);
2369 #endif
2370 
2371       if (fmt == GST_FORMAT_TIME
2372           && g_atomic_int_get (&demux->priv->have_manifest)) {
2373         duration = demux_class->get_duration (demux);
2374 
2375         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2376           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2377           ret = TRUE;
2378         }
2379       }
2380 
2381       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2382           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2383       break;
2384     }
2385     case GST_QUERY_LATENCY:{
2386       gst_query_set_latency (query, FALSE, 0, -1);
2387       ret = TRUE;
2388       break;
2389     }
2390     case GST_QUERY_SEEKING:{
2391       GstFormat fmt;
2392       gint64 stop = -1;
2393       gint64 start = 0;
2394 
2395       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2396         GST_INFO_OBJECT (demux,
2397             "Don't have manifest yet, can't answer seeking query");
2398         return FALSE;           /* can't answer without manifest */
2399       }
2400 
2401       GST_MANIFEST_LOCK (demux);
2402 
2403       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2404       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2405       if (fmt == GST_FORMAT_TIME) {
2406         GstClockTime duration;
2407         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2408 
2409         ret = TRUE;
2410         if (can_seek) {
2411           if (gst_adaptive_demux_is_live (demux)) {
2412             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2413             if (!ret) {
2414               GST_MANIFEST_UNLOCK (demux);
2415               GST_INFO_OBJECT (demux, "can't answer seeking query");
2416               return FALSE;
2417             }
2418           } else {
2419             duration = demux_class->get_duration (demux);
2420             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2421               stop = duration;
2422           }
2423         }
2424         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2425         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2426             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2427             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2428       }
2429       GST_MANIFEST_UNLOCK (demux);
2430       break;
2431     }
2432     case GST_QUERY_URI:
2433 
2434       GST_MANIFEST_LOCK (demux);
2435 
2436       /* TODO HLS can answer this differently it seems */
2437       if (demux->manifest_uri) {
2438         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2439          * playlist or the the uri of the last downlowaded fragment? */
2440         gst_query_set_uri (query, demux->manifest_uri);
2441         ret = TRUE;
2442       }
2443 
2444       GST_MANIFEST_UNLOCK (demux);
2445       break;
2446     default:
2447       /* Don't forward queries upstream because of the special nature of this
2448        *  "demuxer", which relies on the upstream element only to be fed
2449        *  the Manifest
2450        */
2451       break;
2452   }
2453 
2454   return ret;
2455 }
2456 
2457 /* must be called with manifest_lock taken */
2458 static void
gst_adaptive_demux_start_tasks(GstAdaptiveDemux * demux,gboolean start_preroll_streams)2459 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2460     gboolean start_preroll_streams)
2461 {
2462   GList *iter;
2463 
2464   if (!gst_adaptive_demux_is_running (demux)) {
2465     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2466     return;
2467   }
2468 
2469   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2470 
2471   iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2472 
2473   for (; iter; iter = g_list_next (iter)) {
2474     GstAdaptiveDemuxStream *stream = iter->data;
2475 #ifdef OHOS_EXT_FUNC
2476     // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
2477     GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2478     if (klass->stream_select_bitrate)
2479       klass->stream_select_bitrate (stream, demux->connection_speed);
2480 #endif
2481 
2482     if (!start_preroll_streams) {
2483       g_mutex_lock (&stream->fragment_download_lock);
2484       stream->cancelled = FALSE;
2485       stream->replaced = FALSE;
2486       g_mutex_unlock (&stream->fragment_download_lock);
2487     }
2488 
2489     stream->last_ret = GST_FLOW_OK;
2490     gst_task_start (stream->download_task);
2491   }
2492 }
2493 
2494 /* must be called with manifest_lock taken */
2495 static void
gst_adaptive_demux_stop_manifest_update_task(GstAdaptiveDemux * demux)2496 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2497 {
2498   gst_uri_downloader_cancel (demux->downloader);
2499 
2500   gst_task_stop (demux->priv->updates_task);
2501 
2502   g_mutex_lock (&demux->priv->updates_timed_lock);
2503   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2504   demux->priv->stop_updates_task = TRUE;
2505   g_cond_signal (&demux->priv->updates_timed_cond);
2506   g_mutex_unlock (&demux->priv->updates_timed_lock);
2507 }
2508 
2509 /* must be called with manifest_lock taken */
2510 static void
gst_adaptive_demux_start_manifest_update_task(GstAdaptiveDemux * demux)2511 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2512 {
2513   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2514 
2515   if (gst_adaptive_demux_is_live (demux)) {
2516 #ifdef OHOS_EXT_FUNC
2517 // ohos.ext.func.0034
2518     g_signal_emit (GST_ELEMENT(demux), g_gst_adaptive_demux_signals[SIGNAL_IS_LIVE_SCENE], 0, TRUE);
2519 // ohos.ext.func.0036
2520     if (demux->priv->set_auto_bitrate_first == FALSE) {
2521       demux->connection_speed = 0;
2522       demux->priv->set_auto_bitrate_first = TRUE;
2523     }
2524 
2525 #endif
2526     gst_uri_downloader_reset (demux->downloader);
2527     g_mutex_lock (&demux->priv->updates_timed_lock);
2528     demux->priv->stop_updates_task = FALSE;
2529     g_mutex_unlock (&demux->priv->updates_timed_lock);
2530     /* Task to periodically update the manifest */
2531     if (demux_class->requires_periodical_playlist_update (demux)) {
2532       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2533       gst_task_start (demux->priv->updates_task);
2534     }
2535   }
2536 }
2537 
2538 /* must be called with manifest_lock taken
2539  * This function will temporarily release manifest_lock in order to join the
2540  * download threads.
2541  * The api_lock will still protect it against other threads trying to modify
2542  * the demux element.
2543  */
2544 static void
gst_adaptive_demux_stop_tasks(GstAdaptiveDemux * demux,gboolean stop_updates)2545 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2546 {
2547   int i;
2548   GList *iter;
2549   GList *list_to_process;
2550 
2551   GST_LOG_OBJECT (demux, "Stopping tasks");
2552 
2553   if (stop_updates)
2554     gst_adaptive_demux_stop_manifest_update_task (demux);
2555 
2556   list_to_process = demux->streams;
2557   for (i = 0; i < 2; ++i) {
2558     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2559       GstAdaptiveDemuxStream *stream = iter->data;
2560 
2561       g_mutex_lock (&stream->fragment_download_lock);
2562       stream->cancelled = TRUE;
2563       gst_task_stop (stream->download_task);
2564       g_cond_signal (&stream->fragment_download_cond);
2565       g_mutex_unlock (&stream->fragment_download_lock);
2566     }
2567     list_to_process = demux->prepared_streams;
2568   }
2569 
2570   GST_MANIFEST_UNLOCK (demux);
2571   g_mutex_lock (&demux->priv->preroll_lock);
2572   g_cond_broadcast (&demux->priv->preroll_cond);
2573   g_mutex_unlock (&demux->priv->preroll_lock);
2574   GST_MANIFEST_LOCK (demux);
2575 
2576   g_mutex_lock (&demux->priv->manifest_update_lock);
2577   g_cond_broadcast (&demux->priv->manifest_cond);
2578   g_mutex_unlock (&demux->priv->manifest_update_lock);
2579 
2580   /* need to release manifest_lock before stopping the src element.
2581    * The streams were asked to cancel, so they will not make any writes to demux
2582    * object. Even if we temporarily release manifest_lock, the demux->streams
2583    * cannot change and iter cannot be invalidated.
2584    */
2585   list_to_process = demux->streams;
2586   for (i = 0; i < 2; ++i) {
2587     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2588       GstAdaptiveDemuxStream *stream = iter->data;
2589       GstElement *src = stream->src;
2590 
2591       GST_MANIFEST_UNLOCK (demux);
2592 
2593       if (src) {
2594         gst_element_set_locked_state (src, TRUE);
2595         gst_element_set_state (src, GST_STATE_READY);
2596       }
2597 
2598       /* stream->download_task value never changes, so it is safe to read it
2599        * outside critical section
2600        */
2601       gst_task_join (stream->download_task);
2602 
2603       GST_MANIFEST_LOCK (demux);
2604     }
2605     list_to_process = demux->prepared_streams;
2606   }
2607 
2608   GST_MANIFEST_UNLOCK (demux);
2609   if (stop_updates)
2610     gst_task_join (demux->priv->updates_task);
2611 
2612   GST_MANIFEST_LOCK (demux);
2613 
2614   list_to_process = demux->streams;
2615   for (i = 0; i < 2; ++i) {
2616     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2617       GstAdaptiveDemuxStream *stream = iter->data;
2618 
2619       stream->download_error_count = 0;
2620       stream->need_header = TRUE;
2621     }
2622     list_to_process = demux->prepared_streams;
2623   }
2624   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2625 }
2626 
2627 /* must be called with manifest_lock taken */
2628 static gboolean
gst_adaptive_demux_push_src_event(GstAdaptiveDemux * demux,GstEvent * event)2629 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2630 {
2631   GList *iter;
2632   gboolean ret = TRUE;
2633 
2634   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2635     GstAdaptiveDemuxStream *stream = iter->data;
2636     gst_event_ref (event);
2637     ret = ret & gst_pad_push_event (stream->pad, event);
2638   }
2639   gst_event_unref (event);
2640   return ret;
2641 }
2642 
2643 /* must be called with manifest_lock taken */
2644 void
gst_adaptive_demux_stream_set_caps(GstAdaptiveDemuxStream * stream,GstCaps * caps)2645 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2646     GstCaps * caps)
2647 {
2648   GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2649       caps);
2650   gst_caps_replace (&stream->pending_caps, caps);
2651   gst_caps_unref (caps);
2652 }
2653 
2654 /* must be called with manifest_lock taken */
2655 void
gst_adaptive_demux_stream_set_tags(GstAdaptiveDemuxStream * stream,GstTagList * tags)2656 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2657     GstTagList * tags)
2658 {
2659   GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2660       tags);
2661   if (stream->pending_tags) {
2662     gst_tag_list_unref (stream->pending_tags);
2663   }
2664   stream->pending_tags = tags;
2665 }
2666 
2667 /* must be called with manifest_lock taken */
2668 void
gst_adaptive_demux_stream_queue_event(GstAdaptiveDemuxStream * stream,GstEvent * event)2669 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2670     GstEvent * event)
2671 {
2672   stream->pending_events = g_list_append (stream->pending_events, event);
2673 }
2674 
2675 /* must be called with manifest_lock taken */
2676 static guint64
_update_average_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 new_bitrate)2677 _update_average_bitrate (GstAdaptiveDemux * demux,
2678     GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2679 {
2680   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2681 
2682   stream->moving_bitrate -= stream->fragment_bitrates[index];
2683   stream->fragment_bitrates[index] = new_bitrate;
2684   stream->moving_bitrate += new_bitrate;
2685 
2686   stream->moving_index += 1;
2687 
2688   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2689     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2690   return stream->moving_bitrate / stream->moving_index;
2691 }
2692 
2693 /* must be called with manifest_lock taken */
2694 static guint64
gst_adaptive_demux_stream_update_current_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2695 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2696     GstAdaptiveDemuxStream * stream)
2697 {
2698   guint64 average_bitrate;
2699   guint64 fragment_bitrate;
2700 
2701   if (demux->connection_speed) {
2702     GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2703         demux->connection_speed / 1000);
2704     stream->current_download_rate = demux->connection_speed;
2705     return demux->connection_speed;
2706   }
2707 
2708   fragment_bitrate = stream->last_bitrate;
2709   GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2710       fragment_bitrate);
2711 
2712   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2713 
2714   GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2715       "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
2716   GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2717       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2718       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2719 
2720   /* Conservative approach, make sure we don't upgrade too fast */
2721   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2722 
2723   stream->current_download_rate *= demux->bitrate_limit;
2724   GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2725       G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2726 
2727 #if 0
2728   /* Debugging code, modulate the bitrate every few fragments */
2729   {
2730     static guint ctr = 0;
2731     if (ctr % 3 == 0) {
2732       GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2733       stream->current_download_rate /= 2;
2734     }
2735     ctr++;
2736   }
2737 #endif
2738 
2739   return stream->current_download_rate;
2740 }
2741 
2742 /* must be called with manifest_lock taken */
2743 static GstFlowReturn
gst_adaptive_demux_combine_flows(GstAdaptiveDemux * demux)2744 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2745 {
2746   gboolean all_notlinked = TRUE;
2747   gboolean all_eos = TRUE;
2748   GList *iter;
2749 
2750   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2751     GstAdaptiveDemuxStream *stream = iter->data;
2752 
2753     if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2754       all_notlinked = FALSE;
2755       if (stream->last_ret != GST_FLOW_EOS)
2756         all_eos = FALSE;
2757     }
2758 
2759     if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2760         || stream->last_ret == GST_FLOW_FLUSHING) {
2761       return stream->last_ret;
2762     }
2763   }
2764   if (all_notlinked)
2765     return GST_FLOW_NOT_LINKED;
2766   else if (all_eos)
2767     return GST_FLOW_EOS;
2768   return GST_FLOW_OK;
2769 }
2770 
2771 /* Called with preroll_lock */
2772 static void
gst_adaptive_demux_handle_preroll(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)2773 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2774     GstAdaptiveDemuxStream * stream)
2775 {
2776   demux->priv->preroll_pending--;
2777   if (demux->priv->preroll_pending == 0) {
2778     /* That was the last one, time to release all streams
2779      * and expose them */
2780     GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2781     gst_adaptive_demux_expose_streams (demux);
2782     g_cond_broadcast (&demux->priv->preroll_cond);
2783   }
2784 }
2785 
2786 /* must be called with manifest_lock taken.
2787  * Temporarily releases manifest_lock
2788  */
2789 GstFlowReturn
gst_adaptive_demux_stream_push_buffer(GstAdaptiveDemuxStream * stream,GstBuffer * buffer)2790 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2791     GstBuffer * buffer)
2792 {
2793   GstAdaptiveDemux *demux = stream->demux;
2794   GstFlowReturn ret = GST_FLOW_OK;
2795   gboolean discont = FALSE;
2796   /* Pending events */
2797   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2798   GList *pending_events = NULL;
2799 
2800   /* FIXME :
2801    * This is duplicating *exactly* the same thing as what is done at the beginning
2802    * of _src_chain if starting_fragment is TRUE */
2803   if (stream->first_fragment_buffer) {
2804     GstClockTime offset =
2805         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2806     GstClockTime period_start =
2807         gst_adaptive_demux_get_period_start_time (demux);
2808 
2809     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2810     if (demux->segment.rate < 0)
2811       /* Set DISCONT flag for every first buffer in reverse playback mode
2812        * as each fragment for its own has to be reversed */
2813       discont = TRUE;
2814 
2815     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2816     if (GST_BUFFER_PTS_IS_VALID (buffer))
2817       GST_BUFFER_PTS (buffer) += offset;
2818 
2819     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2820       stream->segment.position = GST_BUFFER_PTS (buffer);
2821 
2822       /* Convert from position inside the stream's segment to the demuxer's
2823        * segment, they are not necessarily the same */
2824       if (stream->segment.position - offset + period_start >
2825           demux->segment.position)
2826         demux->segment.position =
2827             stream->segment.position - offset + period_start;
2828     }
2829     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2830 
2831     GST_LOG_OBJECT (stream->pad,
2832         "Going to push buffer with PTS %" GST_TIME_FORMAT,
2833         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2834   } else {
2835     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2836   }
2837 
2838   if (stream->discont) {
2839     discont = TRUE;
2840     stream->discont = FALSE;
2841   }
2842 
2843   if (discont) {
2844     GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2845     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2846   } else {
2847     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2848   }
2849 
2850   stream->first_fragment_buffer = FALSE;
2851 
2852   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2853   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2854   if (G_UNLIKELY (stream->pending_caps)) {
2855     pending_caps = gst_event_new_caps (stream->pending_caps);
2856     gst_caps_unref (stream->pending_caps);
2857     stream->pending_caps = NULL;
2858   }
2859 
2860   if (stream->do_block) {
2861 
2862     g_mutex_lock (&demux->priv->preroll_lock);
2863 
2864     /* If we are preroll state, set caps in here */
2865     if (pending_caps) {
2866       gst_pad_push_event (stream->pad, pending_caps);
2867       pending_caps = NULL;
2868     }
2869 
2870     gst_adaptive_demux_handle_preroll (demux, stream);
2871     GST_MANIFEST_UNLOCK (demux);
2872 
2873     while (stream->do_block && !stream->cancelled) {
2874       GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2875       g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2876     }
2877     if (stream->cancelled) {
2878       GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2879       gst_buffer_unref (buffer);
2880       g_mutex_unlock (&demux->priv->preroll_lock);
2881       return GST_FLOW_FLUSHING;
2882     }
2883 
2884     g_mutex_unlock (&demux->priv->preroll_lock);
2885     GST_MANIFEST_LOCK (demux);
2886   }
2887 
2888   if (G_UNLIKELY (stream->pending_segment)) {
2889     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2890     pending_segment = stream->pending_segment;
2891     stream->pending_segment = NULL;
2892     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2893   }
2894   if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2895     GstTagList *tags = stream->pending_tags;
2896 
2897     stream->pending_tags = NULL;
2898     stream->bitrate_changed = 0;
2899 
2900     if (stream->fragment.bitrate != 0) {
2901       if (tags)
2902         tags = gst_tag_list_make_writable (tags);
2903       else
2904         tags = gst_tag_list_new_empty ();
2905 
2906 #ifdef OHOS_EXT_FUNC
2907       // ohos.ext.func.0042 when change bitrate, ongly change playlist, don't add new src and new pipeline
2908       GST_DEBUG_OBJECT (demux, "set bandwidth");
2909       gint bandwidth = 0;
2910       GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2911       if (klass->get_current_bandwidth) {
2912         bandwidth = klass->get_current_bandwidth(stream);
2913       }
2914       guint64 position = 0;
2915       if (klass->get_current_position) {
2916         position = klass->get_current_position(stream);
2917       }
2918       gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2919           GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, GST_TAG_BANDWIDTH, bandwidth, GST_TAG_SLICE_POSITION, position, NULL);
2920 #else
2921       gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2922           GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2923 #endif
2924     }
2925     if (tags)
2926       pending_tags = gst_event_new_tag (tags);
2927   }
2928   if (G_UNLIKELY (stream->pending_events)) {
2929     pending_events = stream->pending_events;
2930     stream->pending_events = NULL;
2931   }
2932 
2933   GST_MANIFEST_UNLOCK (demux);
2934 
2935   /* Do not push events or buffers holding the manifest lock */
2936   if (G_UNLIKELY (pending_caps)) {
2937     GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2938         pending_caps);
2939     gst_pad_push_event (stream->pad, pending_caps);
2940   }
2941 #ifdef OHOS_EXT_FUNC
2942   // ohos.ext.func.0043 Clear data in the multiqueue to speed up switching bitrate
2943   if (G_UNLIKELY (pending_tags)) {
2944     GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2945         pending_tags);
2946     gst_pad_push_event (stream->pad, pending_tags);
2947   }
2948   if (G_UNLIKELY (pending_segment)) {
2949     GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2950         pending_segment);
2951     gst_pad_push_event (stream->pad, pending_segment);
2952   }
2953 #else
2954   if (G_UNLIKELY (pending_segment)) {
2955     GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2956         pending_segment);
2957     gst_pad_push_event (stream->pad, pending_segment);
2958   }
2959   if (G_UNLIKELY (pending_tags)) {
2960     GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2961         pending_tags);
2962     gst_pad_push_event (stream->pad, pending_tags);
2963   }
2964 #endif
2965   while (pending_events != NULL) {
2966     GstEvent *event = pending_events->data;
2967 
2968     if (!gst_pad_push_event (stream->pad, event))
2969       GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2970 
2971     pending_events = g_list_delete_link (pending_events, pending_events);
2972   }
2973 
2974   /* Wait for preroll if blocking */
2975   GST_DEBUG_OBJECT (stream->pad,
2976       "About to push buffer of size %" G_GSIZE_FORMAT,
2977       gst_buffer_get_size (buffer));
2978 
2979   ret = gst_pad_push (stream->pad, buffer);
2980 
2981   GST_MANIFEST_LOCK (demux);
2982 
2983   g_mutex_lock (&stream->fragment_download_lock);
2984   if (G_UNLIKELY (stream->cancelled)) {
2985     GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2986         "Stream was cancelled");
2987     ret = stream->last_ret = GST_FLOW_FLUSHING;
2988     g_mutex_unlock (&stream->fragment_download_lock);
2989     return ret;
2990   }
2991   g_mutex_unlock (&stream->fragment_download_lock);
2992 
2993   GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2994       gst_flow_get_name (ret));
2995 
2996   return ret;
2997 }
2998 
2999 /* must be called with manifest_lock taken */
3000 static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)3001 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
3002     GstAdaptiveDemuxStream * stream)
3003 {
3004   /* No need to advance, this isn't a real fragment */
3005   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
3006     return GST_FLOW_OK;
3007 
3008   return gst_adaptive_demux_stream_advance_fragment (demux, stream,
3009       stream->fragment.duration);
3010 }
3011 
3012 /* must be called with manifest_lock taken.
3013  * Can temporarily release manifest_lock
3014  */
3015 static GstFlowReturn
gst_adaptive_demux_stream_data_received_default(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstBuffer * buffer)3016 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
3017     GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
3018 {
3019   return gst_adaptive_demux_stream_push_buffer (stream, buffer);
3020 }
3021 
3022 static gboolean
gst_adaptive_demux_requires_periodical_playlist_update_default(GstAdaptiveDemux * demux)3023 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
3024     * demux)
3025 {
3026   return TRUE;
3027 }
3028 
3029 static GstFlowReturn
_src_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)3030 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
3031 {
3032   GstAdaptiveDemuxStream *stream;
3033   GstAdaptiveDemux *demux;
3034   GstAdaptiveDemuxClass *klass;
3035   GstFlowReturn ret = GST_FLOW_OK;
3036 
3037   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
3038   stream = gst_pad_get_element_private (pad);
3039   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3040 
3041   GST_MANIFEST_LOCK (demux);
3042 
3043   /* do not make any changes if the stream is cancelled */
3044   g_mutex_lock (&stream->fragment_download_lock);
3045   if (G_UNLIKELY (stream->cancelled)) {
3046     g_mutex_unlock (&stream->fragment_download_lock);
3047     gst_buffer_unref (buffer);
3048     ret = stream->last_ret = GST_FLOW_FLUSHING;
3049     GST_MANIFEST_UNLOCK (demux);
3050     return ret;
3051   }
3052   g_mutex_unlock (&stream->fragment_download_lock);
3053 
3054   /* starting_fragment is set to TRUE at the beginning of
3055    * _stream_download_fragment()
3056    * /!\ If there is a header/index being downloaded, then this will
3057    * be TRUE for the first one ... but FALSE for the remaining ones,
3058    * including the *actual* fragment ! */
3059   if (stream->starting_fragment) {
3060     GstClockTime offset =
3061         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3062     GstClockTime period_start =
3063         gst_adaptive_demux_get_period_start_time (demux);
3064 
3065     stream->starting_fragment = FALSE;
3066     if (klass->start_fragment) {
3067       if (!klass->start_fragment (demux, stream)) {
3068         ret = GST_FLOW_ERROR;
3069         goto error;
3070       }
3071     }
3072 
3073     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
3074     if (GST_BUFFER_PTS_IS_VALID (buffer))
3075       GST_BUFFER_PTS (buffer) += offset;
3076 
3077     GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
3078         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
3079 
3080     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
3081       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3082       stream->segment.position = GST_BUFFER_PTS (buffer);
3083 
3084       /* Convert from position inside the stream's segment to the demuxer's
3085        * segment, they are not necessarily the same */
3086       if (stream->segment.position - offset + period_start >
3087           demux->segment.position)
3088         demux->segment.position =
3089             stream->segment.position - offset + period_start;
3090       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3091     }
3092 
3093   } else {
3094     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
3095   }
3096 
3097   /* downloading_first_buffer is set to TRUE in download_uri() just before
3098    * activating the source (i.e. requesting a given URI)
3099    *
3100    * The difference with starting_fragment is that this will be called
3101    * for *all* first buffers (of index, and header, and fragment)
3102    *
3103    * ... to then only do something useful (in this block) for actual
3104    * fragments... */
3105   if (stream->downloading_first_buffer) {
3106     gint64 chunk_size = 0;
3107 
3108     stream->downloading_first_buffer = FALSE;
3109 
3110     if (!stream->downloading_header && !stream->downloading_index) {
3111       /* If this is the first buffer of a fragment (not the headers or index)
3112        * and we don't have a birate from the sub-class, then see if we
3113        * can work it out from the fragment size and duration */
3114       if (stream->fragment.bitrate == 0 &&
3115           stream->fragment.duration != 0 &&
3116           gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
3117               &chunk_size) && chunk_size != -1) {
3118         guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
3119                 8 * GST_SECOND, stream->fragment.duration));
3120         GST_LOG_OBJECT (demux,
3121             "Fragment has size %" G_GINT64_FORMAT " duration %" GST_TIME_FORMAT
3122             " = bitrate %u", chunk_size,
3123             GST_TIME_ARGS (stream->fragment.duration), bitrate);
3124         stream->fragment.bitrate = bitrate;
3125       }
3126       if (stream->fragment.bitrate) {
3127         stream->bitrate_changed = TRUE;
3128       } else {
3129         GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
3130       }
3131     }
3132   }
3133 
3134   stream->download_total_bytes += gst_buffer_get_size (buffer);
3135 
3136   GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
3137       gst_buffer_get_size (buffer));
3138 
3139   ret = klass->data_received (demux, stream, buffer);
3140 
3141   if (ret == GST_FLOW_FLUSHING) {
3142     /* do not make any changes if the stream is cancelled */
3143     g_mutex_lock (&stream->fragment_download_lock);
3144     if (G_UNLIKELY (stream->cancelled)) {
3145       g_mutex_unlock (&stream->fragment_download_lock);
3146       GST_MANIFEST_UNLOCK (demux);
3147       return ret;
3148     }
3149     g_mutex_unlock (&stream->fragment_download_lock);
3150   }
3151 
3152   if (ret != GST_FLOW_OK) {
3153     gboolean finished = FALSE;
3154 
3155     if (ret < GST_FLOW_EOS) {
3156       GST_ELEMENT_FLOW_ERROR (demux, ret);
3157 
3158       /* TODO push this on all pads */
3159       gst_pad_push_event (stream->pad, gst_event_new_eos ());
3160     } else {
3161       GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
3162           gst_flow_get_name (ret));
3163     }
3164 
3165     if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
3166       ret = GST_FLOW_EOS;       /* return EOS to make the source stop */
3167     } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
3168       /* Behaves like an EOS event from upstream */
3169       stream->fragment.finished = TRUE;
3170       ret = klass->finish_fragment (demux, stream);
3171       if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
3172         ret = GST_FLOW_EOS;     /* return EOS to make the source stop */
3173       } else if (ret != GST_FLOW_OK) {
3174         goto error;
3175       }
3176       finished = TRUE;
3177     }
3178 
3179     gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
3180     if (finished)
3181       ret = GST_FLOW_EOS;
3182   }
3183 
3184 error:
3185 
3186   GST_MANIFEST_UNLOCK (demux);
3187 
3188   return ret;
3189 }
3190 
3191 /* must be called with manifest_lock taken */
3192 static void
gst_adaptive_demux_stream_fragment_download_finish(GstAdaptiveDemuxStream * stream,GstFlowReturn ret,GError * err)3193 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
3194     stream, GstFlowReturn ret, GError * err)
3195 {
3196   GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
3197       gst_flow_get_name (ret), err);
3198 
3199   /* if we have an error, only replace last_ret if it was OK before to avoid
3200    * overwriting the first error we got */
3201   if (stream->last_ret == GST_FLOW_OK) {
3202     stream->last_ret = ret;
3203     if (err) {
3204       g_clear_error (&stream->last_error);
3205       stream->last_error = g_error_copy (err);
3206     }
3207   }
3208   g_mutex_lock (&stream->fragment_download_lock);
3209   stream->download_finished = TRUE;
3210   g_cond_signal (&stream->fragment_download_cond);
3211   g_mutex_unlock (&stream->fragment_download_lock);
3212 }
3213 
3214 static GstFlowReturn
gst_adaptive_demux_eos_handling(GstAdaptiveDemuxStream * stream)3215 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
3216 {
3217   GstFlowReturn ret = GST_FLOW_OK;
3218   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
3219 
3220   if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
3221       || !klass->need_another_chunk (stream)
3222       || stream->fragment.chunk_size == 0) {
3223     stream->fragment.finished = TRUE;
3224 
3225     /* Last chance to figure out a fallback nominal bitrate if neither baseclass
3226        nor the HTTP Content-Length implementation worked. */
3227     if (stream->fragment.bitrate == 0 && stream->fragment.duration != 0 &&
3228         stream->fragment_bytes_downloaded != 0 && !stream->downloading_index &&
3229         !stream->downloading_header) {
3230       guint bitrate = MIN (G_MAXUINT,
3231           gst_util_uint64_scale (stream->fragment_bytes_downloaded,
3232               8 * GST_SECOND, stream->fragment.duration));
3233       GST_LOG_OBJECT (stream->pad,
3234           "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
3235           " = bitrate %u", stream->fragment_bytes_downloaded,
3236           GST_TIME_ARGS (stream->fragment.duration), bitrate);
3237       stream->fragment.bitrate = bitrate;
3238       stream->bitrate_changed = TRUE;
3239     }
3240     ret = klass->finish_fragment (stream->demux, stream);
3241   }
3242   gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
3243 
3244   return ret;
3245 }
3246 
3247 static gboolean
_src_event(GstPad * pad,GstObject * parent,GstEvent * event)3248 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3249 {
3250   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
3251   GstAdaptiveDemux *demux = stream->demux;
3252 
3253   switch (GST_EVENT_TYPE (event)) {
3254     case GST_EVENT_EOS:{
3255       GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
3256       GST_MANIFEST_LOCK (demux);
3257 
3258       gst_adaptive_demux_eos_handling (stream);
3259 
3260       /* FIXME ?
3261        * _eos_handling() calls  fragment_download_finish() which does the
3262        * same thing as below.
3263        * Could this cause races ? */
3264       g_mutex_lock (&stream->fragment_download_lock);
3265       stream->download_finished = TRUE;
3266       g_cond_signal (&stream->fragment_download_cond);
3267       g_mutex_unlock (&stream->fragment_download_lock);
3268 
3269       GST_MANIFEST_UNLOCK (demux);
3270       break;
3271     }
3272     default:
3273       break;
3274   }
3275 
3276   gst_event_unref (event);
3277 
3278   return TRUE;
3279 }
3280 
3281 static gboolean
_src_query(GstPad * pad,GstObject * parent,GstQuery * query)3282 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3283 {
3284   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
3285 
3286   switch (GST_QUERY_TYPE (query)) {
3287     case GST_QUERY_ALLOCATION:
3288       return FALSE;
3289       break;
3290     default:
3291       break;
3292   }
3293 
3294   return gst_pad_peer_query (stream->pad, query);
3295 }
3296 
3297 static GstPadProbeReturn
_uri_handler_probe(GstPad * pad,GstPadProbeInfo * info,GstAdaptiveDemuxStream * stream)3298 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
3299     GstAdaptiveDemuxStream * stream)
3300 {
3301   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
3302 
3303   if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
3304     GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
3305     if (stream->fragment_bytes_downloaded == 0) {
3306       stream->last_latency =
3307           gst_adaptive_demux_get_monotonic_time (stream->demux) -
3308           (stream->download_start_time * GST_USECOND);
3309       GST_DEBUG_OBJECT (pad,
3310           "FIRST BYTE since download_start %" GST_TIME_FORMAT,
3311           GST_TIME_ARGS (stream->last_latency));
3312     }
3313     stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
3314     GST_LOG_OBJECT (pad,
3315         "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
3316         gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
3317   } else if (GST_PAD_PROBE_INFO_TYPE (info) &
3318       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
3319     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
3320     GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
3321         GST_EVENT_TYPE_NAME (ev), ev);
3322     switch (GST_EVENT_TYPE (ev)) {
3323       case GST_EVENT_SEGMENT:
3324         stream->fragment_bytes_downloaded = 0;
3325         break;
3326       case GST_EVENT_EOS:
3327       {
3328         stream->last_download_time =
3329             gst_adaptive_demux_get_monotonic_time (stream->demux) -
3330             (stream->download_start_time * GST_USECOND);
3331         stream->last_bitrate =
3332             gst_util_uint64_scale (stream->fragment_bytes_downloaded,
3333             8 * GST_SECOND, stream->last_download_time);
3334         GST_DEBUG_OBJECT (pad,
3335             "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
3336             G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
3337             stream->last_bitrate);
3338         /* Calculate bitrate since URI request */
3339       }
3340         break;
3341       default:
3342         break;
3343     }
3344   }
3345 
3346   return ret;
3347 }
3348 
3349 /* must be called with manifest_lock taken.
3350  * Can temporarily release manifest_lock
3351  */
3352 static gboolean
gst_adaptive_demux_stream_wait_manifest_update(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)3353 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
3354     GstAdaptiveDemuxStream * stream)
3355 {
3356   gboolean ret = TRUE;
3357 
3358   /* Wait until we're cancelled or there's something for
3359    * us to download in the playlist or the playlist
3360    * became non-live */
3361   while (TRUE) {
3362     GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
3363 
3364     /* get the manifest_update_lock while still holding the manifest_lock.
3365      * This will prevent other threads to signal the condition (they will need
3366      * both manifest_lock and manifest_update_lock in order to signal).
3367      * It cannot deadlock because all threads always get the manifest_lock first
3368      * and manifest_update_lock second.
3369      */
3370     g_mutex_lock (&demux->priv->manifest_update_lock);
3371 
3372     GST_MANIFEST_UNLOCK (demux);
3373 
3374     g_cond_wait (&demux->priv->manifest_cond,
3375         &demux->priv->manifest_update_lock);
3376     g_mutex_unlock (&demux->priv->manifest_update_lock);
3377 
3378     GST_MANIFEST_LOCK (demux);
3379 
3380     /* check for cancelled every time we get the manifest_lock */
3381     g_mutex_lock (&stream->fragment_download_lock);
3382     if (G_UNLIKELY (stream->cancelled)) {
3383       ret = FALSE;
3384       stream->last_ret = GST_FLOW_FLUSHING;
3385       g_mutex_unlock (&stream->fragment_download_lock);
3386       break;
3387     }
3388     g_mutex_unlock (&stream->fragment_download_lock);
3389 
3390     /* Got a new fragment or not live anymore? */
3391     if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
3392         GST_FLOW_OK) {
3393       GST_DEBUG_OBJECT (demux, "new fragment available, "
3394           "not waiting for manifest update");
3395       ret = TRUE;
3396       break;
3397     }
3398 
3399     if (!gst_adaptive_demux_is_live (demux)) {
3400       GST_DEBUG_OBJECT (demux, "Not live anymore, "
3401           "not waiting for manifest update");
3402       ret = FALSE;
3403       break;
3404     }
3405   }
3406   GST_DEBUG_OBJECT (demux, "Retrying now");
3407   return ret;
3408 }
3409 
3410 /* must be called with manifest_lock taken */
3411 static gboolean
gst_adaptive_demux_stream_update_source(GstAdaptiveDemuxStream * stream,const gchar * uri,const gchar * referer,gboolean refresh,gboolean allow_cache)3412 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
3413     const gchar * uri, const gchar * referer, gboolean refresh,
3414     gboolean allow_cache)
3415 {
3416   GstAdaptiveDemux *demux = stream->demux;
3417 
3418   if (!gst_uri_is_valid (uri)) {
3419     GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
3420     return FALSE;
3421   }
3422 
3423   /* Try to re-use existing source element */
3424   if (stream->src != NULL) {
3425     gchar *old_protocol, *new_protocol;
3426     gchar *old_uri;
3427 
3428     old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
3429     old_protocol = gst_uri_get_protocol (old_uri);
3430     new_protocol = gst_uri_get_protocol (uri);
3431 
3432     if (!g_str_equal (old_protocol, new_protocol)) {
3433       GstElement *src = stream->src;
3434 
3435       stream->src = NULL;
3436       gst_object_unref (stream->src_srcpad);
3437       stream->src_srcpad = NULL;
3438       GST_MANIFEST_UNLOCK (demux);
3439       gst_element_set_locked_state (src, TRUE);
3440       gst_element_set_state (src, GST_STATE_NULL);
3441       gst_bin_remove (GST_BIN_CAST (demux), src);
3442       GST_MANIFEST_LOCK (demux);
3443       GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
3444     } else {
3445       GError *err = NULL;
3446 
3447       GST_DEBUG_OBJECT (demux, "Re-using old source element");
3448       if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
3449               &err)) {
3450         GstElement *src = stream->src;
3451 
3452         stream->src = NULL;
3453         GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
3454             err ? err->message : "Unknown error");
3455         g_clear_error (&err);
3456         gst_object_unref (stream->src_srcpad);
3457         stream->src_srcpad = NULL;
3458         GST_MANIFEST_UNLOCK (demux);
3459         gst_element_set_locked_state (src, TRUE);
3460         gst_element_set_state (src, GST_STATE_NULL);
3461         gst_bin_remove (GST_BIN_CAST (demux), src);
3462         GST_MANIFEST_LOCK (demux);
3463       }
3464     }
3465     g_free (old_uri);
3466     g_free (old_protocol);
3467     g_free (new_protocol);
3468   }
3469 
3470   if (stream->src == NULL) {
3471     GstPad *uri_handler_src;
3472     GstPad *queue_sink;
3473     GstPad *queue_src;
3474     GstElement *uri_handler;
3475     GstElement *queue;
3476     GstPadLinkReturn pad_link_ret;
3477     GObjectClass *gobject_class;
3478     gchar *internal_name, *bin_name;
3479 
3480     /* Our src consists of a bin containing uri_handler -> queue . The
3481      * purpose of the queue is to allow the uri_handler to download an
3482      * entire fragment without blocking, so we can accurately measure the
3483      * download bitrate. */
3484 
3485     queue = gst_element_factory_make ("queue", NULL);
3486     if (queue == NULL)
3487       return FALSE;
3488 
3489     g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
3490     g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
3491     g_object_set (queue, "max-size-time", (guint64) 0, NULL);
3492 
3493     uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3494     if (uri_handler == NULL) {
3495       GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
3496           ("Missing plugin to handle URI: '%s'", uri), (NULL));
3497       gst_object_unref (queue);
3498       return FALSE;
3499     }
3500 
3501     gobject_class = G_OBJECT_GET_CLASS (uri_handler);
3502 
3503     if (g_object_class_find_property (gobject_class, "compress"))
3504       g_object_set (uri_handler, "compress", FALSE, NULL);
3505     if (g_object_class_find_property (gobject_class, "keep-alive"))
3506       g_object_set (uri_handler, "keep-alive", TRUE, NULL);
3507     if (g_object_class_find_property (gobject_class, "extra-headers")) {
3508       if (referer || refresh || !allow_cache) {
3509         GstStructure *extra_headers = gst_structure_new_empty ("headers");
3510 
3511         if (referer)
3512           gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3513               NULL);
3514 
3515         if (!allow_cache)
3516           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3517               "no-cache", NULL);
3518         else if (refresh)
3519           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3520               "max-age=0", NULL);
3521 
3522         g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3523 
3524         gst_structure_free (extra_headers);
3525       } else {
3526         g_object_set (uri_handler, "extra-headers", NULL, NULL);
3527       }
3528     }
3529 
3530     /* Source bin creation */
3531     bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3532     stream->src = gst_bin_new (bin_name);
3533     g_free (bin_name);
3534     if (stream->src == NULL) {
3535       gst_object_unref (queue);
3536       gst_object_unref (uri_handler);
3537       return FALSE;
3538     }
3539 
3540     gst_bin_add (GST_BIN_CAST (stream->src), queue);
3541     gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3542 
3543     uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3544     queue_sink = gst_element_get_static_pad (queue, "sink");
3545 
3546     pad_link_ret =
3547         gst_pad_link_full (uri_handler_src, queue_sink,
3548         GST_PAD_LINK_CHECK_NOTHING);
3549     if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3550       GST_WARNING_OBJECT (demux,
3551           "Could not link pads %s:%s to %s:%s for reason %d",
3552           GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3553           pad_link_ret);
3554       g_object_unref (queue_sink);
3555       g_object_unref (uri_handler_src);
3556       gst_object_unref (stream->src);
3557       stream->src = NULL;
3558       return FALSE;
3559     }
3560 
3561     /* Add a downstream event and data probe */
3562     gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3563         (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3564 
3565     g_object_unref (queue_sink);
3566     g_object_unref (uri_handler_src);
3567     queue_src = gst_element_get_static_pad (queue, "src");
3568     stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3569     g_object_unref (queue_src);
3570     gst_element_add_pad (stream->src, stream->src_srcpad);
3571 
3572     gst_element_set_locked_state (stream->src, TRUE);
3573     gst_bin_add (GST_BIN_CAST (demux), stream->src);
3574     stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3575 
3576     /* set up our internal floating pad to drop all events from
3577      * the http src we don't care about. On the chain function
3578      * we just push the buffer forward */
3579     internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3580     stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3581     g_free (internal_name);
3582     gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3583         GST_OBJECT_CAST (demux));
3584     GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3585     gst_pad_set_element_private (stream->internal_pad, stream);
3586     gst_pad_set_active (stream->internal_pad, TRUE);
3587     gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3588     gst_pad_set_event_function (stream->internal_pad, _src_event);
3589     gst_pad_set_query_function (stream->internal_pad, _src_query);
3590 
3591     if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3592             GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3593       GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3594       return FALSE;
3595     }
3596 
3597     stream->uri_handler = uri_handler;
3598     stream->queue = queue;
3599 
3600     stream->last_status_code = 200;     /* default to OK */
3601   }
3602   return TRUE;
3603 }
3604 
3605 static GstPadProbeReturn
gst_ad_stream_src_to_ready_cb(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)3606 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3607     gpointer user_data)
3608 {
3609   GstAdaptiveDemuxStream *stream = user_data;
3610 
3611   /* The source's src pad is IDLE so now set the state to READY */
3612   g_mutex_lock (&stream->fragment_download_lock);
3613   stream->src_at_ready = TRUE;
3614   g_cond_signal (&stream->fragment_download_cond);
3615   g_mutex_unlock (&stream->fragment_download_lock);
3616 
3617   return GST_PAD_PROBE_REMOVE;
3618 }
3619 
3620 #ifndef GST_DISABLE_GST_DEBUG
3621 static const char *
uritype(GstAdaptiveDemuxStream * s)3622 uritype (GstAdaptiveDemuxStream * s)
3623 {
3624   if (s->downloading_header)
3625     return "header";
3626   if (s->downloading_index)
3627     return "index";
3628   return "fragment";
3629 }
3630 #endif
3631 
3632 /* must be called with manifest_lock taken.
3633  * Can temporarily release manifest_lock
3634  *
3635  * Will return when URI is fully downloaded (or aborted/errored)
3636  */
3637 static GstFlowReturn
gst_adaptive_demux_stream_download_uri(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,const gchar * uri,gint64 start,gint64 end,guint * http_status)3638 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3639     GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3640     gint64 end, guint * http_status)
3641 {
3642   GstFlowReturn ret = GST_FLOW_OK;
3643   GST_DEBUG_OBJECT (stream->pad,
3644       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3645       uritype (stream), uri, start, end);
3646 
3647   if (http_status)
3648     *http_status = 200;         /* default to ok if no further information */
3649 
3650   if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3651     ret = stream->last_ret = GST_FLOW_ERROR;
3652     return ret;
3653   }
3654 
3655   gst_element_set_locked_state (stream->src, TRUE);
3656 
3657   GST_MANIFEST_UNLOCK (demux);
3658   if (gst_element_set_state (stream->src,
3659           GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3660     /* If ranges are specified, seek to it */
3661     if (start != 0 || end != -1) {
3662       /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3663        * stop position */
3664       if (end != -1)
3665         end += 1;
3666       /* Send the seek event to the uri_handler, as the other pipeline elements
3667        * can't handle it when READY. */
3668       if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3669                   GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3670                   GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3671 
3672         GST_MANIFEST_LOCK (demux);
3673         /* looks like the source can't handle seeks in READY */
3674         g_clear_error (&stream->last_error);
3675         stream->last_error = g_error_new (GST_CORE_ERROR,
3676             GST_CORE_ERROR_NOT_IMPLEMENTED,
3677             "Source element can't handle range requests");
3678         stream->last_ret = GST_FLOW_ERROR;
3679       } else {
3680         GST_MANIFEST_LOCK (demux);
3681       }
3682     } else {
3683       GST_MANIFEST_LOCK (demux);
3684     }
3685 
3686     if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3687       stream->download_start_time =
3688           GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3689 
3690       /* src element is in state READY. Before we start it, we reset
3691        * download_finished
3692        */
3693       g_mutex_lock (&stream->fragment_download_lock);
3694       stream->download_finished = FALSE;
3695       stream->downloading_first_buffer = TRUE;
3696       g_mutex_unlock (&stream->fragment_download_lock);
3697 
3698       GST_MANIFEST_UNLOCK (demux);
3699 
3700       if (!gst_element_sync_state_with_parent (stream->src)) {
3701         GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3702         GST_MANIFEST_LOCK (demux);
3703         ret = stream->last_ret = GST_FLOW_ERROR;
3704         return ret;
3705       }
3706 
3707       /* wait for the fragment to be completely downloaded */
3708       GST_DEBUG_OBJECT (stream->pad,
3709           "Waiting for %s download to finish: %s", uritype (stream), uri);
3710 
3711       g_mutex_lock (&stream->fragment_download_lock);
3712       stream->src_at_ready = FALSE;
3713       if (G_UNLIKELY (stream->cancelled)) {
3714         g_mutex_unlock (&stream->fragment_download_lock);
3715         GST_MANIFEST_LOCK (demux);
3716         ret = stream->last_ret = GST_FLOW_FLUSHING;
3717         return ret;
3718       }
3719       /* download_finished is only set:
3720        * * in ::fragment_download_finish()
3721        * * if EOS is received on the _src pad
3722        * */
3723       while (!stream->cancelled && !stream->download_finished) {
3724         g_cond_wait (&stream->fragment_download_cond,
3725             &stream->fragment_download_lock);
3726       }
3727       g_mutex_unlock (&stream->fragment_download_lock);
3728 
3729       GST_DEBUG_OBJECT (stream->pad,
3730           "Finished Waiting for %s download: %s", uritype (stream), uri);
3731 
3732       GST_MANIFEST_LOCK (demux);
3733       g_mutex_lock (&stream->fragment_download_lock);
3734       if (G_UNLIKELY (stream->cancelled)) {
3735         ret = stream->last_ret = GST_FLOW_FLUSHING;
3736         g_mutex_unlock (&stream->fragment_download_lock);
3737         return ret;
3738       }
3739       g_mutex_unlock (&stream->fragment_download_lock);
3740 
3741       ret = stream->last_ret;
3742 
3743       GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3744           uritype (stream), uri, stream->last_ret,
3745           gst_flow_get_name (stream->last_ret));
3746       if (stream->last_ret != GST_FLOW_OK && http_status) {
3747         *http_status = stream->last_status_code;
3748       }
3749     }
3750 
3751     /* changing src element state might try to join the streaming thread, so
3752      * we must not hold the manifest lock.
3753      */
3754     GST_MANIFEST_UNLOCK (demux);
3755   } else {
3756     GST_MANIFEST_UNLOCK (demux);
3757     if (stream->last_ret == GST_FLOW_OK)
3758       stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3759     ret = GST_FLOW_CUSTOM_ERROR;
3760   }
3761 
3762   stream->src_at_ready = FALSE;
3763 
3764   gst_element_set_locked_state (stream->src, TRUE);
3765   gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3766       gst_ad_stream_src_to_ready_cb, stream, NULL);
3767 
3768   g_mutex_lock (&stream->fragment_download_lock);
3769   while (!stream->src_at_ready) {
3770     g_cond_wait (&stream->fragment_download_cond,
3771         &stream->fragment_download_lock);
3772   }
3773   g_mutex_unlock (&stream->fragment_download_lock);
3774 
3775   gst_element_set_state (stream->src, GST_STATE_READY);
3776 
3777   /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3778   GST_MANIFEST_LOCK (demux);
3779   g_mutex_lock (&stream->fragment_download_lock);
3780   if (G_UNLIKELY (stream->cancelled)) {
3781     ret = stream->last_ret = GST_FLOW_FLUSHING;
3782     g_mutex_unlock (&stream->fragment_download_lock);
3783     return ret;
3784   }
3785   g_mutex_unlock (&stream->fragment_download_lock);
3786 
3787   /* deactivate and reactivate our ghostpad to make it fresh for a new
3788    * stream */
3789   gst_pad_set_active (stream->internal_pad, FALSE);
3790   gst_pad_set_active (stream->internal_pad, TRUE);
3791 
3792   return ret;
3793 }
3794 
3795 /* must be called with manifest_lock taken.
3796  * Can temporarily release manifest_lock
3797  */
3798 static GstFlowReturn
gst_adaptive_demux_stream_download_header_fragment(GstAdaptiveDemuxStream * stream)3799 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3800     stream)
3801 {
3802   GstAdaptiveDemux *demux = stream->demux;
3803   GstFlowReturn ret = GST_FLOW_OK;
3804 
3805   if (stream->fragment.header_uri != NULL) {
3806     GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3807         G_GINT64_FORMAT, stream->fragment.header_uri,
3808         stream->fragment.header_range_start, stream->fragment.header_range_end);
3809 
3810     stream->downloading_header = TRUE;
3811     ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3812         stream->fragment.header_uri, stream->fragment.header_range_start,
3813         stream->fragment.header_range_end, NULL);
3814     stream->downloading_header = FALSE;
3815   }
3816 
3817   /* check if we have an index */
3818   if (ret == GST_FLOW_OK) {     /* TODO check for other valid types */
3819 
3820     if (stream->fragment.index_uri != NULL) {
3821       GST_DEBUG_OBJECT (demux,
3822           "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3823           stream->fragment.index_uri,
3824           stream->fragment.index_range_start, stream->fragment.index_range_end);
3825       stream->downloading_index = TRUE;
3826       ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3827           stream->fragment.index_uri, stream->fragment.index_range_start,
3828           stream->fragment.index_range_end, NULL);
3829       stream->downloading_index = FALSE;
3830     }
3831   }
3832 
3833   return ret;
3834 }
3835 
3836 /* must be called with manifest_lock taken.
3837  * Can temporarily release manifest_lock
3838  */
3839 static GstFlowReturn
gst_adaptive_demux_stream_download_fragment(GstAdaptiveDemuxStream * stream)3840 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3841 {
3842   GstAdaptiveDemux *demux = stream->demux;
3843   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3844   gchar *url = NULL;
3845   GstFlowReturn ret;
3846   gboolean retried_once = FALSE, live;
3847   guint http_status;
3848   guint last_status_code;
3849 
3850   /* FIXME :  */
3851   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3852   stream->starting_fragment = TRUE;
3853   stream->last_ret = GST_FLOW_OK;
3854   stream->first_fragment_buffer = TRUE;
3855 
3856   GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3857       stream->fragment.uri ? "FRAGMENT " : "",
3858       stream->fragment.header_uri ? "HEADER " : "",
3859       stream->fragment.index_uri ? "INDEX" : "");
3860 
3861   if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3862       stream->fragment.index_uri == NULL)
3863     goto no_url_error;
3864 
3865   if (stream->need_header) {
3866     ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3867     if (ret != GST_FLOW_OK) {
3868       return ret;
3869     }
3870     stream->need_header = FALSE;
3871   }
3872 
3873 again:
3874   ret = GST_FLOW_OK;
3875   url = stream->fragment.uri;
3876   GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3877   if (!url)
3878     return ret;
3879 
3880   stream->last_ret = GST_FLOW_OK;
3881   http_status = 200;
3882 
3883   /* Download the actual fragment, either in fragments or in one go */
3884   if (klass->need_another_chunk && klass->need_another_chunk (stream)
3885       && stream->fragment.chunk_size != 0) {
3886     /* Handle chunk downloading */
3887     gint64 range_start, range_end, chunk_start, chunk_end;
3888     guint64 download_total_bytes;
3889     gint chunk_size = stream->fragment.chunk_size;
3890 
3891     range_start = chunk_start = stream->fragment.range_start;
3892     range_end = stream->fragment.range_end;
3893     /* HTTP ranges are inclusive for the end */
3894     if (chunk_size != -1)
3895       chunk_end = range_start + chunk_size - 1;
3896     else
3897       chunk_end = range_end;
3898 
3899     if (range_end != -1)
3900       chunk_end = MIN (chunk_end, range_end);
3901 
3902     while (!stream->fragment.finished && (chunk_start <= range_end
3903             || range_end == -1)) {
3904       download_total_bytes = stream->download_total_bytes;
3905 
3906       ret =
3907           gst_adaptive_demux_stream_download_uri (demux, stream, url,
3908           chunk_start, chunk_end, &http_status);
3909 
3910       GST_DEBUG_OBJECT (stream->pad,
3911           "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3912           http_status, gst_flow_get_name (stream->last_ret));
3913 
3914       /* Don't retry for any chunks except the first. We would have sent
3915        * data downstream already otherwise and it's difficult to recover
3916        * from that in a meaningful way */
3917       if (chunk_start > range_start)
3918         retried_once = TRUE;
3919 
3920       /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3921        * downloading up to -1. We don't know the full duration.
3922        * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3923       if (ret != GST_FLOW_OK && chunk_end == -1) {
3924         break;
3925       } else if (ret != GST_FLOW_OK) {
3926         chunk_end = -1;
3927         stream->last_ret = GST_FLOW_OK;
3928         continue;
3929       }
3930 
3931       if (chunk_end == -1)
3932         break;
3933 
3934       /* Short read, we're at the end now */
3935       if (stream->download_total_bytes - download_total_bytes <
3936           chunk_end + 1 - chunk_start)
3937         break;
3938 
3939       if (!klass->need_another_chunk (stream))
3940         break;
3941 
3942       /* HTTP ranges are inclusive for the end */
3943       chunk_start += chunk_size;
3944       chunk_size = stream->fragment.chunk_size;
3945       if (chunk_size != -1)
3946         chunk_end = chunk_start + chunk_size - 1;
3947       else
3948         chunk_end = range_end;
3949 
3950       if (range_end != -1)
3951         chunk_end = MIN (chunk_end, range_end);
3952     }
3953   } else {
3954     ret =
3955         gst_adaptive_demux_stream_download_uri (demux, stream, url,
3956         stream->fragment.range_start, stream->fragment.range_end, &http_status);
3957     GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3958         stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3959   }
3960   if (ret == GST_FLOW_OK)
3961     goto beach;
3962 
3963 #ifdef OHOS_EXT_FUNC
3964   /**
3965    * ohos.ext.func.0033
3966    * Support reconnection after disconnection in gstcurl.
3967    */
3968   if (ret == GST_FLOW_CUSTOM_ERROR && stream->errcode == GST_RESOURCE_ERROR_TIME_OUT) {
3969     return GST_FLOW_ERROR;
3970   }
3971 #endif
3972 
3973   g_mutex_lock (&stream->fragment_download_lock);
3974   if (G_UNLIKELY (stream->cancelled)) {
3975     g_mutex_unlock (&stream->fragment_download_lock);
3976     return ret;
3977   }
3978   g_mutex_unlock (&stream->fragment_download_lock);
3979 
3980   /* TODO check if we are truly stopping */
3981   if (ret != GST_FLOW_CUSTOM_ERROR)
3982     goto beach;
3983 
3984   last_status_code = stream->last_status_code;
3985   GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3986       last_status_code, stream->download_error_count);
3987 
3988   live = gst_adaptive_demux_is_live (demux);
3989 
3990 #ifdef OHOS_EXT_FUNC
3991   // ohos.ext.func.0013
3992   if (last_status_code == 408) {
3993     GST_WARNING_OBJECT (stream->pad, "Receive error of souphttpsrc, status_code is %u", last_status_code);
3994     stream->download_error_count = MAX_DOWNLOAD_ERROR_COUNT;
3995     return GST_FLOW_ERROR;
3996   }
3997 #endif
3998 
3999   if (!retried_once && ((last_status_code / 100 == 4 && live)
4000           || last_status_code / 100 == 5)) {
4001     /* 4xx/5xx */
4002     /* if current position is before available start, switch to next */
4003     if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
4004       goto flushing;
4005 
4006     if (live) {
4007       gint64 range_start, range_stop;
4008 
4009       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
4010               &range_stop))
4011         goto flushing;
4012 
4013       if (demux->segment.position < range_start) {
4014         GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
4015         stream->last_ret = GST_FLOW_OK;
4016         ret = gst_adaptive_demux_eos_handling (stream);
4017         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
4018             gst_flow_get_name (ret));
4019         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
4020         ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
4021         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
4022             gst_flow_get_name (ret));
4023         if (ret == GST_FLOW_OK) {
4024           retried_once = TRUE;
4025           goto again;
4026         }
4027       } else if (demux->segment.position > range_stop) {
4028         /* wait a bit to be in range, we don't have any locks at that point */
4029         gint64 wait_time =
4030             gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
4031         if (wait_time > 0) {
4032           gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
4033 
4034           GST_DEBUG_OBJECT (stream->pad,
4035               "Download waiting for %" GST_TIME_FORMAT,
4036               GST_TIME_ARGS (wait_time));
4037 
4038           GST_MANIFEST_UNLOCK (demux);
4039           g_mutex_lock (&stream->fragment_download_lock);
4040           if (G_UNLIKELY (stream->cancelled)) {
4041             g_mutex_unlock (&stream->fragment_download_lock);
4042             GST_MANIFEST_LOCK (demux);
4043             stream->last_ret = GST_FLOW_FLUSHING;
4044             goto flushing;
4045           }
4046           do {
4047             g_cond_wait_until (&stream->fragment_download_cond,
4048                 &stream->fragment_download_lock, end_time);
4049             if (G_UNLIKELY (stream->cancelled)) {
4050               g_mutex_unlock (&stream->fragment_download_lock);
4051               GST_MANIFEST_LOCK (demux);
4052               stream->last_ret = GST_FLOW_FLUSHING;
4053               goto flushing;
4054             }
4055           } while (!stream->download_finished);
4056           g_mutex_unlock (&stream->fragment_download_lock);
4057 
4058           GST_MANIFEST_LOCK (demux);
4059         }
4060       }
4061     }
4062 
4063   flushing:
4064     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
4065       /* looks like there is no way of knowing when a live stream has ended
4066        * Have to assume we are falling behind and cause a manifest reload */
4067       GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
4068       return GST_FLOW_EOS;
4069     }
4070   } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4071     /* If this is the last fragment, consider failures EOS and not actual
4072      * errors. Due to rounding errors in the durations, the last fragment
4073      * might not actually exist */
4074     GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
4075     return GST_FLOW_EOS;
4076   } else {
4077     /* retry once (same segment) for 5xx (server errors) */
4078     if (!retried_once) {
4079       retried_once = TRUE;
4080       /* wait a short time in case the server needs a bit to recover, we don't
4081        * care if we get woken up before end time. We can use sleep here since
4082        * we're already blocking and just want to wait some time. */
4083       g_usleep (100000);        /* a tenth of a second */
4084       goto again;
4085     }
4086   }
4087 
4088 beach:
4089   return ret;
4090 
4091 no_url_error:
4092   {
4093     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
4094         (_("Failed to get fragment URL.")),
4095         ("An error happened when getting fragment URL"));
4096     gst_task_stop (stream->download_task);
4097     return GST_FLOW_ERROR;
4098   }
4099 }
4100 
4101 /* this function will take the manifest_lock and will keep it until the end.
4102  * It will release it temporarily only when going to sleep.
4103  * Every time it takes the manifest_lock, it will check for cancelled condition
4104  */
4105 static void
gst_adaptive_demux_stream_download_loop(GstAdaptiveDemuxStream * stream)4106 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
4107 {
4108   GstAdaptiveDemux *demux = stream->demux;
4109   GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
4110   GstFlowReturn ret;
4111   gboolean live;
4112 
4113   GST_LOG_OBJECT (stream->pad, "download loop start");
4114 
4115   GST_MANIFEST_LOCK (demux);
4116 
4117   g_mutex_lock (&stream->fragment_download_lock);
4118   if (G_UNLIKELY (stream->cancelled)) {
4119     stream->last_ret = GST_FLOW_FLUSHING;
4120     g_mutex_unlock (&stream->fragment_download_lock);
4121     goto cancelled;
4122   }
4123   g_mutex_unlock (&stream->fragment_download_lock);
4124 
4125   /* Check if we're done with our segment */
4126   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4127   if (demux->segment.rate > 0) {
4128     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
4129         && stream->segment.position >= stream->segment.stop) {
4130       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4131       ret = GST_FLOW_EOS;
4132       gst_task_stop (stream->download_task);
4133       goto end_of_manifest;
4134     }
4135   } else {
4136     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
4137         && stream->segment.position <= stream->segment.start) {
4138       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4139       ret = GST_FLOW_EOS;
4140       gst_task_stop (stream->download_task);
4141       goto end_of_manifest;
4142     }
4143   }
4144   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4145 
4146   /* Cleanup old streams if any */
4147   if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
4148     GList *old_streams = demux->priv->old_streams;
4149     demux->priv->old_streams = NULL;
4150 
4151     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
4152     g_list_free_full (old_streams,
4153         (GDestroyNotify) gst_adaptive_demux_stream_free);
4154     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
4155 
4156     /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
4157      * Recheck the cancelled flag.
4158      */
4159     g_mutex_lock (&stream->fragment_download_lock);
4160     if (G_UNLIKELY (stream->cancelled)) {
4161       stream->last_ret = GST_FLOW_FLUSHING;
4162       g_mutex_unlock (&stream->fragment_download_lock);
4163       goto cancelled;
4164     }
4165     g_mutex_unlock (&stream->fragment_download_lock);
4166   }
4167 
4168   /* Restarting download, figure out new position
4169    * FIXME : Move this to a separate function ? */
4170   if (G_UNLIKELY (stream->restart_download)) {
4171     GstEvent *seg_event;
4172     GstClockTime cur, ts = 0;
4173     gint64 pos;
4174 
4175     GST_DEBUG_OBJECT (stream->pad,
4176         "Activating stream due to reconfigure event");
4177 
4178     if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
4179       ts = (GstClockTime) pos;
4180       GST_DEBUG_OBJECT (demux, "Downstream position: %"
4181           GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4182     } else {
4183       /* query other pads as some faulty element in the pad's branch might
4184        * reject position queries. This should be better than using the
4185        * demux segment position that can be much ahead */
4186       GList *iter;
4187 
4188       for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
4189         GstAdaptiveDemuxStream *cur_stream =
4190             (GstAdaptiveDemuxStream *) iter->data;
4191 
4192         if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
4193                 &pos)) {
4194           ts = (GstClockTime) pos;
4195           GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
4196               GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4197           break;
4198         }
4199       }
4200     }
4201 
4202     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4203     cur =
4204         gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
4205         stream->segment.position);
4206 
4207     /* we might have already pushed this data */
4208     ts = MAX (ts, cur);
4209 
4210     GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
4211         "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
4212 
4213     if (GST_CLOCK_TIME_IS_VALID (ts)) {
4214       GstClockTime offset, period_start;
4215 
4216       offset =
4217           gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4218       period_start = gst_adaptive_demux_get_period_start_time (demux);
4219 
4220       /* TODO check return */
4221       gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
4222           0, ts, &ts);
4223 
4224       stream->segment.position = ts - period_start + offset;
4225     }
4226 
4227     /* The stream's segment is still correct except for
4228      * the position, so let's send a new one with the
4229      * updated position */
4230     seg_event = gst_event_new_segment (&stream->segment);
4231     gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
4232     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4233 
4234     GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
4235         GST_PTR_FORMAT, seg_event);
4236     gst_pad_push_event (stream->pad, seg_event);
4237 
4238     stream->discont = TRUE;
4239     stream->restart_download = FALSE;
4240   }
4241 
4242   live = gst_adaptive_demux_is_live (demux);
4243 
4244   /* Get information about the fragment to download */
4245   GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
4246   ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
4247   GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
4248       ret, gst_flow_get_name (ret));
4249   if (ret == GST_FLOW_OK) {
4250 
4251     /* wait for live fragments to be available */
4252     if (live) {
4253       gint64 wait_time =
4254           gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
4255       if (wait_time > 0) {
4256         GstClockTime end_time =
4257             gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
4258 
4259         GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
4260             GST_TIME_ARGS (wait_time));
4261 
4262         GST_MANIFEST_UNLOCK (demux);
4263 
4264         g_mutex_lock (&stream->fragment_download_lock);
4265         if (G_UNLIKELY (stream->cancelled)) {
4266           g_mutex_unlock (&stream->fragment_download_lock);
4267           GST_MANIFEST_LOCK (demux);
4268           stream->last_ret = GST_FLOW_FLUSHING;
4269           goto cancelled;
4270         }
4271         gst_adaptive_demux_wait_until (demux->realtime_clock,
4272             &stream->fragment_download_cond, &stream->fragment_download_lock,
4273             end_time);
4274         g_mutex_unlock (&stream->fragment_download_lock);
4275 
4276         GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
4277 
4278         GST_MANIFEST_LOCK (demux);
4279 
4280         g_mutex_lock (&stream->fragment_download_lock);
4281         if (G_UNLIKELY (stream->cancelled)) {
4282           stream->last_ret = GST_FLOW_FLUSHING;
4283           g_mutex_unlock (&stream->fragment_download_lock);
4284           goto cancelled;
4285         }
4286         g_mutex_unlock (&stream->fragment_download_lock);
4287       }
4288     }
4289 
4290     stream->last_ret = GST_FLOW_OK;
4291 
4292     next_download = gst_adaptive_demux_get_monotonic_time (demux);
4293     ret = gst_adaptive_demux_stream_download_fragment (stream);
4294 
4295 #ifdef OHOS_EXT_FUNC
4296     /**
4297      * ohos.ext.func.0033
4298      * Support reconnection after disconnection in gstcurl.
4299      * Stop download task when reconnection timeout.
4300      */
4301     if (ret == GST_FLOW_ERROR && stream->errcode == GST_RESOURCE_ERROR_TIME_OUT) {
4302       GST_WARNING_OBJECT (stream->pad, "download failed since connect time out, stop download task");
4303       gst_task_stop (stream->download_task);
4304       goto end;
4305     }
4306 #endif
4307 
4308     if (ret == GST_FLOW_FLUSHING) {
4309       g_mutex_lock (&stream->fragment_download_lock);
4310       if (G_UNLIKELY (stream->cancelled)) {
4311         stream->last_ret = GST_FLOW_FLUSHING;
4312         g_mutex_unlock (&stream->fragment_download_lock);
4313         goto cancelled;
4314       }
4315       g_mutex_unlock (&stream->fragment_download_lock);
4316     }
4317 
4318   } else {
4319     stream->last_ret = ret;
4320   }
4321 
4322   switch (ret) {
4323     case GST_FLOW_OK:
4324       break;                    /* all is good, let's go */
4325     case GST_FLOW_EOS:
4326       GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
4327 
4328       /* we push the EOS after releasing the object lock */
4329       if (gst_adaptive_demux_is_live (demux)
4330           && (demux->segment.rate == 1.0
4331               || gst_adaptive_demux_stream_in_live_seek_range (demux,
4332                   stream))) {
4333         GstAdaptiveDemuxClass *demux_class =
4334             GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4335 
4336         /* this might be a fragment download error, refresh the manifest, just in case */
4337         if (!demux_class->requires_periodical_playlist_update (demux)) {
4338           ret = gst_adaptive_demux_update_manifest (demux);
4339           break;
4340           /* Wait only if we can ensure current manifest has been expired.
4341            * The meaning "we have next period" *WITH* EOS is that, current
4342            * period has been ended but we can continue to the next period */
4343         } else if (!gst_adaptive_demux_has_next_period (demux) &&
4344             gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
4345           goto end;
4346         }
4347         gst_task_stop (stream->download_task);
4348         if (stream->replaced) {
4349           goto end;
4350         }
4351       } else {
4352         gst_task_stop (stream->download_task);
4353       }
4354 
4355       if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
4356         if (gst_adaptive_demux_has_next_period (demux)) {
4357           GST_DEBUG_OBJECT (stream->pad,
4358               "Next period available, not sending EOS");
4359           gst_adaptive_demux_advance_period (demux);
4360           ret = GST_FLOW_OK;
4361         }
4362       }
4363       break;
4364 
4365     case GST_FLOW_NOT_LINKED:
4366     {
4367       GstFlowReturn ret;
4368       gst_task_stop (stream->download_task);
4369 
4370       ret = gst_adaptive_demux_combine_flows (demux);
4371       if (ret == GST_FLOW_NOT_LINKED) {
4372         GST_ELEMENT_FLOW_ERROR (demux, ret);
4373       }
4374     }
4375       break;
4376 
4377     case GST_FLOW_FLUSHING:{
4378       GList *iter;
4379 
4380       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4381         GstAdaptiveDemuxStream *other;
4382 
4383         other = iter->data;
4384         gst_task_stop (other->download_task);
4385       }
4386     }
4387       break;
4388 
4389     default:
4390       if (ret <= GST_FLOW_ERROR) {
4391         gboolean is_live = gst_adaptive_demux_is_live (demux);
4392         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
4393         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
4394           goto download_error;
4395         }
4396 
4397         g_clear_error (&stream->last_error);
4398 
4399         /* First try to update the playlist for non-live playlists
4400          * in case the URIs have changed in the meantime. But only
4401          * try it the first time, after that we're going to wait a
4402          * a bit to not flood the server */
4403         if (stream->download_error_count == 1 && !is_live) {
4404           /* TODO hlsdemux had more options to this function (boolean and err) */
4405 
4406           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
4407             /* Retry immediately, the playlist actually has changed */
4408             GST_DEBUG_OBJECT (demux, "Updated the playlist");
4409             goto end;
4410           }
4411         }
4412 
4413         /* Wait half the fragment duration before retrying */
4414         next_download += stream->fragment.duration / 2;
4415 
4416         GST_MANIFEST_UNLOCK (demux);
4417 
4418         g_mutex_lock (&stream->fragment_download_lock);
4419         if (G_UNLIKELY (stream->cancelled)) {
4420           g_mutex_unlock (&stream->fragment_download_lock);
4421           GST_MANIFEST_LOCK (demux);
4422           stream->last_ret = GST_FLOW_FLUSHING;
4423           goto cancelled;
4424         }
4425         gst_adaptive_demux_wait_until (demux->realtime_clock,
4426             &stream->fragment_download_cond, &stream->fragment_download_lock,
4427             next_download);
4428         g_mutex_unlock (&stream->fragment_download_lock);
4429 
4430         GST_DEBUG_OBJECT (demux, "Retrying now");
4431 
4432         GST_MANIFEST_LOCK (demux);
4433 
4434         g_mutex_lock (&stream->fragment_download_lock);
4435         if (G_UNLIKELY (stream->cancelled)) {
4436           stream->last_ret = GST_FLOW_FLUSHING;
4437           g_mutex_unlock (&stream->fragment_download_lock);
4438           goto cancelled;
4439         }
4440         g_mutex_unlock (&stream->fragment_download_lock);
4441 
4442         /* Refetch the playlist now after we waited */
4443         if (!is_live
4444             && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
4445           GST_DEBUG_OBJECT (demux, "Updated the playlist");
4446         }
4447         goto end;
4448       }
4449       break;
4450   }
4451 
4452 end_of_manifest:
4453   if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
4454     if (GST_OBJECT_PARENT (stream->pad) != NULL) {
4455       if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
4456         GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
4457         gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
4458       } else {
4459         GST_DEBUG_OBJECT (stream->src,
4460             "Stream is EOS, but we're switching fragments. Not sending.");
4461       }
4462     } else {
4463       GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
4464       goto download_error;
4465     }
4466   }
4467 
4468 end:
4469   GST_MANIFEST_UNLOCK (demux);
4470   GST_LOG_OBJECT (stream->pad, "download loop end");
4471   return;
4472 
4473 cancelled:
4474   {
4475     GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
4476     goto end;
4477   }
4478 download_error:
4479   {
4480     GstMessage *msg;
4481 
4482     if (stream->last_error) {
4483       gchar *debug = g_strdup_printf ("Error on stream %s:%s",
4484           GST_DEBUG_PAD_NAME (stream->pad));
4485       msg =
4486           gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
4487           debug);
4488       GST_ERROR_OBJECT (stream->pad, "Download error: %s",
4489           stream->last_error->message);
4490       g_free (debug);
4491     } else {
4492       GError *err =
4493           g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
4494           _("Couldn't download fragments"));
4495       msg =
4496           gst_message_new_error (GST_OBJECT_CAST (demux), err,
4497           "Fragment downloading has failed consecutive times");
4498       g_error_free (err);
4499       GST_ERROR_OBJECT (stream->pad,
4500           "Download error: Couldn't download fragments, too many failures");
4501     }
4502 
4503     gst_task_stop (stream->download_task);
4504     if (stream->src) {
4505       GstElement *src = stream->src;
4506 
4507       stream->src = NULL;
4508       GST_MANIFEST_UNLOCK (demux);
4509       gst_element_set_locked_state (src, TRUE);
4510       gst_element_set_state (src, GST_STATE_NULL);
4511       gst_bin_remove (GST_BIN_CAST (demux), src);
4512       GST_MANIFEST_LOCK (demux);
4513     }
4514 
4515     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
4516 
4517     goto end;
4518   }
4519 }
4520 
4521 static void
gst_adaptive_demux_updates_loop(GstAdaptiveDemux * demux)4522 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
4523 {
4524   GstClockTime next_update;
4525   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4526 
4527   /* Loop for updating of the playlist. This periodically checks if
4528    * the playlist is updated and does so, then signals the streaming
4529    * thread in case it can continue downloading now. */
4530 
4531   /* block until the next scheduled update or the signal to quit this thread */
4532   GST_DEBUG_OBJECT (demux, "Started updates task");
4533 
4534   GST_MANIFEST_LOCK (demux);
4535 
4536   next_update =
4537       gst_adaptive_demux_get_monotonic_time (demux) +
4538       klass->get_manifest_update_interval (demux) * GST_USECOND;
4539 
4540   /* Updating playlist only needed for live playlists */
4541   while (gst_adaptive_demux_is_live (demux)) {
4542     GstFlowReturn ret = GST_FLOW_OK;
4543 
4544     /* Wait here until we should do the next update or we're cancelled */
4545     GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4546 
4547     GST_MANIFEST_UNLOCK (demux);
4548 
4549     g_mutex_lock (&demux->priv->updates_timed_lock);
4550     if (demux->priv->stop_updates_task) {
4551       g_mutex_unlock (&demux->priv->updates_timed_lock);
4552       goto quit;
4553     }
4554     gst_adaptive_demux_wait_until (demux->realtime_clock,
4555         &demux->priv->updates_timed_cond,
4556         &demux->priv->updates_timed_lock, next_update);
4557     g_mutex_unlock (&demux->priv->updates_timed_lock);
4558 
4559     g_mutex_lock (&demux->priv->updates_timed_lock);
4560     if (demux->priv->stop_updates_task) {
4561       g_mutex_unlock (&demux->priv->updates_timed_lock);
4562       goto quit;
4563     }
4564     g_mutex_unlock (&demux->priv->updates_timed_lock);
4565 
4566     GST_MANIFEST_LOCK (demux);
4567 
4568     GST_DEBUG_OBJECT (demux, "Updating playlist");
4569 
4570     ret = gst_adaptive_demux_update_manifest (demux);
4571 
4572     if (ret == GST_FLOW_EOS) {
4573     } else if (ret != GST_FLOW_OK) {
4574       /* update_failed_count is used only here, no need to protect it */
4575       demux->priv->update_failed_count++;
4576       if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4577         GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4578             gst_flow_get_name (ret));
4579         next_update = gst_adaptive_demux_get_monotonic_time (demux)
4580             + klass->get_manifest_update_interval (demux) * GST_USECOND;
4581       } else {
4582         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4583             (_("Internal data stream error.")), ("Could not update playlist"));
4584         GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4585         gst_task_stop (demux->priv->updates_task);
4586         GST_MANIFEST_UNLOCK (demux);
4587         goto end;
4588       }
4589     } else {
4590       GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4591       demux->priv->update_failed_count = 0;
4592       next_update =
4593           gst_adaptive_demux_get_monotonic_time (demux) +
4594           klass->get_manifest_update_interval (demux) * GST_USECOND;
4595 
4596       /* Wake up download tasks */
4597       g_mutex_lock (&demux->priv->manifest_update_lock);
4598       g_cond_broadcast (&demux->priv->manifest_cond);
4599       g_mutex_unlock (&demux->priv->manifest_update_lock);
4600     }
4601   }
4602 
4603   GST_MANIFEST_UNLOCK (demux);
4604 
4605 quit:
4606   {
4607     GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4608   }
4609 
4610 end:
4611   {
4612     return;
4613   }
4614 }
4615 
4616 /* must be called with manifest_lock taken */
4617 static gboolean
gst_adaptive_demux_stream_push_event(GstAdaptiveDemuxStream * stream,GstEvent * event)4618 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4619     GstEvent * event)
4620 {
4621   gboolean ret;
4622   GstPad *pad;
4623   GstAdaptiveDemux *demux = stream->demux;
4624 
4625   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4626     stream->eos = TRUE;
4627   }
4628 
4629   pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4630 
4631   /* Can't push events holding the manifest lock */
4632   GST_MANIFEST_UNLOCK (demux);
4633 
4634   GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4635       "Pushing event %" GST_PTR_FORMAT, event);
4636 
4637   ret = gst_pad_push_event (pad, event);
4638 
4639   gst_object_unref (pad);
4640 
4641   GST_MANIFEST_LOCK (demux);
4642 
4643   return ret;
4644 }
4645 
4646 /* must be called with manifest_lock taken */
4647 static gboolean
gst_adaptive_demux_is_live(GstAdaptiveDemux * demux)4648 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4649 {
4650   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4651 
4652   if (klass->is_live)
4653     return klass->is_live (demux);
4654   return FALSE;
4655 }
4656 
4657 /* must be called with manifest_lock taken */
4658 static GstFlowReturn
gst_adaptive_demux_stream_seek(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,gboolean forward,GstSeekFlags flags,GstClockTime ts,GstClockTime * final_ts)4659 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4660     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4661     GstClockTime ts, GstClockTime * final_ts)
4662 {
4663   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4664 
4665   if (klass->stream_seek)
4666     return klass->stream_seek (stream, forward, flags, ts, final_ts);
4667   return GST_FLOW_ERROR;
4668 }
4669 
4670 /* must be called with manifest_lock taken */
4671 static gboolean
gst_adaptive_demux_stream_has_next_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4672 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4673     GstAdaptiveDemuxStream * stream)
4674 {
4675   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4676   gboolean ret = TRUE;
4677 
4678   if (klass->stream_has_next_fragment)
4679     ret = klass->stream_has_next_fragment (stream);
4680 
4681   return ret;
4682 }
4683 
4684 /* must be called with manifest_lock taken */
4685 /* Called from:
4686  *  the ::finish_fragment() handlers when an *actual* fragment is done
4687  *   */
4688 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4689 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4690     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4691 {
4692   GstFlowReturn ret;
4693 
4694   if (stream->last_ret == GST_FLOW_OK) {
4695     stream->last_ret =
4696         gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4697         duration);
4698   }
4699   ret = stream->last_ret;
4700 
4701   return ret;
4702 }
4703 
4704 /* must be called with manifest_lock taken */
4705 GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,GstClockTime duration)4706 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4707     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4708 {
4709   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4710   GstFlowReturn ret;
4711 
4712   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4713 
4714   GST_LOG_OBJECT (stream->pad,
4715       "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4716       GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4717 
4718   stream->download_error_count = 0;
4719   g_clear_error (&stream->last_error);
4720 
4721   /* FIXME - url has no indication of byte ranges for subsegments */
4722   /* FIXME : All those time statistics are biased, since they are calculated
4723    * *AFTER* the queue2, which might be blocking. They should ideally be
4724    * calculated *before* queue2 in the uri_handler_probe */
4725   gst_element_post_message (GST_ELEMENT_CAST (demux),
4726       gst_message_new_element (GST_OBJECT_CAST (demux),
4727           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4728               "manifest-uri", G_TYPE_STRING,
4729               demux->manifest_uri, "uri", G_TYPE_STRING,
4730               stream->fragment.uri, "fragment-start-time",
4731               GST_TYPE_CLOCK_TIME, stream->download_start_time,
4732               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4733               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4734               stream->download_total_bytes, "fragment-download-time",
4735               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4736 
4737   /* Don't update to the end of the segment if in reverse playback */
4738   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4739   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4740     GstClockTime offset =
4741         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4742     GstClockTime period_start =
4743         gst_adaptive_demux_get_period_start_time (demux);
4744 
4745     stream->segment.position += duration;
4746 
4747     /* Convert from position inside the stream's segment to the demuxer's
4748      * segment, they are not necessarily the same */
4749     if (stream->segment.position - offset + period_start >
4750         demux->segment.position)
4751       demux->segment.position =
4752           stream->segment.position - offset + period_start;
4753   }
4754   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4755 
4756   /* When advancing with a non 1.0 rate on live streams, we need to check
4757    * the live seeking range again to make sure we can still advance to
4758    * that position */
4759   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4760     if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4761       ret = GST_FLOW_EOS;
4762     else
4763       ret = klass->stream_advance_fragment (stream);
4764   } else if (gst_adaptive_demux_is_live (demux)
4765       || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4766     ret = klass->stream_advance_fragment (stream);
4767   } else {
4768     ret = GST_FLOW_EOS;
4769   }
4770 
4771   stream->download_start_time =
4772       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4773 
4774   if (ret == GST_FLOW_OK) {
4775     if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4776             gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4777       stream->need_header = TRUE;
4778       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4779     }
4780 
4781     /* the subclass might want to switch pads */
4782     if (G_UNLIKELY (demux->next_streams)) {
4783       GList *iter;
4784       gboolean can_expose = TRUE;
4785 
4786       gst_task_stop (stream->download_task);
4787 
4788       ret = GST_FLOW_EOS;
4789 
4790       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4791         /* Only expose if all streams are now cancelled or finished downloading */
4792         GstAdaptiveDemuxStream *other = iter->data;
4793         if (other != stream) {
4794           g_mutex_lock (&other->fragment_download_lock);
4795           can_expose &= (other->cancelled == TRUE
4796               || other->download_finished == TRUE);
4797           g_mutex_unlock (&other->fragment_download_lock);
4798         }
4799       }
4800 
4801       if (can_expose) {
4802         GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4803             "to do bitrate switching");
4804         gst_adaptive_demux_prepare_streams (demux, FALSE);
4805         gst_adaptive_demux_start_tasks (demux, TRUE);
4806       } else {
4807         GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4808       }
4809     }
4810   }
4811 
4812   return ret;
4813 }
4814 
4815 /* must be called with manifest_lock taken */
4816 static gboolean
gst_adaptive_demux_stream_select_bitrate(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream,guint64 bitrate)4817 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4818     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4819 {
4820   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4821 
4822   if (klass->stream_select_bitrate)
4823     return klass->stream_select_bitrate (stream, bitrate);
4824   return FALSE;
4825 }
4826 
4827 /* must be called with manifest_lock taken */
4828 static GstFlowReturn
gst_adaptive_demux_stream_update_fragment_info(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4829 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4830     GstAdaptiveDemuxStream * stream)
4831 {
4832   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4833   GstFlowReturn ret;
4834 
4835   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4836       GST_FLOW_ERROR);
4837 
4838   /* Make sure the sub-class will update bitrate, or else
4839    * we will later */
4840   stream->fragment.bitrate = 0;
4841   stream->fragment.finished = FALSE;
4842 
4843   GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4844       GST_TIME_ARGS (stream->segment.position));
4845 
4846   ret = klass->stream_update_fragment_info (stream);
4847 
4848   GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4849       stream->fragment.uri);
4850   if (ret == GST_FLOW_OK) {
4851     GST_LOG_OBJECT (stream->pad,
4852         "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4853         GST_TIME_ARGS (stream->fragment.timestamp),
4854         GST_TIME_ARGS (stream->fragment.duration));
4855     GST_LOG_OBJECT (stream->pad,
4856         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4857         stream->fragment.range_start, stream->fragment.range_end);
4858   }
4859 
4860   return ret;
4861 }
4862 
4863 /* must be called with manifest_lock taken */
4864 static gint64
gst_adaptive_demux_stream_get_fragment_waiting_time(GstAdaptiveDemux * demux,GstAdaptiveDemuxStream * stream)4865 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4866     demux, GstAdaptiveDemuxStream * stream)
4867 {
4868   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4869 
4870   if (klass->stream_get_fragment_waiting_time)
4871     return klass->stream_get_fragment_waiting_time (stream);
4872   return 0;
4873 }
4874 
4875 /* must be called with manifest_lock taken */
4876 static GstFlowReturn
gst_adaptive_demux_update_manifest_default(GstAdaptiveDemux * demux)4877 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4878 {
4879   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4880   GstFragment *download;
4881   GstBuffer *buffer;
4882   GstFlowReturn ret;
4883   GError *error = NULL;
4884 
4885   download = gst_uri_downloader_fetch_uri (demux->downloader,
4886       demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4887   if (download) {
4888     g_free (demux->manifest_uri);
4889     g_free (demux->manifest_base_uri);
4890     if (download->redirect_permanent && download->redirect_uri) {
4891       demux->manifest_uri = g_strdup (download->redirect_uri);
4892       demux->manifest_base_uri = NULL;
4893     } else {
4894       demux->manifest_uri = g_strdup (download->uri);
4895       demux->manifest_base_uri = g_strdup (download->redirect_uri);
4896     }
4897 
4898     buffer = gst_fragment_get_buffer (download);
4899     g_object_unref (download);
4900     ret = klass->update_manifest_data (demux, buffer);
4901     gst_buffer_unref (buffer);
4902     /* FIXME: Should the manifest uri vars be reverted to original
4903      * values if updating fails? */
4904   } else {
4905     GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4906         error->message);
4907     ret = GST_FLOW_NOT_LINKED;
4908   }
4909   g_clear_error (&error);
4910 
4911   return ret;
4912 }
4913 
4914 /* must be called with manifest_lock taken */
4915 static GstFlowReturn
gst_adaptive_demux_update_manifest(GstAdaptiveDemux * demux)4916 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4917 {
4918   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4919   GstFlowReturn ret;
4920 
4921   ret = klass->update_manifest (demux);
4922 
4923   if (ret == GST_FLOW_OK) {
4924     GstClockTime duration;
4925     /* Send an updated duration message */
4926     duration = klass->get_duration (demux);
4927     if (duration != GST_CLOCK_TIME_NONE) {
4928       GST_DEBUG_OBJECT (demux,
4929           "Sending duration message : %" GST_TIME_FORMAT,
4930           GST_TIME_ARGS (duration));
4931       gst_element_post_message (GST_ELEMENT (demux),
4932           gst_message_new_duration_changed (GST_OBJECT (demux)));
4933     } else {
4934       GST_DEBUG_OBJECT (demux,
4935           "Duration unknown, can not send the duration message");
4936     }
4937 
4938     /* If a manifest changes it's liveness or periodic updateness, we need
4939      * to start/stop the manifest update task appropriately */
4940     /* Keep this condition in sync with the one in
4941      * gst_adaptive_demux_start_manifest_update_task()
4942      */
4943     if (gst_adaptive_demux_is_live (demux) &&
4944         klass->requires_periodical_playlist_update (demux)) {
4945       gst_adaptive_demux_start_manifest_update_task (demux);
4946     } else {
4947       gst_adaptive_demux_stop_manifest_update_task (demux);
4948     }
4949   }
4950 
4951   return ret;
4952 }
4953 
4954 void
gst_adaptive_demux_stream_fragment_clear(GstAdaptiveDemuxStreamFragment * f)4955 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4956 {
4957   g_free (f->uri);
4958   f->uri = NULL;
4959   f->range_start = 0;
4960   f->range_end = -1;
4961 
4962   g_free (f->header_uri);
4963   f->header_uri = NULL;
4964   f->header_range_start = 0;
4965   f->header_range_end = -1;
4966 
4967   g_free (f->index_uri);
4968   f->index_uri = NULL;
4969   f->index_range_start = 0;
4970   f->index_range_end = -1;
4971 
4972   f->finished = FALSE;
4973 }
4974 
4975 /* must be called with manifest_lock taken */
4976 static gboolean
gst_adaptive_demux_has_next_period(GstAdaptiveDemux * demux)4977 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4978 {
4979   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4980   gboolean ret = FALSE;
4981 
4982   if (klass->has_next_period)
4983     ret = klass->has_next_period (demux);
4984   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4985   return ret;
4986 }
4987 
4988 /* must be called with manifest_lock taken */
4989 static void
gst_adaptive_demux_advance_period(GstAdaptiveDemux * demux)4990 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4991 {
4992   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4993 
4994   g_return_if_fail (klass->advance_period != NULL);
4995 
4996   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4997   klass->advance_period (demux);
4998   gst_adaptive_demux_prepare_streams (demux, FALSE);
4999   gst_adaptive_demux_start_tasks (demux, TRUE);
5000 }
5001 
5002 /**
5003  * gst_adaptive_demux_get_monotonic_time:
5004  * Returns: a monotonically increasing time, using the system realtime clock
5005  */
5006 GstClockTime
gst_adaptive_demux_get_monotonic_time(GstAdaptiveDemux * demux)5007 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
5008 {
5009   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
5010   return gst_clock_get_time (demux->realtime_clock);
5011 }
5012 
5013 /**
5014  * gst_adaptive_demux_get_client_now_utc:
5015  * @demux: #GstAdaptiveDemux
5016  * Returns: the client's estimate of UTC
5017  *
5018  * Used to find the client's estimate of UTC, using the system realtime clock.
5019  */
5020 GDateTime *
gst_adaptive_demux_get_client_now_utc(GstAdaptiveDemux * demux)5021 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
5022 {
5023   GstClockTime rtc_now;
5024   GDateTime *unix_datetime;
5025   GDateTime *result_datetime;
5026   gint64 utc_now_in_us;
5027 
5028   rtc_now = gst_clock_get_time (demux->realtime_clock);
5029   utc_now_in_us = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
5030   unix_datetime =
5031       g_date_time_new_from_unix_utc (utc_now_in_us / G_TIME_SPAN_SECOND);
5032   result_datetime =
5033       g_date_time_add (unix_datetime, utc_now_in_us % G_TIME_SPAN_SECOND);
5034   g_date_time_unref (unix_datetime);
5035   return result_datetime;
5036 }
5037 
5038 /**
5039  * gst_adaptive_demux_is_running
5040  * @demux: #GstAdaptiveDemux
5041  * Returns: whether the demuxer is processing data
5042  *
5043  * Returns FALSE if shutdown has started (transitioning down from
5044  * PAUSED), otherwise TRUE.
5045  */
5046 gboolean
gst_adaptive_demux_is_running(GstAdaptiveDemux * demux)5047 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
5048 {
5049   return g_atomic_int_get (&demux->running);
5050 }
5051 
5052 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_new(GCond * cond,GMutex * mutex)5053 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
5054 {
5055   GstAdaptiveDemuxTimer *timer;
5056 
5057   timer = g_slice_new (GstAdaptiveDemuxTimer);
5058   timer->fired = FALSE;
5059   timer->cond = cond;
5060   timer->mutex = mutex;
5061   g_atomic_int_set (&timer->ref_count, 1);
5062   return timer;
5063 }
5064 
5065 static GstAdaptiveDemuxTimer *
gst_adaptive_demux_timer_ref(GstAdaptiveDemuxTimer * timer)5066 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
5067 {
5068   g_return_val_if_fail (timer != NULL, NULL);
5069   g_atomic_int_inc (&timer->ref_count);
5070   return timer;
5071 }
5072 
5073 static void
gst_adaptive_demux_timer_unref(GstAdaptiveDemuxTimer * timer)5074 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
5075 {
5076   g_return_if_fail (timer != NULL);
5077   if (g_atomic_int_dec_and_test (&timer->ref_count)) {
5078     g_slice_free (GstAdaptiveDemuxTimer, timer);
5079   }
5080 }
5081 
5082 /* gst_adaptive_demux_wait_until:
5083  * A replacement for g_cond_wait_until that uses the clock rather
5084  * than system time to control the duration of the sleep. Typically
5085  * clock is actually a #GstSystemClock, in which case this function
5086  * behaves exactly like g_cond_wait_until. Inside unit tests,
5087  * the clock is typically a #GstTestClock, which allows tests to run
5088  * in non-realtime.
5089  * This function must be called with mutex held.
5090  */
5091 static gboolean
gst_adaptive_demux_wait_until(GstClock * clock,GCond * cond,GMutex * mutex,GstClockTime end_time)5092 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
5093     GstClockTime end_time)
5094 {
5095   GstAdaptiveDemuxTimer *timer;
5096   gboolean fired;
5097   GstClockReturn res;
5098 
5099   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
5100     /* for an invalid time, gst_clock_id_wait_async will try to call
5101      * gst_adaptive_demux_clock_callback from the current thread.
5102      * It still holds the mutex while doing that, so it will deadlock.
5103      * g_cond_wait_until would return immediately with false, so we'll do the same.
5104      */
5105     return FALSE;
5106   }
5107   timer = gst_adaptive_demux_timer_new (cond, mutex);
5108   timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
5109   res =
5110       gst_clock_id_wait_async (timer->clock_id,
5111       gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
5112       (GDestroyNotify) gst_adaptive_demux_timer_unref);
5113   /* clock does not support asynchronously wait. Assert and return */
5114   if (res == GST_CLOCK_UNSUPPORTED) {
5115     gst_clock_id_unref (timer->clock_id);
5116     gst_adaptive_demux_timer_unref (timer);
5117     g_return_val_if_reached (TRUE);
5118   }
5119   g_assert (!timer->fired);
5120   /* the gst_adaptive_demux_clock_callback() will signal the
5121    * cond when the clock's single shot timer fires, or the cond will be
5122    * signalled by another thread that wants to cause this wait to finish
5123    * early (e.g. to terminate the waiting thread).
5124    * There is no need for a while loop here, because that logic is
5125    * implemented by the function calling gst_adaptive_demux_wait_until() */
5126   g_cond_wait (cond, mutex);
5127   fired = timer->fired;
5128   if (!fired)
5129     gst_clock_id_unschedule (timer->clock_id);
5130   gst_clock_id_unref (timer->clock_id);
5131   gst_adaptive_demux_timer_unref (timer);
5132   return !fired;
5133 }
5134 
5135 static gboolean
gst_adaptive_demux_clock_callback(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)5136 gst_adaptive_demux_clock_callback (GstClock * clock,
5137     GstClockTime time, GstClockID id, gpointer user_data)
5138 {
5139   GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
5140   g_return_val_if_fail (timer != NULL, FALSE);
5141   g_mutex_lock (timer->mutex);
5142   timer->fired = TRUE;
5143   g_cond_signal (timer->cond);
5144   g_mutex_unlock (timer->mutex);
5145   return TRUE;
5146 }
5147 
5148 /**
5149  * gst_adaptive_demux_get_qos_earliest_time:
5150  *
5151  * Returns: The QOS earliest time
5152  *
5153  * Since: 1.20
5154  */
5155 GstClockTime
gst_adaptive_demux_get_qos_earliest_time(GstAdaptiveDemux * demux)5156 gst_adaptive_demux_get_qos_earliest_time (GstAdaptiveDemux * demux)
5157 {
5158   GstClockTime earliest;
5159 
5160   GST_OBJECT_LOCK (demux);
5161   earliest = demux->priv->qos_earliest_time;
5162   GST_OBJECT_UNLOCK (demux);
5163 
5164   return earliest;
5165 }
5166