• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2015-2017 YouView TV Ltd
3  *   Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
4  *
5  * gstipcpipelinecomm.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26 
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif
30 #ifdef _MSC_VER
31 /* ssize_t is not available, so match return value of recv()/send() on MSVC */
32 #  define ssize_t int
33 #  include <winsock2.h>
34 #endif
35 #include <errno.h>
36 #include <string.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/gstprotection.h>
39 #include "gstipcpipelinecomm.h"
40 
41 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
42 #define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug
43 
44 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
45 
46 GQuark QUARK_ID;
47 
/* How a sender waits for the peer's reply after writing a request
 * (consumed by gst_ipc_pipeline_comm_sync_fd and comm_request_wait). */
typedef enum
{
  /* Do not wait for any reply. */
  ACK_TYPE_NONE,
  /* Wait until a reply arrives or comm->ack_time has elapsed. */
  ACK_TYPE_TIMED,
  /* Wait for a reply without a deadline. */
  ACK_TYPE_BLOCKING
} AckType;
54 
/* Kind of request awaiting a reply.  It selects how the 32-bit reply value is
 * interpreted: GstFlowReturn for buffers, gboolean for events, queries and
 * messages, GstStateChangeReturn for state changes. */
typedef enum
{
  COMM_REQUEST_TYPE_BUFFER,
  COMM_REQUEST_TYPE_EVENT,
  COMM_REQUEST_TYPE_QUERY,
  COMM_REQUEST_TYPE_STATE_CHANGE,
  COMM_REQUEST_TYPE_MESSAGE,
} CommRequestType;
63 
64 typedef struct
65 {
66   guint32 id;
67   gboolean replied;
68   gboolean comm_error;
69   guint32 ret;
70   GstQuery *query;
71   CommRequestType type;
72   GCond cond;
73 } CommRequest;
74 
75 static const gchar *comm_request_ret_get_name (CommRequestType type,
76     guint32 ret);
77 static guint32 comm_request_ret_get_failure_value (CommRequestType type);
78 
79 static CommRequest *
comm_request_new(guint32 id,CommRequestType type,GstQuery * query)80 comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
81 {
82   CommRequest *req;
83 
84   req = g_malloc (sizeof (CommRequest));
85   req->id = id;
86   g_cond_init (&req->cond);
87   req->replied = FALSE;
88   req->comm_error = FALSE;
89   req->query = query;
90   req->ret = comm_request_ret_get_failure_value (type);
91   req->type = type;
92 
93   return req;
94 }
95 
96 static guint32
comm_request_wait(GstIpcPipelineComm * comm,CommRequest * req,AckType ack_type)97 comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
98     AckType ack_type)
99 {
100   guint32 ret = comm_request_ret_get_failure_value (req->type);
101   guint64 end_time;
102 
103   if (ack_type == ACK_TYPE_TIMED)
104     end_time = g_get_monotonic_time () + comm->ack_time;
105   else
106     end_time = G_MAXUINT64;
107 
108   GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
109       req->id);
110   while (!req->replied) {
111     if (ack_type == ACK_TYPE_TIMED) {
112       if (!g_cond_wait_until (&req->cond, &comm->mutex, end_time))
113         break;
114     } else
115       g_cond_wait (&req->cond, &comm->mutex);
116   }
117 
118   if (req->replied) {
119     ret = req->ret;
120     GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
121         req->id, ret, comm_request_ret_get_name (req->type, ret));
122   } else {
123     req->comm_error = TRUE;
124     GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
125         req->id);
126   }
127 
128   return ret;
129 }
130 
131 static void
comm_request_free(CommRequest * req)132 comm_request_free (CommRequest * req)
133 {
134   g_cond_clear (&req->cond);
135   g_free (req);
136 }
137 
138 static const gchar *
comm_request_ret_get_name(CommRequestType type,guint32 ret)139 comm_request_ret_get_name (CommRequestType type, guint32 ret)
140 {
141   switch (type) {
142     case COMM_REQUEST_TYPE_BUFFER:
143       return gst_flow_get_name (ret);
144     case COMM_REQUEST_TYPE_EVENT:
145     case COMM_REQUEST_TYPE_QUERY:
146     case COMM_REQUEST_TYPE_MESSAGE:
147       return ret ? "TRUE" : "FALSE";
148     case COMM_REQUEST_TYPE_STATE_CHANGE:
149       return gst_element_state_change_return_get_name (ret);
150     default:
151       g_assert_not_reached ();
152   }
153 }
154 
155 static guint32
comm_request_ret_get_failure_value(CommRequestType type)156 comm_request_ret_get_failure_value (CommRequestType type)
157 {
158   switch (type) {
159     case COMM_REQUEST_TYPE_BUFFER:
160       return GST_FLOW_COMM_ERROR;
161     case COMM_REQUEST_TYPE_EVENT:
162     case COMM_REQUEST_TYPE_MESSAGE:
163     case COMM_REQUEST_TYPE_QUERY:
164       return FALSE;
165     case COMM_REQUEST_TYPE_STATE_CHANGE:
166       return GST_STATE_CHANGE_FAILURE;
167     default:
168       g_assert_not_reached ();
169   }
170 }
171 
172 static const gchar *
gst_ipc_pipeline_comm_data_type_get_name(GstIpcPipelineCommDataType type)173 gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
174 {
175   switch (type) {
176     case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
177       return "ACK";
178     case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
179       return "QUERY_RESULT";
180     case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
181       return "BUFFER";
182     case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
183       return "EVENT";
184     case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
185       return "SINK_MESSAGE_EVENT";
186     case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
187       return "QUERY";
188     case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
189       return "STATE_CHANGE";
190     case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
191       return "STATE_LOST";
192     case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
193       return "MESSAGE";
194     case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
195       return "GERROR_MESSAGE";
196     default:
197       return "UNKNOWN";
198   }
199 }
200 
201 static gboolean
gst_ipc_pipeline_comm_sync_fd(GstIpcPipelineComm * comm,guint32 id,GstQuery * query,guint32 * ret,AckType ack_type,CommRequestType type)202 gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
203     GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
204 {
205   CommRequest *req;
206   gboolean comm_error;
207   GHashTable *waiting_ids;
208 
209   if (ack_type == ACK_TYPE_NONE)
210     return TRUE;
211 
212   req = comm_request_new (id, type, query);
213   waiting_ids = g_hash_table_ref (comm->waiting_ids);
214   g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
215   *ret = comm_request_wait (comm, req, ack_type);
216   comm_error = req->comm_error;
217   g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
218   g_hash_table_unref (waiting_ids);
219   return !comm_error;
220 }
221 
222 static gboolean
write_to_fd_raw(GstIpcPipelineComm * comm,const void * data,size_t size)223 write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
224 {
225   size_t offset;
226   gboolean ret = TRUE;
227 
228   offset = 0;
229   GST_TRACE_OBJECT (comm->element, "Writing %u bytes to fdout",
230       (unsigned) size);
231   while (size) {
232 #ifdef _MSC_VER
233     ssize_t written =
234         send (comm->fdout, (const unsigned char *) data + offset, size, 0);
235     if (written < 0) {
236       int last_error = WSAGetLastError ();
237       if (last_error == WSAEWOULDBLOCK)
238         continue;
239       gchar *error_text = g_win32_error_message (last_error);
240       GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s", error_text);
241       g_free (error_text);
242       ret = FALSE;
243       goto done;
244     }
245 #else
246     ssize_t written =
247         write (comm->fdout, (const unsigned char *) data + offset, size);
248     if (written < 0) {
249       if (errno == EAGAIN || errno == EINTR)
250         continue;
251       GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
252           strerror (errno));
253       ret = FALSE;
254       goto done;
255     }
256 #endif
257     size -= written;
258     offset += written;
259   }
260 
261 done:
262   return ret;
263 }
264 
265 static gboolean
write_byte_writer_to_fd(GstIpcPipelineComm * comm,GstByteWriter * bw)266 write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
267 {
268   guint8 *data;
269   gboolean ret;
270   guint size;
271 
272   size = gst_byte_writer_get_size (bw);
273   data = gst_byte_writer_reset_and_get_data (bw);
274   if (!data)
275     return FALSE;
276   ret = write_to_fd_raw (comm, data, size);
277   g_free (data);
278   return ret;
279 }
280 
281 static void
gst_ipc_pipeline_comm_write_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,guint32 ret,CommRequestType type)282 gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
283     guint32 ret, CommRequestType type)
284 {
285   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
286   guint32 size;
287   GstByteWriter bw;
288 
289   g_mutex_lock (&comm->mutex);
290 
291   GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
292       comm_request_ret_get_name (type, ret), ret);
293   gst_byte_writer_init (&bw);
294   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
295     goto write_failed;
296   if (!gst_byte_writer_put_uint32_le (&bw, id))
297     goto write_failed;
298   size = sizeof (ret);
299   if (!gst_byte_writer_put_uint32_le (&bw, size))
300     goto write_failed;
301   if (!gst_byte_writer_put_uint32_le (&bw, ret))
302     goto write_failed;
303 
304   if (!write_byte_writer_to_fd (comm, &bw))
305     goto write_failed;
306 
307 done:
308   g_mutex_unlock (&comm->mutex);
309   gst_byte_writer_reset (&bw);
310   return;
311 
312 write_failed:
313   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
314       ("Failed to write to socket"));
315   goto done;
316 }
317 
318 void
gst_ipc_pipeline_comm_write_flow_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,GstFlowReturn ret)319 gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
320     guint32 id, GstFlowReturn ret)
321 {
322   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
323       COMM_REQUEST_TYPE_BUFFER);
324 }
325 
326 void
gst_ipc_pipeline_comm_write_boolean_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,gboolean ret)327 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
328     guint32 id, gboolean ret)
329 {
330   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
331       COMM_REQUEST_TYPE_EVENT);
332 }
333 
334 void
gst_ipc_pipeline_comm_write_state_change_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,GstStateChangeReturn ret)335 gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
336     guint32 id, GstStateChangeReturn ret)
337 {
338   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
339       COMM_REQUEST_TYPE_STATE_CHANGE);
340 }
341 
342 void
gst_ipc_pipeline_comm_write_query_result_to_fd(GstIpcPipelineComm * comm,guint32 id,gboolean result,GstQuery * query)343 gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
344     guint32 id, gboolean result, GstQuery * query)
345 {
346   const unsigned char payload_type =
347       GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
348   guint8 result8 = result;
349   guint32 size;
350   size_t len;
351   char *str = NULL;
352   guint32 type;
353   const GstStructure *structure;
354   GstByteWriter bw;
355 
356   g_mutex_lock (&comm->mutex);
357 
358   GST_TRACE_OBJECT (comm->element,
359       "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
360   gst_byte_writer_init (&bw);
361   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
362     goto write_failed;
363   if (!gst_byte_writer_put_uint32_le (&bw, id))
364     goto write_failed;
365   structure = gst_query_get_structure (query);
366   if (structure) {
367     str = gst_structure_to_string (structure);
368     len = strlen (str);
369   } else {
370     str = NULL;
371     len = 0;
372   }
373   size = 1 + sizeof (guint32) + len + 1;
374   if (!gst_byte_writer_put_uint32_le (&bw, size))
375     goto write_failed;
376   if (!gst_byte_writer_put_uint8 (&bw, result8))
377     goto write_failed;
378   type = GST_QUERY_TYPE (query);
379   if (!gst_byte_writer_put_uint32_le (&bw, type))
380     goto write_failed;
381   if (str) {
382     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
383       goto write_failed;
384   } else {
385     if (!gst_byte_writer_put_uint8 (&bw, 0))
386       goto write_failed;
387   }
388 
389   if (!write_byte_writer_to_fd (comm, &bw))
390     goto write_failed;
391 
392 done:
393   g_mutex_unlock (&comm->mutex);
394   gst_byte_writer_reset (&bw);
395   g_free (str);
396   return;
397 
398 write_failed:
399   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
400       ("Failed to write to socket"));
401   goto done;
402 }
403 
404 static gboolean
gst_ipc_pipeline_comm_read_query_result(GstIpcPipelineComm * comm,guint32 size,GstQuery ** query)405 gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
406     guint32 size, GstQuery ** query)
407 {
408   gchar *end = NULL;
409   GstStructure *structure;
410   guint8 result;
411   guint32 type;
412   const guint8 *payload = NULL;
413   guint32 mapped_size = size;
414 
415   /* this should not be called if we don't have enough yet */
416   *query = NULL;
417   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
418   g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
419 
420   payload = gst_adapter_map (comm->adapter, mapped_size);
421   if (!payload)
422     return FALSE;
423   result = *payload++;
424   memcpy (&type, payload, sizeof (type));
425   payload += sizeof (type);
426 
427   size -= 1 + sizeof (guint32);
428   if (size == 0)
429     goto done;
430 
431   if (payload[size - 1]) {
432     result = FALSE;
433     goto done;
434   }
435   if (*payload) {
436     structure = gst_structure_from_string ((const char *) payload, &end);
437   } else {
438     structure = NULL;
439   }
440   if (!structure) {
441     result = FALSE;
442     goto done;
443   }
444 
445   *query = gst_query_new_custom (type, structure);
446 
447 done:
448   gst_adapter_unmap (comm->adapter);
449   gst_adapter_flush (comm->adapter, mapped_size);
450   return result;
451 }
452 
453 typedef struct
454 {
455   guint32 bytes;
456 
457   guint64 size;
458   guint32 flags;
459   guint64 api;
460   char *str;
461 } MetaBuildInfo;
462 
463 typedef struct
464 {
465   GstIpcPipelineComm *comm;
466   guint32 n_meta;
467   guint32 total_bytes;
468   MetaBuildInfo *info;
469 } MetaListRepresentation;
470 
471 static gboolean
build_meta(GstBuffer * buffer,GstMeta ** meta,gpointer user_data)472 build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
473 {
474   MetaListRepresentation *repr = user_data;
475 
476   repr->n_meta++;
477   repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
478   repr->info[repr->n_meta - 1].bytes =
479       /* 4 byte bytes */
480       4
481       /* 4 byte GstMetaFlags */
482       + 4
483       /* GstMetaInfo::api */
484       + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
485       /* GstMetaInfo::size */
486       + 8
487       /* str length */
488       + 4;
489 
490   repr->info[repr->n_meta - 1].flags = (*meta)->flags;
491   repr->info[repr->n_meta - 1].api = (*meta)->info->api;
492   repr->info[repr->n_meta - 1].size = (*meta)->info->size;
493   repr->info[repr->n_meta - 1].str = NULL;
494 
495   /* GstMeta is a base class, and actual useful classes are all different...
496      So we list a few of them we know we want and ignore the open ended rest */
497   if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
498     GstProtectionMeta *m = (GstProtectionMeta *) * meta;
499     repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
500     repr->info[repr->n_meta - 1].bytes +=
501         strlen (repr->info[repr->n_meta - 1].str) + 1;
502     GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
503         g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
504   } else {
505     GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
506         g_type_name ((*meta)->info->api));
507   }
508   repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
509   return TRUE;
510 }
511 
512 typedef struct
513 {
514   guint64 pts;
515   guint64 dts;
516   guint64 duration;
517   guint64 offset;
518   guint64 offset_end;
519   guint64 flags;
520 } CommBufferMetadata;
521 
522 GstFlowReturn
gst_ipc_pipeline_comm_write_buffer_to_fd(GstIpcPipelineComm * comm,GstBuffer * buffer)523 gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
524     GstBuffer * buffer)
525 {
526   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
527   GstMapInfo map;
528   guint32 ret32 = GST_FLOW_OK;
529   guint32 size, n;
530   CommBufferMetadata meta;
531   GstFlowReturn ret;
532   MetaListRepresentation repr = { comm, 0, 4, NULL };   /* starts a 4 for n_meta */
533   GstByteWriter bw;
534 
535   g_mutex_lock (&comm->mutex);
536   ++comm->send_id;
537 
538   GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
539       comm->send_id, buffer);
540 
541   gst_byte_writer_init (&bw);
542 
543   meta.pts = GST_BUFFER_PTS (buffer);
544   meta.dts = GST_BUFFER_DTS (buffer);
545   meta.duration = GST_BUFFER_DURATION (buffer);
546   meta.offset = GST_BUFFER_OFFSET (buffer);
547   meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
548   meta.flags = GST_BUFFER_FLAGS (buffer);
549 
550   /* work out meta size */
551   gst_buffer_foreach_meta (buffer, build_meta, &repr);
552 
553   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
554     goto write_failed;
555   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
556     goto write_failed;
557   size =
558       gst_buffer_get_size (buffer) + sizeof (guint32) +
559       sizeof (CommBufferMetadata) + repr.total_bytes;
560   if (!gst_byte_writer_put_uint32_le (&bw, size))
561     goto write_failed;
562   if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
563     goto write_failed;
564   size = gst_buffer_get_size (buffer);
565   if (!gst_byte_writer_put_uint32_le (&bw, size))
566     goto write_failed;
567   if (!write_byte_writer_to_fd (comm, &bw))
568     goto write_failed;
569 
570   if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
571     goto map_failed;
572   ret = write_to_fd_raw (comm, map.data, map.size);
573   gst_buffer_unmap (buffer, &map);
574   if (!ret)
575     goto write_failed;
576 
577   /* meta */
578   gst_byte_writer_init (&bw);
579   if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
580     goto write_failed;
581   for (n = 0; n < repr.n_meta; ++n) {
582     const MetaBuildInfo *info = repr.info + n;
583     guint32 len;
584     const char *s;
585 
586     if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
587       goto write_failed;
588 
589     if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
590       goto write_failed;
591 
592     s = g_type_name (info->api);
593     len = strlen (s) + 1;
594     if (!gst_byte_writer_put_uint32_le (&bw, len))
595       goto write_failed;
596     if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
597       goto write_failed;
598 
599     if (!gst_byte_writer_put_uint64_le (&bw, info->size))
600       goto write_failed;
601 
602     s = info->str;
603     len = s ? (strlen (s) + 1) : 0;
604     if (!gst_byte_writer_put_uint32_le (&bw, len))
605       goto write_failed;
606     if (len)
607       if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
608         goto write_failed;
609   }
610 
611   if (!write_byte_writer_to_fd (comm, &bw))
612     goto write_failed;
613 
614   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
615           ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
616     goto wait_failed;
617   ret = ret32;
618 
619 done:
620   g_mutex_unlock (&comm->mutex);
621   gst_byte_writer_reset (&bw);
622   for (n = 0; n < repr.n_meta; ++n)
623     g_free (repr.info[n].str);
624   g_free (repr.info);
625   return ret;
626 
627 write_failed:
628   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
629       ("Failed to write to socket"));
630   ret = GST_FLOW_COMM_ERROR;
631   goto done;
632 
633 wait_failed:
634   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
635       ("Failed to wait for reply on socket"));
636   ret = GST_FLOW_COMM_ERROR;
637   goto done;
638 
639 map_failed:
640   GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
641       ("Failed to map buffer"));
642   ret = GST_FLOW_ERROR;
643   goto done;
644 }
645 
646 static GstBuffer *
gst_ipc_pipeline_comm_read_buffer(GstIpcPipelineComm * comm,guint32 size)647 gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
648 {
649   GstBuffer *buffer;
650   CommBufferMetadata meta;
651   guint32 n_meta, n;
652   const guint8 *payload = NULL;
653   guint32 mapped_size, buffer_data_size;
654 
655   /* this should not be called if we don't have enough yet */
656   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
657   g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
658 
659   mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
660   payload = gst_adapter_map (comm->adapter, mapped_size);
661   if (!payload)
662     return NULL;
663   memcpy (&meta, payload, sizeof (CommBufferMetadata));
664   payload += sizeof (CommBufferMetadata);
665   memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
666   size -= mapped_size;
667   gst_adapter_unmap (comm->adapter);
668   gst_adapter_flush (comm->adapter, mapped_size);
669 
670   if (buffer_data_size == 0) {
671     buffer = gst_buffer_new ();
672   } else {
673     buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
674     gst_adapter_flush (comm->adapter, buffer_data_size);
675   }
676   size -= buffer_data_size;
677 
678   GST_BUFFER_PTS (buffer) = meta.pts;
679   GST_BUFFER_DTS (buffer) = meta.dts;
680   GST_BUFFER_DURATION (buffer) = meta.duration;
681   GST_BUFFER_OFFSET (buffer) = meta.offset;
682   GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
683   GST_BUFFER_FLAGS (buffer) = meta.flags;
684 
685   /* If you don't call that, the GType isn't yet known at the
686      g_type_from_name below */
687   gst_protection_meta_get_info ();
688 
689   mapped_size = size;
690   payload = gst_adapter_map (comm->adapter, mapped_size);
691   if (!payload) {
692     gst_buffer_unref (buffer);
693     return NULL;
694   }
695   memcpy (&n_meta, payload, sizeof (n_meta));
696   payload += sizeof (n_meta);
697 
698   for (n = 0; n < n_meta; ++n) {
699     guint32 flags, len, bytes;
700     guint64 msize;
701     GType api;
702     GstMeta *meta;
703     GstStructure *structure = NULL;
704 
705     memcpy (&bytes, payload, sizeof (bytes));
706     payload += sizeof (bytes);
707 
708 #define READ_FIELD(f) do { \
709     memcpy (&f, payload, sizeof (f)); \
710     payload += sizeof(f); \
711     } while(0)
712 
713     READ_FIELD (flags);
714     READ_FIELD (len);
715     api = g_type_from_name ((const char *) payload);
716     payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
717     READ_FIELD (msize);
718     READ_FIELD (len);
719     if (len) {
720       structure = gst_structure_new_from_string ((const char *) payload);
721       payload += len + 1;
722     }
723 
724     /* Seems we can add a meta from the api nor type ? */
725     if (api == GST_PROTECTION_META_API_TYPE) {
726       meta =
727           gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
728       ((GstProtectionMeta *) meta)->info = structure;
729     } else {
730       GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
731           g_type_name (api));
732       if (structure)
733         gst_structure_free (structure);
734     }
735 
736 #undef READ_FIELD
737 
738   }
739 
740   gst_adapter_unmap (comm->adapter);
741   gst_adapter_flush (comm->adapter, mapped_size);
742 
743   return buffer;
744 }
745 
746 static gboolean
gst_ipc_pipeline_comm_write_sink_message_event_to_fd(GstIpcPipelineComm * comm,GstEvent * event)747 gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
748     GstEvent * event)
749 {
750   const unsigned char payload_type =
751       GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
752   gboolean ret;
753   guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
754   char *str = NULL;
755   const GstStructure *structure;
756   GstMessage *message = NULL;
757   const char *name;
758   GstByteWriter bw;
759 
760   g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
761       FALSE);
762 
763   g_mutex_lock (&comm->mutex);
764   ++comm->send_id;
765 
766   GST_TRACE_OBJECT (comm->element,
767       "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
768 
769   gst_byte_writer_init (&bw);
770   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
771     goto write_failed;
772   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
773     goto write_failed;
774   name = gst_structure_get_name (gst_event_get_structure (event));
775   slen = strlen (name) + 1;
776   gst_event_parse_sink_message (event, &message);
777   structure = gst_message_get_structure (message);
778   if (structure) {
779     str = gst_structure_to_string (structure);
780     structure_slen = strlen (str);
781   } else {
782     str = NULL;
783     structure_slen = 0;
784   }
785   size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
786       strlen (name) + 1 + structure_slen + 1;
787   if (!gst_byte_writer_put_uint32_le (&bw, size))
788     goto write_failed;
789 
790   type = GST_MESSAGE_TYPE (message);
791   if (!gst_byte_writer_put_uint32_le (&bw, type))
792     goto write_failed;
793   size -= sizeof (type);
794 
795   eseqnum = GST_EVENT_SEQNUM (event);
796   if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
797     goto write_failed;
798   size -= sizeof (eseqnum);
799 
800   mseqnum = GST_MESSAGE_SEQNUM (message);
801   if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
802     goto write_failed;
803   size -= sizeof (mseqnum);
804 
805   if (!gst_byte_writer_put_uint32_le (&bw, slen))
806     goto write_failed;
807   size -= sizeof (slen);
808 
809   if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
810     goto write_failed;
811   size -= slen;
812 
813   if (str) {
814     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
815       goto write_failed;
816   } else {
817     if (!gst_byte_writer_put_uint8 (&bw, 0))
818       goto write_failed;
819   }
820 
821   if (!write_byte_writer_to_fd (comm, &bw))
822     goto write_failed;
823 
824   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
825           GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
826           COMM_REQUEST_TYPE_EVENT))
827     goto write_failed;
828 
829   ret = ret32;
830 
831 done:
832   g_mutex_unlock (&comm->mutex);
833   gst_byte_writer_reset (&bw);
834   g_free (str);
835   if (message)
836     gst_message_unref (message);
837   return ret;
838 
839 write_failed:
840   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
841       ("Failed to write to socket"));
842   ret = FALSE;
843   goto done;
844 }
845 
846 static GstEvent *
gst_ipc_pipeline_comm_read_sink_message_event(GstIpcPipelineComm * comm,guint32 size)847 gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
848     guint32 size)
849 {
850   GstMessage *message;
851   GstEvent *event = NULL;
852   gchar *end = NULL;
853   GstStructure *structure;
854   guint32 type, eseqnum, mseqnum, slen;
855   const char *name;
856   guint32 mapped_size = size;
857   const guint8 *payload;
858 
859   /* this should not be called if we don't have enough yet */
860   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
861   g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
862 
863   payload = gst_adapter_map (comm->adapter, mapped_size);
864   if (!payload)
865     return NULL;
866   memcpy (&type, payload, sizeof (type));
867   payload += sizeof (type);
868   size -= sizeof (type);
869   if (size == 0)
870     goto done;
871 
872   memcpy (&eseqnum, payload, sizeof (eseqnum));
873   payload += sizeof (eseqnum);
874   size -= sizeof (eseqnum);
875   if (size == 0)
876     goto done;
877 
878   memcpy (&mseqnum, payload, sizeof (mseqnum));
879   payload += sizeof (mseqnum);
880   size -= sizeof (mseqnum);
881   if (size == 0)
882     goto done;
883 
884   memcpy (&slen, payload, sizeof (slen));
885   payload += sizeof (slen);
886   size -= sizeof (slen);
887   if (size == 0)
888     goto done;
889 
890   if (payload[slen - 1])
891     goto done;
892   name = (const char *) payload;
893   payload += slen;
894   size -= slen;
895 
896   if ((payload)[size - 1]) {
897     goto done;
898   }
899   if (*payload) {
900     structure = gst_structure_from_string ((const char *) payload, &end);
901   } else {
902     structure = NULL;
903   }
904 
905   message =
906       gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
907   gst_message_set_seqnum (message, mseqnum);
908   event = gst_event_new_sink_message (name, message);
909   gst_event_set_seqnum (event, eseqnum);
910   gst_message_unref (message);
911 
912 done:
913   gst_adapter_unmap (comm->adapter);
914   gst_adapter_flush (comm->adapter, mapped_size);
915   return event;
916 }
917 
918 gboolean
gst_ipc_pipeline_comm_write_event_to_fd(GstIpcPipelineComm * comm,gboolean upstream,GstEvent * event)919 gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
920     gboolean upstream, GstEvent * event)
921 {
922   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
923   gboolean ret;
924   guint32 type, size, ret32 = TRUE, seqnum, slen;
925   char *str = NULL;
926   const GstStructure *structure;
927   GstByteWriter bw;
928 
929   /* we special case sink-message event as gst can't serialize/de-serialize it */
930   if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
931     return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
932 
933   g_mutex_lock (&comm->mutex);
934   ++comm->send_id;
935 
936   GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
937       comm->send_id, event);
938 
939   gst_byte_writer_init (&bw);
940   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
941     goto write_failed;
942   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
943     goto write_failed;
944   structure = gst_event_get_structure (event);
945   if (structure) {
946 
947     if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
948       GstStructure *s = gst_structure_copy (structure);
949       gst_structure_remove_field (s, "stream");
950       str = gst_structure_to_string (s);
951       gst_structure_free (s);
952     } else {
953       str = gst_structure_to_string (structure);
954     }
955 
956     slen = strlen (str);
957   } else {
958     str = NULL;
959     slen = 0;
960   }
961   size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
962   if (!gst_byte_writer_put_uint32_le (&bw, size))
963     goto write_failed;
964 
965   type = GST_EVENT_TYPE (event);
966   if (!gst_byte_writer_put_uint32_le (&bw, type))
967     goto write_failed;
968 
969   seqnum = GST_EVENT_SEQNUM (event);
970   if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
971     goto write_failed;
972 
973   if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
974     goto write_failed;
975 
976   if (str) {
977     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
978       goto write_failed;
979   } else {
980     if (!gst_byte_writer_put_uint8 (&bw, 0))
981       goto write_failed;
982   }
983 
984   if (!write_byte_writer_to_fd (comm, &bw))
985     goto write_failed;
986 
987   /* Upstream events get serialized, this is required to send seeks only
988    * one at a time. */
989   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
990           (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
991           ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
992     goto write_failed;
993   ret = ret32;
994 
995 done:
996   g_mutex_unlock (&comm->mutex);
997   g_free (str);
998   gst_byte_writer_reset (&bw);
999   return ret;
1000 
1001 write_failed:
1002   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1003       ("Failed to write to socket"));
1004   ret = FALSE;
1005   goto done;
1006 }
1007 
1008 static GstEvent *
gst_ipc_pipeline_comm_read_event(GstIpcPipelineComm * comm,guint32 size,gboolean * upstream)1009 gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
1010     gboolean * upstream)
1011 {
1012   GstEvent *event = NULL;
1013   gchar *end = NULL;
1014   GstStructure *structure;
1015   guint32 type, seqnum;
1016   guint32 mapped_size = size;
1017   const guint8 *payload;
1018 
1019   /* this should not be called if we don't have enough yet */
1020   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1021   g_return_val_if_fail (size >= sizeof (type), NULL);
1022 
1023   payload = gst_adapter_map (comm->adapter, mapped_size);
1024   if (!payload)
1025     return NULL;
1026 
1027   memcpy (&type, payload, sizeof (type));
1028   payload += sizeof (type);
1029   size -= sizeof (type);
1030   if (size == 0)
1031     goto done;
1032 
1033   memcpy (&seqnum, payload, sizeof (seqnum));
1034   payload += sizeof (seqnum);
1035   size -= sizeof (seqnum);
1036   if (size == 0)
1037     goto done;
1038 
1039   *upstream = (*payload) ? TRUE : FALSE;
1040   payload += 1;
1041   size -= 1;
1042   if (size == 0)
1043     goto done;
1044 
1045   if (payload[size - 1])
1046     goto done;
1047   if (*payload) {
1048     structure = gst_structure_from_string ((const char *) payload, &end);
1049   } else {
1050     structure = NULL;
1051   }
1052 
1053   event = gst_event_new_custom (type, structure);
1054   gst_event_set_seqnum (event, seqnum);
1055 
1056 done:
1057   gst_adapter_unmap (comm->adapter);
1058   gst_adapter_flush (comm->adapter, mapped_size);
1059   return event;
1060 }
1061 
1062 gboolean
gst_ipc_pipeline_comm_write_query_to_fd(GstIpcPipelineComm * comm,gboolean upstream,GstQuery * query)1063 gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
1064     gboolean upstream, GstQuery * query)
1065 {
1066   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
1067   gboolean ret;
1068   guint32 type, size, ret32 = TRUE, slen;
1069   char *str = NULL;
1070   const GstStructure *structure;
1071   GstByteWriter bw;
1072 
1073   g_mutex_lock (&comm->mutex);
1074   ++comm->send_id;
1075 
1076   GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
1077       comm->send_id, query);
1078 
1079   gst_byte_writer_init (&bw);
1080   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1081     goto write_failed;
1082   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1083     goto write_failed;
1084   structure = gst_query_get_structure (query);
1085   if (structure) {
1086     str = gst_structure_to_string (structure);
1087     slen = strlen (str);
1088   } else {
1089     str = NULL;
1090     slen = 0;
1091   }
1092   size = sizeof (type) + 1 + slen + 1;
1093   if (!gst_byte_writer_put_uint32_le (&bw, size))
1094     goto write_failed;
1095 
1096   type = GST_QUERY_TYPE (query);
1097   if (!gst_byte_writer_put_uint32_le (&bw, type))
1098     goto write_failed;
1099 
1100   if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
1101     goto write_failed;
1102 
1103   if (str) {
1104     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
1105       goto write_failed;
1106   } else {
1107     if (!gst_byte_writer_put_uint8 (&bw, 0))
1108       goto write_failed;
1109   }
1110 
1111   if (!write_byte_writer_to_fd (comm, &bw))
1112     goto write_failed;
1113 
1114   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
1115           GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
1116           COMM_REQUEST_TYPE_QUERY))
1117     goto write_failed;
1118 
1119   ret = ret32;
1120 
1121 done:
1122   g_mutex_unlock (&comm->mutex);
1123   g_free (str);
1124   gst_byte_writer_reset (&bw);
1125   return ret;
1126 
1127 write_failed:
1128   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1129       ("Failed to write to socket"));
1130   ret = FALSE;
1131   goto done;
1132 }
1133 
1134 static GstQuery *
gst_ipc_pipeline_comm_read_query(GstIpcPipelineComm * comm,guint32 size,gboolean * upstream)1135 gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
1136     gboolean * upstream)
1137 {
1138   GstQuery *query = NULL;
1139   gchar *end = NULL;
1140   GstStructure *structure;
1141   guint32 type;
1142   guint32 mapped_size = size;
1143   const guint8 *payload;
1144 
1145   /* this should not be called if we don't have enough yet */
1146   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1147   g_return_val_if_fail (size >= sizeof (type), NULL);
1148 
1149   payload = gst_adapter_map (comm->adapter, mapped_size);
1150   if (!payload)
1151     return NULL;
1152 
1153   memcpy (&type, payload, sizeof (type));
1154   payload += sizeof (type);
1155   size -= sizeof (type);
1156   if (size == 0)
1157     goto done;
1158 
1159   *upstream = (*payload) ? TRUE : FALSE;
1160   payload += 1;
1161   size -= 1;
1162   if (size == 0)
1163     goto done;
1164 
1165   if (payload[size - 1])
1166     goto done;
1167   if (*payload) {
1168     structure = gst_structure_from_string ((const char *) payload, &end);
1169   } else {
1170     structure = NULL;
1171   }
1172 
1173   query = gst_query_new_custom (type, structure);
1174 
1175   /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
1176      This does not play well with the serialization/deserialization system,
1177      which will give us a non-NULL GstCaps which has a value of NULL. This
1178      in turn wreaks havoc with any code that tests whether filter is NULL
1179      (which basically means, am I being given an optional GstCaps ?).
1180      So we look for non-NULL GstCaps which have NULL contents, and replace
1181      them with NULL instead. */
1182   if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1183     GstCaps *filter;
1184     gst_query_parse_caps (query, &filter);
1185     if (filter
1186         && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
1187             "NULL")) {
1188       gst_query_unref (query);
1189       query = gst_query_new_caps (NULL);
1190     }
1191   }
1192 
1193 done:
1194   gst_adapter_unmap (comm->adapter);
1195   gst_adapter_flush (comm->adapter, mapped_size);
1196   return query;
1197 }
1198 
1199 GstStateChangeReturn
gst_ipc_pipeline_comm_write_state_change_to_fd(GstIpcPipelineComm * comm,GstStateChange transition)1200 gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
1201     GstStateChange transition)
1202 {
1203   const unsigned char payload_type =
1204       GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
1205   GstStateChangeReturn ret;
1206   guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
1207   GstByteWriter bw;
1208 
1209   g_mutex_lock (&comm->mutex);
1210   ++comm->send_id;
1211 
1212   GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
1213       comm->send_id,
1214       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
1215       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
1216 
1217   gst_byte_writer_init (&bw);
1218   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1219     goto write_failed;
1220   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1221     goto write_failed;
1222   size = sizeof (transition);
1223   if (!gst_byte_writer_put_uint32_le (&bw, size))
1224     goto write_failed;
1225   if (!gst_byte_writer_put_uint32_le (&bw, transition))
1226     goto write_failed;
1227 
1228   if (!write_byte_writer_to_fd (comm, &bw))
1229     goto write_failed;
1230 
1231   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1232           ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
1233     goto write_failed;
1234   ret = ret32;
1235 
1236 done:
1237   g_mutex_unlock (&comm->mutex);
1238   gst_byte_writer_reset (&bw);
1239   return ret;
1240 
1241 write_failed:
1242   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1243       ("Failed to write to socket"));
1244   ret = GST_STATE_CHANGE_FAILURE;
1245   goto done;
1246 }
1247 
1248 static gboolean
is_valid_state_change(GstStateChange transition)1249 is_valid_state_change (GstStateChange transition)
1250 {
1251   if (transition == GST_STATE_CHANGE_NULL_TO_READY)
1252     return TRUE;
1253   if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
1254     return TRUE;
1255   if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
1256     return TRUE;
1257   if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
1258     return TRUE;
1259   if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
1260     return TRUE;
1261   if (transition == GST_STATE_CHANGE_READY_TO_NULL)
1262     return TRUE;
1263   if (GST_STATE_TRANSITION_CURRENT (transition) ==
1264       GST_STATE_TRANSITION_NEXT (transition))
1265     return TRUE;
1266   return FALSE;
1267 }
1268 
1269 static gboolean
gst_ipc_pipeline_comm_read_state_change(GstIpcPipelineComm * comm,guint32 size,guint32 * transition)1270 gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
1271     guint32 size, guint32 * transition)
1272 {
1273   guint32 mapped_size = size;
1274   const guint8 *payload;
1275 
1276   /* this should not be called if we don't have enough yet */
1277   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
1278   g_return_val_if_fail (size >= sizeof (*transition), FALSE);
1279 
1280   payload = gst_adapter_map (comm->adapter, size);
1281   if (!payload)
1282     return FALSE;
1283   memcpy (transition, payload, sizeof (*transition));
1284   gst_adapter_unmap (comm->adapter);
1285   gst_adapter_flush (comm->adapter, mapped_size);
1286   return is_valid_state_change (*transition);
1287 }
1288 
1289 void
gst_ipc_pipeline_comm_write_state_lost_to_fd(GstIpcPipelineComm * comm)1290 gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
1291 {
1292   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
1293   guint32 size;
1294   GstByteWriter bw;
1295 
1296   g_mutex_lock (&comm->mutex);
1297   ++comm->send_id;
1298 
1299   GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
1300   gst_byte_writer_init (&bw);
1301   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1302     goto write_failed;
1303   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1304     goto write_failed;
1305   size = 0;
1306   if (!gst_byte_writer_put_uint32_le (&bw, size))
1307     goto write_failed;
1308 
1309   if (!write_byte_writer_to_fd (comm, &bw))
1310     goto write_failed;
1311 
1312 done:
1313   g_mutex_unlock (&comm->mutex);
1314   gst_byte_writer_reset (&bw);
1315   return;
1316 
1317 write_failed:
1318   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1319       ("Failed to write to socket"));
1320   goto done;
1321 }
1322 
1323 static gboolean
gst_ipc_pipeline_comm_read_state_lost(GstIpcPipelineComm * comm,guint32 size)1324 gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
1325 {
1326   /* no payload */
1327   return TRUE;
1328 }
1329 
1330 static gboolean
gst_ipc_pipeline_comm_write_gerror_message_to_fd(GstIpcPipelineComm * comm,GstMessage * message)1331 gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
1332     GstMessage * message)
1333 {
1334   const unsigned char payload_type =
1335       GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
1336   gboolean ret;
1337   guint32 code, size, ret32 = TRUE;
1338   char *str = NULL;
1339   GError *error;
1340   char *extra_message;
1341   const char *domain_string;
1342   unsigned char msgtype;
1343   GstByteWriter bw;
1344 
1345   g_mutex_lock (&comm->mutex);
1346   ++comm->send_id;
1347 
1348   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
1349     gst_message_parse_error (message, &error, &extra_message);
1350     msgtype = 2;
1351   } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
1352     gst_message_parse_warning (message, &error, &extra_message);
1353     msgtype = 1;
1354   } else {
1355     gst_message_parse_info (message, &error, &extra_message);
1356     msgtype = 0;
1357   }
1358   code = error->code;
1359   domain_string = g_quark_to_string (error->domain);
1360   GST_TRACE_OBJECT (comm->element,
1361       "Writing error %u: domain %s, code %u, message %s, extra message %s",
1362       comm->send_id, domain_string, error->code, error->message, extra_message);
1363 
1364   gst_byte_writer_init (&bw);
1365   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1366     goto write_failed;
1367   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1368     goto write_failed;
1369 
1370   size = sizeof (size);
1371   size += 1;
1372   size += strlen (domain_string) + 1;
1373   size += sizeof (code);
1374   size += sizeof (size);
1375   size += error->message ? strlen (error->message) + 1 : 0;
1376   size += sizeof (size);
1377   size += extra_message ? strlen (extra_message) + 1 : 0;
1378 
1379   if (!gst_byte_writer_put_uint32_le (&bw, size))
1380     goto write_failed;
1381 
1382   if (!gst_byte_writer_put_uint8 (&bw, msgtype))
1383     goto write_failed;
1384   size = strlen (domain_string) + 1;
1385   if (!gst_byte_writer_put_uint32_le (&bw, size))
1386     goto write_failed;
1387   if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
1388     goto write_failed;
1389   if (!gst_byte_writer_put_uint32_le (&bw, code))
1390     goto write_failed;
1391   size = error->message ? strlen (error->message) + 1 : 0;
1392   if (!gst_byte_writer_put_uint32_le (&bw, size))
1393     goto write_failed;
1394   if (error->message) {
1395     if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
1396       goto write_failed;
1397   }
1398   size = extra_message ? strlen (extra_message) + 1 : 0;
1399   if (!gst_byte_writer_put_uint32_le (&bw, size))
1400     goto write_failed;
1401   if (extra_message) {
1402     if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
1403       goto write_failed;
1404   }
1405 
1406   if (!write_byte_writer_to_fd (comm, &bw))
1407     goto write_failed;
1408 
1409   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1410           ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1411     goto write_failed;
1412 
1413   ret = ret32;
1414 
1415 done:
1416   g_mutex_unlock (&comm->mutex);
1417   g_free (str);
1418   g_error_free (error);
1419   g_free (extra_message);
1420   gst_byte_writer_reset (&bw);
1421   return ret;
1422 
1423 write_failed:
1424   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1425       ("Failed to write to socket"));
1426   ret = FALSE;
1427   goto done;
1428 }
1429 
1430 static GstMessage *
gst_ipc_pipeline_comm_read_gerror_message(GstIpcPipelineComm * comm,guint32 size)1431 gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
1432     guint32 size)
1433 {
1434   GstMessage *message = NULL;
1435   guint32 code;
1436   GQuark domain;
1437   const char *msg, *extra_message;
1438   GError *error;
1439   unsigned char msgtype;
1440   guint32 mapped_size = size;
1441   const guint8 *payload;
1442 
1443   /* this should not be called if we don't have enough yet */
1444   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1445   g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
1446       NULL);
1447 
1448   payload = gst_adapter_map (comm->adapter, mapped_size);
1449   if (!payload)
1450     return NULL;
1451   msgtype = *payload++;
1452   memcpy (&size, payload, sizeof (size));
1453   payload += sizeof (size);
1454   if (payload[size - 1])
1455     goto done;
1456   domain = g_quark_from_string ((const char *) payload);
1457   payload += size;
1458 
1459   memcpy (&code, payload, sizeof (code));
1460   payload += sizeof (code);
1461 
1462   memcpy (&size, payload, sizeof (size));
1463   payload += sizeof (size);
1464   if (size) {
1465     if (payload[size - 1])
1466       goto done;
1467     msg = (const char *) payload;
1468   } else {
1469     msg = NULL;
1470   }
1471   payload += size;
1472 
1473   memcpy (&size, payload, sizeof (size));
1474   payload += sizeof (size);
1475   if (size) {
1476     if (payload[size - 1])
1477       goto done;
1478     extra_message = (const char *) payload;
1479   } else {
1480     extra_message = NULL;
1481   }
1482   payload += size;
1483 
1484   error = g_error_new (domain, code, "%s", msg);
1485   if (msgtype == 2)
1486     message =
1487         gst_message_new_error (GST_OBJECT (comm->element), error,
1488         extra_message);
1489   else if (msgtype == 1)
1490     message =
1491         gst_message_new_warning (GST_OBJECT (comm->element), error,
1492         extra_message);
1493   else
1494     message =
1495         gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
1496   g_error_free (error);
1497 
1498 done:
1499   gst_adapter_unmap (comm->adapter);
1500   gst_adapter_flush (comm->adapter, mapped_size);
1501 
1502   return message;
1503 }
1504 
1505 gboolean
gst_ipc_pipeline_comm_write_message_to_fd(GstIpcPipelineComm * comm,GstMessage * message)1506 gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
1507     GstMessage * message)
1508 {
1509   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
1510   gboolean ret;
1511   guint32 type, size, ret32 = TRUE, slen;
1512   char *str = NULL;
1513   const GstStructure *structure;
1514   GstByteWriter bw;
1515 
1516   /* we special case error as gst can't serialize/de-serialize it */
1517   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
1518       || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
1519       || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
1520     return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
1521 
1522   g_mutex_lock (&comm->mutex);
1523   ++comm->send_id;
1524 
1525   GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
1526       comm->send_id, message);
1527 
1528   gst_byte_writer_init (&bw);
1529   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1530     goto write_failed;
1531   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1532     goto write_failed;
1533   structure = gst_message_get_structure (message);
1534   if (structure) {
1535     str = gst_structure_to_string (structure);
1536     slen = strlen (str);
1537   } else {
1538     str = NULL;
1539     slen = 0;
1540   }
1541   size = sizeof (type) + slen + 1;
1542   if (!gst_byte_writer_put_uint32_le (&bw, size))
1543     goto write_failed;
1544 
1545   type = GST_MESSAGE_TYPE (message);
1546   if (!gst_byte_writer_put_uint32_le (&bw, type))
1547     goto write_failed;
1548   size -= sizeof (type);
1549   if (str) {
1550     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
1551       goto write_failed;
1552   } else {
1553     if (!gst_byte_writer_put_uint8 (&bw, 0))
1554       goto write_failed;
1555   }
1556 
1557   if (!write_byte_writer_to_fd (comm, &bw))
1558     goto write_failed;
1559 
1560   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1561           ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1562     goto write_failed;
1563 
1564   ret = ret32;
1565 
1566 done:
1567   g_mutex_unlock (&comm->mutex);
1568   g_free (str);
1569   gst_byte_writer_reset (&bw);
1570   return ret;
1571 
1572 write_failed:
1573   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1574       ("Failed to write to socket"));
1575   ret = FALSE;
1576   goto done;
1577 }
1578 
1579 static GstMessage *
gst_ipc_pipeline_comm_read_message(GstIpcPipelineComm * comm,guint32 size)1580 gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
1581 {
1582   GstMessage *message = NULL;
1583   gchar *end = NULL;
1584   GstStructure *structure;
1585   guint32 type;
1586   guint32 mapped_size = size;
1587   const guint8 *payload;
1588 
1589   /* this should not be called if we don't have enough yet */
1590   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1591   g_return_val_if_fail (size >= sizeof (type), NULL);
1592 
1593   payload = gst_adapter_map (comm->adapter, mapped_size);
1594   if (!payload)
1595     return NULL;
1596   memcpy (&type, payload, sizeof (type));
1597   payload += sizeof (type);
1598   size -= sizeof (type);
1599   if (size == 0)
1600     goto done;
1601 
1602   if (payload[size - 1])
1603     goto done;
1604   if (*payload) {
1605     structure = gst_structure_from_string ((const char *) payload, &end);
1606   } else {
1607     structure = NULL;
1608   }
1609 
1610   message =
1611       gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
1612 
1613 done:
1614   gst_adapter_unmap (comm->adapter);
1615   gst_adapter_flush (comm->adapter, mapped_size);
1616 
1617   return message;
1618 }
1619 
1620 void
gst_ipc_pipeline_comm_init(GstIpcPipelineComm * comm,GstElement * element)1621 gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
1622 {
1623   g_mutex_init (&comm->mutex);
1624   comm->element = element;
1625   comm->fdin = comm->fdout = -1;
1626   comm->ack_time = DEFAULT_ACK_TIME;
1627   comm->waiting_ids =
1628       g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1629       (GDestroyNotify) comm_request_free);
1630   comm->adapter = gst_adapter_new ();
1631   comm->poll = gst_poll_new (TRUE);
1632   gst_poll_fd_init (&comm->pollFDin);
1633 }
1634 
1635 void
gst_ipc_pipeline_comm_clear(GstIpcPipelineComm * comm)1636 gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
1637 {
1638   g_hash_table_destroy (comm->waiting_ids);
1639   gst_object_unref (comm->adapter);
1640   gst_poll_free (comm->poll);
1641   g_mutex_clear (&comm->mutex);
1642 }
1643 
1644 static void
cancel_request(gpointer key,gpointer value,gpointer user_data,GstFlowReturn fret)1645 cancel_request (gpointer key, gpointer value, gpointer user_data,
1646     GstFlowReturn fret)
1647 {
1648   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
1649   guint32 id = GPOINTER_TO_INT (key);
1650   CommRequest *req = (CommRequest *) value;
1651 
1652   GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
1653       req->type);
1654   req->ret = fret;
1655   req->replied = TRUE;
1656   g_cond_signal (&req->cond);
1657 }
1658 
1659 static void
cancel_request_error(gpointer key,gpointer value,gpointer user_data)1660 cancel_request_error (gpointer key, gpointer value, gpointer user_data)
1661 {
1662   CommRequest *req = (CommRequest *) value;
1663   GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
1664 
1665   cancel_request (key, value, user_data, fret);
1666 }
1667 
1668 void
gst_ipc_pipeline_comm_cancel(GstIpcPipelineComm * comm,gboolean cleanup)1669 gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
1670 {
1671   g_mutex_lock (&comm->mutex);
1672   g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
1673   if (cleanup) {
1674     g_hash_table_unref (comm->waiting_ids);
1675     comm->waiting_ids =
1676         g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1677         (GDestroyNotify) comm_request_free);
1678   }
1679   g_mutex_unlock (&comm->mutex);
1680 }
1681 
1682 static gboolean
set_field(GQuark field_id,const GValue * value,gpointer user_data)1683 set_field (GQuark field_id, const GValue * value, gpointer user_data)
1684 {
1685   GstStructure *structure = user_data;
1686 
1687   gst_structure_id_set_value (structure, field_id, value);
1688 
1689   return TRUE;
1690 }
1691 
1692 static gboolean
gst_ipc_pipeline_comm_reply_request(GstIpcPipelineComm * comm,guint32 id,GstFlowReturn ret,GstQuery * query)1693 gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
1694     GstFlowReturn ret, GstQuery * query)
1695 {
1696   CommRequest *req;
1697 
1698   req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
1699   if (!req) {
1700     GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
1701     return FALSE;
1702   }
1703 
1704   GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
1705       comm_request_ret_get_name (req->type, ret), req->id);
1706   req->replied = TRUE;
1707   req->ret = ret;
1708   if (query) {
1709     if (req->query) {
1710       /* We need to update the original query in place, as the caller
1711          will expect the object to be the same */
1712       GstStructure *structure = gst_query_writable_structure (req->query);
1713       gst_structure_remove_all_fields (structure);
1714       gst_structure_foreach (gst_query_get_structure (query), set_field,
1715           structure);
1716     } else {
1717       GST_WARNING_OBJECT (comm->element,
1718           "Got query reply, but no query was in the request");
1719     }
1720   }
1721   g_cond_signal (&req->cond);
1722   return TRUE;
1723 }
1724 
1725 static gint
update_adapter(GstIpcPipelineComm * comm)1726 update_adapter (GstIpcPipelineComm * comm)
1727 {
1728   GstMemory *mem = NULL;
1729   GstBuffer *buf;
1730   GstMapInfo map;
1731   ssize_t sz;
1732   gint ret = 0;
1733 
1734 again:
1735   /* update pollFDin if necessary (fdin changed or we lost our parent).
1736    * we do not allow a parent-less element to communicate with its peer
1737    * in order to avoid race conditions where the slave tries to change
1738    * the state of its parent pipeline while it is not yet added in that
1739    * pipeline. */
1740   if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
1741     if (comm->pollFDin.fd != -1) {
1742       GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
1743           comm->pollFDin.fd);
1744       gst_poll_remove_fd (comm->poll, &comm->pollFDin);
1745       gst_poll_fd_init (&comm->pollFDin);
1746     }
1747     if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
1748       GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
1749       comm->pollFDin.fd = comm->fdin;
1750       gst_poll_add_fd (comm->poll, &comm->pollFDin);
1751       gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
1752     }
1753   }
1754 
1755   /* wait for activity on fdin or a flush */
1756   if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
1757     if (errno == EAGAIN)
1758       goto again;
1759     /* error out, unless interrupted or flushing */
1760     if (errno != EINTR)
1761       ret = (errno == EBUSY) ? 2 : 1;
1762   }
1763 
1764   /* read from fdin if possible and push data to our adapter */
1765   if (comm->pollFDin.fd >= 0
1766       && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
1767     if (!mem)
1768       mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
1769 
1770     gst_memory_map (mem, &map, GST_MAP_WRITE);
1771 #ifdef _MSC_VER
1772     sz = recv (comm->pollFDin.fd, map.data, map.size, 0);
1773     if (sz < 0) {
1774       int last_error = WSAGetLastError ();
1775       if (last_error == WSAEWOULDBLOCK) {
1776         errno = EAGAIN;
1777       } else {
1778         errno = last_error;
1779       }
1780     }
1781 #else
1782     sz = read (comm->pollFDin.fd, map.data, map.size);
1783 #endif
1784     gst_memory_unmap (mem, &map);
1785 
1786     if (sz <= 0) {
1787       if (errno == EAGAIN)
1788         goto again;
1789       /* error out, unless interrupted */
1790       if (errno != EINTR)
1791         ret = 1;
1792     } else {
1793       gst_memory_resize (mem, 0, sz);
1794       buf = gst_buffer_new ();
1795       gst_buffer_append_memory (buf, mem);
1796       mem = NULL;
1797       GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
1798       gst_adapter_push (comm->adapter, buf);
1799     }
1800   }
1801 
1802   if (mem)
1803     gst_memory_unref (mem);
1804 
1805   return ret;
1806 }
1807 
1808 static gboolean
read_many(GstIpcPipelineComm * comm)1809 read_many (GstIpcPipelineComm * comm)
1810 {
1811   gboolean ret = TRUE;
1812   gsize available;
1813   const guint8 *payload;
1814 
1815   while (1)
1816     switch (comm->state) {
1817       case GST_IPC_PIPELINE_COMM_STATE_TYPE:
1818       {
1819         guint8 type;
1820         guint32 mapped_size;
1821 
1822         available = gst_adapter_available (comm->adapter);
1823         mapped_size = 1 + sizeof (gint32) * 2;
1824         if (available < mapped_size)
1825           goto done;
1826 
1827         payload = gst_adapter_map (comm->adapter, mapped_size);
1828         type = *payload++;
1829         g_mutex_lock (&comm->mutex);
1830         memcpy (&comm->id, payload, sizeof (guint32));
1831         memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
1832         g_mutex_unlock (&comm->mutex);
1833         gst_adapter_unmap (comm->adapter);
1834         gst_adapter_flush (comm->adapter, mapped_size);
1835         GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
1836             comm->id, type, comm->payload_length);
1837         switch (type) {
1838           case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1839           case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1840           case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1841           case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1842           case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1843           case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1844           case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1845           case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
1846           case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
1847           case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
1848             GST_TRACE_OBJECT (comm->element, "switching to state %s",
1849                 gst_ipc_pipeline_comm_data_type_get_name (type));
1850             comm->state = type;
1851             break;
1852           default:
1853             goto out_of_sync;
1854         }
1855         break;
1856       }
1857       case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1858       {
1859         const guint8 *rets;
1860         guint32 ret32;
1861 
1862         available = gst_adapter_available (comm->adapter);
1863         if (available < comm->payload_length)
1864           goto done;
1865 
1866         if (available < sizeof (guint32))
1867           goto ack_failed;
1868 
1869         rets = gst_adapter_map (comm->adapter, sizeof (guint32));
1870         memcpy (&ret32, rets, sizeof (ret32));
1871         gst_adapter_unmap (comm->adapter);
1872         gst_adapter_flush (comm->adapter, sizeof (guint32));
1873         GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
1874             gst_flow_get_name (ret32), comm->id);
1875 
1876         g_mutex_lock (&comm->mutex);
1877         gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
1878         g_mutex_unlock (&comm->mutex);
1879 
1880         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1881         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1882         break;
1883       }
1884       case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1885       {
1886         GstQuery *query = NULL;
1887         gboolean qret;
1888 
1889         available = gst_adapter_available (comm->adapter);
1890         if (available < comm->payload_length)
1891           goto done;
1892 
1893         qret =
1894             gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
1895             &query);
1896 
1897         GST_TRACE_OBJECT (comm->element,
1898             "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
1899             query);
1900 
1901         g_mutex_lock (&comm->mutex);
1902         gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
1903         g_mutex_unlock (&comm->mutex);
1904 
1905         gst_query_unref (query);
1906 
1907         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1908         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1909         break;
1910       }
1911       case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1912       {
1913         GstBuffer *buf;
1914 
1915         available = gst_adapter_available (comm->adapter);
1916         if (available < comm->payload_length)
1917           goto done;
1918 
1919         buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
1920         if (!buf)
1921           goto buffer_failed;
1922 
1923         /* set caps and push */
1924         GST_TRACE_OBJECT (comm->element,
1925             "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
1926             ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
1927             ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
1928             ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
1929             GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
1930             GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
1931             GST_BUFFER_FLAGS (buf));
1932 
1933         gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
1934             GINT_TO_POINTER (comm->id), NULL);
1935 
1936         if (comm->on_buffer)
1937           (*comm->on_buffer) (comm->id, buf, comm->user_data);
1938 
1939         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1940         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1941         break;
1942       }
1943       case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1944       {
1945         GstEvent *event;
1946         gboolean upstream;
1947 
1948         available = gst_adapter_available (comm->adapter);
1949         if (available < comm->payload_length)
1950           goto done;
1951 
1952         event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
1953             &upstream);
1954         if (!event)
1955           goto event_failed;
1956 
1957         GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
1958             event, gst_event_type_get_name (event->type));
1959 
1960         gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1961             GINT_TO_POINTER (comm->id), NULL);
1962 
1963         if (comm->on_event)
1964           (*comm->on_event) (comm->id, event, upstream, comm->user_data);
1965 
1966         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1967         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1968         break;
1969       }
1970       case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1971       {
1972         GstEvent *event;
1973 
1974         available = gst_adapter_available (comm->adapter);
1975         if (available < comm->payload_length)
1976           goto done;
1977 
1978         event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
1979             comm->payload_length);
1980         if (!event)
1981           goto event_failed;
1982 
1983         GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
1984             event);
1985 
1986         gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1987             GINT_TO_POINTER (comm->id), NULL);
1988 
1989         if (comm->on_event)
1990           (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
1991 
1992         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1993         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1994         break;
1995       }
1996       case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1997       {
1998         GstQuery *query;
1999         gboolean upstream;
2000 
2001         available = gst_adapter_available (comm->adapter);
2002         if (available < comm->payload_length)
2003           goto done;
2004 
2005         query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
2006             &upstream);
2007         if (!query)
2008           goto query_failed;
2009 
2010         GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
2011             query, gst_query_type_get_name (query->type));
2012 
2013         gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
2014             GINT_TO_POINTER (comm->id), NULL);
2015 
2016         if (comm->on_query)
2017           (*comm->on_query) (comm->id, query, upstream, comm->user_data);
2018 
2019         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2020         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2021         break;
2022       }
2023       case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
2024       {
2025         guint32 transition;
2026 
2027         available = gst_adapter_available (comm->adapter);
2028         if (available < comm->payload_length)
2029           goto done;
2030 
2031         if (!gst_ipc_pipeline_comm_read_state_change (comm,
2032                 comm->payload_length, &transition))
2033           goto state_change_failed;
2034 
2035         GST_TRACE_OBJECT (comm->element,
2036             "deserialized state change request: %s -> %s",
2037             gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
2038                 (transition)),
2039             gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
2040                 (transition)));
2041 
2042         if (comm->on_state_change)
2043           (*comm->on_state_change) (comm->id, transition, comm->user_data);
2044 
2045         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2046         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2047         break;
2048       }
2049       case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
2050       {
2051         available = gst_adapter_available (comm->adapter);
2052         if (available < comm->payload_length)
2053           goto done;
2054 
2055         if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
2056           goto event_failed;
2057 
2058         GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
2059 
2060         if (comm->on_state_lost)
2061           (*comm->on_state_lost) (comm->user_data);
2062 
2063         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2064         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2065         break;
2066       }
2067       case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
2068       {
2069         GstMessage *message;
2070 
2071         available = gst_adapter_available (comm->adapter);
2072         if (available < comm->payload_length)
2073           goto done;
2074 
2075         message = gst_ipc_pipeline_comm_read_message (comm,
2076             comm->payload_length);
2077         if (!message)
2078           goto message_failed;
2079 
2080         GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2081             message, gst_message_type_get_name (message->type));
2082 
2083         if (comm->on_message)
2084           (*comm->on_message) (comm->id, message, comm->user_data);
2085 
2086         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2087         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2088         break;
2089       }
2090       case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
2091       {
2092         GstMessage *message;
2093 
2094         available = gst_adapter_available (comm->adapter);
2095         if (available < comm->payload_length)
2096           goto done;
2097 
2098         message = gst_ipc_pipeline_comm_read_gerror_message (comm,
2099             comm->payload_length);
2100         if (!message)
2101           goto message_failed;
2102 
2103         GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2104             message, gst_message_type_get_name (message->type));
2105 
2106         if (comm->on_message)
2107           (*comm->on_message) (comm->id, message, comm->user_data);
2108 
2109         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2110         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2111         break;
2112       }
2113     }
2114 
2115 done:
2116   return ret;
2117 
2118   /* ERRORS */
2119 out_of_sync:
2120   {
2121     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2122         ("Socket out of sync"));
2123     ret = FALSE;
2124     goto done;
2125   }
2126 state_change_failed:
2127   {
2128     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2129         ("could not read state change from fd"));
2130     ret = FALSE;
2131     goto done;
2132   }
2133 ack_failed:
2134   {
2135     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2136         ("could not read ack from fd"));
2137     ret = FALSE;
2138     goto done;
2139   }
2140 buffer_failed:
2141   {
2142     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2143         ("could not read buffer from fd"));
2144     ret = FALSE;
2145     goto done;
2146   }
2147 event_failed:
2148   {
2149     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2150         ("could not read event from fd"));
2151     ret = FALSE;
2152     goto done;
2153   }
2154 message_failed:
2155   {
2156     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2157         ("could not read message from fd"));
2158     ret = FALSE;
2159     goto done;
2160   }
2161 query_failed:
2162   {
2163     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2164         ("could not read query from fd"));
2165     ret = FALSE;
2166     goto done;
2167   }
2168 }
2169 
2170 static gpointer
reader_thread(gpointer data)2171 reader_thread (gpointer data)
2172 {
2173   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
2174   gboolean running = TRUE;
2175   gint ret = 0;
2176 
2177   while (running) {
2178     ret = update_adapter (comm);
2179     switch (ret) {
2180       case 1:
2181         GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
2182             ("Failed to read from socket"));
2183         running = FALSE;
2184         break;
2185       case 2:
2186         GST_INFO_OBJECT (comm->element, "We're stopping, all good");
2187         running = FALSE;
2188         break;
2189       default:
2190         read_many (comm);
2191         break;
2192     }
2193   }
2194 
2195   GST_INFO_OBJECT (comm->element, "Reader thread ending");
2196   return NULL;
2197 }
2198 
2199 gboolean
gst_ipc_pipeline_comm_start_reader_thread(GstIpcPipelineComm * comm,void (* on_buffer)(guint32,GstBuffer *,gpointer),void (* on_event)(guint32,GstEvent *,gboolean,gpointer),void (* on_query)(guint32,GstQuery *,gboolean,gpointer),void (* on_state_change)(guint32,GstStateChange,gpointer),void (* on_state_lost)(gpointer),void (* on_message)(guint32,GstMessage *,gpointer),gpointer user_data)2200 gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
2201     void (*on_buffer) (guint32, GstBuffer *, gpointer),
2202     void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
2203     void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
2204     void (*on_state_change) (guint32, GstStateChange, gpointer),
2205     void (*on_state_lost) (gpointer),
2206     void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
2207 {
2208   if (comm->reader_thread)
2209     return FALSE;
2210 
2211   comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2212   comm->on_buffer = on_buffer;
2213   comm->on_event = on_event;
2214   comm->on_query = on_query;
2215   comm->on_state_change = on_state_change;
2216   comm->on_state_lost = on_state_lost;
2217   comm->on_message = on_message;
2218   comm->user_data = user_data;
2219   gst_poll_set_flushing (comm->poll, FALSE);
2220   comm->reader_thread =
2221       g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
2222   return TRUE;
2223 }
2224 
2225 void
gst_ipc_pipeline_comm_stop_reader_thread(GstIpcPipelineComm * comm)2226 gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
2227 {
2228   if (!comm->reader_thread)
2229     return;
2230 
2231   gst_poll_set_flushing (comm->poll, TRUE);
2232   g_thread_join (comm->reader_thread);
2233   comm->reader_thread = NULL;
2234 }
2235 
2236 static gchar *
gst_value_serialize_event(const GValue * value)2237 gst_value_serialize_event (const GValue * value)
2238 {
2239   const GstStructure *structure;
2240   GstEvent *ev;
2241   gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
2242   GValue val = G_VALUE_INIT;
2243 
2244   ev = g_value_get_boxed (value);
2245 
2246   g_value_init (&val, gst_event_type_get_type ());
2247   g_value_set_enum (&val, ev->type);
2248   type = gst_value_serialize (&val);
2249   g_value_unset (&val);
2250 
2251   g_value_init (&val, G_TYPE_UINT64);
2252   g_value_set_uint64 (&val, ev->timestamp);
2253   ts = gst_value_serialize (&val);
2254   g_value_unset (&val);
2255 
2256   g_value_init (&val, G_TYPE_UINT);
2257   g_value_set_uint (&val, ev->seqnum);
2258   seqnum = gst_value_serialize (&val);
2259   g_value_unset (&val);
2260 
2261   g_value_init (&val, G_TYPE_INT64);
2262   g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
2263   rt_offset = gst_value_serialize (&val);
2264   g_value_unset (&val);
2265 
2266   structure = gst_event_get_structure (ev);
2267   str = gst_structure_to_string (structure);
2268   str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
2269   g_strdelimit (str64, "=", '_');
2270   g_free (str);
2271 
2272   s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
2273       NULL);
2274 
2275   g_free (type);
2276   g_free (ts);
2277   g_free (seqnum);
2278   g_free (rt_offset);
2279   g_free (str64);
2280 
2281   return s;
2282 }
2283 
2284 static gboolean
gst_value_deserialize_event(GValue * dest,const gchar * s)2285 gst_value_deserialize_event (GValue * dest, const gchar * s)
2286 {
2287   GstEvent *ev = NULL;
2288   GValue val = G_VALUE_INIT;
2289   gboolean ret = FALSE;
2290   gchar **fields;
2291   gsize len;
2292 
2293   fields = g_strsplit (s, ":", -1);
2294   if (g_strv_length (fields) != 5)
2295     goto wrong_length;
2296 
2297   g_strdelimit (fields[4], "_", '=');
2298   g_base64_decode_inplace (fields[4], &len);
2299 
2300   g_value_init (&val, gst_event_type_get_type ());
2301   if (!gst_value_deserialize (&val, fields[0]))
2302     goto fail;
2303   ev = gst_event_new_custom (g_value_get_enum (&val),
2304       gst_structure_new_from_string (fields[4]));
2305 
2306   g_value_unset (&val);
2307   g_value_init (&val, G_TYPE_UINT64);
2308   if (!gst_value_deserialize (&val, fields[1]))
2309     goto fail;
2310   ev->timestamp = g_value_get_uint64 (&val);
2311 
2312   g_value_unset (&val);
2313   g_value_init (&val, G_TYPE_UINT);
2314   if (!gst_value_deserialize (&val, fields[2]))
2315     goto fail;
2316   ev->seqnum = g_value_get_uint (&val);
2317 
2318   g_value_unset (&val);
2319   g_value_init (&val, G_TYPE_INT64);
2320   if (!gst_value_deserialize (&val, fields[3]))
2321     goto fail;
2322   gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
2323 
2324   g_value_take_boxed (dest, ev);
2325   ev = NULL;
2326   ret = TRUE;
2327 
2328 fail:
2329   g_clear_pointer (&ev, gst_event_unref);
2330   g_value_unset (&val);
2331 
2332 wrong_length:
2333   g_strfreev (fields);
2334   return ret;
2335 }
2336 
/* Registers a GstValueTable for _gtype whose serialize and deserialize
 * entries are gst_value_serialize_<_type> and gst_value_deserialize_<_type>.
 * The second table entry is left NULL (per the macro name, no compare
 * function). */
#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type)                \
G_STMT_START {                                                          \
  static GstValueTable value_table = {                                  \
    0,                                                                  \
    NULL,                                                               \
    gst_value_serialize_ ## _type,                                      \
    gst_value_deserialize_ ## _type                                     \
  };                                                                    \
  value_table.type = (_gtype);                                          \
  gst_value_register (&value_table);                                    \
} G_STMT_END
2345 
2346 void
gst_ipc_pipeline_comm_plugin_init(void)2347 gst_ipc_pipeline_comm_plugin_init (void)
2348 {
2349   static gsize once = 0;
2350 
2351   if (g_once_init_enter (&once)) {
2352     GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
2353         "ipc pipeline comm");
2354     QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
2355     REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
2356     g_once_init_leave (&once, (gsize) 1);
2357   }
2358 }
2359