• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GstCurlHttpSrc
3  * Copyright 2017 British Broadcasting Corporation - Research and Development
4  *
5  * Author: Sam Hurst <samuelh@rd.bbc.co.uk>
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Alternatively, the contents of this file may be used under the
26  * GNU Lesser General Public License Version 2.1 (the "LGPL"), in
27  * which case the following provisions apply instead of the ones
28  * mentioned above:
29  *
30  * This library is free software; you can redistribute it and/or
31  * modify it under the terms of the GNU Library General Public
32  * License as published by the Free Software Foundation; either
33  * version 2 of the License, or (at your option) any later version.
34  *
35  * This library is distributed in the hope that it will be useful,
36  * but WITHOUT ANY WARRANTY; without even the implied warranty of
37  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
38  * Library General Public License for more details.
39  *
40  * You should have received a copy of the GNU Library General Public
41  * License along with this library; if not, write to the
42  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
43  * Boston, MA 02111-1307, USA.
44  */
45 
46 /**
47  * SECTION:element-curlhttpsrc
48  *
49  * This plugin reads data from a remote location specified by a URI, when the
50  * protocol is 'http' or 'https'.
51  *
52  * It is based on the cURL project (http://curl.haxx.se/) and is specifically
53  * designed to be also used with nghttp2 (http://nghttp2.org) to enable HTTP/2
54  * support for GStreamer. Your libcurl library MUST be compiled against nghttp2
55  * for HTTP/2 support for this functionality. HTTPS support is dependent on
56  * cURL being built with SSL support (OpenSSL/PolarSSL/NSS/GnuTLS).
57  *
58  * An HTTP proxy must be specified by URL.
59  * If the "http_proxy" environment variable is set, its value is used.
60  * The #GstCurlHttpSrc:proxy property can be used to override the default.
61  *
62  * ## Example launch line
63  *
64  * |[
65  * gst-launch-1.0 curlhttpsrc location=http://127.0.1.1/index.html ! fakesink dump=1
66  * ]| The above pipeline reads a web page from the local machine using HTTP and
67  * dumps it to stdout.
68  * |[
69  * gst-launch-1.0 playbin uri=http://rdmedia.bbc.co.uk/dash/testmpds/multiperiod/bbb.php
70  * ]| The above pipeline will start up a DASH streaming session from the given
71  * MPD file. This requires GStreamer to have been built with dashdemux from
72  * gst-plugins-bad.
73  */
74 
75 /*
76  * Thread safety notes.
77  *
78  * GstCurlHttpSrc uses a single thread running the
79  * gst_curl_http_src_curl_multi_loop() function to handle receiving
80  * data and messages from libcurl. Each instance of GstCurlHttpSrc adds
81  * an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
82  * for the multi_loop to perform the HTTP request.
83  *
84  * When an instance of GstCurlHttpSrc wants to make a request (i.e.
85  * it has moved to the PLAYING state) it adds itself to the
86  * multi_task_context.queue list and signals the multi_loop task.
87  *
88  * Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond
89  * to wait for gst_curl_http_src_curl_multi_loop() to perform the
90  * request and signal completion.
91  *
92  * Each instance of GstCurlHttpSrc is protected by the mutexes:
93  * 1. uri_mutex
94  * 2. buffer_mutex
95  *
96  * uri_mutex is used to protect access to the uri field.
97  *
98  * buffer_mutex is used to protect access to buffer_cond, state and
99  * connection_status.
100  *
101  * The gst_curl_http_src_curl_multi_loop() function uses the mutexes:
102  * 1. multi_task_context.task_rec_mutex
103  * 2. multi_task_context.mutex
104  *
105  * multi_task_context.task_rec_mutex is only used by GstTask.
106  *
107  * multi_task_context.mutex is used to protect access to queue and state
108  *
109  * To avoid deadlock, it is vital that if both multi_task_context.mutex
110  * and buffer_mutex are required, that they are locked in the order:
111  * 1. multi_task_context.mutex
112  * 2. buffer_mutex
113  */
114 
115 #ifdef HAVE_CONFIG_H
116 #include <config.h>
117 #endif
118 
119 #include <gst/gst-i18n-plugin.h>
120 
121 #include "gstcurlelements.h"
122 #include "gstcurlhttpsrc.h"
123 #include "gstcurlqueue.h"
124 #include "gstcurldefaults.h"
125 
126 GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
127 #define GST_CAT_DEFAULT gst_curl_http_src_debug
128 GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);
129 
/*
 * CURL_HTTP_SRC_ERROR:
 * @src: the GstCurlHttpSrc instance posting the error
 * @cat: error domain, as accepted by GST_ELEMENT_ERROR_WITH_DETAILS
 * @code: error code within @cat
 * @error_message: user-facing message, emitted through a "%s" format
 *
 * Posts an element error whose debug text carries the HTTP reason phrase,
 * status code, request URL and redirect target, and attaches
 * "http-status-code" and "http-redirect-uri" as error details.
 *
 * Every string field is passed through GST_STR_NULL() so an unset
 * reason_phrase, uri or redirect_uri is never handed to "%s" as NULL.
 */
#define CURL_HTTP_SRC_ERROR(src,cat,code,error_message)     \
  do { \
    GST_ELEMENT_ERROR_WITH_DETAILS ((src), cat, code, ("%s", error_message), \
        ("%s (%d), URL: %s, Redirect to: %s", \
            GST_STR_NULL ((src)->reason_phrase), (src)->status_code, \
            GST_STR_NULL ((src)->uri), GST_STR_NULL ((src)->redirect_uri)), \
            ("http-status-code", G_TYPE_UINT, (src)->status_code, \
             "http-redirect-uri", G_TYPE_STRING, GST_STR_NULL ((src)->redirect_uri), NULL)); \
  } while(0)
138 
/*
 * GObject property identifiers for GstCurlHttpSrc.
 *
 * PROP_0 is the reserved "no property" slot and PROP_MAX terminates the list.
 * The trailing comments give the property name each id is installed under
 * in gst_curl_http_src_class_init().
 */
