1 /* GStreamer
2 * Copyright (C) <2009> Collabora Ltd
3 * @author: Olivier Crete <olivier.crete@collabora.co.uk>
4 * Copyright (C) <2009> Nokia Inc
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21 /**
22 * SECTION:element-shmsrc
23 * @title: shmsrc
24 *
25 * Receive data from the shared memory sink.
26 *
27 * ## Example launch lines
28 * |[
29 * gst-launch-1.0 shmsrc socket-path=/tmp/blah ! \
30 * "video/x-raw, format=YUY2, color-matrix=sdtv, \
31 * chroma-site=mpeg2, width=(int)320, height=(int)240, framerate=(fraction)30/1" \
32 * ! queue ! videoconvert ! autovideosink
33 * ]| Render video from shm buffers.
34 *
35 */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include "gstshmsrc.h"
42
43 #include <gst/gst.h>
44
45 #include <string.h>
46
/* Signal identifiers. No signal is registered in this file, so the
 * enumeration only holds its terminating sentinel. */
enum
{
  LAST_SIGNAL = 0
};
52
/* GObject property identifiers. PROP_0 occupies id 0 so that the real
 * properties are numbered from 1. */
enum
{
  PROP_0 = 0,
  PROP_SOCKET_PATH,             /* "socket-path" */
  PROP_IS_LIVE,                 /* "is-live" */
  PROP_SHM_AREA_NAME            /* "shm-area-name" */
};
61
62 struct GstShmBuffer
63 {
64 char *buf;
65 GstShmPipe *pipe;
66 };
67
68
69 GST_DEBUG_CATEGORY_STATIC (shmsrc_debug);
70 #define GST_CAT_DEFAULT shmsrc_debug
71
72 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
73 GST_PAD_SRC,
74 GST_PAD_ALWAYS,
75 GST_STATIC_CAPS_ANY);
76
77 #define gst_shm_src_parent_class parent_class
78 G_DEFINE_TYPE (GstShmSrc, gst_shm_src, GST_TYPE_PUSH_SRC);
79 GST_ELEMENT_REGISTER_DEFINE (shmsrc, "shmsrc", GST_RANK_NONE, GST_TYPE_SHM_SRC);
80
81 static void gst_shm_src_set_property (GObject * object, guint prop_id,
82 const GValue * value, GParamSpec * pspec);
83 static void gst_shm_src_get_property (GObject * object, guint prop_id,
84 GValue * value, GParamSpec * pspec);
85 static void gst_shm_src_finalize (GObject * object);
86 static gboolean gst_shm_src_start (GstBaseSrc * bsrc);
87 static gboolean gst_shm_src_stop (GstBaseSrc * bsrc);
88 static GstFlowReturn gst_shm_src_create (GstPushSrc * psrc,
89 GstBuffer ** outbuf);
90 static gboolean gst_shm_src_unlock (GstBaseSrc * bsrc);
91 static gboolean gst_shm_src_unlock_stop (GstBaseSrc * bsrc);
92 static GstStateChangeReturn gst_shm_src_change_state (GstElement * element,
93 GstStateChange transition);
94
95 static void gst_shm_pipe_dec (GstShmPipe * pipe);
96
97 static void
gst_shm_src_class_init(GstShmSrcClass * klass)98 gst_shm_src_class_init (GstShmSrcClass * klass)
99 {
100 GObjectClass *gobject_class;
101 GstElementClass *gstelement_class;
102 GstBaseSrcClass *gstbasesrc_class;
103 GstPushSrcClass *gstpush_src_class;
104
105 gobject_class = (GObjectClass *) klass;
106 gstelement_class = (GstElementClass *) klass;
107 gstbasesrc_class = (GstBaseSrcClass *) klass;
108 gstpush_src_class = (GstPushSrcClass *) klass;
109
110 gobject_class->set_property = gst_shm_src_set_property;
111 gobject_class->get_property = gst_shm_src_get_property;
112 gobject_class->finalize = gst_shm_src_finalize;
113
114 gstelement_class->change_state = gst_shm_src_change_state;
115
116 gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_shm_src_start);
117 gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_shm_src_stop);
118 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_src_unlock);
119 gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_src_unlock_stop);
120
121 gstpush_src_class->create = gst_shm_src_create;
122
123 g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
124 g_param_spec_string ("socket-path",
125 "Path to the control socket",
126 "The path to the control socket used to control the shared memory",
127 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
128
129 g_object_class_install_property (gobject_class, PROP_IS_LIVE,
130 g_param_spec_boolean ("is-live", "Is this a live source",
131 "True if the element cannot produce data in PAUSED", FALSE,
132 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133
134 g_object_class_install_property (gobject_class, PROP_SHM_AREA_NAME,
135 g_param_spec_string ("shm-area-name",
136 "Name of the shared memory area",
137 "The name of the shared memory area used to get buffers",
138 NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
139
140 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
141
142 gst_element_class_set_static_metadata (gstelement_class,
143 "Shared Memory Source",
144 "Source",
145 "Receive data from the shared memory sink",
146 "Olivier Crete <olivier.crete@collabora.co.uk>");
147
148 GST_DEBUG_CATEGORY_INIT (shmsrc_debug, "shmsrc", 0, "Shared Memory Source");
149 }
150
151 static void
gst_shm_src_init(GstShmSrc * self)152 gst_shm_src_init (GstShmSrc * self)
153 {
154 self->poll = gst_poll_new (TRUE);
155 gst_poll_fd_init (&self->pollfd);
156 }
157
158 static void
gst_shm_src_finalize(GObject * object)159 gst_shm_src_finalize (GObject * object)
160 {
161 GstShmSrc *self = GST_SHM_SRC (object);
162
163 gst_poll_free (self->poll);
164 g_free (self->socket_path);
165
166 G_OBJECT_CLASS (parent_class)->finalize (object);
167 }
168
169
170 static void
gst_shm_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)171 gst_shm_src_set_property (GObject * object, guint prop_id,
172 const GValue * value, GParamSpec * pspec)
173 {
174 GstShmSrc *self = GST_SHM_SRC (object);
175
176 switch (prop_id) {
177 case PROP_SOCKET_PATH:
178 GST_OBJECT_LOCK (object);
179 if (self->pipe) {
180 GST_WARNING_OBJECT (object, "Can not modify socket path while the "
181 "element is playing");
182 } else {
183 g_free (self->socket_path);
184 self->socket_path = g_value_dup_string (value);
185 }
186 GST_OBJECT_UNLOCK (object);
187 break;
188 case PROP_IS_LIVE:
189 gst_base_src_set_live (GST_BASE_SRC (object),
190 g_value_get_boolean (value));
191 break;
192 default:
193 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
194 break;
195 }
196 }
197
198 static void
gst_shm_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)199 gst_shm_src_get_property (GObject * object, guint prop_id,
200 GValue * value, GParamSpec * pspec)
201 {
202 GstShmSrc *self = GST_SHM_SRC (object);
203
204 switch (prop_id) {
205 case PROP_SOCKET_PATH:
206 GST_OBJECT_LOCK (object);
207 g_value_set_string (value, self->socket_path);
208 GST_OBJECT_UNLOCK (object);
209 break;
210 case PROP_IS_LIVE:
211 g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (object)));
212 break;
213 case PROP_SHM_AREA_NAME:
214 GST_OBJECT_LOCK (object);
215 if (self->pipe)
216 g_value_set_string (value, sp_get_shm_area_name (self->pipe->pipe));
217 GST_OBJECT_UNLOCK (object);
218 break;
219 default:
220 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
221 break;
222 }
223 }
224
225 static gboolean
gst_shm_src_start_reading(GstShmSrc * self)226 gst_shm_src_start_reading (GstShmSrc * self)
227 {
228 GstShmPipe *gstpipe;
229
230 if (!self->socket_path) {
231 GST_ELEMENT_ERROR (self, RESOURCE, NOT_FOUND,
232 ("No path specified for socket."), (NULL));
233 return FALSE;
234 }
235
236 gstpipe = g_slice_new0 (GstShmPipe);
237 gstpipe->use_count = 1;
238 gstpipe->src = gst_object_ref (self);
239
240 GST_DEBUG_OBJECT (self, "Opening socket %s", self->socket_path);
241
242 GST_OBJECT_LOCK (self);
243 gstpipe->pipe = sp_client_open (self->socket_path);
244 GST_OBJECT_UNLOCK (self);
245
246 if (!gstpipe->pipe) {
247 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
248 ("Could not open socket %s: %d %s", self->socket_path, errno,
249 strerror (errno)), (NULL));
250 gst_shm_pipe_dec (gstpipe);
251 return FALSE;
252 }
253
254 self->pipe = gstpipe;
255
256 self->unlocked = FALSE;
257 gst_poll_set_flushing (self->poll, FALSE);
258
259 gst_poll_fd_init (&self->pollfd);
260 self->pollfd.fd = sp_get_fd (self->pipe->pipe);
261 gst_poll_add_fd (self->poll, &self->pollfd);
262 gst_poll_fd_ctl_read (self->poll, &self->pollfd, TRUE);
263
264 return TRUE;
265 }
266
267 static void
gst_shm_src_stop_reading(GstShmSrc * self)268 gst_shm_src_stop_reading (GstShmSrc * self)
269 {
270 GstShmPipe *pipe;
271
272 GST_DEBUG_OBJECT (self, "Stopping %p", self);
273
274 GST_OBJECT_LOCK (self);
275 pipe = self->pipe;
276 self->pipe = NULL;
277 GST_OBJECT_UNLOCK (self);
278
279 if (pipe) {
280 gst_shm_pipe_dec (pipe);
281 }
282 gst_poll_set_flushing (self->poll, TRUE);
283 }
284
285 static gboolean
gst_shm_src_start(GstBaseSrc * bsrc)286 gst_shm_src_start (GstBaseSrc * bsrc)
287 {
288 if (gst_base_src_is_live (bsrc))
289 return TRUE;
290 else
291 return gst_shm_src_start_reading (GST_SHM_SRC (bsrc));
292 }
293
294 static gboolean
gst_shm_src_stop(GstBaseSrc * bsrc)295 gst_shm_src_stop (GstBaseSrc * bsrc)
296 {
297 if (!gst_base_src_is_live (bsrc))
298 gst_shm_src_stop_reading (GST_SHM_SRC (bsrc));
299
300 return TRUE;
301 }
302
303
304 static void
free_buffer(gpointer data)305 free_buffer (gpointer data)
306 {
307 struct GstShmBuffer *gsb = data;
308 g_return_if_fail (gsb->pipe != NULL);
309 g_return_if_fail (gsb->pipe->src != NULL);
310
311 GST_LOG ("Freeing buffer %p", gsb->buf);
312
313 GST_OBJECT_LOCK (gsb->pipe->src);
314 sp_client_recv_finish (gsb->pipe->pipe, gsb->buf);
315 GST_OBJECT_UNLOCK (gsb->pipe->src);
316
317 gst_shm_pipe_dec (gsb->pipe);
318
319 g_slice_free (struct GstShmBuffer, gsb);
320 }
321
322 static GstFlowReturn
gst_shm_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)323 gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
324 {
325 GstShmSrc *self = GST_SHM_SRC (psrc);
326 GstShmPipe *pipe;
327 gchar *buf = NULL;
328 int rv = 0;
329 struct GstShmBuffer *gsb;
330
331 GST_DEBUG_OBJECT (self, "Stopping %p", self);
332
333 GST_OBJECT_LOCK (self);
334 pipe = self->pipe;
335 if (!pipe) {
336 GST_OBJECT_UNLOCK (self);
337 return GST_FLOW_FLUSHING;
338 } else {
339 pipe->use_count++;
340 }
341 GST_OBJECT_UNLOCK (self);
342
343 do {
344 if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) {
345 if (errno == EBUSY)
346 goto flushing;
347 GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
348 ("Poll failed on fd: %s", strerror (errno)));
349 goto error;
350 }
351
352 if (self->unlocked)
353 goto flushing;
354
355 if (gst_poll_fd_has_closed (self->poll, &self->pollfd)) {
356 GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
357 ("Control socket has closed"));
358 goto error;
359 }
360
361 if (gst_poll_fd_has_error (self->poll, &self->pollfd)) {
362 GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
363 ("Control socket has error"));
364 goto error;
365 }
366
367 if (gst_poll_fd_can_read (self->poll, &self->pollfd)) {
368 buf = NULL;
369 GST_LOG_OBJECT (self, "Reading from pipe");
370 GST_OBJECT_LOCK (self);
371 rv = sp_client_recv (pipe->pipe, &buf);
372 GST_OBJECT_UNLOCK (self);
373 if (rv < 0) {
374 GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
375 ("Error reading control data: %d", rv));
376 goto error;
377 }
378 }
379 } while (buf == NULL);
380
381 GST_LOG_OBJECT (self, "Got buffer %p of size %d", buf, rv);
382
383 gsb = g_slice_new0 (struct GstShmBuffer);
384 gsb->buf = buf;
385 gsb->pipe = pipe;
386
387 *outbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
388 buf, rv, 0, rv, gsb, free_buffer);
389
390 return GST_FLOW_OK;
391
392 error:
393 gst_shm_pipe_dec (pipe);
394 return GST_FLOW_ERROR;
395 flushing:
396 gst_shm_pipe_dec (pipe);
397 return GST_FLOW_FLUSHING;
398 }
399
400 static GstStateChangeReturn
gst_shm_src_change_state(GstElement * element,GstStateChange transition)401 gst_shm_src_change_state (GstElement * element, GstStateChange transition)
402 {
403 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
404 GstShmSrc *self = GST_SHM_SRC (element);
405
406 switch (transition) {
407 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
408 if (gst_base_src_is_live (GST_BASE_SRC (element))) {
409 if (!gst_shm_src_start_reading (self))
410 return GST_STATE_CHANGE_FAILURE;
411 }
412 default:
413 break;
414 }
415
416 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
417 if (ret == GST_STATE_CHANGE_FAILURE)
418 return ret;
419
420 switch (transition) {
421 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
422 if (gst_base_src_is_live (GST_BASE_SRC (element))) {
423 gst_shm_src_unlock (GST_BASE_SRC (element));
424 gst_shm_src_stop_reading (self);
425 }
426 default:
427 break;
428 }
429
430 return ret;
431 }
432
433 static gboolean
gst_shm_src_unlock(GstBaseSrc * bsrc)434 gst_shm_src_unlock (GstBaseSrc * bsrc)
435 {
436 GstShmSrc *self = GST_SHM_SRC (bsrc);
437
438 self->unlocked = TRUE;
439 gst_poll_set_flushing (self->poll, TRUE);
440
441 return TRUE;
442 }
443
444 static gboolean
gst_shm_src_unlock_stop(GstBaseSrc * bsrc)445 gst_shm_src_unlock_stop (GstBaseSrc * bsrc)
446 {
447 GstShmSrc *self = GST_SHM_SRC (bsrc);
448
449 self->unlocked = FALSE;
450 gst_poll_set_flushing (self->poll, FALSE);
451
452 return TRUE;
453 }
454
455 static void
gst_shm_pipe_dec(GstShmPipe * pipe)456 gst_shm_pipe_dec (GstShmPipe * pipe)
457 {
458 g_return_if_fail (pipe);
459 g_return_if_fail (pipe->src);
460 g_return_if_fail (pipe->use_count > 0);
461
462 GST_OBJECT_LOCK (pipe->src);
463 pipe->use_count--;
464
465 if (pipe->use_count > 0) {
466 GST_OBJECT_UNLOCK (pipe->src);
467 return;
468 }
469
470 if (pipe->pipe)
471 sp_client_close (pipe->pipe);
472
473 gst_poll_remove_fd (pipe->src->poll, &pipe->src->pollfd);
474 gst_poll_fd_init (&pipe->src->pollfd);
475
476 GST_OBJECT_UNLOCK (pipe->src);
477
478 gst_object_unref (pipe->src);
479 g_slice_free (GstShmPipe, pipe);
480 }
481