• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION:element-curlsink
22  * @title: curlsink
23  * @short_description: sink that uploads data to a server using libcurl
24  *
25  * This is a network sink that uses libcurl as a client to upload data to
26  * a server (e.g. a HTTP/FTP server).
27  *
28  * ## Example launch line (upload a JPEG file to an HTTP server)
29  * |[
30  * gst-launch-1.0 filesrc location=image.jpg ! jpegparse ! curlsink  \
31  *     file-name=image.jpg  \
32  *     location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/  \
33  *     user=test passwd=test  \
34  *     content-type=image/jpeg  \
35  *     use-content-length=false
36  * ]|
37  *
38  */
39 
40 #ifdef HAVE_CONFIG_H
41 #include "config.h"
42 #endif
43 
44 #include <curl/curl.h>
45 #include <string.h>
46 #include <stdio.h>
47 
48 #if HAVE_SYS_SOCKET_H
49 #include <sys/socket.h>
50 #endif
51 #include <sys/types.h>
52 #if HAVE_NETINET_IN_H
53 #include <netinet/in.h>
54 #endif
55 #include <unistd.h>
56 #if HAVE_NETINET_IP_H
57 #include <netinet/ip.h>
58 #endif
59 #if HAVE_NETINET_TCP_H
60 #include <netinet/tcp.h>
61 #endif
62 #include <sys/stat.h>
63 #include <fcntl.h>
64 
65 #include "gstcurlbasesink.h"
66 
/* Default values */
/* Debug category used by the GST_* logging macros in this file */
#define GST_CAT_DEFAULT                gst_curl_base_sink_debug
/* Initial value of sink->url, assigned in gst_curl_base_sink_init() */
#define DEFAULT_URL                    "localhost:5555"
/* Default of the "timeout" property (seconds) */
#define DEFAULT_TIMEOUT                30
/* Default of the "qos-dscp" property */
#define DEFAULT_QOS_DSCP               0

/* Inclusive range accepted by the "qos-dscp" property */
#define DSCP_MIN                       0
#define DSCP_MAX                       63
75 
76 
/* Plugin specific settings */
/* Always-present sink pad template accepting any caps; registered on the
 * element class in gst_curl_base_sink_class_init() */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Debug category for this file; initialised as "curlbasesink" in
 * gst_curl_base_sink_class_init() */
GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug);
84 
/* GObject property identifiers, installed in gst_curl_base_sink_class_init() */
enum
{
  PROP_0,
  PROP_LOCATION,                /* "location": URI to write to */
  PROP_USER_NAME,               /* "user": user name for authentication */
  PROP_USER_PASSWD,             /* "passwd": password for authentication */
  PROP_FILE_NAME,               /* "file-name": base name of uploaded files */
  PROP_TIMEOUT,                 /* "timeout": seconds before a write times out */
  PROP_QOS_DSCP                 /* "qos-dscp": DSCP value, DSCP_MIN..DSCP_MAX */
};
95 
/* Object class function declarations */
static void gst_curl_base_sink_finalize (GObject * gobject);
static void gst_curl_base_sink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_curl_base_sink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* BaseSink class function declarations */
static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink,
    GstBuffer * buf);
static gboolean gst_curl_base_sink_event (GstBaseSink * bsink,
    GstEvent * event);
static gboolean gst_curl_base_sink_start (GstBaseSink * bsink);
static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink);
static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink);
static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink);

/* private functions */

static gboolean gst_curl_base_sink_transfer_setup_unlocked
    (GstCurlBaseSink * sink);
static gboolean gst_curl_base_sink_transfer_start_unlocked
    (GstCurlBaseSink * sink);
static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink);
/* libcurl callbacks: registered through curl_easy_setopt() in
 * gst_curl_base_sink_transfer_set_common_options_unlocked() */
static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size,
    size_t nmemb, void *stream);
static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size,
    size_t nmemb, void *stream);
static int gst_curl_base_sink_transfer_seek_cb (void *user_p, curl_off_t offset,
    int origin);
static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
    void *curl_ptr, size_t block_size, guint * last_chunk);
/* The curl debug callback is only compiled when GStreamer debugging is on */
#ifndef GST_DISABLE_GST_DEBUG
static int gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type,
    char *data, size_t size, void *clientp);
#endif
static int gst_curl_base_sink_transfer_socket_cb (void *clientp,
    curl_socket_t curlfd, curlsocktype purpose);
static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data);
static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink);
static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink);

/* Wait/notify helpers; the "_unlocked" ones are called in this file with the
 * object lock already held by the caller */
static gboolean gst_curl_base_sink_wait_for_data_unlocked
    (GstCurlBaseSink * sink);
static void gst_curl_base_sink_new_file_notify_unlocked
    (GstCurlBaseSink * sink);
static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
    (GstCurlBaseSink * sink);
static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink);
static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink);
static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink);

/* Default class implementation of handle_transfer and the raw copy helper
 * used by gst_curl_base_sink_transfer_data_buffer() */
static void handle_transfer (GstCurlBaseSink * sink);
static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
    size_t max_bytes_to_send, guint * last_chunk);

/* parent_class resolves to the variable generated by G_DEFINE_TYPE below */
#define parent_class gst_curl_base_sink_parent_class
G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
154 
155 static gboolean
gst_curl_base_sink_default_has_buffered_data_unlocked(GstCurlBaseSink * sink)156 gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
157 {
158   return sink->transfer_buf->len > 0;
159 }
160 
161 static gboolean
gst_curl_base_sink_has_buffered_data_unlocked(GstCurlBaseSink * sink)162 gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
163 {
164   GstCurlBaseSinkClass *klass;
165   gboolean res = FALSE;
166 
167   klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
168 
169   if (klass->has_buffered_data_unlocked)
170     res = klass->has_buffered_data_unlocked (sink);
171 
172   return res;
173 }
174 
/* Class initialiser: sets up the debug category and element metadata, wires
 * the GstBaseSink/GObject virtual methods, installs the default curl base
 * sink virtual methods, registers the properties and adds the sink pad
 * template. */
