1 /* GStreamer
2 * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3 * Copyright (C) 2021 Igalia S.L.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
18 * Boston, MA 02110-1335, USA.
19 */
20 /**
21 * SECTION:element-gstsouphttpclientsink
22 * @title: gstsouphttpclientsink
23 *
24 * The souphttpclientsink element sends pipeline data to an HTTP server
25 * using HTTP PUT commands.
26 *
27 * ## Example launch line
28 * |[
29 * gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
30 * souphttpclientsink location=http://server/filename.ogv
31 * ]|
32 *
33 * This example encodes 10 seconds of video and sends it to the HTTP
34 * server "server" using HTTP PUT commands.
35 *
36 */
37
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41
42 #include <gst/gst.h>
43 #include <gst/base/gstbasesink.h>
44 #include <gio/gio.h>
45
46 #include "gstsoupelements.h"
47 #include "gstsouphttpclientsink.h"
48 #include "gstsouputils.h"
49
50 GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
51 #define GST_CAT_DEFAULT souphttpclientsink_dbg
52
/* prototypes */


/* GObject virtual methods, installed on the class in class_init. */
static void gst_soup_http_client_sink_set_property (GObject * object,
    guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_soup_http_client_sink_get_property (GObject * object,
    guint property_id, GValue * value, GParamSpec * pspec);
static void gst_soup_http_client_sink_dispose (GObject * object);
static void gst_soup_http_client_sink_finalize (GObject * object);

/* GstBaseSink virtual methods, installed on the class in class_init. */
static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
    GstCaps * caps);
static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
    GstBuffer * buffer);
/* Drops queued/sent/streamheader buffers and clears the transfer state. */
static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
    souphttpsink);

/* Callbacks and helpers used while sending a message. */
static gboolean authenticate (SoupMessage * msg, SoupAuth * auth,
    gboolean retrying, gpointer user_data);
static void restarted (SoupMessage * msg, GBytes * body);
static gboolean send_handle_status (SoupMessage * msg, GError * error,
    GstSoupHttpClientSink * sink);

/* Replaces the stored proxy URI; "http://" is prepended when missing. */
static gboolean
gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
    const gchar * uri);
82
/* Property identifiers passed to g_object_class_install_property() and
 * dispatched on in set_property()/get_property(). PROP_0 is only a
 * placeholder so that the real properties start at 1. */
enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_USER_AGENT,
  PROP_AUTOMATIC_REDIRECT,
  PROP_PROXY,
  PROP_USER_ID,
  PROP_USER_PW,
  PROP_PROXY_ID,
  PROP_PROXY_PW,
  PROP_COOKIES,
  PROP_SESSION,
  PROP_SOUP_LOG_LEVEL,
  PROP_RETRY_DELAY,
  PROP_RETRIES
};
100
/* Defaults for the "user-agent" and "http-log-level" properties; also used
 * to initialise new instances. */
#define DEFAULT_USER_AGENT "GStreamer souphttpclientsink "
#define DEFAULT_SOUP_LOG_LEVEL SOUP_LOGGER_LOG_NONE

/* pad templates */

/* Single always-present sink pad that accepts any caps. */
static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);


/* class initialization */

/* The type derives from GstBaseSink; parent_class is the name this file uses
 * when chaining up in dispose()/finalize(). */
#define gst_soup_http_client_sink_parent_class parent_class
G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
    GST_TYPE_BASE_SINK);

/* Element registration goes through souphttpclientsink_element_init(),
 * defined at the end of this file. */
static gboolean souphttpclientsink_element_init (GstPlugin * plugin);
GST_ELEMENT_REGISTER_DEFINE_CUSTOM (souphttpclientsink,
    souphttpclientsink_element_init);
