1 /*
2 * Farsight Voice+Video library
3 *
4 * Copyright 2007 Collabora Ltd,
5 * Copyright 2007 Nokia Corporation
6 * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7 * Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8 * Copyright 2015 Kurento (http://kurento.org/)
9 * @author: Miguel París <mparisdiaz@gmail.com>
10 * Copyright 2016 Pexip AS
11 * @author: Havard Graff <havard@pexip.com>
12 * @author: Stian Selnes <stian@pexip.com>
13 *
14 * This library is free software; you can redistribute it and/or
15 * modify it under the terms of the GNU Library General Public
16 * License as published by the Free Software Foundation; either
17 * version 2 of the License, or (at your option) any later version.
18 *
19 * This library is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22 * Library General Public License for more details.
23 *
24 * You should have received a copy of the GNU Library General Public
25 * License along with this library; if not, write to the
26 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27 * Boston, MA 02110-1301, USA.
28 *
29 */
30
31 /**
32 * SECTION:element-rtpjitterbuffer
33 * @title: rtpjitterbuffer
34 *
35 * This element reorders and removes duplicate RTP packets as they are received
36 * from a network source.
37 *
38 * The element needs the clock-rate of the RTP payload in order to estimate the
39 * delay. This information is obtained either from the caps on the sink pad or,
40 * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
41 * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
42 *
43 * The rtpjitterbuffer will wait for missing packets up to a configurable time
44 * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
45 * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
46 * property is set, lost packets will result in a custom serialized downstream
47 * event of name GstRTPPacketLost. The lost packet events are usually used by a
48 * depayloader or other element to create concealment data or some other logic
49 * to gracefully handle the missing packets.
50 *
51 * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming
52 * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
53 * buffer.
54 *
55 * The jitterbuffer can also be configured to send early retransmission events
56 * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
57 * this mode, the jitterbuffer tries to estimate when a packet should arrive and
58 * sends a custom upstream event named GstRTPRetransmissionRequest when the
59 * packet is considered late. The initial expected packet arrival time is
60 * calculated as follows:
61 *
62 * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
63 * T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
 *    calculated from the DTS (or PTS if no DTS) of two consecutive RTP
65 * packets with different rtptime.
66 *
67 * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
68 * seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
69 * previously scheduled timeout is overwritten.
70 *
71 * - If seqnum N arrived, all seqnum older than
72 * N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
 *    immediately. This is to request fast feedback for abnormally reordered
74 * packets before any of the previous timeouts is triggered.
75 *
76 * A late packet triggers the GstRTPRetransmissionRequest custom upstream
77 * event. After the initial timeout expires and the retransmission event is
78 * sent, the timeout is scheduled for
79 * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
80 * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
81 * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
82 * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
83 * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
84 * retransmission requests are sent and the regular logic is performed to
85 * schedule a lost packet as discussed above.
86 *
87 * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
88 * to the pipeline.
89 *
90 * This element will automatically be used inside rtpbin.
91 *
92 * ## Example pipelines
93 * |[
94 * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
95 * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
96 * inserted into the pipeline to smooth out network jitter and to reorder the
97 * out-of-order RTP packets.
98 *
99 */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include <stdlib.h>
106 #include <stdio.h>
107 #include <string.h>
108 #include <gst/rtp/gstrtpbuffer.h>
109 #include <gst/net/net.h>
110
111 #include "gstrtpjitterbuffer.h"
112 #include "rtpjitterbuffer.h"
113 #include "rtpstats.h"
114 #include "rtptimerqueue.h"
115
116 #include <gst/glib-compat-private.h>
117
118 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
119 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
120
121 /* RTPJitterBuffer signals and args */
enum
{
  SIGNAL_REQUEST_PT_MAP,        /* see #GstRtpJitterBuffer::request-pt-map */
  SIGNAL_CLEAR_PT_MAP,          /* see #GstRtpJitterBuffer::clear-pt-map */
  SIGNAL_HANDLE_SYNC,
  SIGNAL_ON_NPT_STOP,
  SIGNAL_SET_ACTIVE,
  LAST_SIGNAL                   /* count; sizes gst_rtp_jitter_buffer_signals[] */
};
131
/* Default values handed to the property specs installed in class_init.
 * Negative defaults are parenthesized so that they expand safely inside any
 * surrounding expression. */
#define DEFAULT_LATENCY_MS      200     /* "latency", in ms */
#define DEFAULT_DROP_ON_LATENCY FALSE
#define DEFAULT_TS_OFFSET       0       /* "ts-offset", in ns */
#define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0      /* 0 = no limit */
#define DEFAULT_DO_LOST         FALSE
#define DEFAULT_POST_DROP_MESSAGES FALSE
#define DEFAULT_DROP_MESSAGES_INTERVAL_MS 200
#define DEFAULT_MODE            RTP_JITTER_BUFFER_MODE_SLAVE
#define DEFAULT_PERCENT         0
#define DEFAULT_DO_RETRANSMISSION FALSE
#define DEFAULT_RTX_NEXT_SEQNUM   TRUE
#define DEFAULT_RTX_DELAY         (-1)  /* -1 = automatic */
#define DEFAULT_RTX_MIN_DELAY     0
#define DEFAULT_RTX_DELAY_REORDER 3
#define DEFAULT_RTX_RETRY_TIMEOUT (-1)  /* -1 = automatic */
#define DEFAULT_RTX_MIN_RETRY_TIMEOUT (-1)      /* -1 = automatic */
#define DEFAULT_RTX_RETRY_PERIOD  (-1)  /* -1 = automatic */
#define DEFAULT_RTX_MAX_RETRIES   (-1)  /* -1 = not limited */
#define DEFAULT_RTX_DEADLINE      (-1)  /* -1 = automatic */
#define DEFAULT_RTX_STATS_TIMEOUT 1000  /* ms */
#define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000     /* ms */
#define DEFAULT_MAX_DROPOUT_TIME  60000 /* ms */
#define DEFAULT_MAX_MISORDER_TIME 2000  /* ms */
#define DEFAULT_RFC7273_SYNC      FALSE
#define DEFAULT_FASTSTART_MIN_PACKETS 0 /* 0 = faststart disabled */

/* NOTE(review): presumably the fallbacks used while rtx-delay and
 * rtx-retry-timeout are set to -1 (automatic); the users of these two
 * constants are outside this part of the file -- confirm. */
#define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
#define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
160
enum
{
  PROP_0,                       /* placeholder; never installed as a property */
  PROP_LATENCY,                 /* "latency" */
  PROP_DROP_ON_LATENCY,         /* "drop-on-latency" */
  PROP_TS_OFFSET,               /* "ts-offset" */
  PROP_MAX_TS_OFFSET_ADJUSTMENT,        /* "max-ts-offset-adjustment" */
  PROP_DO_LOST,                 /* "do-lost" */
  PROP_POST_DROP_MESSAGES,      /* "post-drop-messages" */
  PROP_DROP_MESSAGES_INTERVAL,  /* "drop-messages-interval" */
  PROP_MODE,                    /* "mode" */
  PROP_PERCENT,                 /* "percent" (read-only) */
  PROP_DO_RETRANSMISSION,       /* "do-retransmission" */
  PROP_RTX_NEXT_SEQNUM,         /* "rtx-next-seqnum" */
  PROP_RTX_DELAY,               /* "rtx-delay" */
  PROP_RTX_MIN_DELAY,           /* "rtx-min-delay" */
  PROP_RTX_DELAY_REORDER,       /* "rtx-delay-reorder" */
  PROP_RTX_RETRY_TIMEOUT,       /* "rtx-retry-timeout" */
  PROP_RTX_MIN_RETRY_TIMEOUT,   /* "rtx-min-retry-timeout" */
  PROP_RTX_RETRY_PERIOD,        /* "rtx-retry-period" */
  PROP_RTX_MAX_RETRIES,         /* "rtx-max-retries" */
  PROP_RTX_DEADLINE,            /* "rtx-deadline" */
  PROP_RTX_STATS_TIMEOUT,       /* "rtx-stats-timeout" */
  PROP_STATS,                   /* "stats" (read-only) */
  PROP_MAX_RTCP_RTP_TIME_DIFF,  /* "max-rtcp-rtp-time-diff" */
  PROP_MAX_DROPOUT_TIME,        /* "max-dropout-time" */
  PROP_MAX_MISORDER_TIME,       /* "max-misorder-time" */
  PROP_RFC7273_SYNC,            /* "rfc7273-sync" */
  PROP_FASTSTART_MIN_PACKETS    /* "faststart-min-packets" */
};
191
/* Take priv->jbuf_lock, tracing the calling thread before and after the
 * lock is acquired. */
#define JBUF_LOCK(priv) G_STMT_START {                          \
    GST_TRACE("Locking from thread %p", g_thread_self());       \
    (g_mutex_lock (&(priv)->jbuf_lock));                        \
    GST_TRACE("Locked from thread %p", g_thread_self());        \
  } G_STMT_END

/* Take the lock and, if priv->srcresult is no longer GST_FLOW_OK, jump to
 * @label.  The jump happens with the lock still held, so the code at
 * @label is responsible for releasing it. */
#define JBUF_LOCK_CHECK(priv,label) G_STMT_START {              \
    JBUF_LOCK (priv);                                           \
    if (G_UNLIKELY ((priv)->srcresult != GST_FLOW_OK))          \
      goto label;                                               \
  } G_STMT_END

/* Release priv->jbuf_lock, tracing the calling thread first. */
#define JBUF_UNLOCK(priv) G_STMT_START {                        \
    GST_TRACE ("Unlocking from thread %p", g_thread_self ());   \
    (g_mutex_unlock (&(priv)->jbuf_lock));                      \
  } G_STMT_END
207
/* Block on the jbuf_queue condition.  priv->jbuf_lock is passed to
 * g_cond_wait() as the associated mutex, so the caller is expected to hold
 * it.  waiting_queue counts the threads currently inside this wait. */
#define JBUF_WAIT_QUEUE(priv) G_STMT_START {                    \
    GST_DEBUG ("waiting queue");                                \
    (priv)->waiting_queue++;                                    \
    g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock);      \
    (priv)->waiting_queue--;                                    \
    GST_DEBUG ("waiting queue done");                           \
  } G_STMT_END

/* Wake one thread blocked in JBUF_WAIT_QUEUE; a no-op when nobody waits.
 * waiting_queue is a guint, hence the %u conversion. */
#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START {                  \
    if (G_UNLIKELY ((priv)->waiting_queue)) {                   \
      GST_DEBUG ("signal queue, %u waiters", (priv)->waiting_queue); \
      g_cond_signal (&(priv)->jbuf_queue);                      \
    }                                                           \
  } G_STMT_END
221
/* Block on the jbuf_timer condition.  priv->jbuf_lock is passed to
 * g_cond_wait() as the associated mutex, so the caller is expected to hold
 * it.  waiting_timer counts the threads currently inside this wait. */
#define JBUF_WAIT_TIMER(priv) G_STMT_START {                    \
    GST_DEBUG ("waiting timer");                                \
    (priv)->waiting_timer++;                                    \
    g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);      \
    (priv)->waiting_timer--;                                    \
    GST_DEBUG ("waiting timer done");                           \
  } G_STMT_END

/* Wake one thread blocked in JBUF_WAIT_TIMER; a no-op when nobody waits.
 * waiting_timer is a guint, hence the %u conversion. */
#define JBUF_SIGNAL_TIMER(priv) G_STMT_START {                  \
    if (G_UNLIKELY ((priv)->waiting_timer)) {                   \
      GST_DEBUG ("signal timer, %u waiters", (priv)->waiting_timer); \
      g_cond_signal (&(priv)->jbuf_timer);                      \
    }                                                           \
  } G_STMT_END
235
/* Block on the jbuf_event condition, using priv->jbuf_lock as the
 * associated mutex (the caller is expected to hold it).  Jumps to @label
 * instead of waiting when priv->srcresult is not GST_FLOW_OK, and again
 * after waking up if srcresult stopped being GST_FLOW_OK in the meantime. */
#define JBUF_WAIT_EVENT(priv,label) G_STMT_START {              \
    if (G_UNLIKELY ((priv)->srcresult != GST_FLOW_OK))          \
      goto label;                                               \
    GST_DEBUG ("waiting event");                                \
    (priv)->waiting_event = TRUE;                               \
    g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock);      \
    (priv)->waiting_event = FALSE;                              \
    GST_DEBUG ("waiting event done");                           \
    if (G_UNLIKELY ((priv)->srcresult != GST_FLOW_OK))          \
      goto label;                                               \
  } G_STMT_END

/* Wake the thread blocked in JBUF_WAIT_EVENT, if there is one. */
#define JBUF_SIGNAL_EVENT(priv) G_STMT_START {                  \
    if (G_UNLIKELY ((priv)->waiting_event)) {                   \
      GST_DEBUG ("signal event");                               \
      g_cond_signal (&(priv)->jbuf_event);                      \
    }                                                           \
  } G_STMT_END
253
/* Block on the jbuf_query condition, using priv->jbuf_lock as the
 * associated mutex (the caller is expected to hold it).  Jumps to @label
 * instead of waiting when priv->srcresult is not GST_FLOW_OK, and again
 * after waking up if srcresult stopped being GST_FLOW_OK in the meantime. */
#define JBUF_WAIT_QUERY(priv,label) G_STMT_START {              \
    if (G_UNLIKELY ((priv)->srcresult != GST_FLOW_OK))          \
      goto label;                                               \
    GST_DEBUG ("waiting query");                                \
    (priv)->waiting_query = TRUE;                               \
    g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock);      \
    (priv)->waiting_query = FALSE;                              \
    GST_DEBUG ("waiting query done");                           \
    if (G_UNLIKELY ((priv)->srcresult != GST_FLOW_OK))          \
      goto label;                                               \
  } G_STMT_END

/* Store @res in priv->last_query, then wake the thread blocked in
 * JBUF_WAIT_QUERY, if there is one.  The result is stored even when nobody
 * is waiting. */
#define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {              \
    (priv)->last_query = (res);                                 \
    if (G_UNLIKELY ((priv)->waiting_query)) {                   \
      GST_DEBUG ("signal query");                               \
      g_cond_signal (&(priv)->jbuf_query);                      \
    }                                                           \
  } G_STMT_END
272
/* Non-zero when @buffer carries GST_RTP_BUFFER_FLAG_RETRANSMISSION.  The
 * argument and the whole expansion are parenthesized so the macro can be
 * used inside larger expressions. */
#define GST_BUFFER_IS_RETRANSMISSION(buffer) \
  (GST_BUFFER_FLAG_IS_SET ((buffer), GST_RTP_BUFFER_FLAG_RETRANSMISSION))
