/* GStreamer
 * Copyright (C) <2009> Collabora Ltd
 *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
 * Copyright (C) <2009> Nokia Inc
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
/**
 * SECTION:element-shmsink
 * @title: shmsink
 *
 * Send data over shared memory to the matching source.
 *
 * ## Example launch lines
 * |[
 * gst-launch-1.0 -v videotestsrc ! "video/x-raw, format=YUY2, color-matrix=sdtv, \
 * chroma-site=mpeg2, width=(int)320, height=(int)240, framerate=(fraction)30/1" \
 * ! shmsink socket-path=/tmp/blah shm-size=2000000
 * ]| Send video to shm buffers.
 *
 */
35 #ifdef HAVE_CONFIG_H
36 #include "config.h"
37 #endif
38 
39 #include "gstshmsink.h"
40 
41 #include <gst/gst.h>
42 
43 #include <string.h>
44 
/* Signal indices into the signals[] id table; LAST_SIGNAL is the number of
 * signals and is used to size that table. */
enum
{
  SIGNAL_CLIENT_CONNECTED,
  SIGNAL_CLIENT_DISCONNECTED,
  LAST_SIGNAL
};
52 
/* GObject property ids handled by gst_shm_sink_set_property() and
 * gst_shm_sink_get_property(); PROP_0 occupies id zero. */
enum
{
  PROP_0,
  PROP_SOCKET_PATH,
  PROP_PERMS,
  PROP_SHM_SIZE,
  PROP_WAIT_FOR_CONNECTION,
  PROP_BUFFER_TIME
};
63 
64 struct GstShmClient
65 {
66   ShmClient *client;
67   GstPollFD pollfd;
68 };
69 
/* Default size of the shared memory area: 64 MiB */
#define DEFAULT_SIZE ( 64 * 1024 * 1024 )
/* By default rendering blocks until a client is connected */
#define DEFAULT_WAIT_FOR_CONNECTION (TRUE)
/* Default is user read/write, group read */
#define DEFAULT_PERMS ( S_IRUSR | S_IWUSR | S_IRGRP )
74 
75 
76 GST_DEBUG_CATEGORY_STATIC (shmsink_debug);
77 #define GST_CAT_DEFAULT shmsink_debug
78 
79 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
80     GST_PAD_SINK,
81     GST_PAD_ALWAYS,
82     GST_STATIC_CAPS_ANY);
83 
84 #define gst_shm_sink_parent_class parent_class
85 G_DEFINE_TYPE (GstShmSink, gst_shm_sink, GST_TYPE_BASE_SINK);
86 GST_ELEMENT_REGISTER_DEFINE (shmsink, "shmsink", GST_RANK_NONE,
87     GST_TYPE_SHM_SINK);
88 
89 static void gst_shm_sink_finalize (GObject * object);
90 static void gst_shm_sink_set_property (GObject * object, guint prop_id,
91     const GValue * value, GParamSpec * pspec);
92 static void gst_shm_sink_get_property (GObject * object, guint prop_id,
93     GValue * value, GParamSpec * pspec);
94 
95 static gboolean gst_shm_sink_start (GstBaseSink * bsink);
96 static gboolean gst_shm_sink_stop (GstBaseSink * bsink);
97 static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf);
98 
99 static gboolean gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event);
100 static gboolean gst_shm_sink_unlock (GstBaseSink * bsink);
101 static gboolean gst_shm_sink_unlock_stop (GstBaseSink * bsink);
102 static gboolean gst_shm_sink_propose_allocation (GstBaseSink * sink,
103     GstQuery * query);
104 
105 static gpointer pollthread_func (gpointer data);
106 
107 static guint signals[LAST_SIGNAL] = { 0 };
108 
109 
110 
111 /********************
112  * CUSTOM ALLOCATOR *
113  ********************/
114 
/* Standard GObject type/cast/check macros for GstShmSinkAllocator */
#define GST_TYPE_SHM_SINK_ALLOCATOR \
  (gst_shm_sink_allocator_get_type())
#define GST_SHM_SINK_ALLOCATOR(obj) \
  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SHM_SINK_ALLOCATOR, \
      GstShmSinkAllocator))
#define GST_SHM_SINK_ALLOCATOR_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SHM_SINK_ALLOCATOR, \
      GstShmSinkAllocatorClass))
#define GST_IS_SHM_SINK_ALLOCATOR(obj) \
  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SHM_SINK_ALLOCATOR))