122
123
124 static void
gst_soup_http_client_sink_class_init(GstSoupHttpClientSinkClass * klass)125 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
126 {
127 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
128 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
129 GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
130
131 gobject_class->set_property = gst_soup_http_client_sink_set_property;
132 gobject_class->get_property = gst_soup_http_client_sink_get_property;
133 gobject_class->dispose = gst_soup_http_client_sink_dispose;
134 gobject_class->finalize = gst_soup_http_client_sink_finalize;
135
136 g_object_class_install_property (gobject_class,
137 PROP_LOCATION,
138 g_param_spec_string ("location", "Location",
139 "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140 g_object_class_install_property (gobject_class,
141 PROP_USER_AGENT,
142 g_param_spec_string ("user-agent", "User-Agent",
143 "Value of the User-Agent HTTP request header field",
144 DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
145 g_object_class_install_property (gobject_class,
146 PROP_AUTOMATIC_REDIRECT,
147 g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
148 "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
149 TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150 g_object_class_install_property (gobject_class,
151 PROP_PROXY,
152 g_param_spec_string ("proxy", "Proxy",
153 "HTTP proxy server URI", "",
154 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
155 g_object_class_install_property (gobject_class,
156 PROP_USER_ID,
157 g_param_spec_string ("user-id", "user-id",
158 "user id for authentication", "",
159 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
160 g_object_class_install_property (gobject_class, PROP_USER_PW,
161 g_param_spec_string ("user-pw", "user-pw",
162 "user password for authentication", "",
163 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
164 g_object_class_install_property (gobject_class, PROP_PROXY_ID,
165 g_param_spec_string ("proxy-id", "proxy-id",
166 "user id for proxy authentication", "",
167 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
168 g_object_class_install_property (gobject_class, PROP_PROXY_PW,
169 g_param_spec_string ("proxy-pw", "proxy-pw",
170 "user password for proxy authentication", "",
171 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172 g_object_class_install_property (gobject_class, PROP_SESSION,
173 g_param_spec_object ("session", "session",
174 "SoupSession object to use for communication",
175 _soup_session_get_type (),
176 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177 g_object_class_install_property (gobject_class, PROP_COOKIES,
178 g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
179 G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
180 g_object_class_install_property (gobject_class, PROP_RETRY_DELAY,
181 g_param_spec_int ("retry-delay", "Retry Delay",
182 "Delay in seconds between retries after a failure", 1, G_MAXINT, 5,
183 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
184 g_object_class_install_property (gobject_class, PROP_RETRIES,
185 g_param_spec_int ("retries", "Retries",
186 "Maximum number of retries, zero to disable, -1 to retry forever",
187 -1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
188 /**
189 * GstSoupHttpClientSink::http-log-level:
190 *
191 * If set and > 0, captures and dumps HTTP session data as
192 * log messages if log level >= GST_LEVEL_TRACE
193 *
194 * Since: 1.4
195 */
196 g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL,
197 g_param_spec_enum ("http-log-level", "HTTP log level",
198 "Set log level for soup's HTTP session log",
199 _soup_logger_log_level_get_type (),
200 DEFAULT_SOUP_LOG_LEVEL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201
202 gst_element_class_add_static_pad_template (gstelement_class,
203 &gst_soup_http_client_sink_sink_template);
204
205 gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink",
206 "Generic", "Sends streams to HTTP server via PUT",
207 "David Schleef <ds@entropywave.com>");
208
209 base_sink_class->set_caps =
210 GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
211 base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
212 base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
213 base_sink_class->unlock =
214 GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
215 base_sink_class->render =
216 GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
217 }
218
219 static void
gst_soup_http_client_sink_init(GstSoupHttpClientSink * souphttpsink)220 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
221 {
222 const char *proxy;
223
224 g_mutex_init (&souphttpsink->mutex);
225 g_cond_init (&souphttpsink->cond);
226
227 souphttpsink->location = NULL;
228 souphttpsink->automatic_redirect = TRUE;
229 souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
230 souphttpsink->user_id = NULL;
231 souphttpsink->user_pw = NULL;
232 souphttpsink->proxy_id = NULL;
233 souphttpsink->proxy_pw = NULL;
234 souphttpsink->prop_session = NULL;
235 souphttpsink->timeout = 1;
236 souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
237 souphttpsink->retry_delay = 5;
238 souphttpsink->retries = 0;
239 souphttpsink->sent_buffers = NULL;
240 proxy = g_getenv ("http_proxy");
241 if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
242 GST_WARNING_OBJECT (souphttpsink,
243 "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
244 proxy);
245 }
246
247 gst_soup_http_client_sink_reset (souphttpsink);
248 }
249
250 static void
gst_soup_http_client_sink_reset(GstSoupHttpClientSink * souphttpsink)251 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
252 {
253 g_list_free_full (souphttpsink->queued_buffers,
254 (GDestroyNotify) gst_buffer_unref);
255 souphttpsink->queued_buffers = NULL;
256 g_free (souphttpsink->reason_phrase);
257 souphttpsink->reason_phrase = NULL;
258 souphttpsink->status_code = 0;
259 souphttpsink->offset = 0;
260 souphttpsink->failures = 0;
261
262 g_list_free_full (souphttpsink->streamheader_buffers,
263 (GDestroyNotify) gst_buffer_unref);
264 souphttpsink->streamheader_buffers = NULL;
265 g_list_free_full (souphttpsink->sent_buffers,
266 (GDestroyNotify) gst_buffer_unref);
267 souphttpsink->sent_buffers = NULL;
268 }
269
270 static gboolean
gst_soup_http_client_sink_set_proxy(GstSoupHttpClientSink * souphttpsink,const gchar * uri)271 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
272 const gchar * uri)
273 {
274 if (souphttpsink->proxy) {
275 gst_soup_uri_free (souphttpsink->proxy);
276 souphttpsink->proxy = NULL;
277 }
278 if (g_str_has_prefix (uri, "http://")) {
279 souphttpsink->proxy = gst_soup_uri_new (uri);
280 } else {
281 gchar *new_uri = g_strconcat ("http://", uri, NULL);
282
283 souphttpsink->proxy = gst_soup_uri_new (new_uri);
284 g_free (new_uri);
285 }
286
287 return TRUE;
288 }
289
290 void
gst_soup_http_client_sink_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)291 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
292 const GValue * value, GParamSpec * pspec)
293 {
294 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
295
296 g_mutex_lock (&souphttpsink->mutex);
297 switch (property_id) {
298 case PROP_SESSION:
299 if (souphttpsink->prop_session) {
300 g_object_unref (souphttpsink->prop_session);
301 }
302 souphttpsink->prop_session = g_value_dup_object (value);
303 break;
304 case PROP_LOCATION:
305 g_free (souphttpsink->location);
306 souphttpsink->location = g_value_dup_string (value);
307 souphttpsink->offset = 0;
308 if ((souphttpsink->location == NULL)
309 || !gst_uri_is_valid (souphttpsink->location)) {
310 GST_WARNING_OBJECT (souphttpsink,
311 "The location (\"%s\") set, is not a valid uri.",
312 souphttpsink->location);
313 g_free (souphttpsink->location);
314 souphttpsink->location = NULL;
315 }
316 break;
317 case PROP_USER_AGENT:
318 g_free (souphttpsink->user_agent);
319 souphttpsink->user_agent = g_value_dup_string (value);
320 break;
321 case PROP_AUTOMATIC_REDIRECT:
322 souphttpsink->automatic_redirect = g_value_get_boolean (value);
323 break;
324 case PROP_USER_ID:
325 g_free (souphttpsink->user_id);
326 souphttpsink->user_id = g_value_dup_string (value);
327 break;
328 case PROP_USER_PW:
329 g_free (souphttpsink->user_pw);
330 souphttpsink->user_pw = g_value_dup_string (value);
331 break;
332 case PROP_PROXY_ID:
333 g_free (souphttpsink->proxy_id);
334 souphttpsink->proxy_id = g_value_dup_string (value);
335 break;
336 case PROP_PROXY_PW:
337 g_free (souphttpsink->proxy_pw);
338 souphttpsink->proxy_pw = g_value_dup_string (value);
339 break;
340 case PROP_PROXY:
341 {
342 const gchar *proxy;
343
344 proxy = g_value_get_string (value);
345
346 if (proxy == NULL) {
347 GST_WARNING ("proxy property cannot be NULL");
348 goto done;
349 }
350 if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
351 GST_WARNING ("badly formatted proxy URI");
352 goto done;
353 }
354 break;
355 }
356 case PROP_COOKIES:
357 g_strfreev (souphttpsink->cookies);
358 souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
359 break;
360 case PROP_SOUP_LOG_LEVEL:
361 souphttpsink->log_level = g_value_get_enum (value);
362 break;
363 case PROP_RETRY_DELAY:
364 souphttpsink->retry_delay = g_value_get_int (value);
365 break;
366 case PROP_RETRIES:
367 souphttpsink->retries = g_value_get_int (value);
368 break;
369 default:
370 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
371 break;
372 }
373 done:
374 g_mutex_unlock (&souphttpsink->mutex);
375 }
376
377 void
gst_soup_http_client_sink_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)378 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
379 GValue * value, GParamSpec * pspec)
380 {
381 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
382
383 switch (property_id) {
384 case PROP_SESSION:
385 g_value_set_object (value, souphttpsink->prop_session);
386 break;
387 case PROP_LOCATION:
388 g_value_set_string (value, souphttpsink->location);
389 break;
390 case PROP_AUTOMATIC_REDIRECT:
391 g_value_set_boolean (value, souphttpsink->automatic_redirect);
392 break;
393 case PROP_USER_AGENT:
394 g_value_set_string (value, souphttpsink->user_agent);
395 break;
396 case PROP_USER_ID:
397 g_value_set_string (value, souphttpsink->user_id);
398 break;
399 case PROP_USER_PW:
400 g_value_set_string (value, souphttpsink->user_pw);
401 break;
402 case PROP_PROXY_ID:
403 g_value_set_string (value, souphttpsink->proxy_id);
404 break;
405 case PROP_PROXY_PW:
406 g_value_set_string (value, souphttpsink->proxy_pw);
407 break;
408 case PROP_PROXY:
409 if (souphttpsink->proxy == NULL)
410 g_value_set_static_string (value, "");
411 else {
412 char *proxy = gst_soup_uri_to_string (souphttpsink->proxy);
413
414 g_value_set_string (value, proxy);
415 g_free (proxy);
416 }
417 break;
418 case PROP_COOKIES:
419 g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
420 break;
421 case PROP_SOUP_LOG_LEVEL:
422 g_value_set_enum (value, souphttpsink->log_level);
423 break;
424 case PROP_RETRY_DELAY:
425 g_value_set_int (value, souphttpsink->retry_delay);
426 break;
427 case PROP_RETRIES:
428 g_value_set_int (value, souphttpsink->retries);
429 break;
430 default:
431 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
432 break;
433 }
434 }
435
436 void
gst_soup_http_client_sink_dispose(GObject * object)437 gst_soup_http_client_sink_dispose (GObject * object)
438 {
439 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
440
441 /* clean up as possible. may be called multiple times */
442 if (souphttpsink->prop_session)
443 g_object_unref (souphttpsink->prop_session);
444 souphttpsink->prop_session = NULL;
445
446 G_OBJECT_CLASS (parent_class)->dispose (object);
447 }
448
449 void
gst_soup_http_client_sink_finalize(GObject * object)450 gst_soup_http_client_sink_finalize (GObject * object)
451 {
452 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
453
454 /* clean up object here */
455
456 g_free (souphttpsink->user_agent);
457 g_free (souphttpsink->user_id);
458 g_free (souphttpsink->user_pw);
459 g_free (souphttpsink->proxy_id);
460 g_free (souphttpsink->proxy_pw);
461 if (souphttpsink->proxy)
462 gst_soup_uri_free (souphttpsink->proxy);
463 g_free (souphttpsink->location);
464 g_strfreev (souphttpsink->cookies);
465
466 g_cond_clear (&souphttpsink->cond);
467 g_mutex_clear (&souphttpsink->mutex);
468
469 G_OBJECT_CLASS (parent_class)->finalize (object);
470 }
471
472 static gboolean
gst_soup_http_client_sink_set_caps(GstBaseSink * sink,GstCaps * caps)473 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
474 {
475 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
476 GstStructure *structure;
477 const GValue *value_array;
478 int i, n;
479
480 GST_DEBUG_OBJECT (souphttpsink, "new stream headers set");
481 structure = gst_caps_get_structure (caps, 0);
482 value_array = gst_structure_get_value (structure, "streamheader");
483 if (value_array) {
484 g_list_free_full (souphttpsink->streamheader_buffers,
485 (GDestroyNotify) gst_buffer_unref);
486 souphttpsink->streamheader_buffers = NULL;
487
488 n = gst_value_array_get_size (value_array);
489 for (i = 0; i < n; i++) {
490 const GValue *value;
491 GstBuffer *buffer;
492 value = gst_value_array_get_value (value_array, i);
493 buffer = GST_BUFFER (gst_value_get_buffer (value));
494 souphttpsink->streamheader_buffers =
495 g_list_append (souphttpsink->streamheader_buffers,
496 gst_buffer_ref (buffer));
497 }
498 }
499
500 return TRUE;
501 }
502
503 static gboolean
thread_ready_idle_cb(gpointer data)504 thread_ready_idle_cb (gpointer data)
505 {
506 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
507
508 GST_LOG_OBJECT (souphttpsink, "thread ready");
509
510 g_mutex_lock (&souphttpsink->mutex);
511 g_cond_signal (&souphttpsink->cond);
512 g_mutex_unlock (&souphttpsink->mutex);
513
514 return FALSE; /* only run once */
515 }
516
517 static gpointer
thread_func(gpointer ptr)518 thread_func (gpointer ptr)
519 {
520 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
521 GProxyResolver *proxy_resolver;
522 GMainContext *context;
523
524 GST_DEBUG ("thread start");
525
526 context = souphttpsink->context;
527 g_main_context_push_thread_default (context);
528
529 if (souphttpsink->proxy != NULL) {
530 char *proxy_string = gst_soup_uri_to_string (souphttpsink->proxy);
531 proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL);
532 g_free (proxy_string);
533 } else
534 proxy_resolver = g_object_ref (g_proxy_resolver_get_default ());
535
536 souphttpsink->session =
537 _soup_session_new_with_options ("user-agent", souphttpsink->user_agent,
538 "timeout", souphttpsink->timeout, "proxy-resolver", proxy_resolver, NULL);
539
540 g_object_unref (proxy_resolver);
541
542 if (gst_soup_loader_get_api_version () < 3) {
543 g_signal_connect (souphttpsink->session, "authenticate",
544 G_CALLBACK (authenticate), souphttpsink);
545 }
546
547 GST_DEBUG ("created session");
548
549 g_main_loop_run (souphttpsink->loop);
550
551 g_main_context_pop_thread_default (context);
552
553 GST_DEBUG ("thread quit");
554
555 return NULL;
556 }
557
558 static gboolean
gst_soup_http_client_sink_start(GstBaseSink * sink)559 gst_soup_http_client_sink_start (GstBaseSink * sink)
560 {
561 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
562
563 if (souphttpsink->prop_session) {
564 souphttpsink->session = souphttpsink->prop_session;
565 } else {
566 GSource *source;
567 GError *error = NULL;
568
569 souphttpsink->context = g_main_context_new ();
570
571 /* set up idle source to signal when the main loop is running and
572 * it's safe for ::stop() to call g_main_loop_quit() */
573 source = g_idle_source_new ();
574 g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
575 g_source_attach (source, souphttpsink->context);
576 g_source_unref (source);
577
578 souphttpsink->loop = g_main_loop_new (souphttpsink->context, FALSE);
579
580 g_mutex_lock (&souphttpsink->mutex);
581
582 souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
583 thread_func, souphttpsink, &error);
584
585 if (error != NULL) {
586 GST_DEBUG_OBJECT (souphttpsink, "failed to start thread, %s",
587 error->message);
588 g_error_free (error);
589 g_mutex_unlock (&souphttpsink->mutex);
590 return FALSE;
591 }
592
593 GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
594 while (!g_main_loop_is_running (souphttpsink->loop))
595 g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
596 g_mutex_unlock (&souphttpsink->mutex);
597 GST_LOG_OBJECT (souphttpsink, "main loop thread running");
598 }
599
600 /* Set up logging */
601 gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level,
602 G_OBJECT (souphttpsink));
603
604 return TRUE;
605 }
606
607 static gboolean
gst_soup_http_client_sink_stop(GstBaseSink * sink)608 gst_soup_http_client_sink_stop (GstBaseSink * sink)
609 {
610 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
611
612 GST_DEBUG ("stop");
613
614 if (souphttpsink->prop_session == NULL) {
615 _soup_session_abort (souphttpsink->session);
616 g_object_unref (souphttpsink->session);
617 }
618
619 g_mutex_lock (&souphttpsink->mutex);
620 if (souphttpsink->timer) {
621 g_source_destroy (souphttpsink->timer);
622 g_source_unref (souphttpsink->timer);
623 souphttpsink->timer = NULL;
624 }
625 g_mutex_unlock (&souphttpsink->mutex);
626
627 if (souphttpsink->loop) {
628 g_main_loop_quit (souphttpsink->loop);
629 g_mutex_lock (&souphttpsink->mutex);
630 g_cond_signal (&souphttpsink->cond);
631 g_mutex_unlock (&souphttpsink->mutex);
632 g_thread_join (souphttpsink->thread);
633 g_main_loop_unref (souphttpsink->loop);
634 souphttpsink->loop = NULL;
635 }
636 if (souphttpsink->context) {
637 g_main_context_unref (souphttpsink->context);
638 souphttpsink->context = NULL;
639 }
640
641 gst_soup_http_client_sink_reset (souphttpsink);
642
643 return TRUE;
644 }
645
646 static gboolean
gst_soup_http_client_sink_unlock(GstBaseSink * sink)647 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
648 {
649 GST_DEBUG ("unlock");
650
651 return TRUE;
652 }
653
654 static void
send_message_locked(GstSoupHttpClientSink * souphttpsink)655 send_message_locked (GstSoupHttpClientSink * souphttpsink)
656 {
657 GList *g;
658 guint64 n;
659 GByteArray *array;
660 GInputStream *in_stream;
661
662 if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
663 return;
664 }
665
666 /* If the URI went away, drop all these buffers */
667 if (souphttpsink->location == NULL) {
668 GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers");
669 g_list_free_full (souphttpsink->queued_buffers,
670 (GDestroyNotify) gst_buffer_unref);
671 souphttpsink->queued_buffers = NULL;
672 return;
673 }
674
675 souphttpsink->message = _soup_message_new ("PUT", souphttpsink->location);
676 if (souphttpsink->message == NULL) {
677 GST_WARNING_OBJECT (souphttpsink,
678 "URI could not be parsed while creating message.");
679 g_list_free_full (souphttpsink->queued_buffers,
680 (GDestroyNotify) gst_buffer_unref);
681 souphttpsink->queued_buffers = NULL;
682 return;
683 }
684
685 g_signal_connect (souphttpsink->message, "restarted", G_CALLBACK (restarted),
686 souphttpsink->request_body);
687
688 _soup_message_set_flags (souphttpsink->message,
689 (souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
690
691 if (souphttpsink->cookies) {
692 gchar **cookie;
693
694 for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) {
695 _soup_message_headers_append (_soup_message_get_request_headers
696 (souphttpsink->message), "Cookie", *cookie);
697 }
698 }
699 array = g_byte_array_new ();
700 n = 0;
701 if (souphttpsink->offset == 0) {
702 for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
703 GstBuffer *buffer = g->data;
704 GstMapInfo map;
705
706 GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers");
707 gst_buffer_map (buffer, &map, GST_MAP_READ);
708 g_byte_array_append (array, map.data, map.size);
709 n += map.size;
710 gst_buffer_unmap (buffer, &map);
711 }
712 }
713
714 for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
715 GstBuffer *buffer = g->data;
716 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
717 GstMapInfo map;
718
719 gst_buffer_map (buffer, &map, GST_MAP_READ);
720 g_byte_array_append (array, map.data, map.size);
721 n += map.size;
722 gst_buffer_unmap (buffer, &map);
723 }
724 }
725
726 {
727 souphttpsink->request_body = g_byte_array_free_to_bytes (array);
728 _soup_message_set_request_body_from_bytes (souphttpsink->message,
729 NULL, souphttpsink->request_body);
730 }
731
732 if (souphttpsink->offset != 0) {
733 char *s;
734 s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
735 souphttpsink->offset, souphttpsink->offset + n - 1);
736 _soup_message_headers_append (_soup_message_get_request_headers
737 (souphttpsink->message), "Content-Range", s);
738 g_free (s);
739 }
740
741 if (n == 0) {
742 GST_DEBUG_OBJECT (souphttpsink,
743 "total size of buffers queued is 0, freeing everything");
744 g_list_free_full (souphttpsink->queued_buffers,
745 (GDestroyNotify) gst_buffer_unref);
746 souphttpsink->queued_buffers = NULL;
747 g_clear_object (&souphttpsink->message);
748 g_clear_pointer (&souphttpsink->request_body, g_bytes_unref);
749 return;
750 }
751
752 in_stream =
753 _soup_session_send (souphttpsink->session, souphttpsink->message, NULL,
754 NULL);
755 if (in_stream == NULL) {
756 GError *error = NULL;
757
758 if (!send_handle_status (souphttpsink->message, error, souphttpsink)) {
759 g_object_unref (souphttpsink->message);
760 g_clear_pointer (&souphttpsink->request_body, g_bytes_unref);
761 g_clear_error (&error);
762 return;
763 }
764 }
765 souphttpsink->sent_buffers = souphttpsink->queued_buffers;
766
767 g_clear_pointer (&souphttpsink->request_body, g_bytes_unref);
768 g_object_unref (in_stream);
769
770 g_list_free_full (souphttpsink->sent_buffers,
771 (GDestroyNotify) gst_buffer_unref);
772 souphttpsink->sent_buffers = NULL;
773 souphttpsink->failures = 0;
774 souphttpsink->queued_buffers = NULL;
775 g_clear_object (&souphttpsink->message);
776 souphttpsink->offset += n;
777 }
778
779 static gboolean
send_message(GstSoupHttpClientSink * souphttpsink)780 send_message (GstSoupHttpClientSink * souphttpsink)
781 {
782 g_mutex_lock (&souphttpsink->mutex);
783 send_message_locked (souphttpsink);
784 if (souphttpsink->timer) {
785 g_source_destroy (souphttpsink->timer);
786 g_source_unref (souphttpsink->timer);
787 souphttpsink->timer = NULL;
788 }
789 g_mutex_unlock (&souphttpsink->mutex);
790
791 return FALSE;
792 }
793
794 static gboolean
send_handle_status(SoupMessage * msg,GError * error,GstSoupHttpClientSink * sink)795 send_handle_status (SoupMessage * msg, GError * error,
796 GstSoupHttpClientSink * sink)
797 {
798 if (error) {
799 GST_DEBUG_OBJECT (sink, "callback error=%d %s",
800 error->code, error->message);
801 } else {
802 GST_DEBUG_OBJECT (sink, "callback status=%d %s",
803 _soup_message_get_status (msg), _soup_message_get_reason_phrase (msg));
804 }
805
806 if (error || !SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg))) {
807 sink->failures++;
808 if (sink->retries && (sink->retries < 0 || sink->retries >= sink->failures)) {
809 guint64 retry_delay;
810 const char *retry_after;
811 SoupMessageHeaders *res_hdrs;
812 if (error) {
813 retry_delay = sink->retry_delay;
814 GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: "
815 "error: %d %s (retrying PUT after %" G_GINT64_FORMAT
816 " seconds)", error->code, error->message, retry_delay);
817 goto err_done;
818 }
819 res_hdrs = _soup_message_get_response_headers (msg);
820 retry_after = _soup_message_headers_get_one (res_hdrs, "Retry-After");
821 if (retry_after) {
822 gchar *end = NULL;
823 retry_delay = g_ascii_strtoull (retry_after, &end, 10);
824 if (end || errno) {
825 retry_delay = sink->retry_delay;
826 } else {
827 retry_delay = MAX (retry_delay, sink->retry_delay);
828 }
829 GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: "
830 "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
831 " seconds with Retry-After: %s)",
832 _soup_message_get_status (msg),
833 _soup_message_get_reason_phrase (msg), retry_delay, retry_after);
834 } else {
835 retry_delay = sink->retry_delay;
836 GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: "
837 "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
838 " seconds)",
839 _soup_message_get_status (msg),
840 _soup_message_get_reason_phrase (msg), retry_delay);
841 }
842 err_done:
843 sink->timer = g_timeout_source_new_seconds (retry_delay);
844 g_source_set_callback (sink->timer, (GSourceFunc) (send_message),
845 sink, NULL);
846 g_source_attach (sink->timer, sink->context);
847 } else {
848 sink->status_code = _soup_message_get_status (msg);
849 sink->reason_phrase = g_strdup (_soup_message_get_reason_phrase (msg));
850 }
851 return FALSE;
852 }
853
854 return TRUE;
855 }
856
857 static GstFlowReturn
gst_soup_http_client_sink_render(GstBaseSink * sink,GstBuffer * buffer)858 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
859 {
860 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
861 GSource *source;
862
863 if (souphttpsink->status_code != 0) {
864 GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
865 ("Could not write to HTTP URI"),
866 ("status: %d %s", souphttpsink->status_code,
867 souphttpsink->reason_phrase));
868 return GST_FLOW_ERROR;
869 }
870
871 g_mutex_lock (&souphttpsink->mutex);
872 if (souphttpsink->location != NULL) {
873 souphttpsink->queued_buffers =
874 g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
875
876 GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers");
877 source = g_idle_source_new ();
878 g_source_set_callback (source, (GSourceFunc) (send_message),
879 souphttpsink, NULL);
880 g_source_attach (source, souphttpsink->context);
881 g_source_unref (source);
882 }
883 g_mutex_unlock (&souphttpsink->mutex);
884
885 return GST_FLOW_OK;
886 }
887
888 static void
restarted(SoupMessage * msg,GBytes * body)889 restarted (SoupMessage * msg, GBytes * body)
890 {
891 _soup_message_set_request_body_from_bytes (msg, NULL, body);
892 }
893
894 static gboolean
authenticate(SoupMessage * msg,SoupAuth * auth,gboolean retrying,gpointer user_data)895 authenticate (SoupMessage * msg, SoupAuth * auth,
896 gboolean retrying, gpointer user_data)
897 {
898 GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
899
900 if (!retrying) {
901 SoupStatus status_code = _soup_message_get_status (msg);
902 /* First time authentication only, if we fail and are called again with retry true fall through */
903 if (status_code == SOUP_STATUS_UNAUTHORIZED) {
904 if (souphttpsink->user_id && souphttpsink->user_pw)
905 _soup_auth_authenticate (auth, souphttpsink->user_id,
906 souphttpsink->user_pw);
907 } else if (status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
908 if (souphttpsink->proxy_id && souphttpsink->proxy_pw)
909 _soup_auth_authenticate (auth, souphttpsink->proxy_id,
910 souphttpsink->proxy_pw);
911 }
912 }
913 return FALSE;
914 }
915
916 static gboolean
souphttpclientsink_element_init(GstPlugin * plugin)917 souphttpclientsink_element_init (GstPlugin * plugin)
918 {
919 gboolean ret = TRUE;
920
921 GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
922 "souphttpclientsink element");
923
924 if (!soup_element_init (plugin))
925 return TRUE;
926
927 ret =
928 gst_element_register (plugin, "souphttpclientsink", GST_RANK_NONE,
929 GST_TYPE_SOUP_HTTP_CLIENT_SINK);
930
931 return ret;
932 }
933