1 /* GStreamer
2 * Copyright (C) 2015-2017 YouView TV Ltd
3 * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
4 *
5 * gstipcpipelinecomm.c:
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 # include "config.h"
25 #endif
26
27 #ifdef HAVE_UNISTD_H
28 # include <unistd.h>
29 #endif
30 #ifdef _MSC_VER
31 /* ssize_t is not available, so match return value of recv()/send() on MSVC */
32 # define ssize_t int
33 # include <winsock2.h>
34 #endif
35 #include <errno.h>
36 #include <string.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/gstprotection.h>
39 #include "gstipcpipelinecomm.h"
40
/* Debug category used by the GST_*_OBJECT logging calls in this file */
GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
#define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug

/* Ten seconds, expressed in G_TIME_SPAN_SECOND units.
 * NOTE(review): presumably the default for comm->ack_time, which is what
 * comm_request_wait() adds to the monotonic clock - confirm where it is set. */
#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)

/* Non-static quark; it is not assigned anywhere in this part of the file */
GQuark QUARK_ID;
47
/* How long a sender waits for the peer's reply after writing a request */
typedef enum
{
  /* do not wait: gst_ipc_pipeline_comm_sync_fd() returns immediately */
  ACK_TYPE_NONE = 0,
  /* wait until comm->ack_time has elapsed on the monotonic clock */
  ACK_TYPE_TIMED = 1,
  /* wait without any deadline */
  ACK_TYPE_BLOCKING = 2
} AckType;
54
/* Kind of request a reply is awaited for; it selects how the 32 bit reply
 * value is interpreted (flow return, boolean or state change return) */
typedef enum
{
  COMM_REQUEST_TYPE_BUFFER = 0,
  COMM_REQUEST_TYPE_EVENT = 1,
  COMM_REQUEST_TYPE_QUERY = 2,
  COMM_REQUEST_TYPE_STATE_CHANGE = 3,
  COMM_REQUEST_TYPE_MESSAGE = 4
} CommRequestType;
63
/* Book-keeping for one request that is waiting for the peer's reply */
typedef struct
{
  /* id of the request; also used as the key in comm->waiting_ids */
  guint32 id;
  /* tested by comm_request_wait(); never set to TRUE in this part of the
   * file - presumably done by the code that reads the reply (TODO confirm) */
  gboolean replied;
  /* set by comm_request_wait() when the wait ended without a reply */
  gboolean comm_error;
  /* reply value; starts out as the failure value for 'type' */
  guint32 ret;
  /* query passed to comm_request_new(), stored as-is (may be NULL) */
  GstQuery *query;
  CommRequestType type;
  /* waited on together with comm->mutex */
  GCond cond;
} CommRequest;
74
/* Forward declarations: comm_request_new() and comm_request_wait() use these
 * helpers before their definitions further down */
static const gchar *comm_request_ret_get_name (CommRequestType type,
    guint32 ret);
