1 /*
2 * GstCurlHttpSrc
3 * Copyright 2017 British Broadcasting Corporation - Research and Development
4 *
5 * Author: Sam Hurst <samuelh@rd.bbc.co.uk>
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Alternatively, the contents of this file may be used under the
26 * GNU Lesser General Public License Version 2.1 (the "LGPL"), in
27 * which case the following provisions apply instead of the ones
28 * mentioned above:
29 *
30 * This library is free software; you can redistribute it and/or
31 * modify it under the terms of the GNU Library General Public
32 * License as published by the Free Software Foundation; either
33 * version 2 of the License, or (at your option) any later version.
34 *
35 * This library is distributed in the hope that it will be useful,
36 * but WITHOUT ANY WARRANTY; without even the implied warranty of
37 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
38 * Library General Public License for more details.
39 *
40 * You should have received a copy of the GNU Library General Public
41 * License along with this library; if not, write to the
42 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
43 * Boston, MA 02111-1307, USA.
44 */
45
46 /**
47 * SECTION:element-curlhttpsrc
48 *
49 * This plugin reads data from a remote location specified by a URI, when the
50 * protocol is 'http' or 'https'.
51 *
52 * It is based on the cURL project (http://curl.haxx.se/) and is specifically
53 * designed to be also used with nghttp2 (http://nghttp2.org) to enable HTTP/2
54 * support for GStreamer. Your libcurl library MUST be compiled against nghttp2
55 * for HTTP/2 support for this functionality. HTTPS support is dependent on
56 * cURL being built with SSL support (OpenSSL/PolarSSL/NSS/GnuTLS).
57 *
58 * An HTTP proxy must be specified by URL.
59 * If the "http_proxy" environment variable is set, its value is used.
60 * The #GstCurlHttpSrc:proxy property can be used to override the default.
61 *
62 * ## Example launch line
63 *
64 * |[
65 * gst-launch-1.0 curlhttpsrc location=http://127.0.1.1/index.html ! fakesink dump=1
66 * ]| The above pipeline reads a web page from the local machine using HTTP and
67 * dumps it to stdout.
68 * |[
69 * gst-launch-1.0 playbin uri=http://rdmedia.bbc.co.uk/dash/testmpds/multiperiod/bbb.php
70 * ]| The above pipeline will start up a DASH streaming session from the given
71 * MPD file. This requires GStreamer to have been built with dashdemux from
72 * gst-plugins-bad.
73 */
74
75 /*
76 * Thread safety notes.
77 *
78 * GstCurlHttpSrc uses a single thread running the
79 * gst_curl_http_src_curl_multi_loop() function to handle receiving
80 * data and messages from libcurl. Each instance of GstCurlHttpSrc adds
81 * an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
82 * for the multi_loop to perform the HTTP request.
83 *
84 * When an instance of GstCurlHttpSrc wants to make a request (i.e.
85 * it has moved to the PLAYING state) it adds itself to the
86 * multi_task_context.queue list and signals the multi_loop task.
87 *
88 * Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond
89 * to wait for gst_curl_http_src_curl_multi_loop() to perform the
90 * request and signal completion.
91 *
92 * Each instance of GstCurlHttpSrc is protected by the mutexes:
93 * 1. uri_mutex
94 * 2. buffer_mutex
95 *
96 * uri_mutex is used to protect access to the uri field.
97 *
98 * buffer_mutex is used to protect access to buffer_cond, state and
99 * connection_status.
100 *
101 * The gst_curl_http_src_curl_multi_loop() function uses the mutexes:
102 * 1. multi_task_context.task_rec_mutex
103 * 2. multi_task_context.mutex
104 *
105 * multi_task_context.task_rec_mutex is only used by GstTask.
106 *
107 * multi_task_context.mutex is used to protect access to queue and state
108 *
109 * To avoid deadlock, it is vital that if both multi_task_context.mutex
110 * and buffer_mutex are required, that they are locked in the order:
111 * 1. multi_task_context.mutex
112 * 2. buffer_mutex
113 */
114
115 #ifdef HAVE_CONFIG_H
116 #include <config.h>
117 #endif
118
119 #include <gst/gst-i18n-plugin.h>
120
121 #include "gstcurlelements.h"
122 #include "gstcurlhttpsrc.h"
123 #include "gstcurlqueue.h"
124 #include "gstcurldefaults.h"
125
126 GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
127 #define GST_CAT_DEFAULT gst_curl_http_src_debug
128 GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);
129
/*
 * Post an element error for an HTTP failure on @src.
 *
 * @src:           the GstCurlHttpSrc the error is posted on
 * @cat, @code:    error domain and code, forwarded to
 *                 GST_ELEMENT_ERROR_WITH_DETAILS
 * @error_message: human readable message for the error
 *
 * The debug string carries the reason phrase, status code, request URL and
 * redirect URL. The status code and redirect URL are also attached as the
 * "http-status-code" and "http-redirect-uri" details.
 *
 * Every string field is wrapped in GST_STR_NULL: reason_phrase, uri and
 * redirect_uri all start out as NULL on a fresh instance, and handing NULL
 * to a "%s" conversion is undefined behaviour.
 */
#define CURL_HTTP_SRC_ERROR(src,cat,code,error_message) \
  do { \
    GST_ELEMENT_ERROR_WITH_DETAILS ((src), cat, code, ("%s", error_message), \
        ("%s (%d), URL: %s, Redirect to: %s", \
            GST_STR_NULL ((src)->reason_phrase), (src)->status_code, \
            GST_STR_NULL ((src)->uri), GST_STR_NULL ((src)->redirect_uri)), \
        ("http-status-code", G_TYPE_UINT, (src)->status_code, \
            "http-redirect-uri", G_TYPE_STRING, \
            GST_STR_NULL ((src)->redirect_uri), NULL)); \
  } while(0)
138
/*
 * GObject property identifiers. The quoted name beside each id is the
 * property name installed for it in gst_curl_http_src_class_init().
 */
enum
{
  PROP_0,
  PROP_URI,                     /* "location" */
  PROP_USERNAME,                /* "user-id" */
  PROP_PASSWORD,                /* "user-pw" */
  PROP_PROXYURI,                /* "proxy" */
  PROP_PROXYUSERNAME,           /* "proxy-id" */
  PROP_PROXYPASSWORD,           /* "proxy-pw" */
  PROP_COOKIES,                 /* "cookies" */
  PROP_USERAGENT,               /* "user-agent" */
  PROP_HEADERS,                 /* "extra-headers" */
  PROP_COMPRESS,                /* "compress" */
  PROP_REDIRECT,                /* "automatic-redirect" */
  PROP_MAXREDIRECT,             /* "max-redirect" */
  PROP_KEEPALIVE,               /* "keep-alive" */
  PROP_TIMEOUT,                 /* "timeout" */
  PROP_STRICT_SSL,              /* "ssl-strict" */
  PROP_SSL_CA_FILE,             /* "ssl-ca-file" */
  PROP_RETRIES,                 /* "retries" */
  PROP_CONNECTIONMAXTIME,       /* "max-connection-time" */
  PROP_MAXCONCURRENT_SERVER,    /* "max-connections-per-server" */
  PROP_MAXCONCURRENT_PROXY,     /* "max-connections-per-proxy" */
  PROP_MAXCONCURRENT_GLOBAL,    /* "max-connections" */
  PROP_HTTPVERSION,             /* "http-version" */
  PROP_IRADIO_MODE,             /* no property is installed for this id in class_init */
#ifdef OHOS_EXT_FUNC
  // ohos.ext.func.0033
  PROP_RECONNECTION_TIMEOUT,    /* "reconnection-timeout" */
  PROP_STATE_CHANGE,            /* "state-change" (installed write-only) */
#endif
  PROP_MAX
};
172
173 /*
174 * Make a source pad template to be able to kick out recv'd data
175 */
/* Always-present "src" pad with ANY caps; added to the element class in
 * gst_curl_http_src_class_init(). */
static GstStaticPadTemplate srcpadtemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
180
181 /*
182 * Function Definitions
183 */
184 /* Gstreamer generic element functions */
185 static void gst_curl_http_src_set_property (GObject * object, guint prop_id,
186 const GValue * value, GParamSpec * pspec);
187 static void gst_curl_http_src_get_property (GObject * object, guint prop_id,
188 GValue * value, GParamSpec * pspec);
189 static void gst_curl_http_src_ref_multi (GstCurlHttpSrc * src);
190 static void gst_curl_http_src_unref_multi (GstCurlHttpSrc * src);
191 static void gst_curl_http_src_finalize (GObject * obj);
192 static GstFlowReturn gst_curl_http_src_create (GstPushSrc * psrc,
193 GstBuffer ** outbuf);
194 static GstFlowReturn gst_curl_http_src_handle_response (GstCurlHttpSrc * src);
195 static gboolean gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src);
196 static GstStateChangeReturn gst_curl_http_src_change_state (GstElement *
197 element, GstStateChange transition);
198 static void gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src);
199 static gboolean gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query);
200 static gboolean gst_curl_http_src_get_content_length (GstBaseSrc * bsrc,
201 guint64 * size);
202 static gboolean gst_curl_http_src_is_seekable (GstBaseSrc * bsrc);
203 static gboolean gst_curl_http_src_do_seek (GstBaseSrc * bsrc,
204 GstSegment * segment);
205 static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc);
206 static gboolean gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc);
207
208 /* URI Handler functions */
209 static void gst_curl_http_src_uri_handler_init (gpointer g_iface,
210 gpointer iface_data);
211 static guint gst_curl_http_src_urihandler_get_type (GType type);
212 static const gchar *const *gst_curl_http_src_urihandler_get_protocols (GType
213 type);
214 static gchar *gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler);
215 static gboolean gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
216 const gchar * uri, GError ** error);
217
218 /* GstTask functions */
219 static void gst_curl_http_src_curl_multi_loop (gpointer thread_data);
220 static CURL *gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s);
221 static inline void gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src);
222 static size_t gst_curl_http_src_get_header (void *header, size_t size,
223 size_t nmemb, void *src);
224 static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
225 size_t nmemb, void *src);
226 static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
227 static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src);
228 static char *gst_curl_http_src_strcasestr (const char *haystack,
229 const char *needle);
230 #ifndef GST_DISABLE_GST_DEBUG
231 static int gst_curl_http_src_get_debug (CURL * handle, curl_infotype type,
232 char *data, size_t size, void *clientp);
233 #endif
234 #ifdef OHOS_EXT_FUNC
235 // ohos.ext.func.0033
236 static void gst_curl_http_src_deal_sockets_timeout (GstCurlHttpSrcMultiTaskContext *context);
237 static gboolean gst_curl_http_src_reconnect_is_timeout (GstCurlHttpSrc *src);
238 #endif
239
/* libcurl version/feature information, filled in once by class_init via
 * curl_version_info (CURLVERSION_NOW). */
static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL;
/* Preferred HTTP version chosen in class_init (optionally from the
 * GST_CURL_HTTP_VER environment variable); used as the per-instance default. */
static GstCurlHttpVersion pref_http_ver;

/* GType of the enum backing the "http-version" property. */
#define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
244
245 static GType
gst_curl_http_version_get_type(void)246 gst_curl_http_version_get_type (void)
247 {
248 static GType gtype = 0;
249
250 if (!gtype) {
251 static const GEnumValue http_versions[] = {
252 {GSTCURL_HTTP_VERSION_1_0, "HTTP Version 1.0", "1.0"},
253 {GSTCURL_HTTP_VERSION_1_1, "HTTP Version 1.1", "1.1"},
254 #ifdef CURL_VERSION_HTTP2
255 {GSTCURL_HTTP_VERSION_2_0, "HTTP Version 2.0", "2.0"},
256 #endif
257 {0, NULL, NULL}
258 };
259 gtype = g_enum_register_static ("GstCurlHttpVersionType", http_versions);
260 }
261 return gtype;
262 }
263
/* Alias so the rest of the file can refer to the parent class simply as
 * parent_class. */
#define gst_curl_http_src_parent_class parent_class
/* Define the GstCurlHttpSrc type, derived from GST_TYPE_PUSH_SRC and
 * implementing the GST_TYPE_URI_HANDLER interface through
 * gst_curl_http_src_uri_handler_init(). */
G_DEFINE_TYPE_WITH_CODE (GstCurlHttpSrc, gst_curl_http_src, GST_TYPE_PUSH_SRC,
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
        gst_curl_http_src_uri_handler_init));
/* Register the element as "curlhttpsrc" with secondary rank; the extra code
 * argument calls curl_element_init (plugin). */
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (curlhttpsrc, "curlhttpsrc",
    GST_RANK_SECONDARY, GST_TYPE_CURLHTTPSRC, curl_element_init (plugin));
