1 /* GStreamer
2 * Copyright (C) 2018, Collabora Ltd.
3 * Copyright (C) 2018, SK Telecom, Co., Ltd.
4 * Author: Jeongseok Kim <jeongseok.kim@sk.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 /**
23 * SECTION:element-srtsrc
24 * @title: srtsrc
25 *
26 * srtsrc is a network source that reads <ulink url="http://www.srtalliance.org/">SRT</ulink>
27 * packets from the network.
28 *
29 * <refsect2>
30 * <title>Examples</title>
31 * |[
32 * gst-launch-1.0 -v srtsrc uri="srt://127.0.0.1:7001" ! fakesink
33 * ]| This pipeline shows how to connect SRT server by setting #GstSRTSrc:uri property.
34 *
35 * |[
36 * gst-launch-1.0 -v srtsrc uri="srt://:7001?mode=listener" ! fakesink
37 * ]| This pipeline shows how to wait SRT connection by setting #GstSRTSrc:uri property.
38 *
39 * |[
40 * gst-launch-1.0 -v srtclientsrc uri="srt://192.168.1.10:7001?mode=rendez-vous" ! fakesink
41 * ]| This pipeline shows how to connect SRT server by setting #GstSRTSrc:uri property and using the rendez-vous mode.
42 * </refsect2>
43 *
44 */
45
46 #ifdef HAVE_CONFIG_H
47 #include <config.h>
48 #endif
49
50 #include "gstsrtsrc.h"
51
52 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
53 GST_PAD_SRC,
54 GST_PAD_ALWAYS,
55 GST_STATIC_CAPS_ANY);
56
57 #define GST_CAT_DEFAULT gst_debug_srt_src
58 GST_DEBUG_CATEGORY (GST_CAT_DEFAULT);
59
60 enum
61 {
62 SIG_CALLER_ADDED,
63 SIG_CALLER_REMOVED,
64
65 LAST_SIGNAL
66 };
67
68 static guint signals[LAST_SIGNAL] = { 0 };
69
70 static void gst_srt_src_uri_handler_init (gpointer g_iface,
71 gpointer iface_data);
72 static gchar *gst_srt_src_uri_get_uri (GstURIHandler * handler);
73 static gboolean gst_srt_src_uri_set_uri (GstURIHandler * handler,
74 const gchar * uri, GError ** error);
75
76 #define gst_srt_src_parent_class parent_class
77 G_DEFINE_TYPE_WITH_CODE (GstSRTSrc, gst_srt_src,
78 GST_TYPE_PUSH_SRC,
79 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_srt_src_uri_handler_init)
80 GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, "srtsrc", 0, "SRT Source"));
81
82 static void
gst_srt_src_caller_added_cb(int sock,GSocketAddress * addr,GstSRTObject * srtobject)83 gst_srt_src_caller_added_cb (int sock, GSocketAddress * addr,
84 GstSRTObject * srtobject)
85 {
86 g_signal_emit (srtobject->element, signals[SIG_CALLER_ADDED], 0, sock, addr);
87 }
88
89 static void
gst_srt_src_caller_removed_cb(int sock,GSocketAddress * addr,GstSRTObject * srtobject)90 gst_srt_src_caller_removed_cb (int sock, GSocketAddress * addr,
91 GstSRTObject * srtobject)
92 {
93 g_signal_emit (srtobject->element, signals[SIG_CALLER_REMOVED], 0, sock,
94 addr);
95 }
96
97 static gboolean
gst_srt_src_start(GstBaseSrc * bsrc)98 gst_srt_src_start (GstBaseSrc * bsrc)
99 {
100 GstSRTSrc *self = GST_SRT_SRC (bsrc);
101 GError *error = NULL;
102 gboolean ret = FALSE;
103 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
104
105 gst_structure_get_enum (self->srtobject->parameters, "mode",
106 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
107
108 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
109 ret =
110 gst_srt_object_open_full (self->srtobject, gst_srt_src_caller_added_cb,
111 gst_srt_src_caller_removed_cb, self->cancellable, &error);
112 } else {
113 ret = gst_srt_object_open (self->srtobject, self->cancellable, &error);
114 }
115
116 if (!ret) {
117 /* ensure error is posted since state change will fail */
118 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ, (NULL),
119 ("Failed to open SRT: %s", error->message));
120 g_clear_error (&error);
121 }
122
123 return ret;
124 }
125
126 static gboolean
gst_srt_src_stop(GstBaseSrc * bsrc)127 gst_srt_src_stop (GstBaseSrc * bsrc)
128 {
129 GstSRTSrc *self = GST_SRT_SRC (bsrc);
130
131 gst_srt_object_close (self->srtobject);
132
133 return TRUE;
134 }
135
136 static GstFlowReturn
gst_srt_src_fill(GstPushSrc * src,GstBuffer * outbuf)137 gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
138 {
139 GstSRTSrc *self = GST_SRT_SRC (src);
140 GstFlowReturn ret = GST_FLOW_OK;
141 GstMapInfo info;
142 GError *err = NULL;
143 gssize recv_len;
144
145 if (g_cancellable_is_cancelled (self->cancellable)) {
146 ret = GST_FLOW_FLUSHING;
147 }
148
149 if (!gst_buffer_map (outbuf, &info, GST_MAP_WRITE)) {
150 GST_ELEMENT_ERROR (src, RESOURCE, READ,
151 ("Could not map the buffer for writing "), (NULL));
152 ret = GST_FLOW_ERROR;
153 goto out;
154 }
155
156 recv_len = gst_srt_object_read (self->srtobject, info.data,
157 gst_buffer_get_size (outbuf), self->cancellable, &err);
158
159 gst_buffer_unmap (outbuf, &info);
160
161 if (g_cancellable_is_cancelled (self->cancellable)) {
162 ret = GST_FLOW_FLUSHING;
163 goto out;
164 }
165
166 if (recv_len < 0) {
167 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("%s", err->message));
168 ret = GST_FLOW_ERROR;
169 g_clear_error (&err);
170 goto out;
171 } else if (recv_len == 0) {
172 ret = GST_FLOW_EOS;
173 goto out;
174 }
175
176 gst_buffer_resize (outbuf, 0, recv_len);
177
178 GST_LOG_OBJECT (src,
179 "filled buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
180 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
181 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
182 gst_buffer_get_size (outbuf),
183 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
184 GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
185 GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
186
187 out:
188 return ret;
189 }
190
191 static void
gst_srt_src_init(GstSRTSrc * self)192 gst_srt_src_init (GstSRTSrc * self)
193 {
194 self->srtobject = gst_srt_object_new (GST_ELEMENT (self));
195 self->cancellable = g_cancellable_new ();
196
197 gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
198 gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
199 gst_base_src_set_do_timestamp (GST_BASE_SRC (self), TRUE);
200
201 gst_srt_object_set_uri (self->srtobject, GST_SRT_DEFAULT_URI, NULL);
202
203 }
204
205 static void
gst_srt_src_finalize(GObject * object)206 gst_srt_src_finalize (GObject * object)
207 {
208 GstSRTSrc *self = GST_SRT_SRC (object);
209
210 g_clear_object (&self->cancellable);
211 gst_srt_object_destroy (self->srtobject);
212
213 G_OBJECT_CLASS (parent_class)->finalize (object);
214 }
215
216 static gboolean
gst_srt_src_unlock(GstBaseSrc * bsrc)217 gst_srt_src_unlock (GstBaseSrc * bsrc)
218 {
219 GstSRTSrc *self = GST_SRT_SRC (bsrc);
220
221 gst_srt_object_wakeup (self->srtobject, self->cancellable);
222
223 return TRUE;
224 }
225
226 static gboolean
gst_srt_src_unlock_stop(GstBaseSrc * bsrc)227 gst_srt_src_unlock_stop (GstBaseSrc * bsrc)
228 {
229 GstSRTSrc *self = GST_SRT_SRC (bsrc);
230
231 g_cancellable_reset (self->cancellable);
232
233 return TRUE;
234 }
235
236 static void
gst_srt_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)237 gst_srt_src_set_property (GObject * object,
238 guint prop_id, const GValue * value, GParamSpec * pspec)
239 {
240 GstSRTSrc *self = GST_SRT_SRC (object);
241
242 if (!gst_srt_object_set_property_helper (self->srtobject, prop_id, value,
243 pspec)) {
244 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
245 }
246 }
247
248 static void
gst_srt_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)249 gst_srt_src_get_property (GObject * object,
250 guint prop_id, GValue * value, GParamSpec * pspec)
251 {
252 GstSRTSrc *self = GST_SRT_SRC (object);
253
254 if (!gst_srt_object_get_property_helper (self->srtobject, prop_id, value,
255 pspec)) {
256 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
257 }
258 }
259
260 static void
gst_srt_src_class_init(GstSRTSrcClass * klass)261 gst_srt_src_class_init (GstSRTSrcClass * klass)
262 {
263 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
264 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
265 GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
266 GstPushSrcClass *gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
267
268 gobject_class->set_property = gst_srt_src_set_property;
269 gobject_class->get_property = gst_srt_src_get_property;
270 gobject_class->finalize = gst_srt_src_finalize;
271
272 /**
273 * GstSRTSrc::caller-added:
274 * @gstsrtsink: the srtsink element that emitted this signal
275 * @sock: the client socket descriptor that was added to srtsink
276 * @addr: the #GSocketAddress that describes the @sock
277 *
278 * The given socket descriptor was added to srtsink.
279 */
280 signals[SIG_CALLER_ADDED] =
281 g_signal_new ("caller-added", G_TYPE_FROM_CLASS (klass),
282 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTSrcClass, caller_added),
283 NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
284 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
285
286 /**
287 * GstSRTSrc::caller-removed:
288 * @gstsrtsink: the srtsink element that emitted this signal
289 * @sock: the client socket descriptor that was added to srtsink
290 * @addr: the #GSocketAddress that describes the @sock
291 *
292 * The given socket descriptor was removed from srtsink.
293 */
294 signals[SIG_CALLER_REMOVED] =
295 g_signal_new ("caller-removed", G_TYPE_FROM_CLASS (klass),
296 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSRTSrcClass,
297 caller_added), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE,
298 2, G_TYPE_INT, G_TYPE_SOCKET_ADDRESS);
299
300 gst_srt_object_install_properties_helper (gobject_class);
301
302 gst_element_class_add_static_pad_template (gstelement_class, &src_template);
303 gst_element_class_set_metadata (gstelement_class,
304 "SRT source", "Source/Network",
305 "Receive data over the network via SRT",
306 "Justin Kim <justin.joy.9to5@gmail.com>");
307
308 gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_srt_src_start);
309 gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_src_stop);
310 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_src_unlock);
311 gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_srt_src_unlock_stop);
312
313 gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_src_fill);
314 }
315
316 static GstURIType
gst_srt_src_uri_get_type(GType type)317 gst_srt_src_uri_get_type (GType type)
318 {
319 return GST_URI_SRC;
320 }
321
322 static const gchar *const *
gst_srt_src_uri_get_protocols(GType type)323 gst_srt_src_uri_get_protocols (GType type)
324 {
325 static const gchar *protocols[] = { GST_SRT_DEFAULT_URI_SCHEME, NULL };
326
327 return protocols;
328 }
329
330 static gchar *
gst_srt_src_uri_get_uri(GstURIHandler * handler)331 gst_srt_src_uri_get_uri (GstURIHandler * handler)
332 {
333 gchar *uri_str;
334 GstSRTSrc *self = GST_SRT_SRC (handler);
335
336 GST_OBJECT_LOCK (self);
337 uri_str = gst_uri_to_string (self->srtobject->uri);
338 GST_OBJECT_UNLOCK (self);
339
340 return uri_str;
341 }
342
343 static gboolean
gst_srt_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)344 gst_srt_src_uri_set_uri (GstURIHandler * handler,
345 const gchar * uri, GError ** error)
346 {
347 GstSRTSrc *self = GST_SRT_SRC (handler);
348
349 return gst_srt_object_set_uri (self->srtobject, uri, error);
350 }
351
352 static void
gst_srt_src_uri_handler_init(gpointer g_iface,gpointer iface_data)353 gst_srt_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
354 {
355 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
356
357 iface->get_type = gst_srt_src_uri_get_type;
358 iface->get_protocols = gst_srt_src_uri_get_protocols;
359 iface->get_uri = gst_srt_src_uri_get_uri;
360 iface->set_uri = gst_srt_src_uri_set_uri;
361 }
362