enum
{
  PROP_0 = 0,
  PROP_URI,                     /* "location" */
  PROP_USERNAME,                /* "user-id" */
  PROP_PASSWORD,                /* "user-pw" */
  PROP_PROXYURI,                /* "proxy" */
  PROP_PROXYUSERNAME,           /* "proxy-id" */
  PROP_PROXYPASSWORD,           /* "proxy-pw" */
  PROP_COOKIES,                 /* "cookies" */
  PROP_USERAGENT,               /* "user-agent" */
  PROP_HEADERS,                 /* "extra-headers" */
  PROP_COMPRESS,                /* "compress" */
  PROP_REDIRECT,                /* "automatic-redirect" */
  PROP_MAXREDIRECT,             /* "max-redirect" */
  PROP_KEEPALIVE,               /* "keep-alive" */
  PROP_TIMEOUT,                 /* "timeout" */
  PROP_STRICT_SSL,              /* "ssl-strict" */
  PROP_SSL_CA_FILE,             /* "ssl-ca-file" */
  PROP_RETRIES,                 /* "retries" */
  PROP_CONNECTIONMAXTIME,       /* "max-connection-time" */
  PROP_MAXCONCURRENT_SERVER,    /* "max-connections-per-server" */
  PROP_MAXCONCURRENT_PROXY,     /* "max-connections-per-proxy" */
  PROP_MAXCONCURRENT_GLOBAL,    /* "max-connections" */
  PROP_HTTPVERSION,             /* "http-version" */
  PROP_IRADIO_MODE,             /* not installed by class_init */
#ifdef OHOS_EXT_FUNC
  /* ohos.ext.func.0033 */
  PROP_RECONNECTION_TIMEOUT,    /* "reconnection-timeout" */
  PROP_STATE_CHANGE,            /* "state-change" (write-only) */
#endif
  PROP_MAX
};
172 
173 /*
174  * Make a source pad template to be able to kick out recv'd data
175  */
176 static GstStaticPadTemplate srcpadtemplate = GST_STATIC_PAD_TEMPLATE ("src",
177     GST_PAD_SRC,
178     GST_PAD_ALWAYS,
179     GST_STATIC_CAPS_ANY);
180 
181 /*
182  * Function Definitions
183  */
184 /* Gstreamer generic element functions */
185 static void gst_curl_http_src_set_property (GObject * object, guint prop_id,
186     const GValue * value, GParamSpec * pspec);
187 static void gst_curl_http_src_get_property (GObject * object, guint prop_id,
188     GValue * value, GParamSpec * pspec);
189 static void gst_curl_http_src_ref_multi (GstCurlHttpSrc * src);
190 static void gst_curl_http_src_unref_multi (GstCurlHttpSrc * src);
191 static void gst_curl_http_src_finalize (GObject * obj);
192 static GstFlowReturn gst_curl_http_src_create (GstPushSrc * psrc,
193     GstBuffer ** outbuf);
194 static GstFlowReturn gst_curl_http_src_handle_response (GstCurlHttpSrc * src);
195 static gboolean gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src);
196 static GstStateChangeReturn gst_curl_http_src_change_state (GstElement *
197     element, GstStateChange transition);
198 static void gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src);
199 static gboolean gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query);
200 static gboolean gst_curl_http_src_get_content_length (GstBaseSrc * bsrc,
201     guint64 * size);
202 static gboolean gst_curl_http_src_is_seekable (GstBaseSrc * bsrc);
203 static gboolean gst_curl_http_src_do_seek (GstBaseSrc * bsrc,
204     GstSegment * segment);
205 static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc);
206 static gboolean gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc);
207 
208 /* URI Handler functions */
209 static void gst_curl_http_src_uri_handler_init (gpointer g_iface,
210     gpointer iface_data);
211 static guint gst_curl_http_src_urihandler_get_type (GType type);
212 static const gchar *const *gst_curl_http_src_urihandler_get_protocols (GType
213     type);
214 static gchar *gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler);
215 static gboolean gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
216     const gchar * uri, GError ** error);
217 
218 /* GstTask functions */
219 static void gst_curl_http_src_curl_multi_loop (gpointer thread_data);
220 static CURL *gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s);
221 static inline void gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src);
222 static size_t gst_curl_http_src_get_header (void *header, size_t size,
223     size_t nmemb, void *src);
224 static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
225     size_t nmemb, void *src);
226 static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
227 static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src);
228 static char *gst_curl_http_src_strcasestr (const char *haystack,
229     const char *needle);
230 #ifndef GST_DISABLE_GST_DEBUG
231 static int gst_curl_http_src_get_debug (CURL * handle, curl_infotype type,
232     char *data, size_t size, void *clientp);
233 #endif
234 #ifdef OHOS_EXT_FUNC
235 // ohos.ext.func.0033
236 static void gst_curl_http_src_deal_sockets_timeout (GstCurlHttpSrcMultiTaskContext *context);
237 static gboolean gst_curl_http_src_reconnect_is_timeout (GstCurlHttpSrc *src);
238 #endif
239 
240 static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL;
241 static GstCurlHttpVersion pref_http_ver;
242 
243 #define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
244 
245 static GType
gst_curl_http_version_get_type(void)246 gst_curl_http_version_get_type (void)
247 {
248   static GType gtype = 0;
249 
250   if (!gtype) {
251     static const GEnumValue http_versions[] = {
252       {GSTCURL_HTTP_VERSION_1_0, "HTTP Version 1.0", "1.0"},
253       {GSTCURL_HTTP_VERSION_1_1, "HTTP Version 1.1", "1.1"},
254 #ifdef CURL_VERSION_HTTP2
255       {GSTCURL_HTTP_VERSION_2_0, "HTTP Version 2.0", "2.0"},
256 #endif
257       {0, NULL, NULL}
258     };
259     gtype = g_enum_register_static ("GstCurlHttpVersionType", http_versions);
260   }
261   return gtype;
262 }
263 
264 #define gst_curl_http_src_parent_class parent_class
265 G_DEFINE_TYPE_WITH_CODE (GstCurlHttpSrc, gst_curl_http_src, GST_TYPE_PUSH_SRC,
266     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
267         gst_curl_http_src_uri_handler_init));
268 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (curlhttpsrc, "curlhttpsrc",
269     GST_RANK_SECONDARY, GST_TYPE_CURLHTTPSRC, curl_element_init (plugin));
270 
271 static void
gst_curl_http_src_class_init(GstCurlHttpSrcClass * klass)272 gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
273 {
274   GObjectClass *gobject_class;
275   GstElementClass *gstelement_class;
276   GstBaseSrcClass *gstbasesrc_class;
277   GstPushSrcClass *gstpushsrc_class;
278   const gchar *http_env;
279   GstCurlHttpVersion default_http_version;
280 
281   gobject_class = (GObjectClass *) klass;
282   gstelement_class = (GstElementClass *) klass;
283   gstbasesrc_class = (GstBaseSrcClass *) klass;
284   gstpushsrc_class = (GstPushSrcClass *) klass;
285 
286   GST_DEBUG_CATEGORY_INIT (gst_curl_http_src_debug, "curlhttpsrc",
287       0, "UriHandler for libcURL");
288 
289   gstelement_class->change_state =
290       GST_DEBUG_FUNCPTR (gst_curl_http_src_change_state);
291   gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_curl_http_src_create);
292   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_curl_http_src_query);
293   gstbasesrc_class->get_size =
294       GST_DEBUG_FUNCPTR (gst_curl_http_src_get_content_length);
295   gstbasesrc_class->is_seekable =
296       GST_DEBUG_FUNCPTR (gst_curl_http_src_is_seekable);
297   gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_curl_http_src_do_seek);
298   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock);
299   gstbasesrc_class->unlock_stop =
300       GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock_stop);
301 
302   gst_element_class_add_pad_template (gstelement_class,
303       gst_static_pad_template_get (&srcpadtemplate));
304 
305   gst_curl_http_src_curl_capabilities = curl_version_info (CURLVERSION_NOW);
306 #ifdef CURL_VERSION_HTTP2
307   if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
308     default_http_version = GSTCURL_HTTP_VERSION_2_0;
309   } else
310 #endif
311     default_http_version = GSTCURL_HTTP_VERSION_1_1;
312 
313   http_env = g_getenv ("GST_CURL_HTTP_VER");
314   if (http_env != NULL) {
315     GST_INFO_OBJECT (klass, "Seen env var GST_CURL_HTTP_VER with value %s",
316         http_env);
317     if (!strcmp (http_env, "1.0")) {
318       pref_http_ver = GSTCURL_HTTP_VERSION_1_0;
319     } else if (!strcmp (http_env, "1.1")) {
320       pref_http_ver = GSTCURL_HTTP_VERSION_1_1;
321     } else if (!strcmp (http_env, "2.0")) {
322 #ifdef CURL_VERSION_HTTP2
323       if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
324         pref_http_ver = GSTCURL_HTTP_VERSION_2_0;
325       } else {
326         goto unsupported_http_version;
327       }
328 #endif
329     } else {
330     unsupported_http_version:
331       GST_WARNING_OBJECT (klass,
332           "Unsupported HTTP version: %s. Fallback to default", http_env);
333       pref_http_ver = default_http_version;
334     }
335   } else {
336     pref_http_ver = default_http_version;
337   }
338 
339   gobject_class->set_property = gst_curl_http_src_set_property;
340   gobject_class->get_property = gst_curl_http_src_get_property;
341   gobject_class->finalize = gst_curl_http_src_finalize;
342 
343   g_object_class_install_property (gobject_class, PROP_URI,
344       g_param_spec_string ("location", "Location", "URI of resource to read",
345           GSTCURL_HANDLE_DEFAULT_CURLOPT_URL,
346           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347 
348   g_object_class_install_property (gobject_class, PROP_USERNAME,
349       g_param_spec_string ("user-id", "user-id",
350           "HTTP location URI user id for authentication",
351           GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353 
354   g_object_class_install_property (gobject_class, PROP_PASSWORD,
355       g_param_spec_string ("user-pw", "user-pw",
356           "HTTP location URI password for authentication",
357           GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD,
358           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
359 
360   g_object_class_install_property (gobject_class, PROP_PROXYURI,
361       g_param_spec_string ("proxy", "Proxy", "URI of HTTP proxy server",
362           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364 
365   g_object_class_install_property (gobject_class, PROP_PROXYUSERNAME,
366       g_param_spec_string ("proxy-id", "proxy-id",
367           "HTTP proxy URI user id for authentication",
368           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME,
369           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370 
371   g_object_class_install_property (gobject_class, PROP_PROXYPASSWORD,
372       g_param_spec_string ("proxy-pw", "proxy-pw",
373           "HTTP proxy URI password for authentication",
374           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD,
375           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
376 
377   g_object_class_install_property (gobject_class, PROP_COOKIES,
378       g_param_spec_boxed ("cookies", "Cookies", "List of HTTP Cookies",
379           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380 
381   g_object_class_install_property (gobject_class, PROP_USERAGENT,
382       g_param_spec_string ("user-agent", "User-Agent",
383           "URI of resource requested",
384           GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>",
385           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
386           GST_PARAM_DOC_SHOW_DEFAULT));
387 
388   g_object_class_install_property (gobject_class, PROP_COMPRESS,
389       g_param_spec_boolean ("compress", "Compress",
390           "Allow compressed content encodings",
391           GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING,
392           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393 
394   g_object_class_install_property (gobject_class, PROP_REDIRECT,
395       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
396           "Allow HTTP Redirections (HTTP Status Code 300 series)",
397           GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399 
400   g_object_class_install_property (gobject_class, PROP_MAXREDIRECT,
401       g_param_spec_int ("max-redirect", "Max-Redirect",
402           "Maximum number of permitted redirections. -1 is unlimited.",
403           GSTCURL_HANDLE_MIN_CURLOPT_MAXREDIRS,
404           GSTCURL_HANDLE_MAX_CURLOPT_MAXREDIRS,
405           GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS,
406           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
407 
408   g_object_class_install_property (gobject_class, PROP_KEEPALIVE,
409       g_param_spec_boolean ("keep-alive", "Keep-Alive",
410           "Toggle keep-alive for connection reuse.",
411           GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE,
412           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413 
414   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
415       g_param_spec_int ("timeout", "Timeout",
416           "Value in seconds before timeout a blocking request (0 = no timeout)",
417           GSTCURL_HANDLE_MIN_CURLOPT_TIMEOUT,
418           GSTCURL_HANDLE_MAX_CURLOPT_TIMEOUT,
419           GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT,
420           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
421 
422   g_object_class_install_property (gobject_class, PROP_HEADERS,
423       g_param_spec_boxed ("extra-headers", "Extra Headers",
424           "Extra headers to append to the HTTP request",
425           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426 
427   g_object_class_install_property (gobject_class, PROP_STRICT_SSL,
428       g_param_spec_boolean ("ssl-strict", "SSL Strict",
429           "Strict SSL certificate checking",
430           GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER,
431           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432 
433   g_object_class_install_property (gobject_class, PROP_SSL_CA_FILE,
434       g_param_spec_string ("ssl-ca-file", "SSL CA File",
435           "Location of an SSL CA file to use for checking SSL certificates",
436           GSTCURL_HANDLE_DEFAULT_CURLOPT_CAINFO,
437           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438 
439   g_object_class_install_property (gobject_class, PROP_RETRIES,
440       g_param_spec_int ("retries", "Retries",
441           "Maximum number of retries until giving up (-1=infinite)",
442           GSTCURL_HANDLE_MIN_RETRIES, GSTCURL_HANDLE_MAX_RETRIES,
443           GSTCURL_HANDLE_DEFAULT_RETRIES,
444           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
445 
446   g_object_class_install_property (gobject_class, PROP_CONNECTIONMAXTIME,
447       g_param_spec_uint ("max-connection-time", "Max-Connection-Time",
448           "Maximum amount of time to keep-alive HTTP connections",
449           GSTCURL_MIN_CONNECTION_TIME, GSTCURL_MAX_CONNECTION_TIME,
450           GSTCURL_DEFAULT_CONNECTION_TIME,
451           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452 
453   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_SERVER,
454       g_param_spec_uint ("max-connections-per-server",
455           "Max-Connections-Per-Server",
456           "Maximum number of connections allowed per server for HTTP/1.x",
457           GSTCURL_MIN_CONNECTIONS_SERVER, GSTCURL_MAX_CONNECTIONS_SERVER,
458           GSTCURL_DEFAULT_CONNECTIONS_SERVER,
459           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
460 
461   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_PROXY,
462       g_param_spec_uint ("max-connections-per-proxy",
463           "Max-Connections-Per-Proxy",
464           "Maximum number of concurrent connections allowed per proxy for HTTP/1.x",
465           GSTCURL_MIN_CONNECTIONS_PROXY, GSTCURL_MAX_CONNECTIONS_PROXY,
466           GSTCURL_DEFAULT_CONNECTIONS_PROXY,
467           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
468 
469   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_GLOBAL,
470       g_param_spec_uint ("max-connections", "Max-Connections",
471           "Maximum number of concurrent connections allowed for HTTP/1.x",
472           GSTCURL_MIN_CONNECTIONS_GLOBAL, GSTCURL_MAX_CONNECTIONS_GLOBAL,
473           GSTCURL_DEFAULT_CONNECTIONS_GLOBAL,
474           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
475 
476   g_object_class_install_property (gobject_class, PROP_HTTPVERSION,
477       g_param_spec_enum ("http-version", "HTTP-Version",
478           "The preferred HTTP protocol version",
479           GST_TYPE_CURL_HTTP_VERSION, pref_http_ver,
480           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
481 
482 #ifdef OHOS_EXT_FUNC
483   // ohos.ext.func.0033
484   g_object_class_install_property (gobject_class, PROP_RECONNECTION_TIMEOUT,
485       g_param_spec_uint ("reconnection-timeout", "Reconnection-timeout",
486           "Value in seconds to timeout reconnection",
487           GSTCURL_HANDLE_MIN_CURLOPT_RECONNECTION_TIMEOUT,
488           GSTCURL_HANDLE_MAX_CURLOPT_RECONNECTION_TIMEOUT,
489           GSTCURL_HANDLE_DEFAULT_CURLOPT_RECONNECTION_TIMEOUT,
490           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
491 
492   g_object_class_install_property (gobject_class, PROP_STATE_CHANGE,
493       g_param_spec_int ("state-change", "State-change from adaptive-demux",
494           "State-change from adaptive-demux", 0, (gint) (G_MAXINT32), 0,
495           G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
496 #endif
497 
498   /* Add a debugging task so it's easier to debug in the Multi worker thread */
499   GST_DEBUG_CATEGORY_INIT (gst_curl_loop_debug, "curl_multi_loop", 0,
500       "libcURL loop thread debugging");
501 #ifndef GST_DISABLE_GST_DEBUG
502   gst_debug_log (gst_curl_loop_debug, GST_LEVEL_INFO, __FILE__, __func__,
503       __LINE__, NULL, "Testing the curl_multi_loop debugging prints");
504 #endif
505 
506   klass->multi_task_context.task = NULL;
507   klass->multi_task_context.refcount = 0;
508   klass->multi_task_context.queue = NULL;
509   klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
510   klass->multi_task_context.multi_handle = NULL;
511   g_mutex_init (&klass->multi_task_context.mutex);
512 #ifdef OHOS_OPT_STABLE
513   /* ohos.opt.stable.0003 for multiple instances ref/unref mutex */
514   g_mutex_init (&klass->multi_task_context.multiple_mutex);
515 #endif
516   g_cond_init (&klass->multi_task_context.signal);
517 
518   gst_element_class_set_static_metadata (gstelement_class,
519       "HTTP Client Source using libcURL",
520       "Source/Network",
521       "Receiver data as a client over a network via HTTP using cURL",
522       "Sam Hurst <samuelh@rd.bbc.co.uk>");
523 
524   gst_type_mark_as_plugin_api (GST_TYPE_CURL_HTTP_VERSION, 0);
525 }
526 
527 static void
gst_curl_http_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)528 gst_curl_http_src_set_property (GObject * object, guint prop_id,
529     const GValue * value, GParamSpec * pspec)
530 {
531   GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
532   GSTCURL_FUNCTION_ENTRY (source);
533 
534   switch (prop_id) {
535     case PROP_URI:
536       g_mutex_lock (&source->uri_mutex);
537       g_free (source->uri);
538       source->uri = g_value_dup_string (value);
539       g_mutex_unlock (&source->uri_mutex);
540       break;
541     case PROP_USERNAME:
542       g_free (source->username);
543       source->username = g_value_dup_string (value);
544       break;
545     case PROP_PASSWORD:
546       g_free (source->password);
547       source->password = g_value_dup_string (value);
548       break;
549     case PROP_PROXYURI:
550       g_free (source->proxy_uri);
551       source->proxy_uri = g_value_dup_string (value);
552       break;
553     case PROP_PROXYUSERNAME:
554       g_free (source->proxy_user);
555       source->proxy_user = g_value_dup_string (value);
556       break;
557     case PROP_PROXYPASSWORD:
558       g_free (source->proxy_pass);
559       source->proxy_pass = g_value_dup_string (value);
560       break;
561     case PROP_COOKIES:
562       g_strfreev (source->cookies);
563       source->cookies = g_strdupv (g_value_get_boxed (value));
564       source->number_cookies = g_strv_length (source->cookies);
565       break;
566     case PROP_USERAGENT:
567       g_free (source->user_agent);
568       source->user_agent = g_value_dup_string (value);
569       break;
570     case PROP_HEADERS:
571     {
572       const GstStructure *s = gst_value_get_structure (value);
573       if (source->request_headers)
574         gst_structure_free (source->request_headers);
575       source->request_headers =
576           s ? gst_structure_copy (s) :
577           gst_structure_new_empty (REQUEST_HEADERS_NAME);
578     }
579       break;
580     case PROP_COMPRESS:
581       source->accept_compressed_encodings = g_value_get_boolean (value);
582       break;
583     case PROP_REDIRECT:
584       source->allow_3xx_redirect = g_value_get_boolean (value);
585       break;
586     case PROP_MAXREDIRECT:
587       source->max_3xx_redirects = g_value_get_int (value);
588       break;
589     case PROP_KEEPALIVE:
590       source->keep_alive = g_value_get_boolean (value);
591       break;
592     case PROP_TIMEOUT:
593       source->timeout_secs = g_value_get_int (value);
594       break;
595     case PROP_STRICT_SSL:
596       source->strict_ssl = g_value_get_boolean (value);
597       break;
598     case PROP_SSL_CA_FILE:
599 #ifdef OHOS_OPT_MEMLEAK
600       /* ohos.opt.memleak.0003 fix memory leak. */
601       g_free (source->custom_ca_file);
602 #endif
603       source->custom_ca_file = g_value_dup_string (value);
604       break;
605     case PROP_RETRIES:
606       source->total_retries = g_value_get_int (value);
607       break;
608     case PROP_CONNECTIONMAXTIME:
609       source->max_connection_time = g_value_get_uint (value);
610       break;
611     case PROP_MAXCONCURRENT_SERVER:
612       source->max_conns_per_server = g_value_get_uint (value);
613       break;
614     case PROP_MAXCONCURRENT_PROXY:
615       source->max_conns_per_proxy = g_value_get_uint (value);
616       break;
617     case PROP_MAXCONCURRENT_GLOBAL:
618       source->max_conns_global = g_value_get_uint (value);
619       break;
620     case PROP_HTTPVERSION:
621       source->preferred_http_version = g_value_get_enum (value);
622       break;
623 #ifdef OHOS_EXT_FUNC
624     // ohos.ext.func.0033
625     case PROP_RECONNECTION_TIMEOUT: {
626       source->reconnection_timeout = g_value_get_uint (value);
627       GST_DEBUG_OBJECT (source, "set reconnection_timeout to %u us", source->reconnection_timeout);
628       break;
629     }
630     case PROP_STATE_CHANGE: {
631       source->player_state = g_value_get_int (value);
632       GST_DEBUG_OBJECT (source, "set player_state to %d", source->player_state);
633       break;
634     }
635 #endif
636     default:
637       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
638       break;
639   }
640   GSTCURL_FUNCTION_EXIT (source);
641 }
642 
643 static void
gst_curl_http_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)644 gst_curl_http_src_get_property (GObject * object, guint prop_id,
645     GValue * value, GParamSpec * pspec)
646 {
647   GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
648   GSTCURL_FUNCTION_ENTRY (source);
649 
650   switch (prop_id) {
651     case PROP_URI:
652       g_mutex_lock (&source->uri_mutex);
653       g_value_set_string (value, source->uri);
654       g_mutex_unlock (&source->uri_mutex);
655       break;
656     case PROP_USERNAME:
657       g_value_set_string (value, source->username);
658       break;
659     case PROP_PASSWORD:
660       g_value_set_string (value, source->password);
661       break;
662     case PROP_PROXYURI:
663       g_value_set_string (value, source->proxy_uri);
664       break;
665     case PROP_PROXYUSERNAME:
666       g_value_set_string (value, source->proxy_user);
667       break;
668     case PROP_PROXYPASSWORD:
669       g_value_set_string (value, source->proxy_pass);
670       break;
671     case PROP_COOKIES:
672       g_value_set_boxed (value, source->cookies);
673       break;
674     case PROP_USERAGENT:
675       g_value_set_string (value, source->user_agent);
676       break;
677     case PROP_HEADERS:
678       gst_value_set_structure (value, source->request_headers);
679       break;
680     case PROP_COMPRESS:
681       g_value_set_boolean (value, source->accept_compressed_encodings);
682       break;
683     case PROP_REDIRECT:
684       g_value_set_boolean (value, source->allow_3xx_redirect);
685       break;
686     case PROP_MAXREDIRECT:
687       g_value_set_int (value, source->max_3xx_redirects);
688       break;
689     case PROP_KEEPALIVE:
690       g_value_set_boolean (value, source->keep_alive);
691       break;
692     case PROP_TIMEOUT:
693       g_value_set_int (value, source->timeout_secs);
694       break;
695     case PROP_STRICT_SSL:
696       g_value_set_boolean (value, source->strict_ssl);
697       break;
698     case PROP_SSL_CA_FILE:
699       g_value_set_string (value, source->custom_ca_file);
700       break;
701     case PROP_RETRIES:
702       g_value_set_int (value, source->total_retries);
703       break;
704     case PROP_CONNECTIONMAXTIME:
705       g_value_set_uint (value, source->max_connection_time);
706       break;
707     case PROP_MAXCONCURRENT_SERVER:
708       g_value_set_uint (value, source->max_conns_per_server);
709       break;
710     case PROP_MAXCONCURRENT_PROXY:
711       g_value_set_uint (value, source->max_conns_per_proxy);
712       break;
713     case PROP_MAXCONCURRENT_GLOBAL:
714       g_value_set_uint (value, source->max_conns_global);
715       break;
716     case PROP_HTTPVERSION:
717       g_value_set_enum (value, source->preferred_http_version);
718       break;
719 #ifdef OHOS_EXT_FUNC
720     // ohos.ext.func.0033
721     case PROP_RECONNECTION_TIMEOUT:
722       g_value_set_uint (value, source->reconnection_timeout);
723       break;
724 #endif
725     default:
726       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
727       break;
728   }
729   GSTCURL_FUNCTION_EXIT (source);
730 }
731 
732 static void
gst_curl_http_src_init(GstCurlHttpSrc * source)733 gst_curl_http_src_init (GstCurlHttpSrc * source)
734 {
735   GSTCURL_FUNCTION_ENTRY (source);
736 
737   /* Assume everything is already free'd */
738   source->uri = NULL;
739   source->redirect_uri = NULL;
740   source->username = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME;
741   source->password = GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD;
742   source->proxy_uri = NULL;
743   source->proxy_user = NULL;
744   source->proxy_pass = NULL;
745   source->cookies = NULL;
746   g_assert (gst_curl_http_src_curl_capabilities != NULL);
747   source->user_agent =
748       g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s",
749       gst_curl_http_src_curl_capabilities->version);
750   source->number_cookies = 0;
751   source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME);
752   source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
753   source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
754   source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
755   source->timeout_secs = GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT;
756   source->max_connection_time = GSTCURL_DEFAULT_CONNECTION_TIME;
757   source->max_conns_per_server = GSTCURL_DEFAULT_CONNECTIONS_SERVER;
758   source->max_conns_per_proxy = GSTCURL_DEFAULT_CONNECTIONS_PROXY;
759   source->max_conns_global = GSTCURL_DEFAULT_CONNECTIONS_GLOBAL;
760 #ifdef OHOS_OPT_COMPAT
761   /* ohos.ext.compat.0025
762   It will be closed temporarily until the certificate verification needs are delivered on September 30 */
763   source->strict_ssl = 0;
764 #else
765   source->strict_ssl = GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER;
766 #endif
767   source->custom_ca_file = NULL;
768   source->preferred_http_version = pref_http_ver;
769   source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
770   source->retries_remaining = source->total_retries;
771   source->slist = NULL;
772   source->accept_compressed_encodings = FALSE;
773   source->seekable = GSTCURL_SEEKABLE_UNKNOWN;
774   source->content_size = 0;
775   source->request_position = 0;
776 #ifdef OHOS_EXT_FUNC
777   /* ohos.ext.func.0025 support https seek: */
778   source->orig_request_pos = 0;
779   source->read_position = 0;
780 #endif
781   source->stop_position = -1;
782 
783 #ifdef OHOS_EXT_FUNC
784   // ohos.ext.func.0033
785   source->start_usecs = 0;
786   source->end_usecs = 0;
787   source->reconnection_timeout = GSTCURL_HANDLE_DEFAULT_CURLOPT_RECONNECTION_TIMEOUT;
788   source->player_state = GST_PLAYER_STATUS_IDLE;
789 #endif
790 
791   gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
792 
793   source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
794   source->no_proxy_list = g_strdup (g_getenv ("no_proxy"));
795 
796   g_mutex_init (&source->uri_mutex);
797   g_mutex_init (&source->buffer_mutex);
798   g_cond_init (&source->buffer_cond);
799 
800   source->buffer = NULL;
801   source->buffer_len = 0;
802   source->state = GSTCURL_NONE;
803   source->pending_state = GSTCURL_NONE;
804   source->transfer_begun = FALSE;
805   source->data_received = FALSE;
806   source->connection_status = GSTCURL_NOT_CONNECTED;
807 
808   source->http_headers = NULL;
809   source->content_type = NULL;
810   source->status_code = 0;
811   source->reason_phrase = NULL;
812   source->hdrs_updated = FALSE;
813   source->curl_result = CURLE_OK;
814   gst_caps_replace (&source->caps, NULL);
815 
816   GSTCURL_FUNCTION_EXIT (source);
817 }
818 
819 /*
820  * Check if the Curl multi loop has been started. If not, initialise it and
821  * start it running. If it is already running, increment the refcount.
822  */
823 static void
gst_curl_http_src_ref_multi(GstCurlHttpSrc * src)824 gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
825 {
826   GstCurlHttpSrcClass *klass;
827 
828   GSTCURL_FUNCTION_ENTRY (src);
829 
830   /*klass = (GstCurlHttpSrcClass) g_type_class_peek_parent (src); */
831   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
832       GstCurlHttpSrcClass);
833 
834 #ifdef OHOS_OPT_STABLE
835   /* ohos.opt.stable.0003 for multiple instances unref mutex */
836   g_mutex_lock (&klass->multi_task_context.multiple_mutex);
837 #endif
838 
839   g_mutex_lock (&klass->multi_task_context.mutex);
840   if (klass->multi_task_context.refcount == 0) {
841     /* Set up various in-task properties */
842 
843     /* NULL is treated as the start of the list, no need to allocate. */
844     klass->multi_task_context.queue = NULL;
845 
846     /* set up curl */
847     klass->multi_task_context.multi_handle = curl_multi_init ();
848 
849     curl_multi_setopt (klass->multi_task_context.multi_handle,
850         CURLMOPT_PIPELINING, 1);
851 #ifdef CURLMOPT_MAX_HOST_CONNECTIONS
852     curl_multi_setopt (klass->multi_task_context.multi_handle,
853         CURLMOPT_MAX_HOST_CONNECTIONS, 1);
854 #endif
855 
856     /* Start the thread */
857     g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
858     klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
859     klass->multi_task_context.task = gst_task_new (
860         (GstTaskFunction) gst_curl_http_src_curl_multi_loop,
861         (gpointer) & klass->multi_task_context, NULL);
862     gst_task_set_lock (klass->multi_task_context.task,
863         &klass->multi_task_context.task_rec_mutex);
864     if (gst_task_start (klass->multi_task_context.task) == FALSE) {
865       /*
866        * This is a pretty critical failure and is not recoverable, so commit
867        * sudoku and run away.
868        */
869       GSTCURL_ERROR_PRINT ("Couldn't start curl_multi task! Aborting.");
870       abort ();
871     }
872     GSTCURL_INFO_PRINT ("Curl multi loop has been correctly initialised!");
873   }
874   klass->multi_task_context.refcount++;
875   g_mutex_unlock (&klass->multi_task_context.mutex);
876 
877 #ifdef OHOS_OPT_STABLE
878   /* ohos.opt.stable.0003 for multiple instances unref mutex */
879   g_mutex_unlock (&klass->multi_task_context.multiple_mutex);
880 #endif
881 
882   GSTCURL_FUNCTION_EXIT (src);
883 }
884 
885 /*
886  * Decrement the reference count on the curl multi loop. If this is called by
887  * the last instance to hold a reference, shut down the worker. (Otherwise
888  * GStreamer can't close down with a thread still running). Also offers the
889  * "force_all" boolean parameter, which if TRUE removes all references and shuts
890  * down.
891  */
892 static void
gst_curl_http_src_unref_multi(GstCurlHttpSrc * src)893 gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
894 {
895   GstCurlHttpSrcClass *klass;
896 
897   GSTCURL_FUNCTION_ENTRY (src);
898 
899   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
900       GstCurlHttpSrcClass);
901 
902 #ifdef OHOS_OPT_STABLE
903   /* ohos.opt.stable.0003 for multiple instances ref mutex */
904   g_mutex_lock (&klass->multi_task_context.multiple_mutex);
905 #endif
906 
907   g_mutex_lock (&klass->multi_task_context.mutex);
908 #ifdef OHOS_EXT_FUNC
909   /* ohos.ext.func.0025 for clean code */
910   if (klass->multi_task_context.refcount == 0) {
911     GST_WARNING_OBJECT (src, "worker thread refcount is 0");
912     g_mutex_unlock (&klass->multi_task_context.mutex);
913     GSTCURL_FUNCTION_EXIT (src);
914     return;
915   }
916 #endif
917   klass->multi_task_context.refcount--;
918   GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
919       klass->multi_task_context.refcount);
920 
921   if (klass->multi_task_context.refcount == 0) {
922     /* Everything's done! Clean up. */
923     gst_task_stop (klass->multi_task_context.task);
924     klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
925     g_cond_signal (&klass->multi_task_context.signal);
926     g_mutex_unlock (&klass->multi_task_context.mutex);
927     GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task...");
928     gst_task_join (klass->multi_task_context.task);
929     gst_object_unref (klass->multi_task_context.task);
930     klass->multi_task_context.task = NULL;
931     curl_multi_cleanup (klass->multi_task_context.multi_handle);
932     klass->multi_task_context.multi_handle = NULL;
933     g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex);
934     GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete");
935   } else {
936     g_mutex_unlock (&klass->multi_task_context.mutex);
937   }
938 
939 #ifdef OHOS_OPT_STABLE
940   /* ohos.opt.stable.0003 for multiple instances ref mutex */
941   g_mutex_unlock (&klass->multi_task_context.multiple_mutex);
942 #endif
943 
944   GSTCURL_FUNCTION_EXIT (src);
945 }
946 
947 static void
gst_curl_http_src_finalize(GObject * obj)948 gst_curl_http_src_finalize (GObject * obj)
949 {
950   GstCurlHttpSrc *src = GST_CURLHTTPSRC (obj);
951 
952   GSTCURL_FUNCTION_ENTRY (src);
953 
954   /* Cleanup all memory allocated */
955   gst_curl_http_src_cleanup_instance (src);
956 
957   GSTCURL_FUNCTION_EXIT (src);
958 
959   /* Chain up to parent class */
960   G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj);
961 }
962 
#ifdef OHOS_EXT_FUNC
/*
 * ohos.ext.func.0025 for seek
 *
 * If a transfer exists and the requested position differs from the current
 * read position, wait for the transfer to be removed, then reset the
 * per-transfer state, destroy the easy handle and discard any buffered data
 * so that the next create() call starts a fresh request.
 */
static void
gst_curl_http_src_handle_seek (GstCurlHttpSrc * src)
{
  /* No easy handle means there is no transfer to tear down. */
  if (src->curl_handle == NULL) {
    GST_INFO_OBJECT (src, "parameter is invalid");
    return;
  }

  /* Compare the two positions under the buffer lock. */
  g_mutex_lock (&src->buffer_mutex);
  if (src->request_position != src->read_position) {
    g_mutex_unlock (&src->buffer_mutex);
  } else {
#ifdef OHOS_OPT_COMPAT
    /* ohos.opt.compat.0044 */
    GST_INFO_OBJECT (src, "request_position is equal to read_position, req = %"
      G_GUINT64_FORMAT, src->request_position);
#endif
    /* Positions match: this is not a seek, nothing to do. */
    g_mutex_unlock (&src->buffer_mutex);
    return;
  }

  /* Called without the buffer lock held. */
  gst_curl_http_src_wait_until_removed(src);

  g_mutex_lock (&src->buffer_mutex);

  /* Back to the "no transfer started" state. */
  src->state = GSTCURL_NONE;
  src->transfer_begun = FALSE;
  src->hdrs_updated = FALSE;
  src->status_code = 0;
  /* g_free() accepts NULL, so no guard is required. */
  g_free (src->reason_phrase);
  src->reason_phrase = NULL;

  gst_curl_http_src_destroy_easy_handle (src);

  /* Drop data that belongs to the old position. */
  if (src->buffer_len > 0) {
    g_free (src->buffer);
    src->buffer = NULL;
    src->buffer_len = 0;
  }

  g_mutex_unlock (&src->buffer_mutex);

  GST_INFO_OBJECT (src, "seek_begin: curl handle removed, req_pos:%" G_GUINT64_FORMAT ", read_pos:%" G_GUINT64_FORMAT,
    src->request_position, src->read_position);
}
#endif
1010 
1011 /*
1012  * Do the transfer. If the transfer hasn't begun yet, start a new curl handle
1013  * and pass it to the multi queue to be operated on. Then wait for any blocks
1014  * of data and push them to the source pad.
1015  */
1016 static GstFlowReturn
gst_curl_http_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)1017 gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
1018 {
1019   GstFlowReturn ret;
1020   GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
1021   GstCurlHttpSrcClass *klass;
1022   GstStructure *empty_headers;
1023   GstBaseSrc *basesrc;
1024 
1025   GSTCURL_FUNCTION_ENTRY (src);
1026 
1027   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
1028       GstCurlHttpSrcClass);
1029   basesrc = GST_BASE_SRC_CAST (src);
1030 
1031 retry:
1032 #ifdef OHOS_OPT_COMPAT
1033   /** ohos.opt.compat.0063
1034    * stop interrupt src create
1035    */
1036   if (src->player_state == GST_PLAYER_STATUS_READY) {
1037     GST_INFO_OBJECT(src, "stopping, cancel create");
1038     return GST_FLOW_ERROR;
1039   }
1040 #endif
1041   ret = GST_FLOW_OK;
1042 
1043 #ifdef OHOS_EXT_FUNC
1044   /* ohos.ext.func.0025 for seek */
1045   gst_curl_http_src_handle_seek(src);
1046 #endif
1047 
1048   /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
1049      needed, multi_task_context.mutex must be acquired first */
1050   g_mutex_lock (&klass->multi_task_context.mutex);
1051   g_mutex_lock (&src->buffer_mutex);
1052   if (src->state == GSTCURL_UNLOCK) {
1053     ret = GST_FLOW_FLUSHING;
1054     goto escape;
1055   }
1056 
1057   if (!src->transfer_begun) {
1058     GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
1059     /* Create the Easy Handle and set up the session. */
1060     src->curl_handle = gst_curl_http_src_create_easy_handle (src);
1061     if (src->curl_handle == NULL) {
1062       ret = GST_FLOW_ERROR;
1063       goto escape;
1064     }
1065 
1066     if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
1067         == FALSE) {
1068       GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
1069       ret = GST_FLOW_ERROR;
1070       goto escape;
1071     }
1072     /* Signal the worker thread */
1073     g_cond_signal (&klass->multi_task_context.signal);
1074 
1075     src->state = GSTCURL_OK;
1076     src->transfer_begun = TRUE;
1077     src->data_received = FALSE;
1078 #ifdef OHOS_EXT_FUNC
1079     // ohos.ext.func.0033
1080     src->curl_result = CURLE_OK; // refresh the response state code
1081 #endif
1082 
1083     GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
1084 
1085     if (src->http_headers != NULL) {
1086       gst_structure_free (src->http_headers);
1087     }
1088     empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
1089     src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
1090         URI_NAME, G_TYPE_STRING, src->uri,
1091         REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
1092         RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
1093     gst_structure_free (empty_headers);
1094     GST_INFO_OBJECT (src, "Created a new headers object");
1095   }
1096 
1097   g_mutex_unlock (&klass->multi_task_context.mutex);
1098 
1099   /* Wait for data to become available, then punt it downstream */
1100   while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)
1101       && (src->connection_status == GSTCURL_CONNECTED)) {
1102     g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
1103   }
1104 
1105   if (src->state == GSTCURL_UNLOCK) {
1106     if (src->buffer_len > 0) {
1107       g_free (src->buffer);
1108       src->buffer = NULL;
1109       src->buffer_len = 0;
1110     }
1111     g_mutex_unlock (&src->buffer_mutex);
1112     return GST_FLOW_FLUSHING;
1113   }
1114 
1115   ret = gst_curl_http_src_handle_response (src);
1116   switch (ret) {
1117 #ifdef OHOS_EXT_FUNC
1118     /**
1119      * ohos.ext.func.0033
1120      * Support reconnection after disconnection in gstcurl.
1121      * When reconnected, reset the timeout clock.
1122      */
1123     case GST_FLOW_OK:
1124       src->start_usecs = 0;
1125       break;
1126 #endif
1127     case GST_FLOW_ERROR:
1128       /* Don't attempt a retry, just bomb out */
1129       g_mutex_unlock (&src->buffer_mutex);
1130       return ret;
1131     case GST_FLOW_CUSTOM_ERROR:
1132       if (src->data_received == TRUE) {
1133         /*
1134          * If data has already been received, we can't recall previously sent
1135          * buffers so don't attempt a retry in this case.
1136          *
1137          * TODO: Remember the position we got to, and make a range request for
1138          * the resource without the bit we've already received?
1139          */
1140         GST_WARNING_OBJECT (src,
1141             "Failed mid-transfer, can't continue for URI %s", src->uri);
1142         g_mutex_unlock (&src->buffer_mutex);
1143         return GST_FLOW_ERROR;
1144       }
1145       src->retries_remaining--;
1146       if (src->retries_remaining == 0) {
1147         GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
1148         g_mutex_unlock (&src->buffer_mutex);
1149         return GST_FLOW_ERROR;  /* Don't attempt a retry, just bomb out */
1150       }
1151       GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
1152       src->state = GSTCURL_NONE;
1153       src->transfer_begun = FALSE;
1154       src->status_code = 0;
1155       g_free (src->reason_phrase);
1156       src->reason_phrase = NULL;
1157       src->hdrs_updated = FALSE;
1158       if (src->http_headers != NULL) {
1159         gst_structure_free (src->http_headers);
1160         src->http_headers = NULL;
1161         GST_INFO_OBJECT (src, "NULL'd the headers");
1162       }
1163       gst_curl_http_src_destroy_easy_handle (src);
1164 #ifdef OHOS_EXT_FUNC
1165       /**
1166        * ohos.ext.func.0033
1167        * Support reconnection after disconnection in gstcurl.
1168        * When network brokes, try reconnecting until timeout.
1169        */
1170       if (gst_curl_http_src_reconnect_is_timeout(src)) {
1171         CURL_HTTP_SRC_ERROR (src, RESOURCE, TIME_OUT, "reconnection timeout");
1172         g_mutex_unlock (&src->buffer_mutex);
1173         return GST_FLOW_RECONNECTION_TIMEOUT;
1174       }
1175 #endif
1176       g_mutex_unlock (&src->buffer_mutex);
1177       goto retry;               /* Attempt a retry! */
1178     default:
1179       break;
1180   }
1181 
1182   if (((src->state == GSTCURL_OK) || (src->state == GSTCURL_DONE)) &&
1183       (src->buffer_len > 0)) {
1184 
1185     GST_DEBUG_OBJECT (src, "Pushing %u bytes of transfer for URI %s to pad",
1186         src->buffer_len, src->uri);
1187     *outbuf = gst_buffer_new_allocate (NULL, src->buffer_len, NULL);
1188     gst_buffer_fill (*outbuf, 0, src->buffer, src->buffer_len);
1189     GST_BUFFER_OFFSET (*outbuf) = basesrc->segment.position;
1190 
1191     g_free (src->buffer);
1192     src->buffer = NULL;
1193     src->buffer_len = 0;
1194     src->data_received = TRUE;
1195 
1196     /* ret should still be GST_FLOW_OK */
1197   } else if ((src->state == GSTCURL_DONE) && (src->buffer_len == 0)) {
1198     GST_INFO_OBJECT (src, "Full body received, signalling EOS for URI %s.",
1199         src->uri);
1200     src->state = GSTCURL_NONE;
1201     src->transfer_begun = FALSE;
1202     src->status_code = 0;
1203     g_free (src->reason_phrase);
1204     src->reason_phrase = NULL;
1205     src->hdrs_updated = FALSE;
1206     gst_curl_http_src_destroy_easy_handle (src);
1207     ret = GST_FLOW_EOS;
1208   } else {
1209     switch (src->state) {
1210       case GSTCURL_NONE:
1211         GST_WARNING_OBJECT (src, "Got unexpected GSTCURL_NONE state!");
1212         break;
1213       case GSTCURL_REMOVED:
1214         GST_WARNING_OBJECT (src, "Transfer got removed from the curl queue");
1215         ret = GST_FLOW_EOS;
1216         break;
1217       case GSTCURL_BAD_QUEUE_REQUEST:
1218         GST_ERROR_OBJECT (src, "Bad Queue Request!");
1219         ret = GST_FLOW_ERROR;
1220         break;
1221       case GSTCURL_TOTAL_ERROR:
1222         GST_ERROR_OBJECT (src, "Critical, unrecoverable error!");
1223         ret = GST_FLOW_ERROR;
1224         break;
1225       case GSTCURL_PIPELINE_NULL:
1226         GST_ERROR_OBJECT (src, "Pipeline null");
1227         break;
1228       default:
1229         GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
1230     }
1231   }
1232   g_mutex_unlock (&src->buffer_mutex);
1233   GSTCURL_FUNCTION_EXIT (src);
1234   return ret;
1235 
1236 escape:
1237   g_mutex_unlock (&src->buffer_mutex);
1238   g_mutex_unlock (&klass->multi_task_context.mutex);
1239 
1240   GSTCURL_FUNCTION_EXIT (src);
1241   return ret;
1242 }
1243 
1244 /*
1245  * Convert header from a GstStructure type to a curl_slist type that curl will
1246  * understand.
1247  */
1248 static gboolean
_headers_to_curl_slist(GQuark field_id,const GValue * value,gpointer ptr)1249 _headers_to_curl_slist (GQuark field_id, const GValue * value, gpointer ptr)
1250 {
1251   gchar *field;
1252   struct curl_slist **p_slist = ptr;
1253 
1254   field = g_strdup_printf ("%s: %s", g_quark_to_string (field_id),
1255       g_value_get_string (value));
1256 
1257   *p_slist = curl_slist_append (*p_slist, field);
1258 
1259   g_free (field);
1260 
1261   return TRUE;
1262 }
1263 
1264 /*
1265  * From the data in the queue element s, create a CURL easy handle and populate
1266  * options with the URL, proxy data, login options, cookies,
1267  */
1268 static CURL *
gst_curl_http_src_create_easy_handle(GstCurlHttpSrc * s)1269 gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
1270 {
1271   CURL *handle;
1272   gint i;
1273   GSTCURL_FUNCTION_ENTRY (s);
1274 
1275   /* This is mandatory and yet not default option, so if this is NULL
1276    * then something very bad is going on. */
1277   if (s->uri == NULL) {
1278     GST_ERROR_OBJECT (s, "No URI for curl!");
1279     return NULL;
1280   }
1281 
1282   handle = curl_easy_init ();
1283   if (handle == NULL) {
1284     GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
1285     return NULL;
1286   }
1287   GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
1288 
1289 #ifndef GST_DISABLE_GST_DEBUG
1290   if (curl_easy_setopt (handle, CURLOPT_VERBOSE, 1) != CURLE_OK) {
1291     GST_WARNING_OBJECT (s, "Failed to set verbose!");
1292   }
1293   if (curl_easy_setopt (handle, CURLOPT_DEBUGDATA, s) != CURLE_OK) {
1294     GST_WARNING_OBJECT (s, "Failed to set debug user_data!");
1295   }
1296   if (curl_easy_setopt (handle, CURLOPT_DEBUGFUNCTION,
1297           gst_curl_http_src_get_debug) != CURLE_OK) {
1298     GST_WARNING_OBJECT (s, "Failed to set debug function!");
1299   }
1300 #endif
1301 
1302   gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
1303   gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
1304   gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
1305   gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
1306   gst_curl_setopt_str (s, handle, CURLOPT_NOPROXY, s->no_proxy_list);
1307   gst_curl_setopt_str (s, handle, CURLOPT_PROXYUSERNAME, s->proxy_user);
1308   gst_curl_setopt_str (s, handle, CURLOPT_PROXYPASSWORD, s->proxy_pass);
1309 
1310   for (i = 0; i < s->number_cookies; i++) {
1311     gst_curl_setopt_str (s, handle, CURLOPT_COOKIELIST, s->cookies[i]);
1312   }
1313 
1314   /* curl_slist_append dynamically allocates memory, but I need to free it */
1315   if (s->request_headers != NULL) {
1316     gst_structure_foreach (s->request_headers, _headers_to_curl_slist,
1317         &s->slist);
1318     if (curl_easy_setopt (handle, CURLOPT_HTTPHEADER, s->slist) != CURLE_OK) {
1319       GST_WARNING_OBJECT (s, "Failed to set HTTP headers!");
1320     }
1321   }
1322 
1323   gst_curl_setopt_str_default (s, handle, CURLOPT_USERAGENT, s->user_agent);
1324 
1325   /*
1326    * Unlike soup, this isn't a binary op, curl wants a string here. So if it's
1327    * TRUE, simply set the value as an empty string as this allows both gzip and
1328    * zlib compression methods.
1329    */
1330   if (s->accept_compressed_encodings == TRUE) {
1331     gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "");
1332   } else {
1333     gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "identity");
1334   }
1335 
1336   gst_curl_setopt_int (s, handle, CURLOPT_FOLLOWLOCATION,
1337       s->allow_3xx_redirect);
1338   gst_curl_setopt_int_default (s, handle, CURLOPT_MAXREDIRS,
1339       s->max_3xx_redirects);
1340   gst_curl_setopt_bool (s, handle, CURLOPT_TCP_KEEPALIVE, s->keep_alive);
1341   gst_curl_setopt_int (s, handle, CURLOPT_TIMEOUT, s->timeout_secs);
1342   gst_curl_setopt_bool (s, handle, CURLOPT_SSL_VERIFYPEER, s->strict_ssl);
1343   gst_curl_setopt_str (s, handle, CURLOPT_CAINFO, s->custom_ca_file);
1344 
1345 #ifdef OHOS_EXT_FUNC
1346   /* ohos.ext.func.0025 for seek */
1347   if (s->request_position > 0 || s->stop_position > 0) {
1348     gchar *range;
1349     if (s->stop_position < 1) {
1350       /* start specified, no end specified */
1351       range = g_strdup_printf ("%" G_GUINT64_FORMAT "-", s->request_position);
1352     } else {
1353       /* in GStreamer the end position indicates the first byte that is not
1354          in the range, whereas in HTTP the Content-Range header includes the
1355          byte listed in the end value */
1356       range = g_strdup_printf ("%" G_GUINT64_FORMAT "-%" G_GINT64_FORMAT,
1357           s->request_position, s->stop_position - 1);
1358     }
1359     s->orig_request_pos = s->request_position;
1360     GST_TRACE_OBJECT (s, "Requesting range: %s", range);
1361     curl_easy_setopt (handle, CURLOPT_RANGE, range);
1362     g_free (range);
1363   }
1364   s->read_position = s->request_position;
1365 #else
1366   if (s->request_position || s->stop_position > 0) {
1367     gchar *range;
1368     if (s->stop_position < 1) {
1369       /* start specified, no end specified */
1370       range = g_strdup_printf ("%" G_GINT64_FORMAT "-", s->request_position);
1371     } else {
1372       /* in GStreamer the end position indicates the first byte that is not
1373          in the range, whereas in HTTP the Content-Range header includes the
1374          byte listed in the end value */
1375       range = g_strdup_printf ("%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1376           s->request_position, s->stop_position - 1);
1377     }
1378     GST_TRACE_OBJECT (s, "Requesting range: %s", range);
1379     curl_easy_setopt (handle, CURLOPT_RANGE, range);
1380     g_free (range);
1381   }
1382 #endif
1383 
1384   switch (s->preferred_http_version) {
1385     case GSTCURL_HTTP_VERSION_1_0:
1386       GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.0");
1387       gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1388           CURL_HTTP_VERSION_1_0);
1389       break;
1390     case GSTCURL_HTTP_VERSION_1_1:
1391       GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.1");
1392       gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1393           CURL_HTTP_VERSION_1_1);
1394       break;
1395 #ifdef CURL_VERSION_HTTP2
1396     case GSTCURL_HTTP_VERSION_2_0:
1397       GST_DEBUG_OBJECT (s, "Setting version as HTTP/2.0");
1398       if (curl_easy_setopt (handle, CURLOPT_HTTP_VERSION,
1399               CURL_HTTP_VERSION_2_0) != CURLE_OK) {
1400         if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
1401           GST_WARNING_OBJECT (s,
1402               "Cannot set unsupported option CURLOPT_HTTP_VERSION");
1403         } else {
1404           GST_INFO_OBJECT (s, "HTTP/2 unsupported by libcurl at this time");
1405         }
1406       }
1407       break;
1408 #endif
1409     default:
1410       GST_WARNING_OBJECT (s,
1411           "Supplied a bogus HTTP version, using curl default!");
1412   }
1413 
1414   gst_curl_setopt_generic (s, handle, CURLOPT_HEADERFUNCTION,
1415       gst_curl_http_src_get_header);
1416   gst_curl_setopt_str (s, handle, CURLOPT_HEADERDATA, s);
1417   gst_curl_setopt_generic (s, handle, CURLOPT_WRITEFUNCTION,
1418       gst_curl_http_src_get_chunks);
1419   gst_curl_setopt_str (s, handle, CURLOPT_WRITEDATA, s);
1420 
1421   gst_curl_setopt_str (s, handle, CURLOPT_ERRORBUFFER, s->curl_errbuf);
1422 
1423   GSTCURL_FUNCTION_EXIT (s);
1424   return handle;
1425 }
1426 
1427 /*
1428  * Check the return type from the curl transfer. If it was okay, then deal with
1429  * any headers that were received. Headers should only be dealt with once - but
1430  * we might get a second set if there are trailing headers (RFC7230 Section 4.4)
1431  */
1432 static GstFlowReturn
gst_curl_http_src_handle_response(GstCurlHttpSrc * src)1433 gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
1434 {
1435   glong curl_info_long;
1436   gdouble curl_info_dbl;
1437   curl_off_t curl_info_offt;
1438   gchar *redirect_url;
1439   GstBaseSrc *basesrc;
1440   const GValue *response_headers;
1441   GstFlowReturn ret = GST_FLOW_OK;
1442 
1443   GSTCURL_FUNCTION_ENTRY (src);
1444 
1445   GST_TRACE_OBJECT (src, "status code: %d (%s), curl return code %d",
1446       src->status_code, src->reason_phrase, src->curl_result);
1447 
1448   /* Check the curl result code first - anything not 0 is probably a failure */
1449   if (src->curl_result != 0) {
1450     GST_WARNING_OBJECT (src, "Curl failed the transfer (%d): %s",
1451         src->curl_result, curl_easy_strerror (src->curl_result));
1452     GST_DEBUG_OBJECT (src, "Reason for curl failure: %s", src->curl_errbuf);
1453 #ifdef OHOS_EXT_FUNC
1454     /**
1455      * ohos.ext.func.0033
1456      * Support reconnection after disconnection in gstcurl.
1457      * When network brokes, try reconnecting until timeout.
1458      * Reconnection time counting will start after player state changing to buffering
1459      * and receiving curl error message of "could not connect" or
1460      * "operation timeout"(may cost some time after performing operation)
1461      */
1462     if (src->curl_result == CURLE_COULDNT_CONNECT || src->curl_result == CURLE_OPERATION_TIMEDOUT) {
1463       src->data_received = FALSE;
1464       if (src->buffer_len > 0) {
1465         return GST_FLOW_OK;
1466       }
1467       return GST_FLOW_CUSTOM_ERROR;
1468     }
1469 #endif
1470     return GST_FLOW_ERROR;
1471   }
1472 
1473   /*
1474    * What response code do we have?
1475    */
1476   if (src->status_code >= 400) {
1477     GST_WARNING_OBJECT (src, "Transfer for URI %s returned error status %u",
1478         src->uri, src->status_code);
1479     src->retries_remaining = 0;
1480 #ifdef OHOS_OPT_COMPAT
1481     /**
1482      * ohos.opt.compat.0032
1483      * Fix seek failed when request position from gstqueue2 is the end of the file.
1484      * A 416 response is not actually an error when the file is already completely downloaded
1485      * and the request position is the end of the file.
1486      */
1487     if (src->status_code == 416 && src->content_size > 0) {
1488       return GST_FLOW_OK;
1489     }
1490 #endif
1491     CURL_HTTP_SRC_ERROR (src, RESOURCE, NOT_FOUND, (src->reason_phrase));
1492     return GST_FLOW_ERROR;
1493   } else if (src->status_code == 0) {
1494     if (curl_easy_getinfo (src->curl_handle, CURLINFO_TOTAL_TIME,
1495             &curl_info_dbl) != CURLE_OK) {
1496       /* Curl cannot be relied on in this state, so return an error. */
1497       return GST_FLOW_ERROR;
1498     }
1499     if (curl_info_dbl > src->timeout_secs) {
1500       return GST_FLOW_CUSTOM_ERROR;
1501     }
1502 
1503     if (curl_easy_getinfo (src->curl_handle, CURLINFO_OS_ERRNO,
1504             &curl_info_long) != CURLE_OK) {
1505       /* Curl cannot be relied on in this state, so return an error. */
1506       return GST_FLOW_ERROR;
1507 
1508     }
1509 
1510     GST_WARNING_OBJECT (src, "Errno for CONNECT call was %ld (%s)",
1511         curl_info_long, g_strerror ((gint) curl_info_long));
1512 
1513     /* Some of these responses are retry-able, others not. Set the returned
1514      * state to ERROR so we crash out instead of fruitlessly retrying.
1515      */
1516     if (curl_info_long == ECONNREFUSED) {
1517       return GST_FLOW_ERROR;
1518     }
1519     ret = GST_FLOW_CUSTOM_ERROR;
1520   }
1521 
1522 
1523   if (ret == GST_FLOW_CUSTOM_ERROR) {
1524     src->hdrs_updated = FALSE;
1525     GSTCURL_FUNCTION_EXIT (src);
1526     return ret;
1527   }
1528 
1529   /* Only do this once */
1530   if (src->hdrs_updated == FALSE) {
1531     GSTCURL_FUNCTION_EXIT (src);
1532     return GST_FLOW_OK;
1533   }
1534 
1535   /*
1536    * Deal with redirections...
1537    */
1538   if (curl_easy_getinfo (src->curl_handle, CURLINFO_EFFECTIVE_URL,
1539           &redirect_url)
1540       == CURLE_OK) {
1541     size_t lena, lenb;
1542     lena = strlen (src->uri);
1543     lenb = strlen (redirect_url);
1544     if (g_ascii_strncasecmp (src->uri, redirect_url,
1545             (lena > lenb) ? lenb : lena) != 0) {
1546       GST_INFO_OBJECT (src, "Got a redirect to %s, setting as redirect URI",
1547           redirect_url);
1548       src->redirect_uri = g_strdup (redirect_url);
1549       gst_structure_remove_field (src->http_headers, REDIRECT_URI_NAME);
1550       gst_structure_set (src->http_headers, REDIRECT_URI_NAME,
1551           G_TYPE_STRING, redirect_url, NULL);
1552     }
1553   }
1554 
1555   /*
1556    * Push the content length
1557    */
1558   if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T,
1559           &curl_info_offt) == CURLE_OK) {
1560     if (curl_info_offt == -1) {
1561       GST_WARNING_OBJECT (src,
1562           "No Content-Length was specified in the response.");
1563 #ifdef OHOS_OPT_COMPAT
1564     /**
1565      * ohos.opt.compat.0059
1566      * DASH stream's seek need obtaining content length, and HLS stream does not need it.
1567      * If dash cannot obtain content length, it is necessary to set seekable to false.
1568      * Currently, DASH is not supported, so disable this setting to avoid affecting HLS.
1569      */
1570 #else
1571       src->seekable = GSTCURL_SEEKABLE_FALSE;
1572 #endif
1573     } else {
1574       /* Note that in the case of a range get, Content-Length is the number
1575          of bytes requested, not the total size of the resource */
1576 #ifdef OHOS_EXT_FUNC
1577       /* ohos.ext.func.0025 */
1578       GST_INFO_OBJECT (src, "orig req pos:%" G_GUINT64_FORMAT ", Content-Length was given as %" G_GUINT64_FORMAT,
1579           src->orig_request_pos, curl_info_offt);
1580       if (src->content_size == 0) {
1581         src->content_size = src->orig_request_pos + curl_info_offt;
1582       }
1583       basesrc = GST_BASE_SRC_CAST (src);
1584       basesrc->segment.duration = src->orig_request_pos + curl_info_offt;
1585 #else
1586       GST_INFO_OBJECT (src, "Content-Length was given as %" G_GUINT64_FORMAT,
1587           curl_info_offt);
1588       if (src->content_size == 0) {
1589         src->content_size = src->request_position + curl_info_offt;
1590       }
1591       basesrc = GST_BASE_SRC_CAST (src);
1592       basesrc->segment.duration = src->request_position + curl_info_offt;
1593 #endif
1594       if (src->seekable == GSTCURL_SEEKABLE_UNKNOWN) {
1595         src->seekable = GSTCURL_SEEKABLE_TRUE;
1596       }
1597       gst_element_post_message (GST_ELEMENT (src),
1598           gst_message_new_duration_changed (GST_OBJECT (src)));
1599     }
1600   }
1601 
1602   /*
   * Push all the received headers down via a sticky event
1604    */
1605   response_headers = gst_structure_get_value (src->http_headers,
1606       RESPONSE_HEADERS_NAME);
1607   if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
1608     GstEvent *hdrs_event;
1609 
1610     gst_element_post_message (GST_ELEMENT_CAST (src),
1611         gst_message_new_element (GST_OBJECT_CAST (src),
1612             gst_structure_copy (src->http_headers)));
1613 
1614     /* gst_event_new_custom takes ownership of our structure */
1615     hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
1616         gst_structure_copy (src->http_headers));
1617     gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
1618     GST_INFO_OBJECT (src, "Pushed headers downstream");
1619   }
1620 
1621   src->hdrs_updated = FALSE;
1622 
1623   GSTCURL_FUNCTION_EXIT (src);
1624 
1625   return ret;
1626 }
1627 
1628 /*
1629  * "Negotiate" capabilities between us and the sink.
1630  * I.e. tell the sink device what data to expect. We can't be told what to send
1631  * unless we implement "only return to me if this type" property. Potential TODO
1632  */
1633 static gboolean
gst_curl_http_src_negotiate_caps(GstCurlHttpSrc * src)1634 gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
1635 {
1636   const GValue *response_headers;
1637   const GstStructure *response_struct;
1638 
1639   GST_INFO_OBJECT (src, "Negotiating caps...");
1640   if (src->caps && src->http_headers) {
1641     response_headers =
1642         gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME);
1643     if (!response_headers) {
1644       GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME);
1645       return FALSE;
1646     }
1647     response_struct = gst_value_get_structure (response_headers);
1648     if (gst_structure_has_field_typed (response_struct, "content-type",
1649             G_TYPE_STRING)) {
1650       const gchar *content_type =
1651           gst_structure_get_string (response_struct, "content-type");
1652       GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type);
1653       src->caps = gst_caps_make_writable (src->caps);
1654       gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
1655           content_type, NULL);
1656       if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
1657         GST_ERROR_OBJECT (src, "Setting caps failed!");
1658         return FALSE;
1659       }
1660     }
1661   } else {
1662     GST_DEBUG_OBJECT (src, "No caps have been set, continue.");
1663   }
1664 
1665   return TRUE;
1666 }
1667 
1668 /*
1669  * Cleanup the CURL easy handle once we're done with it.
1670  */
1671 static inline void
gst_curl_http_src_destroy_easy_handle(GstCurlHttpSrc * src)1672 gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src)
1673 {
1674   /* Thank you Handles, and well done. Well done, mate. */
1675   if (src->curl_handle != NULL) {
1676     curl_easy_cleanup (src->curl_handle);
1677     src->curl_handle = NULL;
1678   }
1679 
1680   /* In addition, clean up the curl header slist if it was used. */
1681   if (src->slist != NULL) {
1682     curl_slist_free_all (src->slist);
1683     src->slist = NULL;
1684   }
1685 }
1686 
1687 static GstStateChangeReturn
gst_curl_http_src_change_state(GstElement * element,GstStateChange transition)1688 gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
1689 {
1690   GstStateChangeReturn ret;
1691   GstCurlHttpSrc *source = GST_CURLHTTPSRC (element);
1692   GSTCURL_FUNCTION_ENTRY (source);
1693 
1694   switch (transition) {
1695     case GST_STATE_CHANGE_NULL_TO_READY:
1696       gst_curl_http_src_ref_multi (source);
1697       break;
1698     case GST_STATE_CHANGE_READY_TO_PAUSED:
1699       if (source->uri == NULL) {
1700         GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (_("No URL set.")),
1701             ("Missing URL"));
1702         return GST_STATE_CHANGE_FAILURE;
1703       }
1704       break;
1705     case GST_STATE_CHANGE_READY_TO_NULL:
1706       GST_DEBUG_OBJECT (source, "Removing from multi_loop queue...");
1707       /* The pipeline has ended, so signal any running request to end
1708          and wait until the multi_loop has stopped using this element */
1709       gst_curl_http_src_wait_until_removed (source);
1710       gst_curl_http_src_unref_multi (source);
1711       break;
1712     default:
1713       break;
1714   }
1715 
1716   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1717 
1718   GSTCURL_FUNCTION_EXIT (source);
1719   return ret;
1720 }
1721 
1722 /*
1723  * Take care of any memory that may be left over from the instance that's now
1724  * closing before we leak it.
1725  */
1726 static void
gst_curl_http_src_cleanup_instance(GstCurlHttpSrc * src)1727 gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
1728 {
1729   gint i;
1730   g_mutex_lock (&src->uri_mutex);
1731   g_free (src->uri);
1732   src->uri = NULL;
1733   g_free (src->redirect_uri);
1734   src->redirect_uri = NULL;
1735   g_mutex_unlock (&src->uri_mutex);
1736   g_mutex_clear (&src->uri_mutex);
1737 
1738   g_free (src->proxy_uri);
1739   src->proxy_uri = NULL;
1740   g_free (src->no_proxy_list);
1741   src->no_proxy_list = NULL;
1742   g_free (src->proxy_user);
1743   src->proxy_user = NULL;
1744   g_free (src->proxy_pass);
1745   src->proxy_pass = NULL;
1746 
1747   for (i = 0; i < src->number_cookies; i++) {
1748     g_free (src->cookies[i]);
1749     src->cookies[i] = NULL;
1750   }
1751   g_free (src->cookies);
1752   src->cookies = NULL;
1753 
1754   g_free (src->user_agent);
1755   src->user_agent = NULL;
1756 
1757   g_mutex_clear (&src->buffer_mutex);
1758 
1759   g_cond_clear (&src->buffer_cond);
1760 
1761   g_free (src->buffer);
1762   src->buffer = NULL;
1763 
1764   if (src->request_headers) {
1765     gst_structure_free (src->request_headers);
1766     src->request_headers = NULL;
1767   }
1768   if (src->http_headers != NULL) {
1769     gst_structure_free (src->http_headers);
1770     src->http_headers = NULL;
1771   }
1772   g_free (src->reason_phrase);
1773   src->reason_phrase = NULL;
1774 #ifdef OHOS_OPT_MEMLEAK
1775   /* ohos.opt.memleak.0003 fix memory leak. */
1776   g_free (src->custom_ca_file);
1777   src->custom_ca_file = NULL;
1778 #endif
1779   gst_caps_replace (&src->caps, NULL);
1780 
1781   gst_curl_http_src_destroy_easy_handle (src);
1782 }
1783 
1784 static gboolean
gst_curl_http_src_query(GstBaseSrc * bsrc,GstQuery * query)1785 gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
1786 {
1787   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1788   gboolean ret;
1789 #ifdef OHOS_EXT_FUNC
1790   /* ohos.opt.compat.0022 support pull mode in libav demux */
1791   GstSchedulingFlags flags;
1792   gint minsize, maxsize, align;
1793 #endif
1794   GSTCURL_FUNCTION_ENTRY (src);
1795 
1796   switch (GST_QUERY_TYPE (query)) {
1797     case GST_QUERY_URI:
1798       g_mutex_lock (&src->uri_mutex);
1799       gst_query_set_uri (query, src->uri);
1800       if (src->redirect_uri != NULL) {
1801         gst_query_set_uri_redirection (query, src->redirect_uri);
1802       }
1803       g_mutex_unlock (&src->uri_mutex);
1804       ret = TRUE;
1805       break;
1806     default:
1807       ret = GST_BASE_SRC_CLASS (parent_class)->query (bsrc, query);
1808       break;
1809   }
1810 
1811 #ifdef OHOS_EXT_FUNC
1812   /* ohos.opt.compat.0022 support pull mode in libav demux */
1813   switch (GST_QUERY_TYPE (query)) {
1814     case GST_QUERY_SCHEDULING:
1815       gst_query_parse_scheduling (query, &flags, &minsize, &maxsize, &align);
1816       flags |= GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED;
1817 
1818       if (src->seekable) {
1819         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
1820       } else {
1821         flags &= (~GST_SCHEDULING_FLAG_SEEKABLE);
1822       }
1823       GST_INFO_OBJECT (src, "seekable: %d", src->seekable);
1824 
1825       gst_query_set_scheduling (query, flags, minsize, maxsize, align);
1826       break;
1827     default:
1828       break;
1829   }
1830 #endif
1831 
1832   GSTCURL_FUNCTION_EXIT (src);
1833   return ret;
1834 }
1835 
1836 static gboolean
gst_curl_http_src_get_content_length(GstBaseSrc * bsrc,guint64 * size)1837 gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
1838 {
1839   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1840   const GValue *response_headers;
1841   gboolean ret = FALSE;
1842 
1843   if (src->http_headers == NULL) {
1844     return FALSE;
1845   }
1846 
1847   response_headers = gst_structure_get_value (src->http_headers,
1848       RESPONSE_HEADERS_NAME);
1849   if (gst_structure_has_field_typed (gst_value_get_structure (response_headers),
1850           "content-length", G_TYPE_STRING)) {
1851     const gchar *content_length =
1852         gst_structure_get_string (gst_value_get_structure (response_headers),
1853         "content-length");
1854     *size = (guint64) g_ascii_strtoull (content_length, NULL, 10);
1855     ret = TRUE;
1856   } else {
1857     GST_DEBUG_OBJECT (src,
1858         "No content length has yet been set, or there was an error!");
1859   }
1860   return ret;
1861 }
1862 
1863 static gboolean
gst_curl_http_src_is_seekable(GstBaseSrc * bsrc)1864 gst_curl_http_src_is_seekable (GstBaseSrc * bsrc)
1865 {
1866   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1867 
1868   /* NOTE: if seekable is UNKNOWN, assume yes */
1869   return src->seekable != GSTCURL_SEEKABLE_FALSE;
1870 }
1871 
1872 static gboolean
gst_curl_http_src_do_seek(GstBaseSrc * bsrc,GstSegment * segment)1873 gst_curl_http_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
1874 {
1875   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1876   gboolean ret = TRUE;
1877 
1878   g_mutex_lock (&src->buffer_mutex);
1879 #ifdef OHOS_EXT_FUNC
1880 /* ohos.ext.func.0025 support https seek: */
1881   GST_INFO_OBJECT (src, "do_seek(%" G_GINT64_FORMAT ", %" G_GINT64_FORMAT
1882       ")", segment->start, segment->stop);
1883 #endif
1884   if (src->state == GSTCURL_UNLOCK) {
1885     GST_WARNING_OBJECT (src, "Attempt to seek while unlocked");
1886     ret = FALSE;
1887     goto done;
1888   }
1889   if (src->request_position == segment->start &&
1890       src->stop_position == segment->stop) {
1891 #ifdef OHOS_OPT_COMPAT
1892     /* ohos.opt.compat.0044 */
1893     src->read_position = (guint64)-1;
1894 #endif
1895     GST_DEBUG_OBJECT (src, "Seek to current read/end position");
1896     goto done;
1897   }
1898 
1899   if (src->seekable == GSTCURL_SEEKABLE_FALSE) {
1900     GST_WARNING_OBJECT (src, "Not seekable");
1901     ret = FALSE;
1902     goto done;
1903   }
1904 
1905   if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
1906     GST_WARNING_OBJECT (src, "Invalid seek segment");
1907     ret = FALSE;
1908     goto done;
1909   }
1910 
1911   if (src->content_size > 0 && segment->start >= src->content_size) {
1912     GST_WARNING_OBJECT (src,
1913         "Potentially seeking beyond end of file, might EOS immediately");
1914   }
1915 
1916   src->request_position = segment->start;
1917   src->stop_position = segment->stop;
1918 #ifdef OHOS_OPT_COMPAT
1919   /**
1920    * ohos.opt.compat.0044
1921    * Fix seek failed when request_position from gstqueue2 seek event is the same as read_position.
1922    * This case occurs when pulled position from demux jumps back and forth between the boundary of two ranges in gstqueue2,
1923    * which causes requested postion of seek event from gstqueue2 be the same as right boundary of the last range, namely
1924    * current downloaded position(read_position). Thus, reset read position when occurred gst_curl_http_src_do_seek().
1925    */
1926   src->read_position = (guint64)-1;
1927 #endif
1928 done:
1929   g_mutex_unlock (&src->buffer_mutex);
1930   return ret;
1931 }
1932 
1933 static void
gst_curl_http_src_uri_handler_init(gpointer g_iface,gpointer iface_data)1934 gst_curl_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1935 {
1936   GstURIHandlerInterface *uri_iface = (GstURIHandlerInterface *) g_iface;
1937 
1938   uri_iface->get_type = gst_curl_http_src_urihandler_get_type;
1939   uri_iface->get_protocols = gst_curl_http_src_urihandler_get_protocols;
1940   uri_iface->get_uri = gst_curl_http_src_urihandler_get_uri;
1941   uri_iface->set_uri = gst_curl_http_src_urihandler_set_uri;
1942 }
1943 
1944 static guint
gst_curl_http_src_urihandler_get_type(GType type)1945 gst_curl_http_src_urihandler_get_type (GType type)
1946 {
1947   return GST_URI_SRC;
1948 }
1949 
1950 static const gchar *const *
gst_curl_http_src_urihandler_get_protocols(GType type)1951 gst_curl_http_src_urihandler_get_protocols (GType type)
1952 {
1953   static const gchar *protocols[] = { "http", "https", NULL };
1954 
1955   return protocols;
1956 }
1957 
1958 static gchar *
gst_curl_http_src_urihandler_get_uri(GstURIHandler * handler)1959 gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler)
1960 {
1961   gchar *ret;
1962   GstCurlHttpSrc *source;
1963 
1964   g_return_val_if_fail (GST_IS_URI_HANDLER (handler), NULL);
1965   source = GST_CURLHTTPSRC (handler);
1966 
1967   GSTCURL_FUNCTION_ENTRY (source);
1968 
1969   g_mutex_lock (&source->uri_mutex);
1970   ret = g_strdup (source->uri);
1971   g_mutex_unlock (&source->uri_mutex);
1972 
1973   GSTCURL_FUNCTION_EXIT (source);
1974   return ret;
1975 }
1976 
1977 static gboolean
gst_curl_http_src_urihandler_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)1978 gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
1979     const gchar * uri, GError ** error)
1980 {
1981   GstCurlHttpSrc *source = GST_CURLHTTPSRC (handler);
1982   GSTCURL_FUNCTION_ENTRY (source);
1983 
1984   g_return_val_if_fail (GST_IS_URI_HANDLER (handler), FALSE);
1985   g_return_val_if_fail (uri != NULL, FALSE);
1986 
1987   g_mutex_lock (&source->uri_mutex);
1988 
1989   if (source->uri != NULL) {
1990     GST_DEBUG_OBJECT (source,
1991         "URI already present as %s, updating to new URI %s", source->uri, uri);
1992     g_free (source->uri);
1993   }
1994 
1995   source->uri = g_strdup (uri);
1996   if (source->uri == NULL) {
1997     g_mutex_unlock (&source->uri_mutex);
1998     return FALSE;
1999   }
2000   source->retries_remaining = source->total_retries;
2001 
2002   g_mutex_unlock (&source->uri_mutex);
2003 
2004   GSTCURL_FUNCTION_EXIT (source);
2005   return TRUE;
2006 }
2007 
2008 /*
2009  * Cancel any currently running transfer, and then signal all the loops to drop
2010  * any received buffers. The ::create() method should return GST_FLOW_FLUSHING.
2011  */
2012 static gboolean
gst_curl_http_src_unlock(GstBaseSrc * bsrc)2013 gst_curl_http_src_unlock (GstBaseSrc * bsrc)
2014 {
2015   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
2016   gboolean want_removal = FALSE;
2017 
2018   g_mutex_lock (&src->buffer_mutex);
2019   if (src->state != GSTCURL_UNLOCK) {
2020     if (src->state == GSTCURL_OK) {
2021       /* A transfer is running, cancel it */
2022       if (src->connection_status == GSTCURL_CONNECTED) {
2023         src->connection_status = GSTCURL_WANT_REMOVAL;
2024       }
2025       want_removal = TRUE;
2026     }
2027     src->pending_state = src->state;
2028     src->state = GSTCURL_UNLOCK;
2029   }
2030   g_cond_signal (&src->buffer_cond);
2031   g_mutex_unlock (&src->buffer_mutex);
2032 
2033   if (want_removal) {
2034     GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
2035         GST_TYPE_CURL_HTTP_SRC,
2036         GstCurlHttpSrcClass);
2037     g_mutex_lock (&klass->multi_task_context.mutex);
2038     g_cond_signal (&klass->multi_task_context.signal);
2039     g_mutex_unlock (&klass->multi_task_context.mutex);
2040   }
2041 
2042   return TRUE;
2043 }
2044 
2045 /*
2046  * Finish the unlock request above and return curlhttpsrc to the normal state.
2047  * This will probably be GSTCURL_DONE, and the next return from ::create() will
2048  * be GST_FLOW_EOS as we don't want to deliver parts of a HTTP body.
2049  */
2050 static gboolean
gst_curl_http_src_unlock_stop(GstBaseSrc * bsrc)2051 gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
2052 {
2053   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
2054 
2055   g_mutex_lock (&src->buffer_mutex);
2056   src->state = src->pending_state;
2057   src->pending_state = GSTCURL_NONE;
2058   g_cond_signal (&src->buffer_cond);
2059   g_mutex_unlock (&src->buffer_mutex);
2060 
2061   return TRUE;
2062 }
2063 
2064 /*****************************************************************************
2065  * Curl loop task functions begin
2066  *****************************************************************************/
2067 static void
gst_curl_http_src_curl_multi_loop(gpointer thread_data)2068 gst_curl_http_src_curl_multi_loop (gpointer thread_data)
2069 {
2070   GstCurlHttpSrcMultiTaskContext *context;
2071   GstCurlHttpSrcQueueElement *qelement, *qnext;
2072   gint i, still_running = 0;
2073   CURLMsg *curl_message;
2074   GstCurlHttpSrc *elt;
2075   guint active = 0;
2076 
2077   context = (GstCurlHttpSrcMultiTaskContext *) thread_data;
2078 
2079   g_mutex_lock (&context->mutex);
2080 
2081   /* Someone is holding a reference to us, but isn't using us so to avoid
2082    * unnecessary clock cycle wasting, sit in a conditional wait until woken.
2083    */
2084   while (context->queue == NULL
2085       && context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
2086     GSTCURL_DEBUG_PRINT ("Waiting for an element to be added...");
2087     g_cond_wait (&context->signal, &context->mutex);
2088     GSTCURL_DEBUG_PRINT ("Received wake up call!");
2089   }
2090   if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
2091     GSTCURL_INFO_PRINT ("Got instruction to shut down");
2092     goto out;
2093   }
2094 
2095   /* check for elements that need to be started or removed */
2096   qelement = context->queue;
2097   while (qelement != NULL) {
2098     qnext = qelement->next;
2099     elt = qelement->p;
2100     /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
2101        needed, multi_task_context.mutex must be acquired first */
2102     g_mutex_lock (&elt->buffer_mutex);
2103     if (elt->connection_status == GSTCURL_WANT_REMOVAL) {
2104       curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
2105       if (elt->state == GSTCURL_UNLOCK) {
2106         elt->pending_state = GSTCURL_REMOVED;
2107       } else {
2108         elt->state = GSTCURL_REMOVED;
2109       }
2110       elt->connection_status = GSTCURL_NOT_CONNECTED;
2111       gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
2112       g_cond_signal (&elt->buffer_cond);
2113     } else if (elt->connection_status == GSTCURL_CONNECTED) {
2114       active++;
2115       if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) {
2116         GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
2117         curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
2118       }
2119     }
2120     g_mutex_unlock (&elt->buffer_mutex);
2121     qelement = qnext;
2122   }
2123 
2124   if (active == 0) {
2125     GSTCURL_DEBUG_PRINT ("No active elements");
2126     goto out;
2127   }
2128 
2129   /* perform a select() on all of the active sockets and process any
2130      messages from curl */
2131   {
2132     struct timeval timeout;
2133     gint rc;
2134     fd_set fdread, fdwrite, fdexcep;
2135     int maxfd = -1;
2136     long curl_timeo = -1;
2137     gboolean cond = FALSE;
2138 
2139     /* Because curl can possibly take some time here, be nice and let go of the
2140      * mutex so other threads can perform state/queue operations as we don't
2141      * care about those until the end of this. */
2142     g_mutex_unlock (&context->mutex);
2143 
2144     FD_ZERO (&fdread);
2145     FD_ZERO (&fdwrite);
2146     FD_ZERO (&fdexcep);
2147 
2148     timeout.tv_sec = 1;
2149     timeout.tv_usec = 0;
2150 
2151     curl_multi_timeout (context->multi_handle, &curl_timeo);
2152     if (curl_timeo >= 0) {
2153       timeout.tv_sec = curl_timeo / 1000;
2154       if (timeout.tv_sec > 1) {
2155         timeout.tv_sec = 1;
2156       } else {
2157         timeout.tv_usec = (curl_timeo % 1000) * 1000;
2158       }
2159     }
2160 
2161     /* get file descriptors from the transfers */
2162     curl_multi_fdset (context->multi_handle, &fdread, &fdwrite, &fdexcep,
2163         &maxfd);
2164 
2165     rc = select (maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
2166 
2167     switch (rc) {
2168       case -1:
2169         /* select error */
2170         break;
2171       case 0:
2172 #ifdef OHOS_EXT_FUNC
2173         /**
2174          * ohos.ext.func.0033
2175          * Support reconnection after disconnection in gstcurl.
2176          * When curl_timeo is -1, that means there's no timeout setted at all, the socket is abnormal.
2177          */
2178         if (curl_timeo == -1) {
2179           GST_INFO ("sockets connect timeout.");
2180           gst_curl_http_src_deal_sockets_timeout (context);
2181           return;
2182         }
2183         curl_multi_perform (context->multi_handle, &still_running);
2184         break;
2185 #endif
2186       default:
2187         /* timeout or readable/writable sockets */
2188         curl_multi_perform (context->multi_handle, &still_running);
2189         break;
2190     }
2191 
2192     g_mutex_lock (&context->mutex);
2193 
2194     /*
2195      * Check the CURL message buffer to find out if any transfers have
2196      * completed. If they have, call the signal_finished function which
2197      * will signal the g_cond_wait call in that calling instance.
2198      */
2199     i = 0;
2200     while (cond != TRUE) {
2201       curl_message = curl_multi_info_read (context->multi_handle, &i);
2202       if (curl_message == NULL) {
2203         cond = TRUE;
2204       } else if (curl_message->msg == CURLMSG_DONE) {
2205         /* A hack, but I have seen curl_message->easy_handle being
2206          * NULL randomly, so check for that. */
2207         if (curl_message->easy_handle != NULL) {
2208           curl_multi_remove_handle (context->multi_handle,
2209               curl_message->easy_handle);
2210           gst_curl_http_src_remove_queue_handle (&context->queue,
2211               curl_message->easy_handle, curl_message->data.result);
2212         }
2213       }
2214     }
2215   }
2216 out:
2217   g_mutex_unlock (&context->mutex);
2218 }
2219 
2220 /*
2221  * Receive headers from the remote server and put them into the http_headers
2222  * structure to be sent downstream when we've got them all and started receiving
2223  * the body (see ::_handle_response())
2224  */
2225 static size_t
gst_curl_http_src_get_header(void * header,size_t size,size_t nmemb,void * src)2226 gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
2227     void *src)
2228 {
2229   GstCurlHttpSrc *s = src;
2230   char *substr;
2231 
2232   GST_DEBUG_OBJECT (s, "Received header: %s", (char *) header);
2233 
2234   g_mutex_lock (&s->buffer_mutex);
2235 
2236   if (s->state == GSTCURL_UNLOCK) {
2237     g_mutex_unlock (&s->buffer_mutex);
2238     return size * nmemb;
2239   }
2240 
2241   if (s->http_headers == NULL) {
2242     /* Can't do anything here, so just silently swallow the header */
2243     GST_DEBUG_OBJECT (s, "HTTP Headers Structure has already been sent,"
2244         " ignoring header");
2245     g_mutex_unlock (&s->buffer_mutex);
2246     return size * nmemb;
2247   }
2248 
2249   substr = gst_curl_http_src_strcasestr (header, "HTTP");
2250   if (substr == header) {
2251     /* We have a status line! */
2252     gchar **status_line_fields;
2253 
2254     /* Have we already seen a status line? If so, delete any response headers */
2255     if (s->status_code > 0) {
2256       GstStructure *empty_headers =
2257           gst_structure_new_empty (RESPONSE_HEADERS_NAME);
2258       gst_structure_remove_field (s->http_headers, RESPONSE_HEADERS_NAME);
2259       gst_structure_set (s->http_headers, RESPONSE_HEADERS_NAME,
2260           GST_TYPE_STRUCTURE, empty_headers, NULL);
2261       gst_structure_free (empty_headers);
2262 
2263     }
2264 
2265     /* Process the status line */
2266     status_line_fields = g_strsplit ((gchar *) header, " ", 3);
2267     if (status_line_fields == NULL) {
2268       GST_ERROR_OBJECT (s, "Status line processing failed!");
2269     } else {
2270       s->status_code =
2271           (guint) g_ascii_strtoll (status_line_fields[1], NULL, 10);
2272       g_free (s->reason_phrase);
2273       s->reason_phrase = g_strdup (status_line_fields[2]);
2274       GST_INFO_OBJECT (s, "Received status %u for request for URI %s: %s",
2275           s->status_code, s->uri, s->reason_phrase);
2276       gst_structure_set (s->http_headers, HTTP_STATUS_CODE,
2277           G_TYPE_UINT, s->status_code, NULL);
2278       g_strfreev (status_line_fields);
2279     }
2280   } else {
2281     /* Normal header line */
2282     gchar **header_tpl = g_strsplit ((gchar *) header, ": ", 2);
2283     if (header_tpl == NULL) {
2284       GST_ERROR_OBJECT (s, "Header processing failed! (%s)", (gchar *) header);
2285     } else {
2286       const GValue *gv_resp_hdrs = gst_structure_get_value (s->http_headers,
2287           RESPONSE_HEADERS_NAME);
2288       const GstStructure *response_headers =
2289           gst_value_get_structure (gv_resp_hdrs);
2290       /* Store header key lower case (g_ascii_strdown), makes searching through
2291        * later on easier - end applications shouldn't care, as all HTTP headers
2292        * are case-insensitive */
2293       gchar *header_key = g_ascii_strdown (header_tpl[0], -1);
2294       gchar *header_value;
2295 
2296       /* If header field already exists, append to the end */
2297       if (gst_structure_has_field (response_headers, header_key) == TRUE) {
2298         header_value = g_strdup_printf ("%s, %s",
2299             gst_structure_get_string (response_headers, header_key),
2300             header_tpl[1]);
2301         gst_structure_set ((GstStructure *) response_headers, header_key,
2302             G_TYPE_STRING, header_value, NULL);
2303         g_free (header_value);
2304       } else {
2305         header_value = header_tpl[1];
2306         gst_structure_set ((GstStructure *) response_headers, header_key,
2307             G_TYPE_STRING, header_value, NULL);
2308       }
2309 
2310       /* We have some special cases - deal with them here */
2311       if (g_strcmp0 (header_key, "content-type") == 0) {
2312         gst_curl_http_src_negotiate_caps (src);
2313       } else if (g_strcmp0 (header_key, "accept-ranges") == 0 &&
2314           g_ascii_strcasecmp (header_value, "none") == 0) {
2315         s->seekable = GSTCURL_SEEKABLE_FALSE;
2316       } else if (g_strcmp0 (header_key, "content-range") == 0) {
2317         /* In the case of a Range GET, the Content-Length header will contain
2318            the size of range requested, and the Content-Range header will
2319            have the start, stop and total size of the resource */
2320         gchar *size = strchr (header_value, '/');
2321         if (size) {
2322 #ifdef OHOS_OPT_COMPAT
2323           /* ohos.opt.compat.0032 */
2324           s->content_size = atoi (size + 1);
2325 #else
2326           s->content_size = atoi (size);
2327 #endif
2328         }
2329       }
2330 
2331       g_free (header_key);
2332       g_strfreev (header_tpl);
2333     }
2334   }
2335 
2336   s->hdrs_updated = TRUE;
2337 
2338   g_mutex_unlock (&s->buffer_mutex);
2339 
2340   return size * nmemb;
2341 }
2342 
/*
 * Case-insensitive substring search. strcasestr() is a GNU extension (i.e.
 * not portable) and not always guaranteed to be available, hence this local
 * implementation.
 *
 * @haystack: NUL-terminated string to search in.
 * @needle:   NUL-terminated string to look for.
 *
 * Returns a pointer to the FIRST occurrence of @needle in @haystack, compared
 * with tolower(), or NULL when there is none or either argument is NULL. An
 * empty @needle matches at the start of @haystack. A match that ends exactly
 * at the end of @haystack (including @needle being as long as @haystack) is
 * found as well.
 */
static char *
gst_curl_http_src_strcasestr (const char *haystack, const char *needle)
{
  size_t needle_len, matched;

  if (haystack == NULL || needle == NULL) {
    return NULL;
  }

  needle_len = strlen (needle);
  if (needle_len == 0) {
    return (char *) haystack;
  }

  /* Try every start position in turn, so the first match wins and a failed
   * partial match never skips a candidate start. */
  for (; *haystack != '\0'; haystack++) {
    for (matched = 0; matched < needle_len; matched++) {
      if (haystack[matched] == '\0') {
        /* Fewer characters left than the needle needs: no match possible
         * from here or from any later position. */
        return NULL;
      }
      /* <ctype.h> functions require values representable as unsigned char. */
      if (tolower ((unsigned char) haystack[matched]) !=
          tolower ((unsigned char) needle[matched])) {
        break;
      }
    }
    if (matched == needle_len) {
      return (char *) haystack;
    }
  }

  return NULL;
}
2377 
#ifdef OHOS_EXT_FUNC
/*
 * ohos.ext.func.0025 support https seek:
 *
 * Advance the position bookkeeping by @bytes_read. read_position always
 * moves forward; request_position follows it only while both were equal
 * before the update. An update that would overflow a guint64 is logged and
 * dropped.
 *
 * With OHOS_OPT_COMPAT (ohos.opt.compat.0044) a read_position of
 * (guint64)-1 is a "reset" marker, as written by ::do_seek(): the overflow
 * check is skipped and counting restarts from @bytes_read.
 */
static void
gst_curl_http_src_update_position (GstCurlHttpSrc * src, guint64 bytes_read)
{
  guint64 advanced;
#ifdef OHOS_OPT_COMPAT
  /* ohos.opt.compat.0044 */
  const gboolean position_reset = (src->read_position == (guint64) -1);
#else
  const gboolean position_reset = FALSE;
#endif

  if (!position_reset && bytes_read > (G_MAXUINT64 - src->read_position)) {
    GST_WARNING_OBJECT (src, "bytes_read:%" G_GUINT64_FORMAT " abnormal, should check, read pos:%" G_GUINT64_FORMAT,
      bytes_read, src->read_position);
    return;
  }

  if (position_reset) {
    advanced = bytes_read;
  } else {
    advanced = src->read_position + bytes_read;
  }

  /* Compare against the old read position before it is overwritten. */
  if (G_LIKELY(src->request_position == src->read_position)) {
    src->request_position = advanced;
  }
  src->read_position = advanced;

  GST_DEBUG_OBJECT (src, "bytes_read:%" G_GUINT64_FORMAT ", req:%" G_GUINT64_FORMAT ", read:%" G_GUINT64_FORMAT,
    bytes_read, src->request_position, src->read_position);
}
#endif
2412 
2413 /*
2414  * Receive chunks of the requested body and pass these back to the ::create()
2415  * loop
2416  */
2417 static size_t
2418 gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
2419 {
2420   GstCurlHttpSrc *s = src;
2421   size_t chunk_len = size * nmemb;
2422   GST_TRACE_OBJECT (s,
2423       "Received curl chunk for URI %s of size %d", s->uri, (int) chunk_len);
2424   g_mutex_lock (&s->buffer_mutex);
2425 
2426 #ifdef OHOS_EXT_FUNC
2427 /* ohos.ext.func.0025 support https seek: */
2428   gst_curl_http_src_update_position(s, (guint64)chunk_len);
2429 #endif
2430 
2431   if (s->state == GSTCURL_UNLOCK) {
2432     g_mutex_unlock (&s->buffer_mutex);
2433     return chunk_len;
2434   }
2435   s->buffer =
2436       g_realloc (s->buffer, (s->buffer_len + chunk_len + 1) * sizeof (char));
2437   if (s->buffer == NULL) {
2438     GST_ERROR_OBJECT (s, "Realloc for cURL response message failed!");
2439     return 0;
2440   }
2441   memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
2442   s->buffer_len += chunk_len;
2443   g_cond_signal (&s->buffer_cond);
2444   g_mutex_unlock (&s->buffer_mutex);
2445   return chunk_len;
2446 }
2447 
2448 /*
2449  * Request a cancellation of a currently running curl handle.
2450  */
2451 static void
2452 gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
2453 {
2454   GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
2455       GST_TYPE_CURL_HTTP_SRC,
2456       GstCurlHttpSrcClass);
2457 
2458   g_mutex_lock (&klass->multi_task_context.mutex);
2459   g_mutex_lock (&src->buffer_mutex);
2460   if (src->connection_status == GSTCURL_CONNECTED) {
2461     src->connection_status = GSTCURL_WANT_REMOVAL;
2462   }
2463   g_mutex_unlock (&src->buffer_mutex);
2464   g_cond_signal (&klass->multi_task_context.signal);
2465   g_mutex_unlock (&klass->multi_task_context.mutex);
2466 }
2467 
2468 /*
2469  * Request a cancellation of a currently running curl handle and
2470  * block this thread until the src element has been removed
2471  * from the queue
2472  */
2473 static void
2474 gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src)
2475 {
2476   gst_curl_http_src_request_remove (src);
2477   g_mutex_lock (&src->buffer_mutex);
2478   while (src->connection_status != GSTCURL_NOT_CONNECTED) {
2479     g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
2480   }
2481   g_mutex_unlock (&src->buffer_mutex);
2482 }
2483 
2484 #ifndef GST_DISABLE_GST_DEBUG
2485 /*
2486  * This callback receives debug information, as specified in the type argument.
2487  * This function must return 0.
2488  */
2489 static int
2490 gst_curl_http_src_get_debug (CURL * handle, curl_infotype type, char *data,
2491     size_t size, void *clientp)
2492 {
2493   GstCurlHttpSrc *src = (GstCurlHttpSrc *) clientp;
2494   gchar *msg = NULL;
2495 
2496   switch (type) {
2497     case CURLINFO_TEXT:
2498     case CURLINFO_HEADER_OUT:
2499       msg = g_memdup2 (data, size);
2500       if (size > 0) {
2501         msg[size - 1] = '\0';
2502         g_strchomp (msg);
2503       }
2504       break;
2505     default:
2506       break;
2507   }
2508 
2509   switch (type) {
2510     case CURLINFO_TEXT:
2511       GST_DEBUG_OBJECT (src, "%s", msg);
2512       break;
2513     case CURLINFO_HEADER_OUT:
2514       GST_DEBUG_OBJECT (src, "outgoing header: %s", msg);
2515       break;
2516     case CURLINFO_DATA_IN:
2517       GST_MEMDUMP_OBJECT (src, "incoming data", (guint8 *) data, size);
2518       break;
2519     case CURLINFO_DATA_OUT:
2520       GST_MEMDUMP_OBJECT (src, "outgoing data", (guint8 *) data, size);
2521       break;
2522     case CURLINFO_SSL_DATA_IN:
2523       GST_MEMDUMP_OBJECT (src, "incoming ssl data", (guint8 *) data, size);
2524       break;
2525     case CURLINFO_SSL_DATA_OUT:
2526       GST_MEMDUMP_OBJECT (src, "outgoing ssl data", (guint8 *) data, size);
2527       break;
2528     default:
2529       GST_DEBUG_OBJECT (src, "unknown debug info type %d", type);
2530       GST_MEMDUMP_OBJECT (src, "unknown data", (guint8 *) data, size);
2531       break;
2532   }
2533   g_free (msg);
2534   return 0;
2535 }
2536 #endif
2537 
2538 #ifdef OHOS_EXT_FUNC
2539 /**
2540  * ohos.ext.func.0033
2541  * Support reconnection after disconnection in gstcurl.
2542  */
2543 static void gst_curl_http_src_deal_sockets_timeout (GstCurlHttpSrcMultiTaskContext *context)
2544 {
2545   GstCurlHttpSrcQueueElement *qnext = NULL;
2546   GstCurlHttpSrcQueueElement *qelement = context->queue;
2547   GstCurlHttpSrc *elt = NULL;
2548   while (qelement != NULL) {
2549     qnext = qelement->next;
2550     elt = qelement->p;
2551     g_mutex_lock (&elt->buffer_mutex);
2552     elt->curl_result = CURLE_COULDNT_CONNECT;
2553     curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
2554     elt->connection_status = GSTCURL_NOT_CONNECTED;
2555     elt->data_received = FALSE;
2556     gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
2557     g_cond_signal (&elt->buffer_cond);
2558     g_mutex_unlock (&elt->buffer_mutex);
2559     qelement = qnext;
2560   }
2561 }
2562 
2563 static gboolean gst_curl_http_src_reconnect_is_timeout (GstCurlHttpSrc *src)
2564 {
2565   if (src->player_state == GST_PLAYER_STATUS_PAUSED || src->player_state == GST_PLAYER_STATUS_PLAYING ||
2566     src->player_state == GST_PLAYER_STATUS_READY) {
2567     GST_INFO_OBJECT (src, "player_state is paused or playing or ready, donot wait for reconnection");
2568     return FALSE;
2569   }
2570 
2571   if (src->start_usecs == 0) {
2572     src->start_usecs = g_get_monotonic_time ();
2573     GST_INFO_OBJECT (src, "Wait for network reconnecting, timeout:0us");
2574   } else {
2575     src->end_usecs = g_get_monotonic_time ();
2576     gint64 time_diff_us = src->end_usecs - src->start_usecs;
2577     if (time_diff_us > (gint64)src->reconnection_timeout) {
2578       GST_INFO_OBJECT (src, "Network has broken too long, exit! end_usecs:%"G_GINT64_FORMAT" us, "
2579           "start_usecs:%"G_GINT64_FORMAT" us, reconnection_timeout:%u us",
2580           src->end_usecs, src->start_usecs, src->reconnection_timeout);
2581       return TRUE;
2582     }
2583     GST_INFO_OBJECT (src, "Wait for network reconnecting, timeout:%"G_GINT64_FORMAT"us", time_diff_us);
2584   }
2585   return FALSE;
2586 }
2587 #endif
2588