270
271 static void
gst_curl_http_src_class_init(GstCurlHttpSrcClass * klass)272 gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
273 {
274 GObjectClass *gobject_class;
275 GstElementClass *gstelement_class;
276 GstBaseSrcClass *gstbasesrc_class;
277 GstPushSrcClass *gstpushsrc_class;
278 const gchar *http_env;
279 GstCurlHttpVersion default_http_version;
280
281 gobject_class = (GObjectClass *) klass;
282 gstelement_class = (GstElementClass *) klass;
283 gstbasesrc_class = (GstBaseSrcClass *) klass;
284 gstpushsrc_class = (GstPushSrcClass *) klass;
285
286 GST_DEBUG_CATEGORY_INIT (gst_curl_http_src_debug, "curlhttpsrc",
287 0, "UriHandler for libcURL");
288
289 gstelement_class->change_state =
290 GST_DEBUG_FUNCPTR (gst_curl_http_src_change_state);
291 gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_curl_http_src_create);
292 gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_curl_http_src_query);
293 gstbasesrc_class->get_size =
294 GST_DEBUG_FUNCPTR (gst_curl_http_src_get_content_length);
295 gstbasesrc_class->is_seekable =
296 GST_DEBUG_FUNCPTR (gst_curl_http_src_is_seekable);
297 gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_curl_http_src_do_seek);
298 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock);
299 gstbasesrc_class->unlock_stop =
300 GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock_stop);
301
302 gst_element_class_add_pad_template (gstelement_class,
303 gst_static_pad_template_get (&srcpadtemplate));
304
305 gst_curl_http_src_curl_capabilities = curl_version_info (CURLVERSION_NOW);
306 #ifdef CURL_VERSION_HTTP2
307 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
308 default_http_version = GSTCURL_HTTP_VERSION_2_0;
309 } else
310 #endif
311 default_http_version = GSTCURL_HTTP_VERSION_1_1;
312
313 http_env = g_getenv ("GST_CURL_HTTP_VER");
314 if (http_env != NULL) {
315 GST_INFO_OBJECT (klass, "Seen env var GST_CURL_HTTP_VER with value %s",
316 http_env);
317 if (!strcmp (http_env, "1.0")) {
318 pref_http_ver = GSTCURL_HTTP_VERSION_1_0;
319 } else if (!strcmp (http_env, "1.1")) {
320 pref_http_ver = GSTCURL_HTTP_VERSION_1_1;
321 } else if (!strcmp (http_env, "2.0")) {
322 #ifdef CURL_VERSION_HTTP2
323 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
324 pref_http_ver = GSTCURL_HTTP_VERSION_2_0;
325 } else {
326 goto unsupported_http_version;
327 }
328 #endif
329 } else {
330 unsupported_http_version:
331 GST_WARNING_OBJECT (klass,
332 "Unsupported HTTP version: %s. Fallback to default", http_env);
333 pref_http_ver = default_http_version;
334 }
335 } else {
336 pref_http_ver = default_http_version;
337 }
338
339 gobject_class->set_property = gst_curl_http_src_set_property;
340 gobject_class->get_property = gst_curl_http_src_get_property;
341 gobject_class->finalize = gst_curl_http_src_finalize;
342
343 g_object_class_install_property (gobject_class, PROP_URI,
344 g_param_spec_string ("location", "Location", "URI of resource to read",
345 GSTCURL_HANDLE_DEFAULT_CURLOPT_URL,
346 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347
348 g_object_class_install_property (gobject_class, PROP_USERNAME,
349 g_param_spec_string ("user-id", "user-id",
350 "HTTP location URI user id for authentication",
351 GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME,
352 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353
354 g_object_class_install_property (gobject_class, PROP_PASSWORD,
355 g_param_spec_string ("user-pw", "user-pw",
356 "HTTP location URI password for authentication",
357 GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD,
358 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
359
360 g_object_class_install_property (gobject_class, PROP_PROXYURI,
361 g_param_spec_string ("proxy", "Proxy", "URI of HTTP proxy server",
362 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY,
363 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364
365 g_object_class_install_property (gobject_class, PROP_PROXYUSERNAME,
366 g_param_spec_string ("proxy-id", "proxy-id",
367 "HTTP proxy URI user id for authentication",
368 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME,
369 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370
371 g_object_class_install_property (gobject_class, PROP_PROXYPASSWORD,
372 g_param_spec_string ("proxy-pw", "proxy-pw",
373 "HTTP proxy URI password for authentication",
374 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD,
375 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
376
377 g_object_class_install_property (gobject_class, PROP_COOKIES,
378 g_param_spec_boxed ("cookies", "Cookies", "List of HTTP Cookies",
379 G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381 g_object_class_install_property (gobject_class, PROP_USERAGENT,
382 g_param_spec_string ("user-agent", "User-Agent",
383 "URI of resource requested",
384 GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>",
385 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
386 GST_PARAM_DOC_SHOW_DEFAULT));
387
388 g_object_class_install_property (gobject_class, PROP_COMPRESS,
389 g_param_spec_boolean ("compress", "Compress",
390 "Allow compressed content encodings",
391 GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING,
392 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393
394 g_object_class_install_property (gobject_class, PROP_REDIRECT,
395 g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
396 "Allow HTTP Redirections (HTTP Status Code 300 series)",
397 GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION,
398 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399
400 g_object_class_install_property (gobject_class, PROP_MAXREDIRECT,
401 g_param_spec_int ("max-redirect", "Max-Redirect",
402 "Maximum number of permitted redirections. -1 is unlimited.",
403 GSTCURL_HANDLE_MIN_CURLOPT_MAXREDIRS,
404 GSTCURL_HANDLE_MAX_CURLOPT_MAXREDIRS,
405 GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS,
406 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
407
408 g_object_class_install_property (gobject_class, PROP_KEEPALIVE,
409 g_param_spec_boolean ("keep-alive", "Keep-Alive",
410 "Toggle keep-alive for connection reuse.",
411 GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE,
412 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413
414 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
415 g_param_spec_int ("timeout", "Timeout",
416 "Value in seconds before timeout a blocking request (0 = no timeout)",
417 GSTCURL_HANDLE_MIN_CURLOPT_TIMEOUT,
418 GSTCURL_HANDLE_MAX_CURLOPT_TIMEOUT,
419 GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT,
420 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
421
422 g_object_class_install_property (gobject_class, PROP_HEADERS,
423 g_param_spec_boxed ("extra-headers", "Extra Headers",
424 "Extra headers to append to the HTTP request",
425 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426
427 g_object_class_install_property (gobject_class, PROP_STRICT_SSL,
428 g_param_spec_boolean ("ssl-strict", "SSL Strict",
429 "Strict SSL certificate checking",
430 GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER,
431 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432
433 g_object_class_install_property (gobject_class, PROP_SSL_CA_FILE,
434 g_param_spec_string ("ssl-ca-file", "SSL CA File",
435 "Location of an SSL CA file to use for checking SSL certificates",
436 GSTCURL_HANDLE_DEFAULT_CURLOPT_CAINFO,
437 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438
439 g_object_class_install_property (gobject_class, PROP_RETRIES,
440 g_param_spec_int ("retries", "Retries",
441 "Maximum number of retries until giving up (-1=infinite)",
442 GSTCURL_HANDLE_MIN_RETRIES, GSTCURL_HANDLE_MAX_RETRIES,
443 GSTCURL_HANDLE_DEFAULT_RETRIES,
444 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
445
446 g_object_class_install_property (gobject_class, PROP_CONNECTIONMAXTIME,
447 g_param_spec_uint ("max-connection-time", "Max-Connection-Time",
448 "Maximum amount of time to keep-alive HTTP connections",
449 GSTCURL_MIN_CONNECTION_TIME, GSTCURL_MAX_CONNECTION_TIME,
450 GSTCURL_DEFAULT_CONNECTION_TIME,
451 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452
453 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_SERVER,
454 g_param_spec_uint ("max-connections-per-server",
455 "Max-Connections-Per-Server",
456 "Maximum number of connections allowed per server for HTTP/1.x",
457 GSTCURL_MIN_CONNECTIONS_SERVER, GSTCURL_MAX_CONNECTIONS_SERVER,
458 GSTCURL_DEFAULT_CONNECTIONS_SERVER,
459 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
460
461 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_PROXY,
462 g_param_spec_uint ("max-connections-per-proxy",
463 "Max-Connections-Per-Proxy",
464 "Maximum number of concurrent connections allowed per proxy for HTTP/1.x",
465 GSTCURL_MIN_CONNECTIONS_PROXY, GSTCURL_MAX_CONNECTIONS_PROXY,
466 GSTCURL_DEFAULT_CONNECTIONS_PROXY,
467 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
468
469 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_GLOBAL,
470 g_param_spec_uint ("max-connections", "Max-Connections",
471 "Maximum number of concurrent connections allowed for HTTP/1.x",
472 GSTCURL_MIN_CONNECTIONS_GLOBAL, GSTCURL_MAX_CONNECTIONS_GLOBAL,
473 GSTCURL_DEFAULT_CONNECTIONS_GLOBAL,
474 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
475
476 g_object_class_install_property (gobject_class, PROP_HTTPVERSION,
477 g_param_spec_enum ("http-version", "HTTP-Version",
478 "The preferred HTTP protocol version",
479 GST_TYPE_CURL_HTTP_VERSION, pref_http_ver,
480 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
481
482 #ifdef OHOS_EXT_FUNC
483 // ohos.ext.func.0033
484 g_object_class_install_property (gobject_class, PROP_RECONNECTION_TIMEOUT,
485 g_param_spec_uint ("reconnection-timeout", "Reconnection-timeout",
486 "Value in seconds to timeout reconnection",
487 GSTCURL_HANDLE_MIN_CURLOPT_RECONNECTION_TIMEOUT,
488 GSTCURL_HANDLE_MAX_CURLOPT_RECONNECTION_TIMEOUT,
489 GSTCURL_HANDLE_DEFAULT_CURLOPT_RECONNECTION_TIMEOUT,
490 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
491
492 g_object_class_install_property (gobject_class, PROP_STATE_CHANGE,
493 g_param_spec_int ("state-change", "State-change from adaptive-demux",
494 "State-change from adaptive-demux", 0, (gint) (G_MAXINT32), 0,
495 G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
496 #endif
497
498 /* Add a debugging task so it's easier to debug in the Multi worker thread */
499 GST_DEBUG_CATEGORY_INIT (gst_curl_loop_debug, "curl_multi_loop", 0,
500 "libcURL loop thread debugging");
501 #ifndef GST_DISABLE_GST_DEBUG
502 gst_debug_log (gst_curl_loop_debug, GST_LEVEL_INFO, __FILE__, __func__,
503 __LINE__, NULL, "Testing the curl_multi_loop debugging prints");
504 #endif
505
506 klass->multi_task_context.task = NULL;
507 klass->multi_task_context.refcount = 0;
508 klass->multi_task_context.queue = NULL;
509 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
510 klass->multi_task_context.multi_handle = NULL;
511 g_mutex_init (&klass->multi_task_context.mutex);
512 #ifdef OHOS_OPT_STABLE
513 /* ohos.opt.stable.0003 for multiple instances ref/unref mutex */
514 g_mutex_init (&klass->multi_task_context.multiple_mutex);
515 #endif
516 g_cond_init (&klass->multi_task_context.signal);
517
518 gst_element_class_set_static_metadata (gstelement_class,
519 "HTTP Client Source using libcURL",
520 "Source/Network",
521 "Receiver data as a client over a network via HTTP using cURL",
522 "Sam Hurst <samuelh@rd.bbc.co.uk>");
523
524 gst_type_mark_as_plugin_api (GST_TYPE_CURL_HTTP_VERSION, 0);
525 }
526
527 static void
gst_curl_http_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)528 gst_curl_http_src_set_property (GObject * object, guint prop_id,
529 const GValue * value, GParamSpec * pspec)
530 {
531 GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
532 GSTCURL_FUNCTION_ENTRY (source);
533
534 switch (prop_id) {
535 case PROP_URI:
536 g_mutex_lock (&source->uri_mutex);
537 g_free (source->uri);
538 source->uri = g_value_dup_string (value);
539 g_mutex_unlock (&source->uri_mutex);
540 break;
541 case PROP_USERNAME:
542 g_free (source->username);
543 source->username = g_value_dup_string (value);
544 break;
545 case PROP_PASSWORD:
546 g_free (source->password);
547 source->password = g_value_dup_string (value);
548 break;
549 case PROP_PROXYURI:
550 g_free (source->proxy_uri);
551 source->proxy_uri = g_value_dup_string (value);
552 break;
553 case PROP_PROXYUSERNAME:
554 g_free (source->proxy_user);
555 source->proxy_user = g_value_dup_string (value);
556 break;
557 case PROP_PROXYPASSWORD:
558 g_free (source->proxy_pass);
559 source->proxy_pass = g_value_dup_string (value);
560 break;
561 case PROP_COOKIES:
562 g_strfreev (source->cookies);
563 source->cookies = g_strdupv (g_value_get_boxed (value));
564 source->number_cookies = g_strv_length (source->cookies);
565 break;
566 case PROP_USERAGENT:
567 g_free (source->user_agent);
568 source->user_agent = g_value_dup_string (value);
569 break;
570 case PROP_HEADERS:
571 {
572 const GstStructure *s = gst_value_get_structure (value);
573 if (source->request_headers)
574 gst_structure_free (source->request_headers);
575 source->request_headers =
576 s ? gst_structure_copy (s) :
577 gst_structure_new_empty (REQUEST_HEADERS_NAME);
578 }
579 break;
580 case PROP_COMPRESS:
581 source->accept_compressed_encodings = g_value_get_boolean (value);
582 break;
583 case PROP_REDIRECT:
584 source->allow_3xx_redirect = g_value_get_boolean (value);
585 break;
586 case PROP_MAXREDIRECT:
587 source->max_3xx_redirects = g_value_get_int (value);
588 break;
589 case PROP_KEEPALIVE:
590 source->keep_alive = g_value_get_boolean (value);
591 break;
592 case PROP_TIMEOUT:
593 source->timeout_secs = g_value_get_int (value);
594 break;
595 case PROP_STRICT_SSL:
596 source->strict_ssl = g_value_get_boolean (value);
597 break;
598 case PROP_SSL_CA_FILE:
599 #ifdef OHOS_OPT_MEMLEAK
600 /* ohos.opt.memleak.0003 fix memory leak. */
601 g_free (source->custom_ca_file);
602 #endif
603 source->custom_ca_file = g_value_dup_string (value);
604 break;
605 case PROP_RETRIES:
606 source->total_retries = g_value_get_int (value);
607 break;
608 case PROP_CONNECTIONMAXTIME:
609 source->max_connection_time = g_value_get_uint (value);
610 break;
611 case PROP_MAXCONCURRENT_SERVER:
612 source->max_conns_per_server = g_value_get_uint (value);
613 break;
614 case PROP_MAXCONCURRENT_PROXY:
615 source->max_conns_per_proxy = g_value_get_uint (value);
616 break;
617 case PROP_MAXCONCURRENT_GLOBAL:
618 source->max_conns_global = g_value_get_uint (value);
619 break;
620 case PROP_HTTPVERSION:
621 source->preferred_http_version = g_value_get_enum (value);
622 break;
623 #ifdef OHOS_EXT_FUNC
624 // ohos.ext.func.0033
625 case PROP_RECONNECTION_TIMEOUT: {
626 source->reconnection_timeout = g_value_get_uint (value);
627 GST_DEBUG_OBJECT (source, "set reconnection_timeout to %u us", source->reconnection_timeout);
628 break;
629 }
630 case PROP_STATE_CHANGE: {
631 source->player_state = g_value_get_int (value);
632 GST_DEBUG_OBJECT (source, "set player_state to %d", source->player_state);
633 break;
634 }
635 #endif
636 default:
637 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
638 break;
639 }
640 GSTCURL_FUNCTION_EXIT (source);
641 }
642
643 static void
gst_curl_http_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)644 gst_curl_http_src_get_property (GObject * object, guint prop_id,
645 GValue * value, GParamSpec * pspec)
646 {
647 GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
648 GSTCURL_FUNCTION_ENTRY (source);
649
650 switch (prop_id) {
651 case PROP_URI:
652 g_mutex_lock (&source->uri_mutex);
653 g_value_set_string (value, source->uri);
654 g_mutex_unlock (&source->uri_mutex);
655 break;
656 case PROP_USERNAME:
657 g_value_set_string (value, source->username);
658 break;
659 case PROP_PASSWORD:
660 g_value_set_string (value, source->password);
661 break;
662 case PROP_PROXYURI:
663 g_value_set_string (value, source->proxy_uri);
664 break;
665 case PROP_PROXYUSERNAME:
666 g_value_set_string (value, source->proxy_user);
667 break;
668 case PROP_PROXYPASSWORD:
669 g_value_set_string (value, source->proxy_pass);
670 break;
671 case PROP_COOKIES:
672 g_value_set_boxed (value, source->cookies);
673 break;
674 case PROP_USERAGENT:
675 g_value_set_string (value, source->user_agent);
676 break;
677 case PROP_HEADERS:
678 gst_value_set_structure (value, source->request_headers);
679 break;
680 case PROP_COMPRESS:
681 g_value_set_boolean (value, source->accept_compressed_encodings);
682 break;
683 case PROP_REDIRECT:
684 g_value_set_boolean (value, source->allow_3xx_redirect);
685 break;
686 case PROP_MAXREDIRECT:
687 g_value_set_int (value, source->max_3xx_redirects);
688 break;
689 case PROP_KEEPALIVE:
690 g_value_set_boolean (value, source->keep_alive);
691 break;
692 case PROP_TIMEOUT:
693 g_value_set_int (value, source->timeout_secs);
694 break;
695 case PROP_STRICT_SSL:
696 g_value_set_boolean (value, source->strict_ssl);
697 break;
698 case PROP_SSL_CA_FILE:
699 g_value_set_string (value, source->custom_ca_file);
700 break;
701 case PROP_RETRIES:
702 g_value_set_int (value, source->total_retries);
703 break;
704 case PROP_CONNECTIONMAXTIME:
705 g_value_set_uint (value, source->max_connection_time);
706 break;
707 case PROP_MAXCONCURRENT_SERVER:
708 g_value_set_uint (value, source->max_conns_per_server);
709 break;
710 case PROP_MAXCONCURRENT_PROXY:
711 g_value_set_uint (value, source->max_conns_per_proxy);
712 break;
713 case PROP_MAXCONCURRENT_GLOBAL:
714 g_value_set_uint (value, source->max_conns_global);
715 break;
716 case PROP_HTTPVERSION:
717 g_value_set_enum (value, source->preferred_http_version);
718 break;
719 #ifdef OHOS_EXT_FUNC
720 // ohos.ext.func.0033
721 case PROP_RECONNECTION_TIMEOUT:
722 g_value_set_uint (value, source->reconnection_timeout);
723 break;
724 #endif
725 default:
726 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
727 break;
728 }
729 GSTCURL_FUNCTION_EXIT (source);
730 }
731
732 static void
gst_curl_http_src_init(GstCurlHttpSrc * source)733 gst_curl_http_src_init (GstCurlHttpSrc * source)
734 {
735 GSTCURL_FUNCTION_ENTRY (source);
736
737 /* Assume everything is already free'd */
738 source->uri = NULL;
739 source->redirect_uri = NULL;
740 source->username = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME;
741 source->password = GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD;
742 source->proxy_uri = NULL;
743 source->proxy_user = NULL;
744 source->proxy_pass = NULL;
745 source->cookies = NULL;
746 g_assert (gst_curl_http_src_curl_capabilities != NULL);
747 source->user_agent =
748 g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s",
749 gst_curl_http_src_curl_capabilities->version);
750 source->number_cookies = 0;
751 source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME);
752 source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
753 source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
754 source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
755 source->timeout_secs = GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT;
756 source->max_connection_time = GSTCURL_DEFAULT_CONNECTION_TIME;
757 source->max_conns_per_server = GSTCURL_DEFAULT_CONNECTIONS_SERVER;
758 source->max_conns_per_proxy = GSTCURL_DEFAULT_CONNECTIONS_PROXY;
759 source->max_conns_global = GSTCURL_DEFAULT_CONNECTIONS_GLOBAL;
760 #ifdef OHOS_OPT_COMPAT
761 /* ohos.ext.compat.0025
762 It will be closed temporarily until the certificate verification needs are delivered on September 30 */
763 source->strict_ssl = 0;
764 #else
765 source->strict_ssl = GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER;
766 #endif
767 source->custom_ca_file = NULL;
768 source->preferred_http_version = pref_http_ver;
769 source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
770 source->retries_remaining = source->total_retries;
771 source->slist = NULL;
772 source->accept_compressed_encodings = FALSE;
773 source->seekable = GSTCURL_SEEKABLE_UNKNOWN;
774 source->content_size = 0;
775 source->request_position = 0;
776 #ifdef OHOS_EXT_FUNC
777 /* ohos.ext.func.0025 support https seek: */
778 source->orig_request_pos = 0;
779 source->read_position = 0;
780 #endif
781 source->stop_position = -1;
782
783 #ifdef OHOS_EXT_FUNC
784 // ohos.ext.func.0033
785 source->start_usecs = 0;
786 source->end_usecs = 0;
787 source->reconnection_timeout = GSTCURL_HANDLE_DEFAULT_CURLOPT_RECONNECTION_TIMEOUT;
788 source->player_state = GST_PLAYER_STATUS_IDLE;
789 #endif
790
791 gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
792
793 source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
794 source->no_proxy_list = g_strdup (g_getenv ("no_proxy"));
795
796 g_mutex_init (&source->uri_mutex);
797 g_mutex_init (&source->buffer_mutex);
798 g_cond_init (&source->buffer_cond);
799
800 source->buffer = NULL;
801 source->buffer_len = 0;
802 source->state = GSTCURL_NONE;
803 source->pending_state = GSTCURL_NONE;
804 source->transfer_begun = FALSE;
805 source->data_received = FALSE;
806 source->connection_status = GSTCURL_NOT_CONNECTED;
807
808 source->http_headers = NULL;
809 source->content_type = NULL;
810 source->status_code = 0;
811 source->reason_phrase = NULL;
812 source->hdrs_updated = FALSE;
813 source->curl_result = CURLE_OK;
814 gst_caps_replace (&source->caps, NULL);
815
816 GSTCURL_FUNCTION_EXIT (source);
817 }
818
819 /*
820 * Check if the Curl multi loop has been started. If not, initialise it and
821 * start it running. If it is already running, increment the refcount.
822 */
823 static void
gst_curl_http_src_ref_multi(GstCurlHttpSrc * src)824 gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
825 {
826 GstCurlHttpSrcClass *klass;
827
828 GSTCURL_FUNCTION_ENTRY (src);
829
830 /*klass = (GstCurlHttpSrcClass) g_type_class_peek_parent (src); */
831 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
832 GstCurlHttpSrcClass);
833
834 #ifdef OHOS_OPT_STABLE
835 /* ohos.opt.stable.0003 for multiple instances unref mutex */
836 g_mutex_lock (&klass->multi_task_context.multiple_mutex);
837 #endif
838
839 g_mutex_lock (&klass->multi_task_context.mutex);
840 if (klass->multi_task_context.refcount == 0) {
841 /* Set up various in-task properties */
842
843 /* NULL is treated as the start of the list, no need to allocate. */
844 klass->multi_task_context.queue = NULL;
845
846 /* set up curl */
847 klass->multi_task_context.multi_handle = curl_multi_init ();
848
849 curl_multi_setopt (klass->multi_task_context.multi_handle,
850 CURLMOPT_PIPELINING, 1);
851 #ifdef CURLMOPT_MAX_HOST_CONNECTIONS
852 curl_multi_setopt (klass->multi_task_context.multi_handle,
853 CURLMOPT_MAX_HOST_CONNECTIONS, 1);
854 #endif
855
856 /* Start the thread */
857 g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
858 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
859 klass->multi_task_context.task = gst_task_new (
860 (GstTaskFunction) gst_curl_http_src_curl_multi_loop,
861 (gpointer) & klass->multi_task_context, NULL);
862 gst_task_set_lock (klass->multi_task_context.task,
863 &klass->multi_task_context.task_rec_mutex);
864 if (gst_task_start (klass->multi_task_context.task) == FALSE) {
865 /*
866 * This is a pretty critical failure and is not recoverable, so commit
867 * sudoku and run away.
868 */
869 GSTCURL_ERROR_PRINT ("Couldn't start curl_multi task! Aborting.");
870 abort ();
871 }
872 GSTCURL_INFO_PRINT ("Curl multi loop has been correctly initialised!");
873 }
874 klass->multi_task_context.refcount++;
875 g_mutex_unlock (&klass->multi_task_context.mutex);
876
877 #ifdef OHOS_OPT_STABLE
878 /* ohos.opt.stable.0003 for multiple instances unref mutex */
879 g_mutex_unlock (&klass->multi_task_context.multiple_mutex);
880 #endif
881
882 GSTCURL_FUNCTION_EXIT (src);
883 }
884
885 /*
886 * Decrement the reference count on the curl multi loop. If this is called by
887 * the last instance to hold a reference, shut down the worker. (Otherwise
888 * GStreamer can't close down with a thread still running). Also offers the
889 * "force_all" boolean parameter, which if TRUE removes all references and shuts
890 * down.
891 */
892 static void
gst_curl_http_src_unref_multi(GstCurlHttpSrc * src)893 gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
894 {
895 GstCurlHttpSrcClass *klass;
896
897 GSTCURL_FUNCTION_ENTRY (src);
898
899 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
900 GstCurlHttpSrcClass);
901
902 #ifdef OHOS_OPT_STABLE
903 /* ohos.opt.stable.0003 for multiple instances ref mutex */
904 g_mutex_lock (&klass->multi_task_context.multiple_mutex);
905 #endif
906
907 g_mutex_lock (&klass->multi_task_context.mutex);
908 #ifdef OHOS_EXT_FUNC
909 /* ohos.ext.func.0025 for clean code */
910 if (klass->multi_task_context.refcount == 0) {
911 GST_WARNING_OBJECT (src, "worker thread refcount is 0");
912 g_mutex_unlock (&klass->multi_task_context.mutex);
913 GSTCURL_FUNCTION_EXIT (src);
914 return;
915 }
916 #endif
917 klass->multi_task_context.refcount--;
918 GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
919 klass->multi_task_context.refcount);
920
921 if (klass->multi_task_context.refcount == 0) {
922 /* Everything's done! Clean up. */
923 gst_task_stop (klass->multi_task_context.task);
924 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
925 g_cond_signal (&klass->multi_task_context.signal);
926 g_mutex_unlock (&klass->multi_task_context.mutex);
927 GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task...");
928 gst_task_join (klass->multi_task_context.task);
929 gst_object_unref (klass->multi_task_context.task);
930 klass->multi_task_context.task = NULL;
931 curl_multi_cleanup (klass->multi_task_context.multi_handle);
932 klass->multi_task_context.multi_handle = NULL;
933 g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex);
934 GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete");
935 } else {
936 g_mutex_unlock (&klass->multi_task_context.mutex);
937 }
938
939 #ifdef OHOS_OPT_STABLE
940 /* ohos.opt.stable.0003 for multiple instances ref mutex */
941 g_mutex_unlock (&klass->multi_task_context.multiple_mutex);
942 #endif
943
944 GSTCURL_FUNCTION_EXIT (src);
945 }
946
947 static void
gst_curl_http_src_finalize(GObject * obj)948 gst_curl_http_src_finalize (GObject * obj)
949 {
950 GstCurlHttpSrc *src = GST_CURLHTTPSRC (obj);
951
952 GSTCURL_FUNCTION_ENTRY (src);
953
954 /* Cleanup all memory allocated */
955 gst_curl_http_src_cleanup_instance (src);
956
957 GSTCURL_FUNCTION_EXIT (src);
958
959 /* Chain up to parent class */
960 G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj);
961 }
962
#ifdef OHOS_EXT_FUNC
/*
 * ohos.ext.func.0025 for seek
 *
 * If a transfer exists (src->curl_handle is set) and the requested position
 * differs from the current read position, tear the running transfer down so
 * that a new one can be started: wait until the element has been removed from
 * the multi loop, reset the per-transfer state, destroy the easy handle and
 * drop any buffered data. Otherwise do nothing.
 */
static void
gst_curl_http_src_handle_seek (GstCurlHttpSrc * src)
{
  gboolean position_changed;

  /* No transfer to tear down */
  if (src->curl_handle == NULL) {
    GST_INFO_OBJECT (src, "parameter is invalid");
    return;
  }

  /* Compare the two positions under the buffer lock */
  g_mutex_lock (&src->buffer_mutex);
  position_changed = (src->request_position != src->read_position);
#ifdef OHOS_OPT_COMPAT
  /* ohos.opt.compat.0044 */
  if (!position_changed) {
    GST_INFO_OBJECT (src, "request_position is equal to read_position, req = %"
        G_GUINT64_FORMAT, src->request_position);
  }
#endif
  g_mutex_unlock (&src->buffer_mutex);

  if (!position_changed) {
    /* not a seek, nothing to do */
    return;
  }

  /* Called without the buffer lock held */
  gst_curl_http_src_wait_until_removed (src);

  g_mutex_lock (&src->buffer_mutex);

  /* Back to the "no transfer in progress" state */
  src->state = GSTCURL_NONE;
  src->transfer_begun = FALSE;
  src->status_code = 0;
  g_free (src->reason_phrase);
  src->reason_phrase = NULL;
  src->hdrs_updated = FALSE;
  gst_curl_http_src_destroy_easy_handle (src);

  /* Discard data buffered for the old position */
  if (src->buffer_len > 0) {
    g_free (src->buffer);
    src->buffer = NULL;
    src->buffer_len = 0;
  }

  g_mutex_unlock (&src->buffer_mutex);

  GST_INFO_OBJECT (src, "seek_begin: curl handle removed, req_pos:%" G_GUINT64_FORMAT ", read_pos:%" G_GUINT64_FORMAT,
      src->request_position, src->read_position);
}
#endif
1010
1011 /*
1012 * Do the transfer. If the transfer hasn't begun yet, start a new curl handle
1013 * and pass it to the multi queue to be operated on. Then wait for any blocks
1014 * of data and push them to the source pad.
1015 */
1016 static GstFlowReturn
gst_curl_http_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)1017 gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
1018 {
1019 GstFlowReturn ret;
1020 GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
1021 GstCurlHttpSrcClass *klass;
1022 GstStructure *empty_headers;
1023 GstBaseSrc *basesrc;
1024
1025 GSTCURL_FUNCTION_ENTRY (src);
1026
1027 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
1028 GstCurlHttpSrcClass);
1029 basesrc = GST_BASE_SRC_CAST (src);
1030
1031 retry:
1032 #ifdef OHOS_OPT_COMPAT
1033 /** ohos.opt.compat.0063
1034 * stop interrupt src create
1035 */
1036 if (src->player_state == GST_PLAYER_STATUS_READY) {
1037 GST_INFO_OBJECT(src, "stopping, cancel create");
1038 return GST_FLOW_ERROR;
1039 }
1040 #endif
1041 ret = GST_FLOW_OK;
1042
1043 #ifdef OHOS_EXT_FUNC
1044 /* ohos.ext.func.0025 for seek */
1045 gst_curl_http_src_handle_seek(src);
1046 #endif
1047
1048 /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
1049 needed, multi_task_context.mutex must be acquired first */
1050 g_mutex_lock (&klass->multi_task_context.mutex);
1051 g_mutex_lock (&src->buffer_mutex);
1052 if (src->state == GSTCURL_UNLOCK) {
1053 ret = GST_FLOW_FLUSHING;
1054 goto escape;
1055 }
1056
1057 if (!src->transfer_begun) {
1058 GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
1059 /* Create the Easy Handle and set up the session. */
1060 src->curl_handle = gst_curl_http_src_create_easy_handle (src);
1061 if (src->curl_handle == NULL) {
1062 ret = GST_FLOW_ERROR;
1063 goto escape;
1064 }
1065
1066 if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
1067 == FALSE) {
1068 GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
1069 ret = GST_FLOW_ERROR;
1070 goto escape;
1071 }
1072 /* Signal the worker thread */
1073 g_cond_signal (&klass->multi_task_context.signal);
1074
1075 src->state = GSTCURL_OK;
1076 src->transfer_begun = TRUE;
1077 src->data_received = FALSE;
1078 #ifdef OHOS_EXT_FUNC
1079 // ohos.ext.func.0033
1080 src->curl_result = CURLE_OK; // refresh the response state code
1081 #endif
1082
1083 GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
1084
1085 if (src->http_headers != NULL) {
1086 gst_structure_free (src->http_headers);
1087 }
1088 empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
1089 src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
1090 URI_NAME, G_TYPE_STRING, src->uri,
1091 REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
1092 RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
1093 gst_structure_free (empty_headers);
1094 GST_INFO_OBJECT (src, "Created a new headers object");
1095 }
1096
1097 g_mutex_unlock (&klass->multi_task_context.mutex);
1098
1099 /* Wait for data to become available, then punt it downstream */
1100 while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)
1101 && (src->connection_status == GSTCURL_CONNECTED)) {
1102 g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
1103 }
1104
1105 if (src->state == GSTCURL_UNLOCK) {
1106 if (src->buffer_len > 0) {
1107 g_free (src->buffer);
1108 src->buffer = NULL;
1109 src->buffer_len = 0;
1110 }
1111 g_mutex_unlock (&src->buffer_mutex);
1112 return GST_FLOW_FLUSHING;
1113 }
1114
1115 ret = gst_curl_http_src_handle_response (src);
1116 switch (ret) {
1117 #ifdef OHOS_EXT_FUNC
1118 /**
1119 * ohos.ext.func.0033
1120 * Support reconnection after disconnection in gstcurl.
1121 * When reconnected, reset the timeout clock.
1122 */
1123 case GST_FLOW_OK:
1124 src->start_usecs = 0;
1125 break;
1126 #endif
1127 case GST_FLOW_ERROR:
1128 /* Don't attempt a retry, just bomb out */
1129 g_mutex_unlock (&src->buffer_mutex);
1130 return ret;
1131 case GST_FLOW_CUSTOM_ERROR:
1132 if (src->data_received == TRUE) {
1133 /*
1134 * If data has already been received, we can't recall previously sent
1135 * buffers so don't attempt a retry in this case.
1136 *
1137 * TODO: Remember the position we got to, and make a range request for
1138 * the resource without the bit we've already received?
1139 */
1140 GST_WARNING_OBJECT (src,
1141 "Failed mid-transfer, can't continue for URI %s", src->uri);
1142 g_mutex_unlock (&src->buffer_mutex);
1143 return GST_FLOW_ERROR;
1144 }
1145 src->retries_remaining--;
1146 if (src->retries_remaining == 0) {
1147 GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
1148 g_mutex_unlock (&src->buffer_mutex);
1149 return GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
1150 }
1151 GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
1152 src->state = GSTCURL_NONE;
1153 src->transfer_begun = FALSE;
1154 src->status_code = 0;
1155 g_free (src->reason_phrase);
1156 src->reason_phrase = NULL;
1157 src->hdrs_updated = FALSE;
1158 if (src->http_headers != NULL) {
1159 gst_structure_free (src->http_headers);
1160 src->http_headers = NULL;
1161 GST_INFO_OBJECT (src, "NULL'd the headers");
1162 }
1163 gst_curl_http_src_destroy_easy_handle (src);
1164 #ifdef OHOS_EXT_FUNC
1165 /**
1166 * ohos.ext.func.0033
1167 * Support reconnection after disconnection in gstcurl.
1168 * When network brokes, try reconnecting until timeout.
1169 */
1170 if (gst_curl_http_src_reconnect_is_timeout(src)) {
1171 CURL_HTTP_SRC_ERROR (src, RESOURCE, TIME_OUT, "reconnection timeout");
1172 g_mutex_unlock (&src->buffer_mutex);
1173 return GST_FLOW_RECONNECTION_TIMEOUT;
1174 }
1175 #endif
1176 g_mutex_unlock (&src->buffer_mutex);
1177 goto retry; /* Attempt a retry! */
1178 default:
1179 break;
1180 }
1181
1182 if (((src->state == GSTCURL_OK) || (src->state == GSTCURL_DONE)) &&
1183 (src->buffer_len > 0)) {
1184
1185 GST_DEBUG_OBJECT (src, "Pushing %u bytes of transfer for URI %s to pad",
1186 src->buffer_len, src->uri);
1187 *outbuf = gst_buffer_new_allocate (NULL, src->buffer_len, NULL);
1188 gst_buffer_fill (*outbuf, 0, src->buffer, src->buffer_len);
1189 GST_BUFFER_OFFSET (*outbuf) = basesrc->segment.position;
1190
1191 g_free (src->buffer);
1192 src->buffer = NULL;
1193 src->buffer_len = 0;
1194 src->data_received = TRUE;
1195
1196 /* ret should still be GST_FLOW_OK */
1197 } else if ((src->state == GSTCURL_DONE) && (src->buffer_len == 0)) {
1198 GST_INFO_OBJECT (src, "Full body received, signalling EOS for URI %s.",
1199 src->uri);
1200 src->state = GSTCURL_NONE;
1201 src->transfer_begun = FALSE;
1202 src->status_code = 0;
1203 g_free (src->reason_phrase);
1204 src->reason_phrase = NULL;
1205 src->hdrs_updated = FALSE;
1206 gst_curl_http_src_destroy_easy_handle (src);
1207 ret = GST_FLOW_EOS;
1208 } else {
1209 switch (src->state) {
1210 case GSTCURL_NONE:
1211 GST_WARNING_OBJECT (src, "Got unexpected GSTCURL_NONE state!");
1212 break;
1213 case GSTCURL_REMOVED:
1214 GST_WARNING_OBJECT (src, "Transfer got removed from the curl queue");
1215 ret = GST_FLOW_EOS;
1216 break;
1217 case GSTCURL_BAD_QUEUE_REQUEST:
1218 GST_ERROR_OBJECT (src, "Bad Queue Request!");
1219 ret = GST_FLOW_ERROR;
1220 break;
1221 case GSTCURL_TOTAL_ERROR:
1222 GST_ERROR_OBJECT (src, "Critical, unrecoverable error!");
1223 ret = GST_FLOW_ERROR;
1224 break;
1225 case GSTCURL_PIPELINE_NULL:
1226 GST_ERROR_OBJECT (src, "Pipeline null");
1227 break;
1228 default:
1229 GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
1230 }
1231 }
1232 g_mutex_unlock (&src->buffer_mutex);
1233 GSTCURL_FUNCTION_EXIT (src);
1234 return ret;
1235
1236 escape:
1237 g_mutex_unlock (&src->buffer_mutex);
1238 g_mutex_unlock (&klass->multi_task_context.mutex);
1239
1240 GSTCURL_FUNCTION_EXIT (src);
1241 return ret;
1242 }
1243
1244 /*
1245 * Convert header from a GstStructure type to a curl_slist type that curl will
1246 * understand.
1247 */
1248 static gboolean
_headers_to_curl_slist(GQuark field_id,const GValue * value,gpointer ptr)1249 _headers_to_curl_slist (GQuark field_id, const GValue * value, gpointer ptr)
1250 {
1251 gchar *field;
1252 struct curl_slist **p_slist = ptr;
1253
1254 field = g_strdup_printf ("%s: %s", g_quark_to_string (field_id),
1255 g_value_get_string (value));
1256
1257 *p_slist = curl_slist_append (*p_slist, field);
1258
1259 g_free (field);
1260
1261 return TRUE;
1262 }
1263
1264 /*
1265 * From the data in the queue element s, create a CURL easy handle and populate
1266 * options with the URL, proxy data, login options, cookies,
1267 */
1268 static CURL *
gst_curl_http_src_create_easy_handle(GstCurlHttpSrc * s)1269 gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
1270 {
1271 CURL *handle;
1272 gint i;
1273 GSTCURL_FUNCTION_ENTRY (s);
1274
1275 /* This is mandatory and yet not default option, so if this is NULL
1276 * then something very bad is going on. */
1277 if (s->uri == NULL) {
1278 GST_ERROR_OBJECT (s, "No URI for curl!");
1279 return NULL;
1280 }
1281
1282 handle = curl_easy_init ();
1283 if (handle == NULL) {
1284 GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
1285 return NULL;
1286 }
1287 GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
1288
1289 #ifndef GST_DISABLE_GST_DEBUG
1290 if (curl_easy_setopt (handle, CURLOPT_VERBOSE, 1) != CURLE_OK) {
1291 GST_WARNING_OBJECT (s, "Failed to set verbose!");
1292 }
1293 if (curl_easy_setopt (handle, CURLOPT_DEBUGDATA, s) != CURLE_OK) {
1294 GST_WARNING_OBJECT (s, "Failed to set debug user_data!");
1295 }
1296 if (curl_easy_setopt (handle, CURLOPT_DEBUGFUNCTION,
1297 gst_curl_http_src_get_debug) != CURLE_OK) {
1298 GST_WARNING_OBJECT (s, "Failed to set debug function!");
1299 }
1300 #endif
1301
1302 gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
1303 gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
1304 gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
1305 gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
1306 gst_curl_setopt_str (s, handle, CURLOPT_NOPROXY, s->no_proxy_list);
1307 gst_curl_setopt_str (s, handle, CURLOPT_PROXYUSERNAME, s->proxy_user);
1308 gst_curl_setopt_str (s, handle, CURLOPT_PROXYPASSWORD, s->proxy_pass);
1309
1310 for (i = 0; i < s->number_cookies; i++) {
1311 gst_curl_setopt_str (s, handle, CURLOPT_COOKIELIST, s->cookies[i]);
1312 }
1313
1314 /* curl_slist_append dynamically allocates memory, but I need to free it */
1315 if (s->request_headers != NULL) {
1316 gst_structure_foreach (s->request_headers, _headers_to_curl_slist,
1317 &s->slist);
1318 if (curl_easy_setopt (handle, CURLOPT_HTTPHEADER, s->slist) != CURLE_OK) {
1319 GST_WARNING_OBJECT (s, "Failed to set HTTP headers!");
1320 }
1321 }
1322
1323 gst_curl_setopt_str_default (s, handle, CURLOPT_USERAGENT, s->user_agent);
1324
1325 /*
1326 * Unlike soup, this isn't a binary op, curl wants a string here. So if it's
1327 * TRUE, simply set the value as an empty string as this allows both gzip and
1328 * zlib compression methods.
1329 */
1330 if (s->accept_compressed_encodings == TRUE) {
1331 gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "");
1332 } else {
1333 gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "identity");
1334 }
1335
1336 gst_curl_setopt_int (s, handle, CURLOPT_FOLLOWLOCATION,
1337 s->allow_3xx_redirect);
1338 gst_curl_setopt_int_default (s, handle, CURLOPT_MAXREDIRS,
1339 s->max_3xx_redirects);
1340 gst_curl_setopt_bool (s, handle, CURLOPT_TCP_KEEPALIVE, s->keep_alive);
1341 gst_curl_setopt_int (s, handle, CURLOPT_TIMEOUT, s->timeout_secs);
1342 gst_curl_setopt_bool (s, handle, CURLOPT_SSL_VERIFYPEER, s->strict_ssl);
1343 gst_curl_setopt_str (s, handle, CURLOPT_CAINFO, s->custom_ca_file);
1344
1345 #ifdef OHOS_EXT_FUNC
1346 /* ohos.ext.func.0025 for seek */
1347 if (s->request_position > 0 || s->stop_position > 0) {
1348 gchar *range;
1349 if (s->stop_position < 1) {
1350 /* start specified, no end specified */
1351 range = g_strdup_printf ("%" G_GUINT64_FORMAT "-", s->request_position);
1352 } else {
1353 /* in GStreamer the end position indicates the first byte that is not
1354 in the range, whereas in HTTP the Content-Range header includes the
1355 byte listed in the end value */
1356 range = g_strdup_printf ("%" G_GUINT64_FORMAT "-%" G_GINT64_FORMAT,
1357 s->request_position, s->stop_position - 1);
1358 }
1359 s->orig_request_pos = s->request_position;
1360 GST_TRACE_OBJECT (s, "Requesting range: %s", range);
1361 curl_easy_setopt (handle, CURLOPT_RANGE, range);
1362 g_free (range);
1363 }
1364 s->read_position = s->request_position;
1365 #else
1366 if (s->request_position || s->stop_position > 0) {
1367 gchar *range;
1368 if (s->stop_position < 1) {
1369 /* start specified, no end specified */
1370 range = g_strdup_printf ("%" G_GINT64_FORMAT "-", s->request_position);
1371 } else {
1372 /* in GStreamer the end position indicates the first byte that is not
1373 in the range, whereas in HTTP the Content-Range header includes the
1374 byte listed in the end value */
1375 range = g_strdup_printf ("%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
1376 s->request_position, s->stop_position - 1);
1377 }
1378 GST_TRACE_OBJECT (s, "Requesting range: %s", range);
1379 curl_easy_setopt (handle, CURLOPT_RANGE, range);
1380 g_free (range);
1381 }
1382 #endif
1383
1384 switch (s->preferred_http_version) {
1385 case GSTCURL_HTTP_VERSION_1_0:
1386 GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.0");
1387 gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1388 CURL_HTTP_VERSION_1_0);
1389 break;
1390 case GSTCURL_HTTP_VERSION_1_1:
1391 GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.1");
1392 gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1393 CURL_HTTP_VERSION_1_1);
1394 break;
1395 #ifdef CURL_VERSION_HTTP2
1396 case GSTCURL_HTTP_VERSION_2_0:
1397 GST_DEBUG_OBJECT (s, "Setting version as HTTP/2.0");
1398 if (curl_easy_setopt (handle, CURLOPT_HTTP_VERSION,
1399 CURL_HTTP_VERSION_2_0) != CURLE_OK) {
1400 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
1401 GST_WARNING_OBJECT (s,
1402 "Cannot set unsupported option CURLOPT_HTTP_VERSION");
1403 } else {
1404 GST_INFO_OBJECT (s, "HTTP/2 unsupported by libcurl at this time");
1405 }
1406 }
1407 break;
1408 #endif
1409 default:
1410 GST_WARNING_OBJECT (s,
1411 "Supplied a bogus HTTP version, using curl default!");
1412 }
1413
1414 gst_curl_setopt_generic (s, handle, CURLOPT_HEADERFUNCTION,
1415 gst_curl_http_src_get_header);
1416 gst_curl_setopt_str (s, handle, CURLOPT_HEADERDATA, s);
1417 gst_curl_setopt_generic (s, handle, CURLOPT_WRITEFUNCTION,
1418 gst_curl_http_src_get_chunks);
1419 gst_curl_setopt_str (s, handle, CURLOPT_WRITEDATA, s);
1420
1421 gst_curl_setopt_str (s, handle, CURLOPT_ERRORBUFFER, s->curl_errbuf);
1422
1423 GSTCURL_FUNCTION_EXIT (s);
1424 return handle;
1425 }
1426
1427 /*
1428 * Check the return type from the curl transfer. If it was okay, then deal with
1429 * any headers that were received. Headers should only be dealt with once - but
1430 * we might get a second set if there are trailing headers (RFC7230 Section 4.4)
1431 */
1432 static GstFlowReturn
gst_curl_http_src_handle_response(GstCurlHttpSrc * src)1433 gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
1434 {
1435 glong curl_info_long;
1436 gdouble curl_info_dbl;
1437 curl_off_t curl_info_offt;
1438 gchar *redirect_url;
1439 GstBaseSrc *basesrc;
1440 const GValue *response_headers;
1441 GstFlowReturn ret = GST_FLOW_OK;
1442
1443 GSTCURL_FUNCTION_ENTRY (src);
1444
1445 GST_TRACE_OBJECT (src, "status code: %d (%s), curl return code %d",
1446 src->status_code, src->reason_phrase, src->curl_result);
1447
1448 /* Check the curl result code first - anything not 0 is probably a failure */
1449 if (src->curl_result != 0) {
1450 GST_WARNING_OBJECT (src, "Curl failed the transfer (%d): %s",
1451 src->curl_result, curl_easy_strerror (src->curl_result));
1452 GST_DEBUG_OBJECT (src, "Reason for curl failure: %s", src->curl_errbuf);
1453 #ifdef OHOS_EXT_FUNC
1454 /**
1455 * ohos.ext.func.0033
1456 * Support reconnection after disconnection in gstcurl.
1457 * When network brokes, try reconnecting until timeout.
1458 * Reconnection time counting will start after player state changing to buffering
1459 * and receiving curl error message of "could not connect" or
1460 * "operation timeout"(may cost some time after performing operation)
1461 */
1462 if (src->curl_result == CURLE_COULDNT_CONNECT || src->curl_result == CURLE_OPERATION_TIMEDOUT) {
1463 src->data_received = FALSE;
1464 if (src->buffer_len > 0) {
1465 return GST_FLOW_OK;
1466 }
1467 return GST_FLOW_CUSTOM_ERROR;
1468 }
1469 #endif
1470 return GST_FLOW_ERROR;
1471 }
1472
1473 /*
1474 * What response code do we have?
1475 */
1476 if (src->status_code >= 400) {
1477 GST_WARNING_OBJECT (src, "Transfer for URI %s returned error status %u",
1478 src->uri, src->status_code);
1479 src->retries_remaining = 0;
1480 #ifdef OHOS_OPT_COMPAT
1481 /**
1482 * ohos.opt.compat.0032
1483 * Fix seek failed when request position from gstqueue2 is the end of the file.
1484 * A 416 response is not actually an error when the file is already completely downloaded
1485 * and the request position is the end of the file.
1486 */
1487 if (src->status_code == 416 && src->content_size > 0) {
1488 return GST_FLOW_OK;
1489 }
1490 #endif
1491 CURL_HTTP_SRC_ERROR (src, RESOURCE, NOT_FOUND, (src->reason_phrase));
1492 return GST_FLOW_ERROR;
1493 } else if (src->status_code == 0) {
1494 if (curl_easy_getinfo (src->curl_handle, CURLINFO_TOTAL_TIME,
1495 &curl_info_dbl) != CURLE_OK) {
1496 /* Curl cannot be relied on in this state, so return an error. */
1497 return GST_FLOW_ERROR;
1498 }
1499 if (curl_info_dbl > src->timeout_secs) {
1500 return GST_FLOW_CUSTOM_ERROR;
1501 }
1502
1503 if (curl_easy_getinfo (src->curl_handle, CURLINFO_OS_ERRNO,
1504 &curl_info_long) != CURLE_OK) {
1505 /* Curl cannot be relied on in this state, so return an error. */
1506 return GST_FLOW_ERROR;
1507
1508 }
1509
1510 GST_WARNING_OBJECT (src, "Errno for CONNECT call was %ld (%s)",
1511 curl_info_long, g_strerror ((gint) curl_info_long));
1512
1513 /* Some of these responses are retry-able, others not. Set the returned
1514 * state to ERROR so we crash out instead of fruitlessly retrying.
1515 */
1516 if (curl_info_long == ECONNREFUSED) {
1517 return GST_FLOW_ERROR;
1518 }
1519 ret = GST_FLOW_CUSTOM_ERROR;
1520 }
1521
1522
1523 if (ret == GST_FLOW_CUSTOM_ERROR) {
1524 src->hdrs_updated = FALSE;
1525 GSTCURL_FUNCTION_EXIT (src);
1526 return ret;
1527 }
1528
1529 /* Only do this once */
1530 if (src->hdrs_updated == FALSE) {
1531 GSTCURL_FUNCTION_EXIT (src);
1532 return GST_FLOW_OK;
1533 }
1534
1535 /*
1536 * Deal with redirections...
1537 */
1538 if (curl_easy_getinfo (src->curl_handle, CURLINFO_EFFECTIVE_URL,
1539 &redirect_url)
1540 == CURLE_OK) {
1541 size_t lena, lenb;
1542 lena = strlen (src->uri);
1543 lenb = strlen (redirect_url);
1544 if (g_ascii_strncasecmp (src->uri, redirect_url,
1545 (lena > lenb) ? lenb : lena) != 0) {
1546 GST_INFO_OBJECT (src, "Got a redirect to %s, setting as redirect URI",
1547 redirect_url);
1548 src->redirect_uri = g_strdup (redirect_url);
1549 gst_structure_remove_field (src->http_headers, REDIRECT_URI_NAME);
1550 gst_structure_set (src->http_headers, REDIRECT_URI_NAME,
1551 G_TYPE_STRING, redirect_url, NULL);
1552 }
1553 }
1554
1555 /*
1556 * Push the content length
1557 */
1558 if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T,
1559 &curl_info_offt) == CURLE_OK) {
1560 if (curl_info_offt == -1) {
1561 GST_WARNING_OBJECT (src,
1562 "No Content-Length was specified in the response.");
1563 #ifdef OHOS_OPT_COMPAT
1564 /**
1565 * ohos.opt.compat.0059
1566 * DASH stream's seek need obtaining content length, and HLS stream does not need it.
1567 * If dash cannot obtain content length, it is necessary to set seekable to false.
1568 * Currently, DASH is not supported, so disable this setting to avoid affecting HLS.
1569 */
1570 #else
1571 src->seekable = GSTCURL_SEEKABLE_FALSE;
1572 #endif
1573 } else {
1574 /* Note that in the case of a range get, Content-Length is the number
1575 of bytes requested, not the total size of the resource */
1576 #ifdef OHOS_EXT_FUNC
1577 /* ohos.ext.func.0025 */
1578 GST_INFO_OBJECT (src, "orig req pos:%" G_GUINT64_FORMAT ", Content-Length was given as %" G_GUINT64_FORMAT,
1579 src->orig_request_pos, curl_info_offt);
1580 if (src->content_size == 0) {
1581 src->content_size = src->orig_request_pos + curl_info_offt;
1582 }
1583 basesrc = GST_BASE_SRC_CAST (src);
1584 basesrc->segment.duration = src->orig_request_pos + curl_info_offt;
1585 #else
1586 GST_INFO_OBJECT (src, "Content-Length was given as %" G_GUINT64_FORMAT,
1587 curl_info_offt);
1588 if (src->content_size == 0) {
1589 src->content_size = src->request_position + curl_info_offt;
1590 }
1591 basesrc = GST_BASE_SRC_CAST (src);
1592 basesrc->segment.duration = src->request_position + curl_info_offt;
1593 #endif
1594 if (src->seekable == GSTCURL_SEEKABLE_UNKNOWN) {
1595 src->seekable = GSTCURL_SEEKABLE_TRUE;
1596 }
1597 gst_element_post_message (GST_ELEMENT (src),
1598 gst_message_new_duration_changed (GST_OBJECT (src)));
1599 }
1600 }
1601
1602 /*
1603 * Push all the received headers down via a sicky event
1604 */
1605 response_headers = gst_structure_get_value (src->http_headers,
1606 RESPONSE_HEADERS_NAME);
1607 if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
1608 GstEvent *hdrs_event;
1609
1610 gst_element_post_message (GST_ELEMENT_CAST (src),
1611 gst_message_new_element (GST_OBJECT_CAST (src),
1612 gst_structure_copy (src->http_headers)));
1613
1614 /* gst_event_new_custom takes ownership of our structure */
1615 hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
1616 gst_structure_copy (src->http_headers));
1617 gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
1618 GST_INFO_OBJECT (src, "Pushed headers downstream");
1619 }
1620
1621 src->hdrs_updated = FALSE;
1622
1623 GSTCURL_FUNCTION_EXIT (src);
1624
1625 return ret;
1626 }
1627
1628 /*
1629 * "Negotiate" capabilities between us and the sink.
1630 * I.e. tell the sink device what data to expect. We can't be told what to send
1631 * unless we implement "only return to me if this type" property. Potential TODO
1632 */
1633 static gboolean
gst_curl_http_src_negotiate_caps(GstCurlHttpSrc * src)1634 gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
1635 {
1636 const GValue *response_headers;
1637 const GstStructure *response_struct;
1638
1639 GST_INFO_OBJECT (src, "Negotiating caps...");
1640 if (src->caps && src->http_headers) {
1641 response_headers =
1642 gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME);
1643 if (!response_headers) {
1644 GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME);
1645 return FALSE;
1646 }
1647 response_struct = gst_value_get_structure (response_headers);
1648 if (gst_structure_has_field_typed (response_struct, "content-type",
1649 G_TYPE_STRING)) {
1650 const gchar *content_type =
1651 gst_structure_get_string (response_struct, "content-type");
1652 GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type);
1653 src->caps = gst_caps_make_writable (src->caps);
1654 gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
1655 content_type, NULL);
1656 if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
1657 GST_ERROR_OBJECT (src, "Setting caps failed!");
1658 return FALSE;
1659 }
1660 }
1661 } else {
1662 GST_DEBUG_OBJECT (src, "No caps have been set, continue.");
1663 }
1664
1665 return TRUE;
1666 }
1667
1668 /*
1669 * Cleanup the CURL easy handle once we're done with it.
1670 */
1671 static inline void
gst_curl_http_src_destroy_easy_handle(GstCurlHttpSrc * src)1672 gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src)
1673 {
1674 /* Thank you Handles, and well done. Well done, mate. */
1675 if (src->curl_handle != NULL) {
1676 curl_easy_cleanup (src->curl_handle);
1677 src->curl_handle = NULL;
1678 }
1679
1680 /* In addition, clean up the curl header slist if it was used. */
1681 if (src->slist != NULL) {
1682 curl_slist_free_all (src->slist);
1683 src->slist = NULL;
1684 }
1685 }
1686
1687 static GstStateChangeReturn
gst_curl_http_src_change_state(GstElement * element,GstStateChange transition)1688 gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
1689 {
1690 GstStateChangeReturn ret;
1691 GstCurlHttpSrc *source = GST_CURLHTTPSRC (element);
1692 GSTCURL_FUNCTION_ENTRY (source);
1693
1694 switch (transition) {
1695 case GST_STATE_CHANGE_NULL_TO_READY:
1696 gst_curl_http_src_ref_multi (source);
1697 break;
1698 case GST_STATE_CHANGE_READY_TO_PAUSED:
1699 if (source->uri == NULL) {
1700 GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (_("No URL set.")),
1701 ("Missing URL"));
1702 return GST_STATE_CHANGE_FAILURE;
1703 }
1704 break;
1705 case GST_STATE_CHANGE_READY_TO_NULL:
1706 GST_DEBUG_OBJECT (source, "Removing from multi_loop queue...");
1707 /* The pipeline has ended, so signal any running request to end
1708 and wait until the multi_loop has stopped using this element */
1709 gst_curl_http_src_wait_until_removed (source);
1710 gst_curl_http_src_unref_multi (source);
1711 break;
1712 default:
1713 break;
1714 }
1715
1716 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1717
1718 GSTCURL_FUNCTION_EXIT (source);
1719 return ret;
1720 }
1721
1722 /*
1723 * Take care of any memory that may be left over from the instance that's now
1724 * closing before we leak it.
1725 */
1726 static void
gst_curl_http_src_cleanup_instance(GstCurlHttpSrc * src)1727 gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
1728 {
1729 gint i;
1730 g_mutex_lock (&src->uri_mutex);
1731 g_free (src->uri);
1732 src->uri = NULL;
1733 g_free (src->redirect_uri);
1734 src->redirect_uri = NULL;
1735 g_mutex_unlock (&src->uri_mutex);
1736 g_mutex_clear (&src->uri_mutex);
1737
1738 g_free (src->proxy_uri);
1739 src->proxy_uri = NULL;
1740 g_free (src->no_proxy_list);
1741 src->no_proxy_list = NULL;
1742 g_free (src->proxy_user);
1743 src->proxy_user = NULL;
1744 g_free (src->proxy_pass);
1745 src->proxy_pass = NULL;
1746
1747 for (i = 0; i < src->number_cookies; i++) {
1748 g_free (src->cookies[i]);
1749 src->cookies[i] = NULL;
1750 }
1751 g_free (src->cookies);
1752 src->cookies = NULL;
1753
1754 g_free (src->user_agent);
1755 src->user_agent = NULL;
1756
1757 g_mutex_clear (&src->buffer_mutex);
1758
1759 g_cond_clear (&src->buffer_cond);
1760
1761 g_free (src->buffer);
1762 src->buffer = NULL;
1763
1764 if (src->request_headers) {
1765 gst_structure_free (src->request_headers);
1766 src->request_headers = NULL;
1767 }
1768 if (src->http_headers != NULL) {
1769 gst_structure_free (src->http_headers);
1770 src->http_headers = NULL;
1771 }
1772 g_free (src->reason_phrase);
1773 src->reason_phrase = NULL;
1774 #ifdef OHOS_OPT_MEMLEAK
1775 /* ohos.opt.memleak.0003 fix memory leak. */
1776 g_free (src->custom_ca_file);
1777 src->custom_ca_file = NULL;
1778 #endif
1779 gst_caps_replace (&src->caps, NULL);
1780
1781 gst_curl_http_src_destroy_easy_handle (src);
1782 }
1783
1784 static gboolean
gst_curl_http_src_query(GstBaseSrc * bsrc,GstQuery * query)1785 gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
1786 {
1787 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1788 gboolean ret;
1789 #ifdef OHOS_EXT_FUNC
1790 /* ohos.opt.compat.0022 support pull mode in libav demux */
1791 GstSchedulingFlags flags;
1792 gint minsize, maxsize, align;
1793 #endif
1794 GSTCURL_FUNCTION_ENTRY (src);
1795
1796 switch (GST_QUERY_TYPE (query)) {
1797 case GST_QUERY_URI:
1798 g_mutex_lock (&src->uri_mutex);
1799 gst_query_set_uri (query, src->uri);
1800 if (src->redirect_uri != NULL) {
1801 gst_query_set_uri_redirection (query, src->redirect_uri);
1802 }
1803 g_mutex_unlock (&src->uri_mutex);
1804 ret = TRUE;
1805 break;
1806 default:
1807 ret = GST_BASE_SRC_CLASS (parent_class)->query (bsrc, query);
1808 break;
1809 }
1810
1811 #ifdef OHOS_EXT_FUNC
1812 /* ohos.opt.compat.0022 support pull mode in libav demux */
1813 switch (GST_QUERY_TYPE (query)) {
1814 case GST_QUERY_SCHEDULING:
1815 gst_query_parse_scheduling (query, &flags, &minsize, &maxsize, &align);
1816 flags |= GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED;
1817
1818 if (src->seekable) {
1819 flags |= GST_SCHEDULING_FLAG_SEEKABLE;
1820 } else {
1821 flags &= (~GST_SCHEDULING_FLAG_SEEKABLE);
1822 }
1823 GST_INFO_OBJECT (src, "seekable: %d", src->seekable);
1824
1825 gst_query_set_scheduling (query, flags, minsize, maxsize, align);
1826 break;
1827 default:
1828 break;
1829 }
1830 #endif
1831
1832 GSTCURL_FUNCTION_EXIT (src);
1833 return ret;
1834 }
1835
1836 static gboolean
gst_curl_http_src_get_content_length(GstBaseSrc * bsrc,guint64 * size)1837 gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
1838 {
1839 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1840 const GValue *response_headers;
1841 gboolean ret = FALSE;
1842
1843 if (src->http_headers == NULL) {
1844 return FALSE;
1845 }
1846
1847 response_headers = gst_structure_get_value (src->http_headers,
1848 RESPONSE_HEADERS_NAME);
1849 if (gst_structure_has_field_typed (gst_value_get_structure (response_headers),
1850 "content-length", G_TYPE_STRING)) {
1851 const gchar *content_length =
1852 gst_structure_get_string (gst_value_get_structure (response_headers),
1853 "content-length");
1854 *size = (guint64) g_ascii_strtoull (content_length, NULL, 10);
1855 ret = TRUE;
1856 } else {
1857 GST_DEBUG_OBJECT (src,
1858 "No content length has yet been set, or there was an error!");
1859 }
1860 return ret;
1861 }
1862
1863 static gboolean
gst_curl_http_src_is_seekable(GstBaseSrc * bsrc)1864 gst_curl_http_src_is_seekable (GstBaseSrc * bsrc)
1865 {
1866 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1867
1868 /* NOTE: if seekable is UNKNOWN, assume yes */
1869 return src->seekable != GSTCURL_SEEKABLE_FALSE;
1870 }
1871
1872 static gboolean
gst_curl_http_src_do_seek(GstBaseSrc * bsrc,GstSegment * segment)1873 gst_curl_http_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
1874 {
1875 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1876 gboolean ret = TRUE;
1877
1878 g_mutex_lock (&src->buffer_mutex);
1879 #ifdef OHOS_EXT_FUNC
1880 /* ohos.ext.func.0025 support https seek: */
1881 GST_INFO_OBJECT (src, "do_seek(%" G_GINT64_FORMAT ", %" G_GINT64_FORMAT
1882 ")", segment->start, segment->stop);
1883 #endif
1884 if (src->state == GSTCURL_UNLOCK) {
1885 GST_WARNING_OBJECT (src, "Attempt to seek while unlocked");
1886 ret = FALSE;
1887 goto done;
1888 }
1889 if (src->request_position == segment->start &&
1890 src->stop_position == segment->stop) {
1891 #ifdef OHOS_OPT_COMPAT
1892 /* ohos.opt.compat.0044 */
1893 src->read_position = (guint64)-1;
1894 #endif
1895 GST_DEBUG_OBJECT (src, "Seek to current read/end position");
1896 goto done;
1897 }
1898
1899 if (src->seekable == GSTCURL_SEEKABLE_FALSE) {
1900 GST_WARNING_OBJECT (src, "Not seekable");
1901 ret = FALSE;
1902 goto done;
1903 }
1904
1905 if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
1906 GST_WARNING_OBJECT (src, "Invalid seek segment");
1907 ret = FALSE;
1908 goto done;
1909 }
1910
1911 if (src->content_size > 0 && segment->start >= src->content_size) {
1912 GST_WARNING_OBJECT (src,
1913 "Potentially seeking beyond end of file, might EOS immediately");
1914 }
1915
1916 src->request_position = segment->start;
1917 src->stop_position = segment->stop;
1918 #ifdef OHOS_OPT_COMPAT
1919 /**
1920 * ohos.opt.compat.0044
1921 * Fix seek failed when request_position from gstqueue2 seek event is the same as read_position.
1922 * This case occurs when pulled position from demux jumps back and forth between the boundary of two ranges in gstqueue2,
1923 * which causes requested postion of seek event from gstqueue2 be the same as right boundary of the last range, namely
1924 * current downloaded position(read_position). Thus, reset read position when occurred gst_curl_http_src_do_seek().
1925 */
1926 src->read_position = (guint64)-1;
1927 #endif
1928 done:
1929 g_mutex_unlock (&src->buffer_mutex);
1930 return ret;
1931 }
1932
1933 static void
gst_curl_http_src_uri_handler_init(gpointer g_iface,gpointer iface_data)1934 gst_curl_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1935 {
1936 GstURIHandlerInterface *uri_iface = (GstURIHandlerInterface *) g_iface;
1937
1938 uri_iface->get_type = gst_curl_http_src_urihandler_get_type;
1939 uri_iface->get_protocols = gst_curl_http_src_urihandler_get_protocols;
1940 uri_iface->get_uri = gst_curl_http_src_urihandler_get_uri;
1941 uri_iface->set_uri = gst_curl_http_src_urihandler_set_uri;
1942 }
1943
1944 static guint
gst_curl_http_src_urihandler_get_type(GType type)1945 gst_curl_http_src_urihandler_get_type (GType type)
1946 {
1947 return GST_URI_SRC;
1948 }
1949
1950 static const gchar *const *
gst_curl_http_src_urihandler_get_protocols(GType type)1951 gst_curl_http_src_urihandler_get_protocols (GType type)
1952 {
1953 static const gchar *protocols[] = { "http", "https", NULL };
1954
1955 return protocols;
1956 }
1957
1958 static gchar *
gst_curl_http_src_urihandler_get_uri(GstURIHandler * handler)1959 gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler)
1960 {
1961 gchar *ret;
1962 GstCurlHttpSrc *source;
1963
1964 g_return_val_if_fail (GST_IS_URI_HANDLER (handler), NULL);
1965 source = GST_CURLHTTPSRC (handler);
1966
1967 GSTCURL_FUNCTION_ENTRY (source);
1968
1969 g_mutex_lock (&source->uri_mutex);
1970 ret = g_strdup (source->uri);
1971 g_mutex_unlock (&source->uri_mutex);
1972
1973 GSTCURL_FUNCTION_EXIT (source);
1974 return ret;
1975 }
1976
1977 static gboolean
gst_curl_http_src_urihandler_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)1978 gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
1979 const gchar * uri, GError ** error)
1980 {
1981 GstCurlHttpSrc *source = GST_CURLHTTPSRC (handler);
1982 GSTCURL_FUNCTION_ENTRY (source);
1983
1984 g_return_val_if_fail (GST_IS_URI_HANDLER (handler), FALSE);
1985 g_return_val_if_fail (uri != NULL, FALSE);
1986
1987 g_mutex_lock (&source->uri_mutex);
1988
1989 if (source->uri != NULL) {
1990 GST_DEBUG_OBJECT (source,
1991 "URI already present as %s, updating to new URI %s", source->uri, uri);
1992 g_free (source->uri);
1993 }
1994
1995 source->uri = g_strdup (uri);
1996 if (source->uri == NULL) {
1997 g_mutex_unlock (&source->uri_mutex);
1998 return FALSE;
1999 }
2000 source->retries_remaining = source->total_retries;
2001
2002 g_mutex_unlock (&source->uri_mutex);
2003
2004 GSTCURL_FUNCTION_EXIT (source);
2005 return TRUE;
2006 }
2007
2008 /*
2009 * Cancel any currently running transfer, and then signal all the loops to drop
2010 * any received buffers. The ::create() method should return GST_FLOW_FLUSHING.
2011 */
2012 static gboolean
gst_curl_http_src_unlock(GstBaseSrc * bsrc)2013 gst_curl_http_src_unlock (GstBaseSrc * bsrc)
2014 {
2015 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
2016 gboolean want_removal = FALSE;
2017
2018 g_mutex_lock (&src->buffer_mutex);
2019 if (src->state != GSTCURL_UNLOCK) {
2020 if (src->state == GSTCURL_OK) {
2021 /* A transfer is running, cancel it */
2022 if (src->connection_status == GSTCURL_CONNECTED) {
2023 src->connection_status = GSTCURL_WANT_REMOVAL;
2024 }
2025 want_removal = TRUE;
2026 }
2027 src->pending_state = src->state;
2028 src->state = GSTCURL_UNLOCK;
2029 }
2030 g_cond_signal (&src->buffer_cond);
2031 g_mutex_unlock (&src->buffer_mutex);
2032
2033 if (want_removal) {
2034 GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
2035 GST_TYPE_CURL_HTTP_SRC,
2036 GstCurlHttpSrcClass);
2037 g_mutex_lock (&klass->multi_task_context.mutex);
2038 g_cond_signal (&klass->multi_task_context.signal);
2039 g_mutex_unlock (&klass->multi_task_context.mutex);
2040 }
2041
2042 return TRUE;
2043 }
2044
2045 /*
2046 * Finish the unlock request above and return curlhttpsrc to the normal state.
2047 * This will probably be GSTCURL_DONE, and the next return from ::create() will
2048 * be GST_FLOW_EOS as we don't want to deliver parts of a HTTP body.
2049 */
2050 static gboolean
gst_curl_http_src_unlock_stop(GstBaseSrc * bsrc)2051 gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
2052 {
2053 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
2054
2055 g_mutex_lock (&src->buffer_mutex);
2056 src->state = src->pending_state;
2057 src->pending_state = GSTCURL_NONE;
2058 g_cond_signal (&src->buffer_cond);
2059 g_mutex_unlock (&src->buffer_mutex);
2060
2061 return TRUE;
2062 }
2063
2064 /*****************************************************************************
2065 * Curl loop task functions begin
2066 *****************************************************************************/
2067 static void
gst_curl_http_src_curl_multi_loop(gpointer thread_data)2068 gst_curl_http_src_curl_multi_loop (gpointer thread_data)
2069 {
2070 GstCurlHttpSrcMultiTaskContext *context;
2071 GstCurlHttpSrcQueueElement *qelement, *qnext;
2072 gint i, still_running = 0;
2073 CURLMsg *curl_message;
2074 GstCurlHttpSrc *elt;
2075 guint active = 0;
2076
2077 context = (GstCurlHttpSrcMultiTaskContext *) thread_data;
2078
2079 g_mutex_lock (&context->mutex);
2080
2081 /* Someone is holding a reference to us, but isn't using us so to avoid
2082 * unnecessary clock cycle wasting, sit in a conditional wait until woken.
2083 */
2084 while (context->queue == NULL
2085 && context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
2086 GSTCURL_DEBUG_PRINT ("Waiting for an element to be added...");
2087 g_cond_wait (&context->signal, &context->mutex);
2088 GSTCURL_DEBUG_PRINT ("Received wake up call!");
2089 }
2090 if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
2091 GSTCURL_INFO_PRINT ("Got instruction to shut down");
2092 goto out;
2093 }
2094
2095 /* check for elements that need to be started or removed */
2096 qelement = context->queue;
2097 while (qelement != NULL) {
2098 qnext = qelement->next;
2099 elt = qelement->p;
2100 /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
2101 needed, multi_task_context.mutex must be acquired first */
2102 g_mutex_lock (&elt->buffer_mutex);
2103 if (elt->connection_status == GSTCURL_WANT_REMOVAL) {
2104 curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
2105 if (elt->state == GSTCURL_UNLOCK) {
2106 elt->pending_state = GSTCURL_REMOVED;
2107 } else {
2108 elt->state = GSTCURL_REMOVED;
2109 }
2110 elt->connection_status = GSTCURL_NOT_CONNECTED;
2111 gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
2112 g_cond_signal (&elt->buffer_cond);
2113 } else if (elt->connection_status == GSTCURL_CONNECTED) {
2114 active++;
2115 if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) {
2116 GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
2117 curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
2118 }
2119 }
2120 g_mutex_unlock (&elt->buffer_mutex);
2121 qelement = qnext;
2122 }
2123
2124 if (active == 0) {
2125 GSTCURL_DEBUG_PRINT ("No active elements");
2126 goto out;
2127 }
2128
2129 /* perform a select() on all of the active sockets and process any
2130 messages from curl */
2131 {
2132 struct timeval timeout;
2133 gint rc;
2134 fd_set fdread, fdwrite, fdexcep;
2135 int maxfd = -1;
2136 long curl_timeo = -1;
2137 gboolean cond = FALSE;
2138
2139 /* Because curl can possibly take some time here, be nice and let go of the
2140 * mutex so other threads can perform state/queue operations as we don't
2141 * care about those until the end of this. */
2142 g_mutex_unlock (&context->mutex);
2143
2144 FD_ZERO (&fdread);
2145 FD_ZERO (&fdwrite);
2146 FD_ZERO (&fdexcep);
2147
2148 timeout.tv_sec = 1;
2149 timeout.tv_usec = 0;
2150
2151 curl_multi_timeout (context->multi_handle, &curl_timeo);
2152 if (curl_timeo >= 0) {
2153 timeout.tv_sec = curl_timeo / 1000;
2154 if (timeout.tv_sec > 1) {
2155 timeout.tv_sec = 1;
2156 } else {
2157 timeout.tv_usec = (curl_timeo % 1000) * 1000;
2158 }
2159 }
2160
2161 /* get file descriptors from the transfers */
2162 curl_multi_fdset (context->multi_handle, &fdread, &fdwrite, &fdexcep,
2163 &maxfd);
2164
2165 rc = select (maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
2166
2167 switch (rc) {
2168 case -1:
2169 /* select error */
2170 break;
2171 case 0:
2172 #ifdef OHOS_EXT_FUNC
2173 /**
2174 * ohos.ext.func.0033
2175 * Support reconnection after disconnection in gstcurl.
2176 * When curl_timeo is -1, that means there's no timeout setted at all, the socket is abnormal.
2177 */
2178 if (curl_timeo == -1) {
2179 GST_INFO ("sockets connect timeout.");
2180 gst_curl_http_src_deal_sockets_timeout (context);
2181 return;
2182 }
2183 curl_multi_perform (context->multi_handle, &still_running);
2184 break;
2185 #endif
2186 default:
2187 /* timeout or readable/writable sockets */
2188 curl_multi_perform (context->multi_handle, &still_running);
2189 break;
2190 }
2191
2192 g_mutex_lock (&context->mutex);
2193
2194 /*
2195 * Check the CURL message buffer to find out if any transfers have
2196 * completed. If they have, call the signal_finished function which
2197 * will signal the g_cond_wait call in that calling instance.
2198 */
2199 i = 0;
2200 while (cond != TRUE) {
2201 curl_message = curl_multi_info_read (context->multi_handle, &i);
2202 if (curl_message == NULL) {
2203 cond = TRUE;
2204 } else if (curl_message->msg == CURLMSG_DONE) {
2205 /* A hack, but I have seen curl_message->easy_handle being
2206 * NULL randomly, so check for that. */
2207 if (curl_message->easy_handle != NULL) {
2208 curl_multi_remove_handle (context->multi_handle,
2209 curl_message->easy_handle);
2210 gst_curl_http_src_remove_queue_handle (&context->queue,
2211 curl_message->easy_handle, curl_message->data.result);
2212 }
2213 }
2214 }
2215 }
2216 out:
2217 g_mutex_unlock (&context->mutex);
2218 }
2219
2220 /*
2221 * Receive headers from the remote server and put them into the http_headers
2222 * structure to be sent downstream when we've got them all and started receiving
2223 * the body (see ::_handle_response())
2224 */
2225 static size_t
gst_curl_http_src_get_header(void * header,size_t size,size_t nmemb,void * src)2226 gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
2227 void *src)
2228 {
2229 GstCurlHttpSrc *s = src;
2230 char *substr;
2231
2232 GST_DEBUG_OBJECT (s, "Received header: %s", (char *) header);
2233
2234 g_mutex_lock (&s->buffer_mutex);
2235
2236 if (s->state == GSTCURL_UNLOCK) {
2237 g_mutex_unlock (&s->buffer_mutex);
2238 return size * nmemb;
2239 }
2240
2241 if (s->http_headers == NULL) {
2242 /* Can't do anything here, so just silently swallow the header */
2243 GST_DEBUG_OBJECT (s, "HTTP Headers Structure has already been sent,"
2244 " ignoring header");
2245 g_mutex_unlock (&s->buffer_mutex);
2246 return size * nmemb;
2247 }
2248
2249 substr = gst_curl_http_src_strcasestr (header, "HTTP");
2250 if (substr == header) {
2251 /* We have a status line! */
2252 gchar **status_line_fields;
2253
2254 /* Have we already seen a status line? If so, delete any response headers */
2255 if (s->status_code > 0) {
2256 GstStructure *empty_headers =
2257 gst_structure_new_empty (RESPONSE_HEADERS_NAME);
2258 gst_structure_remove_field (s->http_headers, RESPONSE_HEADERS_NAME);
2259 gst_structure_set (s->http_headers, RESPONSE_HEADERS_NAME,
2260 GST_TYPE_STRUCTURE, empty_headers, NULL);
2261 gst_structure_free (empty_headers);
2262
2263 }
2264
2265 /* Process the status line */
2266 status_line_fields = g_strsplit ((gchar *) header, " ", 3);
2267 if (status_line_fields == NULL) {
2268 GST_ERROR_OBJECT (s, "Status line processing failed!");
2269 } else {
2270 s->status_code =
2271 (guint) g_ascii_strtoll (status_line_fields[1], NULL, 10);
2272 g_free (s->reason_phrase);
2273 s->reason_phrase = g_strdup (status_line_fields[2]);
2274 GST_INFO_OBJECT (s, "Received status %u for request for URI %s: %s",
2275 s->status_code, s->uri, s->reason_phrase);
2276 gst_structure_set (s->http_headers, HTTP_STATUS_CODE,
2277 G_TYPE_UINT, s->status_code, NULL);
2278 g_strfreev (status_line_fields);
2279 }
2280 } else {
2281 /* Normal header line */
2282 gchar **header_tpl = g_strsplit ((gchar *) header, ": ", 2);
2283 if (header_tpl == NULL) {
2284 GST_ERROR_OBJECT (s, "Header processing failed! (%s)", (gchar *) header);
2285 } else {
2286 const GValue *gv_resp_hdrs = gst_structure_get_value (s->http_headers,
2287 RESPONSE_HEADERS_NAME);
2288 const GstStructure *response_headers =
2289 gst_value_get_structure (gv_resp_hdrs);
2290 /* Store header key lower case (g_ascii_strdown), makes searching through
2291 * later on easier - end applications shouldn't care, as all HTTP headers
2292 * are case-insensitive */
2293 gchar *header_key = g_ascii_strdown (header_tpl[0], -1);
2294 gchar *header_value;
2295
2296 /* If header field already exists, append to the end */
2297 if (gst_structure_has_field (response_headers, header_key) == TRUE) {
2298 header_value = g_strdup_printf ("%s, %s",
2299 gst_structure_get_string (response_headers, header_key),
2300 header_tpl[1]);
2301 gst_structure_set ((GstStructure *) response_headers, header_key,
2302 G_TYPE_STRING, header_value, NULL);
2303 g_free (header_value);
2304 } else {
2305 header_value = header_tpl[1];
2306 gst_structure_set ((GstStructure *) response_headers, header_key,
2307 G_TYPE_STRING, header_value, NULL);
2308 }
2309
2310 /* We have some special cases - deal with them here */
2311 if (g_strcmp0 (header_key, "content-type") == 0) {
2312 gst_curl_http_src_negotiate_caps (src);
2313 } else if (g_strcmp0 (header_key, "accept-ranges") == 0 &&
2314 g_ascii_strcasecmp (header_value, "none") == 0) {
2315 s->seekable = GSTCURL_SEEKABLE_FALSE;
2316 } else if (g_strcmp0 (header_key, "content-range") == 0) {
2317 /* In the case of a Range GET, the Content-Length header will contain
2318 the size of range requested, and the Content-Range header will
2319 have the start, stop and total size of the resource */
2320 gchar *size = strchr (header_value, '/');
2321 if (size) {
2322 #ifdef OHOS_OPT_COMPAT
2323 /* ohos.opt.compat.0032 */
2324 s->content_size = atoi (size + 1);
2325 #else
2326 s->content_size = atoi (size);
2327 #endif
2328 }
2329 }
2330
2331 g_free (header_key);
2332 g_strfreev (header_tpl);
2333 }
2334 }
2335
2336 s->hdrs_updated = TRUE;
2337
2338 g_mutex_unlock (&s->buffer_mutex);
2339
2340 return size * nmemb;
2341 }
2342
/*
 * Portable replacement for strcasestr(), which is a GNU extension and not
 * guaranteed to be available.
 *
 * haystack: NUL-terminated string to search in.
 * needle:   NUL-terminated string to look for, compared case-insensitively.
 *
 * Returns a pointer to the first occurrence of needle inside haystack, or
 * NULL when there is none. An empty needle matches at the start of haystack.
 * A match at the very end of haystack (including haystack and needle being
 * equal) is found like any other.
 */
static char *
gst_curl_http_src_strcasestr (const char *haystack, const char *needle)
{
  const size_t needle_len = strlen (needle);
  const char *start;

  if (needle_len == 0) {
    return (char *) haystack;
  }

  /* Try every start position in turn, so a failed partial match never hides
   * a real match that begins inside it (e.g. "HHTTP"). */
  for (start = haystack; *start != '\0'; start++) {
    size_t matched = 0;

    /* start[matched] hitting the terminator ends the comparison before the
     * needle is exhausted, so this never reads past the end of haystack.
     * The casts keep tolower() defined for bytes above 0x7f. */
    while (matched < needle_len && start[matched] != '\0' &&
        tolower ((unsigned char) start[matched]) ==
        tolower ((unsigned char) needle[matched])) {
      matched++;
    }

    if (matched == needle_len) {
      return (char *) start;
    }
  }

  return NULL;
}
2377
2378 #ifdef OHOS_EXT_FUNC
2379 /*
2380 * ohos.ext.func.0025 support https seek:
2381 */
2382 static void
gst_curl_http_src_update_position(GstCurlHttpSrc * src,guint64 bytes_read)2383 gst_curl_http_src_update_position (GstCurlHttpSrc * src, guint64 bytes_read)
2384 {
2385 guint64 new_position;
2386 #ifdef OHOS_OPT_COMPAT
2387 /* ohos.opt.compat.0044 */
2388 if (src->read_position != (guint64)-1 && bytes_read > (G_MAXUINT64 - src->read_position)) {
2389 #else
2390 if (bytes_read > (G_MAXUINT64 - src->read_position)) {
2391 #endif
2392 GST_WARNING_OBJECT (src, "bytes_read:%" G_GUINT64_FORMAT " abnormal, should check, read pos:%" G_GUINT64_FORMAT,
2393 bytes_read, src->read_position);
2394 return;
2395 }
2396
2397 #ifdef OHOS_OPT_COMPAT
2398 /* ohos.opt.compat.0044 */
2399 new_position = (src->read_position == (guint64)-1) ? bytes_read : (src->read_position + bytes_read);
2400 #else
2401 new_position = src->read_position + bytes_read;
2402 #endif
2403 if (G_LIKELY(src->request_position == src->read_position)) {
2404 src->request_position = new_position;
2405 }
2406 src->read_position = new_position;
2407
2408 GST_DEBUG_OBJECT (src, "bytes_read:%" G_GUINT64_FORMAT ", req:%" G_GUINT64_FORMAT ", read:%" G_GUINT64_FORMAT,
2409 bytes_read, src->request_position, src->read_position);
2410 }
2411 #endif
2412
2413 /*
2414 * Receive chunks of the requested body and pass these back to the ::create()
2415 * loop
2416 */
2417 static size_t
2418 gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
2419 {
2420 GstCurlHttpSrc *s = src;
2421 size_t chunk_len = size * nmemb;
2422 GST_TRACE_OBJECT (s,
2423 "Received curl chunk for URI %s of size %d", s->uri, (int) chunk_len);
2424 g_mutex_lock (&s->buffer_mutex);
2425
2426 #ifdef OHOS_EXT_FUNC
2427 /* ohos.ext.func.0025 support https seek: */
2428 gst_curl_http_src_update_position(s, (guint64)chunk_len);
2429 #endif
2430
2431 if (s->state == GSTCURL_UNLOCK) {
2432 g_mutex_unlock (&s->buffer_mutex);
2433 return chunk_len;
2434 }
2435 s->buffer =
2436 g_realloc (s->buffer, (s->buffer_len + chunk_len + 1) * sizeof (char));
2437 if (s->buffer == NULL) {
2438 GST_ERROR_OBJECT (s, "Realloc for cURL response message failed!");
2439 return 0;
2440 }
2441 memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
2442 s->buffer_len += chunk_len;
2443 g_cond_signal (&s->buffer_cond);
2444 g_mutex_unlock (&s->buffer_mutex);
2445 return chunk_len;
2446 }
2447
2448 /*
2449 * Request a cancellation of a currently running curl handle.
2450 */
2451 static void
2452 gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
2453 {
2454 GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
2455 GST_TYPE_CURL_HTTP_SRC,
2456 GstCurlHttpSrcClass);
2457
2458 g_mutex_lock (&klass->multi_task_context.mutex);
2459 g_mutex_lock (&src->buffer_mutex);
2460 if (src->connection_status == GSTCURL_CONNECTED) {
2461 src->connection_status = GSTCURL_WANT_REMOVAL;
2462 }
2463 g_mutex_unlock (&src->buffer_mutex);
2464 g_cond_signal (&klass->multi_task_context.signal);
2465 g_mutex_unlock (&klass->multi_task_context.mutex);
2466 }
2467
2468 /*
2469 * Request a cancellation of a currently running curl handle and
2470 * block this thread until the src element has been removed
2471 * from the queue
2472 */
2473 static void
2474 gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src)
2475 {
2476 gst_curl_http_src_request_remove (src);
2477 g_mutex_lock (&src->buffer_mutex);
2478 while (src->connection_status != GSTCURL_NOT_CONNECTED) {
2479 g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
2480 }
2481 g_mutex_unlock (&src->buffer_mutex);
2482 }
2483
2484 #ifndef GST_DISABLE_GST_DEBUG
2485 /*
2486 * This callback receives debug information, as specified in the type argument.
2487 * This function must return 0.
2488 */
2489 static int
2490 gst_curl_http_src_get_debug (CURL * handle, curl_infotype type, char *data,
2491 size_t size, void *clientp)
2492 {
2493 GstCurlHttpSrc *src = (GstCurlHttpSrc *) clientp;
2494 gchar *msg = NULL;
2495
2496 switch (type) {
2497 case CURLINFO_TEXT:
2498 case CURLINFO_HEADER_OUT:
2499 msg = g_memdup2 (data, size);
2500 if (size > 0) {
2501 msg[size - 1] = '\0';
2502 g_strchomp (msg);
2503 }
2504 break;
2505 default:
2506 break;
2507 }
2508
2509 switch (type) {
2510 case CURLINFO_TEXT:
2511 GST_DEBUG_OBJECT (src, "%s", msg);
2512 break;
2513 case CURLINFO_HEADER_OUT:
2514 GST_DEBUG_OBJECT (src, "outgoing header: %s", msg);
2515 break;
2516 case CURLINFO_DATA_IN:
2517 GST_MEMDUMP_OBJECT (src, "incoming data", (guint8 *) data, size);
2518 break;
2519 case CURLINFO_DATA_OUT:
2520 GST_MEMDUMP_OBJECT (src, "outgoing data", (guint8 *) data, size);
2521 break;
2522 case CURLINFO_SSL_DATA_IN:
2523 GST_MEMDUMP_OBJECT (src, "incoming ssl data", (guint8 *) data, size);
2524 break;
2525 case CURLINFO_SSL_DATA_OUT:
2526 GST_MEMDUMP_OBJECT (src, "outgoing ssl data", (guint8 *) data, size);
2527 break;
2528 default:
2529 GST_DEBUG_OBJECT (src, "unknown debug info type %d", type);
2530 GST_MEMDUMP_OBJECT (src, "unknown data", (guint8 *) data, size);
2531 break;
2532 }
2533 g_free (msg);
2534 return 0;
2535 }
2536 #endif
2537
2538 #ifdef OHOS_EXT_FUNC
2539 /**
2540 * ohos.ext.func.0033
2541 * Support reconnection after disconnection in gstcurl.
2542 */
2543 static void gst_curl_http_src_deal_sockets_timeout (GstCurlHttpSrcMultiTaskContext *context)
2544 {
2545 GstCurlHttpSrcQueueElement *qnext = NULL;
2546 GstCurlHttpSrcQueueElement *qelement = context->queue;
2547 GstCurlHttpSrc *elt = NULL;
2548 while (qelement != NULL) {
2549 qnext = qelement->next;
2550 elt = qelement->p;
2551 g_mutex_lock (&elt->buffer_mutex);
2552 elt->curl_result = CURLE_COULDNT_CONNECT;
2553 curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
2554 elt->connection_status = GSTCURL_NOT_CONNECTED;
2555 elt->data_received = FALSE;
2556 gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
2557 g_cond_signal (&elt->buffer_cond);
2558 g_mutex_unlock (&elt->buffer_mutex);
2559 qelement = qnext;
2560 }
2561 }
2562
2563 static gboolean gst_curl_http_src_reconnect_is_timeout (GstCurlHttpSrc *src)
2564 {
2565 if (src->player_state == GST_PLAYER_STATUS_PAUSED || src->player_state == GST_PLAYER_STATUS_PLAYING ||
2566 src->player_state == GST_PLAYER_STATUS_READY) {
2567 GST_INFO_OBJECT (src, "player_state is paused or playing or ready, donot wait for reconnection");
2568 return FALSE;
2569 }
2570
2571 if (src->start_usecs == 0) {
2572 src->start_usecs = g_get_monotonic_time ();
2573 GST_INFO_OBJECT (src, "Wait for network reconnecting, timeout:0us");
2574 } else {
2575 src->end_usecs = g_get_monotonic_time ();
2576 gint64 time_diff_us = src->end_usecs - src->start_usecs;
2577 if (time_diff_us > (gint64)src->reconnection_timeout) {
2578 GST_INFO_OBJECT (src, "Network has broken too long, exit! end_usecs:%"G_GINT64_FORMAT" us, "
2579 "start_usecs:%"G_GINT64_FORMAT" us, reconnection_timeout:%u us",
2580 src->end_usecs, src->start_usecs, src->reconnection_timeout);
2581 return TRUE;
2582 }
2583 GST_INFO_OBJECT (src, "Wait for network reconnecting, timeout:%"G_GINT64_FORMAT"us", time_diff_us);
2584 }
2585 return FALSE;
2586 }
2587 #endif
2588