1 /* GStreamer
2 * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 /**
21 * SECTION:element-gdpdepay
22 * @title: gdpdepay
23 * @see_also: gdppay
24 *
25 * This element depayloads GStreamer Data Protocol buffers back to deserialized
26 * buffers and events.
27 *
28 * |[
29 * gst-launch-1.0 -v -m filesrc location=test.gdp ! gdpdepay ! xvimagesink
30 * ]| This pipeline plays back a serialized video stream as created in the
31 * example for gdppay.
32 *
33 */
34
35 #ifdef HAVE_CONFIG_H
36 #include "config.h"
37 #endif
38
39 #include <string.h>
40
41 #include "dataprotocol.h"
42
43 #include "gstgdpelements.h"
44 #include "gstgdpdepay.h"
45
46 enum
47 {
48 PROP_0,
49 PROP_TS_OFFSET
50 };
51
52 static GstStaticPadTemplate gdp_depay_sink_template =
53 GST_STATIC_PAD_TEMPLATE ("sink",
54 GST_PAD_SINK,
55 GST_PAD_ALWAYS,
56 GST_STATIC_CAPS ("application/x-gdp"));
57
58 static GstStaticPadTemplate gdp_depay_src_template =
59 GST_STATIC_PAD_TEMPLATE ("src",
60 GST_PAD_SRC,
61 GST_PAD_ALWAYS,
62 GST_STATIC_CAPS_ANY);
63
64 GST_DEBUG_CATEGORY_STATIC (gst_gdp_depay_debug);
65 #define GST_CAT_DEFAULT gst_gdp_depay_debug
66
67 #define _do_init \
68 GST_DEBUG_CATEGORY_INIT (gst_gdp_depay_debug, "gdpdepay", 0, \
69 "GDP depayloader");
70 #define gst_gdp_depay_parent_class parent_class
71 G_DEFINE_TYPE_WITH_CODE (GstGDPDepay, gst_gdp_depay,
72 GST_TYPE_ELEMENT, _do_init);
73 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (gdpdepay, "gdpdepay", GST_RANK_NONE,
74 GST_TYPE_GDP_DEPAY, gdp_element_init (plugin));
75
76 static gboolean gst_gdp_depay_sink_event (GstPad * pad, GstObject * parent,
77 GstEvent * event);
78 static gboolean gst_gdp_depay_src_event (GstPad * pad, GstObject * parent,
79 GstEvent * event);
80
81 static GstFlowReturn gst_gdp_depay_chain (GstPad * pad, GstObject * parent,
82 GstBuffer * buffer);
83
84 static GstStateChangeReturn gst_gdp_depay_change_state (GstElement *
85 element, GstStateChange transition);
86
87 static void gst_gdp_depay_finalize (GObject * object);
88 static void gst_gdp_depay_set_property (GObject * object, guint prop_id,
89 const GValue * value, GParamSpec * pspec);
90 static void gst_gdp_depay_get_property (GObject * object, guint prop_id,
91 GValue * value, GParamSpec * pspec);
92 static void gst_gdp_depay_decide_allocation (GstGDPDepay * depay);
93
94 static void
gst_gdp_depay_class_init(GstGDPDepayClass * klass)95 gst_gdp_depay_class_init (GstGDPDepayClass * klass)
96 {
97 GObjectClass *gobject_class;
98 GstElementClass *gstelement_class;
99
100 gobject_class = (GObjectClass *) klass;
101 gstelement_class = (GstElementClass *) klass;
102
103 gobject_class->set_property = gst_gdp_depay_set_property;
104 gobject_class->get_property = gst_gdp_depay_get_property;
105
106 g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
107 g_param_spec_int64 ("ts-offset", "Timestamp Offset",
108 "Timestamp Offset",
109 G_MININT64, G_MAXINT64, 0,
110 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
111
112 gst_element_class_set_static_metadata (gstelement_class,
113 "GDP Depayloader", "GDP/Depayloader",
114 "Depayloads GStreamer Data Protocol buffers",
115 "Thomas Vander Stichele <thomas at apestaart dot org>");
116
117 gst_element_class_add_static_pad_template (gstelement_class,
118 &gdp_depay_sink_template);
119 gst_element_class_add_static_pad_template (gstelement_class,
120 &gdp_depay_src_template);
121
122 gstelement_class->change_state =
123 GST_DEBUG_FUNCPTR (gst_gdp_depay_change_state);
124 gobject_class->finalize = gst_gdp_depay_finalize;
125 }
126
127 static void
gst_gdp_depay_init(GstGDPDepay * gdpdepay)128 gst_gdp_depay_init (GstGDPDepay * gdpdepay)
129 {
130 gdpdepay->sinkpad =
131 gst_pad_new_from_static_template (&gdp_depay_sink_template, "sink");
132 gst_pad_set_chain_function (gdpdepay->sinkpad,
133 GST_DEBUG_FUNCPTR (gst_gdp_depay_chain));
134 gst_pad_set_event_function (gdpdepay->sinkpad,
135 GST_DEBUG_FUNCPTR (gst_gdp_depay_sink_event));
136 gst_element_add_pad (GST_ELEMENT (gdpdepay), gdpdepay->sinkpad);
137
138 gdpdepay->srcpad =
139 gst_pad_new_from_static_template (&gdp_depay_src_template, "src");
140 gst_pad_set_event_function (gdpdepay->srcpad,
141 GST_DEBUG_FUNCPTR (gst_gdp_depay_src_event));
142 /* our caps will always be decided by the incoming GDP caps buffers */
143 gst_pad_use_fixed_caps (gdpdepay->srcpad);
144 gst_element_add_pad (GST_ELEMENT (gdpdepay), gdpdepay->srcpad);
145
146 gdpdepay->adapter = gst_adapter_new ();
147
148 gdpdepay->allocator = NULL;
149 gst_allocation_params_init (&gdpdepay->allocation_params);
150 }
151
152 static void
gst_gdp_depay_finalize(GObject * gobject)153 gst_gdp_depay_finalize (GObject * gobject)
154 {
155 GstGDPDepay *this;
156
157 this = GST_GDP_DEPAY (gobject);
158 if (this->caps)
159 gst_caps_unref (this->caps);
160 g_free (this->header);
161 gst_adapter_clear (this->adapter);
162 g_object_unref (this->adapter);
163 if (this->allocator)
164 gst_object_unref (this->allocator);
165
166 GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
167 }
168
169 static void
gst_gdp_depay_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)170 gst_gdp_depay_set_property (GObject * object, guint prop_id,
171 const GValue * value, GParamSpec * pspec)
172 {
173 GstGDPDepay *this;
174
175 this = GST_GDP_DEPAY (object);
176
177 switch (prop_id) {
178 case PROP_TS_OFFSET:
179 this->ts_offset = g_value_get_int64 (value);
180 break;
181 default:
182 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
183 break;
184 }
185 }
186
187 static void
gst_gdp_depay_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)188 gst_gdp_depay_get_property (GObject * object, guint prop_id,
189 GValue * value, GParamSpec * pspec)
190 {
191 GstGDPDepay *this;
192
193 this = GST_GDP_DEPAY (object);
194
195 switch (prop_id) {
196 case PROP_TS_OFFSET:
197 g_value_set_int64 (value, this->ts_offset);
198 break;
199 default:
200 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
201 break;
202 }
203 }
204
205 static gboolean
gst_gdp_depay_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)206 gst_gdp_depay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
207 {
208 GstGDPDepay *this;
209 gboolean res = TRUE;
210
211 this = GST_GDP_DEPAY (parent);
212
213 switch (GST_EVENT_TYPE (event)) {
214 case GST_EVENT_FLUSH_START:
215 /* forward flush start */
216 res = gst_pad_push_event (this->srcpad, event);
217 break;
218 case GST_EVENT_FLUSH_STOP:
219 /* clear adapter on flush */
220 gst_adapter_clear (this->adapter);
221 /* forward flush stop */
222 res = gst_pad_push_event (this->srcpad, event);
223 break;
224 case GST_EVENT_EOS:
225 /* after EOS, we don't expect to output anything anymore */
226 res = gst_pad_push_event (this->srcpad, event);
227 break;
228 case GST_EVENT_SEGMENT:
229 case GST_EVENT_TAG:
230 case GST_EVENT_BUFFERSIZE:
231 default:
232 /* we unref most events as we take them from the datastream */
233 gst_event_unref (event);
234 break;
235 }
236
237 return res;
238 }
239
240 static gboolean
gst_gdp_depay_src_event(GstPad * pad,GstObject * parent,GstEvent * event)241 gst_gdp_depay_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
242 {
243 GstGDPDepay *this;
244 gboolean res = TRUE;
245
246 this = GST_GDP_DEPAY (parent);
247
248 switch (GST_EVENT_TYPE (event)) {
249 case GST_EVENT_SEEK:
250 /* we refuse seek for now. */
251 gst_event_unref (event);
252 res = FALSE;
253 break;
254 case GST_EVENT_QOS:
255 case GST_EVENT_NAVIGATION:
256 default:
257 /* everything else is passed */
258 res = gst_pad_push_event (this->sinkpad, event);
259 break;
260 }
261
262 return res;
263 }
264
265 static GstFlowReturn
gst_gdp_depay_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)266 gst_gdp_depay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
267 {
268 GstGDPDepay *this;
269 GstFlowReturn ret = GST_FLOW_OK;
270 GstCaps *caps;
271 GstBuffer *buf;
272 GstEvent *event;
273 guint available;
274
275 this = GST_GDP_DEPAY (parent);
276
277 if (gst_pad_check_reconfigure (this->srcpad)) {
278 gst_gdp_depay_decide_allocation (this);
279 }
280
281 /* On DISCONT, get rid of accumulated data. We assume a buffer after the
282 * DISCONT contains (part of) a new valid header, if not we error because we
283 * lost sync */
284 if (GST_BUFFER_IS_DISCONT (buffer)) {
285 gst_adapter_clear (this->adapter);
286 this->state = GST_GDP_DEPAY_STATE_HEADER;
287 }
288 gst_adapter_push (this->adapter, buffer);
289
290 while (TRUE) {
291 switch (this->state) {
292 case GST_GDP_DEPAY_STATE_HEADER:
293 {
294 guint8 *header;
295
296 /* collect a complete header, validate and store the header. Figure out
297 * the payload length and switch to the PAYLOAD state */
298 available = gst_adapter_available (this->adapter);
299 if (available < GST_DP_HEADER_LENGTH)
300 goto done;
301
302 GST_LOG_OBJECT (this, "reading GDP header from adapter");
303 header = gst_adapter_take (this->adapter, GST_DP_HEADER_LENGTH);
304 if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) {
305 g_free (header);
306 goto header_validate_error;
307 }
308
309 /* store types and payload length. Also store the header, which we need
310 * to make the payload. */
311 this->payload_length = gst_dp_header_payload_length (header);
312 this->payload_type = gst_dp_header_payload_type (header);
313 /* free previous header and store new one. */
314 g_free (this->header);
315 this->header = header;
316
317 GST_LOG_OBJECT (this,
318 "read GDP header, payload size %d, payload type %d, switching to state PAYLOAD",
319 this->payload_length, this->payload_type);
320 this->state = GST_GDP_DEPAY_STATE_PAYLOAD;
321 break;
322 }
323 case GST_GDP_DEPAY_STATE_PAYLOAD:
324 {
325 /* in this state we wait for all the payload data to be available in the
326 * adapter. Then we switch to the state where we actually process the
327 * payload. */
328 available = gst_adapter_available (this->adapter);
329 if (available < this->payload_length)
330 goto done;
331
332 /* change state based on type */
333 if (this->payload_type == GST_DP_PAYLOAD_BUFFER) {
334 GST_LOG_OBJECT (this, "switching to state BUFFER");
335 this->state = GST_GDP_DEPAY_STATE_BUFFER;
336 } else if (this->payload_type == GST_DP_PAYLOAD_CAPS) {
337 GST_LOG_OBJECT (this, "switching to state CAPS");
338 this->state = GST_GDP_DEPAY_STATE_CAPS;
339 } else if (this->payload_type >= GST_DP_PAYLOAD_EVENT_NONE) {
340 GST_LOG_OBJECT (this, "switching to state EVENT");
341 this->state = GST_GDP_DEPAY_STATE_EVENT;
342 } else {
343 goto wrong_type;
344 }
345
346 if (this->payload_length) {
347 const guint8 *data;
348 gboolean res;
349
350 data = gst_adapter_map (this->adapter, this->payload_length);
351 res = gst_dp_validate_payload (GST_DP_HEADER_LENGTH, this->header,
352 data);
353 gst_adapter_unmap (this->adapter);
354
355 if (!res)
356 goto payload_validate_error;
357 }
358
359 break;
360 }
361 case GST_GDP_DEPAY_STATE_BUFFER:
362 {
363 /* if we receive a buffer without caps first, we error out */
364 if (!this->caps)
365 goto no_caps;
366
367 GST_LOG_OBJECT (this, "reading GDP buffer from adapter");
368 buf =
369 gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, this->header,
370 this->allocator, &this->allocation_params);
371 if (!buf)
372 goto buffer_failed;
373
374 /* now take the payload if there is any */
375 if (this->payload_length > 0) {
376 GstMapInfo map;
377
378 gst_buffer_map (buf, &map, GST_MAP_WRITE);
379 gst_adapter_copy (this->adapter, map.data, 0, this->payload_length);
380 gst_buffer_unmap (buf, &map);
381
382 gst_adapter_flush (this->adapter, this->payload_length);
383 }
384
385 if (GST_BUFFER_TIMESTAMP (buf) > -this->ts_offset)
386 GST_BUFFER_TIMESTAMP (buf) += this->ts_offset;
387 else
388 GST_BUFFER_TIMESTAMP (buf) = 0;
389
390 if (GST_BUFFER_DTS (buf) > -this->ts_offset)
391 GST_BUFFER_DTS (buf) += this->ts_offset;
392 else
393 GST_BUFFER_DTS (buf) = 0;
394
395 /* set caps and push */
396 GST_LOG_OBJECT (this, "deserialized buffer %p, pushing, timestamp %"
397 GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
398 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT
399 ", size %" G_GSIZE_FORMAT ", flags 0x%x",
400 buf,
401 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
402 GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
403 GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf),
404 gst_buffer_get_size (buf), GST_BUFFER_FLAGS (buf));
405 ret = gst_pad_push (this->srcpad, buf);
406 if (ret != GST_FLOW_OK)
407 goto push_error;
408
409 GST_LOG_OBJECT (this, "switching to state HEADER");
410 this->state = GST_GDP_DEPAY_STATE_HEADER;
411 break;
412 }
413 case GST_GDP_DEPAY_STATE_CAPS:
414 {
415 guint8 *payload;
416
417 /* take the payload of the caps */
418 GST_LOG_OBJECT (this, "reading GDP caps from adapter");
419 payload = gst_adapter_take (this->adapter, this->payload_length);
420 caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, this->header,
421 payload);
422 g_free (payload);
423 if (!caps)
424 goto caps_failed;
425
426 GST_DEBUG_OBJECT (this, "deserialized caps %" GST_PTR_FORMAT, caps);
427 gst_caps_replace (&(this->caps), caps);
428 gst_pad_set_caps (this->srcpad, caps);
429 gst_gdp_depay_decide_allocation (this);
430 /* drop the creation ref we still have */
431 gst_caps_unref (caps);
432
433 GST_LOG_OBJECT (this, "switching to state HEADER");
434 this->state = GST_GDP_DEPAY_STATE_HEADER;
435 break;
436 }
437 case GST_GDP_DEPAY_STATE_EVENT:
438 {
439 guint8 *payload;
440
441 GST_LOG_OBJECT (this, "reading GDP event from adapter");
442
443 /* adapter doesn't like 0 length payload */
444 if (this->payload_length > 0)
445 payload = gst_adapter_take (this->adapter, this->payload_length);
446 else
447 payload = NULL;
448 event = gst_dp_event_from_packet (GST_DP_HEADER_LENGTH, this->header,
449 payload);
450 g_free (payload);
451 if (!event)
452 goto event_failed;
453
454 GST_DEBUG_OBJECT (this, "deserialized event %p of type %s, pushing",
455 event, gst_event_type_get_name (event->type));
456 gst_pad_push_event (this->srcpad, event);
457
458 GST_LOG_OBJECT (this, "switching to state HEADER");
459 this->state = GST_GDP_DEPAY_STATE_HEADER;
460 break;
461 }
462 }
463 }
464
465 done:
466 return ret;
467
468 /* ERRORS */
469 header_validate_error:
470 {
471 GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
472 ("GDP packet header does not validate"));
473 ret = GST_FLOW_ERROR;
474 goto done;
475 }
476 payload_validate_error:
477 {
478 GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
479 ("GDP packet payload does not validate"));
480 ret = GST_FLOW_ERROR;
481 goto done;
482 }
483 wrong_type:
484 {
485 GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
486 ("GDP packet header is of wrong type"));
487 ret = GST_FLOW_ERROR;
488 goto done;
489 }
490 no_caps:
491 {
492 GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
493 ("Received a buffer without first receiving caps"));
494 ret = GST_FLOW_NOT_NEGOTIATED;
495 goto done;
496 }
497 buffer_failed:
498 {
499 GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
500 ("could not create buffer from GDP packet"));
501 ret = GST_FLOW_ERROR;
502 goto done;
503 }
504 push_error:
505 {
506 GST_WARNING_OBJECT (this, "pushing depayloaded buffer returned %d", ret);
507 goto done;
508 }
509 caps_failed:
510 {
511 GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
512 ("could not create caps from GDP packet"));
513 ret = GST_FLOW_ERROR;
514 goto done;
515 }
516 event_failed:
517 {
518 GST_ELEMENT_ERROR (this, STREAM, DECODE, (NULL),
519 ("could not create event from GDP packet"));
520 ret = GST_FLOW_ERROR;
521 goto done;
522 }
523 }
524
525 static GstStateChangeReturn
gst_gdp_depay_change_state(GstElement * element,GstStateChange transition)526 gst_gdp_depay_change_state (GstElement * element, GstStateChange transition)
527 {
528 GstStateChangeReturn ret;
529 GstGDPDepay *this = GST_GDP_DEPAY (element);
530
531 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
532
533 switch (transition) {
534 case GST_STATE_CHANGE_PAUSED_TO_READY:
535 if (this->caps) {
536 gst_caps_unref (this->caps);
537 this->caps = NULL;
538 }
539 gst_adapter_clear (this->adapter);
540 if (this->allocator)
541 gst_object_unref (this->allocator);
542 this->allocator = NULL;
543 gst_allocation_params_init (&this->allocation_params);
544 break;
545 default:
546 break;
547 }
548 return ret;
549 }
550
551 static void
gst_gdp_depay_decide_allocation(GstGDPDepay * gdpdepay)552 gst_gdp_depay_decide_allocation (GstGDPDepay * gdpdepay)
553 {
554 GstAllocator *allocator;
555 GstAllocationParams params;
556 GstQuery *query = NULL;
557 GstCaps *caps;
558
559 caps = gst_pad_query_caps (gdpdepay->srcpad, NULL);
560 if (!caps) {
561 GST_LOG_OBJECT (gdpdepay,
562 "No peer pad caps found. Using default allocator.");
563 return;
564 }
565
566 if (!gst_caps_is_fixed (caps)) {
567 GST_LOG_OBJECT (gdpdepay, "Caps on src pad are not fixed. Not querying.");
568 return;
569 }
570
571 query = gst_query_new_allocation (caps, TRUE);
572 if (!gst_pad_peer_query (gdpdepay->srcpad, query)) {
573 GST_WARNING_OBJECT (gdpdepay, "Peer allocation query failed.");
574 }
575
576 if (gst_query_get_n_allocation_params (query) > 0) {
577 gst_query_parse_nth_allocation_param (query, 0, &allocator, ¶ms);
578 } else {
579 allocator = NULL;
580 gst_allocation_params_init (¶ms);
581 }
582
583 if (gdpdepay->allocator)
584 gst_object_unref (gdpdepay->allocator);
585
586 gdpdepay->allocator = allocator;
587 gdpdepay->allocation_params = params;
588
589 gst_caps_unref (caps);
590 gst_query_unref (query);
591 }
592