• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3  * Copyright (C) 2021 Igalia S.L.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
18  * Boston, MA 02110-1335, USA.
19  */
20 /**
21  * SECTION:element-gstsouphttpclientsink
22  * @title: gstsouphttpclientsink
23  *
24  * The souphttpclientsink element sends pipeline data to an HTTP server
25  * using HTTP PUT commands.
26  *
27  * ## Example launch line
28  * |[
29  * gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
30  *   souphttpclientsink location=http://server/filename.ogv
31  * ]|
32  *
33  * This example encodes 10 seconds of video and sends it to the HTTP
34  * server "server" using HTTP PUT commands.
35  *
36  */
37 
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41 
42 #include <gst/gst.h>
43 #include <gst/base/gstbasesink.h>
44 #include <gio/gio.h>
45 
46 #include "gstsoupelements.h"
47 #include "gstsouphttpclientsink.h"
48 #include "gstsouputils.h"
49 
50 GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
51 #define GST_CAT_DEFAULT souphttpclientsink_dbg
52 
53 /* prototypes */
54 
55 
56 static void gst_soup_http_client_sink_set_property (GObject * object,
57     guint property_id, const GValue * value, GParamSpec * pspec);
58 static void gst_soup_http_client_sink_get_property (GObject * object,
59     guint property_id, GValue * value, GParamSpec * pspec);
60 static void gst_soup_http_client_sink_dispose (GObject * object);
61 static void gst_soup_http_client_sink_finalize (GObject * object);
62 
63 static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
64     GstCaps * caps);
65 static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
66 static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
67 static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
68 static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
69     GstBuffer * buffer);
70 static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
71     souphttpsink);
72 
73 static gboolean authenticate (SoupMessage * msg, SoupAuth * auth,
74     gboolean retrying, gpointer user_data);
75 static void restarted (SoupMessage * msg, GBytes * body);
76 static gboolean send_handle_status (SoupMessage * msg, GError * error,
77     GstSoupHttpClientSink * sink);
78 
79 static gboolean
80 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
81     const gchar * uri);
82 
83 enum
84 {
85   PROP_0,
86   PROP_LOCATION,
87   PROP_USER_AGENT,
88   PROP_AUTOMATIC_REDIRECT,
89   PROP_PROXY,
90   PROP_USER_ID,
91   PROP_USER_PW,
92   PROP_PROXY_ID,
93   PROP_PROXY_PW,
94   PROP_COOKIES,
95   PROP_SESSION,
96   PROP_SOUP_LOG_LEVEL,
97   PROP_RETRY_DELAY,
98   PROP_RETRIES
99 };
100 
101 #define DEFAULT_USER_AGENT           "GStreamer souphttpclientsink "
102 #define DEFAULT_SOUP_LOG_LEVEL       SOUP_LOGGER_LOG_NONE
103 
104 /* pad templates */
105 
106 static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
107 GST_STATIC_PAD_TEMPLATE ("sink",
108     GST_PAD_SINK,
109     GST_PAD_ALWAYS,
110     GST_STATIC_CAPS_ANY);
111 
112 
113 /* class initialization */
114 
115 #define gst_soup_http_client_sink_parent_class parent_class
116 G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
117     GST_TYPE_BASE_SINK);
118 
119 static gboolean souphttpclientsink_element_init (GstPlugin * plugin);
120 GST_ELEMENT_REGISTER_DEFINE_CUSTOM (souphttpclientsink,
121     souphttpclientsink_element_init);
122 
123 
124 static void
gst_soup_http_client_sink_class_init(GstSoupHttpClientSinkClass * klass)125 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
126 {
127   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
128   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
129   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
130 
131   gobject_class->set_property = gst_soup_http_client_sink_set_property;
132   gobject_class->get_property = gst_soup_http_client_sink_get_property;
133   gobject_class->dispose = gst_soup_http_client_sink_dispose;
134   gobject_class->finalize = gst_soup_http_client_sink_finalize;
135 
136   g_object_class_install_property (gobject_class,
137       PROP_LOCATION,
138       g_param_spec_string ("location", "Location",
139           "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140   g_object_class_install_property (gobject_class,
141       PROP_USER_AGENT,
142       g_param_spec_string ("user-agent", "User-Agent",
143           "Value of the User-Agent HTTP request header field",
144           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
145   g_object_class_install_property (gobject_class,
146       PROP_AUTOMATIC_REDIRECT,
147       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
148           "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
149           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150   g_object_class_install_property (gobject_class,
151       PROP_PROXY,
152       g_param_spec_string ("proxy", "Proxy",
153           "HTTP proxy server URI", "",
154           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
155   g_object_class_install_property (gobject_class,
156       PROP_USER_ID,
157       g_param_spec_string ("user-id", "user-id",
158           "user id for authentication", "",
159           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
160   g_object_class_install_property (gobject_class, PROP_USER_PW,
161       g_param_spec_string ("user-pw", "user-pw",
162           "user password for authentication", "",
163           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
164   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
165       g_param_spec_string ("proxy-id", "proxy-id",
166           "user id for proxy authentication", "",
167           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
168   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
169       g_param_spec_string ("proxy-pw", "proxy-pw",
170           "user password for proxy authentication", "",
171           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172   g_object_class_install_property (gobject_class, PROP_SESSION,
173       g_param_spec_object ("session", "session",
174           "SoupSession object to use for communication",
175           _soup_session_get_type (),
176           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177   g_object_class_install_property (gobject_class, PROP_COOKIES,
178       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
179           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
180   g_object_class_install_property (gobject_class, PROP_RETRY_DELAY,
181       g_param_spec_int ("retry-delay", "Retry Delay",
182           "Delay in seconds between retries after a failure", 1, G_MAXINT, 5,
183           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
184   g_object_class_install_property (gobject_class, PROP_RETRIES,
185       g_param_spec_int ("retries", "Retries",
186           "Maximum number of retries, zero to disable, -1 to retry forever",
187           -1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
188  /**
189    * GstSoupHttpClientSink::http-log-level:
190    *
191    * If set and > 0, captures and dumps HTTP session data as
192    * log messages if log level >= GST_LEVEL_TRACE
193    *
194    * Since: 1.4
195    */
196   g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL,
197       g_param_spec_enum ("http-log-level", "HTTP log level",
198           "Set log level for soup's HTTP session log",
199           _soup_logger_log_level_get_type (),
200           DEFAULT_SOUP_LOG_LEVEL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201 
202   gst_element_class_add_static_pad_template (gstelement_class,
203       &gst_soup_http_client_sink_sink_template);
204 
205   gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink",
206       "Generic", "Sends streams to HTTP server via PUT",
207       "David Schleef <ds@entropywave.com>");
208 
209   base_sink_class->set_caps =
210       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
211   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
212   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
213   base_sink_class->unlock =
214       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
215   base_sink_class->render =
216       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
217 }
218 
219 static void
gst_soup_http_client_sink_init(GstSoupHttpClientSink * souphttpsink)220 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
221 {
222   const char *proxy;
223 
224   g_mutex_init (&souphttpsink->mutex);
225   g_cond_init (&souphttpsink->cond);
226 
227   souphttpsink->location = NULL;
228   souphttpsink->automatic_redirect = TRUE;
229   souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
230   souphttpsink->user_id = NULL;
231   souphttpsink->user_pw = NULL;
232   souphttpsink->proxy_id = NULL;
233   souphttpsink->proxy_pw = NULL;
234   souphttpsink->prop_session = NULL;
235   souphttpsink->timeout = 1;
236   souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
237   souphttpsink->retry_delay = 5;
238   souphttpsink->retries = 0;
239   souphttpsink->sent_buffers = NULL;
240   proxy = g_getenv ("http_proxy");
241   if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
242     GST_WARNING_OBJECT (souphttpsink,
243         "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
244         proxy);
245   }
246 
247   gst_soup_http_client_sink_reset (souphttpsink);
248 }
249 
250 static void
gst_soup_http_client_sink_reset(GstSoupHttpClientSink * souphttpsink)251 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
252 {
253   g_list_free_full (souphttpsink->queued_buffers,
254       (GDestroyNotify) gst_buffer_unref);
255   souphttpsink->queued_buffers = NULL;
256   g_free (souphttpsink->reason_phrase);
257   souphttpsink->reason_phrase = NULL;
258   souphttpsink->status_code = 0;
259   souphttpsink->offset = 0;
260   souphttpsink->failures = 0;
261 
262   g_list_free_full (souphttpsink->streamheader_buffers,
263       (GDestroyNotify) gst_buffer_unref);
264   souphttpsink->streamheader_buffers = NULL;
265   g_list_free_full (souphttpsink->sent_buffers,
266       (GDestroyNotify) gst_buffer_unref);
267   souphttpsink->sent_buffers = NULL;
268 }
269 
270 static gboolean
gst_soup_http_client_sink_set_proxy(GstSoupHttpClientSink * souphttpsink,const gchar * uri)271 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
272     const gchar * uri)
273 {
274   if (souphttpsink->proxy) {
275     gst_soup_uri_free (souphttpsink->proxy);
276     souphttpsink->proxy = NULL;
277   }
278   if (g_str_has_prefix (uri, "http://")) {
279     souphttpsink->proxy = gst_soup_uri_new (uri);
280   } else {
281     gchar *new_uri = g_strconcat ("http://", uri, NULL);
282 
283     souphttpsink->proxy = gst_soup_uri_new (new_uri);
284     g_free (new_uri);
285   }
286 
287   return TRUE;
288 }
289 
290 void
gst_soup_http_client_sink_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)291 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
292     const GValue * value, GParamSpec * pspec)
293 {
294   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
295 
296   g_mutex_lock (&souphttpsink->mutex);
297   switch (property_id) {
298     case PROP_SESSION:
299       if (souphttpsink->prop_session) {
300         g_object_unref (souphttpsink->prop_session);
301       }
302       souphttpsink->prop_session = g_value_dup_object (value);
303       break;
304     case PROP_LOCATION:
305       g_free (souphttpsink->location);
306       souphttpsink->location = g_value_dup_string (value);
307       souphttpsink->offset = 0;
308       if ((souphttpsink->location == NULL)
309           || !gst_uri_is_valid (souphttpsink->location)) {
310         GST_WARNING_OBJECT (souphttpsink,
311             "The location (\"%s\") set, is not a valid uri.",
312             souphttpsink->location);
313         g_free (souphttpsink->location);
314         souphttpsink->location = NULL;
315       }
316       break;
317     case PROP_USER_AGENT:
318       g_free (souphttpsink->user_agent);
319       souphttpsink->user_agent = g_value_dup_string (value);
320       break;
321     case PROP_AUTOMATIC_REDIRECT:
322       souphttpsink->automatic_redirect = g_value_get_boolean (value);
323       break;
324     case PROP_USER_ID:
325       g_free (souphttpsink->user_id);
326       souphttpsink->user_id = g_value_dup_string (value);
327       break;
328     case PROP_USER_PW:
329       g_free (souphttpsink->user_pw);
330       souphttpsink->user_pw = g_value_dup_string (value);
331       break;
332     case PROP_PROXY_ID:
333       g_free (souphttpsink->proxy_id);
334       souphttpsink->proxy_id = g_value_dup_string (value);
335       break;
336     case PROP_PROXY_PW:
337       g_free (souphttpsink->proxy_pw);
338       souphttpsink->proxy_pw = g_value_dup_string (value);
339       break;
340     case PROP_PROXY:
341     {
342       const gchar *proxy;
343 
344       proxy = g_value_get_string (value);
345 
346       if (proxy == NULL) {
347         GST_WARNING ("proxy property cannot be NULL");
348         goto done;
349       }
350       if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
351         GST_WARNING ("badly formatted proxy URI");
352         goto done;
353       }
354       break;
355     }
356     case PROP_COOKIES:
357       g_strfreev (souphttpsink->cookies);
358       souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
359       break;
360     case PROP_SOUP_LOG_LEVEL:
361       souphttpsink->log_level = g_value_get_enum (value);
362       break;
363     case PROP_RETRY_DELAY:
364       souphttpsink->retry_delay = g_value_get_int (value);
365       break;
366     case PROP_RETRIES:
367       souphttpsink->retries = g_value_get_int (value);
368       break;
369     default:
370       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
371       break;
372   }
373 done:
374   g_mutex_unlock (&souphttpsink->mutex);
375 }
376 
377 void
gst_soup_http_client_sink_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)378 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
379     GValue * value, GParamSpec * pspec)
380 {
381   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
382 
383   switch (property_id) {
384     case PROP_SESSION:
385       g_value_set_object (value, souphttpsink->prop_session);
386       break;
387     case PROP_LOCATION:
388       g_value_set_string (value, souphttpsink->location);
389       break;
390     case PROP_AUTOMATIC_REDIRECT:
391       g_value_set_boolean (value, souphttpsink->automatic_redirect);
392       break;
393     case PROP_USER_AGENT:
394       g_value_set_string (value, souphttpsink->user_agent);
395       break;
396     case PROP_USER_ID:
397       g_value_set_string (value, souphttpsink->user_id);
398       break;
399     case PROP_USER_PW:
400       g_value_set_string (value, souphttpsink->user_pw);
401       break;
402     case PROP_PROXY_ID:
403       g_value_set_string (value, souphttpsink->proxy_id);
404       break;
405     case PROP_PROXY_PW:
406       g_value_set_string (value, souphttpsink->proxy_pw);
407       break;
408     case PROP_PROXY:
409       if (souphttpsink->proxy == NULL)
410         g_value_set_static_string (value, "");
411       else {
412         char *proxy = gst_soup_uri_to_string (souphttpsink->proxy);
413 
414         g_value_set_string (value, proxy);
415         g_free (proxy);
416       }
417       break;
418     case PROP_COOKIES:
419       g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
420       break;
421     case PROP_SOUP_LOG_LEVEL:
422       g_value_set_enum (value, souphttpsink->log_level);
423       break;
424     case PROP_RETRY_DELAY:
425       g_value_set_int (value, souphttpsink->retry_delay);
426       break;
427     case PROP_RETRIES:
428       g_value_set_int (value, souphttpsink->retries);
429       break;
430     default:
431       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
432       break;
433   }
434 }
435 
436 void
gst_soup_http_client_sink_dispose(GObject * object)437 gst_soup_http_client_sink_dispose (GObject * object)
438 {
439   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
440 
441   /* clean up as possible.  may be called multiple times */
442   if (souphttpsink->prop_session)
443     g_object_unref (souphttpsink->prop_session);
444   souphttpsink->prop_session = NULL;
445 
446   G_OBJECT_CLASS (parent_class)->dispose (object);
447 }
448 
449 void
gst_soup_http_client_sink_finalize(GObject * object)450 gst_soup_http_client_sink_finalize (GObject * object)
451 {
452   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
453 
454   /* clean up object here */
455 
456   g_free (souphttpsink->user_agent);
457   g_free (souphttpsink->user_id);
458   g_free (souphttpsink->user_pw);
459   g_free (souphttpsink->proxy_id);
460   g_free (souphttpsink->proxy_pw);
461   if (souphttpsink->proxy)
462     gst_soup_uri_free (souphttpsink->proxy);
463   g_free (souphttpsink->location);
464   g_strfreev (souphttpsink->cookies);
465 
466   g_cond_clear (&souphttpsink->cond);
467   g_mutex_clear (&souphttpsink->mutex);
468 
469   G_OBJECT_CLASS (parent_class)->finalize (object);
470 }
471 
472 static gboolean
gst_soup_http_client_sink_set_caps(GstBaseSink * sink,GstCaps * caps)473 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
474 {
475   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
476   GstStructure *structure;
477   const GValue *value_array;
478   int i, n;
479 
480   GST_DEBUG_OBJECT (souphttpsink, "new stream headers set");
481   structure = gst_caps_get_structure (caps, 0);
482   value_array = gst_structure_get_value (structure, "streamheader");
483   if (value_array) {
484     g_list_free_full (souphttpsink->streamheader_buffers,
485         (GDestroyNotify) gst_buffer_unref);
486     souphttpsink->streamheader_buffers = NULL;
487 
488     n = gst_value_array_get_size (value_array);
489     for (i = 0; i < n; i++) {
490       const GValue *value;
491       GstBuffer *buffer;
492       value = gst_value_array_get_value (value_array, i);
493       buffer = GST_BUFFER (gst_value_get_buffer (value));
494       souphttpsink->streamheader_buffers =
495           g_list_append (souphttpsink->streamheader_buffers,
496           gst_buffer_ref (buffer));
497     }
498   }
499 
500   return TRUE;
501 }
502 
503 static gboolean
thread_ready_idle_cb(gpointer data)504 thread_ready_idle_cb (gpointer data)
505 {
506   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
507 
508   GST_LOG_OBJECT (souphttpsink, "thread ready");
509 
510   g_mutex_lock (&souphttpsink->mutex);
511   g_cond_signal (&souphttpsink->cond);
512   g_mutex_unlock (&souphttpsink->mutex);
513 
514   return FALSE;                 /* only run once */
515 }
516 
517 static gpointer
thread_func(gpointer ptr)518 thread_func (gpointer ptr)
519 {
520   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
521   GProxyResolver *proxy_resolver;
522   GMainContext *context;
523 
524   GST_DEBUG ("thread start");
525 
526   context = souphttpsink->context;
527   g_main_context_push_thread_default (context);
528 
529   if (souphttpsink->proxy != NULL) {
530     char *proxy_string = gst_soup_uri_to_string (souphttpsink->proxy);
531     proxy_resolver = g_simple_proxy_resolver_new (proxy_string, NULL);
532     g_free (proxy_string);
533   } else
534     proxy_resolver = g_object_ref (g_proxy_resolver_get_default ());
535 
536   souphttpsink->session =
537       _soup_session_new_with_options ("user-agent", souphttpsink->user_agent,
538       "timeout", souphttpsink->timeout, "proxy-resolver", proxy_resolver, NULL);
539 
540   g_object_unref (proxy_resolver);
541 
542   if (gst_soup_loader_get_api_version () < 3) {
543     g_signal_connect (souphttpsink->session, "authenticate",
544         G_CALLBACK (authenticate), souphttpsink);
545   }
546 
547   GST_DEBUG ("created session");
548 
549   g_main_loop_run (souphttpsink->loop);
550 
551   g_main_context_pop_thread_default (context);
552 
553   GST_DEBUG ("thread quit");
554 
555   return NULL;
556 }
557 
558 static gboolean
gst_soup_http_client_sink_start(GstBaseSink * sink)559 gst_soup_http_client_sink_start (GstBaseSink * sink)
560 {
561   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
562 
563   if (souphttpsink->prop_session) {
564     souphttpsink->session = souphttpsink->prop_session;
565   } else {
566     GSource *source;
567     GError *error = NULL;
568 
569     souphttpsink->context = g_main_context_new ();
570 
571     /* set up idle source to signal when the main loop is running and
572      * it's safe for ::stop() to call g_main_loop_quit() */
573     source = g_idle_source_new ();
574     g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
575     g_source_attach (source, souphttpsink->context);
576     g_source_unref (source);
577 
578     souphttpsink->loop = g_main_loop_new (souphttpsink->context, FALSE);
579 
580     g_mutex_lock (&souphttpsink->mutex);
581 
582     souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
583         thread_func, souphttpsink, &error);
584 
585     if (error != NULL) {
586       GST_DEBUG_OBJECT (souphttpsink, "failed to start thread, %s",
587           error->message);
588       g_error_free (error);
589       g_mutex_unlock (&souphttpsink->mutex);
590       return FALSE;
591     }
592 
593     GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
594     while (!g_main_loop_is_running (souphttpsink->loop))
595       g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
596     g_mutex_unlock (&souphttpsink->mutex);
597     GST_LOG_OBJECT (souphttpsink, "main loop thread running");
598   }
599 
600   /* Set up logging */
601   gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level,
602       G_OBJECT (souphttpsink));
603 
604   return TRUE;
605 }
606 
607 static gboolean
gst_soup_http_client_sink_stop(GstBaseSink * sink)608 gst_soup_http_client_sink_stop (GstBaseSink * sink)
609 {
610   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
611 
612   GST_DEBUG ("stop");
613 
614   if (souphttpsink->prop_session == NULL) {
615     _soup_session_abort (souphttpsink->session);
616     g_object_unref (souphttpsink->session);
617   }
618 
619   g_mutex_lock (&souphttpsink->mutex);
620   if (souphttpsink->timer) {
621     g_source_destroy (souphttpsink->timer);
622     g_source_unref (souphttpsink->timer);
623     souphttpsink->timer = NULL;
624   }
625   g_mutex_unlock (&souphttpsink->mutex);
626 
627   if (souphttpsink->loop) {
628     g_main_loop_quit (souphttpsink->loop);
629     g_mutex_lock (&souphttpsink->mutex);
630     g_cond_signal (&souphttpsink->cond);
631     g_mutex_unlock (&souphttpsink->mutex);
632     g_thread_join (souphttpsink->thread);
633     g_main_loop_unref (souphttpsink->loop);
634     souphttpsink->loop = NULL;
635   }
636   if (souphttpsink->context) {
637     g_main_context_unref (souphttpsink->context);
638     souphttpsink->context = NULL;
639   }
640 
641   gst_soup_http_client_sink_reset (souphttpsink);
642 
643   return TRUE;
644 }
645 
646 static gboolean
gst_soup_http_client_sink_unlock(GstBaseSink * sink)647 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
648 {
649   GST_DEBUG ("unlock");
650 
651   return TRUE;
652 }
653 
654 static void
send_message_locked(GstSoupHttpClientSink * souphttpsink)655 send_message_locked (GstSoupHttpClientSink * souphttpsink)
656 {
657   GList *g;
658   guint64 n;
659   GByteArray *array;
660   GInputStream *in_stream;
661 
662   if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
663     return;
664   }
665 
666   /* If the URI went away, drop all these buffers */
667   if (souphttpsink->location == NULL) {
668     GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers");
669     g_list_free_full (souphttpsink->queued_buffers,
670         (GDestroyNotify) gst_buffer_unref);
671     souphttpsink->queued_buffers = NULL;
672     return;
673   }
674 
675   souphttpsink->message = _soup_message_new ("PUT", souphttpsink->location);
676   if (souphttpsink->message == NULL) {
677     GST_WARNING_OBJECT (souphttpsink,
678         "URI could not be parsed while creating message.");
679     g_list_free_full (souphttpsink->queued_buffers,
680         (GDestroyNotify) gst_buffer_unref);
681     souphttpsink->queued_buffers = NULL;
682     return;
683   }
684 
685   g_signal_connect (souphttpsink->message, "restarted", G_CALLBACK (restarted),
686       souphttpsink->request_body);
687 
688   _soup_message_set_flags (souphttpsink->message,
689       (souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
690 
691   if (souphttpsink->cookies) {
692     gchar **cookie;
693 
694     for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) {
695       _soup_message_headers_append (_soup_message_get_request_headers
696           (souphttpsink->message), "Cookie", *cookie);
697     }
698   }
699   array = g_byte_array_new ();
700   n = 0;
701   if (souphttpsink->offset == 0) {
702     for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
703       GstBuffer *buffer = g->data;
704       GstMapInfo map;
705 
706       GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers");
707       gst_buffer_map (buffer, &map, GST_MAP_READ);
708       g_byte_array_append (array, map.data, map.size);
709       n += map.size;
710       gst_buffer_unmap (buffer, &map);
711     }
712   }
713 
714   for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
715     GstBuffer *buffer = g->data;
716     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
717       GstMapInfo map;
718 
719       gst_buffer_map (buffer, &map, GST_MAP_READ);
720       g_byte_array_append (array, map.data, map.size);
721       n += map.size;
722       gst_buffer_unmap (buffer, &map);
723     }
724   }
725 
726   {
727     souphttpsink->request_body = g_byte_array_free_to_bytes (array);
728     _soup_message_set_request_body_from_bytes (souphttpsink->message,
729         NULL, souphttpsink->request_body);
730   }
731 
732   if (souphttpsink->offset != 0) {
733     char *s;
734     s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
735         souphttpsink->offset, souphttpsink->offset + n - 1);
736     _soup_message_headers_append (_soup_message_get_request_headers
737         (souphttpsink->message), "Content-Range", s);
738     g_free (s);
739   }
740 
741   if (n == 0) {
742     GST_DEBUG_OBJECT (souphttpsink,
743         "total size of buffers queued is 0, freeing everything");
744     g_list_free_full (souphttpsink->queued_buffers,
745         (GDestroyNotify) gst_buffer_unref);
746     souphttpsink->queued_buffers = NULL;
747     g_clear_object (&souphttpsink->message);
748     g_clear_pointer (&souphttpsink->request_body, g_bytes_unref);
749     return;
750   }
751 
752   in_stream =
753       _soup_session_send (souphttpsink->session, souphttpsink->message, NULL,
754       NULL);
755   if (in_stream == NULL) {
756     GError *error = NULL;
757 
758     if (!send_handle_status (souphttpsink->message, error, souphttpsink)) {
759       g_object_unref (souphttpsink->message);
760       g_clear_pointer (&souphttpsink->request_body, g_bytes_unref);
761       g_clear_error (&error);
762       return;
763     }
764   }
765   souphttpsink->sent_buffers = souphttpsink->queued_buffers;
766 
767   g_clear_pointer (&souphttpsink->request_body, g_bytes_unref);
768   g_object_unref (in_stream);
769 
770   g_list_free_full (souphttpsink->sent_buffers,
771       (GDestroyNotify) gst_buffer_unref);
772   souphttpsink->sent_buffers = NULL;
773   souphttpsink->failures = 0;
774   souphttpsink->queued_buffers = NULL;
775   g_clear_object (&souphttpsink->message);
776   souphttpsink->offset += n;
777 }
778 
779 static gboolean
send_message(GstSoupHttpClientSink * souphttpsink)780 send_message (GstSoupHttpClientSink * souphttpsink)
781 {
782   g_mutex_lock (&souphttpsink->mutex);
783   send_message_locked (souphttpsink);
784   if (souphttpsink->timer) {
785     g_source_destroy (souphttpsink->timer);
786     g_source_unref (souphttpsink->timer);
787     souphttpsink->timer = NULL;
788   }
789   g_mutex_unlock (&souphttpsink->mutex);
790 
791   return FALSE;
792 }
793 
794 static gboolean
send_handle_status(SoupMessage * msg,GError * error,GstSoupHttpClientSink * sink)795 send_handle_status (SoupMessage * msg, GError * error,
796     GstSoupHttpClientSink * sink)
797 {
798   if (error) {
799     GST_DEBUG_OBJECT (sink, "callback error=%d %s",
800         error->code, error->message);
801   } else {
802     GST_DEBUG_OBJECT (sink, "callback status=%d %s",
803         _soup_message_get_status (msg), _soup_message_get_reason_phrase (msg));
804   }
805 
806   if (error || !SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg))) {
807     sink->failures++;
808     if (sink->retries && (sink->retries < 0 || sink->retries >= sink->failures)) {
809       guint64 retry_delay;
810       const char *retry_after;
811       SoupMessageHeaders *res_hdrs;
812       if (error) {
813         retry_delay = sink->retry_delay;
814         GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: "
815             "error: %d %s (retrying PUT after %" G_GINT64_FORMAT
816             " seconds)", error->code, error->message, retry_delay);
817         goto err_done;
818       }
819       res_hdrs = _soup_message_get_response_headers (msg);
820       retry_after = _soup_message_headers_get_one (res_hdrs, "Retry-After");
821       if (retry_after) {
822         gchar *end = NULL;
823         retry_delay = g_ascii_strtoull (retry_after, &end, 10);
824         if (end || errno) {
825           retry_delay = sink->retry_delay;
826         } else {
827           retry_delay = MAX (retry_delay, sink->retry_delay);
828         }
829         GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: "
830             "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
831             " seconds with Retry-After: %s)",
832             _soup_message_get_status (msg),
833             _soup_message_get_reason_phrase (msg), retry_delay, retry_after);
834       } else {
835         retry_delay = sink->retry_delay;
836         GST_WARNING_OBJECT (sink, "Could not write to HTTP URI: "
837             "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
838             " seconds)",
839             _soup_message_get_status (msg),
840             _soup_message_get_reason_phrase (msg), retry_delay);
841       }
842     err_done:
843       sink->timer = g_timeout_source_new_seconds (retry_delay);
844       g_source_set_callback (sink->timer, (GSourceFunc) (send_message),
845           sink, NULL);
846       g_source_attach (sink->timer, sink->context);
847     } else {
848       sink->status_code = _soup_message_get_status (msg);
849       sink->reason_phrase = g_strdup (_soup_message_get_reason_phrase (msg));
850     }
851     return FALSE;
852   }
853 
854   return TRUE;
855 }
856 
857 static GstFlowReturn
gst_soup_http_client_sink_render(GstBaseSink * sink,GstBuffer * buffer)858 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
859 {
860   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
861   GSource *source;
862 
863   if (souphttpsink->status_code != 0) {
864     GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
865         ("Could not write to HTTP URI"),
866         ("status: %d %s", souphttpsink->status_code,
867             souphttpsink->reason_phrase));
868     return GST_FLOW_ERROR;
869   }
870 
871   g_mutex_lock (&souphttpsink->mutex);
872   if (souphttpsink->location != NULL) {
873     souphttpsink->queued_buffers =
874         g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
875 
876     GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers");
877     source = g_idle_source_new ();
878     g_source_set_callback (source, (GSourceFunc) (send_message),
879         souphttpsink, NULL);
880     g_source_attach (source, souphttpsink->context);
881     g_source_unref (source);
882   }
883   g_mutex_unlock (&souphttpsink->mutex);
884 
885   return GST_FLOW_OK;
886 }
887 
888 static void
restarted(SoupMessage * msg,GBytes * body)889 restarted (SoupMessage * msg, GBytes * body)
890 {
891   _soup_message_set_request_body_from_bytes (msg, NULL, body);
892 }
893 
894 static gboolean
authenticate(SoupMessage * msg,SoupAuth * auth,gboolean retrying,gpointer user_data)895 authenticate (SoupMessage * msg, SoupAuth * auth,
896     gboolean retrying, gpointer user_data)
897 {
898   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
899 
900   if (!retrying) {
901     SoupStatus status_code = _soup_message_get_status (msg);
902     /* First time authentication only, if we fail and are called again with retry true fall through */
903     if (status_code == SOUP_STATUS_UNAUTHORIZED) {
904       if (souphttpsink->user_id && souphttpsink->user_pw)
905         _soup_auth_authenticate (auth, souphttpsink->user_id,
906             souphttpsink->user_pw);
907     } else if (status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
908       if (souphttpsink->proxy_id && souphttpsink->proxy_pw)
909         _soup_auth_authenticate (auth, souphttpsink->proxy_id,
910             souphttpsink->proxy_pw);
911     }
912   }
913   return FALSE;
914 }
915 
916 static gboolean
souphttpclientsink_element_init(GstPlugin * plugin)917 souphttpclientsink_element_init (GstPlugin * plugin)
918 {
919   gboolean ret = TRUE;
920 
921   GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
922       "souphttpclientsink element");
923 
924   if (!soup_element_init (plugin))
925     return TRUE;
926 
927   ret =
928       gst_element_register (plugin, "souphttpclientsink", GST_RANK_NONE,
929       GST_TYPE_SOUP_HTTP_CLIENT_SINK);
930 
931   return ret;
932 }
933