275
276 #if !GLIB_CHECK_VERSION(2, 60, 0)
277 #define g_queue_clear_full queue_clear_full
278 static void
queue_clear_full(GQueue * queue,GDestroyNotify free_func)279 queue_clear_full (GQueue * queue, GDestroyNotify free_func)
280 {
281 gpointer data;
282
283 while ((data = g_queue_pop_head (queue)) != NULL)
284 free_func (data);
285 }
286 #endif
287
288 struct _GstRtpJitterBufferPrivate
289 {
290 GstPad *sinkpad, *srcpad;
291 GstPad *rtcpsinkpad;
292
293 RTPJitterBuffer *jbuf;
294 GMutex jbuf_lock;
295 guint waiting_queue;
296 GCond jbuf_queue;
297 guint waiting_timer;
298 GCond jbuf_timer;
299 gboolean waiting_event;
300 GCond jbuf_event;
301 gboolean waiting_query;
302 GCond jbuf_query;
303 gboolean last_query;
304 gboolean discont;
305 gboolean ts_discont;
306 gboolean active;
307 guint64 out_offset;
308 guint32 segment_seqnum;
309
310 gboolean timer_running;
311 GThread *timer_thread;
312
313 /* properties */
314 guint latency_ms;
315 guint64 latency_ns;
316 gboolean drop_on_latency;
317 gint64 ts_offset;
318 guint64 max_ts_offset_adjustment;
319 gboolean do_lost;
320 gboolean post_drop_messages;
321 guint drop_messages_interval_ms;
322 gboolean do_retransmission;
323 gboolean rtx_next_seqnum;
324 gint rtx_delay;
325 guint rtx_min_delay;
326 gint rtx_delay_reorder;
327 gint rtx_retry_timeout;
328 gint rtx_min_retry_timeout;
329 gint rtx_retry_period;
330 gint rtx_max_retries;
331 guint rtx_stats_timeout;
332 gint rtx_deadline_ms;
333 gint max_rtcp_rtp_time_diff;
334 guint32 max_dropout_time;
335 guint32 max_misorder_time;
336 guint faststart_min_packets;
337
338 /* the last seqnum we pushed out */
339 guint32 last_popped_seqnum;
340 /* the next expected seqnum we push */
341 guint32 next_seqnum;
342 /* seqnum-base, if known */
343 guint32 seqnum_base;
344 /* last output time */
345 GstClockTime last_out_time;
346 /* last valid input timestamp and rtptime pair */
347 GstClockTime ips_pts;
348 guint64 ips_rtptime;
349 GstClockTime packet_spacing;
350 gint equidistant;
351
352 GQueue gap_packets;
353
354 /* the next expected seqnum we receive */
355 GstClockTime last_in_pts;
356 guint32 next_in_seqnum;
357
358 /* "normal" timers */
359 RtpTimerQueue *timers;
360 /* timers used for RTX statistics backlog */
361 RtpTimerQueue *rtx_stats_timers;
362
363 /* start and stop ranges */
364 GstClockTime npt_start;
365 GstClockTime npt_stop;
366 guint64 ext_timestamp;
367 guint64 last_elapsed;
368 guint64 estimated_eos;
369 GstClockID eos_id;
370
371 /* state */
372 gboolean eos;
373 guint last_percent;
374
375 /* clock rate and rtp timestamp offset */
376 gint last_pt;
377 gint32 clock_rate;
378 gint64 clock_base;
379 gint64 ts_offset_remainder;
380
381 /* when we are shutting down */
382 GstFlowReturn srcresult;
383 gboolean blocked;
384
385 /* for sync */
386 GstSegment segment;
387 GstClockID clock_id;
388 GstClockTime timer_timeout;
389 guint16 timer_seqnum;
390 /* the latency of the upstream peer, we have to take this into account when
391 * synchronizing the buffers. */
392 GstClockTime peer_latency;
393 guint64 ext_rtptime;
394 GstBuffer *last_sr;
395
396 /* some accounting */
397 guint64 num_pushed;
398 guint64 num_lost;
399 guint64 num_late;
400 guint64 num_duplicates;
401 guint64 num_rtx_requests;
402 guint64 num_rtx_success;
403 guint64 num_rtx_failed;
404 gdouble avg_rtx_num;
405 guint64 avg_rtx_rtt;
406 RTPPacketRateCtx packet_rate_ctx;
407
408 /* for the jitter */
409 GstClockTime last_dts;
410 GstClockTime last_pts;
411 guint64 last_rtptime;
412 GstClockTime avg_jitter;
413
414 /* for dropped packet messages */
415 GstClockTime last_drop_msg_timestamp;
416 /* accumulators; reset every time a drop message is posted */
417 guint num_too_late;
418 guint num_drop_on_latency;
419 };
/* Why a packet was dropped, as reported in a drop message. */
typedef enum
{
  REASON_TOO_LATE,              /* the packet arrived too late */
  REASON_DROP_ON_LATENCY        /* dropped because of drop-on-latency */
} DropMessageReason;
425
426 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
427 GST_STATIC_PAD_TEMPLATE ("sink",
428 GST_PAD_SINK,
429 GST_PAD_ALWAYS,
430 GST_STATIC_CAPS ("application/x-rtp"
431 /* "clock-rate = (int) [ 1, 2147483647 ], "
432 * "payload = (int) , "
433 * "encoding-name = (string) "
434 */ )
435 );
436
437 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
438 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
439 GST_PAD_SINK,
440 GST_PAD_REQUEST,
441 GST_STATIC_CAPS ("application/x-rtcp")
442 );
443
444 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
445 GST_STATIC_PAD_TEMPLATE ("src",
446 GST_PAD_SRC,
447 GST_PAD_ALWAYS,
448 GST_STATIC_CAPS ("application/x-rtp"
449 /* "payload = (int) , "
450 * "clock-rate = (int) , "
451 * "encoding-name = (string) "
452 */ )
453 );
454
455 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
456
457 #define gst_rtp_jitter_buffer_parent_class parent_class
458 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer,
459 GST_TYPE_ELEMENT);
460 GST_ELEMENT_REGISTER_DEFINE (rtpjitterbuffer, "rtpjitterbuffer", GST_RANK_NONE,
461 GST_TYPE_RTP_JITTER_BUFFER);
462
463 /* object overrides */
464 static void gst_rtp_jitter_buffer_set_property (GObject * object,
465 guint prop_id, const GValue * value, GParamSpec * pspec);
466 static void gst_rtp_jitter_buffer_get_property (GObject * object,
467 guint prop_id, GValue * value, GParamSpec * pspec);
468 static void gst_rtp_jitter_buffer_finalize (GObject * object);
469
470 /* element overrides */
471 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
472 * element, GstStateChange transition);
473 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
474 GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
475 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
476 GstPad * pad);
477 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
478 static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
479 GstClock * clock);
480
481 /* pad overrides */
482 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
483 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
484 GstObject * parent);
485
486 /* sinkpad overrides */
487 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
488 GstObject * parent, GstEvent * event);
489 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
490 GstObject * parent, GstBuffer * buffer);
491 static GstFlowReturn gst_rtp_jitter_buffer_chain_list (GstPad * pad,
492 GstObject * parent, GstBufferList * buffer_list);
493
494 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
495 GstObject * parent, GstEvent * event);
496 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
497 GstObject * parent, GstBuffer * buffer);
498
499 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
500 GstObject * parent, GstQuery * query);
501
502 /* srcpad overrides */
503 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
504 GstObject * parent, GstEvent * event);
505 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
506 GstObject * parent, GstPadMode mode, gboolean active);
507 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
508 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
509 GstObject * parent, GstQuery * query);
510
511 static void
512 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
513 static GstClockTime
514 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
515 gboolean active, guint64 base_time);
516 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
517
518 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
519
520 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
521
522 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
523 jitterbuffer);
524
525 static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
526 const RtpTimer * timer, GstClockTime dts, gboolean success);
527
528 static GstClockTime get_current_running_time (GstRtpJitterBuffer *
529 jitterbuffer);
530
531 static void
gst_rtp_jitter_buffer_class_init(GstRtpJitterBufferClass * klass)532 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
533 {
534 GObjectClass *gobject_class;
535 GstElementClass *gstelement_class;
536
537 gobject_class = (GObjectClass *) klass;
538 gstelement_class = (GstElementClass *) klass;
539
540 gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
541
542 gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
543 gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
544
545 /**
546 * GstRtpJitterBuffer:latency:
547 *
548 * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
549 * for at most this time.
550 */
551 g_object_class_install_property (gobject_class, PROP_LATENCY,
552 g_param_spec_uint ("latency", "Buffer latency in ms",
553 "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
554 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
555 /**
556 * GstRtpJitterBuffer:drop-on-latency:
557 *
558 * Drop oldest buffers when the queue is completely filled.
559 */
560 g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
561 g_param_spec_boolean ("drop-on-latency",
562 "Drop buffers when maximum latency is reached",
563 "Tells the jitterbuffer to never exceed the given latency in size",
564 DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
565 /**
566 * GstRtpJitterBuffer:ts-offset:
567 *
568 * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
569 * This is mainly used to ensure interstream synchronisation.
570 */
571 g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
572 g_param_spec_int64 ("ts-offset", "Timestamp Offset",
573 "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
574 G_MAXINT64, DEFAULT_TS_OFFSET,
575 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
576
577 /**
578 * GstRtpJitterBuffer:max-ts-offset-adjustment:
579 *
580 * The maximum number of nanoseconds per frame that time offset may be
581 * adjusted with. This is used to avoid sudden large changes to time stamps.
582 */
583 g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
584 g_param_spec_uint64 ("max-ts-offset-adjustment",
585 "Max Timestamp Offset Adjustment",
586 "The maximum number of nanoseconds per frame that time stamp "
587 "offsets may be adjusted (0 = no limit).", 0, G_MAXUINT64,
588 DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
589 G_PARAM_STATIC_STRINGS));
590
591 /**
592 * GstRtpJitterBuffer:do-lost:
593 *
594 * Send out a GstRTPPacketLost event downstream when a packet is considered
595 * lost.
596 */
597 g_object_class_install_property (gobject_class, PROP_DO_LOST,
598 g_param_spec_boolean ("do-lost", "Do Lost",
599 "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
600 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
601
602 /**
603 * GstRtpJitterBuffer:post-drop-messages:
604 *
605 * Post custom messages to the bus when a packet is dropped by the
606 * jitterbuffer due to arriving too late, being already considered lost,
607 * or being dropped due to the drop-on-latency property being enabled.
608 * Message is of type GST_MESSAGE_ELEMENT and contains a GstStructure named
609 * "drop-msg" with the following fields:
610 *
611 * * #guint `seqnum`: Seqnum of dropped packet.
612 * * #guint64 `timestamp`: PTS timestamp of dropped packet.
613 * * #GString `reason`: Reason for dropping the packet.
614 * * #guint `num-too-late`: Number of packets arriving too late since
615 * last drop message.
616 * * #guint `num-drop-on-latency`: Number of packets dropped due to the
617 * drop-on-latency property since last drop message.
618 *
619 * Since: 1.18
620 */
621 g_object_class_install_property (gobject_class, PROP_POST_DROP_MESSAGES,
622 g_param_spec_boolean ("post-drop-messages", "Post drop messages",
623 "Post a custom message to the bus when a packet is dropped by the jitterbuffer",
624 DEFAULT_POST_DROP_MESSAGES,
625 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
626
627 /**
628 * GstRtpJitterBuffer:drop-messages-interval:
629 *
   * Minimal time in milliseconds between posting dropped packet messages, if enabled
   * by setting #GstRtpJitterBuffer:post-drop-messages to %TRUE.
632 * If interval is set to 0, every dropped packet will result in a drop message being posted.
633 *
634 * Since: 1.18
635 */
636 g_object_class_install_property (gobject_class, PROP_DROP_MESSAGES_INTERVAL,
637 g_param_spec_uint ("drop-messages-interval",
638 "Drop message interval",
639 "Minimal time between posting dropped packet messages", 0,
640 G_MAXUINT, DEFAULT_DROP_MESSAGES_INTERVAL_MS,
641 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
642
643 /**
644 * GstRtpJitterBuffer:mode:
645 *
646 * Control the buffering and timestamping mode used by the jitterbuffer.
647 */
648 g_object_class_install_property (gobject_class, PROP_MODE,
649 g_param_spec_enum ("mode", "Mode",
650 "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
651 DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
652 /**
653 * GstRtpJitterBuffer:percent:
654 *
655 * The percent of the jitterbuffer that is filled.
656 */
657 g_object_class_install_property (gobject_class, PROP_PERCENT,
658 g_param_spec_int ("percent", "percent",
659 "The buffer filled percent", 0, 100,
660 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
661 /**
662 * GstRtpJitterBuffer:do-retransmission:
663 *
664 * Send out a GstRTPRetransmission event upstream when a packet is considered
665 * late and should be retransmitted.
666 *
667 * Since: 1.2
668 */
669 g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
670 g_param_spec_boolean ("do-retransmission", "Do Retransmission",
671 "Send retransmission events upstream when a packet is late",
672 DEFAULT_DO_RETRANSMISSION,
673 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
674
675 /**
   * GstRtpJitterBuffer:rtx-next-seqnum:
677 *
678 * Estimate when the next packet should arrive and schedule a retransmission
679 * request for it.
   * That is, when packet N arrives, a GstRTPRetransmission event is scheduled
681 * for packet N+1. So it will be requested if it does not arrive at the expected time.
682 * The expected time is calculated using the dts of N and the packet spacing.
683 *
684 * Since: 1.6
685 */
686 g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
687 g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
688 "Estimate when the next packet should arrive and schedule a "
689 "retransmission request for it.",
690 DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
691
692 /**
693 * GstRtpJitterBuffer:rtx-delay:
694 *
695 * When a packet did not arrive at the expected time, wait this extra amount
696 * of time before sending a retransmission event.
697 *
698 * When -1 is used, the max jitter will be used as extra delay.
699 *
700 * Since: 1.2
701 */
702 g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
703 g_param_spec_int ("rtx-delay", "RTX Delay",
704 "Extra time in ms to wait before sending retransmission "
705 "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
706 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707
708 /**
709 * GstRtpJitterBuffer:rtx-min-delay:
710 *
711 * When a packet did not arrive at the expected time, wait at least this extra amount
712 * of time before sending a retransmission event.
713 *
714 * Since: 1.6
715 */
716 g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
717 g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
718 "Minimum time in ms to wait before sending retransmission "
719 "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
720 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
721 /**
722 * GstRtpJitterBuffer:rtx-delay-reorder:
723 *
724 * Assume that a retransmission event should be sent when we see
725 * this much packet reordering.
726 *
727 * When -1 is used, the value will be estimated based on observed packet
728 * reordering. When 0 is used packet reordering alone will not cause a
729 * retransmission event (Since 1.10).
730 *
731 * Since: 1.2
732 */
733 g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
734 g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
735 "Sending retransmission event when this much reordering "
736 "(0 disable)",
737 -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
738 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
739 /**
740 * GstRtpJitterBuffer:rtx-retry-timeout:
741 *
742 * When no packet has been received after sending a retransmission event
743 * for this time, retry sending a retransmission event.
744 *
745 * When -1 is used, the value will be estimated based on observed round
746 * trip time.
747 *
748 * Since: 1.2
749 */
750 g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
751 g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
752 "Retry sending a transmission event after this timeout in "
753 "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
754 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
755 /**
756 * GstRtpJitterBuffer:rtx-min-retry-timeout:
757 *
758 * The minimum amount of time between retry timeouts. When
   * #GstRtpJitterBuffer:rtx-retry-timeout is -1, this value ensures a
760 * minimum interval between retry timeouts.
761 *
762 * When -1 is used, the value will be estimated based on the
763 * packet spacing.
764 *
765 * Since: 1.6
766 */
767 g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
768 g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
769 "Minimum timeout between sending a transmission event in "
770 "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
771 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
772 /**
773 * GstRtpJitterBuffer:rtx-retry-period:
774 *
775 * The amount of time to try to get a retransmission.
776 *
777 * When -1 is used, the value will be estimated based on the jitterbuffer
778 * latency and the observed round trip time.
779 *
780 * Since: 1.2
781 */
782 g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
783 g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
784 "Try to get a retransmission for this many ms "
785 "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
786 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
787 /**
788 * GstRtpJitterBuffer:rtx-max-retries:
789 *
790 * The maximum number of retries to request a retransmission.
791 *
792 * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
793 * When -1 is used, the number of retransmission request will not be limited.
794 *
795 * Since: 1.6
796 */
797 g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
798 g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
799 "The maximum number of retries to request a retransmission. "
800 "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
801 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
802 /**
803 * GstRtpJitterBuffer:rtx-deadline:
804 *
805 * The deadline for a valid RTX request in ms.
806 *
807 * How long the RTX RTCP will be valid for.
808 * When -1 is used, the size of the jitterbuffer will be used.
809 *
810 * Since: 1.10
811 */
812 g_object_class_install_property (gobject_class, PROP_RTX_DEADLINE,
813 g_param_spec_int ("rtx-deadline", "RTX Deadline (ms)",
814 "The deadline for a valid RTX request in milliseconds. "
815 "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE,
816 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
817 /**
818 * GstRtpJitterBuffer:rtx-stats-timeout:
819 *
820 * The time to wait for a retransmitted packet after it has been
821 * considered lost in order to collect RTX statistics.
822 *
823 * Since: 1.10
824 */
825 g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT,
826 g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout",
827 "The time to wait for a retransmitted packet after it has been "
828 "considered lost in order to collect statistics (ms)",
829 0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT,
830 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
831
832 g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
833 g_param_spec_uint ("max-dropout-time", "Max dropout time",
834 "The maximum time (milliseconds) of missing packets tolerated.",
835 0, G_MAXINT32, DEFAULT_MAX_DROPOUT_TIME,
836 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
837
838 g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
839 g_param_spec_uint ("max-misorder-time", "Max misorder time",
840 "The maximum time (milliseconds) of misordered packets tolerated.",
841 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
842 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
843 /**
844 * GstRtpJitterBuffer:stats:
845 *
846 * Various jitterbuffer statistics. This property returns a GstStructure
847 * with name application/x-rtp-jitterbuffer-stats with the following fields:
848 *
849 * * #guint64 `num-pushed`: the number of packets pushed out.
850 * * #guint64 `num-lost`: the number of packets considered lost.
851 * * #guint64 `num-late`: the number of packets arriving too late.
852 * * #guint64 `num-duplicates`: the number of duplicate packets.
853 * * #guint64 `avg-jitter`: the average jitter in nanoseconds.
854 * * #guint64 `rtx-count`: the number of retransmissions requested.
855 * * #guint64 `rtx-success-count`: the number of successful retransmissions.
856 * * #gdouble `rtx-per-packet`: average number of RTX per packet.
857 * * #guint64 `rtx-rtt`: average round trip time per RTX.
858 *
859 * Since: 1.4
860 */
861 g_object_class_install_property (gobject_class, PROP_STATS,
862 g_param_spec_boxed ("stats", "Statistics",
863 "Various statistics", GST_TYPE_STRUCTURE,
864 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
865
866 /**
   * GstRtpJitterBuffer:max-rtcp-rtp-time-diff:
868 *
869 * The maximum amount of time in ms that the RTP time in the RTCP SRs
870 * is allowed to be ahead of the last RTP packet we received. Use
871 * -1 to disable ignoring of RTCP packets.
872 *
873 * Since: 1.8
874 */
875 g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
876 g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
877 "Maximum amount of time in ms that the RTP time in RTCP SRs "
878 "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
879 DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
880 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
881
882 g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
883 g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
884 "Synchronize received streams to the RFC7273 clock "
885 "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
886 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
887
888 /**
   * GstRtpJitterBuffer:faststart-min-packets:
890 *
891 * The number of consecutive packets needed to start (set to 0 to
892 * disable faststart. The jitterbuffer will by default start after the
893 * latency has elapsed)
894 *
895 * Since: 1.14
896 */
897 g_object_class_install_property (gobject_class, PROP_FASTSTART_MIN_PACKETS,
898 g_param_spec_uint ("faststart-min-packets", "Faststart minimum packets",
899 "The number of consecutive packets needed to start (set to 0 to "
900 "disable faststart. The jitterbuffer will by default start after "
901 "the latency has elapsed)",
902 0, G_MAXUINT, DEFAULT_FASTSTART_MIN_PACKETS,
903 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
904
905 /**
906 * GstRtpJitterBuffer::request-pt-map:
907 * @buffer: the object which received the signal
908 * @pt: the pt
909 *
910 * Request the payload type as #GstCaps for @pt.
911 */
912 gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
913 g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
914 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
915 request_pt_map), NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
916 /**
917 * GstRtpJitterBuffer::handle-sync:
918 * @buffer: the object which received the signal
919 * @struct: a GstStructure containing sync values.
920 *
921 * Be notified of new sync values.
922 */
923 gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
924 g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
925 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
926 handle_sync), NULL, NULL, NULL,
927 G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
928
929 /**
930 * GstRtpJitterBuffer::on-npt-stop:
931 * @buffer: the object which received the signal
932 *
933 * Signal that the jitterbuffer has pushed the RTP packet that corresponds to
934 * the npt-stop position.
935 */
936 gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
937 g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
938 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
939 on_npt_stop), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
940
941 /**
942 * GstRtpJitterBuffer::clear-pt-map:
943 * @buffer: the object which received the signal
944 *
945 * Invalidate the clock-rate as obtained with the
946 * #GstRtpJitterBuffer::request-pt-map signal.
947 */
948 gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
949 g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
950 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
951 G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
952 NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
953
954 /**
955 * GstRtpJitterBuffer::set-active:
956 * @buffer: the object which received the signal
957 *
958 * Start pushing out packets with the given base time. This signal is only
959 * useful in buffering mode.
960 *
961 * Returns: the time of the last pushed packet.
962 */
963 gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
964 g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
965 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
966 G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
967 NULL, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN, G_TYPE_UINT64);
968
969 gstelement_class->change_state =
970 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
971 gstelement_class->request_new_pad =
972 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
973 gstelement_class->release_pad =
974 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
975 gstelement_class->provide_clock =
976 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
977 gstelement_class->set_clock =
978 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
979
980 gst_element_class_add_static_pad_template (gstelement_class,
981 &gst_rtp_jitter_buffer_src_template);
982 gst_element_class_add_static_pad_template (gstelement_class,
983 &gst_rtp_jitter_buffer_sink_template);
984 gst_element_class_add_static_pad_template (gstelement_class,
985 &gst_rtp_jitter_buffer_sink_rtcp_template);
986
987 gst_element_class_set_static_metadata (gstelement_class,
988 "RTP packet jitter-buffer", "Filter/Network/RTP",
989 "A buffer that deals with network jitter and other transmission faults",
990 "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
991 "Wim Taymans <wim.taymans@gmail.com>");
992
993 klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
994 klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
995
996 GST_DEBUG_CATEGORY_INIT
997 (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
998 GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_jitter_buffer_chain_rtcp);
999
1000 gst_type_mark_as_plugin_api (RTP_TYPE_JITTER_BUFFER_MODE, 0);
1001 }
1002
1003 static void
gst_rtp_jitter_buffer_init(GstRtpJitterBuffer * jitterbuffer)1004 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
1005 {
1006 GstRtpJitterBufferPrivate *priv;
1007
1008 priv = gst_rtp_jitter_buffer_get_instance_private (jitterbuffer);
1009 jitterbuffer->priv = priv;
1010
1011 priv->latency_ms = DEFAULT_LATENCY_MS;
1012 priv->latency_ns = priv->latency_ms * GST_MSECOND;
1013 priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
1014 priv->ts_offset = DEFAULT_TS_OFFSET;
1015 priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
1016 priv->do_lost = DEFAULT_DO_LOST;
1017 priv->post_drop_messages = DEFAULT_POST_DROP_MESSAGES;
1018 priv->drop_messages_interval_ms = DEFAULT_DROP_MESSAGES_INTERVAL_MS;
1019 priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
1020 priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
1021 priv->rtx_delay = DEFAULT_RTX_DELAY;
1022 priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
1023 priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
1024 priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
1025 priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
1026 priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
1027 priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
1028 priv->rtx_deadline_ms = DEFAULT_RTX_DEADLINE;
1029 priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT;
1030 priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
1031 priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
1032 priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
1033 priv->faststart_min_packets = DEFAULT_FASTSTART_MIN_PACKETS;
1034
1035 priv->ts_offset_remainder = 0;
1036 priv->last_dts = -1;
1037 priv->last_pts = -1;
1038 priv->last_rtptime = -1;
1039 priv->avg_jitter = 0;
1040 priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
1041 priv->num_too_late = 0;
1042 priv->num_drop_on_latency = 0;
1043 priv->segment_seqnum = GST_SEQNUM_INVALID;
1044 priv->timers = rtp_timer_queue_new ();
1045 priv->rtx_stats_timers = rtp_timer_queue_new ();
1046 priv->jbuf = rtp_jitter_buffer_new ();
1047 g_mutex_init (&priv->jbuf_lock);
1048 g_cond_init (&priv->jbuf_queue);
1049 g_cond_init (&priv->jbuf_timer);
1050 g_cond_init (&priv->jbuf_event);
1051 g_cond_init (&priv->jbuf_query);
1052 g_queue_init (&priv->gap_packets);
1053 gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1054
1055 /* reset skew detection initially */
1056 rtp_jitter_buffer_reset_skew (priv->jbuf);
1057 rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
1058 rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1059 priv->active = TRUE;
1060
1061 priv->srcpad =
1062 gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
1063 "src");
1064
1065 gst_pad_set_activatemode_function (priv->srcpad,
1066 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
1067 gst_pad_set_query_function (priv->srcpad,
1068 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
1069 gst_pad_set_event_function (priv->srcpad,
1070 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
1071
1072 priv->sinkpad =
1073 gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
1074 "sink");
1075
1076 gst_pad_set_chain_function (priv->sinkpad,
1077 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
1078 gst_pad_set_chain_list_function (priv->sinkpad,
1079 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain_list));
1080 gst_pad_set_event_function (priv->sinkpad,
1081 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
1082 gst_pad_set_query_function (priv->sinkpad,
1083 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
1084
1085 gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
1086 gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
1087
1088 GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
1089 }
1090
1091 static void
free_item_and_retain_sticky_events(RTPJitterBufferItem * item,gpointer user_data)1092 free_item_and_retain_sticky_events (RTPJitterBufferItem * item,
1093 gpointer user_data)
1094 {
1095 GList **l = user_data;
1096
1097 if (item->data && item->type == ITEM_TYPE_EVENT
1098 && GST_EVENT_IS_STICKY (item->data)) {
1099 *l = g_list_prepend (*l, item->data);
1100 item->data = NULL;
1101 }
1102
1103 rtp_jitter_buffer_free_item (item);
1104 }
1105
1106 static void
gst_rtp_jitter_buffer_finalize(GObject * object)1107 gst_rtp_jitter_buffer_finalize (GObject * object)
1108 {
1109 GstRtpJitterBuffer *jitterbuffer;
1110 GstRtpJitterBufferPrivate *priv;
1111
1112 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1113 priv = jitterbuffer->priv;
1114
1115 g_object_unref (priv->timers);
1116 g_object_unref (priv->rtx_stats_timers);
1117 g_mutex_clear (&priv->jbuf_lock);
1118 g_cond_clear (&priv->jbuf_queue);
1119 g_cond_clear (&priv->jbuf_timer);
1120 g_cond_clear (&priv->jbuf_event);
1121 g_cond_clear (&priv->jbuf_query);
1122
1123 rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
1124 g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1125 g_queue_clear (&priv->gap_packets);
1126 g_object_unref (priv->jbuf);
1127
1128 G_OBJECT_CLASS (parent_class)->finalize (object);
1129 }
1130
1131 static GstIterator *
gst_rtp_jitter_buffer_iterate_internal_links(GstPad * pad,GstObject * parent)1132 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
1133 {
1134 GstRtpJitterBuffer *jitterbuffer;
1135 GstPad *otherpad = NULL;
1136 GstIterator *it = NULL;
1137 GValue val = { 0, };
1138
1139 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1140
1141 if (pad == jitterbuffer->priv->sinkpad) {
1142 otherpad = jitterbuffer->priv->srcpad;
1143 } else if (pad == jitterbuffer->priv->srcpad) {
1144 otherpad = jitterbuffer->priv->sinkpad;
1145 } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
1146 it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1147 }
1148
1149 if (it == NULL) {
1150 g_value_init (&val, GST_TYPE_PAD);
1151 g_value_set_object (&val, otherpad);
1152 it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1153 g_value_unset (&val);
1154 }
1155
1156 return it;
1157 }
1158
1159 static GstPad *
create_rtcp_sink(GstRtpJitterBuffer * jitterbuffer)1160 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1161 {
1162 GstRtpJitterBufferPrivate *priv;
1163
1164 priv = jitterbuffer->priv;
1165
1166 GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
1167
1168 priv->rtcpsinkpad =
1169 gst_pad_new_from_static_template
1170 (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
1171 gst_pad_set_chain_function (priv->rtcpsinkpad,
1172 gst_rtp_jitter_buffer_chain_rtcp);
1173 gst_pad_set_event_function (priv->rtcpsinkpad,
1174 (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
1175 gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
1176 gst_rtp_jitter_buffer_iterate_internal_links);
1177 gst_pad_set_active (priv->rtcpsinkpad, TRUE);
1178 gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1179
1180 return priv->rtcpsinkpad;
1181 }
1182
1183 static void
remove_rtcp_sink(GstRtpJitterBuffer * jitterbuffer)1184 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1185 {
1186 GstRtpJitterBufferPrivate *priv;
1187
1188 priv = jitterbuffer->priv;
1189
1190 GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
1191
1192 gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1193
1194 gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1195 priv->rtcpsinkpad = NULL;
1196 }
1197
1198 static GstPad *
gst_rtp_jitter_buffer_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * filter)1199 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1200 GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1201 {
1202 GstRtpJitterBuffer *jitterbuffer;
1203 GstElementClass *klass;
1204 GstPad *result;
1205 GstRtpJitterBufferPrivate *priv;
1206
1207 g_return_val_if_fail (templ != NULL, NULL);
1208 g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1209
1210 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1211 priv = jitterbuffer->priv;
1212 klass = GST_ELEMENT_GET_CLASS (element);
1213
1214 GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1215
1216 /* figure out the template */
1217 if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1218 if (priv->rtcpsinkpad != NULL)
1219 goto exists;
1220
1221 result = create_rtcp_sink (jitterbuffer);
1222 } else
1223 goto wrong_template;
1224
1225 return result;
1226
1227 /* ERRORS */
1228 wrong_template:
1229 {
1230 g_warning ("rtpjitterbuffer: this is not our template");
1231 return NULL;
1232 }
1233 exists:
1234 {
1235 g_warning ("rtpjitterbuffer: pad already requested");
1236 return NULL;
1237 }
1238 }
1239
1240 static void
gst_rtp_jitter_buffer_release_pad(GstElement * element,GstPad * pad)1241 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1242 {
1243 GstRtpJitterBuffer *jitterbuffer;
1244 GstRtpJitterBufferPrivate *priv;
1245
1246 g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1247 g_return_if_fail (GST_IS_PAD (pad));
1248
1249 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1250 priv = jitterbuffer->priv;
1251
1252 GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1253
1254 if (priv->rtcpsinkpad == pad) {
1255 remove_rtcp_sink (jitterbuffer);
1256 } else
1257 goto wrong_pad;
1258
1259 return;
1260
1261 /* ERRORS */
1262 wrong_pad:
1263 {
1264 g_warning ("gstjitterbuffer: asked to release an unknown pad");
1265 return;
1266 }
1267 }
1268
1269 static GstClock *
gst_rtp_jitter_buffer_provide_clock(GstElement * element)1270 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1271 {
1272 return gst_system_clock_obtain ();
1273 }
1274
1275 static gboolean
gst_rtp_jitter_buffer_set_clock(GstElement * element,GstClock * clock)1276 gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
1277 {
1278 GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1279
1280 rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
1281
1282 return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
1283 }
1284
1285 static void
gst_rtp_jitter_buffer_clear_pt_map(GstRtpJitterBuffer * jitterbuffer)1286 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1287 {
1288 GstRtpJitterBufferPrivate *priv;
1289
1290 priv = jitterbuffer->priv;
1291
1292 /* this will trigger a new pt-map request signal, FIXME, do something better. */
1293
1294 JBUF_LOCK (priv);
1295 priv->clock_rate = -1;
1296 /* do not clear current content, but refresh state for new arrival */
1297 GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1298 rtp_jitter_buffer_reset_skew (priv->jbuf);
1299 JBUF_UNLOCK (priv);
1300 }
1301
1302 static GstClockTime
gst_rtp_jitter_buffer_set_active(GstRtpJitterBuffer * jbuf,gboolean active,guint64 offset)1303 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1304 guint64 offset)
1305 {
1306 GstRtpJitterBufferPrivate *priv;
1307 GstClockTime last_out;
1308 RTPJitterBufferItem *item;
1309
1310 priv = jbuf->priv;
1311
1312 JBUF_LOCK (priv);
1313 GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1314 active, GST_TIME_ARGS (offset));
1315
1316 if (active != priv->active) {
1317 /* add the amount of time spent in paused to the output offset. All
1318 * outgoing buffers will have this offset applied to their timestamps in
1319 * order to make them arrive in time in the sink. */
1320 priv->out_offset = offset;
1321 GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1322 GST_TIME_ARGS (priv->out_offset));
1323 priv->active = active;
1324 JBUF_SIGNAL_EVENT (priv);
1325 }
1326 if (!active) {
1327 rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1328 }
1329 if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1330 /* head buffer timestamp and offset gives our output time */
1331 last_out = item->pts + priv->ts_offset;
1332 } else {
1333 /* use last known time when the buffer is empty */
1334 last_out = priv->last_out_time;
1335 }
1336 JBUF_UNLOCK (priv);
1337
1338 return last_out;
1339 }
1340
1341 static GstCaps *
gst_rtp_jitter_buffer_getcaps(GstPad * pad,GstCaps * filter)1342 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1343 {
1344 GstRtpJitterBuffer *jitterbuffer;
1345 GstRtpJitterBufferPrivate *priv;
1346 GstPad *other;
1347 GstCaps *caps;
1348 GstCaps *templ;
1349
1350 jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1351 priv = jitterbuffer->priv;
1352
1353 other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1354
1355 caps = gst_pad_peer_query_caps (other, filter);
1356
1357 templ = gst_pad_get_pad_template_caps (pad);
1358 if (caps == NULL) {
1359 GST_DEBUG_OBJECT (jitterbuffer, "use template");
1360 caps = templ;
1361 } else {
1362 GstCaps *intersect;
1363
1364 GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1365
1366 intersect = gst_caps_intersect (caps, templ);
1367 gst_caps_unref (caps);
1368 gst_caps_unref (templ);
1369
1370 caps = intersect;
1371 }
1372 gst_object_unref (jitterbuffer);
1373
1374 return caps;
1375 }
1376
1377 /*
1378 * Must be called with JBUF_LOCK held
1379 */
1380
1381 static gboolean
gst_jitter_buffer_sink_parse_caps(GstRtpJitterBuffer * jitterbuffer,GstCaps * caps,gint pt)1382 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1383 GstCaps * caps, gint pt)
1384 {
1385 GstRtpJitterBufferPrivate *priv;
1386 GstStructure *caps_struct;
1387 guint val;
1388 gint payload = -1;
1389 GstClockTime tval;
1390 const gchar *ts_refclk, *mediaclk;
1391
1392 priv = jitterbuffer->priv;
1393
1394 /* first parse the caps */
1395 caps_struct = gst_caps_get_structure (caps, 0);
1396
1397 GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps);
1398
1399 if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1
1400 && payload != pt) {
1401 GST_ERROR_OBJECT (jitterbuffer,
1402 "Got caps with wrong payload type (got %d, expected %d)", pt, payload);
1403 return FALSE;
1404 }
1405
1406 if (payload != -1) {
1407 GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload);
1408 priv->last_pt = payload;
1409 }
1410
1411 /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1412 * measure the amount of data in the buffer */
1413 if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1414 goto error;
1415
1416 if (priv->clock_rate <= 0)
1417 goto wrong_rate;
1418
1419 GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1420
1421 rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1422
1423 gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
1424
1425 /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1426 * can use this to track the amount of time elapsed on the sender. */
1427 if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1428 priv->clock_base = val;
1429 else
1430 priv->clock_base = -1;
1431
1432 priv->ext_timestamp = priv->clock_base;
1433
1434 GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1435 priv->clock_base);
1436
1437 if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1438 /* first expected seqnum, only update when we didn't have a previous base. */
1439 if (priv->next_in_seqnum == -1)
1440 priv->next_in_seqnum = val;
1441 if (priv->next_seqnum == -1) {
1442 priv->next_seqnum = val;
1443 JBUF_SIGNAL_EVENT (priv);
1444 }
1445 priv->seqnum_base = val;
1446 } else {
1447 priv->seqnum_base = -1;
1448 }
1449
1450 GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1451
1452 /* the start and stop times. The seqnum-base corresponds to the start time. We
1453 * will keep track of the seqnums on the output and when we reach the one
1454 * corresponding to npt-stop, we emit the npt-stop-reached signal */
1455 if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1456 priv->npt_start = tval;
1457 else
1458 priv->npt_start = 0;
1459
1460 if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1461 priv->npt_stop = tval;
1462 else
1463 priv->npt_stop = -1;
1464
1465 GST_DEBUG_OBJECT (jitterbuffer,
1466 "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1467 GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1468
1469 if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
1470 GstClock *clock = NULL;
1471 guint64 clock_offset = -1;
1472
1473 GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
1474 ts_refclk);
1475
1476 if (g_str_has_prefix (ts_refclk, "ntp=")) {
1477 if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
1478 GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
1479 } else {
1480 const gchar *host, *portstr;
1481 gchar *hostname;
1482 guint port;
1483
1484 host = ts_refclk + sizeof ("ntp=") - 1;
1485 if (host[0] == '[') {
1486 /* IPv6 */
1487 portstr = strchr (host, ']');
1488 if (portstr && portstr[1] == ':')
1489 portstr = portstr + 1;
1490 else
1491 portstr = NULL;
1492 } else {
1493 portstr = strrchr (host, ':');
1494 }
1495
1496
1497 if (!portstr || sscanf (portstr, ":%u", &port) != 1)
1498 port = 123;
1499
1500 if (portstr)
1501 hostname = g_strndup (host, (portstr - host));
1502 else
1503 hostname = g_strdup (host);
1504
1505 clock = gst_ntp_clock_new (NULL, hostname, port, 0);
1506 g_free (hostname);
1507 }
1508 } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
1509 const gchar *domainstr =
1510 ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
1511 guint domain;
1512
1513 if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
1514 domain = 0;
1515
1516 clock = gst_ptp_clock_new (NULL, domain);
1517 } else {
1518 GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
1519 }
1520
1521 if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
1522 GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
1523
1524 if (!g_str_has_prefix (mediaclk, "direct=") ||
1525 !g_ascii_string_to_unsigned (&mediaclk[7], 10, 0, G_MAXUINT64,
1526 &clock_offset, NULL))
1527 GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
1528 if (strstr (mediaclk, "rate=") != NULL) {
1529 GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
1530 clock_offset = -1;
1531 }
1532 }
1533
1534 rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
1535 } else {
1536 rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
1537 }
1538
1539 return TRUE;
1540
1541 /* ERRORS */
1542 error:
1543 {
1544 GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1545 return FALSE;
1546 }
1547 wrong_rate:
1548 {
1549 GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1550 return FALSE;
1551 }
1552 }
1553
1554 static void
gst_rtp_jitter_buffer_flush_start(GstRtpJitterBuffer * jitterbuffer)1555 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1556 {
1557 GstRtpJitterBufferPrivate *priv;
1558
1559 priv = jitterbuffer->priv;
1560
1561 JBUF_LOCK (priv);
1562 /* mark ourselves as flushing */
1563 priv->srcresult = GST_FLOW_FLUSHING;
1564 GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1565 /* this unblocks any waiting pops on the src pad task */
1566 JBUF_SIGNAL_EVENT (priv);
1567 JBUF_SIGNAL_QUERY (priv, FALSE);
1568 JBUF_SIGNAL_QUEUE (priv);
1569 JBUF_UNLOCK (priv);
1570 }
1571
1572 static void
gst_rtp_jitter_buffer_flush_stop(GstRtpJitterBuffer * jitterbuffer)1573 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1574 {
1575 GstRtpJitterBufferPrivate *priv;
1576
1577 priv = jitterbuffer->priv;
1578
1579 JBUF_LOCK (priv);
1580 GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1581 /* Mark as non flushing */
1582 priv->srcresult = GST_FLOW_OK;
1583 gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1584 priv->last_popped_seqnum = -1;
1585 priv->last_out_time = GST_CLOCK_TIME_NONE;
1586 priv->next_seqnum = -1;
1587 priv->seqnum_base = -1;
1588 priv->ips_rtptime = -1;
1589 priv->ips_pts = GST_CLOCK_TIME_NONE;
1590 priv->packet_spacing = 0;
1591 priv->next_in_seqnum = -1;
1592 priv->clock_rate = -1;
1593 priv->last_pt = -1;
1594 priv->eos = FALSE;
1595 priv->estimated_eos = -1;
1596 priv->last_elapsed = 0;
1597 priv->ext_timestamp = -1;
1598 priv->avg_jitter = 0;
1599 priv->last_dts = -1;
1600 priv->last_rtptime = -1;
1601 priv->last_in_pts = 0;
1602 priv->equidistant = 0;
1603 priv->segment_seqnum = GST_SEQNUM_INVALID;
1604 priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
1605 priv->num_too_late = 0;
1606 priv->num_drop_on_latency = 0;
1607 GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1608 rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
1609 rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1610 rtp_jitter_buffer_reset_skew (priv->jbuf);
1611 rtp_timer_queue_remove_all (priv->timers);
1612 g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1613 g_queue_clear (&priv->gap_packets);
1614 JBUF_UNLOCK (priv);
1615 }
1616
1617 static gboolean
gst_rtp_jitter_buffer_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1618 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1619 GstPadMode mode, gboolean active)
1620 {
1621 gboolean result;
1622 GstRtpJitterBuffer *jitterbuffer = NULL;
1623
1624 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1625
1626 switch (mode) {
1627 case GST_PAD_MODE_PUSH:
1628 if (active) {
1629 /* allow data processing */
1630 gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1631
1632 /* start pushing out buffers */
1633 GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1634 result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1635 (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1636 } else {
1637 /* make sure all data processing stops ASAP */
1638 gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1639
1640 /* NOTE this will hardlock if the state change is called from the src pad
1641 * task thread because we will _join() the thread. */
1642 GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1643 result = gst_pad_stop_task (pad);
1644 }
1645 break;
1646 default:
1647 result = FALSE;
1648 break;
1649 }
1650 return result;
1651 }
1652
1653 static GstStateChangeReturn
gst_rtp_jitter_buffer_change_state(GstElement * element,GstStateChange transition)1654 gst_rtp_jitter_buffer_change_state (GstElement * element,
1655 GstStateChange transition)
1656 {
1657 GstRtpJitterBuffer *jitterbuffer;
1658 GstRtpJitterBufferPrivate *priv;
1659 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1660
1661 jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1662 priv = jitterbuffer->priv;
1663
1664 switch (transition) {
1665 case GST_STATE_CHANGE_NULL_TO_READY:
1666 break;
1667 case GST_STATE_CHANGE_READY_TO_PAUSED:
1668 JBUF_LOCK (priv);
1669 /* reset negotiated values */
1670 priv->clock_rate = -1;
1671 priv->clock_base = -1;
1672 priv->peer_latency = 0;
1673 priv->last_pt = -1;
1674 /* block until we go to PLAYING */
1675 priv->blocked = TRUE;
1676 priv->timer_running = TRUE;
1677 priv->srcresult = GST_FLOW_OK;
1678 priv->timer_thread =
1679 g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1680 JBUF_UNLOCK (priv);
1681 break;
1682 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1683 JBUF_LOCK (priv);
1684 /* unblock to allow streaming in PLAYING */
1685 priv->blocked = FALSE;
1686 JBUF_SIGNAL_EVENT (priv);
1687 JBUF_SIGNAL_TIMER (priv);
1688 JBUF_UNLOCK (priv);
1689 break;
1690 default:
1691 break;
1692 }
1693
1694 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1695
1696 switch (transition) {
1697 case GST_STATE_CHANGE_READY_TO_PAUSED:
1698 /* we are a live element because we sync to the clock, which we can only
1699 * do in the PLAYING state */
1700 if (ret != GST_STATE_CHANGE_FAILURE)
1701 ret = GST_STATE_CHANGE_NO_PREROLL;
1702 break;
1703 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1704 JBUF_LOCK (priv);
1705 /* block to stop streaming when PAUSED */
1706 priv->blocked = TRUE;
1707 unschedule_current_timer (jitterbuffer);
1708 JBUF_UNLOCK (priv);
1709 if (ret != GST_STATE_CHANGE_FAILURE)
1710 ret = GST_STATE_CHANGE_NO_PREROLL;
1711 break;
1712 case GST_STATE_CHANGE_PAUSED_TO_READY:
1713 JBUF_LOCK (priv);
1714 gst_buffer_replace (&priv->last_sr, NULL);
1715 priv->timer_running = FALSE;
1716 priv->srcresult = GST_FLOW_FLUSHING;
1717 unschedule_current_timer (jitterbuffer);
1718 JBUF_SIGNAL_TIMER (priv);
1719 JBUF_SIGNAL_QUERY (priv, FALSE);
1720 JBUF_SIGNAL_QUEUE (priv);
1721 JBUF_UNLOCK (priv);
1722 g_thread_join (priv->timer_thread);
1723 priv->timer_thread = NULL;
1724 break;
1725 case GST_STATE_CHANGE_READY_TO_NULL:
1726 break;
1727 default:
1728 break;
1729 }
1730
1731 return ret;
1732 }
1733
1734 static gboolean
gst_rtp_jitter_buffer_src_event(GstPad * pad,GstObject * parent,GstEvent * event)1735 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1736 GstEvent * event)
1737 {
1738 gboolean ret = TRUE;
1739 GstRtpJitterBuffer *jitterbuffer;
1740 GstRtpJitterBufferPrivate *priv;
1741
1742 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1743 priv = jitterbuffer->priv;
1744
1745 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1746
1747 switch (GST_EVENT_TYPE (event)) {
1748 case GST_EVENT_LATENCY:
1749 {
1750 GstClockTime latency;
1751
1752 gst_event_parse_latency (event, &latency);
1753
1754 GST_DEBUG_OBJECT (jitterbuffer,
1755 "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1756
1757 JBUF_LOCK (priv);
1758 /* adjust the overall buffer delay to the total pipeline latency in
1759 * buffering mode because if downstream consumes too fast (because of
1760 * large latency or queues, we would start rebuffering again. */
1761 if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1762 RTP_JITTER_BUFFER_MODE_BUFFER) {
1763 rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1764 }
1765 JBUF_UNLOCK (priv);
1766
1767 ret = gst_pad_push_event (priv->sinkpad, event);
1768 break;
1769 }
1770 default:
1771 ret = gst_pad_push_event (priv->sinkpad, event);
1772 break;
1773 }
1774
1775 return ret;
1776 }
1777
1778 /* handles and stores the event in the jitterbuffer, must be called with
1779 * LOCK */
1780 static gboolean
queue_event(GstRtpJitterBuffer * jitterbuffer,GstEvent * event)1781 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1782 {
1783 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1784 gboolean head;
1785
1786 switch (GST_EVENT_TYPE (event)) {
1787 case GST_EVENT_CAPS:
1788 {
1789 GstCaps *caps;
1790
1791 gst_event_parse_caps (event, &caps);
1792 gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1);
1793 break;
1794 }
1795 case GST_EVENT_SEGMENT:
1796 {
1797 GstSegment segment;
1798 gst_event_copy_segment (event, &segment);
1799
1800 priv->segment_seqnum = gst_event_get_seqnum (event);
1801
1802 /* we need time for now */
1803 if (segment.format != GST_FORMAT_TIME) {
1804 GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
1805 gst_event_unref (event);
1806
1807 gst_segment_init (&segment, GST_FORMAT_TIME);
1808 event = gst_event_new_segment (&segment);
1809 gst_event_set_seqnum (event, priv->segment_seqnum);
1810 }
1811
1812 priv->segment = segment;
1813 break;
1814 }
1815 case GST_EVENT_EOS:
1816 priv->eos = TRUE;
1817 rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1818 break;
1819 default:
1820 break;
1821 }
1822
1823 GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1824 head = rtp_jitter_buffer_append_event (priv->jbuf, event);
1825 if (head || priv->eos)
1826 JBUF_SIGNAL_EVENT (priv);
1827
1828 return TRUE;
1829 }
1830
1831 static gboolean
gst_rtp_jitter_buffer_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)1832 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1833 GstEvent * event)
1834 {
1835 gboolean ret = TRUE;
1836 GstRtpJitterBuffer *jitterbuffer;
1837 GstRtpJitterBufferPrivate *priv;
1838
1839 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1840 priv = jitterbuffer->priv;
1841
1842 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1843
1844 switch (GST_EVENT_TYPE (event)) {
1845 case GST_EVENT_FLUSH_START:
1846 ret = gst_pad_push_event (priv->srcpad, event);
1847 gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1848 /* wait for the loop to go into PAUSED */
1849 gst_pad_pause_task (priv->srcpad);
1850 break;
1851 case GST_EVENT_FLUSH_STOP:
1852 ret = gst_pad_push_event (priv->srcpad, event);
1853 ret =
1854 gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1855 GST_PAD_MODE_PUSH, TRUE);
1856 break;
1857 default:
1858 if (GST_EVENT_IS_SERIALIZED (event)) {
1859 /* serialized events go in the queue */
1860 JBUF_LOCK (priv);
1861 if (priv->srcresult != GST_FLOW_OK) {
1862 /* Errors in sticky event pushing are no problem and ignored here
1863 * as they will cause more meaningful errors during data flow.
1864 * For EOS events, that are not followed by data flow, we still
1865 * return FALSE here though.
1866 */
1867 if (!GST_EVENT_IS_STICKY (event) ||
1868 GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1869 goto out_flow_error;
1870 }
1871 /* refuse more events on EOS */
1872 if (priv->eos)
1873 goto out_eos;
1874 ret = queue_event (jitterbuffer, event);
1875 JBUF_UNLOCK (priv);
1876 } else {
1877 /* non-serialized events are forwarded downstream immediately */
1878 ret = gst_pad_push_event (priv->srcpad, event);
1879 }
1880 break;
1881 }
1882 return ret;
1883
1884 /* ERRORS */
1885 out_flow_error:
1886 {
1887 GST_DEBUG_OBJECT (jitterbuffer,
1888 "refusing event, we have a downstream flow error: %s",
1889 gst_flow_get_name (priv->srcresult));
1890 JBUF_UNLOCK (priv);
1891 gst_event_unref (event);
1892 return FALSE;
1893 }
1894 out_eos:
1895 {
1896 GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1897 JBUF_UNLOCK (priv);
1898 gst_event_unref (event);
1899 return FALSE;
1900 }
1901 }
1902
1903 static gboolean
gst_rtp_jitter_buffer_sink_rtcp_event(GstPad * pad,GstObject * parent,GstEvent * event)1904 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1905 GstEvent * event)
1906 {
1907 gboolean ret = TRUE;
1908 GstRtpJitterBuffer *jitterbuffer;
1909
1910 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1911
1912 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1913
1914 switch (GST_EVENT_TYPE (event)) {
1915 case GST_EVENT_FLUSH_START:
1916 gst_event_unref (event);
1917 break;
1918 case GST_EVENT_FLUSH_STOP:
1919 gst_event_unref (event);
1920 break;
1921 default:
1922 ret = gst_pad_event_default (pad, parent, event);
1923 break;
1924 }
1925
1926 return ret;
1927 }
1928
1929 /*
1930 * Must be called with JBUF_LOCK held, will release the LOCK when emitting the
1931 * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1932 * GST_FLOW_FLUSHING when the element is shutting down. On success
1933 * GST_FLOW_OK is returned.
1934 */
1935 static GstFlowReturn
gst_rtp_jitter_buffer_get_clock_rate(GstRtpJitterBuffer * jitterbuffer,guint8 pt)1936 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1937 guint8 pt)
1938 {
1939 GValue ret = { 0 };
1940 GValue args[2] = { {0}, {0} };
1941 GstCaps *caps;
1942 gboolean res;
1943
1944 g_value_init (&args[0], GST_TYPE_ELEMENT);
1945 g_value_set_object (&args[0], jitterbuffer);
1946 g_value_init (&args[1], G_TYPE_UINT);
1947 g_value_set_uint (&args[1], pt);
1948
1949 g_value_init (&ret, GST_TYPE_CAPS);
1950 g_value_set_boxed (&ret, NULL);
1951
1952 JBUF_UNLOCK (jitterbuffer->priv);
1953 g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1954 &ret);
1955 JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1956
1957 g_value_unset (&args[0]);
1958 g_value_unset (&args[1]);
1959 caps = (GstCaps *) g_value_dup_boxed (&ret);
1960 g_value_unset (&ret);
1961 if (!caps)
1962 goto no_caps;
1963
1964 res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
1965 gst_caps_unref (caps);
1966
1967 if (G_UNLIKELY (!res))
1968 goto parse_failed;
1969
1970 return GST_FLOW_OK;
1971
1972 /* ERRORS */
1973 no_caps:
1974 {
1975 GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1976 return GST_FLOW_ERROR;
1977 }
1978 out_flushing:
1979 {
1980 GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1981 return GST_FLOW_FLUSHING;
1982 }
1983 parse_failed:
1984 {
1985 GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1986 return GST_FLOW_ERROR;
1987 }
1988 }
1989
1990 /* call with jbuf lock held */
1991 static GstMessage *
check_buffering_percent(GstRtpJitterBuffer * jitterbuffer,gint percent)1992 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1993 {
1994 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1995 GstMessage *message = NULL;
1996
1997 if (percent == -1)
1998 return NULL;
1999
2000 /* Post a buffering message */
2001 if (priv->last_percent != percent) {
2002 priv->last_percent = percent;
2003 message =
2004 gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
2005 gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
2006 }
2007
2008 return message;
2009 }
2010
2011 /* call with jbuf lock held */
2012 static GstMessage *
new_drop_message(GstRtpJitterBuffer * jitterbuffer,guint seqnum,GstClockTime timestamp,DropMessageReason reason)2013 new_drop_message (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
2014 GstClockTime timestamp, DropMessageReason reason)
2015 {
2016
2017 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2018 GstMessage *drop_msg = NULL;
2019 GstStructure *s;
2020 GstClockTime current_time;
2021 GstClockTime time_diff;
2022 const gchar *reason_str;
2023
2024 current_time = get_current_running_time (jitterbuffer);
2025 time_diff = current_time - priv->last_drop_msg_timestamp;
2026
2027 if (reason == REASON_TOO_LATE) {
2028 priv->num_too_late++;
2029 reason_str = "too-late";
2030 } else if (reason == REASON_DROP_ON_LATENCY) {
2031 priv->num_drop_on_latency++;
2032 reason_str = "drop-on-latency";
2033 } else {
2034 GST_WARNING_OBJECT (jitterbuffer, "Invalid reason for drop message");
2035 return drop_msg;
2036 }
2037
2038 /* Only create new drop_msg if time since last drop_msg is larger that
2039 * that the set interval, or if it is the first drop message posted */
2040 if ((time_diff >= priv->drop_messages_interval_ms * GST_MSECOND) ||
2041 (priv->last_drop_msg_timestamp == GST_CLOCK_TIME_NONE)) {
2042
2043 s = gst_structure_new ("drop-msg",
2044 "seqnum", G_TYPE_UINT, seqnum,
2045 "timestamp", GST_TYPE_CLOCK_TIME, timestamp,
2046 "reason", G_TYPE_STRING, reason_str,
2047 "num-too-late", G_TYPE_UINT, priv->num_too_late,
2048 "num-drop-on-latency", G_TYPE_UINT, priv->num_drop_on_latency, NULL);
2049
2050 priv->last_drop_msg_timestamp = current_time;
2051 priv->num_too_late = 0;
2052 priv->num_drop_on_latency = 0;
2053 drop_msg = gst_message_new_element (GST_OBJECT (jitterbuffer), s);
2054 }
2055 return drop_msg;
2056 }
2057
2058
2059 static inline GstClockTimeDiff
timeout_offset(GstRtpJitterBuffer * jitterbuffer)2060 timeout_offset (GstRtpJitterBuffer * jitterbuffer)
2061 {
2062 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2063 return priv->ts_offset + priv->out_offset + priv->latency_ns;
2064 }
2065
2066 static inline GstClockTime
get_pts_timeout(const RtpTimer * timer)2067 get_pts_timeout (const RtpTimer * timer)
2068 {
2069 if (timer->timeout == -1)
2070 return -1;
2071
2072 return timer->timeout - timer->offset;
2073 }
2074
2075 static inline gboolean
safe_add(guint64 * res,guint64 val,gint64 offset)2076 safe_add (guint64 * res, guint64 val, gint64 offset)
2077 {
2078 if (val <= G_MAXINT64) {
2079 gint64 tmp = (gint64) val + offset;
2080 if (tmp >= 0) {
2081 *res = tmp;
2082 return TRUE;
2083 }
2084 return FALSE;
2085 }
2086 /* From here, val > G_MAXINT64 */
2087
2088 /* Negative value */
2089 if (offset < 0 && val < -offset)
2090 return FALSE;
2091
2092 *res = val + offset;
2093 return TRUE;
2094 }
2095
2096 static void
update_timer_offsets(GstRtpJitterBuffer * jitterbuffer)2097 update_timer_offsets (GstRtpJitterBuffer * jitterbuffer)
2098 {
2099 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2100 RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
2101 GstClockTimeDiff new_offset = timeout_offset (jitterbuffer);
2102
2103 while (test) {
2104 if (test->type != RTP_TIMER_EXPECTED) {
2105 GstClockTime pts = get_pts_timeout (test);
2106 if (safe_add (&test->timeout, pts, new_offset)) {
2107 test->offset = new_offset;
2108 } else {
2109 GST_DEBUG_OBJECT (jitterbuffer,
2110 "Invalidating timeout (pts lower than new offset)");
2111 test->timeout = GST_CLOCK_TIME_NONE;
2112 test->offset = 0;
2113 }
2114 /* as we apply the offset on all timers, the order of timers won't
2115 * change and we can skip updating the timer queue */
2116 }
2117
2118 test = rtp_timer_get_next (test);
2119 }
2120 }
2121
2122 static void
update_offset(GstRtpJitterBuffer * jitterbuffer)2123 update_offset (GstRtpJitterBuffer * jitterbuffer)
2124 {
2125 GstRtpJitterBufferPrivate *priv;
2126
2127 priv = jitterbuffer->priv;
2128
2129 if (priv->ts_offset_remainder != 0) {
2130 GST_DEBUG ("adjustment %" G_GUINT64_FORMAT " remain %" G_GINT64_FORMAT
2131 " off %" G_GINT64_FORMAT, priv->max_ts_offset_adjustment,
2132 priv->ts_offset_remainder, priv->ts_offset);
2133 if (ABS (priv->ts_offset_remainder) > priv->max_ts_offset_adjustment) {
2134 if (priv->ts_offset_remainder > 0) {
2135 priv->ts_offset += priv->max_ts_offset_adjustment;
2136 priv->ts_offset_remainder -= priv->max_ts_offset_adjustment;
2137 } else {
2138 priv->ts_offset -= priv->max_ts_offset_adjustment;
2139 priv->ts_offset_remainder += priv->max_ts_offset_adjustment;
2140 }
2141 } else {
2142 priv->ts_offset += priv->ts_offset_remainder;
2143 priv->ts_offset_remainder = 0;
2144 }
2145
2146 update_timer_offsets (jitterbuffer);
2147 }
2148 }
2149
2150 static GstClockTime
apply_offset(GstRtpJitterBuffer * jitterbuffer,GstClockTime timestamp)2151 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
2152 {
2153 GstRtpJitterBufferPrivate *priv;
2154
2155 priv = jitterbuffer->priv;
2156
2157 if (timestamp == -1)
2158 return -1;
2159
2160 /* apply the timestamp offset, this is used for inter stream sync */
2161 if (!safe_add (×tamp, timestamp, priv->ts_offset))
2162 timestamp = 0;
2163 /* add the offset, this is used when buffering */
2164 timestamp += priv->out_offset;
2165
2166 return timestamp;
2167 }
2168
2169 static void
unschedule_current_timer(GstRtpJitterBuffer * jitterbuffer)2170 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
2171 {
2172 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2173
2174 if (priv->clock_id) {
2175 GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
2176 gst_clock_id_unschedule (priv->clock_id);
2177 priv->clock_id = NULL;
2178 }
2179 }
2180
2181 static void
update_current_timer(GstRtpJitterBuffer * jitterbuffer)2182 update_current_timer (GstRtpJitterBuffer * jitterbuffer)
2183 {
2184 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2185 RtpTimer *timer;
2186
2187 timer = rtp_timer_queue_peek_earliest (priv->timers);
2188
2189 /* we never need to wakeup the timer thread when there is no more timers, if
2190 * it was waiting on a clock id, it will simply do later and then wait on
2191 * the conditions */
2192 if (timer == NULL) {
2193 GST_DEBUG_OBJECT (jitterbuffer, "no more timers");
2194 return;
2195 }
2196
2197 GST_DEBUG_OBJECT (jitterbuffer, "waiting till %" GST_TIME_FORMAT
2198 " and earliest timeout is at %" GST_TIME_FORMAT,
2199 GST_TIME_ARGS (priv->timer_timeout), GST_TIME_ARGS (timer->timeout));
2200
2201 /* wakeup the timer thread in case the timer queue was empty */
2202 JBUF_SIGNAL_TIMER (priv);
2203
2204 /* no need to wait if the current wait is earlier or later */
2205 if (timer->timeout != -1 && timer->timeout >= priv->timer_timeout)
2206 return;
2207
2208 /* for other cases, force a reschedule of the timer thread */
2209 unschedule_current_timer (jitterbuffer);
2210 }
2211
2212 /* get the extra delay to wait before sending RTX */
2213 static GstClockTime
get_rtx_delay(GstRtpJitterBufferPrivate * priv)2214 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
2215 {
2216 GstClockTime delay;
2217
2218 if (priv->rtx_delay == -1) {
2219 /* the maximum delay for any RTX-packet is given by the latency, since
2220 anything after that is considered lost. For various calulcations,
2221 (given large avg_jitter and/or packet_spacing), the resulting delay
2222 could exceed the configured latency, ending up issuing an RTX-request
2223 that would never arrive in time. To help this we cap the delay
2224 for any RTX with the last possible time it could still arrive in time. */
2225 GstClockTime delay_max = (priv->latency_ns > priv->avg_rtx_rtt) ?
2226 priv->latency_ns - priv->avg_rtx_rtt : priv->latency_ns;
2227
2228 if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
2229 delay = DEFAULT_AUTO_RTX_DELAY;
2230 } else {
2231 /* jitter is in nanoseconds, maximum of 2x jitter and half the
2232 * packet spacing is a good margin */
2233 delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
2234 }
2235
2236 delay = MIN (delay_max, delay);
2237 } else {
2238 delay = priv->rtx_delay * GST_MSECOND;
2239 }
2240 if (priv->rtx_min_delay > 0)
2241 delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
2242
2243 return delay;
2244 }
2245
/* we just received a packet with seqnum and dts.
 *
 * First check for old seqnum that we are still expecting. If the gap with the
 * current seqnum is too big, unschedule the timeouts.
 *
 * If we have a valid packet spacing estimate we can set a timer for when we
 * should receive the next packet.
 * If we don't have a valid estimate, we remove any timer we might have
 * had for this packet.
 *
 * @seqnum: seqnum of the packet that just arrived
 * @dts, @pts: timestamps of that packet; @dts is only passed on to
 *   update_rtx_stats(), @pts is the base for the new timeouts
 * @do_next_seqnum: whether a timer for the next seqnum may be installed; it
 *   is additionally gated on a known packet spacing and on rtx_next_seqnum
 * @is_rtx: whether the packet is a retransmission
 * @timer: timer associated with this packet, may be NULL. Depending on the
 *   path taken it is re-used for the next seqnum, or unscheduled and freed
 *   here, so callers must not use it afterwards.
 */
