1 /* GStreamer
2 * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 /**
21 * SECTION:element-curlsink
22 * @title: curlsink
23 * @short_description: sink that uploads data to a server using libcurl
24 *
25 * This is a network sink that uses libcurl as a client to upload data to
26 * a server (e.g. a HTTP/FTP server).
27 *
28 * ## Example launch line (upload a JPEG file to an HTTP server)
29 * |[
30 * gst-launch-1.0 filesrc location=image.jpg ! jpegparse ! curlsink \
31 * file-name=image.jpg \
32 * location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/ \
33 * user=test passwd=test \
34 * content-type=image/jpeg \
35 * use-content-length=false
36 * ]|
37 *
38 */
39
40 #ifdef HAVE_CONFIG_H
41 #include "config.h"
42 #endif
43
44 #include <curl/curl.h>
45 #include <string.h>
46 #include <stdio.h>
47
48 #if HAVE_SYS_SOCKET_H
49 #include <sys/socket.h>
50 #endif
51 #include <sys/types.h>
52 #if HAVE_NETINET_IN_H
53 #include <netinet/in.h>
54 #endif
55 #include <unistd.h>
56 #if HAVE_NETINET_IP_H
57 #include <netinet/ip.h>
58 #endif
59 #if HAVE_NETINET_TCP_H
60 #include <netinet/tcp.h>
61 #endif
62 #include <sys/stat.h>
63 #include <fcntl.h>
64
65 #include "gstcurlbasesink.h"
66
67 /* Default values */
68 #define GST_CAT_DEFAULT gst_curl_base_sink_debug
69 #define DEFAULT_URL "localhost:5555"
70 #define DEFAULT_TIMEOUT 30
71 #define DEFAULT_QOS_DSCP 0
72
73 #define DSCP_MIN 0
74 #define DSCP_MAX 63
75
76
77 /* Plugin specific settings */
78 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
79 GST_PAD_SINK,
80 GST_PAD_ALWAYS,
81 GST_STATIC_CAPS_ANY);
82
83 GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug);
84
85 enum
86 {
87 PROP_0,
88 PROP_LOCATION,
89 PROP_USER_NAME,
90 PROP_USER_PASSWD,
91 PROP_FILE_NAME,
92 PROP_TIMEOUT,
93 PROP_QOS_DSCP
94 };
95
96 /* Object class function declarations */
97 static void gst_curl_base_sink_finalize (GObject * gobject);
98 static void gst_curl_base_sink_set_property (GObject * object, guint prop_id,
99 const GValue * value, GParamSpec * pspec);
100 static void gst_curl_base_sink_get_property (GObject * object, guint prop_id,
101 GValue * value, GParamSpec * pspec);
102
103 /* BaseSink class function declarations */
104 static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink,
105 GstBuffer * buf);
106 static gboolean gst_curl_base_sink_event (GstBaseSink * bsink,
107 GstEvent * event);
108 static gboolean gst_curl_base_sink_start (GstBaseSink * bsink);
109 static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink);
110 static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink);
111 static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink);
112
113 /* private functions */
114
115 static gboolean gst_curl_base_sink_transfer_setup_unlocked
116 (GstCurlBaseSink * sink);
117 static gboolean gst_curl_base_sink_transfer_start_unlocked
118 (GstCurlBaseSink * sink);
119 static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink);
120 static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size,
121 size_t nmemb, void *stream);
122 static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size,
123 size_t nmemb, void *stream);
124 static int gst_curl_base_sink_transfer_seek_cb (void *user_p, curl_off_t offset,
125 int origin);
126 static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
127 void *curl_ptr, size_t block_size, guint * last_chunk);
128 #ifndef GST_DISABLE_GST_DEBUG
129 static int gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type,
130 char *data, size_t size, void *clientp);
131 #endif
132 static int gst_curl_base_sink_transfer_socket_cb (void *clientp,
133 curl_socket_t curlfd, curlsocktype purpose);
134 static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data);
135 static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink);
136 static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink);
137
138 static gboolean gst_curl_base_sink_wait_for_data_unlocked
139 (GstCurlBaseSink * sink);
140 static void gst_curl_base_sink_new_file_notify_unlocked
141 (GstCurlBaseSink * sink);
142 static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
143 (GstCurlBaseSink * sink);
144 static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink);
145 static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink);
146 static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink);
147
148 static void handle_transfer (GstCurlBaseSink * sink);
149 static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
150 size_t max_bytes_to_send, guint * last_chunk);
151
152 #define parent_class gst_curl_base_sink_parent_class
153 G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
154
155 static gboolean
gst_curl_base_sink_default_has_buffered_data_unlocked(GstCurlBaseSink * sink)156 gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
157 {
158 return sink->transfer_buf->len > 0;
159 }
160
161 static gboolean
gst_curl_base_sink_has_buffered_data_unlocked(GstCurlBaseSink * sink)162 gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
163 {
164 GstCurlBaseSinkClass *klass;
165 gboolean res = FALSE;
166
167 klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
168
169 if (klass->has_buffered_data_unlocked)
170 res = klass->has_buffered_data_unlocked (sink);
171
172 return res;
173 }
174
175 static void
gst_curl_base_sink_class_init(GstCurlBaseSinkClass * klass)176 gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
177 {
178 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
179 GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
180 GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
181
182 GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0,
183 "curl base sink element");
184
185 gst_element_class_set_static_metadata (element_class,
186 "Curl base sink",
187 "Sink/Network",
188 "Upload data over the network to a server using libcurl",
189 "Patricia Muscalu <patricia@axis.com>");
190
191 gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event);
192 gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render);
193 gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start);
194 gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop);
195 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock);
196 gstbasesink_class->unlock_stop =
197 GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop);
198 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize);
199
200 gobject_class->set_property = gst_curl_base_sink_set_property;
201 gobject_class->get_property = gst_curl_base_sink_get_property;
202
203 klass->handle_transfer = handle_transfer;
204 klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
205 klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
206 klass->has_buffered_data_unlocked =
207 gst_curl_base_sink_default_has_buffered_data_unlocked;
208
209 /* FIXME: check against souphttpsrc and use same names for same properties */
210 g_object_class_install_property (gobject_class, PROP_LOCATION,
211 g_param_spec_string ("location", "Location",
212 "URI location to write to", NULL,
213 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214 g_object_class_install_property (gobject_class, PROP_USER_NAME,
215 g_param_spec_string ("user", "User name",
216 "User name to use for server authentication", NULL,
217 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218 g_object_class_install_property (gobject_class, PROP_USER_PASSWD,
219 g_param_spec_string ("passwd", "User password",
220 "User password to use for server authentication", NULL,
221 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222 g_object_class_install_property (gobject_class, PROP_FILE_NAME,
223 g_param_spec_string ("file-name", "Base file name",
224 "The base file name for the uploaded images", NULL,
225 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
227 g_param_spec_int ("timeout", "Timeout",
228 "Number of seconds waiting to write before timeout",
229 0, G_MAXINT, DEFAULT_TIMEOUT,
230 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231 g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
232 g_param_spec_int ("qos-dscp",
233 "QoS diff srv code point",
234 "Quality of Service, differentiated services code point (0 default)",
235 DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
236 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238 gst_element_class_add_static_pad_template (element_class, &sinktemplate);
239
240 gst_type_mark_as_plugin_api (GST_TYPE_CURL_BASE_SINK, 0);
241 }
242
243 static void
gst_curl_base_sink_init(GstCurlBaseSink * sink)244 gst_curl_base_sink_init (GstCurlBaseSink * sink)
245 {
246 sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
247 sink->transfer_cond = g_malloc (sizeof (TransferCondition));
248 g_cond_init (&sink->transfer_cond->cond);
249 sink->transfer_cond->data_sent = FALSE;
250 sink->transfer_cond->data_available = FALSE;
251 sink->transfer_cond->wait_for_response = FALSE;
252 sink->timeout = DEFAULT_TIMEOUT;
253 sink->qos_dscp = DEFAULT_QOS_DSCP;
254 sink->url = g_strdup (DEFAULT_URL);
255 sink->transfer_thread_close = FALSE;
256 sink->new_file = TRUE;
257 sink->error = NULL;
258 sink->flow_ret = GST_FLOW_OK;
259 sink->is_live = FALSE;
260 }
261
262 static void
gst_curl_base_sink_finalize(GObject * gobject)263 gst_curl_base_sink_finalize (GObject * gobject)
264 {
265 GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject);
266
267 GST_DEBUG ("finalizing curlsink");
268 if (this->transfer_thread != NULL) {
269 g_thread_join (this->transfer_thread);
270 }
271
272 g_cond_clear (&this->transfer_cond->cond);
273 g_free (this->transfer_cond);
274 g_free (this->transfer_buf);
275
276 g_free (this->url);
277 g_free (this->user);
278 g_free (this->passwd);
279 g_free (this->file_name);
280 if (this->fdset != NULL) {
281 gst_poll_free (this->fdset);
282 this->fdset = NULL;
283 }
284 G_OBJECT_CLASS (parent_class)->finalize (gobject);
285 }
286
287 void
gst_curl_base_sink_transfer_thread_notify_unlocked(GstCurlBaseSink * sink)288 gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink)
289 {
290 GST_LOG ("more data to send");
291
292 sink->transfer_cond->data_available = TRUE;
293 sink->transfer_cond->data_sent = FALSE;
294 sink->transfer_cond->wait_for_response = TRUE;
295 g_cond_signal (&sink->transfer_cond->cond);
296 }
297
298 void
gst_curl_base_sink_transfer_thread_close(GstCurlBaseSink * sink)299 gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink)
300 {
301 GST_OBJECT_LOCK (sink);
302 GST_LOG_OBJECT (sink, "setting transfer thread close flag");
303 sink->transfer_thread_close = TRUE;
304 g_cond_signal (&sink->transfer_cond->cond);
305 GST_OBJECT_UNLOCK (sink);
306
307 if (sink->transfer_thread != NULL) {
308 GST_LOG_OBJECT (sink, "waiting for transfer thread to finish");
309 g_thread_join (sink->transfer_thread);
310 sink->transfer_thread = NULL;
311 }
312 }
313
314 void
gst_curl_base_sink_set_live(GstCurlBaseSink * sink,gboolean live)315 gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live)
316 {
317 g_return_if_fail (GST_IS_CURL_BASE_SINK (sink));
318
319 GST_OBJECT_LOCK (sink);
320 sink->is_live = live;
321 GST_OBJECT_UNLOCK (sink);
322 }
323
324 gboolean
gst_curl_base_sink_is_live(GstCurlBaseSink * sink)325 gst_curl_base_sink_is_live (GstCurlBaseSink * sink)
326 {
327 gboolean result;
328
329 g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE);
330
331 GST_OBJECT_LOCK (sink);
332 result = sink->is_live;
333 GST_OBJECT_UNLOCK (sink);
334
335 return result;
336 }
337
338 static GstFlowReturn
gst_curl_base_sink_render(GstBaseSink * bsink,GstBuffer * buf)339 gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf)
340 {
341 GstCurlBaseSink *sink;
342 GstMapInfo map;
343 guint8 *data;
344 size_t size;
345 GstFlowReturn ret;
346 gchar *error;
347
348 GST_LOG ("enter render");
349
350 sink = GST_CURL_BASE_SINK (bsink);
351
352 gst_buffer_map (buf, &map, GST_MAP_READ);
353 data = map.data;
354 size = map.size;
355
356 if (size == 0) {
357 gst_buffer_unmap (buf, &map);
358 return GST_FLOW_OK;
359 }
360
361 GST_OBJECT_LOCK (sink);
362
363 /* check if the transfer thread has encountered problems while the
364 * pipeline thread was working elsewhere */
365 if (sink->flow_ret != GST_FLOW_OK) {
366 goto done;
367 }
368
369 g_assert (sink->transfer_cond->data_available == FALSE);
370
371 /* if there is no transfer thread created, lets create one */
372 if (sink->transfer_thread == NULL) {
373 if (!gst_curl_base_sink_transfer_start_unlocked (sink)) {
374 sink->flow_ret = GST_FLOW_ERROR;
375 goto done;
376 }
377 }
378
379 /* make data available for the transfer thread and notify */
380 sink->transfer_buf->ptr = data;
381 sink->transfer_buf->len = size;
382 sink->transfer_buf->offset = 0;
383 gst_curl_base_sink_transfer_thread_notify_unlocked (sink);
384
385 /* wait for the transfer thread to send the data. This will be notified
386 * either when transfer is completed by the curl read callback or by
387 * the thread function if an error has occurred. */
388 gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink);
389
390 done:
391 gst_buffer_unmap (buf, &map);
392
393 /* Hand over error from transfer thread to streaming thread */
394 error = sink->error;
395 sink->error = NULL;
396 ret = sink->flow_ret;
397 GST_OBJECT_UNLOCK (sink);
398
399 if (error != NULL) {
400 GST_ERROR_OBJECT (sink, "%s", error);
401 GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", error), (NULL));
402 g_free (error);
403 }
404
405 GST_LOG ("exit render");
406
407 return ret;
408 }
409
410 static gboolean
gst_curl_base_sink_event(GstBaseSink * bsink,GstEvent * event)411 gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event)
412 {
413 GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
414 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
415
416 switch (event->type) {
417 case GST_EVENT_EOS:
418 GST_DEBUG_OBJECT (sink, "received EOS");
419 gst_curl_base_sink_transfer_thread_close (sink);
420 gst_curl_base_sink_wait_for_response (sink);
421 break;
422 case GST_EVENT_CAPS:
423 if (klass->set_mime_type) {
424 GstCaps *caps;
425 gst_event_parse_caps (event, &caps);
426 klass->set_mime_type (sink, caps);
427 }
428 break;
429 default:
430 break;
431 }
432
433 return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
434 }
435
436 static gboolean
gst_curl_base_sink_start(GstBaseSink * bsink)437 gst_curl_base_sink_start (GstBaseSink * bsink)
438 {
439 GstCurlBaseSink *sink;
440
441 sink = GST_CURL_BASE_SINK (bsink);
442
443 /* reset flags */
444 sink->transfer_cond->data_sent = FALSE;
445 sink->transfer_cond->data_available = FALSE;
446 sink->transfer_cond->wait_for_response = FALSE;
447 sink->transfer_thread_close = FALSE;
448 sink->new_file = TRUE;
449 sink->flow_ret = GST_FLOW_OK;
450
451 if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
452 GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
453 ("gst_poll_new failed: %s", g_strerror (errno)), (NULL));
454 return FALSE;
455 }
456
457 gst_poll_fd_init (&sink->fd);
458
459 return TRUE;
460 }
461
462 static gboolean
gst_curl_base_sink_stop(GstBaseSink * bsink)463 gst_curl_base_sink_stop (GstBaseSink * bsink)
464 {
465 GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
466
467 gst_curl_base_sink_transfer_thread_close (sink);
468 if (sink->fdset != NULL) {
469 gst_poll_free (sink->fdset);
470 sink->fdset = NULL;
471 }
472
473 return TRUE;
474 }
475
476 static gboolean
gst_curl_base_sink_unlock(GstBaseSink * bsink)477 gst_curl_base_sink_unlock (GstBaseSink * bsink)
478 {
479 GstCurlBaseSink *sink;
480
481 sink = GST_CURL_BASE_SINK (bsink);
482
483 GST_LOG_OBJECT (sink, "Flushing");
484 gst_poll_set_flushing (sink->fdset, TRUE);
485
486 return TRUE;
487 }
488
489 static gboolean
gst_curl_base_sink_unlock_stop(GstBaseSink * bsink)490 gst_curl_base_sink_unlock_stop (GstBaseSink * bsink)
491 {
492 GstCurlBaseSink *sink;
493
494 sink = GST_CURL_BASE_SINK (bsink);
495
496 GST_LOG_OBJECT (sink, "No longer flushing");
497 gst_poll_set_flushing (sink->fdset, FALSE);
498
499 return TRUE;
500 }
501
502 static void
gst_curl_base_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)503 gst_curl_base_sink_set_property (GObject * object, guint prop_id,
504 const GValue * value, GParamSpec * pspec)
505 {
506 GstCurlBaseSink *sink;
507 GstState cur_state;
508
509 g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
510 sink = GST_CURL_BASE_SINK (object);
511
512 gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
513 if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
514 GST_OBJECT_LOCK (sink);
515
516 switch (prop_id) {
517 case PROP_LOCATION:
518 g_free (sink->url);
519 sink->url = g_value_dup_string (value);
520 GST_DEBUG_OBJECT (sink, "url set to %s", sink->url);
521 break;
522 case PROP_USER_NAME:
523 g_free (sink->user);
524 sink->user = g_value_dup_string (value);
525 GST_DEBUG_OBJECT (sink, "user set to %s", sink->user);
526 break;
527 case PROP_USER_PASSWD:
528 g_free (sink->passwd);
529 sink->passwd = g_value_dup_string (value);
530 GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
531 break;
532 case PROP_FILE_NAME:
533 g_free (sink->file_name);
534 sink->file_name = g_value_dup_string (value);
535 GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
536 break;
537 case PROP_TIMEOUT:
538 sink->timeout = g_value_get_int (value);
539 GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
540 break;
541 case PROP_QOS_DSCP:
542 sink->qos_dscp = g_value_get_int (value);
543 gst_curl_base_sink_setup_dscp_unlocked (sink);
544 GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
545 break;
546 default:
547 GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
548 break;
549 }
550
551 GST_OBJECT_UNLOCK (sink);
552
553 return;
554 }
555
556 /* in PLAYING or PAUSED state */
557 GST_OBJECT_LOCK (sink);
558
559 switch (prop_id) {
560 case PROP_FILE_NAME:
561 g_free (sink->file_name);
562 sink->file_name = g_value_dup_string (value);
563 GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
564 gst_curl_base_sink_new_file_notify_unlocked (sink);
565 break;
566 case PROP_TIMEOUT:
567 sink->timeout = g_value_get_int (value);
568 GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
569 break;
570 case PROP_QOS_DSCP:
571 sink->qos_dscp = g_value_get_int (value);
572 gst_curl_base_sink_setup_dscp_unlocked (sink);
573 GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
574 break;
575 default:
576 GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
577 break;
578 }
579
580 GST_OBJECT_UNLOCK (sink);
581 }
582
583 static void
gst_curl_base_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)584 gst_curl_base_sink_get_property (GObject * object, guint prop_id,
585 GValue * value, GParamSpec * pspec)
586 {
587 GstCurlBaseSink *sink;
588
589 g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
590 sink = GST_CURL_BASE_SINK (object);
591
592 switch (prop_id) {
593 case PROP_LOCATION:
594 g_value_set_string (value, sink->url);
595 break;
596 case PROP_USER_NAME:
597 g_value_set_string (value, sink->user);
598 break;
599 case PROP_USER_PASSWD:
600 g_value_set_string (value, sink->passwd);
601 break;
602 case PROP_FILE_NAME:
603 g_value_set_string (value, sink->file_name);
604 break;
605 case PROP_TIMEOUT:
606 g_value_set_int (value, sink->timeout);
607 break;
608 case PROP_QOS_DSCP:
609 g_value_set_int (value, sink->qos_dscp);
610 break;
611 default:
612 GST_DEBUG_OBJECT (sink, "invalid property id");
613 break;
614 }
615 }
616
617 static gboolean
gst_curl_base_sink_transfer_set_common_options_unlocked(GstCurlBaseSink * sink)618 gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink)
619 {
620 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
621 CURLcode res;
622
623 #ifndef GST_DISABLE_GST_DEBUG
624 res = curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
625 if (res != CURLE_OK) {
626 sink->error = g_strdup_printf ("failed to set verbose: %s",
627 curl_easy_strerror (res));
628 return FALSE;
629 }
630 res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGDATA, sink);
631 if (res != CURLE_OK) {
632 sink->error = g_strdup_printf ("failed to set debug user_data: %s",
633 curl_easy_strerror (res));
634 return FALSE;
635 }
636 res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGFUNCTION,
637 gst_curl_base_sink_debug_cb);
638 if (res != CURLE_OK) {
639 sink->error = g_strdup_printf ("failed to set debug functions: %s",
640 curl_easy_strerror (res));
641 return FALSE;
642 }
643 #endif
644
645 res = curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
646 if (res != CURLE_OK) {
647 sink->error = g_strdup_printf ("failed to set URL: %s",
648 curl_easy_strerror (res));
649 return FALSE;
650 }
651
652 res = curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
653 if (res != CURLE_OK) {
654 sink->error = g_strdup_printf ("failed to set connection timeout: %s",
655 curl_easy_strerror (res));
656 return FALSE;
657 }
658
659 /* using signals in a multi-threaded application is dangerous */
660 res = curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1);
661 if (res != CURLE_OK) {
662 sink->error = g_strdup_printf ("failed to set no signalling: %s",
663 curl_easy_strerror (res));
664 return FALSE;
665 }
666
667 /* socket settings */
668 res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
669 if (res != CURLE_OK) {
670 sink->error = g_strdup_printf ("failed to set sockopt user data: %s",
671 curl_easy_strerror (res));
672 return FALSE;
673 }
674 res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
675 gst_curl_base_sink_transfer_socket_cb);
676 if (res != CURLE_OK) {
677 sink->error = g_strdup_printf ("failed to set sockopt function: %s",
678 curl_easy_strerror (res));
679 return FALSE;
680 }
681
682 res = curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
683 if (res != CURLE_OK) {
684 sink->error = g_strdup_printf ("failed to set read user data: %s",
685 curl_easy_strerror (res));
686 return FALSE;
687 }
688 res = curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
689 klass->transfer_read_cb);
690 if (res != CURLE_OK) {
691 sink->error = g_strdup_printf ("failed to set read function: %s",
692 curl_easy_strerror (res));
693 return FALSE;
694 }
695
696 res = curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink);
697 if (res != CURLE_OK) {
698 sink->error = g_strdup_printf ("failed to set write user data: %s",
699 curl_easy_strerror (res));
700 return FALSE;
701 }
702 res = curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
703 gst_curl_base_sink_transfer_write_cb);
704 if (res != CURLE_OK) {
705 sink->error = g_strdup_printf ("failed to set write function: %s",
706 curl_easy_strerror (res));
707 return FALSE;
708 }
709
710 res = curl_easy_setopt (sink->curl, CURLOPT_SEEKDATA, sink);
711 if (res != CURLE_OK) {
712 sink->error = g_strdup_printf ("failed to set seek user data: %s",
713 curl_easy_strerror (res));
714 return FALSE;
715 }
716 res = curl_easy_setopt (sink->curl, CURLOPT_SEEKFUNCTION,
717 gst_curl_base_sink_transfer_seek_cb);
718 if (res != CURLE_OK) {
719 sink->error = g_strdup_printf ("failed to set seek function: %s",
720 curl_easy_strerror (res));
721 return FALSE;
722 }
723
724 /* Time out in case transfer speed in bytes per second stay below
725 * CURLOPT_LOW_SPEED_LIMIT during CURLOPT_LOW_SPEED_TIME */
726 res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_LIMIT, 1L);
727 if (res != CURLE_OK) {
728 sink->error = g_strdup_printf ("failed to set low speed limit: %s",
729 curl_easy_strerror (res));
730 return FALSE;
731 }
732 res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_TIME,
733 (long) sink->timeout);
734 if (res != CURLE_OK) {
735 sink->error = g_strdup_printf ("failed to set low speed time: %s",
736 curl_easy_strerror (res));
737 return FALSE;
738 }
739
740 GST_LOG ("common options set");
741 return TRUE;
742 }
743
744 static gboolean
gst_curl_base_sink_transfer_set_options_unlocked(GstCurlBaseSink * sink)745 gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink)
746 {
747 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
748 CURLcode res;
749
750 if (!gst_curl_base_sink_transfer_set_common_options_unlocked (sink)) {
751 return FALSE;
752 }
753
754 /* authentication settings */
755 if (sink->user != NULL && strlen (sink->user)) {
756 res = curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
757 if (res != CURLE_OK) {
758 sink->error = g_strdup_printf ("failed to set user name: %s",
759 curl_easy_strerror (res));
760 return FALSE;
761 }
762 res = curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
763 if (res != CURLE_OK) {
764 sink->error = g_strdup_printf ("failed to set password: %s",
765 curl_easy_strerror (res));
766 return FALSE;
767 }
768 }
769
770 if (klass->set_options_unlocked) {
771 return klass->set_options_unlocked (sink);
772 } else {
773 return FALSE;
774 }
775 }
776
777 static size_t
transfer_data_buffer(void * curl_ptr,TransferBuffer * buf,size_t max_bytes_to_send,guint * last_chunk)778 transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
779 size_t max_bytes_to_send, guint * last_chunk)
780 {
781 guint buf_len = buf->len;
782 size_t bytes_to_send = MIN (max_bytes_to_send, buf->len);
783
784 memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send);
785 buf->offset = buf->offset + bytes_to_send;
786 buf->len = buf->len - bytes_to_send;
787
788 /* the last data chunk */
789 if (bytes_to_send == buf_len) {
790 buf->offset = 0;
791 buf->len = 0;
792 *last_chunk = 1;
793 }
794
795 GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
796
797 return bytes_to_send;
798 }
799
800 static size_t
gst_curl_base_sink_transfer_data_buffer(GstCurlBaseSink * sink,void * curl_ptr,size_t block_size,guint * last_chunk)801 gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
802 void *curl_ptr, size_t block_size, guint * last_chunk)
803 {
804 TransferBuffer *buffer;
805
806 buffer = sink->transfer_buf;
807 GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
808 buffer->len, buffer->offset);
809
810 if (buffer->len <= 0) {
811 GST_WARNING ("got zero- or negative-length buffer");
812
813 return 0;
814 }
815
816 /* more data in buffer(s) */
817 return transfer_data_buffer (curl_ptr, buffer, block_size, last_chunk);
818 }
819
820 static size_t
gst_curl_base_sink_transfer_read_cb(void * curl_ptr,size_t size,size_t nmemb,void * stream)821 gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
822 void *stream)
823 {
824 GstCurlBaseSink *sink;
825 GstCurlBaseSinkClass *klass;
826 size_t max_bytes_to_send;
827 size_t bytes_to_send;
828 guint last_chunk = 0;
829
830 sink = (GstCurlBaseSink *) stream;
831 klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
832
833 max_bytes_to_send = size * nmemb;
834
835 /* wait for data to come available, if new file or thread close is set
836 * then zero will be returned to indicate end of current transfer */
837 GST_OBJECT_LOCK (sink);
838 if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
839
840 if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
841 sink->transfer_thread_close) {
842 GST_WARNING_OBJECT (sink,
843 "discarding render data due to thread close flag");
844
845 GST_OBJECT_UNLOCK (sink);
846 return CURL_READFUNC_ABORT;
847 }
848
849 if (klass->flush_data_unlocked) {
850 bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
851 max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
852
853 GST_OBJECT_UNLOCK (sink);
854
855 return bytes_to_send;
856 }
857
858 GST_OBJECT_UNLOCK (sink);
859 GST_LOG ("returning 0, no more data to send in this file");
860
861 return 0;
862 }
863
864 GST_OBJECT_UNLOCK (sink);
865
866 bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr,
867 max_bytes_to_send, &last_chunk);
868
869 /* the last data chunk */
870 if (last_chunk) {
871 gst_curl_base_sink_data_sent_notify (sink);
872 }
873
874 return bytes_to_send;
875 }
876
877 static size_t
gst_curl_base_sink_transfer_write_cb(void G_GNUC_UNUSED * ptr,size_t size,size_t nmemb,void G_GNUC_UNUSED * stream)878 gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
879 size_t nmemb, void G_GNUC_UNUSED * stream)
880 {
881 GstCurlBaseSink *sink;
882 GstCurlBaseSinkClass *klass;
883 size_t realsize = size * nmemb;
884
885 sink = (GstCurlBaseSink *) stream;
886 klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
887
888 if (klass->transfer_verify_response_code) {
889 if (!klass->transfer_verify_response_code (sink)) {
890 GST_DEBUG_OBJECT (sink, "response error");
891 GST_OBJECT_LOCK (sink);
892 sink->flow_ret = GST_FLOW_ERROR;
893 GST_OBJECT_UNLOCK (sink);
894 }
895 }
896
897 GST_DEBUG ("response %s", (gchar *) ptr);
898
899 return realsize;
900 }
901
902 static int
gst_curl_base_sink_transfer_seek_cb(void * stream,curl_off_t offset,int origin)903 gst_curl_base_sink_transfer_seek_cb (void *stream, curl_off_t offset,
904 int origin)
905 {
906 GstCurlBaseSink *sink;
907 curl_off_t buf_size;
908
909 /*
910 * Origin is SEEK_SET, SEEK_CUR or SEEK_END,
911 * libcurl currently only passes SEEK_SET.
912 */
913
914 sink = (GstCurlBaseSink *) stream;
915
916 GST_OBJECT_LOCK (sink);
917 buf_size = sink->transfer_buf->offset + sink->transfer_buf->len;
918
919 switch (origin) {
920 case SEEK_SET:
921 if ((0 <= offset) && (offset <= buf_size)) {
922 sink->transfer_buf->offset = offset;
923 sink->transfer_buf->len = buf_size - offset;
924 } else {
925 GST_OBJECT_UNLOCK (sink);
926 return CURL_SEEKFUNC_FAIL;
927 }
928 break;
929 case SEEK_CUR:
930 case SEEK_END:
931 default:
932 GST_OBJECT_UNLOCK (sink);
933 return CURL_SEEKFUNC_FAIL;
934 break;
935 }
936
937 GST_OBJECT_UNLOCK (sink);
938 return CURL_SEEKFUNC_OK;
939 }
940
941 CURLcode
gst_curl_base_sink_transfer_check(GstCurlBaseSink * sink)942 gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink)
943 {
944 CURLcode code = CURLE_OK;
945 CURL *easy;
946 CURLMsg *msg;
947 gint msgs_left;
948 gchar *eff_url = NULL;
949
950 do {
951 easy = NULL;
952 while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) {
953 if (msg->msg == CURLMSG_DONE) {
954 easy = msg->easy_handle;
955 code = msg->data.result;
956 break;
957 }
958 }
959 if (easy) {
960 curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url);
961 GST_DEBUG ("transfer done %s (%s-%d)", eff_url,
962 curl_easy_strerror (code), code);
963 }
964 } while (easy);
965
966 return code;
967 }
968
969 static void
handle_transfer(GstCurlBaseSink * sink)970 handle_transfer (GstCurlBaseSink * sink)
971 {
972 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
973 gint retval;
974 gint activated_fds;
975 gint running_handles;
976 gint timeout;
977 CURLMcode m_code;
978 CURLcode e_code;
979
980 GST_OBJECT_LOCK (sink);
981 timeout = sink->timeout;
982 GST_OBJECT_UNLOCK (sink);
983
984 GST_DEBUG_OBJECT (sink, "handling transfers");
985
986 /* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data
987 available to send or receive - call simply curl_multi_perform before
988 poll() on more actions */
989 do {
990 m_code = curl_multi_perform (sink->multi_handle, &running_handles);
991 } while (m_code == CURLM_CALL_MULTI_PERFORM);
992 GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
993
994 while (running_handles && (m_code == CURLM_OK)) {
995 if (klass->transfer_prepare_poll_wait) {
996 klass->transfer_prepare_poll_wait (sink);
997 }
998
999 activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
1000 if (G_UNLIKELY (activated_fds == -1)) {
1001 if (errno == EAGAIN || errno == EINTR) {
1002 GST_DEBUG_OBJECT (sink, "interrupted by signal");
1003 } else if (errno == EBUSY) {
1004 GST_DEBUG_OBJECT (sink, "poll stopped");
1005 retval = GST_FLOW_EOS;
1006
1007 GST_OBJECT_LOCK (sink);
1008 if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
1009 GST_WARNING_OBJECT (sink,
1010 "discarding render data due to thread close flag");
1011 GST_OBJECT_UNLOCK (sink);
1012
1013 goto fail;
1014 } else {
1015 sink->error = g_strdup_printf ("poll failed: %s", g_strerror (errno));
1016 retval = GST_FLOW_ERROR;
1017 goto fail;
1018 }
1019 } else if (G_UNLIKELY (activated_fds == 0)) {
1020 sink->error = g_strdup_printf ("poll timed out after %" GST_TIME_FORMAT,
1021 GST_TIME_ARGS (timeout * GST_SECOND));
1022 retval = GST_FLOW_ERROR;
1023 goto fail;
1024 }
1025
1026 /* readable/writable sockets */
1027 do {
1028 m_code = curl_multi_perform (sink->multi_handle, &running_handles);
1029 } while (m_code == CURLM_CALL_MULTI_PERFORM);
1030 GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
1031 }
1032
1033 if (m_code != CURLM_OK) {
1034 sink->error = g_strdup_printf ("failed to write data: %s",
1035 curl_multi_strerror (m_code));
1036 retval = GST_FLOW_ERROR;
1037 goto fail;
1038 }
1039
1040 /* problems still might have occurred on individual transfers even when
1041 * curl_multi_perform returns CURLM_OK */
1042 if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) {
1043 sink->error = g_strdup_printf ("failed to transfer data: %s",
1044 curl_easy_strerror (e_code));
1045 retval = GST_FLOW_ERROR;
1046 goto fail;
1047 }
1048
1049 gst_curl_base_sink_got_response_notify (sink);
1050
1051 GST_OBJECT_LOCK (sink);
1052 if (sink->socket_type == CURLSOCKTYPE_ACCEPT) {
1053 /* FIXME: remove this again once we can depend on libcurl > 7.44.0,
1054 * see https://github.com/bagder/curl/issues/405.
1055 */
1056 if (G_UNLIKELY (sink->fd.fd < 0)) {
1057 sink->error = g_strdup_printf ("unknown error");
1058 retval = GST_FLOW_ERROR;
1059 GST_OBJECT_UNLOCK (sink);
1060 goto fail;
1061 }
1062 if (!gst_poll_remove_fd (sink->fdset, &sink->fd)) {
1063 sink->error = g_strdup_printf ("failed to remove fd");
1064 retval = GST_FLOW_ERROR;
1065 GST_OBJECT_UNLOCK (sink);
1066 goto fail;
1067 }
1068 sink->fd.fd = -1;
1069 }
1070 GST_OBJECT_UNLOCK (sink);
1071
1072 return;
1073
1074 fail:
1075 GST_OBJECT_LOCK (sink);
1076 if (sink->flow_ret == GST_FLOW_OK) {
1077 sink->flow_ret = retval;
1078 }
1079 GST_OBJECT_UNLOCK (sink);
1080 return;
1081 }
1082
1083 #ifndef GST_DISABLE_GST_DEBUG
1084 static int
gst_curl_base_sink_debug_cb(CURL * handle,curl_infotype type,char * data,size_t size,void * clientp)1085 gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type, char *data,
1086 size_t size, void *clientp)
1087 {
1088 GstCurlBaseSink *sink = (GstCurlBaseSink *) clientp;
1089 gchar *msg = NULL;
1090
1091 switch (type) {
1092 case CURLINFO_TEXT:
1093 case CURLINFO_HEADER_IN:
1094 case CURLINFO_HEADER_OUT:
1095 msg = g_memdup2 (data, size);
1096 if (size > 0) {
1097 msg[size - 1] = '\0';
1098 g_strchomp (msg);
1099 }
1100 break;
1101 default:
1102 break;
1103 }
1104
1105 switch (type) {
1106 case CURLINFO_TEXT:
1107 GST_DEBUG_OBJECT (sink, "%s", msg);
1108 break;
1109 case CURLINFO_HEADER_IN:
1110 GST_DEBUG_OBJECT (sink, "incoming header: %s", msg);
1111 break;
1112 case CURLINFO_HEADER_OUT:
1113 GST_DEBUG_OBJECT (sink, "outgoing header: %s", msg);
1114 break;
1115 case CURLINFO_DATA_IN:
1116 GST_MEMDUMP_OBJECT (sink, "incoming data", (guint8 *) data, size);
1117 break;
1118 case CURLINFO_DATA_OUT:
1119 GST_MEMDUMP_OBJECT (sink, "outgoing data", (guint8 *) data, size);
1120 break;
1121 case CURLINFO_SSL_DATA_IN:
1122 GST_MEMDUMP_OBJECT (sink, "incoming ssl data", (guint8 *) data, size);
1123 break;
1124 case CURLINFO_SSL_DATA_OUT:
1125 GST_MEMDUMP_OBJECT (sink, "outgoing ssl data", (guint8 *) data, size);
1126 break;
1127 default:
1128 GST_DEBUG_OBJECT (sink, "unknown debug info type %d", type);
1129 GST_MEMDUMP_OBJECT (sink, "unknown data", (guint8 *) data, size);
1130 break;
1131 }
1132 g_free (msg);
1133 return 0;
1134 }
1135 #endif
1136
1137 /* This function gets called by libcurl after the socket() call but before
1138 * the connect() call. */
1139 static int
gst_curl_base_sink_transfer_socket_cb(void * clientp,curl_socket_t curlfd,curlsocktype socket_type)1140 gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
1141 curlsocktype socket_type)
1142 {
1143 GstCurlBaseSink *sink;
1144 gboolean ret = TRUE;
1145
1146 sink = (GstCurlBaseSink *) clientp;
1147
1148 g_assert (sink);
1149
1150 if (curlfd < 0) {
1151 /* signal an unrecoverable error to the library which will close the socket
1152 and return CURLE_COULDNT_CONNECT
1153 */
1154 GST_DEBUG_OBJECT (sink, "no curlfd");
1155 return 1;
1156 }
1157
1158 GST_OBJECT_LOCK (sink);
1159 sink->socket_type = socket_type;
1160
1161 if (sink->fd.fd != curlfd) {
1162 if (sink->fd.fd > 0 && sink->socket_type != CURLSOCKTYPE_ACCEPT) {
1163 ret &= gst_poll_remove_fd (sink->fdset, &sink->fd);
1164 }
1165 sink->fd.fd = curlfd;
1166 ret &= gst_poll_add_fd (sink->fdset, &sink->fd);
1167 ret &= gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE);
1168 ret &= gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
1169 }
1170 GST_DEBUG_OBJECT (sink, "fd: %d", sink->fd.fd);
1171 gst_curl_base_sink_setup_dscp_unlocked (sink);
1172 GST_OBJECT_UNLOCK (sink);
1173
1174 /* success */
1175 return ret ? 0 : 1;
1176 }
1177
1178 static gboolean
gst_curl_base_sink_transfer_start_unlocked(GstCurlBaseSink * sink)1179 gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink)
1180 {
1181 GError *error = NULL;
1182 gboolean ret = TRUE;
1183
1184 GST_LOG ("creating transfer thread");
1185 sink->transfer_thread_close = FALSE;
1186 sink->new_file = TRUE;
1187 sink->transfer_thread = g_thread_try_new ("curl-transfer", (GThreadFunc)
1188 gst_curl_base_sink_transfer_thread_func, sink, &error);
1189
1190 if (sink->transfer_thread == NULL || error != NULL) {
1191 ret = FALSE;
1192 if (error) {
1193 GST_ERROR_OBJECT (sink, "could not create thread %s", error->message);
1194 g_error_free (error);
1195 } else {
1196 GST_ERROR_OBJECT (sink, "could not create thread for unknown reason");
1197 }
1198 }
1199
1200 return ret;
1201 }
1202
1203 static gpointer
gst_curl_base_sink_transfer_thread_func(gpointer data)1204 gst_curl_base_sink_transfer_thread_func (gpointer data)
1205 {
1206 GstCurlBaseSink *sink = (GstCurlBaseSink *) data;
1207 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
1208 GstFlowReturn ret;
1209 gboolean data_available;
1210
1211 GST_LOG ("transfer thread started");
1212 GST_OBJECT_LOCK (sink);
1213 if (!gst_curl_base_sink_transfer_setup_unlocked (sink)) {
1214 /* no need to set sink->error, as it is set by the called function */
1215 sink->flow_ret = GST_FLOW_ERROR;
1216 goto done;
1217 }
1218
1219 while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) {
1220 /* we are working on a new file, clearing flag and setting a new file
1221 * name */
1222 sink->new_file = FALSE;
1223
1224 /* wait for data to arrive for this new file, if we get a new file name
1225 * again before getting data we will simply skip transferring anything
1226 * for this file and go directly to the new file */
1227 data_available = gst_curl_base_sink_wait_for_data_unlocked (sink);
1228 if (data_available) {
1229 if (G_UNLIKELY (!klass->set_protocol_dynamic_options_unlocked (sink))) {
1230 sink->error = g_strdup ("unexpected state");
1231 sink->flow_ret = GST_FLOW_ERROR;
1232 goto done;
1233 }
1234 }
1235
1236 /* stay unlocked while handling the actual transfer */
1237 GST_OBJECT_UNLOCK (sink);
1238
1239 if (data_available) {
1240 GST_LOG ("have data");
1241 if (!gst_curl_base_sink_is_live (sink)) {
1242 /* prepare transfer if needed */
1243 if (klass->prepare_transfer) {
1244 GST_OBJECT_LOCK (sink);
1245 if (!klass->prepare_transfer (sink)) {
1246 sink->flow_ret = GST_FLOW_ERROR;
1247 goto done;
1248 }
1249 GST_OBJECT_UNLOCK (sink);
1250 }
1251 GST_LOG ("adding handle");
1252 curl_multi_add_handle (sink->multi_handle, sink->curl);
1253 }
1254
1255 /* Start driving the transfer. */
1256 klass->handle_transfer (sink);
1257
1258 /* easy handle will be possibly re-used for next transfer, thus it needs
1259 * to be removed from the multi stack and re-added again */
1260 if (!gst_curl_base_sink_is_live (sink)) {
1261 GST_LOG ("removing handle");
1262 curl_multi_remove_handle (sink->multi_handle, sink->curl);
1263 }
1264 } else {
1265 GST_LOG ("have no data yet");
1266 }
1267
1268 /* lock again before looping to check the thread closed flag */
1269 GST_OBJECT_LOCK (sink);
1270 }
1271
1272 if (sink->is_live) {
1273 GST_LOG ("removing handle");
1274 curl_multi_remove_handle (sink->multi_handle, sink->curl);
1275 }
1276
1277 done:
1278 gst_curl_base_sink_transfer_cleanup (sink);
1279
1280 /* extract the error code so the lock does not have to be
1281 * taken when calling the functions below that take the lock
1282 * on their own */
1283 ret = sink->flow_ret;
1284 GST_OBJECT_UNLOCK (sink);
1285
1286 /* if there is a flow error, always notify the render function so it
1287 * can return the flow error up along the pipeline. as an error has
1288 * occurred there is no response to receive, so notify the event function
1289 * so it doesn't block indefinitely waiting for a response. */
1290 if (ret != GST_FLOW_OK) {
1291 gst_curl_base_sink_data_sent_notify (sink);
1292 gst_curl_base_sink_got_response_notify (sink);
1293 }
1294
1295 GST_DEBUG ("exit thread func - transfer thread close flag: %d",
1296 sink->transfer_thread_close);
1297
1298 return NULL;
1299 }
1300
1301 static gboolean
gst_curl_base_sink_transfer_setup_unlocked(GstCurlBaseSink * sink)1302 gst_curl_base_sink_transfer_setup_unlocked (GstCurlBaseSink * sink)
1303 {
1304 g_assert (sink);
1305
1306 if (sink->curl == NULL) {
1307 /* curl_easy_init automatically calls curl_global_init(3) */
1308 if ((sink->curl = curl_easy_init ()) == NULL) {
1309 sink->error = g_strdup ("failed to init curl easy handle");
1310 return FALSE;
1311 }
1312 }
1313
1314 if (!gst_curl_base_sink_transfer_set_options_unlocked (sink)) {
1315 if (!sink->error) {
1316 sink->error = g_strdup ("failed to setup curl easy handle");
1317 }
1318 return FALSE;
1319 }
1320
1321 /* init a multi stack (non-blocking interface to libcurl) */
1322 if (sink->multi_handle == NULL) {
1323 if ((sink->multi_handle = curl_multi_init ()) == NULL) {
1324 sink->error = g_strdup ("failed to init curl multi handle");
1325 return FALSE;
1326 }
1327 }
1328
1329 GST_LOG ("transfer setup done");
1330 return TRUE;
1331 }
1332
1333 static void
gst_curl_base_sink_transfer_cleanup(GstCurlBaseSink * sink)1334 gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink)
1335 {
1336 if (sink->curl != NULL) {
1337 if (sink->multi_handle != NULL) {
1338 curl_multi_remove_handle (sink->multi_handle, sink->curl);
1339 }
1340 curl_easy_cleanup (sink->curl);
1341 sink->curl = NULL;
1342 }
1343
1344 if (sink->multi_handle != NULL) {
1345 curl_multi_cleanup (sink->multi_handle);
1346 sink->multi_handle = NULL;
1347 }
1348 }
1349
1350 static gboolean
gst_curl_base_sink_wait_for_data_unlocked(GstCurlBaseSink * sink)1351 gst_curl_base_sink_wait_for_data_unlocked (GstCurlBaseSink * sink)
1352 {
1353 gboolean data_available = FALSE;
1354
1355 GST_LOG ("waiting for data");
1356 while (!sink->transfer_cond->data_available &&
1357 !sink->transfer_thread_close && !sink->new_file) {
1358 g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1359 }
1360
1361 if (sink->transfer_thread_close) {
1362 GST_LOG ("wait for data aborted due to thread close");
1363 } else if (sink->new_file) {
1364 GST_LOG ("wait for data aborted due to new file name");
1365 } else {
1366 GST_LOG ("wait for data completed");
1367 data_available = TRUE;
1368 }
1369
1370 return data_available;
1371 }
1372
1373 static void
gst_curl_base_sink_new_file_notify_unlocked(GstCurlBaseSink * sink)1374 gst_curl_base_sink_new_file_notify_unlocked (GstCurlBaseSink * sink)
1375 {
1376 GST_LOG ("new file name");
1377 sink->new_file = TRUE;
1378 g_cond_signal (&sink->transfer_cond->cond);
1379 }
1380
1381 static void
gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked(GstCurlBaseSink * sink)1382 gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
1383 (GstCurlBaseSink * sink)
1384 {
1385 GST_LOG ("waiting for buffer send to complete");
1386
1387 /* this function should not check if the transfer thread is set to be closed
1388 * since that flag only can be set by the EOS event (by the pipeline thread).
1389 * This can therefore never happen while this function is running since this
1390 * function also is called by the pipeline thread (in the render function) */
1391 while (!sink->transfer_cond->data_sent) {
1392 g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1393 }
1394 GST_LOG ("buffer send completed");
1395 }
1396
1397 static void
gst_curl_base_sink_data_sent_notify(GstCurlBaseSink * sink)1398 gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink)
1399 {
1400 GST_LOG ("transfer completed");
1401 GST_OBJECT_LOCK (sink);
1402 sink->transfer_cond->data_available = FALSE;
1403 sink->transfer_cond->data_sent = TRUE;
1404 g_cond_signal (&sink->transfer_cond->cond);
1405 GST_OBJECT_UNLOCK (sink);
1406 }
1407
1408 static void
gst_curl_base_sink_wait_for_response(GstCurlBaseSink * sink)1409 gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink)
1410 {
1411 GST_LOG ("waiting for remote to send response code");
1412
1413 GST_OBJECT_LOCK (sink);
1414 while (sink->transfer_cond->wait_for_response) {
1415 g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1416 }
1417 GST_OBJECT_UNLOCK (sink);
1418
1419 GST_LOG ("response code received");
1420 }
1421
1422 static void
gst_curl_base_sink_got_response_notify(GstCurlBaseSink * sink)1423 gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink)
1424 {
1425 GST_LOG ("got response code");
1426
1427 GST_OBJECT_LOCK (sink);
1428 sink->transfer_cond->wait_for_response = FALSE;
1429 g_cond_signal (&sink->transfer_cond->cond);
1430 GST_OBJECT_UNLOCK (sink);
1431 }
1432
1433 static gint
gst_curl_base_sink_setup_dscp_unlocked(GstCurlBaseSink * sink)1434 gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink)
1435 {
1436 gint tos;
1437 gint af;
1438 gint ret = -1;
1439 union
1440 {
1441 struct sockaddr sa;
1442 struct sockaddr_in6 sa_in6;
1443 struct sockaddr_storage sa_stor;
1444 } sa;
1445 socklen_t slen = sizeof (sa);
1446
1447 if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) {
1448 GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
1449 return ret;
1450 }
1451 af = sa.sa.sa_family;
1452
1453 /* if this is an IPv4-mapped address then do IPv4 QoS */
1454 if (af == AF_INET6) {
1455 GST_DEBUG_OBJECT (sink, "check IP6 socket");
1456 if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
1457 GST_DEBUG_OBJECT (sink, "mapped to IPV4");
1458 af = AF_INET;
1459 }
1460 }
1461 /* extract and shift 6 bits of the DSCP */
1462 tos = (sink->qos_dscp & 0x3f) << 2;
1463
1464 switch (af) {
1465 case AF_INET:
1466 ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, (void *) &tos,
1467 sizeof (tos));
1468 break;
1469 case AF_INET6:
1470 #ifdef IPV6_TCLASS
1471 ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, (void *) &tos,
1472 sizeof (tos));
1473 break;
1474 #endif
1475 default:
1476 GST_ERROR_OBJECT (sink, "unsupported AF");
1477 break;
1478 }
1479 if (ret) {
1480 GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
1481 }
1482
1483 return ret;
1484 }
1485