• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <2009> Collabora Ltd
3  *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
4  * Copyright (C) <2009> Nokia Inc
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:element-shmsrc
23  * @title: shmsrc
24  *
25  * Receive data from the shared memory sink.
26  *
27  * ## Example launch lines
28  * |[
29  * gst-launch-1.0 shmsrc socket-path=/tmp/blah ! \
30  * "video/x-raw, format=YUY2, color-matrix=sdtv, \
31  * chroma-site=mpeg2, width=(int)320, height=(int)240, framerate=(fraction)30/1" \
32  * ! queue ! videoconvert ! autovideosink
33  * ]| Render video from shm buffers.
34  *
35  */
36 
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40 
41 #include "gstshmsrc.h"
42 
43 #include <gst/gst.h>
44 
45 #include <string.h>
46 
/* Signal identifiers. No signals are listed here, so the enumeration only
 * holds the terminating counter. */
enum { LAST_SIGNAL };
52 
/* GObject property identifiers; PROP_0 is the unused zero slot. */
enum
{
  PROP_0,
  PROP_SOCKET_PATH,             /* "socket-path" */
  PROP_IS_LIVE,                 /* "is-live" */
  PROP_SHM_AREA_NAME            /* "shm-area-name" */
};
61 
62 struct GstShmBuffer
63 {
64   char *buf;
65   GstShmPipe *pipe;
66 };
67 
68 
69 GST_DEBUG_CATEGORY_STATIC (shmsrc_debug);
70 #define GST_CAT_DEFAULT shmsrc_debug
71 
72 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
73     GST_PAD_SRC,
74     GST_PAD_ALWAYS,
75     GST_STATIC_CAPS_ANY);
76 
77 #define gst_shm_src_parent_class parent_class
78 G_DEFINE_TYPE (GstShmSrc, gst_shm_src, GST_TYPE_PUSH_SRC);
79 GST_ELEMENT_REGISTER_DEFINE (shmsrc, "shmsrc", GST_RANK_NONE, GST_TYPE_SHM_SRC);
80 
81 static void gst_shm_src_set_property (GObject * object, guint prop_id,
82     const GValue * value, GParamSpec * pspec);
83 static void gst_shm_src_get_property (GObject * object, guint prop_id,
84     GValue * value, GParamSpec * pspec);
85 static void gst_shm_src_finalize (GObject * object);
86 static gboolean gst_shm_src_start (GstBaseSrc * bsrc);
87 static gboolean gst_shm_src_stop (GstBaseSrc * bsrc);
88 static GstFlowReturn gst_shm_src_create (GstPushSrc * psrc,
89     GstBuffer ** outbuf);
90 static gboolean gst_shm_src_unlock (GstBaseSrc * bsrc);
91 static gboolean gst_shm_src_unlock_stop (GstBaseSrc * bsrc);
92 static GstStateChangeReturn gst_shm_src_change_state (GstElement * element,
93     GstStateChange transition);
94 
95 static void gst_shm_pipe_dec (GstShmPipe * pipe);
96 
97 static void
gst_shm_src_class_init(GstShmSrcClass * klass)98 gst_shm_src_class_init (GstShmSrcClass * klass)
99 {
100   GObjectClass *gobject_class;
101   GstElementClass *gstelement_class;
102   GstBaseSrcClass *gstbasesrc_class;
103   GstPushSrcClass *gstpush_src_class;
104 
105   gobject_class = (GObjectClass *) klass;
106   gstelement_class = (GstElementClass *) klass;
107   gstbasesrc_class = (GstBaseSrcClass *) klass;
108   gstpush_src_class = (GstPushSrcClass *) klass;
109 
110   gobject_class->set_property = gst_shm_src_set_property;
111   gobject_class->get_property = gst_shm_src_get_property;
112   gobject_class->finalize = gst_shm_src_finalize;
113 
114   gstelement_class->change_state = gst_shm_src_change_state;
115 
116   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_shm_src_start);
117   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_shm_src_stop);
118   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_src_unlock);
119   gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_src_unlock_stop);
120 
121   gstpush_src_class->create = gst_shm_src_create;
122 
123   g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
124       g_param_spec_string ("socket-path",
125           "Path to the control socket",
126           "The path to the control socket used to control the shared memory",
127           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
128 
129   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
130       g_param_spec_boolean ("is-live", "Is this a live source",
131           "True if the element cannot produce data in PAUSED", FALSE,
132           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133 
134   g_object_class_install_property (gobject_class, PROP_SHM_AREA_NAME,
135       g_param_spec_string ("shm-area-name",
136           "Name of the shared memory area",
137           "The name of the shared memory area used to get buffers",
138           NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
139 
140   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
141 
142   gst_element_class_set_static_metadata (gstelement_class,
143       "Shared Memory Source",
144       "Source",
145       "Receive data from the shared memory sink",
146       "Olivier Crete <olivier.crete@collabora.co.uk>");
147 
148   GST_DEBUG_CATEGORY_INIT (shmsrc_debug, "shmsrc", 0, "Shared Memory Source");
149 }
150 
151 static void
gst_shm_src_init(GstShmSrc * self)152 gst_shm_src_init (GstShmSrc * self)
153 {
154   self->poll = gst_poll_new (TRUE);
155   gst_poll_fd_init (&self->pollfd);
156 }
157 
158 static void
gst_shm_src_finalize(GObject * object)159 gst_shm_src_finalize (GObject * object)
160 {
161   GstShmSrc *self = GST_SHM_SRC (object);
162 
163   gst_poll_free (self->poll);
164   g_free (self->socket_path);
165 
166   G_OBJECT_CLASS (parent_class)->finalize (object);
167 }
168 
169 
170 static void
gst_shm_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)171 gst_shm_src_set_property (GObject * object, guint prop_id,
172     const GValue * value, GParamSpec * pspec)
173 {
174   GstShmSrc *self = GST_SHM_SRC (object);
175 
176   switch (prop_id) {
177     case PROP_SOCKET_PATH:
178       GST_OBJECT_LOCK (object);
179       if (self->pipe) {
180         GST_WARNING_OBJECT (object, "Can not modify socket path while the "
181             "element is playing");
182       } else {
183         g_free (self->socket_path);
184         self->socket_path = g_value_dup_string (value);
185       }
186       GST_OBJECT_UNLOCK (object);
187       break;
188     case PROP_IS_LIVE:
189       gst_base_src_set_live (GST_BASE_SRC (object),
190           g_value_get_boolean (value));
191       break;
192     default:
193       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
194       break;
195   }
196 }
197 
198 static void
gst_shm_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)199 gst_shm_src_get_property (GObject * object, guint prop_id,
200     GValue * value, GParamSpec * pspec)
201 {
202   GstShmSrc *self = GST_SHM_SRC (object);
203 
204   switch (prop_id) {
205     case PROP_SOCKET_PATH:
206       GST_OBJECT_LOCK (object);
207       g_value_set_string (value, self->socket_path);
208       GST_OBJECT_UNLOCK (object);
209       break;
210     case PROP_IS_LIVE:
211       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (object)));
212       break;
213     case PROP_SHM_AREA_NAME:
214       GST_OBJECT_LOCK (object);
215       if (self->pipe)
216         g_value_set_string (value, sp_get_shm_area_name (self->pipe->pipe));
217       GST_OBJECT_UNLOCK (object);
218       break;
219     default:
220       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
221       break;
222   }
223 }
224 
225 static gboolean
gst_shm_src_start_reading(GstShmSrc * self)226 gst_shm_src_start_reading (GstShmSrc * self)
227 {
228   GstShmPipe *gstpipe;
229 
230   if (!self->socket_path) {
231     GST_ELEMENT_ERROR (self, RESOURCE, NOT_FOUND,
232         ("No path specified for socket."), (NULL));
233     return FALSE;
234   }
235 
236   gstpipe = g_slice_new0 (GstShmPipe);
237   gstpipe->use_count = 1;
238   gstpipe->src = gst_object_ref (self);
239 
240   GST_DEBUG_OBJECT (self, "Opening socket %s", self->socket_path);
241 
242   GST_OBJECT_LOCK (self);
243   gstpipe->pipe = sp_client_open (self->socket_path);
244   GST_OBJECT_UNLOCK (self);
245 
246   if (!gstpipe->pipe) {
247     GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
248         ("Could not open socket %s: %d %s", self->socket_path, errno,
249             strerror (errno)), (NULL));
250     gst_shm_pipe_dec (gstpipe);
251     return FALSE;
252   }
253 
254   self->pipe = gstpipe;
255 
256   self->unlocked = FALSE;
257   gst_poll_set_flushing (self->poll, FALSE);
258 
259   gst_poll_fd_init (&self->pollfd);
260   self->pollfd.fd = sp_get_fd (self->pipe->pipe);
261   gst_poll_add_fd (self->poll, &self->pollfd);
262   gst_poll_fd_ctl_read (self->poll, &self->pollfd, TRUE);
263 
264   return TRUE;
265 }
266 
267 static void
gst_shm_src_stop_reading(GstShmSrc * self)268 gst_shm_src_stop_reading (GstShmSrc * self)
269 {
270   GstShmPipe *pipe;
271 
272   GST_DEBUG_OBJECT (self, "Stopping %p", self);
273 
274   GST_OBJECT_LOCK (self);
275   pipe = self->pipe;
276   self->pipe = NULL;
277   GST_OBJECT_UNLOCK (self);
278 
279   if (pipe) {
280     gst_shm_pipe_dec (pipe);
281   }
282   gst_poll_set_flushing (self->poll, TRUE);
283 }
284 
285 static gboolean
gst_shm_src_start(GstBaseSrc * bsrc)286 gst_shm_src_start (GstBaseSrc * bsrc)
287 {
288   if (gst_base_src_is_live (bsrc))
289     return TRUE;
290   else
291     return gst_shm_src_start_reading (GST_SHM_SRC (bsrc));
292 }
293 
294 static gboolean
gst_shm_src_stop(GstBaseSrc * bsrc)295 gst_shm_src_stop (GstBaseSrc * bsrc)
296 {
297   if (!gst_base_src_is_live (bsrc))
298     gst_shm_src_stop_reading (GST_SHM_SRC (bsrc));
299 
300   return TRUE;
301 }
302 
303 
304 static void
free_buffer(gpointer data)305 free_buffer (gpointer data)
306 {
307   struct GstShmBuffer *gsb = data;
308   g_return_if_fail (gsb->pipe != NULL);
309   g_return_if_fail (gsb->pipe->src != NULL);
310 
311   GST_LOG ("Freeing buffer %p", gsb->buf);
312 
313   GST_OBJECT_LOCK (gsb->pipe->src);
314   sp_client_recv_finish (gsb->pipe->pipe, gsb->buf);
315   GST_OBJECT_UNLOCK (gsb->pipe->src);
316 
317   gst_shm_pipe_dec (gsb->pipe);
318 
319   g_slice_free (struct GstShmBuffer, gsb);
320 }
321 
322 static GstFlowReturn
gst_shm_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)323 gst_shm_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
324 {
325   GstShmSrc *self = GST_SHM_SRC (psrc);
326   GstShmPipe *pipe;
327   gchar *buf = NULL;
328   int rv = 0;
329   struct GstShmBuffer *gsb;
330 
331   GST_DEBUG_OBJECT (self, "Stopping %p", self);
332 
333   GST_OBJECT_LOCK (self);
334   pipe = self->pipe;
335   if (!pipe) {
336     GST_OBJECT_UNLOCK (self);
337     return GST_FLOW_FLUSHING;
338   } else {
339     pipe->use_count++;
340   }
341   GST_OBJECT_UNLOCK (self);
342 
343   do {
344     if (gst_poll_wait (self->poll, GST_CLOCK_TIME_NONE) < 0) {
345       if (errno == EBUSY)
346         goto flushing;
347       GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
348           ("Poll failed on fd: %s", strerror (errno)));
349       goto error;
350     }
351 
352     if (self->unlocked)
353       goto flushing;
354 
355     if (gst_poll_fd_has_closed (self->poll, &self->pollfd)) {
356       GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
357           ("Control socket has closed"));
358       goto error;
359     }
360 
361     if (gst_poll_fd_has_error (self->poll, &self->pollfd)) {
362       GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
363           ("Control socket has error"));
364       goto error;
365     }
366 
367     if (gst_poll_fd_can_read (self->poll, &self->pollfd)) {
368       buf = NULL;
369       GST_LOG_OBJECT (self, "Reading from pipe");
370       GST_OBJECT_LOCK (self);
371       rv = sp_client_recv (pipe->pipe, &buf);
372       GST_OBJECT_UNLOCK (self);
373       if (rv < 0) {
374         GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsrc"),
375             ("Error reading control data: %d", rv));
376         goto error;
377       }
378     }
379   } while (buf == NULL);
380 
381   GST_LOG_OBJECT (self, "Got buffer %p of size %d", buf, rv);
382 
383   gsb = g_slice_new0 (struct GstShmBuffer);
384   gsb->buf = buf;
385   gsb->pipe = pipe;
386 
387   *outbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
388       buf, rv, 0, rv, gsb, free_buffer);
389 
390   return GST_FLOW_OK;
391 
392 error:
393   gst_shm_pipe_dec (pipe);
394   return GST_FLOW_ERROR;
395 flushing:
396   gst_shm_pipe_dec (pipe);
397   return GST_FLOW_FLUSHING;
398 }
399 
400 static GstStateChangeReturn
gst_shm_src_change_state(GstElement * element,GstStateChange transition)401 gst_shm_src_change_state (GstElement * element, GstStateChange transition)
402 {
403   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
404   GstShmSrc *self = GST_SHM_SRC (element);
405 
406   switch (transition) {
407     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
408       if (gst_base_src_is_live (GST_BASE_SRC (element))) {
409         if (!gst_shm_src_start_reading (self))
410           return GST_STATE_CHANGE_FAILURE;
411       }
412     default:
413       break;
414   }
415 
416   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
417   if (ret == GST_STATE_CHANGE_FAILURE)
418     return ret;
419 
420   switch (transition) {
421     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
422       if (gst_base_src_is_live (GST_BASE_SRC (element))) {
423         gst_shm_src_unlock (GST_BASE_SRC (element));
424         gst_shm_src_stop_reading (self);
425       }
426     default:
427       break;
428   }
429 
430   return ret;
431 }
432 
433 static gboolean
gst_shm_src_unlock(GstBaseSrc * bsrc)434 gst_shm_src_unlock (GstBaseSrc * bsrc)
435 {
436   GstShmSrc *self = GST_SHM_SRC (bsrc);
437 
438   self->unlocked = TRUE;
439   gst_poll_set_flushing (self->poll, TRUE);
440 
441   return TRUE;
442 }
443 
444 static gboolean
gst_shm_src_unlock_stop(GstBaseSrc * bsrc)445 gst_shm_src_unlock_stop (GstBaseSrc * bsrc)
446 {
447   GstShmSrc *self = GST_SHM_SRC (bsrc);
448 
449   self->unlocked = FALSE;
450   gst_poll_set_flushing (self->poll, FALSE);
451 
452   return TRUE;
453 }
454 
455 static void
gst_shm_pipe_dec(GstShmPipe * pipe)456 gst_shm_pipe_dec (GstShmPipe * pipe)
457 {
458   g_return_if_fail (pipe);
459   g_return_if_fail (pipe->src);
460   g_return_if_fail (pipe->use_count > 0);
461 
462   GST_OBJECT_LOCK (pipe->src);
463   pipe->use_count--;
464 
465   if (pipe->use_count > 0) {
466     GST_OBJECT_UNLOCK (pipe->src);
467     return;
468   }
469 
470   if (pipe->pipe)
471     sp_client_close (pipe->pipe);
472 
473   gst_poll_remove_fd (pipe->src->poll, &pipe->src->pollfd);
474   gst_poll_fd_init (&pipe->src->pollfd);
475 
476   GST_OBJECT_UNLOCK (pipe->src);
477 
478   gst_object_unref (pipe->src);
479   g_slice_free (GstShmPipe, pipe);
480 }
481