static void
update_rtx_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
    GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum,
    gboolean is_rtx, RtpTimer * timer)
{
  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
  gboolean is_stats_timer = FALSE;

  /* a timer with the same seqnum already sits in the stats queue; such a
   * timer is neither re-used nor freed below */
  if (timer && rtp_timer_queue_find (priv->rtx_stats_timers, timer->seqnum))
    is_stats_timer = TRUE;

  /* schedule immediately the expected timers which exceed the maximum RTX
   * delay reorder configuration */
  if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
    RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
    while (test) {
      gint gap;

      /* filter the timer type to speed up this loop */
      if (test->type != RTP_TIMER_EXPECTED) {
        test = rtp_timer_get_next (test);
        continue;
      }

      gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);

      GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
          test->type, test->seqnum, seqnum, gap);

      /* if this expected packet has a smaller gap than the configured one,
       * then the remaining timers are not expected to have a bigger gap as
       * the timer queue is ordered */
      if (gap <= priv->rtx_delay_reorder)
        break;

      /* max gap, we exceeded the max reorder distance and we don't expect the
       * missing packet to be this reordered. Only timers that did not request
       * a retransmission yet are touched; they get a timeout of -1. */
      if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED)
        rtp_timer_queue_update_timer (priv->timers, test, test->seqnum,
            -1, 0, 0, FALSE);

      test = rtp_timer_get_next (test);
    }
  }

  do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
      && priv->rtx_next_seqnum;

  if (timer && timer->type != RTP_TIMER_DEADLINE) {
    if (timer->num_rtx_retry > 0) {
      if (is_rtx) {
        update_rtx_stats (jitterbuffer, timer, dts, TRUE);
        /* don't try to estimate the next seqnum because this is a retransmitted
         * packet and it probably did not arrive with the expected packet
         * spacing. */
        do_next_seqnum = FALSE;
      }

      if (!is_stats_timer && (!is_rtx || timer->num_rtx_retry > 1)) {
        RtpTimer *stats_timer = rtp_timer_dup (timer);
        /* Store timer in order to record stats when/if the retransmitted
         * packet arrives. We should also store timer information if we've
         * requested retransmission more than once since we may receive
         * several retransmitted packets. For accuracy we should update the
         * stats also when the redundant retransmitted packets arrive. */
        stats_timer->timeout = pts + priv->rtx_stats_timeout * GST_MSECOND;
        stats_timer->type = RTP_TIMER_EXPECTED;
        rtp_timer_queue_insert (priv->rtx_stats_timers, stats_timer);
      }
    }
  }

  if (do_next_seqnum && pts != GST_CLOCK_TIME_NONE) {
    GstClockTime next_expected_pts, delay;

    /* calculate expected arrival time of the next seqnum */
    next_expected_pts = pts + priv->packet_spacing;

    delay = get_rtx_delay (priv);

    /* and update/install timer for next seqnum */
    GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, next_expected_pts %"
        GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", est packet duration %"
        GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum,
        GST_TIME_ARGS (next_expected_pts), GST_TIME_ARGS (delay),
        GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));

    if (timer && !is_stats_timer) {
      /* recycle the timer of the received packet for the next seqnum */
      timer->type = RTP_TIMER_EXPECTED;
      rtp_timer_queue_update_timer (priv->timers, timer, priv->next_in_seqnum,
          next_expected_pts, delay, 0, TRUE);
    } else {
      rtp_timer_queue_set_expected (priv->timers, priv->next_in_seqnum,
          next_expected_pts, delay, priv->packet_spacing);
    }
  } else if (timer && timer->type != RTP_TIMER_DEADLINE && !is_stats_timer) {
    /* if we had a timer, remove it, we don't know when to expect the next
     * packet. */
    rtp_timer_queue_unschedule (priv->timers, timer);
    rtp_timer_free (timer);
  }
}
2358
2359 static void
calculate_packet_spacing(GstRtpJitterBuffer * jitterbuffer,guint32 rtptime,GstClockTime pts)2360 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2361 GstClockTime pts)
2362 {
2363 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2364
2365 /* we need consecutive seqnums with a different
2366 * rtptime to estimate the packet spacing. */
2367 if (priv->ips_rtptime != rtptime) {
2368 /* rtptime changed, check pts diff */
2369 if (priv->ips_pts != -1 && pts != -1 && pts > priv->ips_pts) {
2370 GstClockTime new_packet_spacing = pts - priv->ips_pts;
2371 GstClockTime old_packet_spacing = priv->packet_spacing;
2372
2373 /* Biased towards bigger packet spacings to prevent
2374 * too many unneeded retransmission requests for next
2375 * packets that just arrive a little later than we would
2376 * expect */
2377 if (old_packet_spacing > new_packet_spacing)
2378 priv->packet_spacing =
2379 (new_packet_spacing + 3 * old_packet_spacing) / 4;
2380 else if (old_packet_spacing > 0)
2381 priv->packet_spacing =
2382 (3 * new_packet_spacing + old_packet_spacing) / 4;
2383 else
2384 priv->packet_spacing = new_packet_spacing;
2385
2386 GST_DEBUG_OBJECT (jitterbuffer,
2387 "new packet spacing %" GST_TIME_FORMAT
2388 " old packet spacing %" GST_TIME_FORMAT
2389 " combined to %" GST_TIME_FORMAT,
2390 GST_TIME_ARGS (new_packet_spacing),
2391 GST_TIME_ARGS (old_packet_spacing),
2392 GST_TIME_ARGS (priv->packet_spacing));
2393 }
2394 priv->ips_rtptime = rtptime;
2395 priv->ips_pts = pts;
2396 }
2397 }
2398
2399 static void
insert_lost_event(GstRtpJitterBuffer * jitterbuffer,guint16 seqnum,guint lost_packets,GstClockTime timestamp,GstClockTime duration,guint num_rtx_retry)2400 insert_lost_event (GstRtpJitterBuffer * jitterbuffer,
2401 guint16 seqnum, guint lost_packets, GstClockTime timestamp,
2402 GstClockTime duration, guint num_rtx_retry)
2403 {
2404 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2405 GstEvent *event = NULL;
2406 guint next_in_seqnum;
2407
2408 /* we had a gap and thus we lost some packets. Create an event for this. */
2409 if (lost_packets > 1)
2410 GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2411 seqnum + lost_packets - 1);
2412 else
2413 GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2414
2415 priv->num_lost += lost_packets;
2416 priv->num_rtx_failed += num_rtx_retry;
2417
2418 next_in_seqnum = (seqnum + lost_packets) & 0xffff;
2419
2420 /* we now only accept seqnum bigger than this */
2421 if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
2422 priv->next_in_seqnum = next_in_seqnum;
2423 priv->last_in_pts = timestamp;
2424 }
2425
2426 /* Avoid creating events if we don't need it. Note that we still need to create
2427 * the lost *ITEM* since it will be used to notify the outgoing thread of
2428 * lost items (so that we can set discont flags and such) */
2429 if (priv->do_lost) {
2430 /* create packet lost event */
2431 if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2432 duration = priv->packet_spacing;
2433 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2434 gst_structure_new ("GstRTPPacketLost",
2435 "seqnum", G_TYPE_UINT, (guint) seqnum,
2436 "timestamp", G_TYPE_UINT64, timestamp,
2437 "duration", G_TYPE_UINT64, duration,
2438 "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2439 }
2440 if (rtp_jitter_buffer_append_lost_event (priv->jbuf,
2441 event, seqnum, lost_packets))
2442 JBUF_SIGNAL_EVENT (priv);
2443 }
2444
/* Handles a gap in the received sequence numbers.
 *
 * For streams considered equidistant, missing packets whose estimated pts
 * (plus all offsets) already lies in the past are reported right away with a
 * single multi-packet lost event. Every remaining missing packet gets a
 * timer: an expected (RTX) timer when retransmission is enabled, a lost
 * timer otherwise.
 *
 * @missing_seqnum: first missing seqnum
 * @current_seqnum: seqnum of the packet that revealed the gap; the missing
 *   range ends at @current_seqnum - 1
 * @pts: pts of the packet that revealed the gap
 * @gap: number of missing packets
 *   NOTE(review): presumably always > 0, it is used as a divisor (gap + 1) -
 *   confirm against the caller
 * @now: current time, compared against the estimated pts plus offsets
 */
static void
gst_rtp_jitter_buffer_handle_missing_packets (GstRtpJitterBuffer * jitterbuffer,
    guint32 missing_seqnum, guint16 current_seqnum, GstClockTime pts, gint gap,
    GstClockTime now)
{
  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
  GstClockTime est_pkt_duration, est_pts;
  gboolean equidistant = priv->equidistant > 0;
  GstClockTime last_in_pts = priv->last_in_pts;
  GstClockTimeDiff offset = timeout_offset (jitterbuffer);
  GstClockTime rtx_delay = get_rtx_delay (priv);
  guint16 remaining_gap;
  GstClockTimeDiff remaining_duration;
  GstClockTimeDiff remainder_duration;
  guint i;

  GST_DEBUG_OBJECT (jitterbuffer,
      "Missing packets: (#%u->#%u), gap %d, pts %" GST_TIME_FORMAT
      ", last-pts %" GST_TIME_FORMAT,
      missing_seqnum, current_seqnum - 1, gap, GST_TIME_ARGS (pts),
      GST_TIME_ARGS (last_in_pts));

  if (equidistant) {
    GstClockTimeDiff total_duration;
    gboolean too_late;

    /* the total duration spanned by the missing packets */
    total_duration = MAX (0, GST_CLOCK_DIFF (last_in_pts, pts));

    /* interpolate between the current time and the last time based on
     * number of packets we are missing, this is the estimated duration
     * for the missing packet based on equidistant packet spacing. */
    est_pkt_duration = total_duration / (gap + 1);

    /* if we have valid packet-spacing, use that */
    if (total_duration > 0 && priv->packet_spacing) {
      est_pkt_duration = priv->packet_spacing;
    }

    est_pts = last_in_pts + est_pkt_duration;
    GST_DEBUG_OBJECT (jitterbuffer, "estimated missing packet pts %"
        GST_TIME_FORMAT " and duration %" GST_TIME_FORMAT,
        GST_TIME_ARGS (est_pts), GST_TIME_ARGS (est_pkt_duration));

    /* a packet is considered too late if our estimated pts plus all
       applicable offsets are in the past */
    too_late = now > (est_pts + offset);

    /* Here we optimistically try to save any packets that could potentially
       be saved by making sure we create lost/rtx timers for them, and for
       the rest that could not possibly be saved, we create a "multi-lost"
       event immediately containing the missing duration and sequence numbers */
    if (too_late) {
      guint lost_packets;
      GstClockTime lost_duration;
      GstClockTimeDiff gap_time;
      guint max_saveable_packets = 0;
      GstClockTime max_saveable_duration;
      GstClockTime saveable_duration;

      /* gap time represents the total duration of all missing packets */
      gap_time = MAX (0, GST_CLOCK_DIFF (est_pts, pts));

      /* based on the estimated packet duration, we
         can figure out how many packets we could possibly save */
      if (est_pkt_duration)
        max_saveable_packets = offset / est_pkt_duration;

      /* and say that the amount of lost packet is the sequence-number
         gap minus these saveable packets, but at least 1 */
      lost_packets = MAX (1, (gint) gap - (gint) max_saveable_packets);

      /* now we know how many packets we can possibly save */
      max_saveable_packets = gap - lost_packets;

      /* we convert that to time */
      max_saveable_duration = max_saveable_packets * est_pkt_duration;

      /* determine the actual amount of time we can save */
      saveable_duration = MIN (max_saveable_duration, gap_time);

      /* and we now have the duration we need to fill */
      lost_duration = GST_CLOCK_DIFF (saveable_duration, gap_time);

      /* this multi-lost-packet event will be inserted directly into the packet-queue
         for immediate processing */
      if (lost_packets > 0) {
        RtpTimer *timer;
        GstClockTime timestamp = apply_offset (jitterbuffer, est_pts);

        GST_INFO_OBJECT (jitterbuffer, "lost event for %d packet(s) (#%d->#%d) "
            "for duration %" GST_TIME_FORMAT, lost_packets, missing_seqnum,
            missing_seqnum + lost_packets - 1, GST_TIME_ARGS (lost_duration));

        insert_lost_event (jitterbuffer, missing_seqnum, lost_packets,
            timestamp, lost_duration, 0);

        /* the first lost packet no longer needs its timer, unless it is a
         * deadline timer */
        timer = rtp_timer_queue_find (priv->timers, missing_seqnum);
        if (timer && timer->type != RTP_TIMER_DEADLINE) {
          if (timer->queued)
            rtp_timer_queue_unschedule (priv->timers, timer);
          GST_DEBUG_OBJECT (jitterbuffer, "removing timer for seqnum #%u",
              missing_seqnum);
          rtp_timer_free (timer);
        }

        /* continue behind the packets that were just declared lost */
        missing_seqnum += lost_packets;
        est_pts += lost_duration;
      }
    }

  } else {
    /* If we cannot assume equidistant packet spacing, the only thing we know
     * for sure is that the missing packets have expected pts not later than
     * the last received pts. */
    est_pkt_duration = 0;
    est_pts = pts;
  }

  /* Figure out how many more packets we are missing. */
  remaining_gap = current_seqnum - missing_seqnum;
  /* and how much time these packets represent */
  remaining_duration = MAX (0, GST_CLOCK_DIFF (est_pts, pts));
  /* Given the calculated packet-duration (packet spacing when equidistant),
     the remainder is what we are left with after subtracting the ideal time
     for the gap */
  remainder_duration =
      MAX (0, GST_CLOCK_DIFF (est_pkt_duration * remaining_gap,
          remaining_duration));

  GST_DEBUG_OBJECT (jitterbuffer, "remaining gap of %u, with "
      "duration %" GST_TIME_FORMAT " gives remainder duration %"
      GST_STIME_FORMAT, remaining_gap, GST_TIME_ARGS (remaining_duration),
      GST_STIME_ARGS (remainder_duration));

  /* install one timer per remaining missing packet, advancing the estimated
   * pts by the duration assigned to each packet */
  for (i = 0; i < remaining_gap; i++) {
    GstClockTime duration = est_pkt_duration;
    /* we add the remainder on the first packet */
    if (i == 0)
      duration += remainder_duration;

    /* clip duration to what is actually left */
    remaining_duration = MAX (0, GST_CLOCK_DIFF (est_pts, pts));
    duration = MIN (duration, remaining_duration);

    if (priv->do_retransmission) {
      RtpTimer *timer = rtp_timer_queue_find (priv->timers, missing_seqnum);

      /* if we had a timer for the missing packet, update it. */
      if (timer && timer->type == RTP_TIMER_EXPECTED) {
        timer->duration = duration;
        /* only pull the timeout earlier, and only while no retransmission
         * was requested yet */
        if (timer->timeout > (est_pts + rtx_delay) && timer->num_rtx_retry == 0) {
          rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
              est_pts, rtx_delay, 0, TRUE);
          GST_DEBUG_OBJECT (jitterbuffer, "Update RTX timer(s) #%u, "
              "pts %" GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT
              ", duration %" GST_TIME_FORMAT,
              missing_seqnum, GST_TIME_ARGS (est_pts),
              GST_TIME_ARGS (rtx_delay), GST_TIME_ARGS (duration));
        }
      } else {
        GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer(s) #%u, "
            "pts %" GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT
            ", duration %" GST_TIME_FORMAT,
            missing_seqnum, GST_TIME_ARGS (est_pts),
            GST_TIME_ARGS (rtx_delay), GST_TIME_ARGS (duration));
        rtp_timer_queue_set_expected (priv->timers, missing_seqnum, est_pts,
            rtx_delay, duration);
      }
    } else {
      GST_INFO_OBJECT (jitterbuffer,
          "Add Lost timer for #%u, pts %" GST_TIME_FORMAT
          ", duration %" GST_TIME_FORMAT ", offset %" GST_STIME_FORMAT,
          missing_seqnum, GST_TIME_ARGS (est_pts),
          GST_TIME_ARGS (duration), GST_STIME_ARGS (offset));
      rtp_timer_queue_set_lost (priv->timers, missing_seqnum, est_pts,
          duration, offset);
    }

    missing_seqnum++;
    est_pts += duration;
  }
}
2628
2629 static void
calculate_jitter(GstRtpJitterBuffer * jitterbuffer,GstClockTime dts,guint32 rtptime)2630 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2631 guint32 rtptime)
2632 {
2633 gint32 rtpdiff;
2634 GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2635 GstRtpJitterBufferPrivate *priv;
2636
2637 priv = jitterbuffer->priv;
2638
2639 if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2640 goto no_time;
2641
2642 if (priv->last_dts != -1)
2643 dtsdiff = dts - priv->last_dts;
2644 else
2645 dtsdiff = 0;
2646
2647 if (priv->last_rtptime != -1)
2648 rtpdiff = rtptime - (guint32) priv->last_rtptime;
2649 else
2650 rtpdiff = 0;
2651
2652 /* Guess whether stream currently uses equidistant packet spacing. If we
2653 * often see identical timestamps it means the packets are not
2654 * equidistant. */
2655 if (rtptime == priv->last_rtptime)
2656 priv->equidistant -= 2;
2657 else
2658 priv->equidistant += 1;
2659 priv->equidistant = CLAMP (priv->equidistant, -7, 7);
2660
2661 priv->last_dts = dts;
2662 priv->last_rtptime = rtptime;
2663
2664 if (rtpdiff > 0)
2665 rtpdiffns =
2666 gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2667 else
2668 rtpdiffns =
2669 -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2670
2671 diff = ABS (dtsdiff - rtpdiffns);
2672
2673 /* jitter is stored in nanoseconds */
2674 priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2675
2676 GST_LOG_OBJECT (jitterbuffer,
2677 "dtsdiff %" GST_STIME_FORMAT " rtptime %" GST_STIME_FORMAT
2678 ", clock-rate %d, diff %" GST_STIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2679 GST_STIME_ARGS (dtsdiff), GST_STIME_ARGS (rtpdiffns), priv->clock_rate,
2680 GST_STIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2681
2682 return;
2683
2684 /* ERRORS */
2685 no_time:
2686 {
2687 GST_DEBUG_OBJECT (jitterbuffer,
2688 "no dts or no clock-rate, can't calculate jitter");
2689 return;
2690 }
2691 }
2692
2693 static gint
compare_buffer_seqnum(GstBuffer * a,GstBuffer * b,gpointer user_data)2694 compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
2695 {
2696 GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
2697 GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
2698 guint seq_a, seq_b;
2699
2700 gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
2701 seq_a = gst_rtp_buffer_get_seq (&rtp_a);
2702 gst_rtp_buffer_unmap (&rtp_a);
2703
2704 gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
2705 seq_b = gst_rtp_buffer_get_seq (&rtp_b);
2706 gst_rtp_buffer_unmap (&rtp_b);
2707
2708 return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
2709 }
2710
2711 static gboolean
handle_big_gap_buffer(GstRtpJitterBuffer * jitterbuffer,GstBuffer * buffer,guint8 pt,guint16 seqnum,gint gap,guint max_dropout,guint max_misorder)2712 handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, GstBuffer * buffer,
2713 guint8 pt, guint16 seqnum, gint gap, guint max_dropout, guint max_misorder)
2714 {
2715 GstRtpJitterBufferPrivate *priv;
2716 guint gap_packets_length;
2717 gboolean reset = FALSE;
2718 gboolean future = gap > 0;
2719
2720 priv = jitterbuffer->priv;
2721
2722 if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
2723 GList *l;
2724 guint32 prev_gap_seq = -1;
2725 gboolean all_consecutive = TRUE;
2726
2727 g_queue_insert_sorted (&priv->gap_packets, buffer,
2728 (GCompareDataFunc) compare_buffer_seqnum, NULL);
2729
2730 for (l = priv->gap_packets.head; l; l = l->next) {
2731 GstBuffer *gap_buffer = l->data;
2732 GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2733 guint32 gap_seq;
2734
2735 gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2736
2737 all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
2738
2739 gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
2740 if (prev_gap_seq == -1)
2741 prev_gap_seq = gap_seq;
2742 else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
2743 all_consecutive = FALSE;
2744 else
2745 prev_gap_seq = gap_seq;
2746
2747 gst_rtp_buffer_unmap (&gap_rtp);
2748 if (!all_consecutive)
2749 break;
2750 }
2751
2752 if (all_consecutive && gap_packets_length > 3) {
2753 GST_DEBUG_OBJECT (jitterbuffer,
2754 "buffer too %s %d < %d, got 5 consecutive ones - reset",
2755 (future ? "new" : "old"), gap,
2756 (future ? max_dropout : -max_misorder));
2757 reset = TRUE;
2758 } else if (!all_consecutive) {
2759 g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2760 g_queue_clear (&priv->gap_packets);
2761 GST_DEBUG_OBJECT (jitterbuffer,
2762 "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
2763 (future ? "new" : "old"), gap,
2764 (future ? max_dropout : -max_misorder));
2765 buffer = NULL;
2766 } else {
2767 GST_DEBUG_OBJECT (jitterbuffer,
2768 "buffer too %s %d < %d, got %u consecutive ones - waiting",
2769 (future ? "new" : "old"), gap,
2770 (future ? max_dropout : -max_misorder), gap_packets_length + 1);
2771 buffer = NULL;
2772 }
2773 } else {
2774 GST_DEBUG_OBJECT (jitterbuffer,
2775 "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
2776 gap, -max_misorder);
2777 g_queue_push_tail (&priv->gap_packets, buffer);
2778 buffer = NULL;
2779 }
2780
2781 return reset;
2782 }
2783
2784 static GstClockTime
get_current_running_time(GstRtpJitterBuffer * jitterbuffer)2785 get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
2786 {
2787 GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
2788 GstClockTime running_time = GST_CLOCK_TIME_NONE;
2789
2790 if (clock) {
2791 GstClockTime base_time =
2792 gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
2793 GstClockTime clock_time = gst_clock_get_time (clock);
2794
2795 if (clock_time > base_time)
2796 running_time = clock_time - base_time;
2797 else
2798 running_time = 0;
2799
2800 gst_object_unref (clock);
2801 }
2802
2803 return running_time;
2804 }
2805
2806 static GstFlowReturn
gst_rtp_jitter_buffer_reset(GstRtpJitterBuffer * jitterbuffer,GstPad * pad,GstObject * parent,guint16 seqnum)2807 gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
2808 GstPad * pad, GstObject * parent, guint16 seqnum)
2809 {
2810 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2811 GstFlowReturn ret = GST_FLOW_OK;
2812 GList *events = NULL, *l;
2813 GList *buffers;
2814
2815 GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2816 rtp_jitter_buffer_flush (priv->jbuf,
2817 (GFunc) free_item_and_retain_sticky_events, &events);
2818 rtp_jitter_buffer_reset_skew (priv->jbuf);
2819 rtp_timer_queue_remove_all (priv->timers);
2820 priv->discont = TRUE;
2821 priv->last_popped_seqnum = -1;
2822
2823 if (priv->gap_packets.head) {
2824 GstBuffer *gap_buffer = priv->gap_packets.head->data;
2825 GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2826
2827 gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2828 priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
2829 gst_rtp_buffer_unmap (&gap_rtp);
2830 } else {
2831 priv->next_seqnum = seqnum;
2832 }
2833
2834 priv->last_in_pts = -1;
2835 priv->next_in_seqnum = -1;
2836
2837 /* Insert all sticky events again in order, otherwise we would
2838 * potentially loose STREAM_START, CAPS or SEGMENT events
2839 */
2840 events = g_list_reverse (events);
2841 for (l = events; l; l = l->next) {
2842 rtp_jitter_buffer_append_event (priv->jbuf, l->data);
2843 }
2844 g_list_free (events);
2845
2846 JBUF_SIGNAL_EVENT (priv);
2847
2848 /* reset spacing estimation when gap */
2849 priv->ips_rtptime = -1;
2850 priv->ips_pts = GST_CLOCK_TIME_NONE;
2851
2852 buffers = g_list_copy (priv->gap_packets.head);
2853 g_queue_clear (&priv->gap_packets);
2854
2855 priv->ips_rtptime = -1;
2856 priv->ips_pts = GST_CLOCK_TIME_NONE;
2857 JBUF_UNLOCK (jitterbuffer->priv);
2858
2859 for (l = buffers; l; l = l->next) {
2860 ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
2861 l->data = NULL;
2862 if (ret != GST_FLOW_OK) {
2863 l = l->next;
2864 break;
2865 }
2866 }
2867 for (; l; l = l->next)
2868 gst_buffer_unref (l->data);
2869 g_list_free (buffers);
2870
2871 return ret;
2872 }
2873
2874 static gboolean
gst_rtp_jitter_buffer_fast_start(GstRtpJitterBuffer * jitterbuffer)2875 gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
2876 {
2877 GstRtpJitterBufferPrivate *priv;
2878 RTPJitterBufferItem *item;
2879 RtpTimer *timer;
2880
2881 priv = jitterbuffer->priv;
2882
2883 if (priv->faststart_min_packets == 0)
2884 return FALSE;
2885
2886 item = rtp_jitter_buffer_peek (priv->jbuf);
2887 if (!item)
2888 return FALSE;
2889
2890 timer = rtp_timer_queue_find (priv->timers, item->seqnum);
2891 if (!timer || timer->type != RTP_TIMER_DEADLINE)
2892 return FALSE;
2893
2894 if (rtp_jitter_buffer_can_fast_start (priv->jbuf,
2895 priv->faststart_min_packets)) {
2896 GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
2897 priv->faststart_min_packets);
2898 timer->timeout = -1;
2899 rtp_timer_queue_reschedule (priv->timers, timer);
2900 return TRUE;
2901 }
2902
2903 return FALSE;
2904 }
2905
2906 static GstFlowReturn
gst_rtp_jitter_buffer_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2907 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2908 GstBuffer * buffer)
2909 {
2910 GstRtpJitterBuffer *jitterbuffer;
2911 GstRtpJitterBufferPrivate *priv;
2912 guint16 seqnum;
2913 guint32 expected, rtptime;
2914 GstFlowReturn ret = GST_FLOW_OK;
2915 GstClockTime now;
2916 GstClockTime dts, pts;
2917 guint64 latency_ts;
2918 gboolean head;
2919 gboolean duplicate;
2920 gint percent = -1;
2921 guint8 pt;
2922 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2923 gboolean do_next_seqnum = FALSE;
2924 GstMessage *msg = NULL;
2925 GstMessage *drop_msg = NULL;
2926 gboolean estimated_dts = FALSE;
2927 gint32 packet_rate, max_dropout, max_misorder;
2928 RtpTimer *timer = NULL;
2929 gboolean is_rtx;
2930
2931 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2932
2933 priv = jitterbuffer->priv;
2934
2935 if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2936 goto invalid_buffer;
2937
2938 pt = gst_rtp_buffer_get_payload_type (&rtp);
2939 seqnum = gst_rtp_buffer_get_seq (&rtp);
2940 rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2941 gst_rtp_buffer_unmap (&rtp);
2942
2943 is_rtx = GST_BUFFER_IS_RETRANSMISSION (buffer);
2944 now = get_current_running_time (jitterbuffer);
2945
2946 /* make sure we have PTS and DTS set */
2947 pts = GST_BUFFER_PTS (buffer);
2948 dts = GST_BUFFER_DTS (buffer);
2949 if (dts == -1)
2950 dts = pts;
2951 else if (pts == -1)
2952 pts = dts;
2953
2954 if (dts == -1) {
2955 /* If we have no DTS here, i.e. no capture time, get one from the
2956 * clock now to have something to calculate with in the future. */
2957 dts = now;
2958 pts = dts;
2959
2960 /* Remember that we estimated the DTS if we are running already
2961 * and this is not our first packet (or first packet after a reset).
2962 * If it's the first packet, we somehow must generate a timestamp for
2963 * everything, otherwise we can't calculate any times
2964 */
2965 estimated_dts = (priv->next_in_seqnum != -1);
2966 } else {
2967 /* take the DTS of the buffer. This is the time when the packet was
2968 * received and is used to calculate jitter and clock skew. We will adjust
2969 * this DTS with the smoothed value after processing it in the
2970 * jitterbuffer and assign it as the PTS. */
2971 /* bring to running time */
2972 dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2973 }
2974
2975 GST_DEBUG_OBJECT (jitterbuffer,
2976 "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
2977 seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer), is_rtx);
2978
2979 JBUF_LOCK_CHECK (priv, out_flushing);
2980
2981 if (G_UNLIKELY (priv->last_pt != pt)) {
2982 GstCaps *caps;
2983
2984 GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2985 pt);
2986
2987 priv->last_pt = pt;
2988 /* reset clock-rate so that we get a new one */
2989 priv->clock_rate = -1;
2990
2991 /* Try to get the clock-rate from the caps first if we can. If there are no
2992 * caps we must fire the signal to get the clock-rate. */
2993 if ((caps = gst_pad_get_current_caps (pad))) {
2994 gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
2995 gst_caps_unref (caps);
2996 }
2997 }
2998
2999 if (G_UNLIKELY (priv->clock_rate == -1)) {
3000 /* no clock rate given on the caps, try to get one with the signal */
3001 if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
3002 pt) == GST_FLOW_FLUSHING)
3003 goto out_flushing;
3004
3005 if (G_UNLIKELY (priv->clock_rate == -1))
3006 goto no_clock_rate;
3007
3008 gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
3009 }
3010
3011 /* don't accept more data on EOS */
3012 if (G_UNLIKELY (priv->eos))
3013 goto have_eos;
3014
3015 if (!is_rtx)
3016 calculate_jitter (jitterbuffer, dts, rtptime);
3017
3018 if (priv->seqnum_base != -1) {
3019 gint gap;
3020
3021 gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
3022
3023 if (gap < 0) {
3024 GST_DEBUG_OBJECT (jitterbuffer,
3025 "packet seqnum #%d before seqnum-base #%d", seqnum,
3026 priv->seqnum_base);
3027 gst_buffer_unref (buffer);
3028 goto finished;
3029 } else if (gap > 16384) {
3030 /* From now on don't compare against the seqnum base anymore as
3031 * at some point in the future we will wrap around and also that
3032 * much reordering is very unlikely */
3033 priv->seqnum_base = -1;
3034 }
3035 }
3036
3037 expected = priv->next_in_seqnum;
3038
3039 /* don't update packet-rate based on RTX, as those arrive highly unregularly */
3040 if (!is_rtx) {
3041 packet_rate = gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx,
3042 seqnum, rtptime);
3043 GST_TRACE_OBJECT (jitterbuffer, "updated packet_rate: %d", packet_rate);
3044 }
3045 max_dropout =
3046 gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
3047 priv->max_dropout_time);
3048 max_misorder =
3049 gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
3050 priv->max_misorder_time);
3051 GST_TRACE_OBJECT (jitterbuffer, "max_dropout: %d, max_misorder: %d",
3052 max_dropout, max_misorder);
3053
3054 timer = rtp_timer_queue_find (priv->timers, seqnum);
3055 if (is_rtx) {
3056 if (G_UNLIKELY (!priv->do_retransmission))
3057 goto unsolicited_rtx;
3058
3059 if (!timer)
3060 timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum);
3061
3062 /* If the first buffer is an (old) rtx, e.g. from before a reset, or
3063 * already lost, ignore it */
3064 if (!timer || expected == -1)
3065 goto unsolicited_rtx;
3066 }
3067
3068 /* now check against our expected seqnum */
3069 if (G_UNLIKELY (expected == -1)) {
3070 GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3071
3072 /* calculate a pts based on rtptime and arrival time (dts) */
3073 pts =
3074 rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3075 rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)),
3076 0, FALSE);
3077
3078 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pts))) {
3079 /* A valid timestamp cannot be calculated, discard packet */
3080 goto discard_invalid;
3081 }
3082
3083 /* we don't know what the next_in_seqnum should be, wait for the last
3084 * possible moment to push this buffer, maybe we get an earlier seqnum
3085 * while we wait */
3086 rtp_timer_queue_set_deadline (priv->timers, seqnum, pts,
3087 timeout_offset (jitterbuffer));
3088
3089 do_next_seqnum = TRUE;
3090 /* take rtptime and pts to calculate packet spacing */
3091 priv->ips_rtptime = rtptime;
3092 priv->ips_pts = pts;
3093
3094 } else {
3095 gint gap;
3096 /* now calculate gap */
3097 gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
3098 GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
3099 expected, seqnum, gap);
3100
3101 if (G_UNLIKELY (gap > 0 &&
3102 rtp_timer_queue_length (priv->timers) >= max_dropout)) {
3103 /* If we have timers for more than RTP_MAX_DROPOUT packets
3104 * pending this means that we have a huge gap overall. We can
3105 * reset the jitterbuffer at this point because there's
3106 * just too much data missing to be able to do anything
3107 * sensible with the past data. Just try again from the
3108 * next packet */
3109 GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
3110 rtp_timer_queue_length (priv->timers), max_dropout);
3111 g_queue_insert_sorted (&priv->gap_packets, buffer,
3112 (GCompareDataFunc) compare_buffer_seqnum, NULL);
3113 return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3114 }
3115
3116 /* Special handling of large gaps */
3117 if (!is_rtx && ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout))) {
3118 gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum,
3119 gap, max_dropout, max_misorder);
3120 if (reset) {
3121 return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3122 } else {
3123 GST_DEBUG_OBJECT (jitterbuffer,
3124 "Had big gap, waiting for more consecutive packets");
3125 goto finished;
3126 }
3127 }
3128
3129 /* We had no huge gap, let's drop all the gap packets */
3130 GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
3131 g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3132 g_queue_clear (&priv->gap_packets);
3133
3134 /* calculate a pts based on rtptime and arrival time (dts) */
3135 /* If we estimated the DTS, don't consider it in the clock skew calculations */
3136 pts =
3137 rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3138 rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)),
3139 gap, is_rtx);
3140
3141 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pts))) {
3142 /* A valid timestamp cannot be calculated, discard packet */
3143 goto discard_invalid;
3144 }
3145
3146 if (G_LIKELY (gap == 0)) {
3147 /* packet is expected */
3148 calculate_packet_spacing (jitterbuffer, rtptime, pts);
3149 do_next_seqnum = TRUE;
3150 } else {
3151
3152 /* we have a gap */
3153 if (gap > 0) {
3154 GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
3155 /* fill in the gap with EXPECTED timers */
3156 gst_rtp_jitter_buffer_handle_missing_packets (jitterbuffer, expected,
3157 seqnum, pts, gap, now);
3158 do_next_seqnum = TRUE;
3159 } else {
3160 GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
3161 do_next_seqnum = FALSE;
3162 }
3163
3164 /* reset spacing estimation when gap */
3165 priv->ips_rtptime = -1;
3166 priv->ips_pts = GST_CLOCK_TIME_NONE;
3167 }
3168 }
3169
3170 if (do_next_seqnum) {
3171 priv->last_in_pts = pts;
3172 priv->next_in_seqnum = (seqnum + 1) & 0xffff;
3173 }
3174
3175 if (is_rtx)
3176 timer->num_rtx_received++;
3177
3178 /* At 2^15, we would detect a seqnum rollover too early, therefore
3179 * limit the queue size. But let's not limit it to a number that is
3180 * too small to avoid emptying it needlessly if there is a spurious huge
3181 * sequence number, let's allow at least 10k packets in any case. */
3182 while (rtp_jitter_buffer_is_full (priv->jbuf) &&
3183 priv->srcresult == GST_FLOW_OK) {
3184 RtpTimer *timer = rtp_timer_queue_peek_earliest (priv->timers);
3185 while (timer) {
3186 timer->timeout = -1;
3187 if (timer->type == RTP_TIMER_DEADLINE)
3188 break;
3189 timer = rtp_timer_get_next (timer);
3190 }
3191
3192 update_current_timer (jitterbuffer);
3193 JBUF_WAIT_QUEUE (priv);
3194 if (priv->srcresult != GST_FLOW_OK)
3195 goto out_flushing;
3196 }
3197
3198 /* let's check if this buffer is too late, we can only accept packets with
3199 * bigger seqnum than the one we last pushed. */
3200 if (G_LIKELY (priv->last_popped_seqnum != -1)) {
3201 gint gap;
3202
3203 gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
3204
3205 /* priv->last_popped_seqnum >= seqnum, we're too late. */
3206 if (G_UNLIKELY (gap <= 0)) {
3207 if (priv->do_retransmission) {
3208 if (is_rtx && timer) {
3209 update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3210 /* Only count the retranmitted packet too late if it has been
3211 * considered lost. If the original packet arrived before the
3212 * retransmitted we just count it as a duplicate. */
3213 if (timer->type != RTP_TIMER_LOST)
3214 goto rtx_duplicate;
3215 }
3216 }
3217 goto too_late;
3218 }
3219 }
3220
3221 /* let's drop oldest packet if the queue is already full and drop-on-latency
3222 * is set. We can only do this when there actually is a latency. When no
3223 * latency is set, we just pump it in the queue and let the other end push it
3224 * out as fast as possible. */
3225 if (priv->latency_ms && priv->drop_on_latency) {
3226 latency_ts =
3227 gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
3228
3229 if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
3230 RTPJitterBufferItem *old_item;
3231
3232 old_item = rtp_jitter_buffer_peek (priv->jbuf);
3233
3234 if (IS_DROPABLE (old_item)) {
3235 old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3236 GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
3237 old_item);
3238 priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
3239 if (priv->post_drop_messages) {
3240 drop_msg =
3241 new_drop_message (jitterbuffer, old_item->seqnum, old_item->pts,
3242 REASON_DROP_ON_LATENCY);
3243 }
3244 rtp_jitter_buffer_free_item (old_item);
3245 }
3246 /* we might have removed some head buffers, signal the pushing thread to
3247 * see if it can push now */
3248 JBUF_SIGNAL_EVENT (priv);
3249 }
3250 }
3251
3252 /* If we estimated the DTS, don't consider it in the clock skew calculations
3253 * later. The code above always sets dts to pts or the other way around if
3254 * any of those is valid in the buffer, so we know that if we estimated the
3255 * dts that both are unknown */
3256 head = rtp_jitter_buffer_append_buffer (priv->jbuf, buffer,
3257 estimated_dts ? GST_CLOCK_TIME_NONE : dts, pts, seqnum, rtptime,
3258 &duplicate, &percent);
3259
3260 /* now insert the packet into the queue in sorted order. This function returns
3261 * FALSE if a packet with the same seqnum was already in the queue, meaning we
3262 * have a duplicate. */
3263 if (G_UNLIKELY (duplicate)) {
3264 if (is_rtx && timer)
3265 update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3266 goto duplicate;
3267 }
3268
3269 /* Trigger fast start if needed */
3270 if (gst_rtp_jitter_buffer_fast_start (jitterbuffer))
3271 head = TRUE;
3272
3273 /* update rtx timers */
3274 if (priv->do_retransmission)
3275 update_rtx_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum, is_rtx,
3276 timer);
3277
3278 /* we had an unhandled SR, handle it now */
3279 if (priv->last_sr)
3280 do_handle_sync (jitterbuffer);
3281
3282 if (G_UNLIKELY (head)) {
3283 /* signal addition of new buffer when the _loop is waiting. */
3284 if (G_LIKELY (priv->active))
3285 JBUF_SIGNAL_EVENT (priv);
3286 }
3287
3288 GST_DEBUG_OBJECT (jitterbuffer,
3289 "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
3290 rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
3291
3292 msg = check_buffering_percent (jitterbuffer, percent);
3293
3294 finished:
3295 update_current_timer (jitterbuffer);
3296 JBUF_UNLOCK (priv);
3297
3298 if (msg)
3299 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3300 if (drop_msg)
3301 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), drop_msg);
3302
3303 return ret;
3304
3305 /* ERRORS */
3306 invalid_buffer:
3307 {
3308 /* this is not fatal but should be filtered earlier */
3309 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3310 ("Received invalid RTP payload, dropping"));
3311 gst_buffer_unref (buffer);
3312 return GST_FLOW_OK;
3313 }
3314 no_clock_rate:
3315 {
3316 GST_WARNING_OBJECT (jitterbuffer,
3317 "No clock-rate in caps!, dropping buffer");
3318 gst_buffer_unref (buffer);
3319 goto finished;
3320 }
3321 out_flushing:
3322 {
3323 ret = priv->srcresult;
3324 GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
3325 gst_buffer_unref (buffer);
3326 goto finished;
3327 }
3328 have_eos:
3329 {
3330 ret = GST_FLOW_EOS;
3331 GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
3332 gst_buffer_unref (buffer);
3333 goto finished;
3334 }
3335 too_late:
3336 {
3337 GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
3338 " popped, dropping", seqnum, priv->last_popped_seqnum);
3339 priv->num_late++;
3340 if (priv->post_drop_messages) {
3341 drop_msg = new_drop_message (jitterbuffer, seqnum, pts, REASON_TOO_LATE);
3342 }
3343 gst_buffer_unref (buffer);
3344 goto finished;
3345 }
3346 duplicate:
3347 {
3348 GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
3349 seqnum);
3350 priv->num_duplicates++;
3351 goto finished;
3352 }
3353 rtx_duplicate:
3354 {
3355 GST_DEBUG_OBJECT (jitterbuffer,
3356 "Duplicate RTX packet #%d detected, dropping", seqnum);
3357 priv->num_duplicates++;
3358 gst_buffer_unref (buffer);
3359 goto finished;
3360 }
3361 unsolicited_rtx:
3362 {
3363 GST_DEBUG_OBJECT (jitterbuffer,
3364 "Unsolicited RTX packet #%d detected, dropping", seqnum);
3365 gst_buffer_unref (buffer);
3366 goto finished;
3367 }
3368 discard_invalid:
3369 {
3370 GST_DEBUG_OBJECT (jitterbuffer,
3371 "cannot calculate a valid pts for #%d (rtx: %d), discard",
3372 seqnum, is_rtx);
3373 gst_buffer_unref (buffer);
3374 goto finished;
3375 }
3376 }
3377
3378 /* FIXME: hopefully we can do something more efficient here, especially when
3379 * all packets are in order and/or outside of the currently cached range.
3380 * Still worthwhile to have it, avoids taking/releasing object lock and pad
3381 * stream lock for every single buffer in the default chain_list fallback. */
3382 static GstFlowReturn
gst_rtp_jitter_buffer_chain_list(GstPad * pad,GstObject * parent,GstBufferList * buffer_list)3383 gst_rtp_jitter_buffer_chain_list (GstPad * pad, GstObject * parent,
3384 GstBufferList * buffer_list)
3385 {
3386 GstFlowReturn flow_ret = GST_FLOW_OK;
3387 guint i, n;
3388
3389 n = gst_buffer_list_length (buffer_list);
3390 for (i = 0; i < n; ++i) {
3391 GstBuffer *buf = gst_buffer_list_get (buffer_list, i);
3392
3393 flow_ret = gst_rtp_jitter_buffer_chain (pad, parent, gst_buffer_ref (buf));
3394
3395 if (flow_ret != GST_FLOW_OK)
3396 break;
3397 }
3398 gst_buffer_list_unref (buffer_list);
3399
3400 return flow_ret;
3401 }
3402
3403 static GstClockTime
compute_elapsed(GstRtpJitterBuffer * jitterbuffer,RTPJitterBufferItem * item)3404 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
3405 {
3406 guint64 ext_time, elapsed;
3407 guint32 rtp_time;
3408 GstRtpJitterBufferPrivate *priv;
3409
3410 priv = jitterbuffer->priv;
3411 rtp_time = item->rtptime;
3412
3413 GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
3414 G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
3415
3416 ext_time = priv->ext_timestamp;
3417 ext_time = gst_rtp_buffer_ext_timestamp (&ext_time, rtp_time);
3418 if (ext_time < priv->ext_timestamp) {
3419 ext_time = priv->ext_timestamp;
3420 } else {
3421 priv->ext_timestamp = ext_time;
3422 }
3423
3424 if (ext_time > priv->clock_base)
3425 elapsed = ext_time - priv->clock_base;
3426 else
3427 elapsed = 0;
3428
3429 elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
3430 return elapsed;
3431 }
3432
3433 static void
update_estimated_eos(GstRtpJitterBuffer * jitterbuffer,RTPJitterBufferItem * item)3434 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
3435 RTPJitterBufferItem * item)
3436 {
3437 guint64 total, elapsed, left, estimated;
3438 GstClockTime out_time;
3439 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3440
3441 if (priv->npt_stop == -1 || priv->ext_timestamp == -1
3442 || priv->clock_base == -1 || priv->clock_rate <= 0)
3443 return;
3444
3445 /* compute the elapsed time */
3446 elapsed = compute_elapsed (jitterbuffer, item);
3447
3448 /* do nothing if elapsed time doesn't increment */
3449 if (priv->last_elapsed && elapsed <= priv->last_elapsed)
3450 return;
3451
3452 priv->last_elapsed = elapsed;
3453
3454 /* this is the total time we need to play */
3455 total = priv->npt_stop - priv->npt_start;
3456 GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
3457 GST_TIME_ARGS (total));
3458
3459 /* this is how much time there is left */
3460 if (total > elapsed)
3461 left = total - elapsed;
3462 else
3463 left = 0;
3464
3465 /* if we have less time left that the size of the buffer, we will not
3466 * be able to keep it filled, disabled buffering then */
3467 if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
3468 GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
3469 ", disable buffering close to EOS", GST_TIME_ARGS (left));
3470 rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
3471 }
3472
3473 /* this is the current time as running-time */
3474 out_time = item->pts;
3475
3476 if (elapsed > 0)
3477 estimated = gst_util_uint64_scale (out_time, total, elapsed);
3478 else {
3479 /* if there is almost nothing left,
3480 * we may never advance enough to end up in the above case */
3481 if (total < GST_SECOND)
3482 estimated = GST_SECOND;
3483 else
3484 estimated = -1;
3485 }
3486 GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
3487 GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
3488
3489 if (estimated != -1 && priv->estimated_eos != estimated) {
3490 rtp_timer_queue_set_eos (priv->timers, estimated,
3491 timeout_offset (jitterbuffer));
3492 priv->estimated_eos = estimated;
3493 }
3494 }
3495
3496 /* take a buffer from the queue and push it */
3497 static GstFlowReturn
pop_and_push_next(GstRtpJitterBuffer * jitterbuffer,guint seqnum)3498 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
3499 {
3500 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3501 GstFlowReturn result = GST_FLOW_OK;
3502 RTPJitterBufferItem *item;
3503 GstBuffer *outbuf = NULL;
3504 GstEvent *outevent = NULL;
3505 GstQuery *outquery = NULL;
3506 GstClockTime dts, pts;
3507 gint percent = -1;
3508 gboolean do_push = TRUE;
3509 guint type;
3510 GstMessage *msg;
3511
3512 /* when we get here we are ready to pop and push the buffer */
3513 item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3514 type = item->type;
3515
3516 switch (type) {
3517 case ITEM_TYPE_BUFFER:
3518
3519 /* we need to make writable to change the flags and timestamps */
3520 outbuf = gst_buffer_make_writable (item->data);
3521
3522 if (G_UNLIKELY (priv->discont)) {
3523 /* set DISCONT flag when we missed a packet. We pushed the buffer writable
3524 * into the jitterbuffer so we can modify now. */
3525 GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
3526 GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
3527 priv->discont = FALSE;
3528 }
3529 if (G_UNLIKELY (priv->ts_discont)) {
3530 GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
3531 priv->ts_discont = FALSE;
3532 }
3533
3534 dts =
3535 gst_segment_position_from_running_time (&priv->segment,
3536 GST_FORMAT_TIME, item->dts);
3537 pts =
3538 gst_segment_position_from_running_time (&priv->segment,
3539 GST_FORMAT_TIME, item->pts);
3540
3541 /* if this is a new frame, check if ts_offset needs to be updated */
3542 if (pts != priv->last_pts) {
3543 update_offset (jitterbuffer);
3544 }
3545
3546 /* apply timestamp with offset to buffer now */
3547 GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
3548 GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
3549
3550 /* update the elapsed time when we need to check against the npt stop time. */
3551 update_estimated_eos (jitterbuffer, item);
3552
3553 priv->last_pts = pts;
3554 priv->last_out_time = GST_BUFFER_PTS (outbuf);
3555 break;
3556 case ITEM_TYPE_LOST:
3557 priv->discont = TRUE;
3558 if (!priv->do_lost)
3559 do_push = FALSE;
3560 /* FALLTHROUGH */
3561 case ITEM_TYPE_EVENT:
3562 outevent = item->data;
3563 break;
3564 case ITEM_TYPE_QUERY:
3565 outquery = item->data;
3566 break;
3567 }
3568
3569 /* now we are ready to push the buffer. Save the seqnum and release the lock
3570 * so the other end can push stuff in the queue again. */
3571 if (seqnum != -1) {
3572 priv->last_popped_seqnum = seqnum;
3573 priv->next_seqnum = (seqnum + item->count) & 0xffff;
3574 }
3575 msg = check_buffering_percent (jitterbuffer, percent);
3576
3577 if (type == ITEM_TYPE_EVENT && outevent &&
3578 GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3579 g_assert (priv->eos);
3580 while (rtp_timer_queue_length (priv->timers) > 0) {
3581 /* Stopping timers */
3582 unschedule_current_timer (jitterbuffer);
3583 JBUF_WAIT_TIMER (priv);
3584 }
3585 }
3586
3587 JBUF_UNLOCK (priv);
3588
3589 item->data = NULL;
3590 rtp_jitter_buffer_free_item (item);
3591
3592 if (msg)
3593 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3594
3595 switch (type) {
3596 case ITEM_TYPE_BUFFER:
3597 /* push buffer */
3598 GST_DEBUG_OBJECT (jitterbuffer,
3599 "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
3600 seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
3601 GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
3602 priv->num_pushed++;
3603 GST_BUFFER_DTS (outbuf) = GST_CLOCK_TIME_NONE;
3604 result = gst_pad_push (priv->srcpad, outbuf);
3605
3606 JBUF_LOCK_CHECK (priv, out_flushing);
3607 break;
3608 case ITEM_TYPE_LOST:
3609 case ITEM_TYPE_EVENT:
3610 /* We got not enough consecutive packets with a huge gap, we can
3611 * as well just drop them here now on EOS */
3612 if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3613 GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
3614 g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3615 g_queue_clear (&priv->gap_packets);
3616 }
3617
3618 GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
3619 ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
3620
3621 if (do_push)
3622 gst_pad_push_event (priv->srcpad, outevent);
3623 else if (outevent)
3624 gst_event_unref (outevent);
3625
3626 result = GST_FLOW_OK;
3627
3628 JBUF_LOCK_CHECK (priv, out_flushing);
3629 break;
3630 case ITEM_TYPE_QUERY:
3631 {
3632 gboolean res;
3633
3634 res = gst_pad_peer_query (priv->srcpad, outquery);
3635
3636 JBUF_LOCK_CHECK (priv, out_flushing);
3637 result = GST_FLOW_OK;
3638 GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
3639 JBUF_SIGNAL_QUERY (priv, res);
3640 break;
3641 }
3642 }
3643 return result;
3644
3645 /* ERRORS */
3646 out_flushing:
3647 {
3648 return priv->srcresult;
3649 }
3650 }
3651
/* Internal flow return, aliased to GST_FLOW_CUSTOM_SUCCESS.
 * handle_next_buffer() returns it when nothing can be pushed right now;
 * gst_rtp_jitter_buffer_loop() reacts by waiting for the next event and then
 * carrying on with GST_FLOW_OK. */
