1 /* GStreamer
2 * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3 * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4 * Copyright (C) <2012> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 * Copyright (C) 2014 Tim-Philipp Müller <tim@centricular.com>
7 * Copyright (C) 2014 Centricular Ltd
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
18 *
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
23 */
24
25 /**
26 * SECTION:element-udpsrc
27 * @title: udpsrc
28 * @see_also: udpsink, multifdsink
29 *
30 * udpsrc is a network source that reads UDP packets from the network.
31 * It can be combined with RTP depayloaders to implement RTP streaming.
32 *
33 * The udpsrc element supports automatic port allocation by setting the
34 * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
35 * allocated port can be obtained by reading the port property.
36 *
37 * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
38 * property to the IP address of the multicast group.
39 *
40 * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:socket
41 * property, udpsrc will then not allocate a socket itself but use the provided
42 * one.
43 *
 * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packets
 * so that they can be autoplugged in GStreamer pipelines. This is very useful
 * for RTP implementations where the description of the UDP packet contents is
 * transferred out-of-band using SDP or other means.
48 *
49 * The #GstUDPSrc:buffer-size property is used to change the default kernel
50 * buffersizes used for receiving packets. The buffer size may be increased for
51 * high-volume connections, or may be decreased to limit the possible backlog of
52 * incoming data. The system places an absolute limit on these values, on Linux,
53 * for example, the default buffer size is typically 50K and can be increased to
54 * maximally 100K.
55 *
 * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
 * number of bytes from the start of the raw UDP packet and can be used to strip
 * off a proprietary header, for example.
59 *
60 * The udpsrc is always a live source. It does however not provide a #GstClock,
61 * this is left for downstream elements such as an RTP session manager or demuxer
62 * (such as an MPEG demuxer). As with all live sources, the captured buffers
63 * will have their timestamp set to the current running time of the pipeline.
64 *
65 * udpsrc implements a #GstURIHandler interface that handles udp://host:port
66 * type URIs.
67 *
68 * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
69 * will generate an element message named `GstUDPSrcTimeout`
70 * if no data was received in the given timeout.
71 *
72 * The message's structure contains one field:
73 *
74 * * #guint64 `timeout`: the timeout in microseconds that expired when waiting for data.
75 *
76 * The message is typically used to detect that no UDP arrives in the receiver
77 * because it is blocked by a firewall.
78 *
79 * A custom file descriptor can be configured with the
80 * #GstUDPSrc:socket property. The socket will be closed when setting
81 * the element to READY by default. This behaviour can be overridden
82 * with the #GstUDPSrc:close-socket property, in which case the
83 * application is responsible for closing the file descriptor.
84 *
85 * ## Examples
86 * |[
87 * gst-launch-1.0 -v udpsrc ! fakesink dump=1
88 * ]| A pipeline to read from the default port and dump the udp packets.
89 * To actually generate udp packets on the default port one can use the
90 * udpsink element. When running the following pipeline in another terminal, the
91 * above mentioned pipeline should dump data packets to the console.
92 * |[
93 * gst-launch-1.0 -v audiotestsrc ! udpsink
94 * ]|
95 * |[
96 * gst-launch-1.0 -v udpsrc port=0 ! fakesink
97 * ]| read udp packets from a free port.
98 *
99 */
100 #ifdef HAVE_CONFIG_H
101 #include "config.h"
102 #endif
103
104 /* Needed to get struct in6_pktinfo.
105 * Also all these have to be before glib.h is included as
106 * otherwise struct in6_pktinfo is not defined completely
107 * due to broken glibc headers */
108 #define _GNU_SOURCE
109 /* Needed for OSX/iOS to define the IPv6 variants */
110 #define __APPLE_USE_RFC_3542
111 #include <sys/types.h>
112 #ifdef HAVE_SYS_SOCKET_H
113 #include <sys/socket.h>
114 #endif
115
116 #include <string.h>
117 #include "gstudpelements.h"
118 #include "gstudpsrc.h"
119
120 #include <gst/net/gstnetaddressmeta.h>
121
122 #include <gio/gnetworking.h>
123
124 /* Required for other parts of in_pktinfo / in6_pktinfo but only
125 * on non-Windows and can be included after glib.h */
126 #ifndef G_PLATFORM_WIN32
127 #include <netinet/ip.h>
128 #endif
129
130 /* Control messages for getting the destination address */
131 #ifdef IP_PKTINFO
132 GType gst_ip_pktinfo_message_get_type (void);
133
134 #define GST_TYPE_IP_PKTINFO_MESSAGE (gst_ip_pktinfo_message_get_type ())
135 #define GST_IP_PKTINFO_MESSAGE(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessage))
136 #define GST_IP_PKTINFO_MESSAGE_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessageClass))
137 #define GST_IS_IP_PKTINFO_MESSAGE(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IP_PKTINFO_MESSAGE))
138 #define GST_IS_IP_PKTINFO_MESSAGE_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IP_PKTINFO_MESSAGE))
139 #define GST_IP_PKTINFO_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessageClass))
140
141 typedef struct _GstIPPktinfoMessage GstIPPktinfoMessage;
142 typedef struct _GstIPPktinfoMessageClass GstIPPktinfoMessageClass;
143
144 struct _GstIPPktinfoMessageClass
145 {
146 GSocketControlMessageClass parent_class;
147
148 };
149
150 struct _GstIPPktinfoMessage
151 {
152 GSocketControlMessage parent;
153
154 guint ifindex;
155 #ifndef G_PLATFORM_WIN32
156 #ifndef __NetBSD__
157 struct in_addr spec_dst;
158 #endif
159 #endif
160 struct in_addr addr;
161 };
162
163 G_DEFINE_TYPE (GstIPPktinfoMessage, gst_ip_pktinfo_message,
164 G_TYPE_SOCKET_CONTROL_MESSAGE);
165
166 static gsize
gst_ip_pktinfo_message_get_size(GSocketControlMessage * message)167 gst_ip_pktinfo_message_get_size (GSocketControlMessage * message)
168 {
169 return sizeof (struct in_pktinfo);
170 }
171
172 static int
gst_ip_pktinfo_message_get_level(GSocketControlMessage * message)173 gst_ip_pktinfo_message_get_level (GSocketControlMessage * message)
174 {
175 return IPPROTO_IP;
176 }
177
178 static int
gst_ip_pktinfo_message_get_msg_type(GSocketControlMessage * message)179 gst_ip_pktinfo_message_get_msg_type (GSocketControlMessage * message)
180 {
181 return IP_PKTINFO;
182 }
183
184 static GSocketControlMessage *
gst_ip_pktinfo_message_deserialize(gint level,gint type,gsize size,gpointer data)185 gst_ip_pktinfo_message_deserialize (gint level,
186 gint type, gsize size, gpointer data)
187 {
188 struct in_pktinfo *pktinfo;
189 GstIPPktinfoMessage *message;
190
191 if (level != IPPROTO_IP || type != IP_PKTINFO)
192 return NULL;
193
194 if (size < sizeof (struct in_pktinfo))
195 return NULL;
196
197 pktinfo = data;
198
199 message = g_object_new (GST_TYPE_IP_PKTINFO_MESSAGE, NULL);
200 message->ifindex = pktinfo->ipi_ifindex;
201 #ifndef G_PLATFORM_WIN32
202 #ifndef __NetBSD__
203 message->spec_dst = pktinfo->ipi_spec_dst;
204 #endif
205 #endif
206 message->addr = pktinfo->ipi_addr;
207
208 return G_SOCKET_CONTROL_MESSAGE (message);
209 }
210
211 static void
gst_ip_pktinfo_message_init(GstIPPktinfoMessage * message)212 gst_ip_pktinfo_message_init (GstIPPktinfoMessage * message)
213 {
214 }
215
216 static void
gst_ip_pktinfo_message_class_init(GstIPPktinfoMessageClass * class)217 gst_ip_pktinfo_message_class_init (GstIPPktinfoMessageClass * class)
218 {
219 GSocketControlMessageClass *scm_class;
220
221 scm_class = G_SOCKET_CONTROL_MESSAGE_CLASS (class);
222 scm_class->get_size = gst_ip_pktinfo_message_get_size;
223 scm_class->get_level = gst_ip_pktinfo_message_get_level;
224 scm_class->get_type = gst_ip_pktinfo_message_get_msg_type;
225 scm_class->deserialize = gst_ip_pktinfo_message_deserialize;
226 }
227 #endif
228
229 #ifdef IPV6_PKTINFO
230 GType gst_ipv6_pktinfo_message_get_type (void);
231
232 #define GST_TYPE_IPV6_PKTINFO_MESSAGE (gst_ipv6_pktinfo_message_get_type ())
233 #define GST_IPV6_PKTINFO_MESSAGE(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessage))
234 #define GST_IPV6_PKTINFO_MESSAGE_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessageClass))
235 #define GST_IS_IPV6_PKTINFO_MESSAGE(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE))
236 #define GST_IS_IPV6_PKTINFO_MESSAGE_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IPV6_PKTINFO_MESSAGE))
237 #define GST_IPV6_PKTINFO_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessageClass))
238
239 typedef struct _GstIPV6PktinfoMessage GstIPV6PktinfoMessage;
240 typedef struct _GstIPV6PktinfoMessageClass GstIPV6PktinfoMessageClass;
241
242 struct _GstIPV6PktinfoMessageClass
243 {
244 GSocketControlMessageClass parent_class;
245
246 };
247
248 struct _GstIPV6PktinfoMessage
249 {
250 GSocketControlMessage parent;
251
252 guint ifindex;
253 struct in6_addr addr;
254 };
255
256 G_DEFINE_TYPE (GstIPV6PktinfoMessage, gst_ipv6_pktinfo_message,
257 G_TYPE_SOCKET_CONTROL_MESSAGE);
258
259 static gsize
gst_ipv6_pktinfo_message_get_size(GSocketControlMessage * message)260 gst_ipv6_pktinfo_message_get_size (GSocketControlMessage * message)
261 {
262 return sizeof (struct in6_pktinfo);
263 }
264
265 static int
gst_ipv6_pktinfo_message_get_level(GSocketControlMessage * message)266 gst_ipv6_pktinfo_message_get_level (GSocketControlMessage * message)
267 {
268 return IPPROTO_IPV6;
269 }
270
271 static int
gst_ipv6_pktinfo_message_get_msg_type(GSocketControlMessage * message)272 gst_ipv6_pktinfo_message_get_msg_type (GSocketControlMessage * message)
273 {
274 return IPV6_PKTINFO;
275 }
276
277 static GSocketControlMessage *
gst_ipv6_pktinfo_message_deserialize(gint level,gint type,gsize size,gpointer data)278 gst_ipv6_pktinfo_message_deserialize (gint level,
279 gint type, gsize size, gpointer data)
280 {
281 struct in6_pktinfo *pktinfo;
282 GstIPV6PktinfoMessage *message;
283
284 if (level != IPPROTO_IPV6 || type != IPV6_PKTINFO)
285 return NULL;
286
287 if (size < sizeof (struct in6_pktinfo))
288 return NULL;
289
290 pktinfo = data;
291
292 message = g_object_new (GST_TYPE_IPV6_PKTINFO_MESSAGE, NULL);
293 message->ifindex = pktinfo->ipi6_ifindex;
294 message->addr = pktinfo->ipi6_addr;
295
296 return G_SOCKET_CONTROL_MESSAGE (message);
297 }
298
299 static void
gst_ipv6_pktinfo_message_init(GstIPV6PktinfoMessage * message)300 gst_ipv6_pktinfo_message_init (GstIPV6PktinfoMessage * message)
301 {
302 }
303
304 static void
gst_ipv6_pktinfo_message_class_init(GstIPV6PktinfoMessageClass * class)305 gst_ipv6_pktinfo_message_class_init (GstIPV6PktinfoMessageClass * class)
306 {
307 GSocketControlMessageClass *scm_class;
308
309 scm_class = G_SOCKET_CONTROL_MESSAGE_CLASS (class);
310 scm_class->get_size = gst_ipv6_pktinfo_message_get_size;
311 scm_class->get_level = gst_ipv6_pktinfo_message_get_level;
312 scm_class->get_type = gst_ipv6_pktinfo_message_get_msg_type;
313 scm_class->deserialize = gst_ipv6_pktinfo_message_deserialize;
314 }
315
316 #endif
317
318 #ifdef IP_RECVDSTADDR
319 GType gst_ip_recvdstaddr_message_get_type (void);
320
321 #define GST_TYPE_IP_RECVDSTADDR_MESSAGE (gst_ip_recvdstaddr_message_get_type ())
322 #define GST_IP_RECVDSTADDR_MESSAGE(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessage))
323 #define GST_IP_RECVDSTADDR_MESSAGE_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessageClass))
324 #define GST_IS_IP_RECVDSTADDR_MESSAGE(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE))
325 #define GST_IS_IP_RECVDSTADDR_MESSAGE_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IP_RECVDSTADDR_MESSAGE))
326 #define GST_IP_RECVDSTADDR_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessageClass))
327
328 typedef struct _GstIPRecvdstaddrMessage GstIPRecvdstaddrMessage;
329 typedef struct _GstIPRecvdstaddrMessageClass GstIPRecvdstaddrMessageClass;
330
331 struct _GstIPRecvdstaddrMessageClass
332 {
333 GSocketControlMessageClass parent_class;
334
335 };
336
337 struct _GstIPRecvdstaddrMessage
338 {
339 GSocketControlMessage parent;
340
341 guint ifindex;
342 struct in_addr addr;
343 };
344
345 G_DEFINE_TYPE (GstIPRecvdstaddrMessage, gst_ip_recvdstaddr_message,
346 G_TYPE_SOCKET_CONTROL_MESSAGE);
347
348 static gsize
gst_ip_recvdstaddr_message_get_size(GSocketControlMessage * message)349 gst_ip_recvdstaddr_message_get_size (GSocketControlMessage * message)
350 {
351 return sizeof (struct in_addr);
352 }
353
354 static int
gst_ip_recvdstaddr_message_get_level(GSocketControlMessage * message)355 gst_ip_recvdstaddr_message_get_level (GSocketControlMessage * message)
356 {
357 return IPPROTO_IP;
358 }
359
360 static int
gst_ip_recvdstaddr_message_get_msg_type(GSocketControlMessage * message)361 gst_ip_recvdstaddr_message_get_msg_type (GSocketControlMessage * message)
362 {
363 return IP_RECVDSTADDR;
364 }
365
366 static GSocketControlMessage *
gst_ip_recvdstaddr_message_deserialize(gint level,gint type,gsize size,gpointer data)367 gst_ip_recvdstaddr_message_deserialize (gint level,
368 gint type, gsize size, gpointer data)
369 {
370 struct in_addr *addr;
371 GstIPRecvdstaddrMessage *message;
372
373 if (level != IPPROTO_IP || type != IP_RECVDSTADDR)
374 return NULL;
375
376 if (size < sizeof (struct in_addr))
377 return NULL;
378
379 addr = data;
380
381 message = g_object_new (GST_TYPE_IP_RECVDSTADDR_MESSAGE, NULL);
382 message->addr = *addr;
383
384 return G_SOCKET_CONTROL_MESSAGE (message);
385 }
386
387 static void
gst_ip_recvdstaddr_message_init(GstIPRecvdstaddrMessage * message)388 gst_ip_recvdstaddr_message_init (GstIPRecvdstaddrMessage * message)
389 {
390 }
391
392 static void
gst_ip_recvdstaddr_message_class_init(GstIPRecvdstaddrMessageClass * class)393 gst_ip_recvdstaddr_message_class_init (GstIPRecvdstaddrMessageClass * class)
394 {
395 GSocketControlMessageClass *scm_class;
396
397 scm_class = G_SOCKET_CONTROL_MESSAGE_CLASS (class);
398 scm_class->get_size = gst_ip_recvdstaddr_message_get_size;
399 scm_class->get_level = gst_ip_recvdstaddr_message_get_level;
400 scm_class->get_type = gst_ip_recvdstaddr_message_get_msg_type;
401 scm_class->deserialize = gst_ip_recvdstaddr_message_deserialize;
402 }
403 #endif
404
405 #define GST_TYPE_SOCKET_TIMESTAMP_MODE gst_socket_timestamp_mode_get_type()
406 #define GST_SOCKET_TIMESTAMP_MODE (gst_socket_timestamp_mode_get_type ())
407 static GType
gst_socket_timestamp_mode_get_type(void)408 gst_socket_timestamp_mode_get_type (void)
409 {
410 static GType socket_timestamp_mode_type = 0;
411 static const GEnumValue socket_timestamp_mode_types[] = {
412 {GST_SOCKET_TIMESTAMP_MODE_DISABLED, "Disable additional timestamps",
413 "disabled"},
414 {GST_SOCKET_TIMESTAMP_MODE_REALTIME,
415 "Timestamp with realtime clock (nsec resolution, may not be monotonic)",
416 "realtime"},
417 {0, NULL, NULL}
418 };
419
420 if (!socket_timestamp_mode_type)
421 socket_timestamp_mode_type =
422 g_enum_register_static ("GstSocketTimestampMode",
423 socket_timestamp_mode_types);
424
425 return socket_timestamp_mode_type;
426 }
427
428 #ifdef SO_TIMESTAMPNS
429 GType gst_socket_timestamp_message_get_type (void);
430
431 #define GST_TYPE_SOCKET_TIMESTAMP_MESSAGE (gst_socket_timestamp_message_get_type ())
432 #define GST_SOCKET_TIMESTAMP_MESSAGE(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE, GstSocketTimestampMessage))
433 #define GST_SOCKET_TIMESTAMP_MESSAGE_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE, GstSocketTimestampMessageClass))
434 #define GST_IS_SOCKET_TIMESTAMP_MESSAGE(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE))
435 #define GST_IS_SOCKET_TIMESTAMP_MESSAGE_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE))
436 #define GST_SOCKET_TIMESTAMP_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_SOCKET_TIMESTAMP_MESSAGE, GstSocketTimestampMessageClass))
437
438 typedef struct _GstSocketTimestampMessage GstSocketTimestampMessage;
439 typedef struct _GstSocketTimestampMessageClass GstSocketTimestampMessageClass;
440
441 struct _GstSocketTimestampMessageClass
442 {
443 GSocketControlMessageClass parent_class;
444 };
445
446 struct _GstSocketTimestampMessage
447 {
448 GSocketControlMessage parent;
449 struct timespec socket_ts;
450 };
451
452 G_DEFINE_TYPE (GstSocketTimestampMessage, gst_socket_timestamp_message,
453 G_TYPE_SOCKET_CONTROL_MESSAGE);
454
455 static gsize
gst_socket_timestamp_message_get_size(GSocketControlMessage * message)456 gst_socket_timestamp_message_get_size (GSocketControlMessage * message)
457 {
458 return sizeof (struct timespec);
459 }
460
461 static int
gst_socket_timestamp_message_get_level(GSocketControlMessage * message)462 gst_socket_timestamp_message_get_level (GSocketControlMessage * message)
463 {
464 return SOL_SOCKET;
465 }
466
467 static int
gst_socket_timestamp_message_get_msg_type(GSocketControlMessage * message)468 gst_socket_timestamp_message_get_msg_type (GSocketControlMessage * message)
469 {
470 return SCM_TIMESTAMPNS;
471 }
472
473 static GSocketControlMessage *
gst_socket_timestamp_message_deserialize(gint level,gint type,gsize size,gpointer data)474 gst_socket_timestamp_message_deserialize (gint level,
475 gint type, gsize size, gpointer data)
476 {
477 GstSocketTimestampMessage *message;
478
479 if (level != SOL_SOCKET)
480 return NULL;
481
482 if (size < sizeof (struct timespec))
483 return NULL;
484
485 message = g_object_new (GST_TYPE_SOCKET_TIMESTAMP_MESSAGE, NULL);
486 memcpy (&message->socket_ts, data, sizeof (struct timespec));
487
488 return G_SOCKET_CONTROL_MESSAGE (message);
489 }
490
491 static void
gst_socket_timestamp_message_init(GstSocketTimestampMessage * message)492 gst_socket_timestamp_message_init (GstSocketTimestampMessage * message)
493 {
494 }
495
496 static void
gst_socket_timestamp_message_class_init(GstSocketTimestampMessageClass * class)497 gst_socket_timestamp_message_class_init (GstSocketTimestampMessageClass * class)
498 {
499 GSocketControlMessageClass *scm_class;
500
501 scm_class = G_SOCKET_CONTROL_MESSAGE_CLASS (class);
502 scm_class->get_size = gst_socket_timestamp_message_get_size;
503 scm_class->get_level = gst_socket_timestamp_message_get_level;
504 scm_class->get_type = gst_socket_timestamp_message_get_msg_type;
505 scm_class->deserialize = gst_socket_timestamp_message_deserialize;
506 }
507 #endif
508
509 static gboolean
gst_udpsrc_decide_allocation(GstBaseSrc * bsrc,GstQuery * query)510 gst_udpsrc_decide_allocation (GstBaseSrc * bsrc, GstQuery * query)
511 {
512 GstUDPSrc *udpsrc;
513 GstBufferPool *pool;
514 gboolean update;
515 GstStructure *config;
516 GstCaps *caps = NULL;
517
518 udpsrc = GST_UDPSRC (bsrc);
519
520 if (gst_query_get_n_allocation_pools (query) > 0) {
521 update = TRUE;
522 } else {
523 update = FALSE;
524 }
525
526 pool = gst_buffer_pool_new ();
527
528 config = gst_buffer_pool_get_config (pool);
529
530 gst_query_parse_allocation (query, &caps, NULL);
531
532 gst_buffer_pool_config_set_params (config, caps, udpsrc->mtu, 0, 0);
533
534 gst_buffer_pool_set_config (pool, config);
535
536 if (update)
537 gst_query_set_nth_allocation_pool (query, 0, pool, udpsrc->mtu, 0, 0);
538 else
539 gst_query_add_allocation_pool (query, pool, udpsrc->mtu, 0, 0);
540
541 gst_object_unref (pool);
542
543 return TRUE;
544 }
545
546 /* not 100% correct, but a good upper bound for memory allocation purposes */
547 #define MAX_IPV4_UDP_PACKET_SIZE (65536 - 8)
548
549 GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
550 #define GST_CAT_DEFAULT (udpsrc_debug)
551
552 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
553 GST_PAD_SRC,
554 GST_PAD_ALWAYS,
555 GST_STATIC_CAPS_ANY);
556
557 #define UDP_DEFAULT_PORT 5004
558 #define UDP_DEFAULT_MULTICAST_GROUP "0.0.0.0"
559 #define UDP_DEFAULT_MULTICAST_IFACE NULL
560 #define UDP_DEFAULT_URI "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
561 #define UDP_DEFAULT_CAPS NULL
562 #define UDP_DEFAULT_SOCKET NULL
563 #define UDP_DEFAULT_BUFFER_SIZE 0
564 #define UDP_DEFAULT_TIMEOUT 0
565 #define UDP_DEFAULT_SKIP_FIRST_BYTES 0
566 #define UDP_DEFAULT_CLOSE_SOCKET TRUE
567 #define UDP_DEFAULT_USED_SOCKET NULL
568 #define UDP_DEFAULT_AUTO_MULTICAST TRUE
569 #define UDP_DEFAULT_REUSE TRUE
570 #define UDP_DEFAULT_LOOP TRUE
571 #define UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS TRUE
572 #define UDP_DEFAULT_MTU (1492)
573
/* Property ids passed to g_object_class_install_property() in
 * gst_udpsrc_class_init(). The order fixes the numeric values. */