static void
gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
  GstElementClass *element_class = GST_ELEMENT_CLASS (klass);

  GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0,
      "curl base sink element");

  gst_element_class_set_static_metadata (element_class,
      "Curl base sink",
      "Sink/Network",
      "Upload data over the network to a server using libcurl",
      "Patricia Muscalu <patricia@axis.com>");

  /* GstBaseSink and GObject virtual methods implemented in this file */
  gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event);
  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render);
  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start);
  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop);
  gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock);
  gstbasesink_class->unlock_stop =
      GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop);
  gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize);

  gobject_class->set_property = gst_curl_base_sink_set_property;
  gobject_class->get_property = gst_curl_base_sink_get_property;

  /* default implementations of the curl base sink virtual methods */
  klass->handle_transfer = handle_transfer;
  klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
  klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
  klass->has_buffered_data_unlocked =
      gst_curl_base_sink_default_has_buffered_data_unlocked;

  /* FIXME: check against souphttpsrc and use same names for same properties */
  g_object_class_install_property (gobject_class, PROP_LOCATION,
      g_param_spec_string ("location", "Location",
          "URI location to write to", NULL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_USER_NAME,
      g_param_spec_string ("user", "User name",
          "User name to use for server authentication", NULL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_USER_PASSWD,
      g_param_spec_string ("passwd", "User password",
          "User password to use for server authentication", NULL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_FILE_NAME,
      g_param_spec_string ("file-name", "Base file name",
          "The base file name for the uploaded images", NULL,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
      g_param_spec_int ("timeout", "Timeout",
          "Number of seconds waiting to write before timeout",
          0, G_MAXINT, DEFAULT_TIMEOUT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
      g_param_spec_int ("qos-dscp",
          "QoS diff srv code point",
          "Quality of Service, differentiated services code point (0 default)",
          DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  gst_element_class_add_static_pad_template (element_class, &sinktemplate);

  gst_type_mark_as_plugin_api (GST_TYPE_CURL_BASE_SINK, 0);
}
242 
243 static void
gst_curl_base_sink_init(GstCurlBaseSink * sink)244 gst_curl_base_sink_init (GstCurlBaseSink * sink)
245 {
246   sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
247   sink->transfer_cond = g_malloc (sizeof (TransferCondition));
248   g_cond_init (&sink->transfer_cond->cond);
249   sink->transfer_cond->data_sent = FALSE;
250   sink->transfer_cond->data_available = FALSE;
251   sink->transfer_cond->wait_for_response = FALSE;
252   sink->timeout = DEFAULT_TIMEOUT;
253   sink->qos_dscp = DEFAULT_QOS_DSCP;
254   sink->url = g_strdup (DEFAULT_URL);
255   sink->transfer_thread_close = FALSE;
256   sink->new_file = TRUE;
257   sink->error = NULL;
258   sink->flow_ret = GST_FLOW_OK;
259   sink->is_live = FALSE;
260 }
261 
262 static void
gst_curl_base_sink_finalize(GObject * gobject)263 gst_curl_base_sink_finalize (GObject * gobject)
264 {
265   GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject);
266 
267   GST_DEBUG ("finalizing curlsink");
268   if (this->transfer_thread != NULL) {
269     g_thread_join (this->transfer_thread);
270   }
271 
272   g_cond_clear (&this->transfer_cond->cond);
273   g_free (this->transfer_cond);
274   g_free (this->transfer_buf);
275 
276   g_free (this->url);
277   g_free (this->user);
278   g_free (this->passwd);
279   g_free (this->file_name);
280   if (this->fdset != NULL) {
281     gst_poll_free (this->fdset);
282     this->fdset = NULL;
283   }
284   G_OBJECT_CLASS (parent_class)->finalize (gobject);
285 }
286 
287 void
gst_curl_base_sink_transfer_thread_notify_unlocked(GstCurlBaseSink * sink)288 gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink)
289 {
290   GST_LOG ("more data to send");
291 
292   sink->transfer_cond->data_available = TRUE;
293   sink->transfer_cond->data_sent = FALSE;
294   sink->transfer_cond->wait_for_response = TRUE;
295   g_cond_signal (&sink->transfer_cond->cond);
296 }
297 
298 void
gst_curl_base_sink_transfer_thread_close(GstCurlBaseSink * sink)299 gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink)
300 {
301   GST_OBJECT_LOCK (sink);
302   GST_LOG_OBJECT (sink, "setting transfer thread close flag");
303   sink->transfer_thread_close = TRUE;
304   g_cond_signal (&sink->transfer_cond->cond);
305   GST_OBJECT_UNLOCK (sink);
306 
307   if (sink->transfer_thread != NULL) {
308     GST_LOG_OBJECT (sink, "waiting for transfer thread to finish");
309     g_thread_join (sink->transfer_thread);
310     sink->transfer_thread = NULL;
311   }
312 }
313 
314 void
gst_curl_base_sink_set_live(GstCurlBaseSink * sink,gboolean live)315 gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live)
316 {
317   g_return_if_fail (GST_IS_CURL_BASE_SINK (sink));
318 
319   GST_OBJECT_LOCK (sink);
320   sink->is_live = live;
321   GST_OBJECT_UNLOCK (sink);
322 }
323 
324 gboolean
gst_curl_base_sink_is_live(GstCurlBaseSink * sink)325 gst_curl_base_sink_is_live (GstCurlBaseSink * sink)
326 {
327   gboolean result;
328 
329   g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE);
330 
331   GST_OBJECT_LOCK (sink);
332   result = sink->is_live;
333   GST_OBJECT_UNLOCK (sink);
334 
335   return result;
336 }
337 
338 static GstFlowReturn
gst_curl_base_sink_render(GstBaseSink * bsink,GstBuffer * buf)339 gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf)
340 {
341   GstCurlBaseSink *sink;
342   GstMapInfo map;
343   guint8 *data;
344   size_t size;
345   GstFlowReturn ret;
346   gchar *error;
347 
348   GST_LOG ("enter render");
349 
350   sink = GST_CURL_BASE_SINK (bsink);
351 
352   gst_buffer_map (buf, &map, GST_MAP_READ);
353   data = map.data;
354   size = map.size;
355 
356   if (size == 0) {
357     gst_buffer_unmap (buf, &map);
358     return GST_FLOW_OK;
359   }
360 
361   GST_OBJECT_LOCK (sink);
362 
363   /* check if the transfer thread has encountered problems while the
364    * pipeline thread was working elsewhere */
365   if (sink->flow_ret != GST_FLOW_OK) {
366     goto done;
367   }
368 
369   g_assert (sink->transfer_cond->data_available == FALSE);
370 
371   /* if there is no transfer thread created, lets create one */
372   if (sink->transfer_thread == NULL) {
373     if (!gst_curl_base_sink_transfer_start_unlocked (sink)) {
374       sink->flow_ret = GST_FLOW_ERROR;
375       goto done;
376     }
377   }
378 
379   /* make data available for the transfer thread and notify */
380   sink->transfer_buf->ptr = data;
381   sink->transfer_buf->len = size;
382   sink->transfer_buf->offset = 0;
383   gst_curl_base_sink_transfer_thread_notify_unlocked (sink);
384 
385   /* wait for the transfer thread to send the data. This will be notified
386    * either when transfer is completed by the curl read callback or by
387    * the thread function if an error has occurred. */
388   gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink);
389 
390 done:
391   gst_buffer_unmap (buf, &map);
392 
393   /* Hand over error from transfer thread to streaming thread */
394   error = sink->error;
395   sink->error = NULL;
396   ret = sink->flow_ret;
397   GST_OBJECT_UNLOCK (sink);
398 
399   if (error != NULL) {
400     GST_ERROR_OBJECT (sink, "%s", error);
401     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", error), (NULL));
402     g_free (error);
403   }
404 
405   GST_LOG ("exit render");
406 
407   return ret;
408 }
409 
410 static gboolean
gst_curl_base_sink_event(GstBaseSink * bsink,GstEvent * event)411 gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event)
412 {
413   GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
414   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
415 
416   switch (event->type) {
417     case GST_EVENT_EOS:
418       GST_DEBUG_OBJECT (sink, "received EOS");
419       gst_curl_base_sink_transfer_thread_close (sink);
420       gst_curl_base_sink_wait_for_response (sink);
421       break;
422     case GST_EVENT_CAPS:
423       if (klass->set_mime_type) {
424         GstCaps *caps;
425         gst_event_parse_caps (event, &caps);
426         klass->set_mime_type (sink, caps);
427       }
428       break;
429     default:
430       break;
431   }
432 
433   return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
434 }
435 
436 static gboolean
gst_curl_base_sink_start(GstBaseSink * bsink)437 gst_curl_base_sink_start (GstBaseSink * bsink)
438 {
439   GstCurlBaseSink *sink;
440 
441   sink = GST_CURL_BASE_SINK (bsink);
442 
443   /* reset flags */
444   sink->transfer_cond->data_sent = FALSE;
445   sink->transfer_cond->data_available = FALSE;
446   sink->transfer_cond->wait_for_response = FALSE;
447   sink->transfer_thread_close = FALSE;
448   sink->new_file = TRUE;
449   sink->flow_ret = GST_FLOW_OK;
450 
451   if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
452     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
453         ("gst_poll_new failed: %s", g_strerror (errno)), (NULL));
454     return FALSE;
455   }
456 
457   gst_poll_fd_init (&sink->fd);
458 
459   return TRUE;
460 }
461 
462 static gboolean
gst_curl_base_sink_stop(GstBaseSink * bsink)463 gst_curl_base_sink_stop (GstBaseSink * bsink)
464 {
465   GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
466 
467   gst_curl_base_sink_transfer_thread_close (sink);
468   if (sink->fdset != NULL) {
469     gst_poll_free (sink->fdset);
470     sink->fdset = NULL;
471   }
472 
473   return TRUE;
474 }
475 
476 static gboolean
gst_curl_base_sink_unlock(GstBaseSink * bsink)477 gst_curl_base_sink_unlock (GstBaseSink * bsink)
478 {
479   GstCurlBaseSink *sink;
480 
481   sink = GST_CURL_BASE_SINK (bsink);
482 
483   GST_LOG_OBJECT (sink, "Flushing");
484   gst_poll_set_flushing (sink->fdset, TRUE);
485 
486   return TRUE;
487 }
488 
489 static gboolean
gst_curl_base_sink_unlock_stop(GstBaseSink * bsink)490 gst_curl_base_sink_unlock_stop (GstBaseSink * bsink)
491 {
492   GstCurlBaseSink *sink;
493 
494   sink = GST_CURL_BASE_SINK (bsink);
495 
496   GST_LOG_OBJECT (sink, "No longer flushing");
497   gst_poll_set_flushing (sink->fdset, FALSE);
498 
499   return TRUE;
500 }
501 
502 static void
gst_curl_base_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)503 gst_curl_base_sink_set_property (GObject * object, guint prop_id,
504     const GValue * value, GParamSpec * pspec)
505 {
506   GstCurlBaseSink *sink;
507   GstState cur_state;
508 
509   g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
510   sink = GST_CURL_BASE_SINK (object);
511 
512   gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
513   if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
514     GST_OBJECT_LOCK (sink);
515 
516     switch (prop_id) {
517       case PROP_LOCATION:
518         g_free (sink->url);
519         sink->url = g_value_dup_string (value);
520         GST_DEBUG_OBJECT (sink, "url set to %s", sink->url);
521         break;
522       case PROP_USER_NAME:
523         g_free (sink->user);
524         sink->user = g_value_dup_string (value);
525         GST_DEBUG_OBJECT (sink, "user set to %s", sink->user);
526         break;
527       case PROP_USER_PASSWD:
528         g_free (sink->passwd);
529         sink->passwd = g_value_dup_string (value);
530         GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
531         break;
532       case PROP_FILE_NAME:
533         g_free (sink->file_name);
534         sink->file_name = g_value_dup_string (value);
535         GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
536         break;
537       case PROP_TIMEOUT:
538         sink->timeout = g_value_get_int (value);
539         GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
540         break;
541       case PROP_QOS_DSCP:
542         sink->qos_dscp = g_value_get_int (value);
543         gst_curl_base_sink_setup_dscp_unlocked (sink);
544         GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
545         break;
546       default:
547         GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
548         break;
549     }
550 
551     GST_OBJECT_UNLOCK (sink);
552 
553     return;
554   }
555 
556   /* in PLAYING or PAUSED state */
557   GST_OBJECT_LOCK (sink);
558 
559   switch (prop_id) {
560     case PROP_FILE_NAME:
561       g_free (sink->file_name);
562       sink->file_name = g_value_dup_string (value);
563       GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
564       gst_curl_base_sink_new_file_notify_unlocked (sink);
565       break;
566     case PROP_TIMEOUT:
567       sink->timeout = g_value_get_int (value);
568       GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
569       break;
570     case PROP_QOS_DSCP:
571       sink->qos_dscp = g_value_get_int (value);
572       gst_curl_base_sink_setup_dscp_unlocked (sink);
573       GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
574       break;
575     default:
576       GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
577       break;
578   }
579 
580   GST_OBJECT_UNLOCK (sink);
581 }
582 
583 static void
gst_curl_base_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)584 gst_curl_base_sink_get_property (GObject * object, guint prop_id,
585     GValue * value, GParamSpec * pspec)
586 {
587   GstCurlBaseSink *sink;
588 
589   g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
590   sink = GST_CURL_BASE_SINK (object);
591 
592   switch (prop_id) {
593     case PROP_LOCATION:
594       g_value_set_string (value, sink->url);
595       break;
596     case PROP_USER_NAME:
597       g_value_set_string (value, sink->user);
598       break;
599     case PROP_USER_PASSWD:
600       g_value_set_string (value, sink->passwd);
601       break;
602     case PROP_FILE_NAME:
603       g_value_set_string (value, sink->file_name);
604       break;
605     case PROP_TIMEOUT:
606       g_value_set_int (value, sink->timeout);
607       break;
608     case PROP_QOS_DSCP:
609       g_value_set_int (value, sink->qos_dscp);
610       break;
611     default:
612       GST_DEBUG_OBJECT (sink, "invalid property id");
613       break;
614   }
615 }
616 
617 static gboolean
gst_curl_base_sink_transfer_set_common_options_unlocked(GstCurlBaseSink * sink)618 gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink)
619 {
620   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
621   CURLcode res;
622 
623 #ifndef GST_DISABLE_GST_DEBUG
624   res = curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
625   if (res != CURLE_OK) {
626     sink->error = g_strdup_printf ("failed to set verbose: %s",
627         curl_easy_strerror (res));
628     return FALSE;
629   }
630   res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGDATA, sink);
631   if (res != CURLE_OK) {
632     sink->error = g_strdup_printf ("failed to set debug user_data: %s",
633         curl_easy_strerror (res));
634     return FALSE;
635   }
636   res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGFUNCTION,
637       gst_curl_base_sink_debug_cb);
638   if (res != CURLE_OK) {
639     sink->error = g_strdup_printf ("failed to set debug functions: %s",
640         curl_easy_strerror (res));
641     return FALSE;
642   }
643 #endif
644 
645   res = curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
646   if (res != CURLE_OK) {
647     sink->error = g_strdup_printf ("failed to set URL: %s",
648         curl_easy_strerror (res));
649     return FALSE;
650   }
651 
652   res = curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
653   if (res != CURLE_OK) {
654     sink->error = g_strdup_printf ("failed to set connection timeout: %s",
655         curl_easy_strerror (res));
656     return FALSE;
657   }
658 
659   /* using signals in a multi-threaded application is dangerous */
660   res = curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1);
661   if (res != CURLE_OK) {
662     sink->error = g_strdup_printf ("failed to set no signalling: %s",
663         curl_easy_strerror (res));
664     return FALSE;
665   }
666 
667   /* socket settings */
668   res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
669   if (res != CURLE_OK) {
670     sink->error = g_strdup_printf ("failed to set sockopt user data: %s",
671         curl_easy_strerror (res));
672     return FALSE;
673   }
674   res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
675       gst_curl_base_sink_transfer_socket_cb);
676   if (res != CURLE_OK) {
677     sink->error = g_strdup_printf ("failed to set sockopt function: %s",
678         curl_easy_strerror (res));
679     return FALSE;
680   }
681 
682   res = curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
683   if (res != CURLE_OK) {
684     sink->error = g_strdup_printf ("failed to set read user data: %s",
685         curl_easy_strerror (res));
686     return FALSE;
687   }
688   res = curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
689       klass->transfer_read_cb);
690   if (res != CURLE_OK) {
691     sink->error = g_strdup_printf ("failed to set read function: %s",
692         curl_easy_strerror (res));
693     return FALSE;
694   }
695 
696   res = curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink);
697   if (res != CURLE_OK) {
698     sink->error = g_strdup_printf ("failed to set write user data: %s",
699         curl_easy_strerror (res));
700     return FALSE;
701   }
702   res = curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
703       gst_curl_base_sink_transfer_write_cb);
704   if (res != CURLE_OK) {
705     sink->error = g_strdup_printf ("failed to set write function: %s",
706         curl_easy_strerror (res));
707     return FALSE;
708   }
709 
710   res = curl_easy_setopt (sink->curl, CURLOPT_SEEKDATA, sink);
711   if (res != CURLE_OK) {
712     sink->error = g_strdup_printf ("failed to set seek user data: %s",
713         curl_easy_strerror (res));
714     return FALSE;
715   }
716   res = curl_easy_setopt (sink->curl, CURLOPT_SEEKFUNCTION,
717       gst_curl_base_sink_transfer_seek_cb);
718   if (res != CURLE_OK) {
719     sink->error = g_strdup_printf ("failed to set seek function: %s",
720         curl_easy_strerror (res));
721     return FALSE;
722   }
723 
724   /* Time out in case transfer speed in bytes per second stay below
725    * CURLOPT_LOW_SPEED_LIMIT during CURLOPT_LOW_SPEED_TIME */
726   res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_LIMIT, 1L);
727   if (res != CURLE_OK) {
728     sink->error = g_strdup_printf ("failed to set low speed limit: %s",
729         curl_easy_strerror (res));
730     return FALSE;
731   }
732   res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_TIME,
733       (long) sink->timeout);
734   if (res != CURLE_OK) {
735     sink->error = g_strdup_printf ("failed to set low speed time: %s",
736         curl_easy_strerror (res));
737     return FALSE;
738   }
739 
740   GST_LOG ("common options set");
741   return TRUE;
742 }
743 
744 static gboolean
gst_curl_base_sink_transfer_set_options_unlocked(GstCurlBaseSink * sink)745 gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink)
746 {
747   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
748   CURLcode res;
749 
750   if (!gst_curl_base_sink_transfer_set_common_options_unlocked (sink)) {
751     return FALSE;
752   }
753 
754   /* authentication settings */
755   if (sink->user != NULL && strlen (sink->user)) {
756     res = curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
757     if (res != CURLE_OK) {
758       sink->error = g_strdup_printf ("failed to set user name: %s",
759           curl_easy_strerror (res));
760       return FALSE;
761     }
762     res = curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
763     if (res != CURLE_OK) {
764       sink->error = g_strdup_printf ("failed to set password: %s",
765           curl_easy_strerror (res));
766       return FALSE;
767     }
768   }
769 
770   if (klass->set_options_unlocked) {
771     return klass->set_options_unlocked (sink);
772   } else {
773     return FALSE;
774   }
775 }
776 
777 static size_t
transfer_data_buffer(void * curl_ptr,TransferBuffer * buf,size_t max_bytes_to_send,guint * last_chunk)778 transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
779     size_t max_bytes_to_send, guint * last_chunk)
780 {
781   guint buf_len = buf->len;
782   size_t bytes_to_send = MIN (max_bytes_to_send, buf->len);
783 
784   memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send);
785   buf->offset = buf->offset + bytes_to_send;
786   buf->len = buf->len - bytes_to_send;
787 
788   /* the last data chunk */
789   if (bytes_to_send == buf_len) {
790     buf->offset = 0;
791     buf->len = 0;
792     *last_chunk = 1;
793   }
794 
795   GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
796 
797   return bytes_to_send;
798 }
799 
800 static size_t
gst_curl_base_sink_transfer_data_buffer(GstCurlBaseSink * sink,void * curl_ptr,size_t block_size,guint * last_chunk)801 gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
802     void *curl_ptr, size_t block_size, guint * last_chunk)
803 {
804   TransferBuffer *buffer;
805 
806   buffer = sink->transfer_buf;
807   GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
808       buffer->len, buffer->offset);
809 
810   if (buffer->len <= 0) {
811     GST_WARNING ("got zero- or negative-length buffer");
812 
813     return 0;
814   }
815 
816   /* more data in buffer(s) */
817   return transfer_data_buffer (curl_ptr, buffer, block_size, last_chunk);
818 }
819 
820 static size_t
gst_curl_base_sink_transfer_read_cb(void * curl_ptr,size_t size,size_t nmemb,void * stream)821 gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
822     void *stream)
823 {
824   GstCurlBaseSink *sink;
825   GstCurlBaseSinkClass *klass;
826   size_t max_bytes_to_send;
827   size_t bytes_to_send;
828   guint last_chunk = 0;
829 
830   sink = (GstCurlBaseSink *) stream;
831   klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
832 
833   max_bytes_to_send = size * nmemb;
834 
835   /* wait for data to come available, if new file or thread close is set
836    * then zero will be returned to indicate end of current transfer */
837   GST_OBJECT_LOCK (sink);
838   if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
839 
840     if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
841         sink->transfer_thread_close) {
842       GST_WARNING_OBJECT (sink,
843           "discarding render data due to thread close flag");
844 
845       GST_OBJECT_UNLOCK (sink);
846       return CURL_READFUNC_ABORT;
847     }
848 
849     if (klass->flush_data_unlocked) {
850       bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
851           max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
852 
853       GST_OBJECT_UNLOCK (sink);
854 
855       return bytes_to_send;
856     }
857 
858     GST_OBJECT_UNLOCK (sink);
859     GST_LOG ("returning 0, no more data to send in this file");
860 
861     return 0;
862   }
863 
864   GST_OBJECT_UNLOCK (sink);
865 
866   bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr,
867       max_bytes_to_send, &last_chunk);
868 
869   /* the last data chunk */
870   if (last_chunk) {
871     gst_curl_base_sink_data_sent_notify (sink);
872   }
873 
874   return bytes_to_send;
875 }
876 
877 static size_t
gst_curl_base_sink_transfer_write_cb(void G_GNUC_UNUSED * ptr,size_t size,size_t nmemb,void G_GNUC_UNUSED * stream)878 gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
879     size_t nmemb, void G_GNUC_UNUSED * stream)
880 {
881   GstCurlBaseSink *sink;
882   GstCurlBaseSinkClass *klass;
883   size_t realsize = size * nmemb;
884 
885   sink = (GstCurlBaseSink *) stream;
886   klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
887 
888   if (klass->transfer_verify_response_code) {
889     if (!klass->transfer_verify_response_code (sink)) {
890       GST_DEBUG_OBJECT (sink, "response error");
891       GST_OBJECT_LOCK (sink);
892       sink->flow_ret = GST_FLOW_ERROR;
893       GST_OBJECT_UNLOCK (sink);
894     }
895   }
896 
897   GST_DEBUG ("response %s", (gchar *) ptr);
898 
899   return realsize;
900 }
901 
902 static int
gst_curl_base_sink_transfer_seek_cb(void * stream,curl_off_t offset,int origin)903 gst_curl_base_sink_transfer_seek_cb (void *stream, curl_off_t offset,
904     int origin)
905 {
906   GstCurlBaseSink *sink;
907   curl_off_t buf_size;
908 
909   /*
910    *  Origin is SEEK_SET, SEEK_CUR or SEEK_END,
911    *  libcurl currently only passes SEEK_SET.
912    */
913 
914   sink = (GstCurlBaseSink *) stream;
915 
916   GST_OBJECT_LOCK (sink);
917   buf_size = sink->transfer_buf->offset + sink->transfer_buf->len;
918 
919   switch (origin) {
920     case SEEK_SET:
921       if ((0 <= offset) && (offset <= buf_size)) {
922         sink->transfer_buf->offset = offset;
923         sink->transfer_buf->len = buf_size - offset;
924       } else {
925         GST_OBJECT_UNLOCK (sink);
926         return CURL_SEEKFUNC_FAIL;
927       }
928       break;
929     case SEEK_CUR:
930     case SEEK_END:
931     default:
932       GST_OBJECT_UNLOCK (sink);
933       return CURL_SEEKFUNC_FAIL;
934       break;
935   }
936 
937   GST_OBJECT_UNLOCK (sink);
938   return CURL_SEEKFUNC_OK;
939 }
940 
941 CURLcode
gst_curl_base_sink_transfer_check(GstCurlBaseSink * sink)942 gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink)
943 {
944   CURLcode code = CURLE_OK;
945   CURL *easy;
946   CURLMsg *msg;
947   gint msgs_left;
948   gchar *eff_url = NULL;
949 
950   do {
951     easy = NULL;
952     while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) {
953       if (msg->msg == CURLMSG_DONE) {
954         easy = msg->easy_handle;
955         code = msg->data.result;
956         break;
957       }
958     }
959     if (easy) {
960       curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url);
961       GST_DEBUG ("transfer done %s (%s-%d)", eff_url,
962           curl_easy_strerror (code), code);
963     }
964   } while (easy);
965 
966   return code;
967 }
968 
969 static void
handle_transfer(GstCurlBaseSink * sink)970 handle_transfer (GstCurlBaseSink * sink)
971 {
972   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
973   gint retval;
974   gint activated_fds;
975   gint running_handles;
976   gint timeout;
977   CURLMcode m_code;
978   CURLcode e_code;
979 
980   GST_OBJECT_LOCK (sink);
981   timeout = sink->timeout;
982   GST_OBJECT_UNLOCK (sink);
983 
984   GST_DEBUG_OBJECT (sink, "handling transfers");
985 
986   /* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data
987      available to send or receive - call simply curl_multi_perform before
988      poll() on more actions */
989   do {
990     m_code = curl_multi_perform (sink->multi_handle, &running_handles);
991   } while (m_code == CURLM_CALL_MULTI_PERFORM);
992   GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
993 
994   while (running_handles && (m_code == CURLM_OK)) {
995     if (klass->transfer_prepare_poll_wait) {
996       klass->transfer_prepare_poll_wait (sink);
997     }
998 
999     activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
1000     if (G_UNLIKELY (activated_fds == -1)) {
1001       if (errno == EAGAIN || errno == EINTR) {
1002         GST_DEBUG_OBJECT (sink, "interrupted by signal");
1003       } else if (errno == EBUSY) {
1004         GST_DEBUG_OBJECT (sink, "poll stopped");
1005         retval = GST_FLOW_EOS;
1006 
1007         GST_OBJECT_LOCK (sink);
1008         if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
1009           GST_WARNING_OBJECT (sink,
1010               "discarding render data due to thread close flag");
1011         GST_OBJECT_UNLOCK (sink);
1012 
1013         goto fail;
1014       } else {
1015         sink->error = g_strdup_printf ("poll failed: %s", g_strerror (errno));
1016         retval = GST_FLOW_ERROR;
1017         goto fail;
1018       }
1019     } else if (G_UNLIKELY (activated_fds == 0)) {
1020       sink->error = g_strdup_printf ("poll timed out after %" GST_TIME_FORMAT,
1021           GST_TIME_ARGS (timeout * GST_SECOND));
1022       retval = GST_FLOW_ERROR;
1023       goto fail;
1024     }
1025 
1026     /* readable/writable sockets */
1027     do {
1028       m_code = curl_multi_perform (sink->multi_handle, &running_handles);
1029     } while (m_code == CURLM_CALL_MULTI_PERFORM);
1030     GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
1031   }
1032 
1033   if (m_code != CURLM_OK) {
1034     sink->error = g_strdup_printf ("failed to write data: %s",
1035         curl_multi_strerror (m_code));
1036     retval = GST_FLOW_ERROR;
1037     goto fail;
1038   }
1039 
1040   /* problems still might have occurred on individual transfers even when
1041    * curl_multi_perform returns CURLM_OK */
1042   if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) {
1043     sink->error = g_strdup_printf ("failed to transfer data: %s",
1044         curl_easy_strerror (e_code));
1045     retval = GST_FLOW_ERROR;
1046     goto fail;
1047   }
1048 
1049   gst_curl_base_sink_got_response_notify (sink);
1050 
1051   GST_OBJECT_LOCK (sink);
1052   if (sink->socket_type == CURLSOCKTYPE_ACCEPT) {
1053     /* FIXME: remove this again once we can depend on libcurl > 7.44.0,
1054      * see https://github.com/bagder/curl/issues/405.
1055      */
1056     if (G_UNLIKELY (sink->fd.fd < 0)) {
1057       sink->error = g_strdup_printf ("unknown error");
1058       retval = GST_FLOW_ERROR;
1059       GST_OBJECT_UNLOCK (sink);
1060       goto fail;
1061     }
1062     if (!gst_poll_remove_fd (sink->fdset, &sink->fd)) {
1063       sink->error = g_strdup_printf ("failed to remove fd");
1064       retval = GST_FLOW_ERROR;
1065       GST_OBJECT_UNLOCK (sink);
1066       goto fail;
1067     }
1068     sink->fd.fd = -1;
1069   }
1070   GST_OBJECT_UNLOCK (sink);
1071 
1072   return;
1073 
1074 fail:
1075   GST_OBJECT_LOCK (sink);
1076   if (sink->flow_ret == GST_FLOW_OK) {
1077     sink->flow_ret = retval;
1078   }
1079   GST_OBJECT_UNLOCK (sink);
1080   return;
1081 }
1082 
1083 #ifndef GST_DISABLE_GST_DEBUG
1084 static int
gst_curl_base_sink_debug_cb(CURL * handle,curl_infotype type,char * data,size_t size,void * clientp)1085 gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type, char *data,
1086     size_t size, void *clientp)
1087 {
1088   GstCurlBaseSink *sink = (GstCurlBaseSink *) clientp;
1089   gchar *msg = NULL;
1090 
1091   switch (type) {
1092     case CURLINFO_TEXT:
1093     case CURLINFO_HEADER_IN:
1094     case CURLINFO_HEADER_OUT:
1095       msg = g_memdup2 (data, size);
1096       if (size > 0) {
1097         msg[size - 1] = '\0';
1098         g_strchomp (msg);
1099       }
1100       break;
1101     default:
1102       break;
1103   }
1104 
1105   switch (type) {
1106     case CURLINFO_TEXT:
1107       GST_DEBUG_OBJECT (sink, "%s", msg);
1108       break;
1109     case CURLINFO_HEADER_IN:
1110       GST_DEBUG_OBJECT (sink, "incoming header: %s", msg);
1111       break;
1112     case CURLINFO_HEADER_OUT:
1113       GST_DEBUG_OBJECT (sink, "outgoing header: %s", msg);
1114       break;
1115     case CURLINFO_DATA_IN:
1116       GST_MEMDUMP_OBJECT (sink, "incoming data", (guint8 *) data, size);
1117       break;
1118     case CURLINFO_DATA_OUT:
1119       GST_MEMDUMP_OBJECT (sink, "outgoing data", (guint8 *) data, size);
1120       break;
1121     case CURLINFO_SSL_DATA_IN:
1122       GST_MEMDUMP_OBJECT (sink, "incoming ssl data", (guint8 *) data, size);
1123       break;
1124     case CURLINFO_SSL_DATA_OUT:
1125       GST_MEMDUMP_OBJECT (sink, "outgoing ssl data", (guint8 *) data, size);
1126       break;
1127     default:
1128       GST_DEBUG_OBJECT (sink, "unknown debug info type %d", type);
1129       GST_MEMDUMP_OBJECT (sink, "unknown data", (guint8 *) data, size);
1130       break;
1131   }
1132   g_free (msg);
1133   return 0;
1134 }
1135 #endif
1136 
1137 /* This function gets called by libcurl after the socket() call but before
1138  * the connect() call. */
1139 static int
gst_curl_base_sink_transfer_socket_cb(void * clientp,curl_socket_t curlfd,curlsocktype socket_type)1140 gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
1141     curlsocktype socket_type)
1142 {
1143   GstCurlBaseSink *sink;
1144   gboolean ret = TRUE;
1145 
1146   sink = (GstCurlBaseSink *) clientp;
1147 
1148   g_assert (sink);
1149 
1150   if (curlfd < 0) {
1151     /* signal an unrecoverable error to the library which will close the socket
1152        and return CURLE_COULDNT_CONNECT
1153      */
1154     GST_DEBUG_OBJECT (sink, "no curlfd");
1155     return 1;
1156   }
1157 
1158   GST_OBJECT_LOCK (sink);
1159   sink->socket_type = socket_type;
1160 
1161   if (sink->fd.fd != curlfd) {
1162     if (sink->fd.fd > 0 && sink->socket_type != CURLSOCKTYPE_ACCEPT) {
1163       ret &= gst_poll_remove_fd (sink->fdset, &sink->fd);
1164     }
1165     sink->fd.fd = curlfd;
1166     ret &= gst_poll_add_fd (sink->fdset, &sink->fd);
1167     ret &= gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE);
1168     ret &= gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
1169   }
1170   GST_DEBUG_OBJECT (sink, "fd: %d", sink->fd.fd);
1171   gst_curl_base_sink_setup_dscp_unlocked (sink);
1172   GST_OBJECT_UNLOCK (sink);
1173 
1174   /* success */
1175   return ret ? 0 : 1;
1176 }
1177 
1178 static gboolean
gst_curl_base_sink_transfer_start_unlocked(GstCurlBaseSink * sink)1179 gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink)
1180 {
1181   GError *error = NULL;
1182   gboolean ret = TRUE;
1183 
1184   GST_LOG ("creating transfer thread");
1185   sink->transfer_thread_close = FALSE;
1186   sink->new_file = TRUE;
1187   sink->transfer_thread = g_thread_try_new ("curl-transfer", (GThreadFunc)
1188       gst_curl_base_sink_transfer_thread_func, sink, &error);
1189 
1190   if (sink->transfer_thread == NULL || error != NULL) {
1191     ret = FALSE;
1192     if (error) {
1193       GST_ERROR_OBJECT (sink, "could not create thread %s", error->message);
1194       g_error_free (error);
1195     } else {
1196       GST_ERROR_OBJECT (sink, "could not create thread for unknown reason");
1197     }
1198   }
1199 
1200   return ret;
1201 }
1202 
1203 static gpointer
gst_curl_base_sink_transfer_thread_func(gpointer data)1204 gst_curl_base_sink_transfer_thread_func (gpointer data)
1205 {
1206   GstCurlBaseSink *sink = (GstCurlBaseSink *) data;
1207   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
1208   GstFlowReturn ret;
1209   gboolean data_available;
1210 
1211   GST_LOG ("transfer thread started");
1212   GST_OBJECT_LOCK (sink);
1213   if (!gst_curl_base_sink_transfer_setup_unlocked (sink)) {
1214     /* no need to set sink->error, as it is set by the called function */
1215     sink->flow_ret = GST_FLOW_ERROR;
1216     goto done;
1217   }
1218 
1219   while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) {
1220     /* we are working on a new file, clearing flag and setting a new file
1221      * name */
1222     sink->new_file = FALSE;
1223 
1224     /* wait for data to arrive for this new file, if we get a new file name
1225      * again before getting data we will simply skip transferring anything
1226      * for this file and go directly to the new file */
1227     data_available = gst_curl_base_sink_wait_for_data_unlocked (sink);
1228     if (data_available) {
1229       if (G_UNLIKELY (!klass->set_protocol_dynamic_options_unlocked (sink))) {
1230         sink->error = g_strdup ("unexpected state");
1231         sink->flow_ret = GST_FLOW_ERROR;
1232         goto done;
1233       }
1234     }
1235 
1236     /* stay unlocked while handling the actual transfer */
1237     GST_OBJECT_UNLOCK (sink);
1238 
1239     if (data_available) {
1240       GST_LOG ("have data");
1241       if (!gst_curl_base_sink_is_live (sink)) {
1242         /* prepare transfer if needed */
1243         if (klass->prepare_transfer) {
1244           GST_OBJECT_LOCK (sink);
1245           if (!klass->prepare_transfer (sink)) {
1246             sink->flow_ret = GST_FLOW_ERROR;
1247             goto done;
1248           }
1249           GST_OBJECT_UNLOCK (sink);
1250         }
1251         GST_LOG ("adding handle");
1252         curl_multi_add_handle (sink->multi_handle, sink->curl);
1253       }
1254 
1255       /* Start driving the transfer. */
1256       klass->handle_transfer (sink);
1257 
1258       /* easy handle will be possibly re-used for next transfer, thus it needs
1259        * to be removed from the multi stack and re-added again */
1260       if (!gst_curl_base_sink_is_live (sink)) {
1261         GST_LOG ("removing handle");
1262         curl_multi_remove_handle (sink->multi_handle, sink->curl);
1263       }
1264     } else {
1265       GST_LOG ("have no data yet");
1266     }
1267 
1268     /* lock again before looping to check the thread closed flag */
1269     GST_OBJECT_LOCK (sink);
1270   }
1271 
1272   if (sink->is_live) {
1273     GST_LOG ("removing handle");
1274     curl_multi_remove_handle (sink->multi_handle, sink->curl);
1275   }
1276 
1277 done:
1278   gst_curl_base_sink_transfer_cleanup (sink);
1279 
1280   /* extract the error code so the lock does not have to be
1281    * taken when calling the functions below that take the lock
1282    * on their own */
1283   ret = sink->flow_ret;
1284   GST_OBJECT_UNLOCK (sink);
1285 
1286   /* if there is a flow error, always notify the render function so it
1287    * can return the flow error up along the pipeline. as an error has
1288    * occurred there is no response to receive, so notify the event function
1289    * so it doesn't block indefinitely waiting for a response. */
1290   if (ret != GST_FLOW_OK) {
1291     gst_curl_base_sink_data_sent_notify (sink);
1292     gst_curl_base_sink_got_response_notify (sink);
1293   }
1294 
1295   GST_DEBUG ("exit thread func - transfer thread close flag: %d",
1296       sink->transfer_thread_close);
1297 
1298   return NULL;
1299 }
1300 
1301 static gboolean
gst_curl_base_sink_transfer_setup_unlocked(GstCurlBaseSink * sink)1302 gst_curl_base_sink_transfer_setup_unlocked (GstCurlBaseSink * sink)
1303 {
1304   g_assert (sink);
1305 
1306   if (sink->curl == NULL) {
1307     /* curl_easy_init automatically calls curl_global_init(3) */
1308     if ((sink->curl = curl_easy_init ()) == NULL) {
1309       sink->error = g_strdup ("failed to init curl easy handle");
1310       return FALSE;
1311     }
1312   }
1313 
1314   if (!gst_curl_base_sink_transfer_set_options_unlocked (sink)) {
1315     if (!sink->error) {
1316       sink->error = g_strdup ("failed to setup curl easy handle");
1317     }
1318     return FALSE;
1319   }
1320 
1321   /* init a multi stack (non-blocking interface to libcurl) */
1322   if (sink->multi_handle == NULL) {
1323     if ((sink->multi_handle = curl_multi_init ()) == NULL) {
1324       sink->error = g_strdup ("failed to init curl multi handle");
1325       return FALSE;
1326     }
1327   }
1328 
1329   GST_LOG ("transfer setup done");
1330   return TRUE;
1331 }
1332 
1333 static void
gst_curl_base_sink_transfer_cleanup(GstCurlBaseSink * sink)1334 gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink)
1335 {
1336   if (sink->curl != NULL) {
1337     if (sink->multi_handle != NULL) {
1338       curl_multi_remove_handle (sink->multi_handle, sink->curl);
1339     }
1340     curl_easy_cleanup (sink->curl);
1341     sink->curl = NULL;
1342   }
1343 
1344   if (sink->multi_handle != NULL) {
1345     curl_multi_cleanup (sink->multi_handle);
1346     sink->multi_handle = NULL;
1347   }
1348 }
1349 
1350 static gboolean
gst_curl_base_sink_wait_for_data_unlocked(GstCurlBaseSink * sink)1351 gst_curl_base_sink_wait_for_data_unlocked (GstCurlBaseSink * sink)
1352 {
1353   gboolean data_available = FALSE;
1354 
1355   GST_LOG ("waiting for data");
1356   while (!sink->transfer_cond->data_available &&
1357       !sink->transfer_thread_close && !sink->new_file) {
1358     g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1359   }
1360 
1361   if (sink->transfer_thread_close) {
1362     GST_LOG ("wait for data aborted due to thread close");
1363   } else if (sink->new_file) {
1364     GST_LOG ("wait for data aborted due to new file name");
1365   } else {
1366     GST_LOG ("wait for data completed");
1367     data_available = TRUE;
1368   }
1369 
1370   return data_available;
1371 }
1372 
1373 static void
gst_curl_base_sink_new_file_notify_unlocked(GstCurlBaseSink * sink)1374 gst_curl_base_sink_new_file_notify_unlocked (GstCurlBaseSink * sink)
1375 {
1376   GST_LOG ("new file name");
1377   sink->new_file = TRUE;
1378   g_cond_signal (&sink->transfer_cond->cond);
1379 }
1380 
1381 static void
gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked(GstCurlBaseSink * sink)1382     gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
1383     (GstCurlBaseSink * sink)
1384 {
1385   GST_LOG ("waiting for buffer send to complete");
1386 
1387   /* this function should not check if the transfer thread is set to be closed
1388    * since that flag only can be set by the EOS event (by the pipeline thread).
1389    * This can therefore never happen while this function is running since this
1390    * function also is called by the pipeline thread (in the render function) */
1391   while (!sink->transfer_cond->data_sent) {
1392     g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1393   }
1394   GST_LOG ("buffer send completed");
1395 }
1396 
1397 static void
gst_curl_base_sink_data_sent_notify(GstCurlBaseSink * sink)1398 gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink)
1399 {
1400   GST_LOG ("transfer completed");
1401   GST_OBJECT_LOCK (sink);
1402   sink->transfer_cond->data_available = FALSE;
1403   sink->transfer_cond->data_sent = TRUE;
1404   g_cond_signal (&sink->transfer_cond->cond);
1405   GST_OBJECT_UNLOCK (sink);
1406 }
1407 
1408 static void
gst_curl_base_sink_wait_for_response(GstCurlBaseSink * sink)1409 gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink)
1410 {
1411   GST_LOG ("waiting for remote to send response code");
1412 
1413   GST_OBJECT_LOCK (sink);
1414   while (sink->transfer_cond->wait_for_response) {
1415     g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1416   }
1417   GST_OBJECT_UNLOCK (sink);
1418 
1419   GST_LOG ("response code received");
1420 }
1421 
1422 static void
gst_curl_base_sink_got_response_notify(GstCurlBaseSink * sink)1423 gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink)
1424 {
1425   GST_LOG ("got response code");
1426 
1427   GST_OBJECT_LOCK (sink);
1428   sink->transfer_cond->wait_for_response = FALSE;
1429   g_cond_signal (&sink->transfer_cond->cond);
1430   GST_OBJECT_UNLOCK (sink);
1431 }
1432 
1433 static gint
gst_curl_base_sink_setup_dscp_unlocked(GstCurlBaseSink * sink)1434 gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink)
1435 {
1436   gint tos;
1437   gint af;
1438   gint ret = -1;
1439   union
1440   {
1441     struct sockaddr sa;
1442     struct sockaddr_in6 sa_in6;
1443     struct sockaddr_storage sa_stor;
1444   } sa;
1445   socklen_t slen = sizeof (sa);
1446 
1447   if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) {
1448     GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
1449     return ret;
1450   }
1451   af = sa.sa.sa_family;
1452 
1453   /* if this is an IPv4-mapped address then do IPv4 QoS */
1454   if (af == AF_INET6) {
1455     GST_DEBUG_OBJECT (sink, "check IP6 socket");
1456     if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
1457       GST_DEBUG_OBJECT (sink, "mapped to IPV4");
1458       af = AF_INET;
1459     }
1460   }
1461   /* extract and shift 6 bits of the DSCP */
1462   tos = (sink->qos_dscp & 0x3f) << 2;
1463 
1464   switch (af) {
1465     case AF_INET:
1466       ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, (void *) &tos,
1467           sizeof (tos));
1468       break;
1469     case AF_INET6:
1470 #ifdef IPV6_TCLASS
1471       ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, (void *) &tos,
1472           sizeof (tos));
1473       break;
1474 #endif
1475     default:
1476       GST_ERROR_OBJECT (sink, "unsupported AF");
1477       break;
1478   }
1479   if (ret) {
1480     GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
1481   }
1482 
1483   return ret;
1484 }
1485