#define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
3653
3654 /* Peek a buffer and compare the seqnum to the expected seqnum.
3655 * If all is fine, the buffer is pushed.
3656 * If something is wrong, we wait for some event
3657 */
3658 static GstFlowReturn
handle_next_buffer(GstRtpJitterBuffer * jitterbuffer)3659 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
3660 {
3661 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3662 GstFlowReturn result;
3663 RTPJitterBufferItem *item;
3664 guint seqnum;
3665 guint32 next_seqnum;
3666
3667 /* only push buffers when PLAYING and active and not buffering */
3668 if (priv->blocked || !priv->active ||
3669 rtp_jitter_buffer_is_buffering (priv->jbuf)) {
3670 return GST_FLOW_WAIT;
3671 }
3672
3673 /* peek a buffer, we're just looking at the sequence number.
3674 * If all is fine, we'll pop and push it. If the sequence number is wrong we
3675 * wait for a timeout or something to change.
3676 * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
3677 item = rtp_jitter_buffer_peek (priv->jbuf);
3678 if (item == NULL) {
3679 goto wait;
3680 }
3681
3682 /* get the seqnum and the next expected seqnum */
3683 seqnum = item->seqnum;
3684 if (seqnum == -1) {
3685 return pop_and_push_next (jitterbuffer, seqnum);
3686 }
3687
3688 next_seqnum = priv->next_seqnum;
3689
3690 /* get the gap between this and the previous packet. If we don't know the
3691 * previous packet seqnum assume no gap. */
3692 if (G_UNLIKELY (next_seqnum == -1)) {
3693 GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3694 /* we don't know what the next_seqnum should be, the chain function should
3695 * have scheduled a DEADLINE timer that will increment next_seqnum when it
3696 * fires, so wait for that */
3697 result = GST_FLOW_WAIT;
3698 } else {
3699 gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
3700
3701 if (G_LIKELY (gap == 0)) {
3702 /* no missing packet, pop and push */
3703 result = pop_and_push_next (jitterbuffer, seqnum);
3704 } else if (G_UNLIKELY (gap < 0)) {
3705 /* if we have a packet that we already pushed or considered dropped, pop it
3706 * off and get the next packet */
3707 GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
3708 seqnum, next_seqnum);
3709 item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
3710 rtp_jitter_buffer_free_item (item);
3711 result = GST_FLOW_OK;
3712 } else {
3713 /* the chain function has scheduled timers to request retransmission or
3714 * when to consider the packet lost, wait for that */
3715 GST_DEBUG_OBJECT (jitterbuffer,
3716 "Sequence number GAP detected: expected %d instead of %d (%d missing)",
3717 next_seqnum, seqnum, gap);
3718 /* if we have reached EOS, just keep processing */
3719 /* Also do the same if we block input because the JB is full */
3720 if (priv->eos || rtp_jitter_buffer_is_full (priv->jbuf)) {
3721 result = pop_and_push_next (jitterbuffer, seqnum);
3722 result = GST_FLOW_OK;
3723 } else {
3724 result = GST_FLOW_WAIT;
3725 }
3726 }
3727 }
3728
3729 return result;
3730
3731 wait:
3732 {
3733 GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
3734 if (priv->eos) {
3735 return GST_FLOW_EOS;
3736 } else {
3737 return GST_FLOW_WAIT;
3738 }
3739 }
3740 }
3741
3742 static GstClockTime
get_rtx_retry_timeout(GstRtpJitterBufferPrivate * priv)3743 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
3744 {
3745 GstClockTime rtx_retry_timeout;
3746 GstClockTime rtx_min_retry_timeout;
3747
3748 if (priv->rtx_retry_timeout == -1) {
3749 if (priv->avg_rtx_rtt == 0)
3750 rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
3751 else
3752 /* we want to ask for a retransmission after we waited for a
3753 * complete RTT and the additional jitter */
3754 rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
3755 } else {
3756 rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
3757 }
3758 /* make sure we don't retry too often. On very low latency networks,
3759 * the RTT and jitter can be very low. */
3760 if (priv->rtx_min_retry_timeout == -1) {
3761 rtx_min_retry_timeout = priv->packet_spacing;
3762 } else {
3763 rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
3764 }
3765 rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
3766
3767 return rtx_retry_timeout;
3768 }
3769
3770 static GstClockTime
get_rtx_retry_period(GstRtpJitterBufferPrivate * priv,GstClockTime rtx_retry_timeout)3771 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
3772 GstClockTime rtx_retry_timeout)
3773 {
3774 GstClockTime rtx_retry_period;
3775
3776 if (priv->rtx_retry_period == -1) {
3777 /* we retry up to the configured jitterbuffer size but leaving some
3778 * room for the retransmission to arrive in time */
3779 if (rtx_retry_timeout > priv->latency_ns) {
3780 rtx_retry_period = 0;
3781 } else {
3782 rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
3783 }
3784 } else {
3785 rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
3786 }
3787 return rtx_retry_period;
3788 }
3789
3790 /*
3791 1. For *larger* rtx-rtt, weigh a new measurement as before (1/8th)
3792 2. For *smaller* rtx-rtt, be a bit more conservative and weigh a bit less (1/16th)
3793 3. For very large measurements (> avg * 2), consider them "outliers"
3794 and count them a lot less (1/48th)
3795 */
3796 static void
update_avg_rtx_rtt(GstRtpJitterBufferPrivate * priv,GstClockTime rtt)3797 update_avg_rtx_rtt (GstRtpJitterBufferPrivate * priv, GstClockTime rtt)
3798 {
3799 gint weight;
3800
3801 if (priv->avg_rtx_rtt == 0) {
3802 priv->avg_rtx_rtt = rtt;
3803 return;
3804 }
3805
3806 if (rtt > 2 * priv->avg_rtx_rtt)
3807 weight = 48;
3808 else if (rtt > priv->avg_rtx_rtt)
3809 weight = 8;
3810 else
3811 weight = 16;
3812
3813 priv->avg_rtx_rtt = (rtt + (weight - 1) * priv->avg_rtx_rtt) / weight;
3814 }
3815
3816 static void
update_rtx_stats(GstRtpJitterBuffer * jitterbuffer,const RtpTimer * timer,GstClockTime dts,gboolean success)3817 update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer,
3818 GstClockTime dts, gboolean success)
3819 {
3820 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3821 GstClockTime delay;
3822
3823 if (success) {
3824 /* we scheduled a retry for this packet and now we have it */
3825 priv->num_rtx_success++;
3826 /* all the previous retry attempts failed */
3827 priv->num_rtx_failed += timer->num_rtx_retry - 1;
3828 } else {
3829 /* All retries failed or was too late */
3830 priv->num_rtx_failed += timer->num_rtx_retry;
3831 }
3832
3833 /* number of retries before (hopefully) receiving the packet */
3834 if (priv->avg_rtx_num == 0.0)
3835 priv->avg_rtx_num = timer->num_rtx_retry;
3836 else
3837 priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
3838
3839 /* Calculate the delay between retransmission request and receiving this
3840 * packet. We have a valid delay if and only if this packet is a response to
3841 * our last request. If not we don't know if this is a response to an
3842 * earlier request and delay could be way off. For RTT is more important
3843 * with correct values than to update for every packet. */
3844 if (timer->num_rtx_retry == timer->num_rtx_received &&
3845 dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) {
3846 delay = dts - timer->rtx_last;
3847 update_avg_rtx_rtt (priv, delay);
3848 } else {
3849 delay = 0;
3850 }
3851
3852 GST_LOG_OBJECT (jitterbuffer,
3853 "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %"
3854 G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %"
3855 G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %"
3856 GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success,
3857 priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
3858 priv->avg_rtx_num, GST_TIME_ARGS (delay),
3859 GST_TIME_ARGS (priv->avg_rtx_rtt));
3860 }
3861
3862 /* the timeout for when we expected a packet expired */
3863 static gboolean
do_expected_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now,GQueue * events)3864 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
3865 GstClockTime now, GQueue * events)
3866 {
3867 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3868 GstEvent *event;
3869 guint delay, delay_ms, avg_rtx_rtt_ms;
3870 guint rtx_retry_timeout_ms, rtx_retry_period_ms;
3871 guint rtx_deadline_ms;
3872 GstClockTime rtx_retry_period;
3873 GstClockTime rtx_retry_timeout;
3874 GstClock *clock;
3875 GstClockTimeDiff offset = 0;
3876 GstClockTime timeout;
3877
3878 GST_DEBUG_OBJECT (jitterbuffer, "expected #%d didn't arrive, now %"
3879 GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
3880
3881 rtx_retry_timeout = get_rtx_retry_timeout (priv);
3882 rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
3883
3884 /* delay expresses how late this packet is currently */
3885 delay = now - timer->rtx_base;
3886
3887 delay_ms = GST_TIME_AS_MSECONDS (delay);
3888 rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
3889 rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
3890 avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
3891 rtx_deadline_ms =
3892 priv->rtx_deadline_ms != -1 ? priv->rtx_deadline_ms : priv->latency_ms;
3893
3894 event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
3895 gst_structure_new ("GstRTPRetransmissionRequest",
3896 "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
3897 "running-time", G_TYPE_UINT64, timer->rtx_base,
3898 "delay", G_TYPE_UINT, delay_ms,
3899 "retry", G_TYPE_UINT, timer->num_rtx_retry,
3900 "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
3901 "period", G_TYPE_UINT, rtx_retry_period_ms,
3902 "deadline", G_TYPE_UINT, rtx_deadline_ms,
3903 "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
3904 "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
3905 g_queue_push_tail (events, event);
3906 GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
3907
3908 priv->num_rtx_requests++;
3909 timer->num_rtx_retry++;
3910
3911 GST_OBJECT_LOCK (jitterbuffer);
3912 if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
3913 timer->rtx_last = gst_clock_get_time (clock);
3914 timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
3915 } else {
3916 timer->rtx_last = now;
3917 }
3918 GST_OBJECT_UNLOCK (jitterbuffer);
3919
3920 /*
3921 Calculate the timeout for the next retransmission attempt:
3922 We have just successfully sent one RTX request, and we need to
3923 find out when to schedule the next one.
3924
3925 The rtx_retry_timeout tells us the logical timeout between RTX
3926 requests based on things like round-trip time, jitter and packet spacing,
3927 and is how long we are going to wait before attempting another RTX packet
3928 */
3929 timeout = timer->rtx_last + rtx_retry_timeout;
3930 GST_DEBUG_OBJECT (jitterbuffer,
3931 "timer #%i new timeout %" GST_TIME_FORMAT ", rtx retry timeout %"
3932 GST_TIME_FORMAT ", num_retry %u", timer->seqnum, GST_TIME_ARGS (timeout),
3933 GST_TIME_ARGS (rtx_retry_timeout), timer->num_rtx_retry);
3934 if ((priv->rtx_max_retries != -1
3935 && timer->num_rtx_retry >= priv->rtx_max_retries)
3936 || (timeout > timer->rtx_base + rtx_retry_period)) {
3937 /* too many retransmission request, we now convert the timer
3938 * to a lost timer, leave the num_rtx_retry as it is for stats */
3939 timer->type = RTP_TIMER_LOST;
3940 timeout = timer->rtx_base;
3941 offset = timeout_offset (jitterbuffer);
3942 GST_DEBUG_OBJECT (jitterbuffer, "reschedule #%i as LOST timer for %"
3943 GST_TIME_FORMAT, timer->seqnum,
3944 GST_TIME_ARGS (timer->rtx_base + offset));
3945 }
3946 rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
3947 timeout, 0, offset, FALSE);
3948
3949 return FALSE;
3950 }
3951
3952 /* a packet is lost */
3953 static gboolean
do_lost_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now)3954 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
3955 GstClockTime now)
3956 {
3957 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3958 GstClockTime timestamp;
3959
3960 timestamp = apply_offset (jitterbuffer, get_pts_timeout (timer));
3961 insert_lost_event (jitterbuffer, timer->seqnum, 1, timestamp,
3962 timer->duration, timer->num_rtx_retry);
3963
3964 if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
3965 /* Store info to update stats if the packet arrives too late */
3966 timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND;
3967 timer->type = RTP_TIMER_LOST;
3968 rtp_timer_queue_insert (priv->rtx_stats_timers, timer);
3969 } else {
3970 rtp_timer_free (timer);
3971 }
3972
3973 return TRUE;
3974 }
3975
3976 static gboolean
do_eos_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now)3977 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
3978 GstClockTime now)
3979 {
3980 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3981
3982 GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3983 rtp_timer_free (timer);
3984 if (!priv->eos) {
3985 GstEvent *event;
3986
3987 /* there was no EOS in the buffer, put one in there now */
3988 event = gst_event_new_eos ();
3989 if (priv->segment_seqnum != GST_SEQNUM_INVALID)
3990 gst_event_set_seqnum (event, priv->segment_seqnum);
3991 queue_event (jitterbuffer, event);
3992 }
3993 JBUF_SIGNAL_EVENT (priv);
3994
3995 return TRUE;
3996 }
3997
3998 static gboolean
do_deadline_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now)3999 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
4000 GstClockTime now)
4001 {
4002 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4003
4004 GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
4005
4006 /* timer seqnum might have been obsoleted by caps seqnum-base,
4007 * only mess with current ongoing seqnum if still unknown */
4008 if (priv->next_seqnum == -1)
4009 priv->next_seqnum = timer->seqnum;
4010 rtp_timer_free (timer);
4011 JBUF_SIGNAL_EVENT (priv);
4012
4013 return TRUE;
4014 }
4015
4016 static gboolean
do_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now,GQueue * events)4017 do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
4018 GstClockTime now, GQueue * events)
4019 {
4020 gboolean removed = FALSE;
4021
4022 switch (timer->type) {
4023 case RTP_TIMER_EXPECTED:
4024 removed = do_expected_timeout (jitterbuffer, timer, now, events);
4025 break;
4026 case RTP_TIMER_LOST:
4027 removed = do_lost_timeout (jitterbuffer, timer, now);
4028 break;
4029 case RTP_TIMER_DEADLINE:
4030 removed = do_deadline_timeout (jitterbuffer, timer, now);
4031 break;
4032 case RTP_TIMER_EOS:
4033 removed = do_eos_timeout (jitterbuffer, timer, now);
4034 break;
4035 }
4036 return removed;
4037 }
4038
4039 static void
push_rtx_events_unlocked(GstRtpJitterBuffer * jitterbuffer,GQueue * events)4040 push_rtx_events_unlocked (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
4041 {
4042 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4043 GstEvent *event;
4044
4045 while ((event = (GstEvent *) g_queue_pop_head (events)))
4046 gst_pad_push_event (priv->sinkpad, event);
4047 }
4048
4049 /* called with JBUF lock
4050 *
4051 * Pushes all events in @events queue.
4052 *
4053 * Returns: %TRUE if the timer thread is not longer running
4054 */
4055 static void
push_rtx_events(GstRtpJitterBuffer * jitterbuffer,GQueue * events)4056 push_rtx_events (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
4057 {
4058 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4059
4060 if (events->length == 0)
4061 return;
4062
4063 JBUF_UNLOCK (priv);
4064 push_rtx_events_unlocked (jitterbuffer, events);
4065 JBUF_LOCK (priv);
4066 }
4067
4068 /* called when we need to wait for the next timeout.
4069 *
4070 * We loop over the array of recorded timeouts and wait for the earliest one.
4071 * When it timed out, do the logic associated with the timer.
4072 *
4073 * If there are no timers, we wait on a gcond until something new happens.
4074 */
4075 static void
wait_next_timeout(GstRtpJitterBuffer * jitterbuffer)4076 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
4077 {
4078 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4079 GstClockTime now = 0;
4080
4081 JBUF_LOCK (priv);
4082 while (priv->timer_running) {
4083 RtpTimer *timer = NULL;
4084 GQueue events = G_QUEUE_INIT;
4085
4086 /* don't produce data in paused */
4087 while (priv->blocked) {
4088 JBUF_WAIT_TIMER (priv);
4089 if (!priv->timer_running)
4090 goto stopping;
4091 }
4092
4093 /* If we have a clock, update "now" now with the very
4094 * latest running time we have. If timers are unscheduled below we
4095 * otherwise wouldn't update now (it's only updated when timers
4096 * expire), and also for the very first loop iteration now would
4097 * otherwise always be 0
4098 */
4099 GST_OBJECT_LOCK (jitterbuffer);
4100 if (priv->eos) {
4101 now = GST_CLOCK_TIME_NONE;
4102 } else if (GST_ELEMENT_CLOCK (jitterbuffer)) {
4103 now =
4104 gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
4105 GST_ELEMENT_CAST (jitterbuffer)->base_time;
4106 }
4107 GST_OBJECT_UNLOCK (jitterbuffer);
4108
4109 GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
4110 GST_TIME_ARGS (now));
4111
4112 /* Clear expired rtx-stats timers */
4113 if (priv->do_retransmission)
4114 rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
4115
4116 /* Iterate expired "normal" timers */
4117 while ((timer = rtp_timer_queue_pop_until (priv->timers, now)))
4118 do_timeout (jitterbuffer, timer, now, &events);
4119
4120 timer = rtp_timer_queue_peek_earliest (priv->timers);
4121 if (timer) {
4122 GstClock *clock;
4123 GstClockTime sync_time;
4124 GstClockID id;
4125 GstClockReturn ret;
4126 GstClockTimeDiff clock_jitter;
4127
4128 /* we poped all immediate and due timer, so this should just never
4129 * happens */
4130 g_assert (GST_CLOCK_TIME_IS_VALID (timer->timeout));
4131
4132 GST_OBJECT_LOCK (jitterbuffer);
4133 clock = GST_ELEMENT_CLOCK (jitterbuffer);
4134 if (!clock) {
4135 GST_OBJECT_UNLOCK (jitterbuffer);
4136 /* let's just push if there is no clock */
4137 GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
4138 now = timer->timeout;
4139 push_rtx_events (jitterbuffer, &events);
4140 continue;
4141 }
4142
4143 /* prepare for sync against clock */
4144 sync_time = timer->timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
4145 /* add latency of peer to get input time */
4146 sync_time += priv->peer_latency;
4147
4148 GST_DEBUG_OBJECT (jitterbuffer, "timer #%i sync to timestamp %"
4149 GST_TIME_FORMAT " with sync time %" GST_TIME_FORMAT, timer->seqnum,
4150 GST_TIME_ARGS (get_pts_timeout (timer)), GST_TIME_ARGS (sync_time));
4151
4152 /* create an entry for the clock */
4153 id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
4154 priv->timer_timeout = timer->timeout;
4155 priv->timer_seqnum = timer->seqnum;
4156 GST_OBJECT_UNLOCK (jitterbuffer);
4157
4158 /* release the lock so that the other end can push stuff or unlock */
4159 JBUF_UNLOCK (priv);
4160
4161 push_rtx_events_unlocked (jitterbuffer, &events);
4162
4163 ret = gst_clock_id_wait (id, &clock_jitter);
4164
4165 JBUF_LOCK (priv);
4166
4167 if (!priv->timer_running) {
4168 g_queue_clear_full (&events, (GDestroyNotify) gst_event_unref);
4169 gst_clock_id_unref (id);
4170 priv->clock_id = NULL;
4171 break;
4172 }
4173
4174 if (ret != GST_CLOCK_UNSCHEDULED) {
4175 now = priv->timer_timeout + MAX (clock_jitter, 0);
4176 GST_DEBUG_OBJECT (jitterbuffer,
4177 "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
4178 GST_STIME_ARGS (clock_jitter));
4179 } else {
4180 GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
4181 }
4182
4183 /* and free the entry */
4184 gst_clock_id_unref (id);
4185 priv->clock_id = NULL;
4186 } else {
4187 push_rtx_events_unlocked (jitterbuffer, &events);
4188
4189 /* when draining the timers, the pusher thread will reuse our
4190 * condition to wait for completion. Signal that thread before
4191 * sleeping again here */
4192 if (priv->eos)
4193 JBUF_SIGNAL_TIMER (priv);
4194
4195 /* no timers, wait for activity */
4196 JBUF_WAIT_TIMER (priv);
4197 }
4198 }
4199 stopping:
4200 JBUF_UNLOCK (priv);
4201
4202 GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
4203 return;
4204 }
4205
4206 /*
4207 * This function implements the main pushing loop on the source pad.
4208 *
4209 * It first tries to push as many buffers as possible. If there is a seqnum
4210 * mismatch, we wait for the next timeouts.
4211 */
4212 static void
gst_rtp_jitter_buffer_loop(GstRtpJitterBuffer * jitterbuffer)4213 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
4214 {
4215 GstRtpJitterBufferPrivate *priv;
4216 GstFlowReturn result = GST_FLOW_OK;
4217
4218 priv = jitterbuffer->priv;
4219
4220 JBUF_LOCK_CHECK (priv, flushing);
4221 do {
4222 result = handle_next_buffer (jitterbuffer);
4223 if (G_LIKELY (result == GST_FLOW_WAIT)) {
4224 /* now wait for the next event */
4225 JBUF_SIGNAL_QUEUE (priv);
4226 JBUF_WAIT_EVENT (priv, flushing);
4227 result = GST_FLOW_OK;
4228 }
4229 } while (result == GST_FLOW_OK);
4230 /* store result for upstream */
4231 priv->srcresult = result;
4232 /* if we get here we need to pause */
4233 goto pause;
4234
4235 /* ERRORS */
4236 flushing:
4237 {
4238 result = priv->srcresult;
4239 goto pause;
4240 }
4241 pause:
4242 {
4243 GstEvent *event;
4244
4245 JBUF_SIGNAL_QUERY (priv, FALSE);
4246 JBUF_UNLOCK (priv);
4247
4248 GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
4249 gst_flow_get_name (result));
4250 gst_pad_pause_task (priv->srcpad);
4251 if (result == GST_FLOW_EOS) {
4252 event = gst_event_new_eos ();
4253 if (priv->segment_seqnum != GST_SEQNUM_INVALID)
4254 gst_event_set_seqnum (event, priv->segment_seqnum);
4255 gst_pad_push_event (priv->srcpad, event);
4256 }
4257 return;
4258 }
4259 }
4260
4261 /* collect the info from the latest RTCP packet and the jitterbuffer sync, do
4262 * some sanity checks and then emit the handle-sync signal with the parameters.
4263 * This function must be called with the LOCK */
4264 static void
do_handle_sync(GstRtpJitterBuffer * jitterbuffer)4265 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
4266 {
4267 GstRtpJitterBufferPrivate *priv;
4268 guint64 base_rtptime, base_time;
4269 guint32 clock_rate;
4270 guint64 last_rtptime;
4271 guint64 clock_base;
4272 guint64 ext_rtptime, diff;
4273 gboolean valid = TRUE, keep = FALSE;
4274
4275 priv = jitterbuffer->priv;
4276
4277 /* get the last values from the jitterbuffer */
4278 rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
4279 &clock_rate, &last_rtptime);
4280
4281 clock_base = priv->clock_base;
4282 ext_rtptime = priv->ext_rtptime;
4283
4284 GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
4285 G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
4286 ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
4287 ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
4288
4289 if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
4290 /* we keep this SR packet for later. When we get a valid RTP packet the
4291 * above values will be set and we can try to use the SR packet */
4292 GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
4293 keep = TRUE;
4294 } else {
4295 /* we can't accept anything that happened before we did the last resync */
4296 if (base_rtptime > ext_rtptime) {
4297 GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
4298 valid = FALSE;
4299 } else {
4300 /* the SR RTP timestamp must be something close to what we last observed
4301 * in the jitterbuffer */
4302 if (ext_rtptime > last_rtptime) {
4303 /* check how far ahead it is to our RTP timestamps */
4304 diff = ext_rtptime - last_rtptime;
4305 /* if bigger than 1 second, we drop it */
4306 if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
4307 diff >
4308 gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
4309 clock_rate, 1000)) {
4310 GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
4311 /* should drop this, but some RTSP servers end up with bogus
4312 * way too ahead RTCP packet when repeated PAUSE/PLAY,
4313 * so still trigger rptbin sync but invalidate RTCP data
4314 * (sync might use other methods) */
4315 ext_rtptime = -1;
4316 }
4317 GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
4318 G_GUINT64_FORMAT, last_rtptime, diff);
4319 }
4320 }
4321 }
4322
4323 if (keep) {
4324 GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
4325 } else if (valid) {
4326 GstStructure *s;
4327
4328 s = gst_structure_new ("application/x-rtp-sync",
4329 "base-rtptime", G_TYPE_UINT64, base_rtptime,
4330 "base-time", G_TYPE_UINT64, base_time,
4331 "clock-rate", G_TYPE_UINT, clock_rate,
4332 "clock-base", G_TYPE_UINT64, clock_base,
4333 "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
4334 "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
4335
4336 GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
4337 gst_buffer_replace (&priv->last_sr, NULL);
4338 JBUF_UNLOCK (priv);
4339 g_signal_emit (jitterbuffer,
4340 gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
4341 JBUF_LOCK (priv);
4342 gst_structure_free (s);
4343 } else {
4344 GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
4345 gst_buffer_replace (&priv->last_sr, NULL);
4346 }
4347 }
4348
4349 static GstFlowReturn
gst_rtp_jitter_buffer_chain_rtcp(GstPad * pad,GstObject * parent,GstBuffer * buffer)4350 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
4351 GstBuffer * buffer)
4352 {
4353 GstRtpJitterBuffer *jitterbuffer;
4354 GstRtpJitterBufferPrivate *priv;
4355 GstFlowReturn ret = GST_FLOW_OK;
4356 guint32 ssrc;
4357 GstRTCPPacket packet;
4358 guint64 ext_rtptime;
4359 guint32 rtptime;
4360 GstRTCPBuffer rtcp = { NULL, };
4361
4362 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4363
4364 if (G_UNLIKELY (!gst_rtcp_buffer_validate_reduced (buffer)))
4365 goto invalid_buffer;
4366
4367 priv = jitterbuffer->priv;
4368
4369 gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
4370
4371 if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
4372 goto empty_buffer;
4373
4374 /* first packet must be SR or RR or else the validate would have failed */
4375 switch (gst_rtcp_packet_get_type (&packet)) {
4376 case GST_RTCP_TYPE_SR:
4377 gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
4378 NULL, NULL);
4379 break;
4380 default:
4381 goto ignore_buffer;
4382 }
4383 gst_rtcp_buffer_unmap (&rtcp);
4384
4385 GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
4386
4387 JBUF_LOCK (priv);
4388 /* convert the RTP timestamp to our extended timestamp, using the same offset
4389 * we used in the jitterbuffer */
4390 ext_rtptime = priv->jbuf->ext_rtptime;
4391 ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
4392
4393 priv->ext_rtptime = ext_rtptime;
4394 gst_buffer_replace (&priv->last_sr, buffer);
4395
4396 do_handle_sync (jitterbuffer);
4397 JBUF_UNLOCK (priv);
4398
4399 done:
4400 gst_buffer_unref (buffer);
4401
4402 return ret;
4403
4404 invalid_buffer:
4405 {
4406 /* this is not fatal but should be filtered earlier */
4407 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4408 ("Received invalid RTCP payload, dropping"));
4409 ret = GST_FLOW_OK;
4410 goto done;
4411 }
4412 empty_buffer:
4413 {
4414 /* this is not fatal but should be filtered earlier */
4415 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4416 ("Received empty RTCP payload, dropping"));
4417 gst_rtcp_buffer_unmap (&rtcp);
4418 ret = GST_FLOW_OK;
4419 goto done;
4420 }
4421 ignore_buffer:
4422 {
4423 GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
4424 gst_rtcp_buffer_unmap (&rtcp);
4425 ret = GST_FLOW_OK;
4426 goto done;
4427 }
4428 }
4429
4430 static gboolean
gst_rtp_jitter_buffer_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)4431 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
4432 GstQuery * query)
4433 {
4434 gboolean res = FALSE;
4435 GstRtpJitterBuffer *jitterbuffer;
4436 GstRtpJitterBufferPrivate *priv;
4437
4438 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4439 priv = jitterbuffer->priv;
4440
4441 switch (GST_QUERY_TYPE (query)) {
4442 case GST_QUERY_CAPS:
4443 {
4444 GstCaps *filter, *caps;
4445
4446 gst_query_parse_caps (query, &filter);
4447 caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4448 gst_query_set_caps_result (query, caps);
4449 gst_caps_unref (caps);
4450 res = TRUE;
4451 break;
4452 }
4453 default:
4454 if (GST_QUERY_IS_SERIALIZED (query)) {
4455 JBUF_LOCK_CHECK (priv, out_flushing);
4456 if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
4457 RTP_JITTER_BUFFER_MODE_BUFFER) {
4458 GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
4459 if (rtp_jitter_buffer_append_query (priv->jbuf, query))
4460 JBUF_SIGNAL_EVENT (priv);
4461 JBUF_WAIT_QUERY (priv, out_flushing);
4462 res = priv->last_query;
4463 } else {
4464 GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
4465 res = FALSE;
4466 }
4467 JBUF_UNLOCK (priv);
4468 } else {
4469 res = gst_pad_query_default (pad, parent, query);
4470 }
4471 break;
4472 }
4473 return res;
4474 /* ERRORS */
4475 out_flushing:
4476 {
4477 GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
4478 JBUF_UNLOCK (priv);
4479 return FALSE;
4480 }
4481
4482 }
4483
4484 static gboolean
gst_rtp_jitter_buffer_src_query(GstPad * pad,GstObject * parent,GstQuery * query)4485 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
4486 GstQuery * query)
4487 {
4488 GstRtpJitterBuffer *jitterbuffer;
4489 GstRtpJitterBufferPrivate *priv;
4490 gboolean res = FALSE;
4491
4492 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4493 priv = jitterbuffer->priv;
4494
4495 switch (GST_QUERY_TYPE (query)) {
4496 case GST_QUERY_LATENCY:
4497 {
4498 /* We need to send the query upstream and add the returned latency to our
4499 * own */
4500 GstClockTime min_latency, max_latency;
4501 gboolean us_live;
4502 GstClockTime our_latency;
4503
4504 if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
4505 gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
4506
4507 GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
4508 GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4509 GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4510
4511 /* store this so that we can safely sync on the peer buffers. */
4512 JBUF_LOCK (priv);
4513 priv->peer_latency = min_latency;
4514 our_latency = priv->latency_ns;
4515 JBUF_UNLOCK (priv);
4516
4517 GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
4518 GST_TIME_ARGS (our_latency));
4519
4520 /* we add some latency but can buffer an infinite amount of time */
4521 min_latency += our_latency;
4522 max_latency = -1;
4523
4524 GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
4525 GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4526 GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4527
4528 gst_query_set_latency (query, TRUE, min_latency, max_latency);
4529 }
4530 break;
4531 }
4532 case GST_QUERY_POSITION:
4533 {
4534 GstClockTime start, last_out;
4535 GstFormat fmt;
4536
4537 gst_query_parse_position (query, &fmt, NULL);
4538 if (fmt != GST_FORMAT_TIME) {
4539 res = gst_pad_query_default (pad, parent, query);
4540 break;
4541 }
4542
4543 JBUF_LOCK (priv);
4544 start = priv->npt_start;
4545 last_out = priv->last_out_time;
4546 JBUF_UNLOCK (priv);
4547
4548 GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
4549 ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
4550 GST_TIME_ARGS (last_out));
4551
4552 if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
4553 /* bring 0-based outgoing time to stream time */
4554 gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
4555 res = TRUE;
4556 } else {
4557 res = gst_pad_query_default (pad, parent, query);
4558 }
4559 break;
4560 }
4561 case GST_QUERY_CAPS:
4562 {
4563 GstCaps *filter, *caps;
4564
4565 gst_query_parse_caps (query, &filter);
4566 caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4567 gst_query_set_caps_result (query, caps);
4568 gst_caps_unref (caps);
4569 res = TRUE;
4570 break;
4571 }
4572 default:
4573 res = gst_pad_query_default (pad, parent, query);
4574 break;
4575 }
4576
4577 return res;
4578 }
4579
4580 static void
gst_rtp_jitter_buffer_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)4581 gst_rtp_jitter_buffer_set_property (GObject * object,
4582 guint prop_id, const GValue * value, GParamSpec * pspec)
4583 {
4584 GstRtpJitterBuffer *jitterbuffer;
4585 GstRtpJitterBufferPrivate *priv;
4586
4587 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4588 priv = jitterbuffer->priv;
4589
4590 switch (prop_id) {
4591 case PROP_LATENCY:
4592 {
4593 guint new_latency, old_latency;
4594
4595 new_latency = g_value_get_uint (value);
4596
4597 JBUF_LOCK (priv);
4598 old_latency = priv->latency_ms;
4599 priv->latency_ms = new_latency;
4600 priv->latency_ns = priv->latency_ms * GST_MSECOND;
4601 rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
4602 JBUF_UNLOCK (priv);
4603
4604 /* post message if latency changed, this will inform the parent pipeline
4605 * that a latency reconfiguration is possible/needed. */
4606 if (new_latency != old_latency) {
4607 GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
4608 GST_TIME_ARGS (new_latency * GST_MSECOND));
4609
4610 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
4611 gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
4612 }
4613 break;
4614 }
4615 case PROP_DROP_ON_LATENCY:
4616 JBUF_LOCK (priv);
4617 priv->drop_on_latency = g_value_get_boolean (value);
4618 JBUF_UNLOCK (priv);
4619 break;
4620 case PROP_TS_OFFSET:
4621 JBUF_LOCK (priv);
4622 if (priv->max_ts_offset_adjustment != 0) {
4623 gint64 new_offset = g_value_get_int64 (value);
4624
4625 if (new_offset > priv->ts_offset) {
4626 priv->ts_offset_remainder = new_offset - priv->ts_offset;
4627 } else {
4628 priv->ts_offset_remainder = -(priv->ts_offset - new_offset);
4629 }
4630 } else {
4631 priv->ts_offset = g_value_get_int64 (value);
4632 priv->ts_offset_remainder = 0;
4633 update_timer_offsets (jitterbuffer);
4634 }
4635 priv->ts_discont = TRUE;
4636 JBUF_UNLOCK (priv);
4637 break;
4638 case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4639 JBUF_LOCK (priv);
4640 priv->max_ts_offset_adjustment = g_value_get_uint64 (value);
4641 JBUF_UNLOCK (priv);
4642 break;
4643 case PROP_DO_LOST:
4644 JBUF_LOCK (priv);
4645 priv->do_lost = g_value_get_boolean (value);
4646 JBUF_UNLOCK (priv);
4647 break;
4648 case PROP_POST_DROP_MESSAGES:
4649 JBUF_LOCK (priv);
4650 priv->post_drop_messages = g_value_get_boolean (value);
4651 JBUF_UNLOCK (priv);
4652 break;
4653 case PROP_DROP_MESSAGES_INTERVAL:
4654 JBUF_LOCK (priv);
4655 priv->drop_messages_interval_ms = g_value_get_uint (value);
4656 JBUF_UNLOCK (priv);
4657 break;
4658 case PROP_MODE:
4659 JBUF_LOCK (priv);
4660 rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
4661 JBUF_UNLOCK (priv);
4662 break;
4663 case PROP_DO_RETRANSMISSION:
4664 JBUF_LOCK (priv);
4665 priv->do_retransmission = g_value_get_boolean (value);
4666 JBUF_UNLOCK (priv);
4667 break;
4668 case PROP_RTX_NEXT_SEQNUM:
4669 JBUF_LOCK (priv);
4670 priv->rtx_next_seqnum = g_value_get_boolean (value);
4671 JBUF_UNLOCK (priv);
4672 break;
4673 case PROP_RTX_DELAY:
4674 JBUF_LOCK (priv);
4675 priv->rtx_delay = g_value_get_int (value);
4676 JBUF_UNLOCK (priv);
4677 break;
4678 case PROP_RTX_MIN_DELAY:
4679 JBUF_LOCK (priv);
4680 priv->rtx_min_delay = g_value_get_uint (value);
4681 JBUF_UNLOCK (priv);
4682 break;
4683 case PROP_RTX_DELAY_REORDER:
4684 JBUF_LOCK (priv);
4685 priv->rtx_delay_reorder = g_value_get_int (value);
4686 JBUF_UNLOCK (priv);
4687 break;
4688 case PROP_RTX_RETRY_TIMEOUT:
4689 JBUF_LOCK (priv);
4690 priv->rtx_retry_timeout = g_value_get_int (value);
4691 JBUF_UNLOCK (priv);
4692 break;
4693 case PROP_RTX_MIN_RETRY_TIMEOUT:
4694 JBUF_LOCK (priv);
4695 priv->rtx_min_retry_timeout = g_value_get_int (value);
4696 JBUF_UNLOCK (priv);
4697 break;
4698 case PROP_RTX_RETRY_PERIOD:
4699 JBUF_LOCK (priv);
4700 priv->rtx_retry_period = g_value_get_int (value);
4701 JBUF_UNLOCK (priv);
4702 break;
4703 case PROP_RTX_MAX_RETRIES:
4704 JBUF_LOCK (priv);
4705 priv->rtx_max_retries = g_value_get_int (value);
4706 JBUF_UNLOCK (priv);
4707 break;
4708 case PROP_RTX_DEADLINE:
4709 JBUF_LOCK (priv);
4710 priv->rtx_deadline_ms = g_value_get_int (value);
4711 JBUF_UNLOCK (priv);
4712 break;
4713 case PROP_RTX_STATS_TIMEOUT:
4714 JBUF_LOCK (priv);
4715 priv->rtx_stats_timeout = g_value_get_uint (value);
4716 JBUF_UNLOCK (priv);
4717 break;
4718 case PROP_MAX_RTCP_RTP_TIME_DIFF:
4719 JBUF_LOCK (priv);
4720 priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
4721 JBUF_UNLOCK (priv);
4722 break;
4723 case PROP_MAX_DROPOUT_TIME:
4724 JBUF_LOCK (priv);
4725 priv->max_dropout_time = g_value_get_uint (value);
4726 JBUF_UNLOCK (priv);
4727 break;
4728 case PROP_MAX_MISORDER_TIME:
4729 JBUF_LOCK (priv);
4730 priv->max_misorder_time = g_value_get_uint (value);
4731 JBUF_UNLOCK (priv);
4732 break;
4733 case PROP_RFC7273_SYNC:
4734 JBUF_LOCK (priv);
4735 rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
4736 g_value_get_boolean (value));
4737 JBUF_UNLOCK (priv);
4738 break;
4739 case PROP_FASTSTART_MIN_PACKETS:
4740 JBUF_LOCK (priv);
4741 priv->faststart_min_packets = g_value_get_uint (value);
4742 JBUF_UNLOCK (priv);
4743 break;
4744 default:
4745 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4746 break;
4747 }
4748 }
4749
4750 static void
gst_rtp_jitter_buffer_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)4751 gst_rtp_jitter_buffer_get_property (GObject * object,
4752 guint prop_id, GValue * value, GParamSpec * pspec)
4753 {
4754 GstRtpJitterBuffer *jitterbuffer;
4755 GstRtpJitterBufferPrivate *priv;
4756
4757 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4758 priv = jitterbuffer->priv;
4759
4760 switch (prop_id) {
4761 case PROP_LATENCY:
4762 JBUF_LOCK (priv);
4763 g_value_set_uint (value, priv->latency_ms);
4764 JBUF_UNLOCK (priv);
4765 break;
4766 case PROP_DROP_ON_LATENCY:
4767 JBUF_LOCK (priv);
4768 g_value_set_boolean (value, priv->drop_on_latency);
4769 JBUF_UNLOCK (priv);
4770 break;
4771 case PROP_TS_OFFSET:
4772 JBUF_LOCK (priv);
4773 g_value_set_int64 (value, priv->ts_offset);
4774 JBUF_UNLOCK (priv);
4775 break;
4776 case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4777 JBUF_LOCK (priv);
4778 g_value_set_uint64 (value, priv->max_ts_offset_adjustment);
4779 JBUF_UNLOCK (priv);
4780 break;
4781 case PROP_DO_LOST:
4782 JBUF_LOCK (priv);
4783 g_value_set_boolean (value, priv->do_lost);
4784 JBUF_UNLOCK (priv);
4785 break;
4786 case PROP_POST_DROP_MESSAGES:
4787 JBUF_LOCK (priv);
4788 g_value_set_boolean (value, priv->post_drop_messages);
4789 JBUF_UNLOCK (priv);
4790 break;
4791 case PROP_DROP_MESSAGES_INTERVAL:
4792 JBUF_LOCK (priv);
4793 g_value_set_uint (value, priv->drop_messages_interval_ms);
4794 JBUF_UNLOCK (priv);
4795 break;
4796 case PROP_MODE:
4797 JBUF_LOCK (priv);
4798 g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
4799 JBUF_UNLOCK (priv);
4800 break;
4801 case PROP_PERCENT:
4802 {
4803 gint percent;
4804
4805 JBUF_LOCK (priv);
4806 if (priv->srcresult != GST_FLOW_OK)
4807 percent = 100;
4808 else
4809 percent = rtp_jitter_buffer_get_percent (priv->jbuf);
4810
4811 g_value_set_int (value, percent);
4812 JBUF_UNLOCK (priv);
4813 break;
4814 }
4815 case PROP_DO_RETRANSMISSION:
4816 JBUF_LOCK (priv);
4817 g_value_set_boolean (value, priv->do_retransmission);
4818 JBUF_UNLOCK (priv);
4819 break;
4820 case PROP_RTX_NEXT_SEQNUM:
4821 JBUF_LOCK (priv);
4822 g_value_set_boolean (value, priv->rtx_next_seqnum);
4823 JBUF_UNLOCK (priv);
4824 break;
4825 case PROP_RTX_DELAY:
4826 JBUF_LOCK (priv);
4827 g_value_set_int (value, priv->rtx_delay);
4828 JBUF_UNLOCK (priv);
4829 break;
4830 case PROP_RTX_MIN_DELAY:
4831 JBUF_LOCK (priv);
4832 g_value_set_uint (value, priv->rtx_min_delay);
4833 JBUF_UNLOCK (priv);
4834 break;
4835 case PROP_RTX_DELAY_REORDER:
4836 JBUF_LOCK (priv);
4837 g_value_set_int (value, priv->rtx_delay_reorder);
4838 JBUF_UNLOCK (priv);
4839 break;
4840 case PROP_RTX_RETRY_TIMEOUT:
4841 JBUF_LOCK (priv);
4842 g_value_set_int (value, priv->rtx_retry_timeout);
4843 JBUF_UNLOCK (priv);
4844 break;
4845 case PROP_RTX_MIN_RETRY_TIMEOUT:
4846 JBUF_LOCK (priv);
4847 g_value_set_int (value, priv->rtx_min_retry_timeout);
4848 JBUF_UNLOCK (priv);
4849 break;
4850 case PROP_RTX_RETRY_PERIOD:
4851 JBUF_LOCK (priv);
4852 g_value_set_int (value, priv->rtx_retry_period);
4853 JBUF_UNLOCK (priv);
4854 break;
4855 case PROP_RTX_MAX_RETRIES:
4856 JBUF_LOCK (priv);
4857 g_value_set_int (value, priv->rtx_max_retries);
4858 JBUF_UNLOCK (priv);
4859 break;
4860 case PROP_RTX_DEADLINE:
4861 JBUF_LOCK (priv);
4862 g_value_set_int (value, priv->rtx_deadline_ms);
4863 JBUF_UNLOCK (priv);
4864 break;
4865 case PROP_RTX_STATS_TIMEOUT:
4866 JBUF_LOCK (priv);
4867 g_value_set_uint (value, priv->rtx_stats_timeout);
4868 JBUF_UNLOCK (priv);
4869 break;
4870 case PROP_STATS:
4871 g_value_take_boxed (value,
4872 gst_rtp_jitter_buffer_create_stats (jitterbuffer));
4873 break;
4874 case PROP_MAX_RTCP_RTP_TIME_DIFF:
4875 JBUF_LOCK (priv);
4876 g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
4877 JBUF_UNLOCK (priv);
4878 break;
4879 case PROP_MAX_DROPOUT_TIME:
4880 JBUF_LOCK (priv);
4881 g_value_set_uint (value, priv->max_dropout_time);
4882 JBUF_UNLOCK (priv);
4883 break;
4884 case PROP_MAX_MISORDER_TIME:
4885 JBUF_LOCK (priv);
4886 g_value_set_uint (value, priv->max_misorder_time);
4887 JBUF_UNLOCK (priv);
4888 break;
4889 case PROP_RFC7273_SYNC:
4890 JBUF_LOCK (priv);
4891 g_value_set_boolean (value,
4892 rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
4893 JBUF_UNLOCK (priv);
4894 break;
4895 case PROP_FASTSTART_MIN_PACKETS:
4896 JBUF_LOCK (priv);
4897 g_value_set_uint (value, priv->faststart_min_packets);
4898 JBUF_UNLOCK (priv);
4899 break;
4900 default:
4901 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4902 break;
4903 }
4904 }
4905
4906 static GstStructure *
gst_rtp_jitter_buffer_create_stats(GstRtpJitterBuffer * jbuf)4907 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
4908 {
4909 GstRtpJitterBufferPrivate *priv = jbuf->priv;
4910 GstStructure *s;
4911
4912 JBUF_LOCK (priv);
4913 s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
4914 "num-pushed", G_TYPE_UINT64, priv->num_pushed,
4915 "num-lost", G_TYPE_UINT64, priv->num_lost,
4916 "num-late", G_TYPE_UINT64, priv->num_late,
4917 "num-duplicates", G_TYPE_UINT64, priv->num_duplicates,
4918 "avg-jitter", G_TYPE_UINT64, priv->avg_jitter,
4919 "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests,
4920 "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success,
4921 "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num,
4922 "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL);
4923 JBUF_UNLOCK (priv);
4924
4925 return s;
4926 }
4927