enum
{
  PROP_0 = 0,                   /* placeholder, not installed as a property */

  PROP_PORT,                    /* "port" */
  PROP_MULTICAST_GROUP,         /* "multicast-group" */
  PROP_MULTICAST_IFACE,         /* "multicast-iface" */
  PROP_URI,                     /* "uri" */
  PROP_CAPS,                    /* "caps" */
  PROP_SOCKET,                  /* "socket" */
  PROP_BUFFER_SIZE,             /* "buffer-size" */
  PROP_TIMEOUT,                 /* "timeout" */
  PROP_SKIP_FIRST_BYTES,        /* "skip-first-bytes" */
  PROP_CLOSE_SOCKET,            /* "close-socket" */
  PROP_USED_SOCKET,             /* "used-socket" */
  PROP_AUTO_MULTICAST,          /* "auto-multicast" */
  PROP_REUSE,                   /* "reuse" */
  PROP_ADDRESS,                 /* "address" */
  PROP_LOOP,                    /* "loop" */
  PROP_RETRIEVE_SENDER_ADDRESS, /* "retrieve-sender-address" */
  PROP_MTU,                     /* "mtu" */
  PROP_SOCKET_TIMESTAMP,        /* "socket-timestamp" */
};
597
598 static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
599
600 static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
601 static gboolean gst_udpsrc_close (GstUDPSrc * src);
602 static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
603 static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
604 static GstFlowReturn gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf);
605
606 static void gst_udpsrc_finalize (GObject * object);
607
608 static void gst_udpsrc_set_property (GObject * object, guint prop_id,
609 const GValue * value, GParamSpec * pspec);
610 static void gst_udpsrc_get_property (GObject * object, guint prop_id,
611 GValue * value, GParamSpec * pspec);
612
613 static GstStateChangeReturn gst_udpsrc_change_state (GstElement * element,
614 GstStateChange transition);
615
616 #define gst_udpsrc_parent_class parent_class
617 G_DEFINE_TYPE_WITH_CODE (GstUDPSrc, gst_udpsrc, GST_TYPE_PUSH_SRC,
618 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_udpsrc_uri_handler_init));
619 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (udpsrc, "udpsrc", GST_RANK_NONE,
620 GST_TYPE_UDPSRC, udp_element_init (plugin));
621
622 static void
gst_udpsrc_class_init(GstUDPSrcClass * klass)623 gst_udpsrc_class_init (GstUDPSrcClass * klass)
624 {
625 GObjectClass *gobject_class;
626 GstElementClass *gstelement_class;
627 GstBaseSrcClass *gstbasesrc_class;
628 GstPushSrcClass *gstpushsrc_class;
629
630 gobject_class = (GObjectClass *) klass;
631 gstelement_class = (GstElementClass *) klass;
632 gstbasesrc_class = (GstBaseSrcClass *) klass;
633 gstpushsrc_class = (GstPushSrcClass *) klass;
634
635 GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
636
637 #ifdef IP_PKTINFO
638 GST_TYPE_IP_PKTINFO_MESSAGE;
639 #endif
640 #ifdef IPV6_PKTINFO
641 GST_TYPE_IPV6_PKTINFO_MESSAGE;
642 #endif
643 #ifdef IP_RECVDSTADDR
644 GST_TYPE_IP_RECVDSTADDR_MESSAGE;
645 #endif
646 #ifdef SO_TIMESTAMPNS
647 GST_TYPE_SOCKET_TIMESTAMP_MESSAGE;
648 #endif
649
650 gobject_class->set_property = gst_udpsrc_set_property;
651 gobject_class->get_property = gst_udpsrc_get_property;
652 gobject_class->finalize = gst_udpsrc_finalize;
653
654 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
655 g_param_spec_int ("port", "Port",
656 "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
657 UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
658 /* FIXME 2.0: Remove multicast-group property */
659 #ifndef GST_REMOVE_DEPRECATED
660 g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
661 g_param_spec_string ("multicast-group", "Multicast Group",
662 "The Address of multicast group to join. (DEPRECATED: "
663 "Use address property instead)", UDP_DEFAULT_MULTICAST_GROUP,
664 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
665 #endif
666 g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
667 g_param_spec_string ("multicast-iface", "Multicast Interface",
668 "The network interface on which to join the multicast group."
669 "This allows multiple interfaces separated by comma. (\"eth0,eth1\")",
670 UDP_DEFAULT_MULTICAST_IFACE,
671 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
672 g_object_class_install_property (gobject_class, PROP_URI,
673 g_param_spec_string ("uri", "URI",
674 "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
675 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
676 g_object_class_install_property (gobject_class, PROP_CAPS,
677 g_param_spec_boxed ("caps", "Caps",
678 "The caps of the source pad", GST_TYPE_CAPS,
679 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
680 g_object_class_install_property (gobject_class, PROP_SOCKET,
681 g_param_spec_object ("socket", "Socket",
682 "Socket to use for UDP reception. (NULL == allocate)",
683 G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
684 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
685 g_param_spec_int ("buffer-size", "Buffer Size",
686 "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
687 UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
688 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
689 g_param_spec_uint64 ("timeout", "Timeout",
690 "Post a message after timeout nanoseconds (0 = disabled)", 0,
691 G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
692 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
693 g_object_class_install_property (G_OBJECT_CLASS (klass),
694 PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
695 "Skip first bytes", "number of bytes to skip for each udp packet", 0,
696 G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
697 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
698 g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
699 g_param_spec_boolean ("close-socket", "Close socket",
700 "Close socket if passed as property on state change",
701 UDP_DEFAULT_CLOSE_SOCKET,
702 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
703 g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
704 g_param_spec_object ("used-socket", "Socket Handle",
705 "Socket currently in use for UDP reception. (NULL = no socket)",
706 G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
707 g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
708 g_param_spec_boolean ("auto-multicast", "Auto Multicast",
709 "Automatically join/leave multicast groups",
710 UDP_DEFAULT_AUTO_MULTICAST,
711 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
712 g_object_class_install_property (gobject_class, PROP_REUSE,
713 g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
714 UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
715 g_object_class_install_property (gobject_class, PROP_ADDRESS,
716 g_param_spec_string ("address", "Address",
717 "Address to receive packets for. This is equivalent to the "
718 "multicast-group property for now", UDP_DEFAULT_MULTICAST_GROUP,
719 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
720 /**
721 * GstUDPSrc:loop:
722 *
723 * Can be used to disable multicast loopback.
724 *
725 * Since: 1.8
726 */
727 g_object_class_install_property (gobject_class, PROP_LOOP,
728 g_param_spec_boolean ("loop", "Multicast Loopback",
729 "Used for setting the multicast loop parameter. TRUE = enable,"
730 " FALSE = disable", UDP_DEFAULT_LOOP,
731 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
732 /**
733 * GstUDPSrc:retrieve-sender-address:
734 *
735 * Whether to retrieve the sender address and add it to the buffers as
736 * meta. Disabling this might result in minor performance improvements
737 * in certain scenarios.
738 *
739 * Since: 1.10
740 */
741 g_object_class_install_property (gobject_class, PROP_RETRIEVE_SENDER_ADDRESS,
742 g_param_spec_boolean ("retrieve-sender-address",
743 "Retrieve Sender Address",
744 "Whether to retrieve the sender address and add it to buffers as "
745 "meta. Disabling this might result in minor performance improvements "
746 "in certain scenarios", UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS,
747 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
748 /**
749 * GstUDPSrc:mtu:
750 *
751 * Maximum expected packet size. This directly defines the allocation
752 * size of the receive buffer pool.
753 *
754 * In case more data is received, a new #GstMemory is appended to the
755 * output buffer, ensuring no data is lost, this however leads to that
756 * buffer being freed and reallocated.
757 *
758 * Since: 1.14
759 */
760 g_object_class_install_property (gobject_class, PROP_MTU,
761 g_param_spec_uint ("mtu", "Expected Maximum Transmission Unit",
762 "Maximum expected packet size. This directly defines the allocation"
763 "size of the receive buffer pool.",
764 0, G_MAXINT, UDP_DEFAULT_MTU,
765 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
766
767 /**
768 * GstUDPSrc:socket-timestamp:
769 *
770 * Can be used to read the timestamp on incoming buffers using socket
771 * control messages and set as the DTS.
772 *
773 * Since: 1.20
774 */
775 g_object_class_install_property (gobject_class, PROP_SOCKET_TIMESTAMP,
776 g_param_spec_enum ("socket-timestamp",
777 "Use Socket Control Message Timestamp for DTS",
778 "Used for adding alternative timestamp using SO_TIMESTAMP.",
779 GST_SOCKET_TIMESTAMP_MODE, GST_SOCKET_TIMESTAMP_MODE_REALTIME,
780 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
781
782 gst_element_class_add_static_pad_template (gstelement_class, &src_template);
783
784 gst_element_class_set_static_metadata (gstelement_class,
785 "UDP packet receiver", "Source/Network",
786 "Receive data over the network via UDP",
787 "Wim Taymans <wim@fluendo.com>, "
788 "Thijs Vermeir <thijs.vermeir@barco.com>");
789
790 gstelement_class->change_state = gst_udpsrc_change_state;
791
792 gstbasesrc_class->unlock = gst_udpsrc_unlock;
793 gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
794 gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
795 gstbasesrc_class->decide_allocation = gst_udpsrc_decide_allocation;
796
797 gstpushsrc_class->fill = gst_udpsrc_fill;
798
799 gst_type_mark_as_plugin_api (GST_TYPE_SOCKET_TIMESTAMP_MODE, 0);
800 }
801
802 static void
gst_udpsrc_init(GstUDPSrc * udpsrc)803 gst_udpsrc_init (GstUDPSrc * udpsrc)
804 {
805 udpsrc->uri =
806 g_strdup_printf ("udp://%s:%u", UDP_DEFAULT_MULTICAST_GROUP,
807 UDP_DEFAULT_PORT);
808
809 udpsrc->address = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
810 udpsrc->port = UDP_DEFAULT_PORT;
811 udpsrc->socket = UDP_DEFAULT_SOCKET;
812 udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
813 udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
814 udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
815 udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
816 udpsrc->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
817 udpsrc->external_socket = (udpsrc->socket != NULL);
818 udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
819 udpsrc->used_socket = UDP_DEFAULT_USED_SOCKET;
820 udpsrc->reuse = UDP_DEFAULT_REUSE;
821 udpsrc->loop = UDP_DEFAULT_LOOP;
822 udpsrc->retrieve_sender_address = UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS;
823 udpsrc->mtu = UDP_DEFAULT_MTU;
824
825 /* configure basesrc to be a live source */
826 gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
827 /* make basesrc output a segment in time */
828 gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
829 /* make basesrc set timestamps on outgoing buffers based on the running_time
830 * when they were captured */
831 gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
832 }
833
834 static void
gst_udpsrc_finalize(GObject * object)835 gst_udpsrc_finalize (GObject * object)
836 {
837 GstUDPSrc *udpsrc;
838
839 udpsrc = GST_UDPSRC (object);
840
841 if (udpsrc->caps)
842 gst_caps_unref (udpsrc->caps);
843 udpsrc->caps = NULL;
844
845 g_free (udpsrc->multi_iface);
846 udpsrc->multi_iface = NULL;
847
848 g_free (udpsrc->uri);
849 udpsrc->uri = NULL;
850
851 g_free (udpsrc->address);
852 udpsrc->address = NULL;
853
854 if (udpsrc->socket)
855 g_object_unref (udpsrc->socket);
856 udpsrc->socket = NULL;
857
858 if (udpsrc->used_socket)
859 g_object_unref (udpsrc->used_socket);
860 udpsrc->used_socket = NULL;
861
862 if (udpsrc->extra_mem)
863 gst_memory_unref (udpsrc->extra_mem);
864 udpsrc->extra_mem = NULL;
865
866 G_OBJECT_CLASS (parent_class)->finalize (object);
867 }
868
869 static GstCaps *
gst_udpsrc_getcaps(GstBaseSrc * src,GstCaps * filter)870 gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
871 {
872 GstUDPSrc *udpsrc;
873 GstCaps *caps, *result;
874
875 udpsrc = GST_UDPSRC (src);
876
877 GST_OBJECT_LOCK (src);
878 if ((caps = udpsrc->caps))
879 gst_caps_ref (caps);
880 GST_OBJECT_UNLOCK (src);
881
882 if (caps) {
883 if (filter) {
884 result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
885 gst_caps_unref (caps);
886 } else {
887 result = caps;
888 }
889 } else {
890 result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
891 }
892 return result;
893 }
894
895 static void
gst_udpsrc_create_cancellable(GstUDPSrc * src)896 gst_udpsrc_create_cancellable (GstUDPSrc * src)
897 {
898 GPollFD pollfd;
899
900 src->cancellable = g_cancellable_new ();
901 src->made_cancel_fd = g_cancellable_make_pollfd (src->cancellable, &pollfd);
902 }
903
904 static void
gst_udpsrc_free_cancellable(GstUDPSrc * src)905 gst_udpsrc_free_cancellable (GstUDPSrc * src)
906 {
907 if (src->made_cancel_fd) {
908 g_cancellable_release_fd (src->cancellable);
909 src->made_cancel_fd = FALSE;
910 }
911 g_object_unref (src->cancellable);
912 src->cancellable = NULL;
913 }
914
915 static GstFlowReturn
gst_udpsrc_fill(GstPushSrc * psrc,GstBuffer * outbuf)916 gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf)
917 {
918 GstUDPSrc *udpsrc;
919 GSocketAddress *saddr = NULL;
920 GSocketAddress **p_saddr;
921 gint flags = G_SOCKET_MSG_NONE;
922 gboolean try_again;
923 GError *err = NULL;
924 gssize res;
925 gsize offset;
926 GSocketControlMessage **msgs = NULL;
927 GSocketControlMessage ***p_msgs;
928 gint n_msgs = 0, i;
929 GstMapInfo info;
930 GstMapInfo extra_info;
931 GInputVector ivec[2];
932
933 udpsrc = GST_UDPSRC_CAST (psrc);
934
935 /* optimization: use messages only in multicast mode and
936 * if we can't let the kernel do the filtering for us */
937 p_msgs =
938 (g_inet_address_get_is_multicast (g_inet_socket_address_get_address
939 (udpsrc->addr))) ? &msgs : NULL;
940 #ifdef IP_MULTICAST_ALL
941 if (g_inet_address_get_family (g_inet_socket_address_get_address
942 (udpsrc->addr)) == G_SOCKET_FAMILY_IPV4)
943 p_msgs = NULL;
944 #endif
945 #ifdef SO_TIMESTAMPNS
946 if (udpsrc->socket_timestamp_mode == GST_SOCKET_TIMESTAMP_MODE_REALTIME)
947 p_msgs = &msgs;
948 #endif
949
950 /* Retrieve sender address unless we've been configured not to do so */
951 p_saddr = (udpsrc->retrieve_sender_address) ? &saddr : NULL;
952
953 if (!gst_buffer_map (outbuf, &info, GST_MAP_READWRITE))
954 goto buffer_map_error;
955
956 ivec[0].buffer = info.data;
957 ivec[0].size = info.size;
958
959 /* Prepare memory in case the data size exceeds mtu */
960 if (udpsrc->extra_mem == NULL) {
961 GstBufferPool *pool;
962 GstStructure *config;
963 GstAllocator *allocator = NULL;
964 GstAllocationParams params;
965
966 pool = gst_base_src_get_buffer_pool (GST_BASE_SRC_CAST (psrc));
967 config = gst_buffer_pool_get_config (pool);
968 gst_buffer_pool_config_get_allocator (config, &allocator, ¶ms);
969
970 udpsrc->extra_mem =
971 gst_allocator_alloc (allocator, MAX_IPV4_UDP_PACKET_SIZE, ¶ms);
972
973 gst_object_unref (pool);
974 gst_structure_free (config);
975 if (allocator)
976 gst_object_unref (allocator);
977 }
978
979 if (!gst_memory_map (udpsrc->extra_mem, &extra_info, GST_MAP_READWRITE))
980 goto memory_map_error;
981
982 ivec[1].buffer = extra_info.data;
983 ivec[1].size = extra_info.size;
984
985 retry:
986 if (saddr != NULL) {
987 g_object_unref (saddr);
988 saddr = NULL;
989 }
990
991 do {
992 gint64 timeout;
993
994 try_again = FALSE;
995
996 if (udpsrc->timeout)
997 timeout = udpsrc->timeout / 1000;
998 else
999 timeout = -1;
1000
1001 GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GINT64_FORMAT, timeout);
1002
1003 if (!g_socket_condition_timed_wait (udpsrc->used_socket, G_IO_IN | G_IO_PRI,
1004 timeout, udpsrc->cancellable, &err)) {
1005 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)
1006 || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1007 goto stopped;
1008 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
1009 g_clear_error (&err);
1010 /* timeout, post element message */
1011 gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
1012 gst_message_new_element (GST_OBJECT_CAST (udpsrc),
1013 gst_structure_new ("GstUDPSrcTimeout",
1014 "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
1015 } else {
1016 goto select_error;
1017 }
1018
1019 try_again = TRUE;
1020 }
1021 } while (G_UNLIKELY (try_again));
1022
1023 res =
1024 g_socket_receive_message (udpsrc->used_socket, p_saddr, ivec, 2,
1025 p_msgs, &n_msgs, &flags, udpsrc->cancellable, &err);
1026
1027 if (G_UNLIKELY (res < 0)) {
1028 /* G_IO_ERROR_HOST_UNREACHABLE for a UDP socket means that a packet sent
1029 * with udpsink generated a "port unreachable" ICMP response. We ignore
1030 * that and try again.
1031 * On Windows we get G_IO_ERROR_CONNECTION_CLOSED instead */
1032 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE) ||
1033 g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
1034 g_clear_error (&err);
1035 goto retry;
1036 }
1037 goto receive_error;
1038 }
1039
1040 /* Retry if multicast and the destination address is not ours. We don't want
1041 * to receive arbitrary packets */
1042 if (p_msgs) {
1043 GInetAddress *iaddr = g_inet_socket_address_get_address (udpsrc->addr);
1044 gboolean skip_packet = FALSE;
1045 gsize iaddr_size = g_inet_address_get_native_size (iaddr);
1046 const guint8 *iaddr_bytes = g_inet_address_to_bytes (iaddr);
1047
1048 for (i = 0; i < n_msgs && !skip_packet; i++) {
1049 #ifdef IP_PKTINFO
1050 if (GST_IS_IP_PKTINFO_MESSAGE (msgs[i])) {
1051 GstIPPktinfoMessage *msg = GST_IP_PKTINFO_MESSAGE (msgs[i]);
1052
1053 if (sizeof (msg->addr) == iaddr_size
1054 && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
1055 skip_packet = TRUE;
1056 }
1057 #endif
1058 #ifdef IPV6_PKTINFO
1059 if (GST_IS_IPV6_PKTINFO_MESSAGE (msgs[i])) {
1060 GstIPV6PktinfoMessage *msg = GST_IPV6_PKTINFO_MESSAGE (msgs[i]);
1061
1062 if (sizeof (msg->addr) == iaddr_size
1063 && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
1064 skip_packet = TRUE;
1065 }
1066 #endif
1067 #ifdef IP_RECVDSTADDR
1068 if (GST_IS_IP_RECVDSTADDR_MESSAGE (msgs[i])) {
1069 GstIPRecvdstaddrMessage *msg = GST_IP_RECVDSTADDR_MESSAGE (msgs[i]);
1070
1071 if (sizeof (msg->addr) == iaddr_size
1072 && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
1073 skip_packet = TRUE;
1074 }
1075 #endif
1076 #ifdef SO_TIMESTAMPNS
1077 if (GST_IS_SOCKET_TIMESTAMP_MESSAGE (msgs[i])) {
1078 GstSocketTimestampMessage *msg = GST_SOCKET_TIMESTAMP_MESSAGE (msgs[i]);
1079 GstClock *clock;
1080 GstClockTime socket_ts;
1081
1082 socket_ts = GST_TIMESPEC_TO_TIME (msg->socket_ts);
1083 GST_TRACE_OBJECT (udpsrc,
1084 "Got SCM_TIMESTAMPNS %" GST_TIME_FORMAT " in msg",
1085 GST_TIME_ARGS (socket_ts));
1086
1087 clock = gst_element_get_clock (GST_ELEMENT_CAST (udpsrc));
1088 if (clock != NULL) {
1089 gint64 adjust_dts, cur_sys_time, delta;
1090 GstClockTime base_time, cur_gst_clk_time, running_time;
1091
1092 /*
1093 * We use g_get_real_time as the time reference for SCM timestamps
1094 * is always CLOCK_REALTIME.
1095 */
1096 cur_sys_time = g_get_real_time () * GST_USECOND;
1097 cur_gst_clk_time = gst_clock_get_time (clock);
1098
1099 delta = (gint64) cur_sys_time - (gint64) socket_ts;
1100 if (delta < 0) {
1101 /*
1102 * The current system time will always be greater than the SCM
1103 * timestamp as the packet would have been timestamped at least
1104 * some clock cycles before. If it is not, then the system time
1105 * was adjusted. Since we cannot rely on the delta calculation in
1106 * such a case, set the DTS to current pipeline clock when this
1107 * happens.
1108 */
1109 GST_LOG_OBJECT (udpsrc,
1110 "Current system time is behind SCM timestamp, setting DTS to pipeline clock");
1111 GST_BUFFER_DTS (outbuf) = cur_gst_clk_time;
1112 } else {
1113 base_time = gst_element_get_base_time (GST_ELEMENT_CAST (udpsrc));
1114 running_time = cur_gst_clk_time - base_time;
1115 adjust_dts = (gint64) running_time - delta;
1116 /*
1117 * If the system time was adjusted much further ahead, we might
1118 * end up with delta > cur_gst_clk_time. Set the DTS to current
1119 * pipeline clock for this scenario as well.
1120 */
1121 if (adjust_dts < 0) {
1122 GST_LOG_OBJECT (udpsrc,
1123 "Current system time much ahead in time, setting DTS to pipeline clock");
1124 GST_BUFFER_DTS (outbuf) = cur_gst_clk_time;
1125 } else {
1126 GST_BUFFER_DTS (outbuf) = adjust_dts;
1127 GST_LOG_OBJECT (udpsrc, "Setting DTS to %" GST_TIME_FORMAT,
1128 GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)));
1129 }
1130 }
1131 g_object_unref (clock);
1132 } else {
1133 GST_ERROR_OBJECT (udpsrc,
1134 "Failed to get element clock, not setting DTS");
1135 }
1136 }
1137 #endif
1138 }
1139
1140 for (i = 0; i < n_msgs; i++) {
1141 g_object_unref (msgs[i]);
1142 }
1143 g_free (msgs);
1144
1145 if (skip_packet) {
1146 GST_DEBUG_OBJECT (udpsrc,
1147 "Dropping packet for a different multicast address");
1148 goto retry;
1149 }
1150 }
1151
1152 gst_buffer_unmap (outbuf, &info);
1153 gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1154
1155 /* If this is the case, the buffer will be freed once unreffed,
1156 * and the buffer pool will have to reallocate a new one.
1157 */
1158 if (res > udpsrc->mtu) {
1159 gst_buffer_append_memory (outbuf, udpsrc->extra_mem);
1160 udpsrc->extra_mem = NULL;
1161 }
1162
1163 offset = udpsrc->skip_first_bytes;
1164
1165 if (G_UNLIKELY (offset > 0 && res < offset))
1166 goto skip_error;
1167
1168 gst_buffer_resize (outbuf, offset, res - offset);
1169
1170 /* use buffer metadata so receivers can also track the address */
1171 if (saddr) {
1172 gst_buffer_add_net_address_meta (outbuf, saddr);
1173 g_object_unref (saddr);
1174 saddr = NULL;
1175 }
1176
1177 GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res);
1178
1179 return GST_FLOW_OK;
1180
1181 /* ERRORS */
1182 buffer_map_error:
1183 {
1184 GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1185 ("Failed to map memory"));
1186 return GST_FLOW_ERROR;
1187 }
1188 memory_map_error:
1189 {
1190 gst_buffer_unmap (outbuf, &info);
1191 GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1192 ("Failed to map memory"));
1193 return GST_FLOW_ERROR;
1194 }
1195 select_error:
1196 {
1197 gst_buffer_unmap (outbuf, &info);
1198 gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1199 GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1200 ("select error: %s", err->message));
1201 g_clear_error (&err);
1202 return GST_FLOW_ERROR;
1203 }
1204 stopped:
1205 {
1206 gst_buffer_unmap (outbuf, &info);
1207 gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1208 GST_DEBUG ("stop called");
1209 g_clear_error (&err);
1210 return GST_FLOW_FLUSHING;
1211 }
1212 receive_error:
1213 {
1214 gst_buffer_unmap (outbuf, &info);
1215 gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1216 g_clear_object (&saddr);
1217 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) ||
1218 g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
1219 g_clear_error (&err);
1220 return GST_FLOW_FLUSHING;
1221 } else {
1222 GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1223 ("receive error %" G_GSSIZE_FORMAT ": %s", res, err->message));
1224 g_clear_error (&err);
1225 return GST_FLOW_ERROR;
1226 }
1227 }
1228 skip_error:
1229 {
1230 g_clear_object (&saddr);
1231 GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
1232 ("UDP buffer to small to skip header"));
1233 return GST_FLOW_ERROR;
1234 }
1235 }
1236
1237 static gboolean
gst_udpsrc_set_uri(GstUDPSrc * src,const gchar * uri,GError ** error)1238 gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri, GError ** error)
1239 {
1240 gchar *address;
1241 guint16 port;
1242
1243 if (!gst_udp_parse_uri (uri, &address, &port))
1244 goto wrong_uri;
1245
1246 if (port == (guint16) - 1)
1247 port = UDP_DEFAULT_PORT;
1248
1249 g_free (src->address);
1250 src->address = address;
1251 src->port = port;
1252
1253 g_free (src->uri);
1254 src->uri = g_strdup (uri);
1255
1256 return TRUE;
1257
1258 /* ERRORS */
1259 wrong_uri:
1260 {
1261 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
1262 ("error parsing uri %s", uri));
1263 g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
1264 "Could not parse UDP URI");
1265 return FALSE;
1266 }
1267 }
1268
1269 static void
gst_udpsrc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1270 gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
1271 GParamSpec * pspec)
1272 {
1273 GstUDPSrc *udpsrc = GST_UDPSRC (object);
1274
1275 switch (prop_id) {
1276 case PROP_BUFFER_SIZE:
1277 udpsrc->buffer_size = g_value_get_int (value);
1278 break;
1279 case PROP_PORT:
1280 udpsrc->port = g_value_get_int (value);
1281 g_free (udpsrc->uri);
1282 udpsrc->uri =
1283 g_strdup_printf ("udp://%s:%u", udpsrc->address, udpsrc->port);
1284 break;
1285 case PROP_MULTICAST_GROUP:
1286 case PROP_ADDRESS:
1287 {
1288 const gchar *group;
1289
1290 g_free (udpsrc->address);
1291 if ((group = g_value_get_string (value)))
1292 udpsrc->address = g_strdup (group);
1293 else
1294 udpsrc->address = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
1295
1296 g_free (udpsrc->uri);
1297 udpsrc->uri =
1298 g_strdup_printf ("udp://%s:%u", udpsrc->address, udpsrc->port);
1299 break;
1300 }
1301 case PROP_MULTICAST_IFACE:
1302 g_free (udpsrc->multi_iface);
1303
1304 if (g_value_get_string (value) == NULL)
1305 udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
1306 else
1307 udpsrc->multi_iface = g_value_dup_string (value);
1308 break;
1309 case PROP_URI:
1310 gst_udpsrc_set_uri (udpsrc, g_value_get_string (value), NULL);
1311 break;
1312 case PROP_CAPS:
1313 {
1314 const GstCaps *new_caps_val = gst_value_get_caps (value);
1315 GstCaps *new_caps;
1316 GstCaps *old_caps;
1317
1318 if (new_caps_val == NULL) {
1319 new_caps = gst_caps_new_any ();
1320 } else {
1321 new_caps = gst_caps_copy (new_caps_val);
1322 }
1323
1324 GST_OBJECT_LOCK (udpsrc);
1325 old_caps = udpsrc->caps;
1326 udpsrc->caps = new_caps;
1327 GST_OBJECT_UNLOCK (udpsrc);
1328 if (old_caps)
1329 gst_caps_unref (old_caps);
1330
1331 gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (udpsrc));
1332 break;
1333 }
1334 case PROP_SOCKET:
1335 if (udpsrc->socket != NULL && udpsrc->socket != udpsrc->used_socket &&
1336 udpsrc->close_socket) {
1337 GError *err = NULL;
1338
1339 if (!g_socket_close (udpsrc->socket, &err)) {
1340 GST_ERROR ("failed to close socket %p: %s", udpsrc->socket,
1341 err->message);
1342 g_clear_error (&err);
1343 }
1344 }
1345 if (udpsrc->socket)
1346 g_object_unref (udpsrc->socket);
1347 udpsrc->socket = g_value_dup_object (value);
1348 GST_DEBUG ("setting socket to %p", udpsrc->socket);
1349 break;
1350 case PROP_TIMEOUT:
1351 udpsrc->timeout = g_value_get_uint64 (value);
1352 break;
1353 case PROP_SKIP_FIRST_BYTES:
1354 udpsrc->skip_first_bytes = g_value_get_int (value);
1355 break;
1356 case PROP_CLOSE_SOCKET:
1357 udpsrc->close_socket = g_value_get_boolean (value);
1358 break;
1359 case PROP_AUTO_MULTICAST:
1360 udpsrc->auto_multicast = g_value_get_boolean (value);
1361 break;
1362 case PROP_REUSE:
1363 udpsrc->reuse = g_value_get_boolean (value);
1364 break;
1365 case PROP_LOOP:
1366 udpsrc->loop = g_value_get_boolean (value);
1367 break;
1368 case PROP_RETRIEVE_SENDER_ADDRESS:
1369 udpsrc->retrieve_sender_address = g_value_get_boolean (value);
1370 break;
1371 case PROP_MTU:
1372 udpsrc->mtu = g_value_get_uint (value);
1373 break;
1374 case PROP_SOCKET_TIMESTAMP:
1375 udpsrc->socket_timestamp_mode = g_value_get_enum (value);
1376 break;
1377 default:
1378 break;
1379 }
1380 }
1381
1382 static void
gst_udpsrc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1383 gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
1384 GParamSpec * pspec)
1385 {
1386 GstUDPSrc *udpsrc = GST_UDPSRC (object);
1387
1388 switch (prop_id) {
1389 case PROP_BUFFER_SIZE:
1390 g_value_set_int (value, udpsrc->buffer_size);
1391 break;
1392 case PROP_PORT:
1393 g_value_set_int (value, udpsrc->port);
1394 break;
1395 case PROP_MULTICAST_GROUP:
1396 case PROP_ADDRESS:
1397 g_value_set_string (value, udpsrc->address);
1398 break;
1399 case PROP_MULTICAST_IFACE:
1400 g_value_set_string (value, udpsrc->multi_iface);
1401 break;
1402 case PROP_URI:
1403 g_value_set_string (value, udpsrc->uri);
1404 break;
1405 case PROP_CAPS:
1406 GST_OBJECT_LOCK (udpsrc);
1407 gst_value_set_caps (value, udpsrc->caps);
1408 GST_OBJECT_UNLOCK (udpsrc);
1409 break;
1410 case PROP_SOCKET:
1411 g_value_set_object (value, udpsrc->socket);
1412 break;
1413 case PROP_TIMEOUT:
1414 g_value_set_uint64 (value, udpsrc->timeout);
1415 break;
1416 case PROP_SKIP_FIRST_BYTES:
1417 g_value_set_int (value, udpsrc->skip_first_bytes);
1418 break;
1419 case PROP_CLOSE_SOCKET:
1420 g_value_set_boolean (value, udpsrc->close_socket);
1421 break;
1422 case PROP_USED_SOCKET:
1423 g_value_set_object (value, udpsrc->used_socket);
1424 break;
1425 case PROP_AUTO_MULTICAST:
1426 g_value_set_boolean (value, udpsrc->auto_multicast);
1427 break;
1428 case PROP_REUSE:
1429 g_value_set_boolean (value, udpsrc->reuse);
1430 break;
1431 case PROP_LOOP:
1432 g_value_set_boolean (value, udpsrc->loop);
1433 break;
1434 case PROP_RETRIEVE_SENDER_ADDRESS:
1435 g_value_set_boolean (value, udpsrc->retrieve_sender_address);
1436 break;
1437 case PROP_MTU:
1438 g_value_set_uint (value, udpsrc->mtu);
1439 break;
1440 case PROP_SOCKET_TIMESTAMP:
1441 g_value_set_enum (value, udpsrc->socket_timestamp_mode);
1442 break;
1443 default:
1444 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1445 break;
1446 }
1447 }
1448
1449 static GInetAddress *
gst_udpsrc_resolve(GstUDPSrc * src,const gchar * address)1450 gst_udpsrc_resolve (GstUDPSrc * src, const gchar * address)
1451 {
1452 GInetAddress *addr;
1453 GError *err = NULL;
1454 GResolver *resolver;
1455
1456 addr = g_inet_address_new_from_string (address);
1457 if (!addr) {
1458 GList *results;
1459
1460 GST_DEBUG_OBJECT (src, "resolving IP address for host %s", address);
1461 resolver = g_resolver_get_default ();
1462 results =
1463 g_resolver_lookup_by_name (resolver, address, src->cancellable, &err);
1464 if (!results)
1465 goto name_resolve;
1466 addr = G_INET_ADDRESS (g_object_ref (results->data));
1467
1468 g_resolver_free_addresses (results);
1469 g_object_unref (resolver);
1470 }
1471 #ifndef GST_DISABLE_GST_DEBUG
1472 {
1473 gchar *ip = g_inet_address_to_string (addr);
1474
1475 GST_DEBUG_OBJECT (src, "IP address for host %s is %s", address, ip);
1476 g_free (ip);
1477 }
1478 #endif
1479
1480 return addr;
1481
1482 name_resolve:
1483 {
1484 GST_WARNING_OBJECT (src, "Failed to resolve %s: %s", address, err->message);
1485 g_clear_error (&err);
1486 g_object_unref (resolver);
1487 return NULL;
1488 }
1489 }
1490
1491 static gint
gst_udpsrc_get_rcvbuf(GstUDPSrc * src)1492 gst_udpsrc_get_rcvbuf (GstUDPSrc * src)
1493 {
1494 gint val = 0;
1495
1496 /* read the value of the receive buffer. Note that on linux this returns
1497 * 2x the value we set because the kernel allocates extra memory for
1498 * metadata. The default on Linux is about 100K (which is about 50K
1499 * without metadata) */
1500 if (!g_socket_get_option (src->used_socket, SOL_SOCKET, SO_RCVBUF, &val,
1501 NULL)) {
1502 GST_DEBUG_OBJECT (src, "could not get udp buffer size");
1503 return 0;
1504 }
1505 #ifdef __linux__
1506 /* Devise by 2 so that the numbers matches when we do get/set */
1507 val /= 2;
1508 #endif
1509
1510 return val;
1511 }
1512
1513 /* create a socket for sending to remote machine */
1514 static gboolean
gst_udpsrc_open(GstUDPSrc * src)1515 gst_udpsrc_open (GstUDPSrc * src)
1516 {
1517 GInetAddress *addr, *bind_addr;
1518 GSocketAddress *bind_saddr;
1519 GError *err = NULL;
1520
1521 gst_udpsrc_create_cancellable (src);
1522
1523 if (src->socket == NULL) {
1524 /* need to allocate a socket */
1525 GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->address,
1526 src->port);
1527
1528 addr = gst_udpsrc_resolve (src, src->address);
1529 if (!addr)
1530 goto name_resolve;
1531
1532 if ((src->used_socket =
1533 g_socket_new (g_inet_address_get_family (addr),
1534 G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
1535 goto no_socket;
1536
1537 src->external_socket = FALSE;
1538
1539 GST_DEBUG_OBJECT (src, "got socket %p", src->used_socket);
1540
1541 if (src->addr)
1542 g_object_unref (src->addr);
1543 src->addr =
1544 G_INET_SOCKET_ADDRESS (g_inet_socket_address_new (addr, src->port));
1545
1546 GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
1547
1548 /* For multicast, bind to ANY and join the multicast group later */
1549 if (g_inet_address_get_is_multicast (addr))
1550 bind_addr = g_inet_address_new_any (g_inet_address_get_family (addr));
1551 else
1552 bind_addr = G_INET_ADDRESS (g_object_ref (addr));
1553
1554 g_object_unref (addr);
1555
1556 bind_saddr = g_inet_socket_address_new (bind_addr, src->port);
1557 g_object_unref (bind_addr);
1558 if (!g_socket_bind (src->used_socket, bind_saddr, src->reuse, &err)) {
1559 GST_ERROR_OBJECT (src, "%s: error binding to %s:%d", err->message,
1560 src->address, src->port);
1561 goto bind_error;
1562 }
1563
1564 g_object_unref (bind_saddr);
1565 g_socket_set_multicast_loopback (src->used_socket, src->loop);
1566 } else {
1567 GInetSocketAddress *local_addr;
1568
1569 GST_DEBUG_OBJECT (src, "using provided socket %p", src->socket);
1570 /* we use the configured socket, try to get some info about it */
1571 src->used_socket = G_SOCKET (g_object_ref (src->socket));
1572 src->external_socket = TRUE;
1573
1574 local_addr =
1575 G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
1576 &err));
1577 if (!local_addr)
1578 goto getsockname_error;
1579
1580 addr = gst_udpsrc_resolve (src, src->address);
1581 if (!addr)
1582 goto name_resolve;
1583
1584 /* If bound to ANY and address points to a multicast address, make
1585 * sure that address is not overridden with ANY but we have the
1586 * opportunity later to join the multicast address. This ensures that we
1587 * have the same behaviour as for sockets created by udpsrc */
1588 if (!src->auto_multicast ||
1589 !g_inet_address_get_is_any (g_inet_socket_address_get_address
1590 (local_addr))
1591 || !g_inet_address_get_is_multicast (addr)) {
1592 g_object_unref (addr);
1593 if (src->addr)
1594 g_object_unref (src->addr);
1595 src->addr = local_addr;
1596 } else {
1597 g_object_unref (local_addr);
1598 if (src->addr)
1599 g_object_unref (src->addr);
1600 src->addr =
1601 G_INET_SOCKET_ADDRESS (g_inet_socket_address_new (addr, src->port));
1602 g_object_unref (addr);
1603 }
1604 }
1605
1606 {
1607 gint val;
1608 GError *opt_err = NULL;
1609 gboolean force_rcvbuf G_GNUC_UNUSED = FALSE;
1610
1611 if (src->buffer_size != 0) {
1612 GST_INFO_OBJECT (src, "setting udp buffer of %d bytes", src->buffer_size);
1613 /* set buffer size, Note that on Linux this is typically limited to a
1614 * maximum of around 100K. Also a minimum of 128 bytes is required on
1615 * Linux. */
1616 if (!g_socket_set_option (src->used_socket, SOL_SOCKET, SO_RCVBUF,
1617 src->buffer_size, &opt_err)) {
1618 GST_INFO_OBJECT (src,
1619 "Could not create a buffer of requested %d bytes (%s) try forcing",
1620 src->buffer_size, opt_err->message);
1621 g_clear_error (&opt_err);
1622 force_rcvbuf = TRUE;
1623 }
1624 }
1625 #if defined(SO_RCVBUFFORCE)
1626 val = gst_udpsrc_get_rcvbuf (src);
1627 if (val < src->buffer_size)
1628 force_rcvbuf = TRUE;
1629
1630 if (force_rcvbuf) {
1631 GST_INFO_OBJECT (src,
1632 "forcibly setting udp buffer of %d bytes", src->buffer_size);
1633
1634 /* Will only work with CAP_NET_ADMIN privilege */
1635 if (!g_socket_set_option (src->used_socket, SOL_SOCKET, SO_RCVBUFFORCE,
1636 src->buffer_size, &opt_err)) {
1637 GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
1638 ("Could not create a buffer of requested %d bytes (%s). Need net.admin privilege?",
1639 src->buffer_size, opt_err->message));
1640 g_clear_error (&opt_err);
1641 }
1642 }
1643 #endif
1644
1645 val = gst_udpsrc_get_rcvbuf (src);
1646 if (val < src->buffer_size)
1647 GST_WARNING_OBJECT (src,
1648 "have udp buffer of %d bytes while %d were requested",
1649 val, src->buffer_size);
1650 else
1651 GST_INFO_OBJECT (src, "have udp buffer of %d bytes", val);
1652 }
1653
1654 g_socket_set_broadcast (src->used_socket, TRUE);
1655
1656 if (src->auto_multicast
1657 &&
1658 g_inet_address_get_is_multicast (g_inet_socket_address_get_address
1659 (src->addr))) {
1660
1661 if (src->multi_iface) {
1662 GStrv multi_ifaces = g_strsplit (src->multi_iface, ",", -1);
1663 gchar **ifaces = multi_ifaces;
1664 while (*ifaces) {
1665 g_strstrip (*ifaces);
1666 GST_DEBUG_OBJECT (src, "joining multicast group %s interface %s",
1667 src->address, *ifaces);
1668 if (!g_socket_join_multicast_group (src->used_socket,
1669 g_inet_socket_address_get_address (src->addr),
1670 FALSE, *ifaces, &err)) {
1671 g_strfreev (multi_ifaces);
1672 goto membership;
1673 }
1674
1675 ifaces++;
1676 }
1677 g_strfreev (multi_ifaces);
1678 } else {
1679 GST_DEBUG_OBJECT (src, "joining multicast group %s", src->address);
1680 if (!g_socket_join_multicast_group (src->used_socket,
1681 g_inet_socket_address_get_address (src->addr), FALSE, NULL, &err))
1682 goto membership;
1683 }
1684
1685 if (g_inet_address_get_family (g_inet_socket_address_get_address
1686 (src->addr)) == G_SOCKET_FAMILY_IPV4) {
1687 #if defined(IP_MULTICAST_ALL)
1688 if (!g_socket_set_option (src->used_socket, IPPROTO_IP, IP_MULTICAST_ALL,
1689 0, &err)) {
1690 GST_WARNING_OBJECT (src, "Failed to disable IP_MULTICAST_ALL: %s",
1691 err->message);
1692 g_clear_error (&err);
1693 }
1694 #elif defined(IP_PKTINFO)
1695 if (!g_socket_set_option (src->used_socket, IPPROTO_IP, IP_PKTINFO, TRUE,
1696 &err)) {
1697 GST_WARNING_OBJECT (src, "Failed to enable IP_PKTINFO: %s",
1698 err->message);
1699 g_clear_error (&err);
1700 }
1701 #elif defined(IP_RECVDSTADDR)
1702 if (!g_socket_set_option (src->used_socket, IPPROTO_IP, IP_RECVDSTADDR,
1703 TRUE, &err)) {
1704 GST_WARNING_OBJECT (src, "Failed to enable IP_RECVDSTADDR: %s",
1705 err->message);
1706 g_clear_error (&err);
1707 }
1708 #else
1709 #pragma message("No API available for getting IPv4 destination address")
1710 GST_WARNING_OBJECT (src, "No API available for getting IPv4 destination "
1711 "address, will receive packets for every destination to our port");
1712 #endif
1713 } else
1714 if (g_inet_address_get_family (g_inet_socket_address_get_address
1715 (src->addr)) == G_SOCKET_FAMILY_IPV6) {
1716 #ifdef IPV6_PKTINFO
1717 #ifdef IPV6_RECVPKTINFO
1718 if (!g_socket_set_option (src->used_socket, IPPROTO_IPV6,
1719 IPV6_RECVPKTINFO, TRUE, &err)) {
1720 #else
1721 if (!g_socket_set_option (src->used_socket, IPPROTO_IPV6, IPV6_PKTINFO,
1722 TRUE, &err)) {
1723 #endif
1724 GST_WARNING_OBJECT (src, "Failed to enable IPV6_PKTINFO: %s",
1725 err->message);
1726 g_clear_error (&err);
1727 }
1728 #else
1729 #pragma message("No API available for getting IPv6 destination address")
1730 GST_WARNING_OBJECT (src, "No API available for getting IPv6 destination "
1731 "address, will receive packets for every destination to our port");
1732 #endif
1733 }
1734 }
1735
1736 if (src->socket_timestamp_mode == GST_SOCKET_TIMESTAMP_MODE_REALTIME) {
1737 #ifdef SO_TIMESTAMPNS
1738 if (!g_socket_set_option (src->used_socket, SOL_SOCKET, SO_TIMESTAMPNS,
1739 TRUE, &err)) {
1740 GST_WARNING_OBJECT (src,
1741 "Failed to enable socket control message timestamps: %s",
1742 err->message);
1743 g_clear_error (&err);
1744 src->socket_timestamp_mode = GST_SOCKET_TIMESTAMP_MODE_DISABLED;
1745 g_object_notify (G_OBJECT (src), "socket-timestamp");
1746 } else {
1747 GST_LOG_OBJECT (src, "Socket control message timestamps enabled");
1748 }
1749 }
1750 #else
1751 GST_WARNING_OBJECT (src,
1752 "socket-timestamp was requested but SO_TIMESTAMPNS is not defined");
1753 }
1754 #endif
1755
1756 /* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
1757 * follows ss_family on both */
1758 {
1759 GInetSocketAddress *addr;
1760 guint16 port;
1761
1762 addr =
1763 G_INET_SOCKET_ADDRESS (g_socket_get_local_address (src->used_socket,
1764 &err));
1765 if (!addr)
1766 goto getsockname_error;
1767
1768 port = g_inet_socket_address_get_port (addr);
1769 GST_DEBUG_OBJECT (src, "bound, on port %d", port);
1770 if (port != src->port) {
1771 src->port = port;
1772 GST_DEBUG_OBJECT (src, "notifying port %d", port);
1773 g_object_notify (G_OBJECT (src), "port");
1774 }
1775 g_object_unref (addr);
1776 }
1777
1778 return TRUE;
1779
1780 /* ERRORS */
1781 name_resolve:
1782 {
1783 return FALSE;
1784 }
1785 no_socket:
1786 {
1787 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
1788 ("no socket error: %s", err->message));
1789 g_clear_error (&err);
1790 g_object_unref (addr);
1791 return FALSE;
1792 }
1793 bind_error:
1794 {
1795 GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
1796 ("bind failed: %s", err->message));
1797 g_clear_error (&err);
1798 g_object_unref (bind_saddr);
1799 gst_udpsrc_close (src);
1800 return FALSE;
1801 }
1802 membership:
1803 {
1804 GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
1805 ("could not add membership: %s", err->message));
1806 g_clear_error (&err);
1807 gst_udpsrc_close (src);
1808 return FALSE;
1809 }
1810 getsockname_error:
1811 {
1812 GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
1813 ("getsockname failed: %s", err->message));
1814 g_clear_error (&err);
1815 gst_udpsrc_close (src);
1816 return FALSE;
1817 }
1818 }
1819
1820 static gboolean
gst_udpsrc_unlock(GstBaseSrc * bsrc)1821 gst_udpsrc_unlock (GstBaseSrc * bsrc)
1822 {
1823 GstUDPSrc *src;
1824
1825 src = GST_UDPSRC (bsrc);
1826
1827 GST_LOG_OBJECT (src, "Flushing");
1828 g_cancellable_cancel (src->cancellable);
1829
1830 return TRUE;
1831 }
1832
1833 static gboolean
gst_udpsrc_unlock_stop(GstBaseSrc * bsrc)1834 gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
1835 {
1836 GstUDPSrc *src;
1837
1838 src = GST_UDPSRC (bsrc);
1839
1840 GST_LOG_OBJECT (src, "No longer flushing");
1841
1842 gst_udpsrc_free_cancellable (src);
1843 gst_udpsrc_create_cancellable (src);
1844
1845 return TRUE;
1846 }
1847
1848 static gboolean
gst_udpsrc_close(GstUDPSrc * src)1849 gst_udpsrc_close (GstUDPSrc * src)
1850 {
1851 GST_DEBUG ("closing sockets");
1852
1853 if (src->used_socket) {
1854 if (src->auto_multicast
1855 &&
1856 g_inet_address_get_is_multicast (g_inet_socket_address_get_address
1857 (src->addr))) {
1858 GError *err = NULL;
1859
1860 if (src->multi_iface) {
1861 GStrv multi_ifaces = g_strsplit (src->multi_iface, ",", -1);
1862 gchar **ifaces = multi_ifaces;
1863 while (*ifaces) {
1864 g_strstrip (*ifaces);
1865 GST_DEBUG_OBJECT (src, "leaving multicast group %s interface %s",
1866 src->address, *ifaces);
1867 if (!g_socket_leave_multicast_group (src->used_socket,
1868 g_inet_socket_address_get_address (src->addr),
1869 FALSE, *ifaces, &err)) {
1870 GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
1871 err->message);
1872 g_clear_error (&err);
1873 }
1874 ifaces++;
1875 }
1876 g_strfreev (multi_ifaces);
1877
1878 } else {
1879 GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->address);
1880 if (!g_socket_leave_multicast_group (src->used_socket,
1881 g_inet_socket_address_get_address (src->addr), FALSE,
1882 NULL, &err)) {
1883 GST_ERROR_OBJECT (src, "Failed to leave multicast group: %s",
1884 err->message);
1885 g_clear_error (&err);
1886 }
1887 }
1888 }
1889
1890 if (src->close_socket || !src->external_socket) {
1891 GError *err = NULL;
1892 if (!g_socket_close (src->used_socket, &err)) {
1893 GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
1894 g_clear_error (&err);
1895 }
1896 }
1897
1898 g_object_unref (src->used_socket);
1899 src->used_socket = NULL;
1900 g_object_unref (src->addr);
1901 src->addr = NULL;
1902 }
1903
1904 gst_udpsrc_free_cancellable (src);
1905
1906 return TRUE;
1907 }
1908
1909
1910 static GstStateChangeReturn
gst_udpsrc_change_state(GstElement * element,GstStateChange transition)1911 gst_udpsrc_change_state (GstElement * element, GstStateChange transition)
1912 {
1913 GstUDPSrc *src;
1914 GstStateChangeReturn result;
1915
1916 src = GST_UDPSRC (element);
1917
1918 switch (transition) {
1919 case GST_STATE_CHANGE_NULL_TO_READY:
1920 if (!gst_udpsrc_open (src))
1921 goto open_failed;
1922 break;
1923 default:
1924 break;
1925 }
1926 if ((result =
1927 GST_ELEMENT_CLASS (parent_class)->change_state (element,
1928 transition)) == GST_STATE_CHANGE_FAILURE)
1929 goto failure;
1930
1931 switch (transition) {
1932 case GST_STATE_CHANGE_READY_TO_NULL:
1933 gst_udpsrc_close (src);
1934 break;
1935 default:
1936 break;
1937 }
1938 return result;
1939 /* ERRORS */
1940 open_failed:
1941 {
1942 GST_DEBUG_OBJECT (src, "failed to open socket");
1943 return GST_STATE_CHANGE_FAILURE;
1944 }
1945 failure:
1946 {
1947 GST_DEBUG_OBJECT (src, "parent failed state change");
1948 return result;
1949 }
1950 }
1951
1952
1953
1954
1955 /*** GSTURIHANDLER INTERFACE *************************************************/
1956
1957 static GstURIType
gst_udpsrc_uri_get_type(GType type)1958 gst_udpsrc_uri_get_type (GType type)
1959 {
1960 return GST_URI_SRC;
1961 }
1962
1963 static const gchar *const *
gst_udpsrc_uri_get_protocols(GType type)1964 gst_udpsrc_uri_get_protocols (GType type)
1965 {
1966 static const gchar *protocols[] = { "udp", NULL };
1967
1968 return protocols;
1969 }
1970
1971 static gchar *
gst_udpsrc_uri_get_uri(GstURIHandler * handler)1972 gst_udpsrc_uri_get_uri (GstURIHandler * handler)
1973 {
1974 GstUDPSrc *src = GST_UDPSRC (handler);
1975
1976 return g_strdup (src->uri);
1977 }
1978
1979 static gboolean
gst_udpsrc_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)1980 gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri,
1981 GError ** error)
1982 {
1983 return gst_udpsrc_set_uri (GST_UDPSRC (handler), uri, error);
1984 }
1985
1986 static void
gst_udpsrc_uri_handler_init(gpointer g_iface,gpointer iface_data)1987 gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
1988 {
1989 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1990
1991 iface->get_type = gst_udpsrc_uri_get_type;
1992 iface->get_protocols = gst_udpsrc_uri_get_protocols;
1993 iface->get_uri = gst_udpsrc_uri_get_uri;
1994 iface->set_uri = gst_udpsrc_uri_set_uri;
1995 }
1996