static guint32 comm_request_ret_get_failure_value (CommRequestType type);
78
79 static CommRequest *
comm_request_new(guint32 id,CommRequestType type,GstQuery * query)80 comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
81 {
82 CommRequest *req;
83
84 req = g_malloc (sizeof (CommRequest));
85 req->id = id;
86 g_cond_init (&req->cond);
87 req->replied = FALSE;
88 req->comm_error = FALSE;
89 req->query = query;
90 req->ret = comm_request_ret_get_failure_value (type);
91 req->type = type;
92
93 return req;
94 }
95
96 static guint32
comm_request_wait(GstIpcPipelineComm * comm,CommRequest * req,AckType ack_type)97 comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
98 AckType ack_type)
99 {
100 guint32 ret = comm_request_ret_get_failure_value (req->type);
101 guint64 end_time;
102
103 if (ack_type == ACK_TYPE_TIMED)
104 end_time = g_get_monotonic_time () + comm->ack_time;
105 else
106 end_time = G_MAXUINT64;
107
108 GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
109 req->id);
110 while (!req->replied) {
111 if (ack_type == ACK_TYPE_TIMED) {
112 if (!g_cond_wait_until (&req->cond, &comm->mutex, end_time))
113 break;
114 } else
115 g_cond_wait (&req->cond, &comm->mutex);
116 }
117
118 if (req->replied) {
119 ret = req->ret;
120 GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
121 req->id, ret, comm_request_ret_get_name (req->type, ret));
122 } else {
123 req->comm_error = TRUE;
124 GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
125 req->id);
126 }
127
128 return ret;
129 }
130
131 static void
comm_request_free(CommRequest * req)132 comm_request_free (CommRequest * req)
133 {
134 g_cond_clear (&req->cond);
135 g_free (req);
136 }
137
138 static const gchar *
comm_request_ret_get_name(CommRequestType type,guint32 ret)139 comm_request_ret_get_name (CommRequestType type, guint32 ret)
140 {
141 switch (type) {
142 case COMM_REQUEST_TYPE_BUFFER:
143 return gst_flow_get_name (ret);
144 case COMM_REQUEST_TYPE_EVENT:
145 case COMM_REQUEST_TYPE_QUERY:
146 case COMM_REQUEST_TYPE_MESSAGE:
147 return ret ? "TRUE" : "FALSE";
148 case COMM_REQUEST_TYPE_STATE_CHANGE:
149 return gst_element_state_change_return_get_name (ret);
150 default:
151 g_assert_not_reached ();
152 }
153 }
154
155 static guint32
comm_request_ret_get_failure_value(CommRequestType type)156 comm_request_ret_get_failure_value (CommRequestType type)
157 {
158 switch (type) {
159 case COMM_REQUEST_TYPE_BUFFER:
160 return GST_FLOW_COMM_ERROR;
161 case COMM_REQUEST_TYPE_EVENT:
162 case COMM_REQUEST_TYPE_MESSAGE:
163 case COMM_REQUEST_TYPE_QUERY:
164 return FALSE;
165 case COMM_REQUEST_TYPE_STATE_CHANGE:
166 return GST_STATE_CHANGE_FAILURE;
167 default:
168 g_assert_not_reached ();
169 }
170 }
171
172 static const gchar *
gst_ipc_pipeline_comm_data_type_get_name(GstIpcPipelineCommDataType type)173 gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
174 {
175 switch (type) {
176 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
177 return "ACK";
178 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
179 return "QUERY_RESULT";
180 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
181 return "BUFFER";
182 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
183 return "EVENT";
184 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
185 return "SINK_MESSAGE_EVENT";
186 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
187 return "QUERY";
188 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
189 return "STATE_CHANGE";
190 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
191 return "STATE_LOST";
192 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
193 return "MESSAGE";
194 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
195 return "GERROR_MESSAGE";
196 default:
197 return "UNKNOWN";
198 }
199 }
200
201 static gboolean
gst_ipc_pipeline_comm_sync_fd(GstIpcPipelineComm * comm,guint32 id,GstQuery * query,guint32 * ret,AckType ack_type,CommRequestType type)202 gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
203 GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
204 {
205 CommRequest *req;
206 gboolean comm_error;
207 GHashTable *waiting_ids;
208
209 if (ack_type == ACK_TYPE_NONE)
210 return TRUE;
211
212 req = comm_request_new (id, type, query);
213 waiting_ids = g_hash_table_ref (comm->waiting_ids);
214 g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
215 *ret = comm_request_wait (comm, req, ack_type);
216 comm_error = req->comm_error;
217 g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
218 g_hash_table_unref (waiting_ids);
219 return !comm_error;
220 }
221
222 static gboolean
write_to_fd_raw(GstIpcPipelineComm * comm,const void * data,size_t size)223 write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
224 {
225 size_t offset;
226 gboolean ret = TRUE;
227
228 offset = 0;
229 GST_TRACE_OBJECT (comm->element, "Writing %u bytes to fdout",
230 (unsigned) size);
231 while (size) {
232 #ifdef _MSC_VER
233 ssize_t written =
234 send (comm->fdout, (const unsigned char *) data + offset, size, 0);
235 if (written < 0) {
236 int last_error = WSAGetLastError ();
237 if (last_error == WSAEWOULDBLOCK)
238 continue;
239 gchar *error_text = g_win32_error_message (last_error);
240 GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s", error_text);
241 g_free (error_text);
242 ret = FALSE;
243 goto done;
244 }
245 #else
246 ssize_t written =
247 write (comm->fdout, (const unsigned char *) data + offset, size);
248 if (written < 0) {
249 if (errno == EAGAIN || errno == EINTR)
250 continue;
251 GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
252 strerror (errno));
253 ret = FALSE;
254 goto done;
255 }
256 #endif
257 size -= written;
258 offset += written;
259 }
260
261 done:
262 return ret;
263 }
264
265 static gboolean
write_byte_writer_to_fd(GstIpcPipelineComm * comm,GstByteWriter * bw)266 write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
267 {
268 guint8 *data;
269 gboolean ret;
270 guint size;
271
272 size = gst_byte_writer_get_size (bw);
273 data = gst_byte_writer_reset_and_get_data (bw);
274 if (!data)
275 return FALSE;
276 ret = write_to_fd_raw (comm, data, size);
277 g_free (data);
278 return ret;
279 }
280
281 static void
gst_ipc_pipeline_comm_write_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,guint32 ret,CommRequestType type)282 gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
283 guint32 ret, CommRequestType type)
284 {
285 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
286 guint32 size;
287 GstByteWriter bw;
288
289 g_mutex_lock (&comm->mutex);
290
291 GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
292 comm_request_ret_get_name (type, ret), ret);
293 gst_byte_writer_init (&bw);
294 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
295 goto write_failed;
296 if (!gst_byte_writer_put_uint32_le (&bw, id))
297 goto write_failed;
298 size = sizeof (ret);
299 if (!gst_byte_writer_put_uint32_le (&bw, size))
300 goto write_failed;
301 if (!gst_byte_writer_put_uint32_le (&bw, ret))
302 goto write_failed;
303
304 if (!write_byte_writer_to_fd (comm, &bw))
305 goto write_failed;
306
307 done:
308 g_mutex_unlock (&comm->mutex);
309 gst_byte_writer_reset (&bw);
310 return;
311
312 write_failed:
313 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
314 ("Failed to write to socket"));
315 goto done;
316 }
317
318 void
gst_ipc_pipeline_comm_write_flow_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,GstFlowReturn ret)319 gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
320 guint32 id, GstFlowReturn ret)
321 {
322 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
323 COMM_REQUEST_TYPE_BUFFER);
324 }
325
326 void
gst_ipc_pipeline_comm_write_boolean_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,gboolean ret)327 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
328 guint32 id, gboolean ret)
329 {
330 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
331 COMM_REQUEST_TYPE_EVENT);
332 }
333
334 void
gst_ipc_pipeline_comm_write_state_change_ack_to_fd(GstIpcPipelineComm * comm,guint32 id,GstStateChangeReturn ret)335 gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
336 guint32 id, GstStateChangeReturn ret)
337 {
338 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
339 COMM_REQUEST_TYPE_STATE_CHANGE);
340 }
341
342 void
gst_ipc_pipeline_comm_write_query_result_to_fd(GstIpcPipelineComm * comm,guint32 id,gboolean result,GstQuery * query)343 gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
344 guint32 id, gboolean result, GstQuery * query)
345 {
346 const unsigned char payload_type =
347 GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
348 guint8 result8 = result;
349 guint32 size;
350 size_t len;
351 char *str = NULL;
352 guint32 type;
353 const GstStructure *structure;
354 GstByteWriter bw;
355
356 g_mutex_lock (&comm->mutex);
357
358 GST_TRACE_OBJECT (comm->element,
359 "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
360 gst_byte_writer_init (&bw);
361 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
362 goto write_failed;
363 if (!gst_byte_writer_put_uint32_le (&bw, id))
364 goto write_failed;
365 structure = gst_query_get_structure (query);
366 if (structure) {
367 str = gst_structure_to_string (structure);
368 len = strlen (str);
369 } else {
370 str = NULL;
371 len = 0;
372 }
373 size = 1 + sizeof (guint32) + len + 1;
374 if (!gst_byte_writer_put_uint32_le (&bw, size))
375 goto write_failed;
376 if (!gst_byte_writer_put_uint8 (&bw, result8))
377 goto write_failed;
378 type = GST_QUERY_TYPE (query);
379 if (!gst_byte_writer_put_uint32_le (&bw, type))
380 goto write_failed;
381 if (str) {
382 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
383 goto write_failed;
384 } else {
385 if (!gst_byte_writer_put_uint8 (&bw, 0))
386 goto write_failed;
387 }
388
389 if (!write_byte_writer_to_fd (comm, &bw))
390 goto write_failed;
391
392 done:
393 g_mutex_unlock (&comm->mutex);
394 gst_byte_writer_reset (&bw);
395 g_free (str);
396 return;
397
398 write_failed:
399 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
400 ("Failed to write to socket"));
401 goto done;
402 }
403
404 static gboolean
gst_ipc_pipeline_comm_read_query_result(GstIpcPipelineComm * comm,guint32 size,GstQuery ** query)405 gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
406 guint32 size, GstQuery ** query)
407 {
408 gchar *end = NULL;
409 GstStructure *structure;
410 guint8 result;
411 guint32 type;
412 const guint8 *payload = NULL;
413 guint32 mapped_size = size;
414
415 /* this should not be called if we don't have enough yet */
416 *query = NULL;
417 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
418 g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
419
420 payload = gst_adapter_map (comm->adapter, mapped_size);
421 if (!payload)
422 return FALSE;
423 result = *payload++;
424 memcpy (&type, payload, sizeof (type));
425 payload += sizeof (type);
426
427 size -= 1 + sizeof (guint32);
428 if (size == 0)
429 goto done;
430
431 if (payload[size - 1]) {
432 result = FALSE;
433 goto done;
434 }
435 if (*payload) {
436 structure = gst_structure_from_string ((const char *) payload, &end);
437 } else {
438 structure = NULL;
439 }
440 if (!structure) {
441 result = FALSE;
442 goto done;
443 }
444
445 *query = gst_query_new_custom (type, structure);
446
447 done:
448 gst_adapter_unmap (comm->adapter);
449 gst_adapter_flush (comm->adapter, mapped_size);
450 return result;
451 }
452
/* Serialization data for one GstMeta of a buffer, filled in by build_meta() */
typedef struct
{
  /* number of bytes this meta occupies on the wire, this field included */
  guint32 bytes;

  /* copy of GstMetaInfo::size */
  guint64 size;
  /* copy of GstMeta::flags */
  guint32 flags;
  /* copy of GstMetaInfo::api (a GType) */
  guint64 api;
  /* serialized structure for the meta types that are transported, else NULL;
   * freed by the buffer writer */
  char *str;
} MetaBuildInfo;
462
/* Accumulator handed to build_meta() while iterating over a buffer's metas */
typedef struct
{
  /* only used for logging */
  GstIpcPipelineComm *comm;
  /* number of entries in 'info' */
  guint32 n_meta;
  /* running sum of the entries' 'bytes'; the buffer writer starts it at 4 */
  guint32 total_bytes;
  /* array grown by one entry per meta */
  MetaBuildInfo *info;
} MetaListRepresentation;
470
471 static gboolean
build_meta(GstBuffer * buffer,GstMeta ** meta,gpointer user_data)472 build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
473 {
474 MetaListRepresentation *repr = user_data;
475
476 repr->n_meta++;
477 repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
478 repr->info[repr->n_meta - 1].bytes =
479 /* 4 byte bytes */
480 4
481 /* 4 byte GstMetaFlags */
482 + 4
483 /* GstMetaInfo::api */
484 + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
485 /* GstMetaInfo::size */
486 + 8
487 /* str length */
488 + 4;
489
490 repr->info[repr->n_meta - 1].flags = (*meta)->flags;
491 repr->info[repr->n_meta - 1].api = (*meta)->info->api;
492 repr->info[repr->n_meta - 1].size = (*meta)->info->size;
493 repr->info[repr->n_meta - 1].str = NULL;
494
495 /* GstMeta is a base class, and actual useful classes are all different...
496 So we list a few of them we know we want and ignore the open ended rest */
497 if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
498 GstProtectionMeta *m = (GstProtectionMeta *) * meta;
499 repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
500 repr->info[repr->n_meta - 1].bytes +=
501 strlen (repr->info[repr->n_meta - 1].str) + 1;
502 GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
503 g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
504 } else {
505 GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
506 g_type_name ((*meta)->info->api));
507 }
508 repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
509 return TRUE;
510 }
511
/* Buffer timestamps, offsets and flags as sent ahead of the buffer data.
 * The struct is written and read back as raw bytes, so it travels in the
 * host's byte order and struct layout. */
