1 /* GStreamer
2 * Copyright (C) <2005> Philippe Khalaf <burger@speedy.org>
3 * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21 /**
22 * SECTION:gstrtpbasedepayload
23 * @title: GstRTPBaseDepayload
24 * @short_description: Base class for RTP depayloader
25 *
26 * Provides a base class for RTP depayloaders
27 */
28 #ifdef HAVE_CONFIG_H
29 #include "config.h"
30 #endif
31
32 #include "gstrtpbasedepayload.h"
33 #include "gstrtpmeta.h"
34
35 GST_DEBUG_CATEGORY_STATIC (rtpbasedepayload_debug);
36 #define GST_CAT_DEFAULT (rtpbasedepayload_debug)
37
38 struct _GstRTPBaseDepayloadPrivate
39 {
40 GstClockTime npt_start;
41 GstClockTime npt_stop;
42 gdouble play_speed;
43 gdouble play_scale;
44 guint clock_base;
45
46 gboolean discont;
47 GstClockTime pts;
48 GstClockTime dts;
49 GstClockTime duration;
50
51 guint32 last_ssrc;
52 guint32 last_seqnum;
53 guint32 last_rtptime;
54 guint32 next_seqnum;
55
56 gboolean negotiated;
57
58 GstCaps *last_caps;
59 GstEvent *segment_event;
60 guint32 segment_seqnum; /* Note: this is a GstEvent seqnum */
61
62 gboolean source_info;
63 GstBuffer *input_buffer;
64 };
65
66 /* Filter signals and args */
67 enum
68 {
69 /* FILL ME */
70 LAST_SIGNAL
71 };
72
73 #define DEFAULT_SOURCE_INFO FALSE
74
75 enum
76 {
77 PROP_0,
78 PROP_STATS,
79 PROP_SOURCE_INFO,
80 PROP_LAST
81 };
82
83 static void gst_rtp_base_depayload_finalize (GObject * object);
84 static void gst_rtp_base_depayload_set_property (GObject * object,
85 guint prop_id, const GValue * value, GParamSpec * pspec);
86 static void gst_rtp_base_depayload_get_property (GObject * object,
87 guint prop_id, GValue * value, GParamSpec * pspec);
88
89 static GstFlowReturn gst_rtp_base_depayload_chain (GstPad * pad,
90 GstObject * parent, GstBuffer * in);
91 static GstFlowReturn gst_rtp_base_depayload_chain_list (GstPad * pad,
92 GstObject * parent, GstBufferList * list);
93 static gboolean gst_rtp_base_depayload_handle_sink_event (GstPad * pad,
94 GstObject * parent, GstEvent * event);
95
96 static GstStateChangeReturn gst_rtp_base_depayload_change_state (GstElement *
97 element, GstStateChange transition);
98
99 static gboolean gst_rtp_base_depayload_packet_lost (GstRTPBaseDepayload *
100 filter, GstEvent * event);
101 static gboolean gst_rtp_base_depayload_handle_event (GstRTPBaseDepayload *
102 filter, GstEvent * event);
103
104 static GstElementClass *parent_class = NULL;
105 static gint private_offset = 0;
106
107 static void gst_rtp_base_depayload_class_init (GstRTPBaseDepayloadClass *
108 klass);
109 static void gst_rtp_base_depayload_init (GstRTPBaseDepayload * rtpbasepayload,
110 GstRTPBaseDepayloadClass * klass);
111 static GstEvent *create_segment_event (GstRTPBaseDepayload * filter,
112 guint rtptime, GstClockTime position);
113
114 GType
gst_rtp_base_depayload_get_type(void)115 gst_rtp_base_depayload_get_type (void)
116 {
117 static GType rtp_base_depayload_type = 0;
118
119 if (g_once_init_enter ((gsize *) & rtp_base_depayload_type)) {
120 static const GTypeInfo rtp_base_depayload_info = {
121 sizeof (GstRTPBaseDepayloadClass),
122 NULL,
123 NULL,
124 (GClassInitFunc) gst_rtp_base_depayload_class_init,
125 NULL,
126 NULL,
127 sizeof (GstRTPBaseDepayload),
128 0,
129 (GInstanceInitFunc) gst_rtp_base_depayload_init,
130 };
131 GType _type;
132
133 _type = g_type_register_static (GST_TYPE_ELEMENT, "GstRTPBaseDepayload",
134 &rtp_base_depayload_info, G_TYPE_FLAG_ABSTRACT);
135
136 private_offset =
137 g_type_add_instance_private (_type,
138 sizeof (GstRTPBaseDepayloadPrivate));
139
140 g_once_init_leave ((gsize *) & rtp_base_depayload_type, _type);
141 }
142 return rtp_base_depayload_type;
143 }
144
145 static inline GstRTPBaseDepayloadPrivate *
gst_rtp_base_depayload_get_instance_private(GstRTPBaseDepayload * self)146 gst_rtp_base_depayload_get_instance_private (GstRTPBaseDepayload * self)
147 {
148 return (G_STRUCT_MEMBER_P (self, private_offset));
149 }
150
151 static void
gst_rtp_base_depayload_class_init(GstRTPBaseDepayloadClass * klass)152 gst_rtp_base_depayload_class_init (GstRTPBaseDepayloadClass * klass)
153 {
154 GObjectClass *gobject_class;
155 GstElementClass *gstelement_class;
156
157 gobject_class = G_OBJECT_CLASS (klass);
158 gstelement_class = (GstElementClass *) klass;
159 parent_class = g_type_class_peek_parent (klass);
160
161 if (private_offset != 0)
162 g_type_class_adjust_private_offset (klass, &private_offset);
163
164 gobject_class->finalize = gst_rtp_base_depayload_finalize;
165 gobject_class->set_property = gst_rtp_base_depayload_set_property;
166 gobject_class->get_property = gst_rtp_base_depayload_get_property;
167
168
169 /**
170 * GstRTPBaseDepayload:stats:
171 *
172 * Various depayloader statistics retrieved atomically (and are therefore
173 * synchroized with each other). This property return a GstStructure named
174 * application/x-rtp-depayload-stats containing the following fields relating to
175 * the last processed buffer and current state of the stream being depayloaded:
176 *
177 * * `clock-rate`: #G_TYPE_UINT, clock-rate of the stream
178 * * `npt-start`: #G_TYPE_UINT64, time of playback start
179 * * `npt-stop`: #G_TYPE_UINT64, time of playback stop
180 * * `play-speed`: #G_TYPE_DOUBLE, the playback speed
181 * * `play-scale`: #G_TYPE_DOUBLE, the playback scale
182 * * `running-time-dts`: #G_TYPE_UINT64, the last running-time of the
183 * last DTS
184 * * `running-time-pts`: #G_TYPE_UINT64, the last running-time of the
185 * last PTS
186 * * `seqnum`: #G_TYPE_UINT, the last seen seqnum
187 * * `timestamp`: #G_TYPE_UINT, the last seen RTP timestamp
188 **/
189 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_STATS,
190 g_param_spec_boxed ("stats", "Statistics", "Various statistics",
191 GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
192
193 /**
194 * GstRTPBaseDepayload:source-info:
195 *
196 * Add RTP source information found in RTP header as meta to output buffer.
197 *
198 * Since: 1.16
199 **/
200 g_object_class_install_property (gobject_class, PROP_SOURCE_INFO,
201 g_param_spec_boolean ("source-info", "RTP source information",
202 "Add RTP source information as buffer meta",
203 DEFAULT_SOURCE_INFO, G_PARAM_READWRITE));
204
205 gstelement_class->change_state = gst_rtp_base_depayload_change_state;
206
207 klass->packet_lost = gst_rtp_base_depayload_packet_lost;
208 klass->handle_event = gst_rtp_base_depayload_handle_event;
209
210 GST_DEBUG_CATEGORY_INIT (rtpbasedepayload_debug, "rtpbasedepayload", 0,
211 "Base class for RTP Depayloaders");
212 }
213
214 static void
gst_rtp_base_depayload_init(GstRTPBaseDepayload * filter,GstRTPBaseDepayloadClass * klass)215 gst_rtp_base_depayload_init (GstRTPBaseDepayload * filter,
216 GstRTPBaseDepayloadClass * klass)
217 {
218 GstPadTemplate *pad_template;
219 GstRTPBaseDepayloadPrivate *priv;
220
221 priv = gst_rtp_base_depayload_get_instance_private (filter);
222
223 filter->priv = priv;
224
225 GST_DEBUG_OBJECT (filter, "init");
226
227 pad_template =
228 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
229 g_return_if_fail (pad_template != NULL);
230 filter->sinkpad = gst_pad_new_from_template (pad_template, "sink");
231 gst_pad_set_chain_function (filter->sinkpad, gst_rtp_base_depayload_chain);
232 gst_pad_set_chain_list_function (filter->sinkpad,
233 gst_rtp_base_depayload_chain_list);
234 gst_pad_set_event_function (filter->sinkpad,
235 gst_rtp_base_depayload_handle_sink_event);
236 gst_element_add_pad (GST_ELEMENT (filter), filter->sinkpad);
237
238 pad_template =
239 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
240 g_return_if_fail (pad_template != NULL);
241 filter->srcpad = gst_pad_new_from_template (pad_template, "src");
242 gst_pad_use_fixed_caps (filter->srcpad);
243 gst_element_add_pad (GST_ELEMENT (filter), filter->srcpad);
244
245 priv->npt_start = 0;
246 priv->npt_stop = -1;
247 priv->play_speed = 1.0;
248 priv->play_scale = 1.0;
249 priv->clock_base = -1;
250 priv->dts = -1;
251 priv->pts = -1;
252 priv->duration = -1;
253 priv->source_info = DEFAULT_SOURCE_INFO;
254
255 gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
256 }
257
258 static void
gst_rtp_base_depayload_finalize(GObject * object)259 gst_rtp_base_depayload_finalize (GObject * object)
260 {
261 G_OBJECT_CLASS (parent_class)->finalize (object);
262 }
263
264 static gboolean
gst_rtp_base_depayload_setcaps(GstRTPBaseDepayload * filter,GstCaps * caps)265 gst_rtp_base_depayload_setcaps (GstRTPBaseDepayload * filter, GstCaps * caps)
266 {
267 GstRTPBaseDepayloadClass *bclass;
268 GstRTPBaseDepayloadPrivate *priv;
269 gboolean res;
270 GstStructure *caps_struct;
271 const GValue *value;
272
273 priv = filter->priv;
274
275 bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (filter);
276
277 GST_DEBUG_OBJECT (filter, "Set caps %" GST_PTR_FORMAT, caps);
278
279 if (priv->last_caps) {
280 if (gst_caps_is_equal (priv->last_caps, caps)) {
281 res = TRUE;
282 goto caps_not_changed;
283 } else {
284 gst_caps_unref (priv->last_caps);
285 priv->last_caps = NULL;
286 }
287 }
288
289 caps_struct = gst_caps_get_structure (caps, 0);
290
291 /* get other values for newsegment */
292 value = gst_structure_get_value (caps_struct, "npt-start");
293 if (value && G_VALUE_HOLDS_UINT64 (value))
294 priv->npt_start = g_value_get_uint64 (value);
295 else
296 priv->npt_start = 0;
297 GST_DEBUG_OBJECT (filter, "NPT start %" G_GUINT64_FORMAT, priv->npt_start);
298
299 value = gst_structure_get_value (caps_struct, "npt-stop");
300 if (value && G_VALUE_HOLDS_UINT64 (value))
301 priv->npt_stop = g_value_get_uint64 (value);
302 else
303 priv->npt_stop = -1;
304
305 GST_DEBUG_OBJECT (filter, "NPT stop %" G_GUINT64_FORMAT, priv->npt_stop);
306
307 value = gst_structure_get_value (caps_struct, "play-speed");
308 if (value && G_VALUE_HOLDS_DOUBLE (value))
309 priv->play_speed = g_value_get_double (value);
310 else
311 priv->play_speed = 1.0;
312
313 value = gst_structure_get_value (caps_struct, "play-scale");
314 if (value && G_VALUE_HOLDS_DOUBLE (value))
315 priv->play_scale = g_value_get_double (value);
316 else
317 priv->play_scale = 1.0;
318
319 value = gst_structure_get_value (caps_struct, "clock-base");
320 if (value && G_VALUE_HOLDS_UINT (value))
321 priv->clock_base = g_value_get_uint (value);
322 else
323 priv->clock_base = -1;
324
325 if (bclass->set_caps) {
326 res = bclass->set_caps (filter, caps);
327 if (!res) {
328 GST_WARNING_OBJECT (filter, "Subclass rejected caps %" GST_PTR_FORMAT,
329 caps);
330 }
331 } else {
332 res = TRUE;
333 }
334
335 priv->negotiated = res;
336
337 if (priv->negotiated)
338 priv->last_caps = gst_caps_ref (caps);
339
340 return res;
341
342 caps_not_changed:
343 {
344 GST_DEBUG_OBJECT (filter, "Caps did not change");
345 return res;
346 }
347 }
348
349 /* takes ownership of the input buffer */
350 static GstFlowReturn
gst_rtp_base_depayload_handle_buffer(GstRTPBaseDepayload * filter,GstRTPBaseDepayloadClass * bclass,GstBuffer * in)351 gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter,
352 GstRTPBaseDepayloadClass * bclass, GstBuffer * in)
353 {
354 GstBuffer *(*process_rtp_packet_func) (GstRTPBaseDepayload * base,
355 GstRTPBuffer * rtp_buffer);
356 GstBuffer *(*process_func) (GstRTPBaseDepayload * base, GstBuffer * in);
357 GstRTPBaseDepayloadPrivate *priv;
358 GstFlowReturn ret = GST_FLOW_OK;
359 GstBuffer *out_buf;
360 guint32 ssrc;
361 guint16 seqnum;
362 guint32 rtptime;
363 gboolean discont, buf_discont;
364 gint gap;
365 GstRTPBuffer rtp = { NULL };
366
367 priv = filter->priv;
368
369 process_func = bclass->process;
370 process_rtp_packet_func = bclass->process_rtp_packet;
371
372 /* we must have a setcaps first */
373 if (G_UNLIKELY (!priv->negotiated))
374 goto not_negotiated;
375
376 if (G_UNLIKELY (!gst_rtp_buffer_map (in, GST_MAP_READ, &rtp)))
377 goto invalid_buffer;
378
379 buf_discont = GST_BUFFER_IS_DISCONT (in);
380
381 priv->pts = GST_BUFFER_PTS (in);
382 priv->dts = GST_BUFFER_DTS (in);
383 priv->duration = GST_BUFFER_DURATION (in);
384
385 ssrc = gst_rtp_buffer_get_ssrc (&rtp);
386 seqnum = gst_rtp_buffer_get_seq (&rtp);
387 rtptime = gst_rtp_buffer_get_timestamp (&rtp);
388
389 priv->last_seqnum = seqnum;
390 priv->last_rtptime = rtptime;
391
392 discont = buf_discont;
393
394 GST_LOG_OBJECT (filter, "discont %d, seqnum %u, rtptime %u, pts %"
395 GST_TIME_FORMAT ", dts %" GST_TIME_FORMAT, buf_discont, seqnum, rtptime,
396 GST_TIME_ARGS (priv->pts), GST_TIME_ARGS (priv->dts));
397
398 /* Check seqnum. This is a very simple check that makes sure that the seqnums
399 * are strictly increasing, dropping anything that is out of the ordinary. We
400 * can only do this when the next_seqnum is known. */
401 if (G_LIKELY (priv->next_seqnum != -1)) {
402 if (ssrc != priv->last_ssrc) {
403 GST_LOG_OBJECT (filter,
404 "New ssrc %u (current ssrc %u), sender restarted",
405 ssrc, priv->last_ssrc);
406 discont = TRUE;
407 } else {
408 gap = gst_rtp_buffer_compare_seqnum (seqnum, priv->next_seqnum);
409
410 /* if we have no gap, all is fine */
411 if (G_UNLIKELY (gap != 0)) {
412 GST_LOG_OBJECT (filter, "got packet %u, expected %u, gap %d", seqnum,
413 priv->next_seqnum, gap);
414 if (gap < 0) {
415 /* seqnum > next_seqnum, we are missing some packets, this is always a
416 * DISCONT. */
417 GST_LOG_OBJECT (filter, "%d missing packets", gap);
418 discont = TRUE;
419 } else {
420 /* seqnum < next_seqnum, we have seen this packet before or the sender
421 * could be restarted. If the packet is not too old, we throw it away as
422 * a duplicate, otherwise we mark discont and continue. 100 misordered
423 * packets is a good threshold. See also RFC 4737. */
424 if (gap < 100)
425 goto dropping;
426
427 GST_LOG_OBJECT (filter,
428 "%d > 100, packet too old, sender likely restarted", gap);
429 discont = TRUE;
430 }
431 }
432 }
433 }
434 priv->next_seqnum = (seqnum + 1) & 0xffff;
435 priv->last_ssrc = ssrc;
436
437 if (G_UNLIKELY (discont)) {
438 priv->discont = TRUE;
439 if (!buf_discont) {
440 gpointer old_inbuf = in;
441
442 /* we detected a seqnum discont but the buffer was not flagged with a discont,
443 * set the discont flag so that the subclass can throw away old data. */
444 GST_LOG_OBJECT (filter, "mark DISCONT on input buffer");
445 in = gst_buffer_make_writable (in);
446 GST_BUFFER_FLAG_SET (in, GST_BUFFER_FLAG_DISCONT);
447 /* depayloaders will check flag on rtpbuffer->buffer, so if the input
448 * buffer was not writable already we need to remap to make our
449 * newly-flagged buffer current on the rtpbuffer */
450 if (in != old_inbuf) {
451 gst_rtp_buffer_unmap (&rtp);
452 if (G_UNLIKELY (!gst_rtp_buffer_map (in, GST_MAP_READ, &rtp)))
453 goto invalid_buffer;
454 }
455 }
456 }
457
458 /* prepare segment event if needed */
459 if (filter->need_newsegment) {
460 priv->segment_event = create_segment_event (filter, rtptime,
461 GST_BUFFER_PTS (in));
462 filter->need_newsegment = FALSE;
463 }
464
465 priv->input_buffer = in;
466
467 if (process_rtp_packet_func != NULL) {
468 out_buf = process_rtp_packet_func (filter, &rtp);
469 gst_rtp_buffer_unmap (&rtp);
470 } else if (process_func != NULL) {
471 gst_rtp_buffer_unmap (&rtp);
472 out_buf = process_func (filter, in);
473 } else {
474 goto no_process;
475 }
476
477 /* let's send it out to processing */
478 if (out_buf) {
479 ret = gst_rtp_base_depayload_push (filter, out_buf);
480 }
481
482 gst_buffer_unref (in);
483 priv->input_buffer = NULL;
484
485 return ret;
486
487 /* ERRORS */
488 not_negotiated:
489 {
490 /* this is not fatal but should be filtered earlier */
491 GST_ELEMENT_ERROR (filter, CORE, NEGOTIATION,
492 ("No RTP format was negotiated."),
493 ("Input buffers need to have RTP caps set on them. This is usually "
494 "achieved by setting the 'caps' property of the upstream source "
495 "element (often udpsrc or appsrc), or by putting a capsfilter "
496 "element before the depayloader and setting the 'caps' property "
497 "on that. Also see http://cgit.freedesktop.org/gstreamer/"
498 "gst-plugins-good/tree/gst/rtp/README"));
499 gst_buffer_unref (in);
500 return GST_FLOW_NOT_NEGOTIATED;
501 }
502 invalid_buffer:
503 {
504 /* this is not fatal but should be filtered earlier */
505 GST_ELEMENT_WARNING (filter, STREAM, DECODE, (NULL),
506 ("Received invalid RTP payload, dropping"));
507 gst_buffer_unref (in);
508 return GST_FLOW_OK;
509 }
510 dropping:
511 {
512 gst_rtp_buffer_unmap (&rtp);
513 GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap);
514 gst_buffer_unref (in);
515 return GST_FLOW_OK;
516 }
517 no_process:
518 {
519 gst_rtp_buffer_unmap (&rtp);
520 /* this is not fatal but should be filtered earlier */
521 GST_ELEMENT_ERROR (filter, STREAM, NOT_IMPLEMENTED, (NULL),
522 ("The subclass does not have a process or process_rtp_packet method"));
523 gst_buffer_unref (in);
524 return GST_FLOW_ERROR;
525 }
526 }
527
528 static GstFlowReturn
gst_rtp_base_depayload_chain(GstPad * pad,GstObject * parent,GstBuffer * in)529 gst_rtp_base_depayload_chain (GstPad * pad, GstObject * parent, GstBuffer * in)
530 {
531 GstRTPBaseDepayloadClass *bclass;
532 GstRTPBaseDepayload *basedepay;
533 GstFlowReturn flow_ret;
534
535 basedepay = GST_RTP_BASE_DEPAYLOAD_CAST (parent);
536
537 bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (basedepay);
538
539 flow_ret = gst_rtp_base_depayload_handle_buffer (basedepay, bclass, in);
540
541 return flow_ret;
542 }
543
544 static GstFlowReturn
gst_rtp_base_depayload_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)545 gst_rtp_base_depayload_chain_list (GstPad * pad, GstObject * parent,
546 GstBufferList * list)
547 {
548 GstRTPBaseDepayloadClass *bclass;
549 GstRTPBaseDepayload *basedepay;
550 GstFlowReturn flow_ret;
551 GstBuffer *buffer;
552 guint i, len;
553
554 basedepay = GST_RTP_BASE_DEPAYLOAD_CAST (parent);
555
556 bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (basedepay);
557
558 flow_ret = GST_FLOW_OK;
559
560 /* chain each buffer in list individually */
561 len = gst_buffer_list_length (list);
562
563 if (len == 0)
564 goto done;
565
566 for (i = 0; i < len; i++) {
567 buffer = gst_buffer_list_get (list, i);
568
569 /* handle_buffer takes ownership of input buffer */
570 /* FIXME: add a way to steal buffers from list as we will unref it anyway */
571 gst_buffer_ref (buffer);
572
573 /* Should we fix up any missing timestamps for list buffers here
574 * (e.g. set to first or previous timestamp in list) or just assume
575 * the's a jitterbuffer that will have done that for us? */
576 flow_ret = gst_rtp_base_depayload_handle_buffer (basedepay, bclass, buffer);
577 if (flow_ret != GST_FLOW_OK)
578 break;
579 }
580
581 done:
582
583 gst_buffer_list_unref (list);
584
585 return flow_ret;
586 }
587
588 static gboolean
gst_rtp_base_depayload_handle_event(GstRTPBaseDepayload * filter,GstEvent * event)589 gst_rtp_base_depayload_handle_event (GstRTPBaseDepayload * filter,
590 GstEvent * event)
591 {
592 gboolean res = TRUE;
593 gboolean forward = TRUE;
594
595 switch (GST_EVENT_TYPE (event)) {
596 case GST_EVENT_FLUSH_STOP:
597 GST_OBJECT_LOCK (filter);
598 gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
599 GST_OBJECT_UNLOCK (filter);
600
601 filter->need_newsegment = TRUE;
602 filter->priv->next_seqnum = -1;
603 gst_event_replace (&filter->priv->segment_event, NULL);
604 break;
605 case GST_EVENT_CAPS:
606 {
607 GstCaps *caps;
608
609 gst_event_parse_caps (event, &caps);
610
611 res = gst_rtp_base_depayload_setcaps (filter, caps);
612 forward = FALSE;
613 break;
614 }
615 case GST_EVENT_SEGMENT:
616 {
617 GstSegment segment;
618
619 GST_OBJECT_LOCK (filter);
620 gst_event_copy_segment (event, &segment);
621
622 if (segment.format != GST_FORMAT_TIME) {
623 GST_ERROR_OBJECT (filter, "Segment with non-TIME format not supported");
624 res = FALSE;
625 }
626 filter->priv->segment_seqnum = gst_event_get_seqnum (event);
627 filter->segment = segment;
628 GST_OBJECT_UNLOCK (filter);
629
630 /* don't pass the event downstream, we generate our own segment including
631 * the NTP time and other things we receive in caps */
632 forward = FALSE;
633 break;
634 }
635 case GST_EVENT_CUSTOM_DOWNSTREAM:
636 {
637 GstRTPBaseDepayloadClass *bclass;
638
639 bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (filter);
640
641 if (gst_event_has_name (event, "GstRTPPacketLost")) {
642 /* we get this event from the jitterbuffer when it considers a packet as
643 * being lost. We send it to our packet_lost vmethod. The default
644 * implementation will make time progress by pushing out a GAP event.
645 * Subclasses can override and do one of the following:
646 * - Adjust timestamp/duration to something more accurate before
647 * calling the parent (default) packet_lost method.
648 * - do some more advanced error concealing on the already received
649 * (fragmented) packets.
650 * - ignore the packet lost.
651 */
652 if (bclass->packet_lost)
653 res = bclass->packet_lost (filter, event);
654 forward = FALSE;
655 }
656 break;
657 }
658 default:
659 break;
660 }
661
662 if (forward)
663 res = gst_pad_push_event (filter->srcpad, event);
664 else
665 gst_event_unref (event);
666
667 return res;
668 }
669
670 static gboolean
gst_rtp_base_depayload_handle_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)671 gst_rtp_base_depayload_handle_sink_event (GstPad * pad, GstObject * parent,
672 GstEvent * event)
673 {
674 gboolean res = FALSE;
675 GstRTPBaseDepayload *filter;
676 GstRTPBaseDepayloadClass *bclass;
677
678 filter = GST_RTP_BASE_DEPAYLOAD (parent);
679 bclass = GST_RTP_BASE_DEPAYLOAD_GET_CLASS (filter);
680 if (bclass->handle_event)
681 res = bclass->handle_event (filter, event);
682 else
683 gst_event_unref (event);
684
685 return res;
686 }
687
688 static GstEvent *
create_segment_event(GstRTPBaseDepayload * filter,guint rtptime,GstClockTime position)689 create_segment_event (GstRTPBaseDepayload * filter, guint rtptime,
690 GstClockTime position)
691 {
692 GstEvent *event;
693 GstClockTime start, stop, running_time;
694 GstRTPBaseDepayloadPrivate *priv;
695 GstSegment segment;
696
697 priv = filter->priv;
698
699 /* We don't need the object lock around - the segment
700 * can't change here while we're holding the STREAM_LOCK
701 */
702
703 /* determining the start of the segment */
704 start = filter->segment.start;
705 if (priv->clock_base != -1 && position != -1) {
706 GstClockTime exttime, gap;
707
708 exttime = priv->clock_base;
709 gst_rtp_buffer_ext_timestamp (&exttime, rtptime);
710 gap = gst_util_uint64_scale_int (exttime - priv->clock_base,
711 filter->clock_rate, GST_SECOND);
712
713 /* account for lost packets */
714 if (position > gap) {
715 GST_DEBUG_OBJECT (filter,
716 "Found gap of %" GST_TIME_FORMAT ", adjusting start: %"
717 GST_TIME_FORMAT " = %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT,
718 GST_TIME_ARGS (gap), GST_TIME_ARGS (position - gap),
719 GST_TIME_ARGS (position), GST_TIME_ARGS (gap));
720 start = position - gap;
721 }
722 }
723
724 /* determining the stop of the segment */
725 stop = filter->segment.stop;
726 if (priv->npt_stop != -1)
727 stop = start + (priv->npt_stop - priv->npt_start);
728
729 if (position == -1)
730 position = start;
731
732 running_time = gst_segment_to_running_time (&filter->segment,
733 GST_FORMAT_TIME, start);
734
735 gst_segment_init (&segment, GST_FORMAT_TIME);
736 segment.rate = priv->play_speed;
737 segment.applied_rate = priv->play_scale;
738 segment.start = start;
739 segment.stop = stop;
740 segment.time = priv->npt_start;
741 segment.position = position;
742 segment.base = running_time;
743
744 GST_DEBUG_OBJECT (filter, "Creating segment event %" GST_SEGMENT_FORMAT,
745 &segment);
746 event = gst_event_new_segment (&segment);
747 if (filter->priv->segment_seqnum != GST_SEQNUM_INVALID)
748 gst_event_set_seqnum (event, filter->priv->segment_seqnum);
749
750 return event;
751 }
752
753 static void
add_rtp_source_meta(GstBuffer * outbuf,GstBuffer * rtpbuf)754 add_rtp_source_meta (GstBuffer * outbuf, GstBuffer * rtpbuf)
755 {
756 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
757 GstRTPSourceMeta *meta;
758 guint32 ssrc;
759
760 if (!gst_rtp_buffer_map (rtpbuf, GST_MAP_READ, &rtp))
761 return;
762
763 ssrc = gst_rtp_buffer_get_ssrc (&rtp);
764 meta = gst_buffer_add_rtp_source_meta (outbuf, &ssrc, NULL, 0);
765 if (meta != NULL) {
766 gint i;
767 gint csrc_count = gst_rtp_buffer_get_csrc_count (&rtp);
768 for (i = 0; i < csrc_count; i++) {
769 guint32 csrc = gst_rtp_buffer_get_csrc (&rtp, i);
770 gst_rtp_source_meta_append_csrc (meta, &csrc, 1);
771 }
772 }
773
774 gst_rtp_buffer_unmap (&rtp);
775 }
776
777 static gboolean
set_headers(GstBuffer ** buffer,guint idx,GstRTPBaseDepayload * depayload)778 set_headers (GstBuffer ** buffer, guint idx, GstRTPBaseDepayload * depayload)
779 {
780 GstRTPBaseDepayloadPrivate *priv = depayload->priv;
781 GstClockTime pts, dts, duration;
782
783 *buffer = gst_buffer_make_writable (*buffer);
784
785 pts = GST_BUFFER_PTS (*buffer);
786 dts = GST_BUFFER_DTS (*buffer);
787 duration = GST_BUFFER_DURATION (*buffer);
788
789 /* apply last incomming timestamp and duration to outgoing buffer if
790 * not otherwise set. */
791 if (!GST_CLOCK_TIME_IS_VALID (pts))
792 GST_BUFFER_PTS (*buffer) = priv->pts;
793 if (!GST_CLOCK_TIME_IS_VALID (dts))
794 GST_BUFFER_DTS (*buffer) = priv->dts;
795 if (!GST_CLOCK_TIME_IS_VALID (duration))
796 GST_BUFFER_DURATION (*buffer) = priv->duration;
797
798 if (G_UNLIKELY (depayload->priv->discont)) {
799 GST_LOG_OBJECT (depayload, "Marking DISCONT on output buffer");
800 GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
801 depayload->priv->discont = FALSE;
802 }
803
804 /* make sure we only set the timestamp on the first packet */
805 priv->pts = GST_CLOCK_TIME_NONE;
806 priv->dts = GST_CLOCK_TIME_NONE;
807 priv->duration = GST_CLOCK_TIME_NONE;
808
809 if (priv->source_info && priv->input_buffer)
810 add_rtp_source_meta (*buffer, priv->input_buffer);
811
812 return TRUE;
813 }
814
815 static GstFlowReturn
gst_rtp_base_depayload_prepare_push(GstRTPBaseDepayload * filter,gboolean is_list,gpointer obj)816 gst_rtp_base_depayload_prepare_push (GstRTPBaseDepayload * filter,
817 gboolean is_list, gpointer obj)
818 {
819 if (is_list) {
820 GstBufferList **blist = obj;
821 gst_buffer_list_foreach (*blist, (GstBufferListFunc) set_headers, filter);
822 } else {
823 GstBuffer **buf = obj;
824 set_headers (buf, 0, filter);
825 }
826
827 /* if this is the first buffer send a NEWSEGMENT */
828 if (G_UNLIKELY (filter->priv->segment_event)) {
829 gst_pad_push_event (filter->srcpad, filter->priv->segment_event);
830 filter->priv->segment_event = NULL;
831 GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
832 }
833
834 return GST_FLOW_OK;
835 }
836
837 /**
838 * gst_rtp_base_depayload_push:
839 * @filter: a #GstRTPBaseDepayload
840 * @out_buf: a #GstBuffer
841 *
842 * Push @out_buf to the peer of @filter. This function takes ownership of
843 * @out_buf.
844 *
845 * This function will by default apply the last incomming timestamp on
846 * the outgoing buffer when it didn't have a timestamp already.
847 *
848 * Returns: a #GstFlowReturn.
849 */
850 GstFlowReturn
gst_rtp_base_depayload_push(GstRTPBaseDepayload * filter,GstBuffer * out_buf)851 gst_rtp_base_depayload_push (GstRTPBaseDepayload * filter, GstBuffer * out_buf)
852 {
853 GstFlowReturn res;
854
855 res = gst_rtp_base_depayload_prepare_push (filter, FALSE, &out_buf);
856
857 if (G_LIKELY (res == GST_FLOW_OK))
858 res = gst_pad_push (filter->srcpad, out_buf);
859 else
860 gst_buffer_unref (out_buf);
861
862 return res;
863 }
864
865 /**
866 * gst_rtp_base_depayload_push_list:
867 * @filter: a #GstRTPBaseDepayload
868 * @out_list: a #GstBufferList
869 *
870 * Push @out_list to the peer of @filter. This function takes ownership of
871 * @out_list.
872 *
873 * Returns: a #GstFlowReturn.
874 */
875 GstFlowReturn
gst_rtp_base_depayload_push_list(GstRTPBaseDepayload * filter,GstBufferList * out_list)876 gst_rtp_base_depayload_push_list (GstRTPBaseDepayload * filter,
877 GstBufferList * out_list)
878 {
879 GstFlowReturn res;
880
881 res = gst_rtp_base_depayload_prepare_push (filter, TRUE, &out_list);
882
883 if (G_LIKELY (res == GST_FLOW_OK))
884 res = gst_pad_push_list (filter->srcpad, out_list);
885 else
886 gst_buffer_list_unref (out_list);
887
888 return res;
889 }
890
891 /* convert the PacketLost event from a jitterbuffer to a GAP event.
892 * subclasses can override this. */
893 static gboolean
gst_rtp_base_depayload_packet_lost(GstRTPBaseDepayload * filter,GstEvent * event)894 gst_rtp_base_depayload_packet_lost (GstRTPBaseDepayload * filter,
895 GstEvent * event)
896 {
897 GstClockTime timestamp, duration;
898 GstEvent *sevent;
899 const GstStructure *s;
900 gboolean might_have_been_fec;
901 gboolean res = TRUE;
902
903 s = gst_event_get_structure (event);
904
905 /* first start by parsing the timestamp and duration */
906 timestamp = -1;
907 duration = -1;
908
909 if (!gst_structure_get_clock_time (s, "timestamp", ×tamp) ||
910 !gst_structure_get_clock_time (s, "duration", &duration)) {
911 GST_ERROR_OBJECT (filter,
912 "Packet loss event without timestamp or duration");
913 return FALSE;
914 }
915
916 sevent = gst_pad_get_sticky_event (filter->srcpad, GST_EVENT_SEGMENT, 0);
917 if (G_UNLIKELY (!sevent)) {
918 /* Typically happens if lost event arrives before first buffer */
919 GST_DEBUG_OBJECT (filter,
920 "Ignore packet loss because segment event missing");
921 return FALSE;
922 }
923 gst_event_unref (sevent);
924
925 if (!gst_structure_get_boolean (s, "might-have-been-fec",
926 &might_have_been_fec) || !might_have_been_fec) {
927 /* send GAP event */
928 sevent = gst_event_new_gap (timestamp, duration);
929 res = gst_pad_push_event (filter->srcpad, sevent);
930 }
931
932 return res;
933 }
934
935 static GstStateChangeReturn
gst_rtp_base_depayload_change_state(GstElement * element,GstStateChange transition)936 gst_rtp_base_depayload_change_state (GstElement * element,
937 GstStateChange transition)
938 {
939 GstRTPBaseDepayload *filter;
940 GstRTPBaseDepayloadPrivate *priv;
941 GstStateChangeReturn ret;
942
943 filter = GST_RTP_BASE_DEPAYLOAD (element);
944 priv = filter->priv;
945
946 switch (transition) {
947 case GST_STATE_CHANGE_NULL_TO_READY:
948 break;
949 case GST_STATE_CHANGE_READY_TO_PAUSED:
950 filter->need_newsegment = TRUE;
951 priv->npt_start = 0;
952 priv->npt_stop = -1;
953 priv->play_speed = 1.0;
954 priv->play_scale = 1.0;
955 priv->clock_base = -1;
956 priv->next_seqnum = -1;
957 priv->negotiated = FALSE;
958 priv->discont = FALSE;
959 priv->segment_seqnum = GST_SEQNUM_INVALID;
960 break;
961 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
962 break;
963 default:
964 break;
965 }
966
967 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
968
969 switch (transition) {
970 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
971 break;
972 case GST_STATE_CHANGE_PAUSED_TO_READY:
973 gst_caps_replace (&priv->last_caps, NULL);
974 gst_event_replace (&priv->segment_event, NULL);
975 break;
976 case GST_STATE_CHANGE_READY_TO_NULL:
977 break;
978 default:
979 break;
980 }
981 return ret;
982 }
983
984 static GstStructure *
gst_rtp_base_depayload_create_stats(GstRTPBaseDepayload * depayload)985 gst_rtp_base_depayload_create_stats (GstRTPBaseDepayload * depayload)
986 {
987 GstRTPBaseDepayloadPrivate *priv;
988 GstStructure *s;
989 GstClockTime pts = GST_CLOCK_TIME_NONE, dts = GST_CLOCK_TIME_NONE;
990
991 priv = depayload->priv;
992
993 GST_OBJECT_LOCK (depayload);
994 if (depayload->segment.format != GST_FORMAT_UNDEFINED) {
995 pts = gst_segment_to_running_time (&depayload->segment, GST_FORMAT_TIME,
996 priv->pts);
997 dts = gst_segment_to_running_time (&depayload->segment, GST_FORMAT_TIME,
998 priv->dts);
999 }
1000 GST_OBJECT_UNLOCK (depayload);
1001
1002 s = gst_structure_new ("application/x-rtp-depayload-stats",
1003 "clock_rate", G_TYPE_UINT, depayload->clock_rate,
1004 "npt-start", G_TYPE_UINT64, priv->npt_start,
1005 "npt-stop", G_TYPE_UINT64, priv->npt_stop,
1006 "play-speed", G_TYPE_DOUBLE, priv->play_speed,
1007 "play-scale", G_TYPE_DOUBLE, priv->play_scale,
1008 "running-time-dts", G_TYPE_UINT64, dts,
1009 "running-time-pts", G_TYPE_UINT64, pts,
1010 "seqnum", G_TYPE_UINT, (guint) priv->last_seqnum,
1011 "timestamp", G_TYPE_UINT, (guint) priv->last_rtptime, NULL);
1012
1013 return s;
1014 }
1015
1016
1017 static void
gst_rtp_base_depayload_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1018 gst_rtp_base_depayload_set_property (GObject * object, guint prop_id,
1019 const GValue * value, GParamSpec * pspec)
1020 {
1021 GstRTPBaseDepayload *depayload;
1022
1023 depayload = GST_RTP_BASE_DEPAYLOAD (object);
1024
1025 switch (prop_id) {
1026 case PROP_SOURCE_INFO:
1027 gst_rtp_base_depayload_set_source_info_enabled (depayload,
1028 g_value_get_boolean (value));
1029 break;
1030 default:
1031 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1032 break;
1033 }
1034 }
1035
1036 static void
gst_rtp_base_depayload_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1037 gst_rtp_base_depayload_get_property (GObject * object, guint prop_id,
1038 GValue * value, GParamSpec * pspec)
1039 {
1040 GstRTPBaseDepayload *depayload;
1041
1042 depayload = GST_RTP_BASE_DEPAYLOAD (object);
1043
1044 switch (prop_id) {
1045 case PROP_STATS:
1046 g_value_take_boxed (value,
1047 gst_rtp_base_depayload_create_stats (depayload));
1048 break;
1049 case PROP_SOURCE_INFO:
1050 g_value_set_boolean (value,
1051 gst_rtp_base_depayload_is_source_info_enabled (depayload));
1052 break;
1053 default:
1054 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1055 break;
1056 }
1057 }
1058
1059 /**
1060 * gst_rtp_base_depayload_set_source_info_enabled:
1061 * @depayload: a #GstRTPBaseDepayload
1062 * @enable: whether to add meta about RTP sources to buffer
1063 *
1064 * Enable or disable adding #GstRTPSourceMeta to depayloaded buffers.
1065 *
1066 * Since: 1.16
1067 **/
1068 void
gst_rtp_base_depayload_set_source_info_enabled(GstRTPBaseDepayload * depayload,gboolean enable)1069 gst_rtp_base_depayload_set_source_info_enabled (GstRTPBaseDepayload * depayload,
1070 gboolean enable)
1071 {
1072 depayload->priv->source_info = enable;
1073 }
1074
1075 /**
1076 * gst_rtp_base_depayload_is_source_info_enabled:
1077 * @depayload: a #GstRTPBaseDepayload
1078 *
1079 * Queries whether #GstRTPSourceMeta will be added to depayloaded buffers.
1080 *
1081 * Returns: %TRUE if source-info is enabled.
1082 *
1083 * Since: 1.16
1084 **/
1085 gboolean
gst_rtp_base_depayload_is_source_info_enabled(GstRTPBaseDepayload * depayload)1086 gst_rtp_base_depayload_is_source_info_enabled (GstRTPBaseDepayload * depayload)
1087 {
1088 return depayload->priv->source_info;
1089 }
1090