• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GstCurlHttpSrc
3  * Copyright 2017 British Broadcasting Corporation - Research and Development
4  *
5  * Author: Sam Hurst <samuelh@rd.bbc.co.uk>
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Alternatively, the contents of this file may be used under the
26  * GNU Lesser General Public License Version 2.1 (the "LGPL"), in
27  * which case the following provisions apply instead of the ones
28  * mentioned above:
29  *
30  * This library is free software; you can redistribute it and/or
31  * modify it under the terms of the GNU Library General Public
32  * License as published by the Free Software Foundation; either
33  * version 2 of the License, or (at your option) any later version.
34  *
35  * This library is distributed in the hope that it will be useful,
36  * but WITHOUT ANY WARRANTY; without even the implied warranty of
37  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
38  * Library General Public License for more details.
39  *
40  * You should have received a copy of the GNU Library General Public
41  * License along with this library; if not, write to the
42  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
43  * Boston, MA 02111-1307, USA.
44  */
45 
46 /**
47  * SECTION:element-curlhttpsrc
48  *
49  * This plugin reads data from a remote location specified by a URI, when the
50  * protocol is 'http' or 'https'.
51  *
52  * It is based on the cURL project (http://curl.haxx.se/) and is specifically
53  * designed to be also used with nghttp2 (http://nghttp2.org) to enable HTTP/2
54  * support for GStreamer. Your libcurl library MUST be compiled against nghttp2
55  * for HTTP/2 support for this functionality. HTTPS support is dependent on
56  * cURL being built with SSL support (OpenSSL/PolarSSL/NSS/GnuTLS).
57  *
58  * An HTTP proxy must be specified by URL.
59  * If the "http_proxy" environment variable is set, its value is used.
60  * The #GstCurlHttpSrc:proxy property can be used to override the default.
61  *
62  * ## Example launch line
63  *
64  * |[
65  * gst-launch-1.0 curlhttpsrc location=http://127.0.1.1/index.html ! fakesink dump=1
66  * ]| The above pipeline reads a web page from the local machine using HTTP and
67  * dumps it to stdout.
68  * |[
69  * gst-launch-1.0 playbin uri=http://rdmedia.bbc.co.uk/dash/testmpds/multiperiod/bbb.php
70  * ]| The above pipeline will start up a DASH streaming session from the given
71  * MPD file. This requires GStreamer to have been built with dashdemux from
72  * gst-plugins-bad.
73  */
74 
75 /*
76  * Thread safety notes.
77  *
78  * GstCurlHttpSrc uses a single thread running the
79  * gst_curl_http_src_curl_multi_loop() function to handle receiving
80  * data and messages from libcurl. Each instance of GstCurlHttpSrc adds
81  * an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
82  * for the multi_loop to perform the HTTP request.
83  *
84  * When an instance of GstCurlHttpSrc wants to make a request (i.e.
85  * it has moved to the PLAYING state) it adds itself to the
86  * multi_task_context.queue list and signals the multi_loop task.
87  *
88  * Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond
89  * to wait for gst_curl_http_src_curl_multi_loop() to perform the
90  * request and signal completion.
91  *
92  * Each instance of GstCurlHttpSrc is protected by the mutexes:
93  * 1. uri_mutex
94  * 2. buffer_mutex
95  *
96  * uri_mutex is used to protect access to the uri field.
97  *
98  * buffer_mutex is used to protect access to buffer_cond, state and
99  * connection_status.
100  *
101  * The gst_curl_http_src_curl_multi_loop() function uses the mutexes:
102  * 1. multi_task_context.task_rec_mutex
103  * 2. multi_task_context.mutex
104  *
105  * multi_task_context.task_rec_mutex is only used by GstTask.
106  *
107  * multi_task_context.mutex is used to protect access to queue and state
108  *
109  * To avoid deadlock, it is vital that if both multi_task_context.mutex
110  * and buffer_mutex are required, that they are locked in the order:
111  * 1. multi_task_context.mutex
112  * 2. buffer_mutex
113  */
114 
115 #ifdef HAVE_CONFIG_H
116 #include <config.h>
117 #endif
118 
119 #include <gst/gst-i18n-plugin.h>
120 
121 #include "gstcurlelements.h"
122 #include "gstcurlhttpsrc.h"
123 #include "gstcurlqueue.h"
124 #include "gstcurldefaults.h"
125 
126 GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
127 #define GST_CAT_DEFAULT gst_curl_http_src_debug
128 GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);
129 
130 #define CURL_HTTP_SRC_ERROR(src,cat,code,error_message)     \
131   do { \
132     GST_ELEMENT_ERROR_WITH_DETAILS ((src), cat, code, ("%s", error_message), \
133         ("%s (%d), URL: %s, Redirect to: %s", (src)->reason_phrase, \
134             (src)->status_code, (src)->uri, GST_STR_NULL ((src)->redirect_uri)), \
135             ("http-status-code", G_TYPE_UINT, (src)->status_code, \
136              "http-redirect-uri", G_TYPE_STRING, GST_STR_NULL ((src)->redirect_uri), NULL)); \
137   } while(0)
138 
139 enum
140 {
141   PROP_0,
142   PROP_URI,
143   PROP_USERNAME,
144   PROP_PASSWORD,
145   PROP_PROXYURI,
146   PROP_PROXYUSERNAME,
147   PROP_PROXYPASSWORD,
148   PROP_COOKIES,
149   PROP_USERAGENT,
150   PROP_HEADERS,
151   PROP_COMPRESS,
152   PROP_REDIRECT,
153   PROP_MAXREDIRECT,
154   PROP_KEEPALIVE,
155   PROP_TIMEOUT,
156   PROP_STRICT_SSL,
157   PROP_SSL_CA_FILE,
158   PROP_RETRIES,
159   PROP_CONNECTIONMAXTIME,
160   PROP_MAXCONCURRENT_SERVER,
161   PROP_MAXCONCURRENT_PROXY,
162   PROP_MAXCONCURRENT_GLOBAL,
163   PROP_HTTPVERSION,
164   PROP_IRADIO_MODE,
165   PROP_MAX
166 };
167 
168 /*
169  * Make a source pad template to be able to kick out recv'd data
170  */
171 static GstStaticPadTemplate srcpadtemplate = GST_STATIC_PAD_TEMPLATE ("src",
172     GST_PAD_SRC,
173     GST_PAD_ALWAYS,
174     GST_STATIC_CAPS_ANY);
175 
176 /*
177  * Function Definitions
178  */
179 /* Gstreamer generic element functions */
180 static void gst_curl_http_src_set_property (GObject * object, guint prop_id,
181     const GValue * value, GParamSpec * pspec);
182 static void gst_curl_http_src_get_property (GObject * object, guint prop_id,
183     GValue * value, GParamSpec * pspec);
184 static void gst_curl_http_src_ref_multi (GstCurlHttpSrc * src);
185 static void gst_curl_http_src_unref_multi (GstCurlHttpSrc * src);
186 static void gst_curl_http_src_finalize (GObject * obj);
187 static GstFlowReturn gst_curl_http_src_create (GstPushSrc * psrc,
188     GstBuffer ** outbuf);
189 static GstFlowReturn gst_curl_http_src_handle_response (GstCurlHttpSrc * src);
190 static gboolean gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src);
191 static GstStateChangeReturn gst_curl_http_src_change_state (GstElement *
192     element, GstStateChange transition);
193 static void gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src);
194 static gboolean gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query);
195 static gboolean gst_curl_http_src_get_content_length (GstBaseSrc * bsrc,
196     guint64 * size);
197 static gboolean gst_curl_http_src_is_seekable (GstBaseSrc * bsrc);
198 static gboolean gst_curl_http_src_do_seek (GstBaseSrc * bsrc,
199     GstSegment * segment);
200 static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc);
201 static gboolean gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc);
202 
203 /* URI Handler functions */
204 static void gst_curl_http_src_uri_handler_init (gpointer g_iface,
205     gpointer iface_data);
206 static guint gst_curl_http_src_urihandler_get_type (GType type);
207 static const gchar *const *gst_curl_http_src_urihandler_get_protocols (GType
208     type);
209 static gchar *gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler);
210 static gboolean gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
211     const gchar * uri, GError ** error);
212 
213 /* GstTask functions */
214 static void gst_curl_http_src_curl_multi_loop (gpointer thread_data);
215 static CURL *gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s);
216 static inline void gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src);
217 static size_t gst_curl_http_src_get_header (void *header, size_t size,
218     size_t nmemb, void *src);
219 static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
220     size_t nmemb, void *src);
221 static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
222 static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src);
223 static char *gst_curl_http_src_strcasestr (const char *haystack,
224     const char *needle);
225 #ifndef GST_DISABLE_GST_DEBUG
226 static int gst_curl_http_src_get_debug (CURL * handle, curl_infotype type,
227     char *data, size_t size, void *clientp);
228 #endif
229 
230 static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL;
231 static GstCurlHttpVersion pref_http_ver;
232 
233 #define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
234 
235 static GType
gst_curl_http_version_get_type(void)236 gst_curl_http_version_get_type (void)
237 {
238   static GType gtype = 0;
239 
240   if (!gtype) {
241     static const GEnumValue http_versions[] = {
242       {GSTCURL_HTTP_VERSION_1_0, "HTTP Version 1.0", "1.0"},
243       {GSTCURL_HTTP_VERSION_1_1, "HTTP Version 1.1", "1.1"},
244 #ifdef CURL_VERSION_HTTP2
245       {GSTCURL_HTTP_VERSION_2_0, "HTTP Version 2.0", "2.0"},
246 #endif
247       {0, NULL, NULL}
248     };
249     gtype = g_enum_register_static ("GstCurlHttpVersionType", http_versions);
250   }
251   return gtype;
252 }
253 
254 #define gst_curl_http_src_parent_class parent_class
255 G_DEFINE_TYPE_WITH_CODE (GstCurlHttpSrc, gst_curl_http_src, GST_TYPE_PUSH_SRC,
256     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
257         gst_curl_http_src_uri_handler_init));
258 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (curlhttpsrc, "curlhttpsrc",
259     GST_RANK_SECONDARY, GST_TYPE_CURLHTTPSRC, curl_element_init (plugin));
260 
261 static void
gst_curl_http_src_class_init(GstCurlHttpSrcClass * klass)262 gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
263 {
264   GObjectClass *gobject_class;
265   GstElementClass *gstelement_class;
266   GstBaseSrcClass *gstbasesrc_class;
267   GstPushSrcClass *gstpushsrc_class;
268   const gchar *http_env;
269   GstCurlHttpVersion default_http_version;
270 
271   gobject_class = (GObjectClass *) klass;
272   gstelement_class = (GstElementClass *) klass;
273   gstbasesrc_class = (GstBaseSrcClass *) klass;
274   gstpushsrc_class = (GstPushSrcClass *) klass;
275 
276   GST_DEBUG_CATEGORY_INIT (gst_curl_http_src_debug, "curlhttpsrc",
277       0, "UriHandler for libcURL");
278 
279   gstelement_class->change_state =
280       GST_DEBUG_FUNCPTR (gst_curl_http_src_change_state);
281   gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_curl_http_src_create);
282   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_curl_http_src_query);
283   gstbasesrc_class->get_size =
284       GST_DEBUG_FUNCPTR (gst_curl_http_src_get_content_length);
285   gstbasesrc_class->is_seekable =
286       GST_DEBUG_FUNCPTR (gst_curl_http_src_is_seekable);
287   gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_curl_http_src_do_seek);
288   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock);
289   gstbasesrc_class->unlock_stop =
290       GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock_stop);
291 
292   gst_element_class_add_pad_template (gstelement_class,
293       gst_static_pad_template_get (&srcpadtemplate));
294 
295   gst_curl_http_src_curl_capabilities = curl_version_info (CURLVERSION_NOW);
296 #ifdef CURL_VERSION_HTTP2
297   if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
298     default_http_version = GSTCURL_HTTP_VERSION_2_0;
299   } else
300 #endif
301     default_http_version = GSTCURL_HTTP_VERSION_1_1;
302 
303   http_env = g_getenv ("GST_CURL_HTTP_VER");
304   if (http_env != NULL) {
305     GST_INFO_OBJECT (klass, "Seen env var GST_CURL_HTTP_VER with value %s",
306         http_env);
307     if (!strcmp (http_env, "1.0")) {
308       pref_http_ver = GSTCURL_HTTP_VERSION_1_0;
309     } else if (!strcmp (http_env, "1.1")) {
310       pref_http_ver = GSTCURL_HTTP_VERSION_1_1;
311     } else if (!strcmp (http_env, "2.0")) {
312 #ifdef CURL_VERSION_HTTP2
313       if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
314         pref_http_ver = GSTCURL_HTTP_VERSION_2_0;
315       } else {
316         goto unsupported_http_version;
317       }
318 #endif
319     } else {
320     unsupported_http_version:
321       GST_WARNING_OBJECT (klass,
322           "Unsupported HTTP version: %s. Fallback to default", http_env);
323       pref_http_ver = default_http_version;
324     }
325   } else {
326     pref_http_ver = default_http_version;
327   }
328 
329   gobject_class->set_property = gst_curl_http_src_set_property;
330   gobject_class->get_property = gst_curl_http_src_get_property;
331   gobject_class->finalize = gst_curl_http_src_finalize;
332 
333   g_object_class_install_property (gobject_class, PROP_URI,
334       g_param_spec_string ("location", "Location", "URI of resource to read",
335           GSTCURL_HANDLE_DEFAULT_CURLOPT_URL,
336           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
337 
338   g_object_class_install_property (gobject_class, PROP_USERNAME,
339       g_param_spec_string ("user-id", "user-id",
340           "HTTP location URI user id for authentication",
341           GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME,
342           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
343 
344   g_object_class_install_property (gobject_class, PROP_PASSWORD,
345       g_param_spec_string ("user-pw", "user-pw",
346           "HTTP location URI password for authentication",
347           GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD,
348           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
349 
350   g_object_class_install_property (gobject_class, PROP_PROXYURI,
351       g_param_spec_string ("proxy", "Proxy", "URI of HTTP proxy server",
352           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY,
353           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
354 
355   g_object_class_install_property (gobject_class, PROP_PROXYUSERNAME,
356       g_param_spec_string ("proxy-id", "proxy-id",
357           "HTTP proxy URI user id for authentication",
358           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME,
359           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
360 
361   g_object_class_install_property (gobject_class, PROP_PROXYPASSWORD,
362       g_param_spec_string ("proxy-pw", "proxy-pw",
363           "HTTP proxy URI password for authentication",
364           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD,
365           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366 
367   g_object_class_install_property (gobject_class, PROP_COOKIES,
368       g_param_spec_boxed ("cookies", "Cookies", "List of HTTP Cookies",
369           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370 
371   g_object_class_install_property (gobject_class, PROP_USERAGENT,
372       g_param_spec_string ("user-agent", "User-Agent",
373           "URI of resource requested",
374           GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>",
375           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
376           GST_PARAM_DOC_SHOW_DEFAULT));
377 
378   g_object_class_install_property (gobject_class, PROP_COMPRESS,
379       g_param_spec_boolean ("compress", "Compress",
380           "Allow compressed content encodings",
381           GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383 
384   g_object_class_install_property (gobject_class, PROP_REDIRECT,
385       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
386           "Allow HTTP Redirections (HTTP Status Code 300 series)",
387           GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION,
388           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
389 
390   g_object_class_install_property (gobject_class, PROP_MAXREDIRECT,
391       g_param_spec_int ("max-redirect", "Max-Redirect",
392           "Maximum number of permitted redirections. -1 is unlimited.",
393           GSTCURL_HANDLE_MIN_CURLOPT_MAXREDIRS,
394           GSTCURL_HANDLE_MAX_CURLOPT_MAXREDIRS,
395           GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS,
396           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
397 
398   g_object_class_install_property (gobject_class, PROP_KEEPALIVE,
399       g_param_spec_boolean ("keep-alive", "Keep-Alive",
400           "Toggle keep-alive for connection reuse.",
401           GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE,
402           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
403 
404   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
405       g_param_spec_int ("timeout", "Timeout",
406           "Value in seconds before timeout a blocking request (0 = no timeout)",
407           GSTCURL_HANDLE_MIN_CURLOPT_TIMEOUT,
408           GSTCURL_HANDLE_MAX_CURLOPT_TIMEOUT,
409           GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT,
410           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
411 
412   g_object_class_install_property (gobject_class, PROP_HEADERS,
413       g_param_spec_boxed ("extra-headers", "Extra Headers",
414           "Extra headers to append to the HTTP request",
415           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
416 
417   g_object_class_install_property (gobject_class, PROP_STRICT_SSL,
418       g_param_spec_boolean ("ssl-strict", "SSL Strict",
419           "Strict SSL certificate checking",
420           GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER,
421           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 
423   g_object_class_install_property (gobject_class, PROP_SSL_CA_FILE,
424       g_param_spec_string ("ssl-ca-file", "SSL CA File",
425           "Location of an SSL CA file to use for checking SSL certificates",
426           GSTCURL_HANDLE_DEFAULT_CURLOPT_CAINFO,
427           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428 
429   g_object_class_install_property (gobject_class, PROP_RETRIES,
430       g_param_spec_int ("retries", "Retries",
431           "Maximum number of retries until giving up (-1=infinite)",
432           GSTCURL_HANDLE_MIN_RETRIES, GSTCURL_HANDLE_MAX_RETRIES,
433           GSTCURL_HANDLE_DEFAULT_RETRIES,
434           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435 
436   g_object_class_install_property (gobject_class, PROP_CONNECTIONMAXTIME,
437       g_param_spec_uint ("max-connection-time", "Max-Connection-Time",
438           "Maximum amount of time to keep-alive HTTP connections",
439           GSTCURL_MIN_CONNECTION_TIME, GSTCURL_MAX_CONNECTION_TIME,
440           GSTCURL_DEFAULT_CONNECTION_TIME,
441           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442 
443   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_SERVER,
444       g_param_spec_uint ("max-connections-per-server",
445           "Max-Connections-Per-Server",
446           "Maximum number of connections allowed per server for HTTP/1.x",
447           GSTCURL_MIN_CONNECTIONS_SERVER, GSTCURL_MAX_CONNECTIONS_SERVER,
448           GSTCURL_DEFAULT_CONNECTIONS_SERVER,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450 
451   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_PROXY,
452       g_param_spec_uint ("max-connections-per-proxy",
453           "Max-Connections-Per-Proxy",
454           "Maximum number of concurrent connections allowed per proxy for HTTP/1.x",
455           GSTCURL_MIN_CONNECTIONS_PROXY, GSTCURL_MAX_CONNECTIONS_PROXY,
456           GSTCURL_DEFAULT_CONNECTIONS_PROXY,
457           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
458 
459   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_GLOBAL,
460       g_param_spec_uint ("max-connections", "Max-Connections",
461           "Maximum number of concurrent connections allowed for HTTP/1.x",
462           GSTCURL_MIN_CONNECTIONS_GLOBAL, GSTCURL_MAX_CONNECTIONS_GLOBAL,
463           GSTCURL_DEFAULT_CONNECTIONS_GLOBAL,
464           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
465 
466   g_object_class_install_property (gobject_class, PROP_HTTPVERSION,
467       g_param_spec_enum ("http-version", "HTTP-Version",
468           "The preferred HTTP protocol version",
469           GST_TYPE_CURL_HTTP_VERSION, pref_http_ver,
470           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
471 
472   /* Add a debugging task so it's easier to debug in the Multi worker thread */
473   GST_DEBUG_CATEGORY_INIT (gst_curl_loop_debug, "curl_multi_loop", 0,
474       "libcURL loop thread debugging");
475 #ifndef GST_DISABLE_GST_DEBUG
476   gst_debug_log (gst_curl_loop_debug, GST_LEVEL_INFO, __FILE__, __func__,
477       __LINE__, NULL, "Testing the curl_multi_loop debugging prints");
478 #endif
479 
480   klass->multi_task_context.task = NULL;
481   klass->multi_task_context.refcount = 0;
482   klass->multi_task_context.queue = NULL;
483   klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
484   klass->multi_task_context.multi_handle = NULL;
485   g_mutex_init (&klass->multi_task_context.mutex);
486 #ifdef OHOS_OPT_STABLE
487   /* ohos.opt.stable.0003 for multiple instances ref/unref mutex */
488   g_mutex_init (&klass->multi_task_context.multiple_mutex);
489 #endif
490   g_cond_init (&klass->multi_task_context.signal);
491 
492   gst_element_class_set_static_metadata (gstelement_class,
493       "HTTP Client Source using libcURL",
494       "Source/Network",
495       "Receiver data as a client over a network via HTTP using cURL",
496       "Sam Hurst <samuelh@rd.bbc.co.uk>");
497 
498   gst_type_mark_as_plugin_api (GST_TYPE_CURL_HTTP_VERSION, 0);
499 }
500 
501 static void
gst_curl_http_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)502 gst_curl_http_src_set_property (GObject * object, guint prop_id,
503     const GValue * value, GParamSpec * pspec)
504 {
505   GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
506   GSTCURL_FUNCTION_ENTRY (source);
507 
508   switch (prop_id) {
509     case PROP_URI:
510       g_mutex_lock (&source->uri_mutex);
511       g_free (source->uri);
512       source->uri = g_value_dup_string (value);
513       g_mutex_unlock (&source->uri_mutex);
514       break;
515     case PROP_USERNAME:
516       g_free (source->username);
517       source->username = g_value_dup_string (value);
518       break;
519     case PROP_PASSWORD:
520       g_free (source->password);
521       source->password = g_value_dup_string (value);
522       break;
523     case PROP_PROXYURI:
524       g_free (source->proxy_uri);
525       source->proxy_uri = g_value_dup_string (value);
526       break;
527     case PROP_PROXYUSERNAME:
528       g_free (source->proxy_user);
529       source->proxy_user = g_value_dup_string (value);
530       break;
531     case PROP_PROXYPASSWORD:
532       g_free (source->proxy_pass);
533       source->proxy_pass = g_value_dup_string (value);
534       break;
535     case PROP_COOKIES:
536       g_strfreev (source->cookies);
537       source->cookies = g_strdupv (g_value_get_boxed (value));
538       source->number_cookies = g_strv_length (source->cookies);
539       break;
540     case PROP_USERAGENT:
541       g_free (source->user_agent);
542       source->user_agent = g_value_dup_string (value);
543       break;
544     case PROP_HEADERS:
545     {
546       const GstStructure *s = gst_value_get_structure (value);
547       if (source->request_headers)
548         gst_structure_free (source->request_headers);
549       source->request_headers =
550           s ? gst_structure_copy (s) :
551           gst_structure_new_empty (REQUEST_HEADERS_NAME);
552     }
553       break;
554     case PROP_COMPRESS:
555       source->accept_compressed_encodings = g_value_get_boolean (value);
556       break;
557     case PROP_REDIRECT:
558       source->allow_3xx_redirect = g_value_get_boolean (value);
559       break;
560     case PROP_MAXREDIRECT:
561       source->max_3xx_redirects = g_value_get_int (value);
562       break;
563     case PROP_KEEPALIVE:
564       source->keep_alive = g_value_get_boolean (value);
565       break;
566     case PROP_TIMEOUT:
567       source->timeout_secs = g_value_get_int (value);
568       break;
569     case PROP_STRICT_SSL:
570       source->strict_ssl = g_value_get_boolean (value);
571       break;
572     case PROP_SSL_CA_FILE:
573 #ifdef OHOS_OPT_MEMLEAK
574       /* ohos.opt.memleak.0003 fix memory leak. */
575       g_free (source->custom_ca_file);
576 #endif
577       source->custom_ca_file = g_value_dup_string (value);
578       break;
579     case PROP_RETRIES:
580       source->total_retries = g_value_get_int (value);
581       break;
582     case PROP_CONNECTIONMAXTIME:
583       source->max_connection_time = g_value_get_uint (value);
584       break;
585     case PROP_MAXCONCURRENT_SERVER:
586       source->max_conns_per_server = g_value_get_uint (value);
587       break;
588     case PROP_MAXCONCURRENT_PROXY:
589       source->max_conns_per_proxy = g_value_get_uint (value);
590       break;
591     case PROP_MAXCONCURRENT_GLOBAL:
592       source->max_conns_global = g_value_get_uint (value);
593       break;
594     case PROP_HTTPVERSION:
595       source->preferred_http_version = g_value_get_enum (value);
596       break;
597     default:
598       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
599       break;
600   }
601   GSTCURL_FUNCTION_EXIT (source);
602 }
603 
604 static void
gst_curl_http_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)605 gst_curl_http_src_get_property (GObject * object, guint prop_id,
606     GValue * value, GParamSpec * pspec)
607 {
608   GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
609   GSTCURL_FUNCTION_ENTRY (source);
610 
611   switch (prop_id) {
612     case PROP_URI:
613       g_mutex_lock (&source->uri_mutex);
614       g_value_set_string (value, source->uri);
615       g_mutex_unlock (&source->uri_mutex);
616       break;
617     case PROP_USERNAME:
618       g_value_set_string (value, source->username);
619       break;
620     case PROP_PASSWORD:
621       g_value_set_string (value, source->password);
622       break;
623     case PROP_PROXYURI:
624       g_value_set_string (value, source->proxy_uri);
625       break;
626     case PROP_PROXYUSERNAME:
627       g_value_set_string (value, source->proxy_user);
628       break;
629     case PROP_PROXYPASSWORD:
630       g_value_set_string (value, source->proxy_pass);
631       break;
632     case PROP_COOKIES:
633       g_value_set_boxed (value, source->cookies);
634       break;
635     case PROP_USERAGENT:
636       g_value_set_string (value, source->user_agent);
637       break;
638     case PROP_HEADERS:
639       gst_value_set_structure (value, source->request_headers);
640       break;
641     case PROP_COMPRESS:
642       g_value_set_boolean (value, source->accept_compressed_encodings);
643       break;
644     case PROP_REDIRECT:
645       g_value_set_boolean (value, source->allow_3xx_redirect);
646       break;
647     case PROP_MAXREDIRECT:
648       g_value_set_int (value, source->max_3xx_redirects);
649       break;
650     case PROP_KEEPALIVE:
651       g_value_set_boolean (value, source->keep_alive);
652       break;
653     case PROP_TIMEOUT:
654       g_value_set_int (value, source->timeout_secs);
655       break;
656     case PROP_STRICT_SSL:
657       g_value_set_boolean (value, source->strict_ssl);
658       break;
659     case PROP_SSL_CA_FILE:
660       g_value_set_string (value, source->custom_ca_file);
661       break;
662     case PROP_RETRIES:
663       g_value_set_int (value, source->total_retries);
664       break;
665     case PROP_CONNECTIONMAXTIME:
666       g_value_set_uint (value, source->max_connection_time);
667       break;
668     case PROP_MAXCONCURRENT_SERVER:
669       g_value_set_uint (value, source->max_conns_per_server);
670       break;
671     case PROP_MAXCONCURRENT_PROXY:
672       g_value_set_uint (value, source->max_conns_per_proxy);
673       break;
674     case PROP_MAXCONCURRENT_GLOBAL:
675       g_value_set_uint (value, source->max_conns_global);
676       break;
677     case PROP_HTTPVERSION:
678       g_value_set_enum (value, source->preferred_http_version);
679       break;
680     default:
681       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
682       break;
683   }
684   GSTCURL_FUNCTION_EXIT (source);
685 }
686 
687 static void
gst_curl_http_src_init(GstCurlHttpSrc * source)688 gst_curl_http_src_init (GstCurlHttpSrc * source)
689 {
690   GSTCURL_FUNCTION_ENTRY (source);
691 
692   /* Assume everything is already free'd */
693   source->uri = NULL;
694   source->redirect_uri = NULL;
695   source->username = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME;
696   source->password = GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD;
697   source->proxy_uri = NULL;
698   source->proxy_user = NULL;
699   source->proxy_pass = NULL;
700   source->cookies = NULL;
701   g_assert (gst_curl_http_src_curl_capabilities != NULL);
702   source->user_agent =
703       g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s",
704       gst_curl_http_src_curl_capabilities->version);
705   source->number_cookies = 0;
706   source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME);
707   source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
708   source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
709   source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
710   source->timeout_secs = GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT;
711   source->max_connection_time = GSTCURL_DEFAULT_CONNECTION_TIME;
712   source->max_conns_per_server = GSTCURL_DEFAULT_CONNECTIONS_SERVER;
713   source->max_conns_per_proxy = GSTCURL_DEFAULT_CONNECTIONS_PROXY;
714   source->max_conns_global = GSTCURL_DEFAULT_CONNECTIONS_GLOBAL;
715 #ifdef OHOS_OPT_COMPAT
716   /* ohos.ext.compat.0025
717   It will be closed temporarily until the certificate verification needs are delivered on September 30 */
718   source->strict_ssl = 0;
719 #else
720   source->strict_ssl = GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER;
721 #endif
722   source->custom_ca_file = NULL;
723   source->preferred_http_version = pref_http_ver;
724   source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
725   source->retries_remaining = source->total_retries;
726   source->slist = NULL;
727   source->accept_compressed_encodings = FALSE;
728   source->seekable = GSTCURL_SEEKABLE_UNKNOWN;
729   source->content_size = 0;
730   source->request_position = 0;
731 #ifdef OHOS_EXT_FUNC
732   /* ohos.ext.func.0025 support https seek: */
733   source->orig_request_pos = 0;
734   source->read_position = 0;
735 #endif
736   source->stop_position = -1;
737 
738   gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
739 
740   source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
741   source->no_proxy_list = g_strdup (g_getenv ("no_proxy"));
742 
743   g_mutex_init (&source->uri_mutex);
744   g_mutex_init (&source->buffer_mutex);
745   g_cond_init (&source->buffer_cond);
746 
747   source->buffer = NULL;
748   source->buffer_len = 0;
749   source->state = GSTCURL_NONE;
750   source->pending_state = GSTCURL_NONE;
751   source->transfer_begun = FALSE;
752   source->data_received = FALSE;
753   source->connection_status = GSTCURL_NOT_CONNECTED;
754 
755   source->http_headers = NULL;
756   source->content_type = NULL;
757   source->status_code = 0;
758   source->reason_phrase = NULL;
759   source->hdrs_updated = FALSE;
760   source->curl_result = CURLE_OK;
761   gst_caps_replace (&source->caps, NULL);
762 
763   GSTCURL_FUNCTION_EXIT (source);
764 }
765 
766 /*
767  * Check if the Curl multi loop has been started. If not, initialise it and
768  * start it running. If it is already running, increment the refcount.
769  */
770 static void
gst_curl_http_src_ref_multi(GstCurlHttpSrc * src)771 gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
772 {
773   GstCurlHttpSrcClass *klass;
774 
775   GSTCURL_FUNCTION_ENTRY (src);
776 
777   /*klass = (GstCurlHttpSrcClass) g_type_class_peek_parent (src); */
778   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
779       GstCurlHttpSrcClass);
780 
781 #ifdef OHOS_OPT_STABLE
782   /* ohos.opt.stable.0003 for multiple instances unref mutex */
783   g_mutex_lock (&klass->multi_task_context.multiple_mutex);
784 #endif
785 
786   g_mutex_lock (&klass->multi_task_context.mutex);
787   if (klass->multi_task_context.refcount == 0) {
788     /* Set up various in-task properties */
789 
790     /* NULL is treated as the start of the list, no need to allocate. */
791     klass->multi_task_context.queue = NULL;
792 
793     /* set up curl */
794     klass->multi_task_context.multi_handle = curl_multi_init ();
795 
796     curl_multi_setopt (klass->multi_task_context.multi_handle,
797         CURLMOPT_PIPELINING, 1);
798 #ifdef CURLMOPT_MAX_HOST_CONNECTIONS
799     curl_multi_setopt (klass->multi_task_context.multi_handle,
800         CURLMOPT_MAX_HOST_CONNECTIONS, 1);
801 #endif
802 
803     /* Start the thread */
804     g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
805     klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
806     klass->multi_task_context.task = gst_task_new (
807         (GstTaskFunction) gst_curl_http_src_curl_multi_loop,
808         (gpointer) & klass->multi_task_context, NULL);
809     gst_task_set_lock (klass->multi_task_context.task,
810         &klass->multi_task_context.task_rec_mutex);
811     if (gst_task_start (klass->multi_task_context.task) == FALSE) {
812       /*
813        * This is a pretty critical failure and is not recoverable, so commit
814        * sudoku and run away.
815        */
816       GSTCURL_ERROR_PRINT ("Couldn't start curl_multi task! Aborting.");
817       abort ();
818     }
819     GSTCURL_INFO_PRINT ("Curl multi loop has been correctly initialised!");
820   }
821   klass->multi_task_context.refcount++;
822   g_mutex_unlock (&klass->multi_task_context.mutex);
823 
824 #ifdef OHOS_OPT_STABLE
825   /* ohos.opt.stable.0003 for multiple instances unref mutex */
826   g_mutex_unlock (&klass->multi_task_context.multiple_mutex);
827 #endif
828 
829   GSTCURL_FUNCTION_EXIT (src);
830 }
831 
832 /*
833  * Decrement the reference count on the curl multi loop. If this is called by
834  * the last instance to hold a reference, shut down the worker. (Otherwise
835  * GStreamer can't close down with a thread still running). Also offers the
836  * "force_all" boolean parameter, which if TRUE removes all references and shuts
837  * down.
838  */
839 static void
gst_curl_http_src_unref_multi(GstCurlHttpSrc * src)840 gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
841 {
842   GstCurlHttpSrcClass *klass;
843 
844   GSTCURL_FUNCTION_ENTRY (src);
845 
846   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
847       GstCurlHttpSrcClass);
848 
849 #ifdef OHOS_OPT_STABLE
850   /* ohos.opt.stable.0003 for multiple instances ref mutex */
851   g_mutex_lock (&klass->multi_task_context.multiple_mutex);
852 #endif
853 
854   g_mutex_lock (&klass->multi_task_context.mutex);
855 #ifdef OHOS_EXT_FUNC
856   /* ohos.ext.func.0025 for clean code */
857   if (klass->multi_task_context.refcount == 0) {
858     GST_WARNING_OBJECT (src, "worker thread refcount is 0");
859     g_mutex_unlock (&klass->multi_task_context.mutex);
860     GSTCURL_FUNCTION_EXIT (src);
861     return;
862   }
863 #endif
864   klass->multi_task_context.refcount--;
865   GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
866       klass->multi_task_context.refcount);
867 
868   if (klass->multi_task_context.refcount == 0) {
869     /* Everything's done! Clean up. */
870     gst_task_stop (klass->multi_task_context.task);
871     klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
872     g_cond_signal (&klass->multi_task_context.signal);
873     g_mutex_unlock (&klass->multi_task_context.mutex);
874     GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task...");
875     gst_task_join (klass->multi_task_context.task);
876     gst_object_unref (klass->multi_task_context.task);
877     klass->multi_task_context.task = NULL;
878     curl_multi_cleanup (klass->multi_task_context.multi_handle);
879     klass->multi_task_context.multi_handle = NULL;
880     g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex);
881     GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete");
882   } else {
883     g_mutex_unlock (&klass->multi_task_context.mutex);
884   }
885 
886 #ifdef OHOS_OPT_STABLE
887   /* ohos.opt.stable.0003 for multiple instances ref mutex */
888   g_mutex_unlock (&klass->multi_task_context.multiple_mutex);
889 #endif
890 
891   GSTCURL_FUNCTION_EXIT (src);
892 }
893 
894 static void
gst_curl_http_src_finalize(GObject * obj)895 gst_curl_http_src_finalize (GObject * obj)
896 {
897   GstCurlHttpSrc *src = GST_CURLHTTPSRC (obj);
898 
899   GSTCURL_FUNCTION_ENTRY (src);
900 
901   /* Cleanup all memory allocated */
902   gst_curl_http_src_cleanup_instance (src);
903 
904   GSTCURL_FUNCTION_EXIT (src);
905 
906   /* Chain up to parent class */
907   G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj);
908 }
909 
910 #ifdef OHOS_EXT_FUNC
911 /* ohos.ext.func.0025 for seek */
912 static void
gst_curl_http_src_handle_seek(GstCurlHttpSrc * src)913 gst_curl_http_src_handle_seek (GstCurlHttpSrc * src)
914 {
915   if (src->curl_handle == NULL) {
916     GST_INFO_OBJECT (src, "parameter is invalid");
917     return;
918   }
919 
920   g_mutex_lock (&src->buffer_mutex);
921   if (src->request_position == src->read_position) {
922 #ifdef OHOS_OPT_COMPAT
923     /* ohos.opt.compat.0044 */
924     GST_INFO_OBJECT (src, "request_position is equal to read_position, req = %"
925       G_GUINT64_FORMAT, src->request_position);
926 #endif
927     /* not seek, just return */
928     g_mutex_unlock (&src->buffer_mutex);
929     return;
930   }
931   g_mutex_unlock (&src->buffer_mutex);
932 
933   gst_curl_http_src_wait_until_removed(src);
934 
935   g_mutex_lock (&src->buffer_mutex);
936   src->state = GSTCURL_NONE;
937   src->transfer_begun = FALSE;
938   src->status_code = 0;
939   if (src->reason_phrase != NULL) {
940     g_free (src->reason_phrase);
941     src->reason_phrase = NULL;
942   }
943   src->hdrs_updated = FALSE;
944   gst_curl_http_src_destroy_easy_handle (src);
945 
946   if (src->buffer_len > 0) {
947     g_free (src->buffer);
948     src->buffer = NULL;
949     src->buffer_len = 0;
950   }
951   g_mutex_unlock (&src->buffer_mutex);
952 
953   GST_INFO_OBJECT (src, "seek_begin: curl handle removed, req_pos:%" G_GUINT64_FORMAT ", read_pos:%" G_GUINT64_FORMAT,
954     src->request_position, src->read_position);
955 }
956 #endif
957 
958 /*
959  * Do the transfer. If the transfer hasn't begun yet, start a new curl handle
960  * and pass it to the multi queue to be operated on. Then wait for any blocks
961  * of data and push them to the source pad.
962  */
963 static GstFlowReturn
gst_curl_http_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)964 gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
965 {
966   GstFlowReturn ret;
967   GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
968   GstCurlHttpSrcClass *klass;
969   GstStructure *empty_headers;
970   GstBaseSrc *basesrc;
971 
972   GSTCURL_FUNCTION_ENTRY (src);
973 
974   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
975       GstCurlHttpSrcClass);
976   basesrc = GST_BASE_SRC_CAST (src);
977 
978 retry:
979   ret = GST_FLOW_OK;
980 
981 #ifdef OHOS_EXT_FUNC
982   /* ohos.ext.func.0025 for seek */
983   gst_curl_http_src_handle_seek(src);
984 #endif
985 
986   /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
987      needed, multi_task_context.mutex must be acquired first */
988   g_mutex_lock (&klass->multi_task_context.mutex);
989   g_mutex_lock (&src->buffer_mutex);
990   if (src->state == GSTCURL_UNLOCK) {
991     ret = GST_FLOW_FLUSHING;
992     goto escape;
993   }
994 
995   if (!src->transfer_begun) {
996     GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
997     /* Create the Easy Handle and set up the session. */
998     src->curl_handle = gst_curl_http_src_create_easy_handle (src);
999     if (src->curl_handle == NULL) {
1000       ret = GST_FLOW_ERROR;
1001       goto escape;
1002     }
1003 
1004     if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
1005         == FALSE) {
1006       GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
1007       ret = GST_FLOW_ERROR;
1008       goto escape;
1009     }
1010     /* Signal the worker thread */
1011     g_cond_signal (&klass->multi_task_context.signal);
1012 
1013     src->state = GSTCURL_OK;
1014     src->transfer_begun = TRUE;
1015     src->data_received = FALSE;
1016 
1017     GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
1018 
1019     if (src->http_headers != NULL) {
1020       gst_structure_free (src->http_headers);
1021     }
1022     empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
1023     src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
1024         URI_NAME, G_TYPE_STRING, src->uri,
1025         REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
1026         RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
1027     gst_structure_free (empty_headers);
1028     GST_INFO_OBJECT (src, "Created a new headers object");
1029   }
1030 
1031   g_mutex_unlock (&klass->multi_task_context.mutex);
1032 
1033   /* Wait for data to become available, then punt it downstream */
1034   while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)
1035       && (src->connection_status == GSTCURL_CONNECTED)) {
1036     g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
1037   }
1038 
1039   if (src->state == GSTCURL_UNLOCK) {
1040     if (src->buffer_len > 0) {
1041       g_free (src->buffer);
1042       src->buffer = NULL;
1043       src->buffer_len = 0;
1044     }
1045     g_mutex_unlock (&src->buffer_mutex);
1046     return GST_FLOW_FLUSHING;
1047   }
1048 
1049   ret = gst_curl_http_src_handle_response (src);
1050   switch (ret) {
1051     case GST_FLOW_ERROR:
1052       /* Don't attempt a retry, just bomb out */
1053       g_mutex_unlock (&src->buffer_mutex);
1054       return ret;
1055     case GST_FLOW_CUSTOM_ERROR:
1056       if (src->data_received == TRUE) {
1057         /*
1058          * If data has already been received, we can't recall previously sent
1059          * buffers so don't attempt a retry in this case.
1060          *
1061          * TODO: Remember the position we got to, and make a range request for
1062          * the resource without the bit we've already received?
1063          */
1064         GST_WARNING_OBJECT (src,
1065             "Failed mid-transfer, can't continue for URI %s", src->uri);
1066         g_mutex_unlock (&src->buffer_mutex);
1067         return GST_FLOW_ERROR;
1068       }
1069       src->retries_remaining--;
1070       if (src->retries_remaining == 0) {
1071         GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
1072         g_mutex_unlock (&src->buffer_mutex);
1073         return GST_FLOW_ERROR;  /* Don't attempt a retry, just bomb out */
1074       }
1075       GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
1076       src->state = GSTCURL_NONE;
1077       src->transfer_begun = FALSE;
1078       src->status_code = 0;
1079       g_free (src->reason_phrase);
1080       src->reason_phrase = NULL;
1081       src->hdrs_updated = FALSE;
1082       if (src->http_headers != NULL) {
1083         gst_structure_free (src->http_headers);
1084         src->http_headers = NULL;
1085         GST_INFO_OBJECT (src, "NULL'd the headers");
1086       }
1087       gst_curl_http_src_destroy_easy_handle (src);
1088       g_mutex_unlock (&src->buffer_mutex);
1089       goto retry;               /* Attempt a retry! */
1090     default:
1091       break;
1092   }
1093 
1094   if (((src->state == GSTCURL_OK) || (src->state == GSTCURL_DONE)) &&
1095       (src->buffer_len > 0)) {
1096 
1097     GST_DEBUG_OBJECT (src, "Pushing %u bytes of transfer for URI %s to pad",
1098         src->buffer_len, src->uri);
1099     *outbuf = gst_buffer_new_allocate (NULL, src->buffer_len, NULL);
1100     gst_buffer_fill (*outbuf, 0, src->buffer, src->buffer_len);
1101     GST_BUFFER_OFFSET (*outbuf) = basesrc->segment.position;
1102 
1103     g_free (src->buffer);
1104     src->buffer = NULL;
1105     src->buffer_len = 0;
1106     src->data_received = TRUE;
1107 
1108     /* ret should still be GST_FLOW_OK */
1109   } else if ((src->state == GSTCURL_DONE) && (src->buffer_len == 0)) {
1110     GST_INFO_OBJECT (src, "Full body received, signalling EOS for URI %s.",
1111         src->uri);
1112     src->state = GSTCURL_NONE;
1113     src->transfer_begun = FALSE;
1114     src->status_code = 0;
1115     g_free (src->reason_phrase);
1116     src->reason_phrase = NULL;
1117     src->hdrs_updated = FALSE;
1118     gst_curl_http_src_destroy_easy_handle (src);
1119     ret = GST_FLOW_EOS;
1120   } else {
1121     switch (src->state) {
1122       case GSTCURL_NONE:
1123         GST_WARNING_OBJECT (src, "Got unexpected GSTCURL_NONE state!");
1124         break;
1125       case GSTCURL_REMOVED:
1126         GST_WARNING_OBJECT (src, "Transfer got removed from the curl queue");
1127         ret = GST_FLOW_EOS;
1128         break;
1129       case GSTCURL_BAD_QUEUE_REQUEST:
1130         GST_ERROR_OBJECT (src, "Bad Queue Request!");
1131         ret = GST_FLOW_ERROR;
1132         break;
1133       case GSTCURL_TOTAL_ERROR:
1134         GST_ERROR_OBJECT (src, "Critical, unrecoverable error!");
1135         ret = GST_FLOW_ERROR;
1136         break;
1137       case GSTCURL_PIPELINE_NULL:
1138         GST_ERROR_OBJECT (src, "Pipeline null");
1139         break;
1140       default:
1141         GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
1142     }
1143   }
1144   g_mutex_unlock (&src->buffer_mutex);
1145   GSTCURL_FUNCTION_EXIT (src);
1146   return ret;
1147 
1148 escape:
1149   g_mutex_unlock (&src->buffer_mutex);
1150   g_mutex_unlock (&klass->multi_task_context.mutex);
1151 
1152   GSTCURL_FUNCTION_EXIT (src);
1153   return ret;
1154 }
1155 
1156 /*
1157  * Convert header from a GstStructure type to a curl_slist type that curl will
1158  * understand.
1159  */
1160 static gboolean
_headers_to_curl_slist(GQuark field_id,const GValue * value,gpointer ptr)1161 _headers_to_curl_slist (GQuark field_id, const GValue * value, gpointer ptr)
1162 {
1163   gchar *field;
1164   struct curl_slist **p_slist = ptr;
1165 
1166   field = g_strdup_printf ("%s: %s", g_quark_to_string (field_id),
1167       g_value_get_string (value));
1168 
1169   *p_slist = curl_slist_append (*p_slist, field);
1170 
1171   g_free (field);
1172 
1173   return TRUE;
1174 }
1175 
1176 /*
1177  * From the data in the queue element s, create a CURL easy handle and populate
1178  * options with the URL, proxy data, login options, cookies,
1179  */
1180 static CURL *
gst_curl_http_src_create_easy_handle(GstCurlHttpSrc * s)1181 gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
1182 {
1183   CURL *handle;
1184   gint i;
1185   GSTCURL_FUNCTION_ENTRY (s);
1186 
1187   /* This is mandatory and yet not default option, so if this is NULL
1188    * then something very bad is going on. */
1189   if (s->uri == NULL) {
1190     GST_ERROR_OBJECT (s, "No URI for curl!");
1191     return NULL;
1192   }
1193 
1194   handle = curl_easy_init ();
1195   if (handle == NULL) {
1196     GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
1197     return NULL;
1198   }
1199   GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
1200 
1201 #ifndef GST_DISABLE_GST_DEBUG
1202   if (curl_easy_setopt (handle, CURLOPT_VERBOSE, 1) != CURLE_OK) {
1203     GST_WARNING_OBJECT (s, "Failed to set verbose!");
1204   }
1205   if (curl_easy_setopt (handle, CURLOPT_DEBUGDATA, s) != CURLE_OK) {
1206     GST_WARNING_OBJECT (s, "Failed to set debug user_data!");
1207   }
1208   if (curl_easy_setopt (handle, CURLOPT_DEBUGFUNCTION,
1209           gst_curl_http_src_get_debug) != CURLE_OK) {
1210     GST_WARNING_OBJECT (s, "Failed to set debug function!");
1211   }
1212 #endif
1213 
1214   gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
1215   gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
1216   gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
1217   gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
1218   gst_curl_setopt_str (s, handle, CURLOPT_NOPROXY, s->no_proxy_list);
1219   gst_curl_setopt_str (s, handle, CURLOPT_PROXYUSERNAME, s->proxy_user);
1220   gst_curl_setopt_str (s, handle, CURLOPT_PROXYPASSWORD, s->proxy_pass);
1221 
1222   for (i = 0; i < s->number_cookies; i++) {
1223     gst_curl_setopt_str (s, handle, CURLOPT_COOKIELIST, s->cookies[i]);
1224   }
1225 
1226   /* curl_slist_append dynamically allocates memory, but I need to free it */
1227   if (s->request_headers != NULL) {
1228     gst_structure_foreach (s->request_headers, _headers_to_curl_slist,
1229         &s->slist);
1230     if (curl_easy_setopt (handle, CURLOPT_HTTPHEADER, s->slist) != CURLE_OK) {
1231       GST_WARNING_OBJECT (s, "Failed to set HTTP headers!");
1232     }
1233   }
1234 
1235   gst_curl_setopt_str_default (s, handle, CURLOPT_USERAGENT, s->user_agent);
1236 
1237   /*
1238    * Unlike soup, this isn't a binary op, curl wants a string here. So if it's
1239    * TRUE, simply set the value as an empty string as this allows both gzip and
1240    * zlib compression methods.
1241    */
1242   if (s->accept_compressed_encodings == TRUE) {
1243     gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "");
1244   } else {
1245     gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "identity");
1246   }
1247 
1248   gst_curl_setopt_int (s, handle, CURLOPT_FOLLOWLOCATION,
1249       s->allow_3xx_redirect);
1250   gst_curl_setopt_int_default (s, handle, CURLOPT_MAXREDIRS,
1251       s->max_3xx_redirects);
1252   gst_curl_setopt_bool (s, handle, CURLOPT_TCP_KEEPALIVE, s->keep_alive);
1253   gst_curl_setopt_int (s, handle, CURLOPT_TIMEOUT, s->timeout_secs);
1254   gst_curl_setopt_bool (s, handle, CURLOPT_SSL_VERIFYPEER, s->strict_ssl);
1255   gst_curl_setopt_str (s, handle, CURLOPT_CAINFO, s->custom_ca_file);
1256 
1257 #ifdef OHOS_EXT_FUNC
1258   /* ohos.ext.func.0025 for seek */
1259   if (s->request_position > 0 || s->stop_position > 0) {
1260     gchar *range;
1261     if (s->stop_position < 1) {
1262       /* start specified, no end specified */
1263       range = g_strdup_printf ("%" G_GUINT64_FORMAT "-", s->request_position);
1264     } else {
1265       /* in GStreamer the end position indicates the first byte that is not
1266          in the range, whereas in HTTP the Content-Range header includes the
1267          byte listed in the end value */
1268       range = g_strdup_printf ("%" G_GUINT64_FORMAT "-%" G_GINT64_FORMAT,
1269           s->request_position, s->stop_position - 1);
1270     }
1271     s->orig_request_pos = s->request_position;
1272     GST_TRACE_OBJECT (s, "Requesting range: %s", range);
1273     curl_easy_setopt (handle, CURLOPT_RANGE, range);
1274     g_free (range);
1275   }
1276   s->read_position = s->request_position;
1277 #else
1278   if (s->request_position || s->stop_position > 0) {
1279     gchar *range;
1280     if (s->stop_position < 1) {
1281       /* start specified, no end specified */
1282       range = g_strdup_printf ("%" G_GINT64_FORMAT "-", s->request_position);
1283     } else {
1284       /* in GStreamer the end position indicates the first byte that is not
1285          in the range, whereas in HTTP the Content-Range header includes the
1286          byte listed in the end value */
1287       range = g_strdup_printf ("%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1288           s->request_position, s->stop_position - 1);
1289     }
1290     GST_TRACE_OBJECT (s, "Requesting range: %s", range);
1291     curl_easy_setopt (handle, CURLOPT_RANGE, range);
1292     g_free (range);
1293   }
1294 #endif
1295 
1296   switch (s->preferred_http_version) {
1297     case GSTCURL_HTTP_VERSION_1_0:
1298       GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.0");
1299       gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1300           CURL_HTTP_VERSION_1_0);
1301       break;
1302     case GSTCURL_HTTP_VERSION_1_1:
1303       GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.1");
1304       gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1305           CURL_HTTP_VERSION_1_1);
1306       break;
1307 #ifdef CURL_VERSION_HTTP2
1308     case GSTCURL_HTTP_VERSION_2_0:
1309       GST_DEBUG_OBJECT (s, "Setting version as HTTP/2.0");
1310       if (curl_easy_setopt (handle, CURLOPT_HTTP_VERSION,
1311               CURL_HTTP_VERSION_2_0) != CURLE_OK) {
1312         if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
1313           GST_WARNING_OBJECT (s,
1314               "Cannot set unsupported option CURLOPT_HTTP_VERSION");
1315         } else {
1316           GST_INFO_OBJECT (s, "HTTP/2 unsupported by libcurl at this time");
1317         }
1318       }
1319       break;
1320 #endif
1321     default:
1322       GST_WARNING_OBJECT (s,
1323           "Supplied a bogus HTTP version, using curl default!");
1324   }
1325 
1326   gst_curl_setopt_generic (s, handle, CURLOPT_HEADERFUNCTION,
1327       gst_curl_http_src_get_header);
1328   gst_curl_setopt_str (s, handle, CURLOPT_HEADERDATA, s);
1329   gst_curl_setopt_generic (s, handle, CURLOPT_WRITEFUNCTION,
1330       gst_curl_http_src_get_chunks);
1331   gst_curl_setopt_str (s, handle, CURLOPT_WRITEDATA, s);
1332 
1333   gst_curl_setopt_str (s, handle, CURLOPT_ERRORBUFFER, s->curl_errbuf);
1334 
1335   GSTCURL_FUNCTION_EXIT (s);
1336   return handle;
1337 }
1338 
1339 /*
1340  * Check the return type from the curl transfer. If it was okay, then deal with
1341  * any headers that were received. Headers should only be dealt with once - but
1342  * we might get a second set if there are trailing headers (RFC7230 Section 4.4)
1343  */
1344 static GstFlowReturn
gst_curl_http_src_handle_response(GstCurlHttpSrc * src)1345 gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
1346 {
1347   glong curl_info_long;
1348   gdouble curl_info_dbl;
1349   curl_off_t curl_info_offt;
1350   gchar *redirect_url;
1351   GstBaseSrc *basesrc;
1352   const GValue *response_headers;
1353   GstFlowReturn ret = GST_FLOW_OK;
1354 
1355   GSTCURL_FUNCTION_ENTRY (src);
1356 
1357   GST_TRACE_OBJECT (src, "status code: %d (%s), curl return code %d",
1358       src->status_code, src->reason_phrase, src->curl_result);
1359 
1360   /* Check the curl result code first - anything not 0 is probably a failure */
1361   if (src->curl_result != 0) {
1362     GST_WARNING_OBJECT (src, "Curl failed the transfer (%d): %s",
1363         src->curl_result, curl_easy_strerror (src->curl_result));
1364     GST_DEBUG_OBJECT (src, "Reason for curl failure: %s", src->curl_errbuf);
1365     return GST_FLOW_ERROR;
1366   }
1367 
1368   /*
1369    * What response code do we have?
1370    */
1371   if (src->status_code >= 400) {
1372     GST_WARNING_OBJECT (src, "Transfer for URI %s returned error status %u",
1373         src->uri, src->status_code);
1374     src->retries_remaining = 0;
1375 #ifdef OHOS_OPT_COMPAT
1376     /**
1377      * ohos.opt.compat.0032
1378      * Fix seek failed when request position from gstqueue2 is the end of the file.
1379      * A 416 response is not actually an error when the file is already completely downloaded
1380      * and the request position is the end of the file.
1381      */
1382     if (src->status_code == 416 && src->content_size > 0) {
1383       return GST_FLOW_OK;
1384     }
1385 #endif
1386     CURL_HTTP_SRC_ERROR (src, RESOURCE, NOT_FOUND, (src->reason_phrase));
1387     return GST_FLOW_ERROR;
1388   } else if (src->status_code == 0) {
1389     if (curl_easy_getinfo (src->curl_handle, CURLINFO_TOTAL_TIME,
1390             &curl_info_dbl) != CURLE_OK) {
1391       /* Curl cannot be relied on in this state, so return an error. */
1392       return GST_FLOW_ERROR;
1393     }
1394     if (curl_info_dbl > src->timeout_secs) {
1395       return GST_FLOW_CUSTOM_ERROR;
1396     }
1397 
1398     if (curl_easy_getinfo (src->curl_handle, CURLINFO_OS_ERRNO,
1399             &curl_info_long) != CURLE_OK) {
1400       /* Curl cannot be relied on in this state, so return an error. */
1401       return GST_FLOW_ERROR;
1402 
1403     }
1404 
1405     GST_WARNING_OBJECT (src, "Errno for CONNECT call was %ld (%s)",
1406         curl_info_long, g_strerror ((gint) curl_info_long));
1407 
1408     /* Some of these responses are retry-able, others not. Set the returned
1409      * state to ERROR so we crash out instead of fruitlessly retrying.
1410      */
1411     if (curl_info_long == ECONNREFUSED) {
1412       return GST_FLOW_ERROR;
1413     }
1414     ret = GST_FLOW_CUSTOM_ERROR;
1415   }
1416 
1417 
1418   if (ret == GST_FLOW_CUSTOM_ERROR) {
1419     src->hdrs_updated = FALSE;
1420     GSTCURL_FUNCTION_EXIT (src);
1421     return ret;
1422   }
1423 
1424   /* Only do this once */
1425   if (src->hdrs_updated == FALSE) {
1426     GSTCURL_FUNCTION_EXIT (src);
1427     return GST_FLOW_OK;
1428   }
1429 
1430   /*
1431    * Deal with redirections...
1432    */
1433   if (curl_easy_getinfo (src->curl_handle, CURLINFO_EFFECTIVE_URL,
1434           &redirect_url)
1435       == CURLE_OK) {
1436     size_t lena, lenb;
1437     lena = strlen (src->uri);
1438     lenb = strlen (redirect_url);
1439     if (g_ascii_strncasecmp (src->uri, redirect_url,
1440             (lena > lenb) ? lenb : lena) != 0) {
1441       GST_INFO_OBJECT (src, "Got a redirect to %s, setting as redirect URI",
1442           redirect_url);
1443       src->redirect_uri = g_strdup (redirect_url);
1444       gst_structure_remove_field (src->http_headers, REDIRECT_URI_NAME);
1445       gst_structure_set (src->http_headers, REDIRECT_URI_NAME,
1446           G_TYPE_STRING, redirect_url, NULL);
1447     }
1448   }
1449 
1450   /*
1451    * Push the content length
1452    */
1453   if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T,
1454           &curl_info_offt) == CURLE_OK) {
1455     if (curl_info_offt == -1) {
1456       GST_WARNING_OBJECT (src,
1457           "No Content-Length was specified in the response.");
1458       src->seekable = GSTCURL_SEEKABLE_FALSE;
1459     } else {
1460       /* Note that in the case of a range get, Content-Length is the number
1461          of bytes requested, not the total size of the resource */
1462 #ifdef OHOS_EXT_FUNC
1463       /* ohos.ext.func.0025 */
1464       GST_INFO_OBJECT (src, "orig req pos:%" G_GUINT64_FORMAT ", Content-Length was given as %" G_GUINT64_FORMAT,
1465           src->orig_request_pos, curl_info_offt);
1466       if (src->content_size == 0) {
1467         src->content_size = src->orig_request_pos + curl_info_offt;
1468       }
1469       basesrc = GST_BASE_SRC_CAST (src);
1470       basesrc->segment.duration = src->orig_request_pos + curl_info_offt;
1471 #else
1472       GST_INFO_OBJECT (src, "Content-Length was given as %" G_GUINT64_FORMAT,
1473           curl_info_offt);
1474       if (src->content_size == 0) {
1475         src->content_size = src->request_position + curl_info_offt;
1476       }
1477       basesrc = GST_BASE_SRC_CAST (src);
1478       basesrc->segment.duration = src->request_position + curl_info_offt;
1479 #endif
1480       if (src->seekable == GSTCURL_SEEKABLE_UNKNOWN) {
1481         src->seekable = GSTCURL_SEEKABLE_TRUE;
1482       }
1483       gst_element_post_message (GST_ELEMENT (src),
1484           gst_message_new_duration_changed (GST_OBJECT (src)));
1485     }
1486   }
1487 
1488   /*
1489    * Push all the received headers down via a sicky event
1490    */
1491   response_headers = gst_structure_get_value (src->http_headers,
1492       RESPONSE_HEADERS_NAME);
1493   if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
1494     GstEvent *hdrs_event;
1495 
1496     gst_element_post_message (GST_ELEMENT_CAST (src),
1497         gst_message_new_element (GST_OBJECT_CAST (src),
1498             gst_structure_copy (src->http_headers)));
1499 
1500     /* gst_event_new_custom takes ownership of our structure */
1501     hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
1502         gst_structure_copy (src->http_headers));
1503     gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
1504     GST_INFO_OBJECT (src, "Pushed headers downstream");
1505   }
1506 
1507   src->hdrs_updated = FALSE;
1508 
1509   GSTCURL_FUNCTION_EXIT (src);
1510 
1511   return ret;
1512 }
1513 
1514 /*
1515  * "Negotiate" capabilities between us and the sink.
1516  * I.e. tell the sink device what data to expect. We can't be told what to send
1517  * unless we implement "only return to me if this type" property. Potential TODO
1518  */
1519 static gboolean
gst_curl_http_src_negotiate_caps(GstCurlHttpSrc * src)1520 gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
1521 {
1522   const GValue *response_headers;
1523   const GstStructure *response_struct;
1524 
1525   GST_INFO_OBJECT (src, "Negotiating caps...");
1526   if (src->caps && src->http_headers) {
1527     response_headers =
1528         gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME);
1529     if (!response_headers) {
1530       GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME);
1531       return FALSE;
1532     }
1533     response_struct = gst_value_get_structure (response_headers);
1534     if (gst_structure_has_field_typed (response_struct, "content-type",
1535             G_TYPE_STRING)) {
1536       const gchar *content_type =
1537           gst_structure_get_string (response_struct, "content-type");
1538       GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type);
1539       src->caps = gst_caps_make_writable (src->caps);
1540       gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
1541           content_type, NULL);
1542       if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
1543         GST_ERROR_OBJECT (src, "Setting caps failed!");
1544         return FALSE;
1545       }
1546     }
1547   } else {
1548     GST_DEBUG_OBJECT (src, "No caps have been set, continue.");
1549   }
1550 
1551   return TRUE;
1552 }
1553 
1554 /*
1555  * Cleanup the CURL easy handle once we're done with it.
1556  */
1557 static inline void
gst_curl_http_src_destroy_easy_handle(GstCurlHttpSrc * src)1558 gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src)
1559 {
1560   /* Thank you Handles, and well done. Well done, mate. */
1561   if (src->curl_handle != NULL) {
1562     curl_easy_cleanup (src->curl_handle);
1563     src->curl_handle = NULL;
1564   }
1565   /* In addition, clean up the curl header slist if it was used. */
1566   if (src->slist != NULL) {
1567     curl_slist_free_all (src->slist);
1568     src->slist = NULL;
1569   }
1570 }
1571 
1572 static GstStateChangeReturn
gst_curl_http_src_change_state(GstElement * element,GstStateChange transition)1573 gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
1574 {
1575   GstStateChangeReturn ret;
1576   GstCurlHttpSrc *source = GST_CURLHTTPSRC (element);
1577   GSTCURL_FUNCTION_ENTRY (source);
1578 
1579   switch (transition) {
1580     case GST_STATE_CHANGE_NULL_TO_READY:
1581       gst_curl_http_src_ref_multi (source);
1582       break;
1583     case GST_STATE_CHANGE_READY_TO_PAUSED:
1584       if (source->uri == NULL) {
1585         GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (_("No URL set.")),
1586             ("Missing URL"));
1587         return GST_STATE_CHANGE_FAILURE;
1588       }
1589       break;
1590     case GST_STATE_CHANGE_READY_TO_NULL:
1591       GST_DEBUG_OBJECT (source, "Removing from multi_loop queue...");
1592       /* The pipeline has ended, so signal any running request to end
1593          and wait until the multi_loop has stopped using this element */
1594       gst_curl_http_src_wait_until_removed (source);
1595       gst_curl_http_src_unref_multi (source);
1596       break;
1597     default:
1598       break;
1599   }
1600 
1601   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1602 
1603   GSTCURL_FUNCTION_EXIT (source);
1604   return ret;
1605 }
1606 
1607 /*
1608  * Take care of any memory that may be left over from the instance that's now
1609  * closing before we leak it.
1610  */
1611 static void
gst_curl_http_src_cleanup_instance(GstCurlHttpSrc * src)1612 gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
1613 {
1614   gint i;
1615   g_mutex_lock (&src->uri_mutex);
1616   g_free (src->uri);
1617   src->uri = NULL;
1618   g_free (src->redirect_uri);
1619   src->redirect_uri = NULL;
1620   g_mutex_unlock (&src->uri_mutex);
1621   g_mutex_clear (&src->uri_mutex);
1622 
1623   g_free (src->proxy_uri);
1624   src->proxy_uri = NULL;
1625   g_free (src->no_proxy_list);
1626   src->no_proxy_list = NULL;
1627   g_free (src->proxy_user);
1628   src->proxy_user = NULL;
1629   g_free (src->proxy_pass);
1630   src->proxy_pass = NULL;
1631 
1632   for (i = 0; i < src->number_cookies; i++) {
1633     g_free (src->cookies[i]);
1634     src->cookies[i] = NULL;
1635   }
1636   g_free (src->cookies);
1637   src->cookies = NULL;
1638 
1639   g_free (src->user_agent);
1640   src->user_agent = NULL;
1641 
1642   g_mutex_clear (&src->buffer_mutex);
1643 
1644   g_cond_clear (&src->buffer_cond);
1645 
1646   g_free (src->buffer);
1647   src->buffer = NULL;
1648 
1649   if (src->request_headers) {
1650     gst_structure_free (src->request_headers);
1651     src->request_headers = NULL;
1652   }
1653   if (src->http_headers != NULL) {
1654     gst_structure_free (src->http_headers);
1655     src->http_headers = NULL;
1656   }
1657   g_free (src->reason_phrase);
1658   src->reason_phrase = NULL;
1659 #ifdef OHOS_OPT_MEMLEAK
1660   /* ohos.opt.memleak.0003 fix memory leak. */
1661   g_free (src->custom_ca_file);
1662   src->custom_ca_file = NULL;
1663 #endif
1664   gst_caps_replace (&src->caps, NULL);
1665 
1666   gst_curl_http_src_destroy_easy_handle (src);
1667 }
1668 
1669 static gboolean
gst_curl_http_src_query(GstBaseSrc * bsrc,GstQuery * query)1670 gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
1671 {
1672   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1673   gboolean ret;
1674 #ifdef OHOS_EXT_FUNC
1675   /* ohos.opt.compat.0022 support pull mode in libav demux */
1676   GstSchedulingFlags flags;
1677   gint minsize, maxsize, align;
1678 #endif
1679   GSTCURL_FUNCTION_ENTRY (src);
1680 
1681   switch (GST_QUERY_TYPE (query)) {
1682     case GST_QUERY_URI:
1683       g_mutex_lock (&src->uri_mutex);
1684       gst_query_set_uri (query, src->uri);
1685       if (src->redirect_uri != NULL) {
1686         gst_query_set_uri_redirection (query, src->redirect_uri);
1687       }
1688       g_mutex_unlock (&src->uri_mutex);
1689       ret = TRUE;
1690       break;
1691     default:
1692       ret = GST_BASE_SRC_CLASS (parent_class)->query (bsrc, query);
1693       break;
1694   }
1695 
1696 #ifdef OHOS_EXT_FUNC
1697   /* ohos.opt.compat.0022 support pull mode in libav demux */
1698   switch (GST_QUERY_TYPE (query)) {
1699     case GST_QUERY_SCHEDULING:
1700       gst_query_parse_scheduling (query, &flags, &minsize, &maxsize, &align);
1701       flags |= GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED;
1702 
1703       if (src->seekable) {
1704         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
1705       } else {
1706         flags &= (~GST_SCHEDULING_FLAG_SEEKABLE);
1707       }
1708       GST_INFO_OBJECT (src, "seekable: %d", src->seekable);
1709 
1710       gst_query_set_scheduling (query, flags, minsize, maxsize, align);
1711       break;
1712     default:
1713       break;
1714   }
1715 #endif
1716 
1717   GSTCURL_FUNCTION_EXIT (src);
1718   return ret;
1719 }
1720 
1721 static gboolean
gst_curl_http_src_get_content_length(GstBaseSrc * bsrc,guint64 * size)1722 gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
1723 {
1724   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1725   const GValue *response_headers;
1726   gboolean ret = FALSE;
1727 
1728   if (src->http_headers == NULL) {
1729     return FALSE;
1730   }
1731 
1732   response_headers = gst_structure_get_value (src->http_headers,
1733       RESPONSE_HEADERS_NAME);
1734   if (gst_structure_has_field_typed (gst_value_get_structure (response_headers),
1735           "content-length", G_TYPE_STRING)) {
1736     const gchar *content_length =
1737         gst_structure_get_string (gst_value_get_structure (response_headers),
1738         "content-length");
1739     *size = (guint64) g_ascii_strtoull (content_length, NULL, 10);
1740     ret = TRUE;
1741   } else {
1742     GST_DEBUG_OBJECT (src,
1743         "No content length has yet been set, or there was an error!");
1744   }
1745   return ret;
1746 }
1747 
1748 static gboolean
gst_curl_http_src_is_seekable(GstBaseSrc * bsrc)1749 gst_curl_http_src_is_seekable (GstBaseSrc * bsrc)
1750 {
1751   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1752 
1753   /* NOTE: if seekable is UNKNOWN, assume yes */
1754   return src->seekable != GSTCURL_SEEKABLE_FALSE;
1755 }
1756 
1757 static gboolean
gst_curl_http_src_do_seek(GstBaseSrc * bsrc,GstSegment * segment)1758 gst_curl_http_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
1759 {
1760   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1761   gboolean ret = TRUE;
1762 
1763   g_mutex_lock (&src->buffer_mutex);
1764 #ifdef OHOS_EXT_FUNC
1765 /* ohos.ext.func.0025 support https seek: */
1766   GST_INFO_OBJECT (src, "do_seek(%" G_GINT64_FORMAT ", %" G_GINT64_FORMAT
1767       ")", segment->start, segment->stop);
1768 #endif
1769   if (src->state == GSTCURL_UNLOCK) {
1770     GST_WARNING_OBJECT (src, "Attempt to seek while unlocked");
1771     ret = FALSE;
1772     goto done;
1773   }
1774   if (src->request_position == segment->start &&
1775       src->stop_position == segment->stop) {
1776 #ifdef OHOS_OPT_COMPAT
1777     /* ohos.opt.compat.0044 */
1778     src->read_position = (guint64)-1;
1779 #endif
1780     GST_DEBUG_OBJECT (src, "Seek to current read/end position");
1781     goto done;
1782   }
1783 
1784   if (src->seekable == GSTCURL_SEEKABLE_FALSE) {
1785     GST_WARNING_OBJECT (src, "Not seekable");
1786     ret = FALSE;
1787     goto done;
1788   }
1789 
1790   if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
1791     GST_WARNING_OBJECT (src, "Invalid seek segment");
1792     ret = FALSE;
1793     goto done;
1794   }
1795 
1796   if (src->content_size > 0 && segment->start >= src->content_size) {
1797     GST_WARNING_OBJECT (src,
1798         "Potentially seeking beyond end of file, might EOS immediately");
1799   }
1800 
1801   src->request_position = segment->start;
1802   src->stop_position = segment->stop;
1803 #ifdef OHOS_OPT_COMPAT
1804   /**
1805    * ohos.opt.compat.0044
1806    * Fix seek failed when request_position from gstqueue2 seek event is the same as read_position.
1807    * This case occurs when pulled position from demux jumps back and forth between the boundary of two ranges in gstqueue2,
1808    * which causes requested postion of seek event from gstqueue2 be the same as right boundary of the last range, namely
1809    * current downloaded position(read_position). Thus, reset read position when occurred gst_curl_http_src_do_seek().
1810    */
1811   src->read_position = (guint64)-1;
1812 #endif
1813 done:
1814   g_mutex_unlock (&src->buffer_mutex);
1815   return ret;
1816 }
1817 
1818 static void
gst_curl_http_src_uri_handler_init(gpointer g_iface,gpointer iface_data)1819 gst_curl_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1820 {
1821   GstURIHandlerInterface *uri_iface = (GstURIHandlerInterface *) g_iface;
1822 
1823   uri_iface->get_type = gst_curl_http_src_urihandler_get_type;
1824   uri_iface->get_protocols = gst_curl_http_src_urihandler_get_protocols;
1825   uri_iface->get_uri = gst_curl_http_src_urihandler_get_uri;
1826   uri_iface->set_uri = gst_curl_http_src_urihandler_set_uri;
1827 }
1828 
1829 static guint
gst_curl_http_src_urihandler_get_type(GType type)1830 gst_curl_http_src_urihandler_get_type (GType type)
1831 {
1832   return GST_URI_SRC;
1833 }
1834 
1835 static const gchar *const *
gst_curl_http_src_urihandler_get_protocols(GType type)1836 gst_curl_http_src_urihandler_get_protocols (GType type)
1837 {
1838   static const gchar *protocols[] = { "http", "https", NULL };
1839 
1840   return protocols;
1841 }
1842 
1843 static gchar *
gst_curl_http_src_urihandler_get_uri(GstURIHandler * handler)1844 gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler)
1845 {
1846   gchar *ret;
1847   GstCurlHttpSrc *source;
1848 
1849   g_return_val_if_fail (GST_IS_URI_HANDLER (handler), NULL);
1850   source = GST_CURLHTTPSRC (handler);
1851 
1852   GSTCURL_FUNCTION_ENTRY (source);
1853 
1854   g_mutex_lock (&source->uri_mutex);
1855   ret = g_strdup (source->uri);
1856   g_mutex_unlock (&source->uri_mutex);
1857 
1858   GSTCURL_FUNCTION_EXIT (source);
1859   return ret;
1860 }
1861 
1862 static gboolean
gst_curl_http_src_urihandler_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)1863 gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
1864     const gchar * uri, GError ** error)
1865 {
1866   GstCurlHttpSrc *source = GST_CURLHTTPSRC (handler);
1867   GSTCURL_FUNCTION_ENTRY (source);
1868 
1869   g_return_val_if_fail (GST_IS_URI_HANDLER (handler), FALSE);
1870   g_return_val_if_fail (uri != NULL, FALSE);
1871 
1872   g_mutex_lock (&source->uri_mutex);
1873 
1874   if (source->uri != NULL) {
1875     GST_DEBUG_OBJECT (source,
1876         "URI already present as %s, updating to new URI %s", source->uri, uri);
1877     g_free (source->uri);
1878   }
1879 
1880   source->uri = g_strdup (uri);
1881   if (source->uri == NULL) {
1882     g_mutex_unlock (&source->uri_mutex);
1883     return FALSE;
1884   }
1885   source->retries_remaining = source->total_retries;
1886 
1887   g_mutex_unlock (&source->uri_mutex);
1888 
1889   GSTCURL_FUNCTION_EXIT (source);
1890   return TRUE;
1891 }
1892 
1893 /*
1894  * Cancel any currently running transfer, and then signal all the loops to drop
1895  * any received buffers. The ::create() method should return GST_FLOW_FLUSHING.
1896  */
1897 static gboolean
gst_curl_http_src_unlock(GstBaseSrc * bsrc)1898 gst_curl_http_src_unlock (GstBaseSrc * bsrc)
1899 {
1900   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1901   gboolean want_removal = FALSE;
1902 
1903   g_mutex_lock (&src->buffer_mutex);
1904   if (src->state != GSTCURL_UNLOCK) {
1905     if (src->state == GSTCURL_OK) {
1906       /* A transfer is running, cancel it */
1907       if (src->connection_status == GSTCURL_CONNECTED) {
1908         src->connection_status = GSTCURL_WANT_REMOVAL;
1909       }
1910       want_removal = TRUE;
1911     }
1912     src->pending_state = src->state;
1913     src->state = GSTCURL_UNLOCK;
1914   }
1915   g_cond_signal (&src->buffer_cond);
1916   g_mutex_unlock (&src->buffer_mutex);
1917 
1918   if (want_removal) {
1919     GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
1920         GST_TYPE_CURL_HTTP_SRC,
1921         GstCurlHttpSrcClass);
1922     g_mutex_lock (&klass->multi_task_context.mutex);
1923     g_cond_signal (&klass->multi_task_context.signal);
1924     g_mutex_unlock (&klass->multi_task_context.mutex);
1925   }
1926 
1927   return TRUE;
1928 }
1929 
1930 /*
1931  * Finish the unlock request above and return curlhttpsrc to the normal state.
1932  * This will probably be GSTCURL_DONE, and the next return from ::create() will
1933  * be GST_FLOW_EOS as we don't want to deliver parts of a HTTP body.
1934  */
1935 static gboolean
gst_curl_http_src_unlock_stop(GstBaseSrc * bsrc)1936 gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
1937 {
1938   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1939 
1940   g_mutex_lock (&src->buffer_mutex);
1941   src->state = src->pending_state;
1942   src->pending_state = GSTCURL_NONE;
1943   g_cond_signal (&src->buffer_cond);
1944   g_mutex_unlock (&src->buffer_mutex);
1945 
1946   return TRUE;
1947 }
1948 
1949 /*****************************************************************************
1950  * Curl loop task functions begin
1951  *****************************************************************************/
1952 static void
gst_curl_http_src_curl_multi_loop(gpointer thread_data)1953 gst_curl_http_src_curl_multi_loop (gpointer thread_data)
1954 {
1955   GstCurlHttpSrcMultiTaskContext *context;
1956   GstCurlHttpSrcQueueElement *qelement, *qnext;
1957   gint i, still_running = 0;
1958   CURLMsg *curl_message;
1959   GstCurlHttpSrc *elt;
1960   guint active = 0;
1961 
1962   context = (GstCurlHttpSrcMultiTaskContext *) thread_data;
1963 
1964   g_mutex_lock (&context->mutex);
1965 
1966   /* Someone is holding a reference to us, but isn't using us so to avoid
1967    * unnecessary clock cycle wasting, sit in a conditional wait until woken.
1968    */
1969   while (context->queue == NULL
1970       && context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
1971     GSTCURL_DEBUG_PRINT ("Waiting for an element to be added...");
1972     g_cond_wait (&context->signal, &context->mutex);
1973     GSTCURL_DEBUG_PRINT ("Received wake up call!");
1974   }
1975   if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
1976     GSTCURL_INFO_PRINT ("Got instruction to shut down");
1977     goto out;
1978   }
1979 
1980   /* check for elements that need to be started or removed */
1981   qelement = context->queue;
1982   while (qelement != NULL) {
1983     qnext = qelement->next;
1984     elt = qelement->p;
1985     /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
1986        needed, multi_task_context.mutex must be acquired first */
1987     g_mutex_lock (&elt->buffer_mutex);
1988     if (elt->connection_status == GSTCURL_WANT_REMOVAL) {
1989       curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
1990       if (elt->state == GSTCURL_UNLOCK) {
1991         elt->pending_state = GSTCURL_REMOVED;
1992       } else {
1993         elt->state = GSTCURL_REMOVED;
1994       }
1995       elt->connection_status = GSTCURL_NOT_CONNECTED;
1996       gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
1997       g_cond_signal (&elt->buffer_cond);
1998     } else if (elt->connection_status == GSTCURL_CONNECTED) {
1999       active++;
2000       if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) {
2001         GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
2002         curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
2003       }
2004     }
2005     g_mutex_unlock (&elt->buffer_mutex);
2006     qelement = qnext;
2007   }
2008 
2009   if (active == 0) {
2010     GSTCURL_DEBUG_PRINT ("No active elements");
2011     goto out;
2012   }
2013 
2014   /* perform a select() on all of the active sockets and process any
2015      messages from curl */
2016   {
2017     struct timeval timeout;
2018     gint rc;
2019     fd_set fdread, fdwrite, fdexcep;
2020     int maxfd = -1;
2021     long curl_timeo = -1;
2022     gboolean cond = FALSE;
2023 
2024     /* Because curl can possibly take some time here, be nice and let go of the
2025      * mutex so other threads can perform state/queue operations as we don't
2026      * care about those until the end of this. */
2027     g_mutex_unlock (&context->mutex);
2028 
2029     FD_ZERO (&fdread);
2030     FD_ZERO (&fdwrite);
2031     FD_ZERO (&fdexcep);
2032 
2033     timeout.tv_sec = 1;
2034     timeout.tv_usec = 0;
2035 
2036     curl_multi_timeout (context->multi_handle, &curl_timeo);
2037     if (curl_timeo >= 0) {
2038       timeout.tv_sec = curl_timeo / 1000;
2039       if (timeout.tv_sec > 1) {
2040         timeout.tv_sec = 1;
2041       } else {
2042         timeout.tv_usec = (curl_timeo % 1000) * 1000;
2043       }
2044     }
2045 
2046     /* get file descriptors from the transfers */
2047     curl_multi_fdset (context->multi_handle, &fdread, &fdwrite, &fdexcep,
2048         &maxfd);
2049 
2050     rc = select (maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
2051 
2052     switch (rc) {
2053       case -1:
2054         /* select error */
2055         break;
2056       case 0:
2057       default:
2058         /* timeout or readable/writable sockets */
2059         curl_multi_perform (context->multi_handle, &still_running);
2060         break;
2061     }
2062 
2063     g_mutex_lock (&context->mutex);
2064 
2065     /*
2066      * Check the CURL message buffer to find out if any transfers have
2067      * completed. If they have, call the signal_finished function which
2068      * will signal the g_cond_wait call in that calling instance.
2069      */
2070     i = 0;
2071     while (cond != TRUE) {
2072       curl_message = curl_multi_info_read (context->multi_handle, &i);
2073       if (curl_message == NULL) {
2074         cond = TRUE;
2075       } else if (curl_message->msg == CURLMSG_DONE) {
2076         /* A hack, but I have seen curl_message->easy_handle being
2077          * NULL randomly, so check for that. */
2078         if (curl_message->easy_handle != NULL) {
2079           curl_multi_remove_handle (context->multi_handle,
2080               curl_message->easy_handle);
2081           gst_curl_http_src_remove_queue_handle (&context->queue,
2082               curl_message->easy_handle, curl_message->data.result);
2083         }
2084       }
2085     }
2086   }
2087 out:
2088   g_mutex_unlock (&context->mutex);
2089 }
2090 
2091 /*
2092  * Receive headers from the remote server and put them into the http_headers
2093  * structure to be sent downstream when we've got them all and started receiving
2094  * the body (see ::_handle_response())
2095  */
2096 static size_t
gst_curl_http_src_get_header(void * header,size_t size,size_t nmemb,void * src)2097 gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
2098     void *src)
2099 {
2100   GstCurlHttpSrc *s = src;
2101   char *substr;
2102 
2103   GST_DEBUG_OBJECT (s, "Received header: %s", (char *) header);
2104 
2105   g_mutex_lock (&s->buffer_mutex);
2106 
2107   if (s->state == GSTCURL_UNLOCK) {
2108     g_mutex_unlock (&s->buffer_mutex);
2109     return size * nmemb;
2110   }
2111 
2112   if (s->http_headers == NULL) {
2113     /* Can't do anything here, so just silently swallow the header */
2114     GST_DEBUG_OBJECT (s, "HTTP Headers Structure has already been sent,"
2115         " ignoring header");
2116     g_mutex_unlock (&s->buffer_mutex);
2117     return size * nmemb;
2118   }
2119 
2120   substr = gst_curl_http_src_strcasestr (header, "HTTP");
2121   if (substr == header) {
2122     /* We have a status line! */
2123     gchar **status_line_fields;
2124 
2125     /* Have we already seen a status line? If so, delete any response headers */
2126     if (s->status_code > 0) {
2127       GstStructure *empty_headers =
2128           gst_structure_new_empty (RESPONSE_HEADERS_NAME);
2129       gst_structure_remove_field (s->http_headers, RESPONSE_HEADERS_NAME);
2130       gst_structure_set (s->http_headers, RESPONSE_HEADERS_NAME,
2131           GST_TYPE_STRUCTURE, empty_headers, NULL);
2132       gst_structure_free (empty_headers);
2133 
2134     }
2135 
2136     /* Process the status line */
2137     status_line_fields = g_strsplit ((gchar *) header, " ", 3);
2138     if (status_line_fields == NULL) {
2139       GST_ERROR_OBJECT (s, "Status line processing failed!");
2140     } else {
2141       s->status_code =
2142           (guint) g_ascii_strtoll (status_line_fields[1], NULL, 10);
2143       g_free (s->reason_phrase);
2144       s->reason_phrase = g_strdup (status_line_fields[2]);
2145       GST_INFO_OBJECT (s, "Received status %u for request for URI %s: %s",
2146           s->status_code, s->uri, s->reason_phrase);
2147       gst_structure_set (s->http_headers, HTTP_STATUS_CODE,
2148           G_TYPE_UINT, s->status_code, NULL);
2149       g_strfreev (status_line_fields);
2150     }
2151   } else {
2152     /* Normal header line */
2153     gchar **header_tpl = g_strsplit ((gchar *) header, ": ", 2);
2154     if (header_tpl == NULL) {
2155       GST_ERROR_OBJECT (s, "Header processing failed! (%s)", (gchar *) header);
2156     } else {
2157       const GValue *gv_resp_hdrs = gst_structure_get_value (s->http_headers,
2158           RESPONSE_HEADERS_NAME);
2159       const GstStructure *response_headers =
2160           gst_value_get_structure (gv_resp_hdrs);
2161       /* Store header key lower case (g_ascii_strdown), makes searching through
2162        * later on easier - end applications shouldn't care, as all HTTP headers
2163        * are case-insensitive */
2164       gchar *header_key = g_ascii_strdown (header_tpl[0], -1);
2165       gchar *header_value;
2166 
2167       /* If header field already exists, append to the end */
2168       if (gst_structure_has_field (response_headers, header_key) == TRUE) {
2169         header_value = g_strdup_printf ("%s, %s",
2170             gst_structure_get_string (response_headers, header_key),
2171             header_tpl[1]);
2172         gst_structure_set ((GstStructure *) response_headers, header_key,
2173             G_TYPE_STRING, header_value, NULL);
2174         g_free (header_value);
2175       } else {
2176         header_value = header_tpl[1];
2177         gst_structure_set ((GstStructure *) response_headers, header_key,
2178             G_TYPE_STRING, header_value, NULL);
2179       }
2180 
2181       /* We have some special cases - deal with them here */
2182       if (g_strcmp0 (header_key, "content-type") == 0) {
2183         gst_curl_http_src_negotiate_caps (src);
2184       } else if (g_strcmp0 (header_key, "accept-ranges") == 0 &&
2185           g_ascii_strcasecmp (header_value, "none") == 0) {
2186         s->seekable = GSTCURL_SEEKABLE_FALSE;
2187       } else if (g_strcmp0 (header_key, "content-range") == 0) {
2188         /* In the case of a Range GET, the Content-Length header will contain
2189            the size of range requested, and the Content-Range header will
2190            have the start, stop and total size of the resource */
2191         gchar *size = strchr (header_value, '/');
2192         if (size) {
2193 #ifdef OHOS_OPT_COMPAT
2194           /* ohos.opt.compat.0032 */
2195           s->content_size = atoi (size + 1);
2196 #else
2197           s->content_size = atoi (size);
2198 #endif
2199         }
2200       }
2201 
2202       g_free (header_key);
2203       g_strfreev (header_tpl);
2204     }
2205   }
2206 
2207   s->hdrs_updated = TRUE;
2208 
2209   g_mutex_unlock (&s->buffer_mutex);
2210 
2211   return size * nmemb;
2212 }
2213 
2214 /*
2215  * My own quick and dirty implementation of strcasestr. This is a GNU extension
2216  * (i.e. not portable) and not always guaranteed to be available.
2217  *
2218  * I know this doesn't work if the haystack and needle are the same size. But
2219  * this isn't necessarily a bad thing, as the only place we currently use this
2220  * is at a point where returning nothing even if a string match occurs but the
2221  * needle is the same size as the haystack actually saves us time.
2222  */
2223 static char *
gst_curl_http_src_strcasestr(const char * haystack,const char * needle)2224 gst_curl_http_src_strcasestr (const char *haystack, const char *needle)
2225 {
2226   int i, j, needle_len;
2227   char *location;
2228 
2229   needle_len = (int) strlen (needle);
2230   i = 0;
2231   j = 0;
2232   location = NULL;
2233 
2234   while (haystack[i] != '\0') {
2235     if (j == needle_len) {
2236       location = (char *) haystack + (i - j);
2237     }
2238     if (tolower (haystack[i]) == tolower (needle[j])) {
2239       j++;
2240     } else {
2241       j = 0;
2242     }
2243     i++;
2244   }
2245 
2246   return location;
2247 }
2248 
2249 #ifdef OHOS_EXT_FUNC
2250 /*
2251  * ohos.ext.func.0025 support https seek:
2252  */
2253 static void
gst_curl_http_src_update_position(GstCurlHttpSrc * src,guint64 bytes_read)2254 gst_curl_http_src_update_position (GstCurlHttpSrc * src, guint64 bytes_read)
2255 {
2256   guint64 new_position;
2257 #ifdef OHOS_OPT_COMPAT
2258   /* ohos.opt.compat.0044 */
2259   if (src->read_position != (guint64)-1 && bytes_read > (G_MAXUINT64 - src->read_position)) {
2260 #else
2261   if (bytes_read > (G_MAXUINT64 - src->read_position)) {
2262 #endif
2263     GST_WARNING_OBJECT (src, "bytes_read:%" G_GUINT64_FORMAT " abnormal, should check, read pos:%" G_GUINT64_FORMAT,
2264       bytes_read, src->read_position);
2265     return;
2266   }
2267 
2268 #ifdef OHOS_OPT_COMPAT
2269   /* ohos.opt.compat.0044 */
2270   new_position = (src->read_position == (guint64)-1) ? bytes_read : (src->read_position + bytes_read);
2271 #else
2272   new_position = src->read_position + bytes_read;
2273 #endif
2274   if (G_LIKELY(src->request_position == src->read_position)) {
2275     src->request_position = new_position;
2276   }
2277   src->read_position = new_position;
2278 
2279   GST_DEBUG_OBJECT (src, "bytes_read:%" G_GUINT64_FORMAT ", req:%" G_GUINT64_FORMAT ", read:%" G_GUINT64_FORMAT,
2280     bytes_read, src->request_position, src->read_position);
2281 }
2282 #endif
2283 
2284 /*
2285  * Receive chunks of the requested body and pass these back to the ::create()
2286  * loop
2287  */
2288 static size_t
2289 gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
2290 {
2291   GstCurlHttpSrc *s = src;
2292   size_t chunk_len = size * nmemb;
2293   GST_TRACE_OBJECT (s,
2294       "Received curl chunk for URI %s of size %d", s->uri, (int) chunk_len);
2295   g_mutex_lock (&s->buffer_mutex);
2296 
2297 #ifdef OHOS_EXT_FUNC
2298 /* ohos.ext.func.0025 support https seek: */
2299   gst_curl_http_src_update_position(s, (guint64)chunk_len);
2300 #endif
2301 
2302   if (s->state == GSTCURL_UNLOCK) {
2303     g_mutex_unlock (&s->buffer_mutex);
2304     return chunk_len;
2305   }
2306   s->buffer =
2307       g_realloc (s->buffer, (s->buffer_len + chunk_len + 1) * sizeof (char));
2308   if (s->buffer == NULL) {
2309     GST_ERROR_OBJECT (s, "Realloc for cURL response message failed!");
2310     return 0;
2311   }
2312   memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
2313   s->buffer_len += chunk_len;
2314   g_cond_signal (&s->buffer_cond);
2315   g_mutex_unlock (&s->buffer_mutex);
2316   return chunk_len;
2317 }
2318 
2319 /*
2320  * Request a cancellation of a currently running curl handle.
2321  */
2322 static void
2323 gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
2324 {
2325   GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
2326       GST_TYPE_CURL_HTTP_SRC,
2327       GstCurlHttpSrcClass);
2328 
2329   g_mutex_lock (&klass->multi_task_context.mutex);
2330   g_mutex_lock (&src->buffer_mutex);
2331   if (src->connection_status == GSTCURL_CONNECTED) {
2332     src->connection_status = GSTCURL_WANT_REMOVAL;
2333   }
2334   g_mutex_unlock (&src->buffer_mutex);
2335   g_cond_signal (&klass->multi_task_context.signal);
2336   g_mutex_unlock (&klass->multi_task_context.mutex);
2337 }
2338 
2339 /*
2340  * Request a cancellation of a currently running curl handle and
2341  * block this thread until the src element has been removed
2342  * from the queue
2343  */
2344 static void
2345 gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src)
2346 {
2347   gst_curl_http_src_request_remove (src);
2348   g_mutex_lock (&src->buffer_mutex);
2349   while (src->connection_status != GSTCURL_NOT_CONNECTED) {
2350     g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
2351   }
2352   g_mutex_unlock (&src->buffer_mutex);
2353 }
2354 
2355 #ifndef GST_DISABLE_GST_DEBUG
2356 /*
2357  * This callback receives debug information, as specified in the type argument.
2358  * This function must return 0.
2359  */
2360 static int
2361 gst_curl_http_src_get_debug (CURL * handle, curl_infotype type, char *data,
2362     size_t size, void *clientp)
2363 {
2364   GstCurlHttpSrc *src = (GstCurlHttpSrc *) clientp;
2365   gchar *msg = NULL;
2366 
2367   switch (type) {
2368     case CURLINFO_TEXT:
2369     case CURLINFO_HEADER_OUT:
2370       msg = g_memdup2 (data, size);
2371       if (size > 0) {
2372         msg[size - 1] = '\0';
2373         g_strchomp (msg);
2374       }
2375       break;
2376     default:
2377       break;
2378   }
2379 
2380   switch (type) {
2381     case CURLINFO_TEXT:
2382       GST_DEBUG_OBJECT (src, "%s", msg);
2383       break;
2384     case CURLINFO_HEADER_OUT:
2385       GST_DEBUG_OBJECT (src, "outgoing header: %s", msg);
2386       break;
2387     case CURLINFO_DATA_IN:
2388       GST_MEMDUMP_OBJECT (src, "incoming data", (guint8 *) data, size);
2389       break;
2390     case CURLINFO_DATA_OUT:
2391       GST_MEMDUMP_OBJECT (src, "outgoing data", (guint8 *) data, size);
2392       break;
2393     case CURLINFO_SSL_DATA_IN:
2394       GST_MEMDUMP_OBJECT (src, "incoming ssl data", (guint8 *) data, size);
2395       break;
2396     case CURLINFO_SSL_DATA_OUT:
2397       GST_MEMDUMP_OBJECT (src, "outgoing ssl data", (guint8 *) data, size);
2398       break;
2399     default:
2400       GST_DEBUG_OBJECT (src, "unknown debug info type %d", type);
2401       GST_MEMDUMP_OBJECT (src, "unknown data", (guint8 *) data, size);
2402       break;
2403   }
2404   g_free (msg);
2405   return 0;
2406 }
2407 #endif
2408