typedef struct
{
  guint64 pts;
  guint64 dts;
  guint64 duration;
  guint64 offset;
  guint64 offset_end;
  guint64 flags;
} CommBufferMetadata;
521
522 GstFlowReturn
gst_ipc_pipeline_comm_write_buffer_to_fd(GstIpcPipelineComm * comm,GstBuffer * buffer)523 gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
524 GstBuffer * buffer)
525 {
526 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
527 GstMapInfo map;
528 guint32 ret32 = GST_FLOW_OK;
529 guint32 size, n;
530 CommBufferMetadata meta;
531 GstFlowReturn ret;
532 MetaListRepresentation repr = { comm, 0, 4, NULL }; /* starts a 4 for n_meta */
533 GstByteWriter bw;
534
535 g_mutex_lock (&comm->mutex);
536 ++comm->send_id;
537
538 GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
539 comm->send_id, buffer);
540
541 gst_byte_writer_init (&bw);
542
543 meta.pts = GST_BUFFER_PTS (buffer);
544 meta.dts = GST_BUFFER_DTS (buffer);
545 meta.duration = GST_BUFFER_DURATION (buffer);
546 meta.offset = GST_BUFFER_OFFSET (buffer);
547 meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
548 meta.flags = GST_BUFFER_FLAGS (buffer);
549
550 /* work out meta size */
551 gst_buffer_foreach_meta (buffer, build_meta, &repr);
552
553 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
554 goto write_failed;
555 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
556 goto write_failed;
557 size =
558 gst_buffer_get_size (buffer) + sizeof (guint32) +
559 sizeof (CommBufferMetadata) + repr.total_bytes;
560 if (!gst_byte_writer_put_uint32_le (&bw, size))
561 goto write_failed;
562 if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
563 goto write_failed;
564 size = gst_buffer_get_size (buffer);
565 if (!gst_byte_writer_put_uint32_le (&bw, size))
566 goto write_failed;
567 if (!write_byte_writer_to_fd (comm, &bw))
568 goto write_failed;
569
570 if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
571 goto map_failed;
572 ret = write_to_fd_raw (comm, map.data, map.size);
573 gst_buffer_unmap (buffer, &map);
574 if (!ret)
575 goto write_failed;
576
577 /* meta */
578 gst_byte_writer_init (&bw);
579 if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
580 goto write_failed;
581 for (n = 0; n < repr.n_meta; ++n) {
582 const MetaBuildInfo *info = repr.info + n;
583 guint32 len;
584 const char *s;
585
586 if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
587 goto write_failed;
588
589 if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
590 goto write_failed;
591
592 s = g_type_name (info->api);
593 len = strlen (s) + 1;
594 if (!gst_byte_writer_put_uint32_le (&bw, len))
595 goto write_failed;
596 if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
597 goto write_failed;
598
599 if (!gst_byte_writer_put_uint64_le (&bw, info->size))
600 goto write_failed;
601
602 s = info->str;
603 len = s ? (strlen (s) + 1) : 0;
604 if (!gst_byte_writer_put_uint32_le (&bw, len))
605 goto write_failed;
606 if (len)
607 if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
608 goto write_failed;
609 }
610
611 if (!write_byte_writer_to_fd (comm, &bw))
612 goto write_failed;
613
614 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
615 ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
616 goto wait_failed;
617 ret = ret32;
618
619 done:
620 g_mutex_unlock (&comm->mutex);
621 gst_byte_writer_reset (&bw);
622 for (n = 0; n < repr.n_meta; ++n)
623 g_free (repr.info[n].str);
624 g_free (repr.info);
625 return ret;
626
627 write_failed:
628 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
629 ("Failed to write to socket"));
630 ret = GST_FLOW_COMM_ERROR;
631 goto done;
632
633 wait_failed:
634 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
635 ("Failed to wait for reply on socket"));
636 ret = GST_FLOW_COMM_ERROR;
637 goto done;
638
639 map_failed:
640 GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
641 ("Failed to map buffer"));
642 ret = GST_FLOW_ERROR;
643 goto done;
644 }
645
646 static GstBuffer *
gst_ipc_pipeline_comm_read_buffer(GstIpcPipelineComm * comm,guint32 size)647 gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
648 {
649 GstBuffer *buffer;
650 CommBufferMetadata meta;
651 guint32 n_meta, n;
652 const guint8 *payload = NULL;
653 guint32 mapped_size, buffer_data_size;
654
655 /* this should not be called if we don't have enough yet */
656 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
657 g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
658
659 mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
660 payload = gst_adapter_map (comm->adapter, mapped_size);
661 if (!payload)
662 return NULL;
663 memcpy (&meta, payload, sizeof (CommBufferMetadata));
664 payload += sizeof (CommBufferMetadata);
665 memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
666 size -= mapped_size;
667 gst_adapter_unmap (comm->adapter);
668 gst_adapter_flush (comm->adapter, mapped_size);
669
670 if (buffer_data_size == 0) {
671 buffer = gst_buffer_new ();
672 } else {
673 buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
674 gst_adapter_flush (comm->adapter, buffer_data_size);
675 }
676 size -= buffer_data_size;
677
678 GST_BUFFER_PTS (buffer) = meta.pts;
679 GST_BUFFER_DTS (buffer) = meta.dts;
680 GST_BUFFER_DURATION (buffer) = meta.duration;
681 GST_BUFFER_OFFSET (buffer) = meta.offset;
682 GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
683 GST_BUFFER_FLAGS (buffer) = meta.flags;
684
685 /* If you don't call that, the GType isn't yet known at the
686 g_type_from_name below */
687 gst_protection_meta_get_info ();
688
689 mapped_size = size;
690 payload = gst_adapter_map (comm->adapter, mapped_size);
691 if (!payload) {
692 gst_buffer_unref (buffer);
693 return NULL;
694 }
695 memcpy (&n_meta, payload, sizeof (n_meta));
696 payload += sizeof (n_meta);
697
698 for (n = 0; n < n_meta; ++n) {
699 guint32 flags, len, bytes;
700 guint64 msize;
701 GType api;
702 GstMeta *meta;
703 GstStructure *structure = NULL;
704
705 memcpy (&bytes, payload, sizeof (bytes));
706 payload += sizeof (bytes);
707
708 #define READ_FIELD(f) do { \
709 memcpy (&f, payload, sizeof (f)); \
710 payload += sizeof(f); \
711 } while(0)
712
713 READ_FIELD (flags);
714 READ_FIELD (len);
715 api = g_type_from_name ((const char *) payload);
716 payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
717 READ_FIELD (msize);
718 READ_FIELD (len);
719 if (len) {
720 structure = gst_structure_new_from_string ((const char *) payload);
721 payload += len + 1;
722 }
723
724 /* Seems we can add a meta from the api nor type ? */
725 if (api == GST_PROTECTION_META_API_TYPE) {
726 meta =
727 gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
728 ((GstProtectionMeta *) meta)->info = structure;
729 } else {
730 GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
731 g_type_name (api));
732 if (structure)
733 gst_structure_free (structure);
734 }
735
736 #undef READ_FIELD
737
738 }
739
740 gst_adapter_unmap (comm->adapter);
741 gst_adapter_flush (comm->adapter, mapped_size);
742
743 return buffer;
744 }
745
746 static gboolean
gst_ipc_pipeline_comm_write_sink_message_event_to_fd(GstIpcPipelineComm * comm,GstEvent * event)747 gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
748 GstEvent * event)
749 {
750 const unsigned char payload_type =
751 GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
752 gboolean ret;
753 guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
754 char *str = NULL;
755 const GstStructure *structure;
756 GstMessage *message = NULL;
757 const char *name;
758 GstByteWriter bw;
759
760 g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
761 FALSE);
762
763 g_mutex_lock (&comm->mutex);
764 ++comm->send_id;
765
766 GST_TRACE_OBJECT (comm->element,
767 "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
768
769 gst_byte_writer_init (&bw);
770 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
771 goto write_failed;
772 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
773 goto write_failed;
774 name = gst_structure_get_name (gst_event_get_structure (event));
775 slen = strlen (name) + 1;
776 gst_event_parse_sink_message (event, &message);
777 structure = gst_message_get_structure (message);
778 if (structure) {
779 str = gst_structure_to_string (structure);
780 structure_slen = strlen (str);
781 } else {
782 str = NULL;
783 structure_slen = 0;
784 }
785 size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
786 strlen (name) + 1 + structure_slen + 1;
787 if (!gst_byte_writer_put_uint32_le (&bw, size))
788 goto write_failed;
789
790 type = GST_MESSAGE_TYPE (message);
791 if (!gst_byte_writer_put_uint32_le (&bw, type))
792 goto write_failed;
793 size -= sizeof (type);
794
795 eseqnum = GST_EVENT_SEQNUM (event);
796 if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
797 goto write_failed;
798 size -= sizeof (eseqnum);
799
800 mseqnum = GST_MESSAGE_SEQNUM (message);
801 if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
802 goto write_failed;
803 size -= sizeof (mseqnum);
804
805 if (!gst_byte_writer_put_uint32_le (&bw, slen))
806 goto write_failed;
807 size -= sizeof (slen);
808
809 if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
810 goto write_failed;
811 size -= slen;
812
813 if (str) {
814 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
815 goto write_failed;
816 } else {
817 if (!gst_byte_writer_put_uint8 (&bw, 0))
818 goto write_failed;
819 }
820
821 if (!write_byte_writer_to_fd (comm, &bw))
822 goto write_failed;
823
824 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
825 GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
826 COMM_REQUEST_TYPE_EVENT))
827 goto write_failed;
828
829 ret = ret32;
830
831 done:
832 g_mutex_unlock (&comm->mutex);
833 gst_byte_writer_reset (&bw);
834 g_free (str);
835 if (message)
836 gst_message_unref (message);
837 return ret;
838
839 write_failed:
840 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
841 ("Failed to write to socket"));
842 ret = FALSE;
843 goto done;
844 }
845
846 static GstEvent *
gst_ipc_pipeline_comm_read_sink_message_event(GstIpcPipelineComm * comm,guint32 size)847 gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
848 guint32 size)
849 {
850 GstMessage *message;
851 GstEvent *event = NULL;
852 gchar *end = NULL;
853 GstStructure *structure;
854 guint32 type, eseqnum, mseqnum, slen;
855 const char *name;
856 guint32 mapped_size = size;
857 const guint8 *payload;
858
859 /* this should not be called if we don't have enough yet */
860 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
861 g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
862
863 payload = gst_adapter_map (comm->adapter, mapped_size);
864 if (!payload)
865 return NULL;
866 memcpy (&type, payload, sizeof (type));
867 payload += sizeof (type);
868 size -= sizeof (type);
869 if (size == 0)
870 goto done;
871
872 memcpy (&eseqnum, payload, sizeof (eseqnum));
873 payload += sizeof (eseqnum);
874 size -= sizeof (eseqnum);
875 if (size == 0)
876 goto done;
877
878 memcpy (&mseqnum, payload, sizeof (mseqnum));
879 payload += sizeof (mseqnum);
880 size -= sizeof (mseqnum);
881 if (size == 0)
882 goto done;
883
884 memcpy (&slen, payload, sizeof (slen));
885 payload += sizeof (slen);
886 size -= sizeof (slen);
887 if (size == 0)
888 goto done;
889
890 if (payload[slen - 1])
891 goto done;
892 name = (const char *) payload;
893 payload += slen;
894 size -= slen;
895
896 if ((payload)[size - 1]) {
897 goto done;
898 }
899 if (*payload) {
900 structure = gst_structure_from_string ((const char *) payload, &end);
901 } else {
902 structure = NULL;
903 }
904
905 message =
906 gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
907 gst_message_set_seqnum (message, mseqnum);
908 event = gst_event_new_sink_message (name, message);
909 gst_event_set_seqnum (event, eseqnum);
910 gst_message_unref (message);
911
912 done:
913 gst_adapter_unmap (comm->adapter);
914 gst_adapter_flush (comm->adapter, mapped_size);
915 return event;
916 }
917
918 gboolean
gst_ipc_pipeline_comm_write_event_to_fd(GstIpcPipelineComm * comm,gboolean upstream,GstEvent * event)919 gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
920 gboolean upstream, GstEvent * event)
921 {
922 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
923 gboolean ret;
924 guint32 type, size, ret32 = TRUE, seqnum, slen;
925 char *str = NULL;
926 const GstStructure *structure;
927 GstByteWriter bw;
928
929 /* we special case sink-message event as gst can't serialize/de-serialize it */
930 if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
931 return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
932
933 g_mutex_lock (&comm->mutex);
934 ++comm->send_id;
935
936 GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
937 comm->send_id, event);
938
939 gst_byte_writer_init (&bw);
940 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
941 goto write_failed;
942 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
943 goto write_failed;
944 structure = gst_event_get_structure (event);
945 if (structure) {
946
947 if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
948 GstStructure *s = gst_structure_copy (structure);
949 gst_structure_remove_field (s, "stream");
950 str = gst_structure_to_string (s);
951 gst_structure_free (s);
952 } else {
953 str = gst_structure_to_string (structure);
954 }
955
956 slen = strlen (str);
957 } else {
958 str = NULL;
959 slen = 0;
960 }
961 size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
962 if (!gst_byte_writer_put_uint32_le (&bw, size))
963 goto write_failed;
964
965 type = GST_EVENT_TYPE (event);
966 if (!gst_byte_writer_put_uint32_le (&bw, type))
967 goto write_failed;
968
969 seqnum = GST_EVENT_SEQNUM (event);
970 if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
971 goto write_failed;
972
973 if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
974 goto write_failed;
975
976 if (str) {
977 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
978 goto write_failed;
979 } else {
980 if (!gst_byte_writer_put_uint8 (&bw, 0))
981 goto write_failed;
982 }
983
984 if (!write_byte_writer_to_fd (comm, &bw))
985 goto write_failed;
986
987 /* Upstream events get serialized, this is required to send seeks only
988 * one at a time. */
989 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
990 (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
991 ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
992 goto write_failed;
993 ret = ret32;
994
995 done:
996 g_mutex_unlock (&comm->mutex);
997 g_free (str);
998 gst_byte_writer_reset (&bw);
999 return ret;
1000
1001 write_failed:
1002 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1003 ("Failed to write to socket"));
1004 ret = FALSE;
1005 goto done;
1006 }
1007
1008 static GstEvent *
gst_ipc_pipeline_comm_read_event(GstIpcPipelineComm * comm,guint32 size,gboolean * upstream)1009 gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
1010 gboolean * upstream)
1011 {
1012 GstEvent *event = NULL;
1013 gchar *end = NULL;
1014 GstStructure *structure;
1015 guint32 type, seqnum;
1016 guint32 mapped_size = size;
1017 const guint8 *payload;
1018
1019 /* this should not be called if we don't have enough yet */
1020 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1021 g_return_val_if_fail (size >= sizeof (type), NULL);
1022
1023 payload = gst_adapter_map (comm->adapter, mapped_size);
1024 if (!payload)
1025 return NULL;
1026
1027 memcpy (&type, payload, sizeof (type));
1028 payload += sizeof (type);
1029 size -= sizeof (type);
1030 if (size == 0)
1031 goto done;
1032
1033 memcpy (&seqnum, payload, sizeof (seqnum));
1034 payload += sizeof (seqnum);
1035 size -= sizeof (seqnum);
1036 if (size == 0)
1037 goto done;
1038
1039 *upstream = (*payload) ? TRUE : FALSE;
1040 payload += 1;
1041 size -= 1;
1042 if (size == 0)
1043 goto done;
1044
1045 if (payload[size - 1])
1046 goto done;
1047 if (*payload) {
1048 structure = gst_structure_from_string ((const char *) payload, &end);
1049 } else {
1050 structure = NULL;
1051 }
1052
1053 event = gst_event_new_custom (type, structure);
1054 gst_event_set_seqnum (event, seqnum);
1055
1056 done:
1057 gst_adapter_unmap (comm->adapter);
1058 gst_adapter_flush (comm->adapter, mapped_size);
1059 return event;
1060 }
1061
1062 gboolean
gst_ipc_pipeline_comm_write_query_to_fd(GstIpcPipelineComm * comm,gboolean upstream,GstQuery * query)1063 gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
1064 gboolean upstream, GstQuery * query)
1065 {
1066 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
1067 gboolean ret;
1068 guint32 type, size, ret32 = TRUE, slen;
1069 char *str = NULL;
1070 const GstStructure *structure;
1071 GstByteWriter bw;
1072
1073 g_mutex_lock (&comm->mutex);
1074 ++comm->send_id;
1075
1076 GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
1077 comm->send_id, query);
1078
1079 gst_byte_writer_init (&bw);
1080 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1081 goto write_failed;
1082 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1083 goto write_failed;
1084 structure = gst_query_get_structure (query);
1085 if (structure) {
1086 str = gst_structure_to_string (structure);
1087 slen = strlen (str);
1088 } else {
1089 str = NULL;
1090 slen = 0;
1091 }
1092 size = sizeof (type) + 1 + slen + 1;
1093 if (!gst_byte_writer_put_uint32_le (&bw, size))
1094 goto write_failed;
1095
1096 type = GST_QUERY_TYPE (query);
1097 if (!gst_byte_writer_put_uint32_le (&bw, type))
1098 goto write_failed;
1099
1100 if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
1101 goto write_failed;
1102
1103 if (str) {
1104 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
1105 goto write_failed;
1106 } else {
1107 if (!gst_byte_writer_put_uint8 (&bw, 0))
1108 goto write_failed;
1109 }
1110
1111 if (!write_byte_writer_to_fd (comm, &bw))
1112 goto write_failed;
1113
1114 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
1115 GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
1116 COMM_REQUEST_TYPE_QUERY))
1117 goto write_failed;
1118
1119 ret = ret32;
1120
1121 done:
1122 g_mutex_unlock (&comm->mutex);
1123 g_free (str);
1124 gst_byte_writer_reset (&bw);
1125 return ret;
1126
1127 write_failed:
1128 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1129 ("Failed to write to socket"));
1130 ret = FALSE;
1131 goto done;
1132 }
1133
1134 static GstQuery *
gst_ipc_pipeline_comm_read_query(GstIpcPipelineComm * comm,guint32 size,gboolean * upstream)1135 gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
1136 gboolean * upstream)
1137 {
1138 GstQuery *query = NULL;
1139 gchar *end = NULL;
1140 GstStructure *structure;
1141 guint32 type;
1142 guint32 mapped_size = size;
1143 const guint8 *payload;
1144
1145 /* this should not be called if we don't have enough yet */
1146 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1147 g_return_val_if_fail (size >= sizeof (type), NULL);
1148
1149 payload = gst_adapter_map (comm->adapter, mapped_size);
1150 if (!payload)
1151 return NULL;
1152
1153 memcpy (&type, payload, sizeof (type));
1154 payload += sizeof (type);
1155 size -= sizeof (type);
1156 if (size == 0)
1157 goto done;
1158
1159 *upstream = (*payload) ? TRUE : FALSE;
1160 payload += 1;
1161 size -= 1;
1162 if (size == 0)
1163 goto done;
1164
1165 if (payload[size - 1])
1166 goto done;
1167 if (*payload) {
1168 structure = gst_structure_from_string ((const char *) payload, &end);
1169 } else {
1170 structure = NULL;
1171 }
1172
1173 query = gst_query_new_custom (type, structure);
1174
1175 /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
1176 This does not play well with the serialization/deserialization system,
1177 which will give us a non-NULL GstCaps which has a value of NULL. This
1178 in turn wreaks havoc with any code that tests whether filter is NULL
1179 (which basically means, am I being given an optional GstCaps ?).
1180 So we look for non-NULL GstCaps which have NULL contents, and replace
1181 them with NULL instead. */
1182 if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1183 GstCaps *filter;
1184 gst_query_parse_caps (query, &filter);
1185 if (filter
1186 && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
1187 "NULL")) {
1188 gst_query_unref (query);
1189 query = gst_query_new_caps (NULL);
1190 }
1191 }
1192
1193 done:
1194 gst_adapter_unmap (comm->adapter);
1195 gst_adapter_flush (comm->adapter, mapped_size);
1196 return query;
1197 }
1198
1199 GstStateChangeReturn
gst_ipc_pipeline_comm_write_state_change_to_fd(GstIpcPipelineComm * comm,GstStateChange transition)1200 gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
1201 GstStateChange transition)
1202 {
1203 const unsigned char payload_type =
1204 GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
1205 GstStateChangeReturn ret;
1206 guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
1207 GstByteWriter bw;
1208
1209 g_mutex_lock (&comm->mutex);
1210 ++comm->send_id;
1211
1212 GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
1213 comm->send_id,
1214 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
1215 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
1216
1217 gst_byte_writer_init (&bw);
1218 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1219 goto write_failed;
1220 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1221 goto write_failed;
1222 size = sizeof (transition);
1223 if (!gst_byte_writer_put_uint32_le (&bw, size))
1224 goto write_failed;
1225 if (!gst_byte_writer_put_uint32_le (&bw, transition))
1226 goto write_failed;
1227
1228 if (!write_byte_writer_to_fd (comm, &bw))
1229 goto write_failed;
1230
1231 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1232 ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
1233 goto write_failed;
1234 ret = ret32;
1235
1236 done:
1237 g_mutex_unlock (&comm->mutex);
1238 gst_byte_writer_reset (&bw);
1239 return ret;
1240
1241 write_failed:
1242 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1243 ("Failed to write to socket"));
1244 ret = GST_STATE_CHANGE_FAILURE;
1245 goto done;
1246 }
1247
1248 static gboolean
is_valid_state_change(GstStateChange transition)1249 is_valid_state_change (GstStateChange transition)
1250 {
1251 if (transition == GST_STATE_CHANGE_NULL_TO_READY)
1252 return TRUE;
1253 if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
1254 return TRUE;
1255 if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
1256 return TRUE;
1257 if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
1258 return TRUE;
1259 if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
1260 return TRUE;
1261 if (transition == GST_STATE_CHANGE_READY_TO_NULL)
1262 return TRUE;
1263 if (GST_STATE_TRANSITION_CURRENT (transition) ==
1264 GST_STATE_TRANSITION_NEXT (transition))
1265 return TRUE;
1266 return FALSE;
1267 }
1268
1269 static gboolean
gst_ipc_pipeline_comm_read_state_change(GstIpcPipelineComm * comm,guint32 size,guint32 * transition)1270 gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
1271 guint32 size, guint32 * transition)
1272 {
1273 guint32 mapped_size = size;
1274 const guint8 *payload;
1275
1276 /* this should not be called if we don't have enough yet */
1277 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
1278 g_return_val_if_fail (size >= sizeof (*transition), FALSE);
1279
1280 payload = gst_adapter_map (comm->adapter, size);
1281 if (!payload)
1282 return FALSE;
1283 memcpy (transition, payload, sizeof (*transition));
1284 gst_adapter_unmap (comm->adapter);
1285 gst_adapter_flush (comm->adapter, mapped_size);
1286 return is_valid_state_change (*transition);
1287 }
1288
1289 void
gst_ipc_pipeline_comm_write_state_lost_to_fd(GstIpcPipelineComm * comm)1290 gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
1291 {
1292 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
1293 guint32 size;
1294 GstByteWriter bw;
1295
1296 g_mutex_lock (&comm->mutex);
1297 ++comm->send_id;
1298
1299 GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
1300 gst_byte_writer_init (&bw);
1301 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1302 goto write_failed;
1303 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1304 goto write_failed;
1305 size = 0;
1306 if (!gst_byte_writer_put_uint32_le (&bw, size))
1307 goto write_failed;
1308
1309 if (!write_byte_writer_to_fd (comm, &bw))
1310 goto write_failed;
1311
1312 done:
1313 g_mutex_unlock (&comm->mutex);
1314 gst_byte_writer_reset (&bw);
1315 return;
1316
1317 write_failed:
1318 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1319 ("Failed to write to socket"));
1320 goto done;
1321 }
1322
1323 static gboolean
gst_ipc_pipeline_comm_read_state_lost(GstIpcPipelineComm * comm,guint32 size)1324 gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
1325 {
1326 /* no payload */
1327 return TRUE;
1328 }
1329
1330 static gboolean
gst_ipc_pipeline_comm_write_gerror_message_to_fd(GstIpcPipelineComm * comm,GstMessage * message)1331 gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
1332 GstMessage * message)
1333 {
1334 const unsigned char payload_type =
1335 GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
1336 gboolean ret;
1337 guint32 code, size, ret32 = TRUE;
1338 char *str = NULL;
1339 GError *error;
1340 char *extra_message;
1341 const char *domain_string;
1342 unsigned char msgtype;
1343 GstByteWriter bw;
1344
1345 g_mutex_lock (&comm->mutex);
1346 ++comm->send_id;
1347
1348 if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
1349 gst_message_parse_error (message, &error, &extra_message);
1350 msgtype = 2;
1351 } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
1352 gst_message_parse_warning (message, &error, &extra_message);
1353 msgtype = 1;
1354 } else {
1355 gst_message_parse_info (message, &error, &extra_message);
1356 msgtype = 0;
1357 }
1358 code = error->code;
1359 domain_string = g_quark_to_string (error->domain);
1360 GST_TRACE_OBJECT (comm->element,
1361 "Writing error %u: domain %s, code %u, message %s, extra message %s",
1362 comm->send_id, domain_string, error->code, error->message, extra_message);
1363
1364 gst_byte_writer_init (&bw);
1365 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1366 goto write_failed;
1367 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1368 goto write_failed;
1369
1370 size = sizeof (size);
1371 size += 1;
1372 size += strlen (domain_string) + 1;
1373 size += sizeof (code);
1374 size += sizeof (size);
1375 size += error->message ? strlen (error->message) + 1 : 0;
1376 size += sizeof (size);
1377 size += extra_message ? strlen (extra_message) + 1 : 0;
1378
1379 if (!gst_byte_writer_put_uint32_le (&bw, size))
1380 goto write_failed;
1381
1382 if (!gst_byte_writer_put_uint8 (&bw, msgtype))
1383 goto write_failed;
1384 size = strlen (domain_string) + 1;
1385 if (!gst_byte_writer_put_uint32_le (&bw, size))
1386 goto write_failed;
1387 if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
1388 goto write_failed;
1389 if (!gst_byte_writer_put_uint32_le (&bw, code))
1390 goto write_failed;
1391 size = error->message ? strlen (error->message) + 1 : 0;
1392 if (!gst_byte_writer_put_uint32_le (&bw, size))
1393 goto write_failed;
1394 if (error->message) {
1395 if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
1396 goto write_failed;
1397 }
1398 size = extra_message ? strlen (extra_message) + 1 : 0;
1399 if (!gst_byte_writer_put_uint32_le (&bw, size))
1400 goto write_failed;
1401 if (extra_message) {
1402 if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
1403 goto write_failed;
1404 }
1405
1406 if (!write_byte_writer_to_fd (comm, &bw))
1407 goto write_failed;
1408
1409 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1410 ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1411 goto write_failed;
1412
1413 ret = ret32;
1414
1415 done:
1416 g_mutex_unlock (&comm->mutex);
1417 g_free (str);
1418 g_error_free (error);
1419 g_free (extra_message);
1420 gst_byte_writer_reset (&bw);
1421 return ret;
1422
1423 write_failed:
1424 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1425 ("Failed to write to socket"));
1426 ret = FALSE;
1427 goto done;
1428 }
1429
1430 static GstMessage *
gst_ipc_pipeline_comm_read_gerror_message(GstIpcPipelineComm * comm,guint32 size)1431 gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
1432 guint32 size)
1433 {
1434 GstMessage *message = NULL;
1435 guint32 code;
1436 GQuark domain;
1437 const char *msg, *extra_message;
1438 GError *error;
1439 unsigned char msgtype;
1440 guint32 mapped_size = size;
1441 const guint8 *payload;
1442
1443 /* this should not be called if we don't have enough yet */
1444 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1445 g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
1446 NULL);
1447
1448 payload = gst_adapter_map (comm->adapter, mapped_size);
1449 if (!payload)
1450 return NULL;
1451 msgtype = *payload++;
1452 memcpy (&size, payload, sizeof (size));
1453 payload += sizeof (size);
1454 if (payload[size - 1])
1455 goto done;
1456 domain = g_quark_from_string ((const char *) payload);
1457 payload += size;
1458
1459 memcpy (&code, payload, sizeof (code));
1460 payload += sizeof (code);
1461
1462 memcpy (&size, payload, sizeof (size));
1463 payload += sizeof (size);
1464 if (size) {
1465 if (payload[size - 1])
1466 goto done;
1467 msg = (const char *) payload;
1468 } else {
1469 msg = NULL;
1470 }
1471 payload += size;
1472
1473 memcpy (&size, payload, sizeof (size));
1474 payload += sizeof (size);
1475 if (size) {
1476 if (payload[size - 1])
1477 goto done;
1478 extra_message = (const char *) payload;
1479 } else {
1480 extra_message = NULL;
1481 }
1482 payload += size;
1483
1484 error = g_error_new (domain, code, "%s", msg);
1485 if (msgtype == 2)
1486 message =
1487 gst_message_new_error (GST_OBJECT (comm->element), error,
1488 extra_message);
1489 else if (msgtype == 1)
1490 message =
1491 gst_message_new_warning (GST_OBJECT (comm->element), error,
1492 extra_message);
1493 else
1494 message =
1495 gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
1496 g_error_free (error);
1497
1498 done:
1499 gst_adapter_unmap (comm->adapter);
1500 gst_adapter_flush (comm->adapter, mapped_size);
1501
1502 return message;
1503 }
1504
1505 gboolean
gst_ipc_pipeline_comm_write_message_to_fd(GstIpcPipelineComm * comm,GstMessage * message)1506 gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
1507 GstMessage * message)
1508 {
1509 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
1510 gboolean ret;
1511 guint32 type, size, ret32 = TRUE, slen;
1512 char *str = NULL;
1513 const GstStructure *structure;
1514 GstByteWriter bw;
1515
1516 /* we special case error as gst can't serialize/de-serialize it */
1517 if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
1518 || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
1519 || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
1520 return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
1521
1522 g_mutex_lock (&comm->mutex);
1523 ++comm->send_id;
1524
1525 GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
1526 comm->send_id, message);
1527
1528 gst_byte_writer_init (&bw);
1529 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1530 goto write_failed;
1531 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1532 goto write_failed;
1533 structure = gst_message_get_structure (message);
1534 if (structure) {
1535 str = gst_structure_to_string (structure);
1536 slen = strlen (str);
1537 } else {
1538 str = NULL;
1539 slen = 0;
1540 }
1541 size = sizeof (type) + slen + 1;
1542 if (!gst_byte_writer_put_uint32_le (&bw, size))
1543 goto write_failed;
1544
1545 type = GST_MESSAGE_TYPE (message);
1546 if (!gst_byte_writer_put_uint32_le (&bw, type))
1547 goto write_failed;
1548 size -= sizeof (type);
1549 if (str) {
1550 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
1551 goto write_failed;
1552 } else {
1553 if (!gst_byte_writer_put_uint8 (&bw, 0))
1554 goto write_failed;
1555 }
1556
1557 if (!write_byte_writer_to_fd (comm, &bw))
1558 goto write_failed;
1559
1560 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1561 ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1562 goto write_failed;
1563
1564 ret = ret32;
1565
1566 done:
1567 g_mutex_unlock (&comm->mutex);
1568 g_free (str);
1569 gst_byte_writer_reset (&bw);
1570 return ret;
1571
1572 write_failed:
1573 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1574 ("Failed to write to socket"));
1575 ret = FALSE;
1576 goto done;
1577 }
1578
1579 static GstMessage *
gst_ipc_pipeline_comm_read_message(GstIpcPipelineComm * comm,guint32 size)1580 gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
1581 {
1582 GstMessage *message = NULL;
1583 gchar *end = NULL;
1584 GstStructure *structure;
1585 guint32 type;
1586 guint32 mapped_size = size;
1587 const guint8 *payload;
1588
1589 /* this should not be called if we don't have enough yet */
1590 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1591 g_return_val_if_fail (size >= sizeof (type), NULL);
1592
1593 payload = gst_adapter_map (comm->adapter, mapped_size);
1594 if (!payload)
1595 return NULL;
1596 memcpy (&type, payload, sizeof (type));
1597 payload += sizeof (type);
1598 size -= sizeof (type);
1599 if (size == 0)
1600 goto done;
1601
1602 if (payload[size - 1])
1603 goto done;
1604 if (*payload) {
1605 structure = gst_structure_from_string ((const char *) payload, &end);
1606 } else {
1607 structure = NULL;
1608 }
1609
1610 message =
1611 gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
1612
1613 done:
1614 gst_adapter_unmap (comm->adapter);
1615 gst_adapter_flush (comm->adapter, mapped_size);
1616
1617 return message;
1618 }
1619
1620 void
gst_ipc_pipeline_comm_init(GstIpcPipelineComm * comm,GstElement * element)1621 gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
1622 {
1623 g_mutex_init (&comm->mutex);
1624 comm->element = element;
1625 comm->fdin = comm->fdout = -1;
1626 comm->ack_time = DEFAULT_ACK_TIME;
1627 comm->waiting_ids =
1628 g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1629 (GDestroyNotify) comm_request_free);
1630 comm->adapter = gst_adapter_new ();
1631 comm->poll = gst_poll_new (TRUE);
1632 gst_poll_fd_init (&comm->pollFDin);
1633 }
1634
1635 void
gst_ipc_pipeline_comm_clear(GstIpcPipelineComm * comm)1636 gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
1637 {
1638 g_hash_table_destroy (comm->waiting_ids);
1639 gst_object_unref (comm->adapter);
1640 gst_poll_free (comm->poll);
1641 g_mutex_clear (&comm->mutex);
1642 }
1643
1644 static void
cancel_request(gpointer key,gpointer value,gpointer user_data,GstFlowReturn fret)1645 cancel_request (gpointer key, gpointer value, gpointer user_data,
1646 GstFlowReturn fret)
1647 {
1648 GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
1649 guint32 id = GPOINTER_TO_INT (key);
1650 CommRequest *req = (CommRequest *) value;
1651
1652 GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
1653 req->type);
1654 req->ret = fret;
1655 req->replied = TRUE;
1656 g_cond_signal (&req->cond);
1657 }
1658
1659 static void
cancel_request_error(gpointer key,gpointer value,gpointer user_data)1660 cancel_request_error (gpointer key, gpointer value, gpointer user_data)
1661 {
1662 CommRequest *req = (CommRequest *) value;
1663 GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
1664
1665 cancel_request (key, value, user_data, fret);
1666 }
1667
1668 void
gst_ipc_pipeline_comm_cancel(GstIpcPipelineComm * comm,gboolean cleanup)1669 gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
1670 {
1671 g_mutex_lock (&comm->mutex);
1672 g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
1673 if (cleanup) {
1674 g_hash_table_unref (comm->waiting_ids);
1675 comm->waiting_ids =
1676 g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1677 (GDestroyNotify) comm_request_free);
1678 }
1679 g_mutex_unlock (&comm->mutex);
1680 }
1681
1682 static gboolean
set_field(GQuark field_id,const GValue * value,gpointer user_data)1683 set_field (GQuark field_id, const GValue * value, gpointer user_data)
1684 {
1685 GstStructure *structure = user_data;
1686
1687 gst_structure_id_set_value (structure, field_id, value);
1688
1689 return TRUE;
1690 }
1691
1692 static gboolean
gst_ipc_pipeline_comm_reply_request(GstIpcPipelineComm * comm,guint32 id,GstFlowReturn ret,GstQuery * query)1693 gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
1694 GstFlowReturn ret, GstQuery * query)
1695 {
1696 CommRequest *req;
1697
1698 req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
1699 if (!req) {
1700 GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
1701 return FALSE;
1702 }
1703
1704 GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
1705 comm_request_ret_get_name (req->type, ret), req->id);
1706 req->replied = TRUE;
1707 req->ret = ret;
1708 if (query) {
1709 if (req->query) {
1710 /* We need to update the original query in place, as the caller
1711 will expect the object to be the same */
1712 GstStructure *structure = gst_query_writable_structure (req->query);
1713 gst_structure_remove_all_fields (structure);
1714 gst_structure_foreach (gst_query_get_structure (query), set_field,
1715 structure);
1716 } else {
1717 GST_WARNING_OBJECT (comm->element,
1718 "Got query reply, but no query was in the request");
1719 }
1720 }
1721 g_cond_signal (&req->cond);
1722 return TRUE;
1723 }
1724
1725 static gint
update_adapter(GstIpcPipelineComm * comm)1726 update_adapter (GstIpcPipelineComm * comm)
1727 {
1728 GstMemory *mem = NULL;
1729 GstBuffer *buf;
1730 GstMapInfo map;
1731 ssize_t sz;
1732 gint ret = 0;
1733
1734 again:
1735 /* update pollFDin if necessary (fdin changed or we lost our parent).
1736 * we do not allow a parent-less element to communicate with its peer
1737 * in order to avoid race conditions where the slave tries to change
1738 * the state of its parent pipeline while it is not yet added in that
1739 * pipeline. */
1740 if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
1741 if (comm->pollFDin.fd != -1) {
1742 GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
1743 comm->pollFDin.fd);
1744 gst_poll_remove_fd (comm->poll, &comm->pollFDin);
1745 gst_poll_fd_init (&comm->pollFDin);
1746 }
1747 if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
1748 GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
1749 comm->pollFDin.fd = comm->fdin;
1750 gst_poll_add_fd (comm->poll, &comm->pollFDin);
1751 gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
1752 }
1753 }
1754
1755 /* wait for activity on fdin or a flush */
1756 if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
1757 if (errno == EAGAIN)
1758 goto again;
1759 /* error out, unless interrupted or flushing */
1760 if (errno != EINTR)
1761 ret = (errno == EBUSY) ? 2 : 1;
1762 }
1763
1764 /* read from fdin if possible and push data to our adapter */
1765 if (comm->pollFDin.fd >= 0
1766 && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
1767 if (!mem)
1768 mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
1769
1770 gst_memory_map (mem, &map, GST_MAP_WRITE);
1771 #ifdef _MSC_VER
1772 sz = recv (comm->pollFDin.fd, map.data, map.size, 0);
1773 if (sz < 0) {
1774 int last_error = WSAGetLastError ();
1775 if (last_error == WSAEWOULDBLOCK) {
1776 errno = EAGAIN;
1777 } else {
1778 errno = last_error;
1779 }
1780 }
1781 #else
1782 sz = read (comm->pollFDin.fd, map.data, map.size);
1783 #endif
1784 gst_memory_unmap (mem, &map);
1785
1786 if (sz <= 0) {
1787 if (errno == EAGAIN)
1788 goto again;
1789 /* error out, unless interrupted */
1790 if (errno != EINTR)
1791 ret = 1;
1792 } else {
1793 gst_memory_resize (mem, 0, sz);
1794 buf = gst_buffer_new ();
1795 gst_buffer_append_memory (buf, mem);
1796 mem = NULL;
1797 GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
1798 gst_adapter_push (comm->adapter, buf);
1799 }
1800 }
1801
1802 if (mem)
1803 gst_memory_unref (mem);
1804
1805 return ret;
1806 }
1807
1808 static gboolean
read_many(GstIpcPipelineComm * comm)1809 read_many (GstIpcPipelineComm * comm)
1810 {
1811 gboolean ret = TRUE;
1812 gsize available;
1813 const guint8 *payload;
1814
1815 while (1)
1816 switch (comm->state) {
1817 case GST_IPC_PIPELINE_COMM_STATE_TYPE:
1818 {
1819 guint8 type;
1820 guint32 mapped_size;
1821
1822 available = gst_adapter_available (comm->adapter);
1823 mapped_size = 1 + sizeof (gint32) * 2;
1824 if (available < mapped_size)
1825 goto done;
1826
1827 payload = gst_adapter_map (comm->adapter, mapped_size);
1828 type = *payload++;
1829 g_mutex_lock (&comm->mutex);
1830 memcpy (&comm->id, payload, sizeof (guint32));
1831 memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
1832 g_mutex_unlock (&comm->mutex);
1833 gst_adapter_unmap (comm->adapter);
1834 gst_adapter_flush (comm->adapter, mapped_size);
1835 GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
1836 comm->id, type, comm->payload_length);
1837 switch (type) {
1838 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1839 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1840 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1841 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1842 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1843 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1844 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1845 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
1846 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
1847 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
1848 GST_TRACE_OBJECT (comm->element, "switching to state %s",
1849 gst_ipc_pipeline_comm_data_type_get_name (type));
1850 comm->state = type;
1851 break;
1852 default:
1853 goto out_of_sync;
1854 }
1855 break;
1856 }
1857 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1858 {
1859 const guint8 *rets;
1860 guint32 ret32;
1861
1862 available = gst_adapter_available (comm->adapter);
1863 if (available < comm->payload_length)
1864 goto done;
1865
1866 if (available < sizeof (guint32))
1867 goto ack_failed;
1868
1869 rets = gst_adapter_map (comm->adapter, sizeof (guint32));
1870 memcpy (&ret32, rets, sizeof (ret32));
1871 gst_adapter_unmap (comm->adapter);
1872 gst_adapter_flush (comm->adapter, sizeof (guint32));
1873 GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
1874 gst_flow_get_name (ret32), comm->id);
1875
1876 g_mutex_lock (&comm->mutex);
1877 gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
1878 g_mutex_unlock (&comm->mutex);
1879
1880 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1881 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1882 break;
1883 }
1884 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1885 {
1886 GstQuery *query = NULL;
1887 gboolean qret;
1888
1889 available = gst_adapter_available (comm->adapter);
1890 if (available < comm->payload_length)
1891 goto done;
1892
1893 qret =
1894 gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
1895 &query);
1896
1897 GST_TRACE_OBJECT (comm->element,
1898 "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
1899 query);
1900
1901 g_mutex_lock (&comm->mutex);
1902 gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
1903 g_mutex_unlock (&comm->mutex);
1904
1905 gst_query_unref (query);
1906
1907 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1908 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1909 break;
1910 }
1911 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1912 {
1913 GstBuffer *buf;
1914
1915 available = gst_adapter_available (comm->adapter);
1916 if (available < comm->payload_length)
1917 goto done;
1918
1919 buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
1920 if (!buf)
1921 goto buffer_failed;
1922
1923 /* set caps and push */
1924 GST_TRACE_OBJECT (comm->element,
1925 "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
1926 ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
1927 ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
1928 ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
1929 GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
1930 GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
1931 GST_BUFFER_FLAGS (buf));
1932
1933 gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
1934 GINT_TO_POINTER (comm->id), NULL);
1935
1936 if (comm->on_buffer)
1937 (*comm->on_buffer) (comm->id, buf, comm->user_data);
1938
1939 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1940 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1941 break;
1942 }
1943 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1944 {
1945 GstEvent *event;
1946 gboolean upstream;
1947
1948 available = gst_adapter_available (comm->adapter);
1949 if (available < comm->payload_length)
1950 goto done;
1951
1952 event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
1953 &upstream);
1954 if (!event)
1955 goto event_failed;
1956
1957 GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
1958 event, gst_event_type_get_name (event->type));
1959
1960 gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1961 GINT_TO_POINTER (comm->id), NULL);
1962
1963 if (comm->on_event)
1964 (*comm->on_event) (comm->id, event, upstream, comm->user_data);
1965
1966 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1967 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1968 break;
1969 }
1970 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1971 {
1972 GstEvent *event;
1973
1974 available = gst_adapter_available (comm->adapter);
1975 if (available < comm->payload_length)
1976 goto done;
1977
1978 event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
1979 comm->payload_length);
1980 if (!event)
1981 goto event_failed;
1982
1983 GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
1984 event);
1985
1986 gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1987 GINT_TO_POINTER (comm->id), NULL);
1988
1989 if (comm->on_event)
1990 (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
1991
1992 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1993 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1994 break;
1995 }
1996 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1997 {
1998 GstQuery *query;
1999 gboolean upstream;
2000
2001 available = gst_adapter_available (comm->adapter);
2002 if (available < comm->payload_length)
2003 goto done;
2004
2005 query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
2006 &upstream);
2007 if (!query)
2008 goto query_failed;
2009
2010 GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
2011 query, gst_query_type_get_name (query->type));
2012
2013 gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
2014 GINT_TO_POINTER (comm->id), NULL);
2015
2016 if (comm->on_query)
2017 (*comm->on_query) (comm->id, query, upstream, comm->user_data);
2018
2019 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2020 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2021 break;
2022 }
2023 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
2024 {
2025 guint32 transition;
2026
2027 available = gst_adapter_available (comm->adapter);
2028 if (available < comm->payload_length)
2029 goto done;
2030
2031 if (!gst_ipc_pipeline_comm_read_state_change (comm,
2032 comm->payload_length, &transition))
2033 goto state_change_failed;
2034
2035 GST_TRACE_OBJECT (comm->element,
2036 "deserialized state change request: %s -> %s",
2037 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
2038 (transition)),
2039 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
2040 (transition)));
2041
2042 if (comm->on_state_change)
2043 (*comm->on_state_change) (comm->id, transition, comm->user_data);
2044
2045 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2046 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2047 break;
2048 }
2049 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
2050 {
2051 available = gst_adapter_available (comm->adapter);
2052 if (available < comm->payload_length)
2053 goto done;
2054
2055 if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
2056 goto event_failed;
2057
2058 GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
2059
2060 if (comm->on_state_lost)
2061 (*comm->on_state_lost) (comm->user_data);
2062
2063 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2064 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2065 break;
2066 }
2067 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
2068 {
2069 GstMessage *message;
2070
2071 available = gst_adapter_available (comm->adapter);
2072 if (available < comm->payload_length)
2073 goto done;
2074
2075 message = gst_ipc_pipeline_comm_read_message (comm,
2076 comm->payload_length);
2077 if (!message)
2078 goto message_failed;
2079
2080 GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2081 message, gst_message_type_get_name (message->type));
2082
2083 if (comm->on_message)
2084 (*comm->on_message) (comm->id, message, comm->user_data);
2085
2086 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2087 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2088 break;
2089 }
2090 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
2091 {
2092 GstMessage *message;
2093
2094 available = gst_adapter_available (comm->adapter);
2095 if (available < comm->payload_length)
2096 goto done;
2097
2098 message = gst_ipc_pipeline_comm_read_gerror_message (comm,
2099 comm->payload_length);
2100 if (!message)
2101 goto message_failed;
2102
2103 GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2104 message, gst_message_type_get_name (message->type));
2105
2106 if (comm->on_message)
2107 (*comm->on_message) (comm->id, message, comm->user_data);
2108
2109 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2110 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2111 break;
2112 }
2113 }
2114
2115 done:
2116 return ret;
2117
2118 /* ERRORS */
2119 out_of_sync:
2120 {
2121 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2122 ("Socket out of sync"));
2123 ret = FALSE;
2124 goto done;
2125 }
2126 state_change_failed:
2127 {
2128 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2129 ("could not read state change from fd"));
2130 ret = FALSE;
2131 goto done;
2132 }
2133 ack_failed:
2134 {
2135 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2136 ("could not read ack from fd"));
2137 ret = FALSE;
2138 goto done;
2139 }
2140 buffer_failed:
2141 {
2142 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2143 ("could not read buffer from fd"));
2144 ret = FALSE;
2145 goto done;
2146 }
2147 event_failed:
2148 {
2149 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2150 ("could not read event from fd"));
2151 ret = FALSE;
2152 goto done;
2153 }
2154 message_failed:
2155 {
2156 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2157 ("could not read message from fd"));
2158 ret = FALSE;
2159 goto done;
2160 }
2161 query_failed:
2162 {
2163 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2164 ("could not read query from fd"));
2165 ret = FALSE;
2166 goto done;
2167 }
2168 }
2169
2170 static gpointer
reader_thread(gpointer data)2171 reader_thread (gpointer data)
2172 {
2173 GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
2174 gboolean running = TRUE;
2175 gint ret = 0;
2176
2177 while (running) {
2178 ret = update_adapter (comm);
2179 switch (ret) {
2180 case 1:
2181 GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
2182 ("Failed to read from socket"));
2183 running = FALSE;
2184 break;
2185 case 2:
2186 GST_INFO_OBJECT (comm->element, "We're stopping, all good");
2187 running = FALSE;
2188 break;
2189 default:
2190 read_many (comm);
2191 break;
2192 }
2193 }
2194
2195 GST_INFO_OBJECT (comm->element, "Reader thread ending");
2196 return NULL;
2197 }
2198
2199 gboolean
gst_ipc_pipeline_comm_start_reader_thread(GstIpcPipelineComm * comm,void (* on_buffer)(guint32,GstBuffer *,gpointer),void (* on_event)(guint32,GstEvent *,gboolean,gpointer),void (* on_query)(guint32,GstQuery *,gboolean,gpointer),void (* on_state_change)(guint32,GstStateChange,gpointer),void (* on_state_lost)(gpointer),void (* on_message)(guint32,GstMessage *,gpointer),gpointer user_data)2200 gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
2201 void (*on_buffer) (guint32, GstBuffer *, gpointer),
2202 void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
2203 void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
2204 void (*on_state_change) (guint32, GstStateChange, gpointer),
2205 void (*on_state_lost) (gpointer),
2206 void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
2207 {
2208 if (comm->reader_thread)
2209 return FALSE;
2210
2211 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2212 comm->on_buffer = on_buffer;
2213 comm->on_event = on_event;
2214 comm->on_query = on_query;
2215 comm->on_state_change = on_state_change;
2216 comm->on_state_lost = on_state_lost;
2217 comm->on_message = on_message;
2218 comm->user_data = user_data;
2219 gst_poll_set_flushing (comm->poll, FALSE);
2220 comm->reader_thread =
2221 g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
2222 return TRUE;
2223 }
2224
2225 void
gst_ipc_pipeline_comm_stop_reader_thread(GstIpcPipelineComm * comm)2226 gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
2227 {
2228 if (!comm->reader_thread)
2229 return;
2230
2231 gst_poll_set_flushing (comm->poll, TRUE);
2232 g_thread_join (comm->reader_thread);
2233 comm->reader_thread = NULL;
2234 }
2235
2236 static gchar *
gst_value_serialize_event(const GValue * value)2237 gst_value_serialize_event (const GValue * value)
2238 {
2239 const GstStructure *structure;
2240 GstEvent *ev;
2241 gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
2242 GValue val = G_VALUE_INIT;
2243
2244 ev = g_value_get_boxed (value);
2245
2246 g_value_init (&val, gst_event_type_get_type ());
2247 g_value_set_enum (&val, ev->type);
2248 type = gst_value_serialize (&val);
2249 g_value_unset (&val);
2250
2251 g_value_init (&val, G_TYPE_UINT64);
2252 g_value_set_uint64 (&val, ev->timestamp);
2253 ts = gst_value_serialize (&val);
2254 g_value_unset (&val);
2255
2256 g_value_init (&val, G_TYPE_UINT);
2257 g_value_set_uint (&val, ev->seqnum);
2258 seqnum = gst_value_serialize (&val);
2259 g_value_unset (&val);
2260
2261 g_value_init (&val, G_TYPE_INT64);
2262 g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
2263 rt_offset = gst_value_serialize (&val);
2264 g_value_unset (&val);
2265
2266 structure = gst_event_get_structure (ev);
2267 str = gst_structure_to_string (structure);
2268 str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
2269 g_strdelimit (str64, "=", '_');
2270 g_free (str);
2271
2272 s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
2273 NULL);
2274
2275 g_free (type);
2276 g_free (ts);
2277 g_free (seqnum);
2278 g_free (rt_offset);
2279 g_free (str64);
2280
2281 return s;
2282 }
2283
2284 static gboolean
gst_value_deserialize_event(GValue * dest,const gchar * s)2285 gst_value_deserialize_event (GValue * dest, const gchar * s)
2286 {
2287 GstEvent *ev = NULL;
2288 GValue val = G_VALUE_INIT;
2289 gboolean ret = FALSE;
2290 gchar **fields;
2291 gsize len;
2292
2293 fields = g_strsplit (s, ":", -1);
2294 if (g_strv_length (fields) != 5)
2295 goto wrong_length;
2296
2297 g_strdelimit (fields[4], "_", '=');
2298 g_base64_decode_inplace (fields[4], &len);
2299
2300 g_value_init (&val, gst_event_type_get_type ());
2301 if (!gst_value_deserialize (&val, fields[0]))
2302 goto fail;
2303 ev = gst_event_new_custom (g_value_get_enum (&val),
2304 gst_structure_new_from_string (fields[4]));
2305
2306 g_value_unset (&val);
2307 g_value_init (&val, G_TYPE_UINT64);
2308 if (!gst_value_deserialize (&val, fields[1]))
2309 goto fail;
2310 ev->timestamp = g_value_get_uint64 (&val);
2311
2312 g_value_unset (&val);
2313 g_value_init (&val, G_TYPE_UINT);
2314 if (!gst_value_deserialize (&val, fields[2]))
2315 goto fail;
2316 ev->seqnum = g_value_get_uint (&val);
2317
2318 g_value_unset (&val);
2319 g_value_init (&val, G_TYPE_INT64);
2320 if (!gst_value_deserialize (&val, fields[3]))
2321 goto fail;
2322 gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
2323
2324 g_value_take_boxed (dest, ev);
2325 ev = NULL;
2326 ret = TRUE;
2327
2328 fail:
2329 g_clear_pointer (&ev, gst_event_unref);
2330 g_value_unset (&val);
2331
2332 wrong_length:
2333 g_strfreev (fields);
2334 return ret;
2335 }
2336
/*
 * Registers a GstValueTable for _gtype that points at the file-local
 * gst_value_serialize_<_type>() / gst_value_deserialize_<_type>() pair.
 * The table's second slot is left NULL (presumably the compare function,
 * going by the macro name). The type is assigned at run time because
 * _gtype may be a function call.
 */
#define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type)        \
G_STMT_START {                                                  \
  static GstValueTable _type ## _value_table = {                \
    0,                                                          \
    NULL,                                                       \
    gst_value_serialize_ ## _type,                              \
    gst_value_deserialize_ ## _type                             \
  };                                                            \
  _type ## _value_table.type = (_gtype);                        \
  gst_value_register (&_type ## _value_table);                  \
} G_STMT_END
2345
2346 void
gst_ipc_pipeline_comm_plugin_init(void)2347 gst_ipc_pipeline_comm_plugin_init (void)
2348 {
2349 static gsize once = 0;
2350
2351 if (g_once_init_enter (&once)) {
2352 GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
2353 "ipc pipeline comm");
2354 QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
2355 REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
2356 g_once_init_leave (&once, (gsize) 1);
2357 }
2358 }
2359