#define GST_IS_SHM_SINK_ALLOCATOR_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SHM_SINK_ALLOCATOR))
127 
128 struct _GstShmSinkAllocator
129 {
130   GstAllocator parent;
131 
132   GstShmSink *sink;
133 };
134 
135 typedef struct _GstShmSinkAllocatorClass
136 {
137   GstAllocatorClass parent;
138 } GstShmSinkAllocatorClass;
139 
140 typedef struct _GstShmSinkMemory
141 {
142   GstMemory mem;
143 
144   gchar *data;
145   GstShmSink *sink;
146   ShmBlock *block;
147 } GstShmSinkMemory;
148 
149 GType gst_shm_sink_allocator_get_type (void);
150 
151 G_DEFINE_TYPE (GstShmSinkAllocator, gst_shm_sink_allocator, GST_TYPE_ALLOCATOR);
152 
153 static void
gst_shm_sink_allocator_dispose(GObject * object)154 gst_shm_sink_allocator_dispose (GObject * object)
155 {
156   GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (object);
157 
158   if (self->sink)
159     gst_object_unref (self->sink);
160   self->sink = NULL;
161 
162   G_OBJECT_CLASS (gst_shm_sink_allocator_parent_class)->dispose (object);
163 }
164 
165 static void
gst_shm_sink_allocator_free(GstAllocator * allocator,GstMemory * mem)166 gst_shm_sink_allocator_free (GstAllocator * allocator, GstMemory * mem)
167 {
168   GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
169 
170   if (mymem->block) {
171     GST_OBJECT_LOCK (mymem->sink);
172     sp_writer_free_block (mymem->block);
173     GST_OBJECT_UNLOCK (mymem->sink);
174     gst_object_unref (mymem->sink);
175   }
176   gst_object_unref (mem->allocator);
177 
178   g_slice_free (GstShmSinkMemory, mymem);
179 }
180 
181 static gpointer
gst_shm_sink_allocator_mem_map(GstMemory * mem,gsize maxsize,GstMapFlags flags)182 gst_shm_sink_allocator_mem_map (GstMemory * mem, gsize maxsize,
183     GstMapFlags flags)
184 {
185   GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
186 
187   return mymem->data;
188 }
189 
190 static void
gst_shm_sink_allocator_mem_unmap(GstMemory * mem)191 gst_shm_sink_allocator_mem_unmap (GstMemory * mem)
192 {
193 }
194 
195 static GstMemory *
gst_shm_sink_allocator_mem_share(GstMemory * mem,gssize offset,gssize size)196 gst_shm_sink_allocator_mem_share (GstMemory * mem, gssize offset, gssize size)
197 {
198   GstShmSinkMemory *mymem = (GstShmSinkMemory *) mem;
199   GstShmSinkMemory *mysub;
200   GstMemory *parent;
201 
202   /* find the real parent */
203   if ((parent = mem->parent) == NULL)
204     parent = mem;
205 
206   if (size == -1)
207     size = mem->size - offset;
208 
209   mysub = g_slice_new0 (GstShmSinkMemory);
210   /* the shared memory is always readonly */
211   gst_memory_init (GST_MEMORY_CAST (mysub), GST_MINI_OBJECT_FLAGS (parent) |
212       GST_MINI_OBJECT_FLAG_LOCK_READONLY, gst_object_ref (mem->allocator),
213       parent, mem->maxsize, mem->align, mem->offset + offset, size);
214   mysub->data = mymem->data;
215 
216   return (GstMemory *) mysub;
217 }
218 
219 static gboolean
gst_shm_sink_allocator_mem_is_span(GstMemory * mem1,GstMemory * mem2,gsize * offset)220 gst_shm_sink_allocator_mem_is_span (GstMemory * mem1, GstMemory * mem2,
221     gsize * offset)
222 {
223   GstShmSinkMemory *mymem1 = (GstShmSinkMemory *) mem1;
224   GstShmSinkMemory *mymem2 = (GstShmSinkMemory *) mem2;
225 
226   if (offset) {
227     GstMemory *parent;
228 
229     parent = mem1->parent;
230 
231     *offset = mem1->offset - parent->offset;
232   }
233 
234   /* and memory is contiguous */
235   return mymem1->data + mem1->offset + mem1->size ==
236       mymem2->data + mem2->offset;
237 }
238 
239 static void
gst_shm_sink_allocator_init(GstShmSinkAllocator * self)240 gst_shm_sink_allocator_init (GstShmSinkAllocator * self)
241 {
242   GstAllocator *allocator = GST_ALLOCATOR (self);
243 
244   allocator->mem_map = gst_shm_sink_allocator_mem_map;
245   allocator->mem_unmap = gst_shm_sink_allocator_mem_unmap;
246   allocator->mem_share = gst_shm_sink_allocator_mem_share;
247   allocator->mem_is_span = gst_shm_sink_allocator_mem_is_span;
248 }
249 
250 
251 static GstMemory *
gst_shm_sink_allocator_alloc_locked(GstShmSinkAllocator * self,gsize size,GstAllocationParams * params)252 gst_shm_sink_allocator_alloc_locked (GstShmSinkAllocator * self, gsize size,
253     GstAllocationParams * params)
254 {
255   GstMemory *memory = NULL;
256   ShmBlock *block = NULL;
257   gsize maxsize = size + params->prefix + params->padding;
258   gsize align = params->align;
259 
260   /* ensure configured alignment */
261   align |= gst_memory_alignment;
262   /* allocate more to compensate for alignment */
263   maxsize += align;
264 
265   block = sp_writer_alloc_block (self->sink->pipe, maxsize);
266   if (block) {
267     GstShmSinkMemory *mymem;
268     gsize aoffset, padding;
269 
270     GST_LOG_OBJECT (self,
271         "Allocated block %p with %" G_GSIZE_FORMAT " bytes at %p", block, size,
272         sp_writer_block_get_buf (block));
273 
274     mymem = g_slice_new0 (GstShmSinkMemory);
275     memory = GST_MEMORY_CAST (mymem);
276     mymem->data = sp_writer_block_get_buf (block);
277     mymem->sink = gst_object_ref (self->sink);
278     mymem->block = block;
279 
280     /* do alignment */
281     if ((aoffset = ((guintptr) mymem->data & align))) {
282       aoffset = (align + 1) - aoffset;
283       mymem->data += aoffset;
284       maxsize -= aoffset;
285     }
286 
287     if (params->prefix && (params->flags & GST_MEMORY_FLAG_ZERO_PREFIXED))
288       memset (mymem->data, 0, params->prefix);
289 
290     padding = maxsize - (params->prefix + size);
291     if (padding && (params->flags & GST_MEMORY_FLAG_ZERO_PADDED))
292       memset (mymem->data + params->prefix + size, 0, padding);
293 
294     gst_memory_init (memory, params->flags,
295         GST_ALLOCATOR_CAST (g_object_ref (self)), NULL, maxsize, align,
296         params->prefix, size);
297   }
298 
299   return memory;
300 }
301 
302 static GstMemory *
gst_shm_sink_allocator_alloc(GstAllocator * allocator,gsize size,GstAllocationParams * params)303 gst_shm_sink_allocator_alloc (GstAllocator * allocator, gsize size,
304     GstAllocationParams * params)
305 {
306   GstShmSinkAllocator *self = GST_SHM_SINK_ALLOCATOR (allocator);
307   GstMemory *memory = NULL;
308 
309   GST_OBJECT_LOCK (self->sink);
310   memory = gst_shm_sink_allocator_alloc_locked (self, size, params);
311   GST_OBJECT_UNLOCK (self->sink);
312 
313   if (!memory) {
314     memory = gst_allocator_alloc (NULL, size, params);
315     GST_LOG_OBJECT (self,
316         "Not enough shared memory for GstMemory of %" G_GSIZE_FORMAT
317         "bytes, allocating using standard allocator", size);
318   }
319 
320   return memory;
321 }
322 
323 
324 static void
gst_shm_sink_allocator_class_init(GstShmSinkAllocatorClass * klass)325 gst_shm_sink_allocator_class_init (GstShmSinkAllocatorClass * klass)
326 {
327   GstAllocatorClass *allocator_class = GST_ALLOCATOR_CLASS (klass);
328   GObjectClass *object_class = G_OBJECT_CLASS (klass);
329 
330   allocator_class->alloc = gst_shm_sink_allocator_alloc;
331   allocator_class->free = gst_shm_sink_allocator_free;
332   object_class->dispose = gst_shm_sink_allocator_dispose;
333 }
334 
335 static GstShmSinkAllocator *
gst_shm_sink_allocator_new(GstShmSink * sink)336 gst_shm_sink_allocator_new (GstShmSink * sink)
337 {
338   GstShmSinkAllocator *self = g_object_new (GST_TYPE_SHM_SINK_ALLOCATOR, NULL);
339 
340   gst_object_ref_sink (self);
341 
342   self->sink = gst_object_ref (sink);
343 
344   return self;
345 }
346 
347 
348 /***************
349  * MAIN OBJECT *
350  ***************/
351 
352 static void
gst_shm_sink_init(GstShmSink * self)353 gst_shm_sink_init (GstShmSink * self)
354 {
355   g_cond_init (&self->cond);
356   self->size = DEFAULT_SIZE;
357   self->unlock = FALSE;
358   self->wait_for_connection = DEFAULT_WAIT_FOR_CONNECTION;
359   self->perms = DEFAULT_PERMS;
360 
361   gst_allocation_params_init (&self->params);
362 }
363 
364 static void
gst_shm_sink_class_init(GstShmSinkClass * klass)365 gst_shm_sink_class_init (GstShmSinkClass * klass)
366 {
367   GObjectClass *gobject_class;
368   GstElementClass *gstelement_class;
369   GstBaseSinkClass *gstbasesink_class;
370 
371   gobject_class = (GObjectClass *) klass;
372   gstelement_class = (GstElementClass *) klass;
373   gstbasesink_class = (GstBaseSinkClass *) klass;
374 
375   gobject_class->finalize = gst_shm_sink_finalize;
376   gobject_class->set_property = gst_shm_sink_set_property;
377   gobject_class->get_property = gst_shm_sink_get_property;
378 
379   gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_shm_sink_start);
380   gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_shm_sink_stop);
381   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_shm_sink_render);
382   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_shm_sink_event);
383   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock);
384   gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_shm_sink_unlock_stop);
385   gstbasesink_class->propose_allocation =
386       GST_DEBUG_FUNCPTR (gst_shm_sink_propose_allocation);
387 
388   g_object_class_install_property (gobject_class, PROP_SOCKET_PATH,
389       g_param_spec_string ("socket-path",
390           "Path to the control socket",
391           "The path to the control socket used to control the shared memory "
392           "transport. This may be modified during the NULL->READY transition",
393           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394 
395   g_object_class_install_property (gobject_class, PROP_PERMS,
396       g_param_spec_uint ("perms",
397           "Permissions on the shm area",
398           "Permissions to set on the shm area",
399           0, 07777, DEFAULT_PERMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
400 
401   g_object_class_install_property (gobject_class, PROP_SHM_SIZE,
402       g_param_spec_uint ("shm-size",
403           "Size of the shm area",
404           "Size of the shared memory area",
405           0, G_MAXUINT, DEFAULT_SIZE,
406           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
407 
408   g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION,
409       g_param_spec_boolean ("wait-for-connection",
410           "Wait for a connection until rendering",
411           "Block the stream until the shm pipe is connected",
412           DEFAULT_WAIT_FOR_CONNECTION,
413           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
414 
415   g_object_class_install_property (gobject_class, PROP_BUFFER_TIME,
416       g_param_spec_int64 ("buffer-time",
417           "Buffer Time of the shm buffer",
418           "Maximum Size of the shm buffer in nanoseconds (-1 to disable)",
419           -1, G_MAXINT64, -1,
420           G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
421 
422   signals[SIGNAL_CLIENT_CONNECTED] = g_signal_new ("client-connected",
423       GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
424       G_TYPE_NONE, 1, G_TYPE_INT);
425 
426   signals[SIGNAL_CLIENT_DISCONNECTED] = g_signal_new ("client-disconnected",
427       GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
428       G_TYPE_NONE, 1, G_TYPE_INT);
429 
430   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
431 
432   gst_element_class_set_static_metadata (gstelement_class,
433       "Shared Memory Sink",
434       "Sink",
435       "Send data over shared memory to the matching source",
436       "Olivier Crete <olivier.crete@collabora.co.uk>");
437 
438   GST_DEBUG_CATEGORY_INIT (shmsink_debug, "shmsink", 0, "Shared Memory Sink");
439 }
440 
441 static void
gst_shm_sink_finalize(GObject * object)442 gst_shm_sink_finalize (GObject * object)
443 {
444   GstShmSink *self = GST_SHM_SINK (object);
445 
446   g_cond_clear (&self->cond);
447   g_free (self->socket_path);
448 
449   G_OBJECT_CLASS (parent_class)->finalize (object);
450 }
451 
452 /*
453  * Set the value of a property for the server sink.
454  */
455 static void
gst_shm_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)456 gst_shm_sink_set_property (GObject * object, guint prop_id,
457     const GValue * value, GParamSpec * pspec)
458 {
459   GstShmSink *self = GST_SHM_SINK (object);
460   int ret = 0;
461 
462   switch (prop_id) {
463     case PROP_SOCKET_PATH:
464       GST_OBJECT_LOCK (object);
465       g_free (self->socket_path);
466       self->socket_path = g_value_dup_string (value);
467       GST_OBJECT_UNLOCK (object);
468       break;
469     case PROP_PERMS:
470       GST_OBJECT_LOCK (object);
471       self->perms = g_value_get_uint (value);
472       if (self->pipe)
473         ret = sp_writer_setperms_shm (self->pipe, self->perms);
474       GST_OBJECT_UNLOCK (object);
475       if (ret < 0)
476         GST_WARNING_OBJECT (object, "Could not set permissions on pipe: %s",
477             strerror (ret));
478       break;
479     case PROP_SHM_SIZE:
480       GST_OBJECT_LOCK (object);
481       if (self->pipe) {
482         if (sp_writer_resize (self->pipe, g_value_get_uint (value)) < 0) {
483           /* Swap allocators, so we can know immediately if the memory is
484            * ours */
485           gst_object_unref (self->allocator);
486           self->allocator = gst_shm_sink_allocator_new (self);
487 
488           GST_DEBUG_OBJECT (self, "Resized shared memory area from %u to "
489               "%u bytes", self->size, g_value_get_uint (value));
490         } else {
491           GST_WARNING_OBJECT (self, "Could not resize shared memory area from"
492               "%u to %u bytes", self->size, g_value_get_uint (value));
493         }
494       }
495       self->size = g_value_get_uint (value);
496       GST_OBJECT_UNLOCK (object);
497       break;
498     case PROP_WAIT_FOR_CONNECTION:
499       GST_OBJECT_LOCK (object);
500       self->wait_for_connection = g_value_get_boolean (value);
501       GST_OBJECT_UNLOCK (object);
502       g_cond_broadcast (&self->cond);
503       break;
504     case PROP_BUFFER_TIME:
505       GST_OBJECT_LOCK (object);
506       self->buffer_time = g_value_get_int64 (value);
507       GST_OBJECT_UNLOCK (object);
508       g_cond_broadcast (&self->cond);
509       break;
510     default:
511       break;
512   }
513 }
514 
515 static void
gst_shm_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)516 gst_shm_sink_get_property (GObject * object, guint prop_id,
517     GValue * value, GParamSpec * pspec)
518 {
519   GstShmSink *self = GST_SHM_SINK (object);
520 
521   GST_OBJECT_LOCK (object);
522 
523   switch (prop_id) {
524     case PROP_SOCKET_PATH:
525       g_value_set_string (value, self->socket_path);
526       break;
527     case PROP_PERMS:
528       g_value_set_uint (value, self->perms);
529       break;
530     case PROP_SHM_SIZE:
531       g_value_set_uint (value, self->size);
532       break;
533     case PROP_WAIT_FOR_CONNECTION:
534       g_value_set_boolean (value, self->wait_for_connection);
535       break;
536     case PROP_BUFFER_TIME:
537       g_value_set_int64 (value, self->buffer_time);
538       break;
539     default:
540       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
541       break;
542   }
543 
544   GST_OBJECT_UNLOCK (object);
545 }
546 
547 
548 
549 static gboolean
gst_shm_sink_start(GstBaseSink * bsink)550 gst_shm_sink_start (GstBaseSink * bsink)
551 {
552   GstShmSink *self = GST_SHM_SINK (bsink);
553   GError *err = NULL;
554 
555   self->stop = FALSE;
556 
557   if (!self->socket_path) {
558     GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
559         ("Could not open socket."), (NULL));
560     return FALSE;
561   }
562 
563   GST_DEBUG_OBJECT (self, "Creating new socket at %s"
564       " with shared memory of %d bytes", self->socket_path, self->size);
565 
566   self->pipe = sp_writer_create (self->socket_path, self->size, self->perms);
567 
568   if (!self->pipe) {
569     GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE,
570         ("Could not open socket."), (NULL));
571     return FALSE;
572   }
573 
574   sp_set_data (self->pipe, self);
575   g_free (self->socket_path);
576   self->socket_path = g_strdup (sp_writer_get_path (self->pipe));
577 
578   GST_DEBUG ("Created socket at %s", self->socket_path);
579 
580   self->poll = gst_poll_new (TRUE);
581   gst_poll_fd_init (&self->serverpollfd);
582   self->serverpollfd.fd = sp_get_fd (self->pipe);
583   gst_poll_add_fd (self->poll, &self->serverpollfd);
584   gst_poll_fd_ctl_read (self->poll, &self->serverpollfd, TRUE);
585 
586   self->pollthread =
587       g_thread_try_new ("gst-shmsink-poll-thread", pollthread_func, self, &err);
588 
589   if (!self->pollthread)
590     goto thread_error;
591 
592   self->allocator = gst_shm_sink_allocator_new (self);
593 
594   return TRUE;
595 
596 thread_error:
597 
598   sp_writer_close (self->pipe, NULL, NULL);
599   self->pipe = NULL;
600   gst_poll_free (self->poll);
601 
602   GST_ELEMENT_ERROR (self, CORE, THREAD, ("Could not start thread"),
603       ("%s", err->message));
604   g_error_free (err);
605   return FALSE;
606 }
607 
608 
609 static gboolean
gst_shm_sink_stop(GstBaseSink * bsink)610 gst_shm_sink_stop (GstBaseSink * bsink)
611 {
612   GstShmSink *self = GST_SHM_SINK (bsink);
613 
614   self->stop = TRUE;
615   gst_poll_set_flushing (self->poll, TRUE);
616 
617   if (self->allocator)
618     gst_object_unref (self->allocator);
619   self->allocator = NULL;
620 
621   g_thread_join (self->pollthread);
622   self->pollthread = NULL;
623 
624   GST_DEBUG_OBJECT (self, "Stopping");
625 
626   while (self->clients) {
627     struct GstShmClient *client = self->clients->data;
628     self->clients = g_list_remove (self->clients, client);
629     sp_writer_close_client (self->pipe, client->client,
630         (sp_buffer_free_callback) gst_buffer_unref, NULL);
631     g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0,
632         client->pollfd.fd);
633     g_slice_free (struct GstShmClient, client);
634   }
635 
636   gst_poll_free (self->poll);
637   self->poll = NULL;
638 
639   sp_writer_close (self->pipe, NULL, NULL);
640   self->pipe = NULL;
641 
642   return TRUE;
643 }
644 
645 static gboolean
gst_shm_sink_can_render(GstShmSink * self,GstClockTime time)646 gst_shm_sink_can_render (GstShmSink * self, GstClockTime time)
647 {
648   ShmBuffer *b;
649 
650   if (time == GST_CLOCK_TIME_NONE || self->buffer_time == GST_CLOCK_TIME_NONE)
651     return TRUE;
652 
653   b = sp_writer_get_pending_buffers (self->pipe);
654   for (; b != NULL; b = sp_writer_get_next_buffer (b)) {
655     GstBuffer *buf = sp_writer_buf_get_tag (b);
656     if (GST_CLOCK_DIFF (time, GST_BUFFER_PTS (buf)) > self->buffer_time)
657       return FALSE;
658   }
659 
660   return TRUE;
661 }
662 
663 static GstFlowReturn
gst_shm_sink_render(GstBaseSink * bsink,GstBuffer * buf)664 gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf)
665 {
666   GstShmSink *self = GST_SHM_SINK (bsink);
667   int rv = 0;
668   GstMapInfo map;
669   gboolean need_new_memory = FALSE;
670   GstFlowReturn ret = GST_FLOW_OK;
671   GstMemory *memory = NULL;
672   GstBuffer *sendbuf = NULL;
673   gsize written_bytes;
674 
675   GST_OBJECT_LOCK (self);
676   if (self->unlock) {
677     GST_OBJECT_UNLOCK (self);
678     return GST_FLOW_FLUSHING;
679   }
680 
681   while (self->wait_for_connection && !self->clients) {
682     g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
683     if (self->unlock) {
684       GST_OBJECT_UNLOCK (self);
685       ret = gst_base_sink_wait_preroll (bsink);
686       if (ret == GST_FLOW_OK)
687         GST_OBJECT_LOCK (self);
688       else
689         return ret;
690     }
691   }
692 
693   while (!gst_shm_sink_can_render (self, GST_BUFFER_TIMESTAMP (buf))) {
694     g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
695     if (self->unlock) {
696       GST_OBJECT_UNLOCK (self);
697       ret = gst_base_sink_wait_preroll (bsink);
698       if (ret == GST_FLOW_OK)
699         GST_OBJECT_LOCK (self);
700       else
701         return ret;
702     }
703   }
704 
705 
706   if (gst_buffer_n_memory (buf) > 1) {
707     GST_LOG_OBJECT (self, "Buffer %p has %d GstMemory, we only support a single"
708         " one, need to do a memcpy", buf, gst_buffer_n_memory (buf));
709     need_new_memory = TRUE;
710   } else {
711     memory = gst_buffer_peek_memory (buf, 0);
712 
713     if (memory->allocator != GST_ALLOCATOR (self->allocator)) {
714       need_new_memory = TRUE;
715       GST_LOG_OBJECT (self, "Memory in buffer %p was not allocated by "
716           "%" GST_PTR_FORMAT ", will memcpy", buf, memory->allocator);
717     }
718   }
719 
720   if (need_new_memory) {
721     if (gst_buffer_get_size (buf) > sp_writer_get_max_buf_size (self->pipe)) {
722       gsize area_size = sp_writer_get_max_buf_size (self->pipe);
723       GST_ELEMENT_ERROR (self, RESOURCE, NO_SPACE_LEFT, (NULL),
724           ("Shared memory area of size %" G_GSIZE_FORMAT " is smaller than"
725               "buffer of size %" G_GSIZE_FORMAT, area_size,
726               gst_buffer_get_size (buf)));
727       goto error;
728     }
729 
730     while ((memory =
731             gst_shm_sink_allocator_alloc_locked (self->allocator,
732                 gst_buffer_get_size (buf), &self->params)) == NULL) {
733       g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
734       if (self->unlock) {
735         GST_OBJECT_UNLOCK (self);
736         ret = gst_base_sink_wait_preroll (bsink);
737         if (ret == GST_FLOW_OK)
738           GST_OBJECT_LOCK (self);
739         else
740           return ret;
741       }
742     }
743 
744     while (self->wait_for_connection && !self->clients) {
745       g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
746       if (self->unlock) {
747         GST_OBJECT_UNLOCK (self);
748         ret = gst_base_sink_wait_preroll (bsink);
749         if (ret == GST_FLOW_OK) {
750           GST_OBJECT_LOCK (self);
751         } else {
752           gst_memory_unref (memory);
753           return ret;
754         }
755       }
756     }
757 
758     if (!gst_memory_map (memory, &map, GST_MAP_WRITE)) {
759       GST_ELEMENT_ERROR (self, STREAM, FAILED,
760           (NULL), ("Failed to map memory"));
761       goto error;
762     }
763 
764     GST_DEBUG_OBJECT (self,
765         "Copying %" G_GSIZE_FORMAT " bytes into map of size %" G_GSIZE_FORMAT
766         " bytes.", gst_buffer_get_size (buf), map.size);
767     written_bytes = gst_buffer_extract (buf, 0, map.data, map.size);
768     GST_DEBUG_OBJECT (self, "Copied %" G_GSIZE_FORMAT " bytes.", written_bytes);
769     gst_memory_unmap (memory, &map);
770 
771     sendbuf = gst_buffer_new ();
772     if (!gst_buffer_copy_into (sendbuf, buf, GST_BUFFER_COPY_METADATA, 0, -1)) {
773       GST_ELEMENT_ERROR (self, STREAM, FAILED,
774           (NULL), ("Failed to copy data into send buffer"));
775       gst_buffer_unref (sendbuf);
776       goto error;
777     }
778     gst_buffer_append_memory (sendbuf, memory);
779   } else {
780     sendbuf = gst_buffer_ref (buf);
781   }
782 
783   if (!gst_buffer_map (sendbuf, &map, GST_MAP_READ)) {
784     GST_ELEMENT_ERROR (self, STREAM, FAILED,
785         (NULL), ("Failed to map data into send buffer"));
786     goto error;
787   }
788 
789   /* Make the memory readonly as of now as we've sent it to the other side
790    * We know it's not mapped for writing anywhere as we just mapped it for
791    * reading
792    */
793   rv = sp_writer_send_buf (self->pipe, (char *) map.data, map.size, sendbuf);
794   if (rv == -1) {
795     GST_ELEMENT_ERROR (self, STREAM, FAILED,
796         (NULL), ("Failed to send data over SHM"));
797     gst_buffer_unmap (sendbuf, &map);
798     goto error;
799   }
800 
801   gst_buffer_unmap (sendbuf, &map);
802 
803   GST_OBJECT_UNLOCK (self);
804 
805   if (rv == 0) {
806     GST_DEBUG_OBJECT (self, "No clients connected, unreffing buffer");
807     gst_buffer_unref (sendbuf);
808   }
809 
810   return ret;
811 
812 error:
813   GST_OBJECT_UNLOCK (self);
814   return GST_FLOW_ERROR;
815 }
816 
817 static void
free_buffer_locked(GstBuffer * buffer,void * data)818 free_buffer_locked (GstBuffer * buffer, void *data)
819 {
820   GSList **list = data;
821 
822   g_assert (buffer != NULL);
823 
824   *list = g_slist_prepend (*list, buffer);
825 }
826 
827 static gpointer
pollthread_func(gpointer data)828 pollthread_func (gpointer data)
829 {
830   GstShmSink *self = GST_SHM_SINK (data);
831   GList *item;
832   GstClockTime timeout = GST_CLOCK_TIME_NONE;
833   int rv = 0;
834 
835   while (!self->stop) {
836 
837     do {
838       rv = gst_poll_wait (self->poll, timeout);
839     } while (rv < 0 && errno == EINTR);
840 
841     if (rv < 0) {
842       GST_ELEMENT_ERROR (self, RESOURCE, READ,
843           ("Failed waiting on fd activity"),
844           ("gst_poll_wait returned %d, errno: %d", rv, errno));
845       return NULL;
846     }
847 
848     timeout = GST_CLOCK_TIME_NONE;
849 
850     if (self->stop)
851       return NULL;
852 
853     if (gst_poll_fd_has_closed (self->poll, &self->serverpollfd)) {
854       GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed read from shmsink"),
855           ("Control socket has closed"));
856       return NULL;
857     }
858 
859     if (gst_poll_fd_has_error (self->poll, &self->serverpollfd)) {
860       GST_ELEMENT_ERROR (self, RESOURCE, READ, ("Failed to read from shmsink"),
861           ("Control socket has error"));
862       return NULL;
863     }
864 
865     if (gst_poll_fd_can_read (self->poll, &self->serverpollfd)) {
866       ShmClient *client;
867       struct GstShmClient *gclient;
868 
869       GST_OBJECT_LOCK (self);
870       client = sp_writer_accept_client (self->pipe);
871       GST_OBJECT_UNLOCK (self);
872 
873       if (!client) {
874         GST_ELEMENT_ERROR (self, RESOURCE, READ,
875             ("Failed to read from shmsink"),
876             ("Control socket returns wrong data"));
877         return NULL;
878       }
879 
880       gclient = g_slice_new (struct GstShmClient);
881       gclient->client = client;
882       gst_poll_fd_init (&gclient->pollfd);
883       gclient->pollfd.fd = sp_writer_get_client_fd (client);
884       gst_poll_add_fd (self->poll, &gclient->pollfd);
885       gst_poll_fd_ctl_read (self->poll, &gclient->pollfd, TRUE);
886       self->clients = g_list_prepend (self->clients, gclient);
887       g_signal_emit (self, signals[SIGNAL_CLIENT_CONNECTED], 0,
888           gclient->pollfd.fd);
889       /* we need to call gst_poll_wait before calling gst_poll_* status
890          functions on that new descriptor, so restart the loop, so _wait
891          will have been called on all elements of self->poll, whether
892          they have just been added or not. */
893       timeout = 0;
894       continue;
895     }
896 
897   again:
898     for (item = self->clients; item; item = item->next) {
899       struct GstShmClient *gclient = item->data;
900 
901       if (gst_poll_fd_has_closed (self->poll, &gclient->pollfd)) {
902         GST_WARNING_OBJECT (self, "One client is gone, closing");
903         goto close_client;
904       }
905 
906       if (gst_poll_fd_has_error (self->poll, &gclient->pollfd)) {
907         GST_WARNING_OBJECT (self, "One client fd has error, closing");
908         goto close_client;
909       }
910 
911       if (gst_poll_fd_can_read (self->poll, &gclient->pollfd)) {
912         int rv;
913         gpointer tag = NULL;
914 
915         GST_OBJECT_LOCK (self);
916         rv = sp_writer_recv (self->pipe, gclient->client, &tag);
917         GST_OBJECT_UNLOCK (self);
918 
919         if (rv < 0) {
920           GST_WARNING_OBJECT (self, "One client has read error,"
921               " closing (retval: %d errno: %d)", rv, errno);
922           goto close_client;
923         }
924 
925         g_assert (rv == 0 || tag == NULL);
926 
927         if (rv == 0)
928           gst_buffer_unref (tag);
929       }
930       continue;
931     close_client:
932       {
933         GSList *list = NULL;
934         GST_OBJECT_LOCK (self);
935         sp_writer_close_client (self->pipe, gclient->client,
936             (sp_buffer_free_callback) free_buffer_locked, (void **) &list);
937         GST_OBJECT_UNLOCK (self);
938         g_slist_free_full (list, (GDestroyNotify) gst_buffer_unref);
939       }
940 
941       gst_poll_remove_fd (self->poll, &gclient->pollfd);
942       self->clients = g_list_remove (self->clients, gclient);
943 
944       g_signal_emit (self, signals[SIGNAL_CLIENT_DISCONNECTED], 0,
945           gclient->pollfd.fd);
946       g_slice_free (struct GstShmClient, gclient);
947 
948       goto again;
949     }
950 
951     g_cond_broadcast (&self->cond);
952   }
953 
954   return NULL;
955 }
956 
957 static gboolean
gst_shm_sink_event(GstBaseSink * bsink,GstEvent * event)958 gst_shm_sink_event (GstBaseSink * bsink, GstEvent * event)
959 {
960   GstShmSink *self = GST_SHM_SINK (bsink);
961 
962   switch (GST_EVENT_TYPE (event)) {
963     case GST_EVENT_EOS:
964       GST_OBJECT_LOCK (self);
965       while (self->wait_for_connection && sp_writer_pending_writes (self->pipe)
966           && !self->unlock)
967         g_cond_wait (&self->cond, GST_OBJECT_GET_LOCK (self));
968       GST_OBJECT_UNLOCK (self);
969       break;
970     default:
971       break;
972   }
973 
974   return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
975 }
976 
977 
978 static gboolean
gst_shm_sink_unlock(GstBaseSink * bsink)979 gst_shm_sink_unlock (GstBaseSink * bsink)
980 {
981   GstShmSink *self = GST_SHM_SINK (bsink);
982 
983   GST_OBJECT_LOCK (self);
984   self->unlock = TRUE;
985   GST_OBJECT_UNLOCK (self);
986 
987   g_cond_broadcast (&self->cond);
988   return TRUE;
989 }
990 
991 static gboolean
gst_shm_sink_unlock_stop(GstBaseSink * bsink)992 gst_shm_sink_unlock_stop (GstBaseSink * bsink)
993 {
994   GstShmSink *self = GST_SHM_SINK (bsink);
995 
996   GST_OBJECT_LOCK (self);
997   self->unlock = FALSE;
998   GST_OBJECT_UNLOCK (self);
999 
1000   return TRUE;
1001 }
1002 
1003 static gboolean
gst_shm_sink_propose_allocation(GstBaseSink * sink,GstQuery * query)1004 gst_shm_sink_propose_allocation (GstBaseSink * sink, GstQuery * query)
1005 {
1006   GstShmSink *self = GST_SHM_SINK (sink);
1007 
1008   if (self->allocator)
1009     gst_query_add_allocation_param (query, GST_ALLOCATOR (self->allocator),
1010         NULL);
1011 
1012   return TRUE;
1013 }
1014