• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *  Copyright 2015 Kurento (http://kurento.org/)
9  *   @author: Miguel París <mparisdiaz@gmail.com>
10  *  Copyright 2016 Pexip AS
11  *   @author: Havard Graff <havard@pexip.com>
12  *   @author: Stian Selnes <stian@pexip.com>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  *
29  */
30 
31 /**
32  * SECTION:element-rtpjitterbuffer
33  * @title: rtpjitterbuffer
34  *
35  * This element reorders and removes duplicate RTP packets as they are received
36  * from a network source.
37  *
38  * The element needs the clock-rate of the RTP payload in order to estimate the
39  * delay. This information is obtained either from the caps on the sink pad or,
40  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
41  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
42  *
43  * The rtpjitterbuffer will wait for missing packets up to a configurable time
44  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
45  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
46  * property is set, lost packets will result in a custom serialized downstream
47  * event of name GstRTPPacketLost. The lost packet events are usually used by a
48  * depayloader or other element to create concealment data or some other logic
49  * to gracefully handle the missing packets.
50  *
51  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming
52  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
53  * buffer.
54  *
55  * The jitterbuffer can also be configured to send early retransmission events
56  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
57  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
58  * sends a custom upstream event named GstRTPRetransmissionRequest when the
59  * packet is considered late. The initial expected packet arrival time is
60  * calculated as follows:
61  *
62  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
63  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
64  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
65  *     packets with different rtptime.
66  *
67  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
68  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
69  *     previously scheduled timeout is overwritten.
70  *
71  * - If seqnum N arrived, all seqnum older than
72  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
73  *     immediately. This is to request fast feedback for abnormally reorder
74  *     packets before any of the previous timeouts is triggered.
75  *
76  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
77  * event. After the initial timeout expires and the retransmission event is
78  * sent, the timeout is scheduled for
79  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
80  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
81  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
82  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
83  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
84  * retransmission requests are sent and the regular logic is performed to
85  * schedule a lost packet as discussed above.
86  *
87  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
88  * to the pipeline.
89  *
90  * This element will automatically be used inside rtpbin.
91  *
92  * ## Example pipelines
93  * |[
94  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
95  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
96  * inserted into the pipeline to smooth out network jitter and to reorder the
97  * out-of-order RTP packets.
98  *
99  */
100 
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104 
105 #include <stdlib.h>
106 #include <stdio.h>
107 #include <string.h>
108 #include <gst/rtp/gstrtpbuffer.h>
109 #include <gst/net/net.h>
110 
111 #include "gstrtpjitterbuffer.h"
112 #include "rtpjitterbuffer.h"
113 #include "rtpstats.h"
114 #include "rtptimerqueue.h"
115 
116 #include <gst/glib-compat-private.h>
117 
118 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
119 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
120 
121 /* RTPJitterBuffer signals and args */
122 enum
123 {
124   SIGNAL_REQUEST_PT_MAP,
125   SIGNAL_CLEAR_PT_MAP,
126   SIGNAL_HANDLE_SYNC,
127   SIGNAL_ON_NPT_STOP,
128   SIGNAL_SET_ACTIVE,
129   LAST_SIGNAL
130 };
131 
132 #define DEFAULT_LATENCY_MS          200
133 #define DEFAULT_DROP_ON_LATENCY     FALSE
134 #define DEFAULT_TS_OFFSET           0
135 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0
136 #define DEFAULT_DO_LOST             FALSE
137 #define DEFAULT_POST_DROP_MESSAGES  FALSE
138 #define DEFAULT_DROP_MESSAGES_INTERVAL_MS   200
139 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
140 #define DEFAULT_PERCENT             0
141 #define DEFAULT_DO_RETRANSMISSION   FALSE
142 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
143 #define DEFAULT_RTX_DELAY           -1
144 #define DEFAULT_RTX_MIN_DELAY       0
145 #define DEFAULT_RTX_DELAY_REORDER   3
146 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
147 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
148 #define DEFAULT_RTX_RETRY_PERIOD    -1
149 #define DEFAULT_RTX_MAX_RETRIES    -1
150 #define DEFAULT_RTX_DEADLINE       -1
151 #define DEFAULT_RTX_STATS_TIMEOUT   1000
152 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
153 #define DEFAULT_MAX_DROPOUT_TIME    60000
154 #define DEFAULT_MAX_MISORDER_TIME   2000
155 #define DEFAULT_RFC7273_SYNC        FALSE
156 #define DEFAULT_FASTSTART_MIN_PACKETS 0
157 
158 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
159 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
160 
161 enum
162 {
163   PROP_0,
164   PROP_LATENCY,
165   PROP_DROP_ON_LATENCY,
166   PROP_TS_OFFSET,
167   PROP_MAX_TS_OFFSET_ADJUSTMENT,
168   PROP_DO_LOST,
169   PROP_POST_DROP_MESSAGES,
170   PROP_DROP_MESSAGES_INTERVAL,
171   PROP_MODE,
172   PROP_PERCENT,
173   PROP_DO_RETRANSMISSION,
174   PROP_RTX_NEXT_SEQNUM,
175   PROP_RTX_DELAY,
176   PROP_RTX_MIN_DELAY,
177   PROP_RTX_DELAY_REORDER,
178   PROP_RTX_RETRY_TIMEOUT,
179   PROP_RTX_MIN_RETRY_TIMEOUT,
180   PROP_RTX_RETRY_PERIOD,
181   PROP_RTX_MAX_RETRIES,
182   PROP_RTX_DEADLINE,
183   PROP_RTX_STATS_TIMEOUT,
184   PROP_STATS,
185   PROP_MAX_RTCP_RTP_TIME_DIFF,
186   PROP_MAX_DROPOUT_TIME,
187   PROP_MAX_MISORDER_TIME,
188   PROP_RFC7273_SYNC,
189   PROP_FASTSTART_MIN_PACKETS
190 };
191 
192 #define JBUF_LOCK(priv)   G_STMT_START {			\
193     GST_TRACE("Locking from thread %p", g_thread_self());	\
194     (g_mutex_lock (&(priv)->jbuf_lock));			\
195     GST_TRACE("Locked from thread %p", g_thread_self());	\
196   } G_STMT_END
197 
198 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
199   JBUF_LOCK (priv);                                   \
200   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
201     goto label;                                       \
202 } G_STMT_END
203 #define JBUF_UNLOCK(priv) G_STMT_START {			\
204     GST_TRACE ("Unlocking from thread %p", g_thread_self ());	\
205     (g_mutex_unlock (&(priv)->jbuf_lock));			\
206 } G_STMT_END
207 
208 #define JBUF_WAIT_QUEUE(priv)   G_STMT_START {            \
209   GST_DEBUG ("waiting queue");                            \
210   (priv)->waiting_queue++;                                \
211   g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock);  \
212   (priv)->waiting_queue--;                                \
213   GST_DEBUG ("waiting queue done");                       \
214 } G_STMT_END
215 #define JBUF_SIGNAL_QUEUE(priv) G_STMT_START {            \
216   if (G_UNLIKELY ((priv)->waiting_queue)) {               \
217     GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
218     g_cond_signal (&(priv)->jbuf_queue);                  \
219   }                                                       \
220 } G_STMT_END
221 
222 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
223   GST_DEBUG ("waiting timer");                            \
224   (priv)->waiting_timer++;                                \
225   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
226   (priv)->waiting_timer--;                                \
227   GST_DEBUG ("waiting timer done");                       \
228 } G_STMT_END
229 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
230   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
231     GST_DEBUG ("signal timer, %d waiters", (priv)->waiting_timer); \
232     g_cond_signal (&(priv)->jbuf_timer);                  \
233   }                                                       \
234 } G_STMT_END
235 
236 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
237   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
238     goto label;                                          \
239   GST_DEBUG ("waiting event");                           \
240   (priv)->waiting_event = TRUE;                          \
241   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
242   (priv)->waiting_event = FALSE;                         \
243   GST_DEBUG ("waiting event done");                      \
244   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
245     goto label;                                          \
246 } G_STMT_END
247 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
248   if (G_UNLIKELY ((priv)->waiting_event)) {              \
249     GST_DEBUG ("signal event");                          \
250     g_cond_signal (&(priv)->jbuf_event);                 \
251   }                                                      \
252 } G_STMT_END
253 
254 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
255   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
256     goto label;                                          \
257   GST_DEBUG ("waiting query");                           \
258   (priv)->waiting_query = TRUE;                          \
259   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
260   (priv)->waiting_query = FALSE;                         \
261   GST_DEBUG ("waiting query done");                      \
262   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
263     goto label;                                          \
264 } G_STMT_END
265 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
266   (priv)->last_query = res;                              \
267   if (G_UNLIKELY ((priv)->waiting_query)) {              \
268     GST_DEBUG ("signal query");                          \
269     g_cond_signal (&(priv)->jbuf_query);                 \
270   }                                                      \
271 } G_STMT_END
272 
273 #define GST_BUFFER_IS_RETRANSMISSION(buffer) \
274   GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
275 
276 #if !GLIB_CHECK_VERSION(2, 60, 0)
277 #define g_queue_clear_full queue_clear_full
278 static void
queue_clear_full(GQueue * queue,GDestroyNotify free_func)279 queue_clear_full (GQueue * queue, GDestroyNotify free_func)
280 {
281   gpointer data;
282 
283   while ((data = g_queue_pop_head (queue)) != NULL)
284     free_func (data);
285 }
286 #endif
287 
288 struct _GstRtpJitterBufferPrivate
289 {
290   GstPad *sinkpad, *srcpad;
291   GstPad *rtcpsinkpad;
292 
293   RTPJitterBuffer *jbuf;
294   GMutex jbuf_lock;
295   guint waiting_queue;
296   GCond jbuf_queue;
297   guint waiting_timer;
298   GCond jbuf_timer;
299   gboolean waiting_event;
300   GCond jbuf_event;
301   gboolean waiting_query;
302   GCond jbuf_query;
303   gboolean last_query;
304   gboolean discont;
305   gboolean ts_discont;
306   gboolean active;
307   guint64 out_offset;
308   guint32 segment_seqnum;
309 
310   gboolean timer_running;
311   GThread *timer_thread;
312 
313   /* properties */
314   guint latency_ms;
315   guint64 latency_ns;
316   gboolean drop_on_latency;
317   gint64 ts_offset;
318   guint64 max_ts_offset_adjustment;
319   gboolean do_lost;
320   gboolean post_drop_messages;
321   guint drop_messages_interval_ms;
322   gboolean do_retransmission;
323   gboolean rtx_next_seqnum;
324   gint rtx_delay;
325   guint rtx_min_delay;
326   gint rtx_delay_reorder;
327   gint rtx_retry_timeout;
328   gint rtx_min_retry_timeout;
329   gint rtx_retry_period;
330   gint rtx_max_retries;
331   guint rtx_stats_timeout;
332   gint rtx_deadline_ms;
333   gint max_rtcp_rtp_time_diff;
334   guint32 max_dropout_time;
335   guint32 max_misorder_time;
336   guint faststart_min_packets;
337 
338   /* the last seqnum we pushed out */
339   guint32 last_popped_seqnum;
340   /* the next expected seqnum we push */
341   guint32 next_seqnum;
342   /* seqnum-base, if known */
343   guint32 seqnum_base;
344   /* last output time */
345   GstClockTime last_out_time;
346   /* last valid input timestamp and rtptime pair */
347   GstClockTime ips_pts;
348   guint64 ips_rtptime;
349   GstClockTime packet_spacing;
350   gint equidistant;
351 
352   GQueue gap_packets;
353 
354   /* the next expected seqnum we receive */
355   GstClockTime last_in_pts;
356   guint32 next_in_seqnum;
357 
358   /* "normal" timers */
359   RtpTimerQueue *timers;
360   /* timers used for RTX statistics backlog */
361   RtpTimerQueue *rtx_stats_timers;
362 
363   /* start and stop ranges */
364   GstClockTime npt_start;
365   GstClockTime npt_stop;
366   guint64 ext_timestamp;
367   guint64 last_elapsed;
368   guint64 estimated_eos;
369   GstClockID eos_id;
370 
371   /* state */
372   gboolean eos;
373   guint last_percent;
374 
375   /* clock rate and rtp timestamp offset */
376   gint last_pt;
377   gint32 clock_rate;
378   gint64 clock_base;
379   gint64 ts_offset_remainder;
380 
381   /* when we are shutting down */
382   GstFlowReturn srcresult;
383   gboolean blocked;
384 
385   /* for sync */
386   GstSegment segment;
387   GstClockID clock_id;
388   GstClockTime timer_timeout;
389   guint16 timer_seqnum;
390   /* the latency of the upstream peer, we have to take this into account when
391    * synchronizing the buffers. */
392   GstClockTime peer_latency;
393   guint64 ext_rtptime;
394   GstBuffer *last_sr;
395 
396   /* some accounting */
397   guint64 num_pushed;
398   guint64 num_lost;
399   guint64 num_late;
400   guint64 num_duplicates;
401   guint64 num_rtx_requests;
402   guint64 num_rtx_success;
403   guint64 num_rtx_failed;
404   gdouble avg_rtx_num;
405   guint64 avg_rtx_rtt;
406   RTPPacketRateCtx packet_rate_ctx;
407 
408   /* for the jitter */
409   GstClockTime last_dts;
410   GstClockTime last_pts;
411   guint64 last_rtptime;
412   GstClockTime avg_jitter;
413 
414   /* for dropped packet messages */
415   GstClockTime last_drop_msg_timestamp;
416   /* accumulators; reset every time a drop message is posted */
417   guint num_too_late;
418   guint num_drop_on_latency;
419 };
420 typedef enum
421 {
422   REASON_TOO_LATE,
423   REASON_DROP_ON_LATENCY
424 } DropMessageReason;
425 
426 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
427 GST_STATIC_PAD_TEMPLATE ("sink",
428     GST_PAD_SINK,
429     GST_PAD_ALWAYS,
430     GST_STATIC_CAPS ("application/x-rtp"
431         /* "clock-rate = (int) [ 1, 2147483647 ], "
432          * "payload = (int) , "
433          * "encoding-name = (string) "
434          */ )
435     );
436 
437 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
438 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
439     GST_PAD_SINK,
440     GST_PAD_REQUEST,
441     GST_STATIC_CAPS ("application/x-rtcp")
442     );
443 
444 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
445 GST_STATIC_PAD_TEMPLATE ("src",
446     GST_PAD_SRC,
447     GST_PAD_ALWAYS,
448     GST_STATIC_CAPS ("application/x-rtp"
449         /* "payload = (int) , "
450          * "clock-rate = (int) , "
451          * "encoding-name = (string) "
452          */ )
453     );
454 
455 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
456 
457 #define gst_rtp_jitter_buffer_parent_class parent_class
458 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer,
459     GST_TYPE_ELEMENT);
460 GST_ELEMENT_REGISTER_DEFINE (rtpjitterbuffer, "rtpjitterbuffer", GST_RANK_NONE,
461     GST_TYPE_RTP_JITTER_BUFFER);
462 
463 /* object overrides */
464 static void gst_rtp_jitter_buffer_set_property (GObject * object,
465     guint prop_id, const GValue * value, GParamSpec * pspec);
466 static void gst_rtp_jitter_buffer_get_property (GObject * object,
467     guint prop_id, GValue * value, GParamSpec * pspec);
468 static void gst_rtp_jitter_buffer_finalize (GObject * object);
469 
470 /* element overrides */
471 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
472     * element, GstStateChange transition);
473 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
474     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
475 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
476     GstPad * pad);
477 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
478 static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
479     GstClock * clock);
480 
481 /* pad overrides */
482 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
483 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
484     GstObject * parent);
485 
486 /* sinkpad overrides */
487 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
488     GstObject * parent, GstEvent * event);
489 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
490     GstObject * parent, GstBuffer * buffer);
491 static GstFlowReturn gst_rtp_jitter_buffer_chain_list (GstPad * pad,
492     GstObject * parent, GstBufferList * buffer_list);
493 
494 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
495     GstObject * parent, GstEvent * event);
496 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
497     GstObject * parent, GstBuffer * buffer);
498 
499 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
500     GstObject * parent, GstQuery * query);
501 
502 /* srcpad overrides */
503 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
504     GstObject * parent, GstEvent * event);
505 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
506     GstObject * parent, GstPadMode mode, gboolean active);
507 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
508 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
509     GstObject * parent, GstQuery * query);
510 
511 static void
512 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
513 static GstClockTime
514 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
515     gboolean active, guint64 base_time);
516 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
517 
518 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
519 
520 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
521 
522 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
523     jitterbuffer);
524 
525 static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
526     const RtpTimer * timer, GstClockTime dts, gboolean success);
527 
528 static GstClockTime get_current_running_time (GstRtpJitterBuffer *
529     jitterbuffer);
530 
531 static void
gst_rtp_jitter_buffer_class_init(GstRtpJitterBufferClass * klass)532 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
533 {
534   GObjectClass *gobject_class;
535   GstElementClass *gstelement_class;
536 
537   gobject_class = (GObjectClass *) klass;
538   gstelement_class = (GstElementClass *) klass;
539 
540   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
541 
542   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
543   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
544 
545   /**
546    * GstRtpJitterBuffer:latency:
547    *
548    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
549    * for at most this time.
550    */
551   g_object_class_install_property (gobject_class, PROP_LATENCY,
552       g_param_spec_uint ("latency", "Buffer latency in ms",
553           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
554           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
555   /**
556    * GstRtpJitterBuffer:drop-on-latency:
557    *
558    * Drop oldest buffers when the queue is completely filled.
559    */
560   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
561       g_param_spec_boolean ("drop-on-latency",
562           "Drop buffers when maximum latency is reached",
563           "Tells the jitterbuffer to never exceed the given latency in size",
564           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
565   /**
566    * GstRtpJitterBuffer:ts-offset:
567    *
568    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
569    * This is mainly used to ensure interstream synchronisation.
570    */
571   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
572       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
573           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
574           G_MAXINT64, DEFAULT_TS_OFFSET,
575           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
576 
577   /**
578    * GstRtpJitterBuffer:max-ts-offset-adjustment:
579    *
580    * The maximum number of nanoseconds per frame that time offset may be
581    * adjusted with. This is used to avoid sudden large changes to time stamps.
582    */
583   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
584       g_param_spec_uint64 ("max-ts-offset-adjustment",
585           "Max Timestamp Offset Adjustment",
586           "The maximum number of nanoseconds per frame that time stamp "
587           "offsets may be adjusted (0 = no limit).", 0, G_MAXUINT64,
588           DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
589           G_PARAM_STATIC_STRINGS));
590 
591   /**
592    * GstRtpJitterBuffer:do-lost:
593    *
594    * Send out a GstRTPPacketLost event downstream when a packet is considered
595    * lost.
596    */
597   g_object_class_install_property (gobject_class, PROP_DO_LOST,
598       g_param_spec_boolean ("do-lost", "Do Lost",
599           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
600           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
601 
602   /**
603    * GstRtpJitterBuffer:post-drop-messages:
604    *
605    * Post custom messages to the bus when a packet is dropped by the
606    * jitterbuffer due to arriving too late, being already considered lost,
607    * or being dropped due to the drop-on-latency property being enabled.
608    * Message is of type GST_MESSAGE_ELEMENT and contains a GstStructure named
609    * "drop-msg" with the following fields:
610    *
611    * * #guint   `seqnum`: Seqnum of dropped packet.
612    * * #guint64 `timestamp`: PTS timestamp of dropped packet.
613    * * #GString `reason`: Reason for dropping the packet.
614    * * #guint   `num-too-late`: Number of packets arriving too late since
615    *    last drop message.
616    * * #guint   `num-drop-on-latency`: Number of packets dropped due to the
617    *    drop-on-latency property since last drop message.
618    *
619    * Since: 1.18
620    */
621   g_object_class_install_property (gobject_class, PROP_POST_DROP_MESSAGES,
622       g_param_spec_boolean ("post-drop-messages", "Post drop messages",
623           "Post a custom message to the bus when a packet is dropped by the jitterbuffer",
624           DEFAULT_POST_DROP_MESSAGES,
625           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
626 
627   /**
628    * GstRtpJitterBuffer:drop-messages-interval:
629    *
630    * Minimal time in milliseconds between posting dropped packet messages, if enabled
631    * by setting property by setting #GstRtpJitterBuffer:post-drop-messages to %TRUE.
632    * If interval is set to 0, every dropped packet will result in a drop message being posted.
633    *
634    * Since: 1.18
635    */
636   g_object_class_install_property (gobject_class, PROP_DROP_MESSAGES_INTERVAL,
637       g_param_spec_uint ("drop-messages-interval",
638           "Drop message interval",
639           "Minimal time between posting dropped packet messages", 0,
640           G_MAXUINT, DEFAULT_DROP_MESSAGES_INTERVAL_MS,
641           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
642 
643   /**
644    * GstRtpJitterBuffer:mode:
645    *
646    * Control the buffering and timestamping mode used by the jitterbuffer.
647    */
648   g_object_class_install_property (gobject_class, PROP_MODE,
649       g_param_spec_enum ("mode", "Mode",
650           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
651           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
652   /**
653    * GstRtpJitterBuffer:percent:
654    *
655    * The percent of the jitterbuffer that is filled.
656    */
657   g_object_class_install_property (gobject_class, PROP_PERCENT,
658       g_param_spec_int ("percent", "percent",
659           "The buffer filled percent", 0, 100,
660           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
661   /**
662    * GstRtpJitterBuffer:do-retransmission:
663    *
664    * Send out a GstRTPRetransmission event upstream when a packet is considered
665    * late and should be retransmitted.
666    *
667    * Since: 1.2
668    */
669   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
670       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
671           "Send retransmission events upstream when a packet is late",
672           DEFAULT_DO_RETRANSMISSION,
673           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
674 
675   /**
676    * GstRtpJitterBuffer:rtx-next-seqnum
677    *
678    * Estimate when the next packet should arrive and schedule a retransmission
679    * request for it.
680    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
681    * for packet N+1. So it will be requested if it does not arrive at the expected time.
682    * The expected time is calculated using the dts of N and the packet spacing.
683    *
684    * Since: 1.6
685    */
686   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
687       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
688           "Estimate when the next packet should arrive and schedule a "
689           "retransmission request for it.",
690           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
691 
692   /**
693    * GstRtpJitterBuffer:rtx-delay:
694    *
695    * When a packet did not arrive at the expected time, wait this extra amount
696    * of time before sending a retransmission event.
697    *
698    * When -1 is used, the max jitter will be used as extra delay.
699    *
700    * Since: 1.2
701    */
702   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
703       g_param_spec_int ("rtx-delay", "RTX Delay",
704           "Extra time in ms to wait before sending retransmission "
705           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
706           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707 
708   /**
709    * GstRtpJitterBuffer:rtx-min-delay:
710    *
711    * When a packet did not arrive at the expected time, wait at least this extra amount
712    * of time before sending a retransmission event.
713    *
714    * Since: 1.6
715    */
716   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
717       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
718           "Minimum time in ms to wait before sending retransmission "
719           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
720           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
721   /**
722    * GstRtpJitterBuffer:rtx-delay-reorder:
723    *
724    * Assume that a retransmission event should be sent when we see
725    * this much packet reordering.
726    *
727    * When -1 is used, the value will be estimated based on observed packet
728    * reordering. When 0 is used packet reordering alone will not cause a
729    * retransmission event (Since 1.10).
730    *
731    * Since: 1.2
732    */
733   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
734       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
735           "Sending retransmission event when this much reordering "
736           "(0 disable)",
737           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
738           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
739   /**
740    * GstRtpJitterBuffer:rtx-retry-timeout:
741    *
742    * When no packet has been received after sending a retransmission event
743    * for this time, retry sending a retransmission event.
744    *
745    * When -1 is used, the value will be estimated based on observed round
746    * trip time.
747    *
748    * Since: 1.2
749    */
750   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
751       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
752           "Retry sending a transmission event after this timeout in "
753           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
754           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
755   /**
756    * GstRtpJitterBuffer:rtx-min-retry-timeout:
757    *
758    * The minimum amount of time between retry timeouts. When
759    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
760    * minimum interval between retry timeouts.
761    *
762    * When -1 is used, the value will be estimated based on the
763    * packet spacing.
764    *
765    * Since: 1.6
766    */
767   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
768       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
769           "Minimum timeout between sending a transmission event in "
770           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
771           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
772   /**
773    * GstRtpJitterBuffer:rtx-retry-period:
774    *
775    * The amount of time to try to get a retransmission.
776    *
777    * When -1 is used, the value will be estimated based on the jitterbuffer
778    * latency and the observed round trip time.
779    *
780    * Since: 1.2
781    */
782   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
783       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
784           "Try to get a retransmission for this many ms "
785           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
786           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
787   /**
788    * GstRtpJitterBuffer:rtx-max-retries:
789    *
790    * The maximum number of retries to request a retransmission.
791    *
792    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
793    * When -1 is used, the number of retransmission request will not be limited.
794    *
795    * Since: 1.6
796    */
797   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
798       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
799           "The maximum number of retries to request a retransmission. "
800           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
801           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
802   /**
803    * GstRtpJitterBuffer:rtx-deadline:
804    *
805    * The deadline for a valid RTX request in ms.
806    *
807    * How long the RTX RTCP will be valid for.
808    * When -1 is used, the size of the jitterbuffer will be used.
809    *
810    * Since: 1.10
811    */
812   g_object_class_install_property (gobject_class, PROP_RTX_DEADLINE,
813       g_param_spec_int ("rtx-deadline", "RTX Deadline (ms)",
814           "The deadline for a valid RTX request in milliseconds. "
815           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE,
816           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
817 /**
818    * GstRtpJitterBuffer:rtx-stats-timeout:
819    *
820    * The time to wait for a retransmitted packet after it has been
821    * considered lost in order to collect RTX statistics.
822    *
823    * Since: 1.10
824    */
825   g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT,
826       g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout",
827           "The time to wait for a retransmitted packet after it has been "
828           "considered lost in order to collect statistics (ms)",
829           0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT,
830           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
831 
832   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
833       g_param_spec_uint ("max-dropout-time", "Max dropout time",
834           "The maximum time (milliseconds) of missing packets tolerated.",
835           0, G_MAXINT32, DEFAULT_MAX_DROPOUT_TIME,
836           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
837 
838   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
839       g_param_spec_uint ("max-misorder-time", "Max misorder time",
840           "The maximum time (milliseconds) of misordered packets tolerated.",
841           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
842           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
843   /**
844    * GstRtpJitterBuffer:stats:
845    *
846    * Various jitterbuffer statistics. This property returns a GstStructure
847    * with name application/x-rtp-jitterbuffer-stats with the following fields:
848    *
849    * * #guint64 `num-pushed`: the number of packets pushed out.
850    * * #guint64 `num-lost`: the number of packets considered lost.
851    * * #guint64 `num-late`: the number of packets arriving too late.
852    * * #guint64 `num-duplicates`: the number of duplicate packets.
853    * * #guint64 `avg-jitter`: the average jitter in nanoseconds.
854    * * #guint64 `rtx-count`: the number of retransmissions requested.
855    * * #guint64 `rtx-success-count`: the number of successful retransmissions.
856    * * #gdouble `rtx-per-packet`: average number of RTX per packet.
857    * * #guint64 `rtx-rtt`: average round trip time per RTX.
858    *
859    * Since: 1.4
860    */
861   g_object_class_install_property (gobject_class, PROP_STATS,
862       g_param_spec_boxed ("stats", "Statistics",
863           "Various statistics", GST_TYPE_STRUCTURE,
864           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
865 
866   /**
867    * GstRtpJitterBuffer:max-rtcp-rtp-time-diff
868    *
869    * The maximum amount of time in ms that the RTP time in the RTCP SRs
870    * is allowed to be ahead of the last RTP packet we received. Use
871    * -1 to disable ignoring of RTCP packets.
872    *
873    * Since: 1.8
874    */
875   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
876       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
877           "Maximum amount of time in ms that the RTP time in RTCP SRs "
878           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
879           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
880           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
881 
882   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
883       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
884           "Synchronize received streams to the RFC7273 clock "
885           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
886           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
887 
888   /**
889    * GstRtpJitterBuffer:faststart-min-packets
890    *
891    * The number of consecutive packets needed to start (set to 0 to
892    * disable faststart. The jitterbuffer will by default start after the
893    * latency has elapsed)
894    *
895    * Since: 1.14
896    */
897   g_object_class_install_property (gobject_class, PROP_FASTSTART_MIN_PACKETS,
898       g_param_spec_uint ("faststart-min-packets", "Faststart minimum packets",
899           "The number of consecutive packets needed to start (set to 0 to "
900           "disable faststart. The jitterbuffer will by default start after "
901           "the latency has elapsed)",
902           0, G_MAXUINT, DEFAULT_FASTSTART_MIN_PACKETS,
903           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
904 
905   /**
906    * GstRtpJitterBuffer::request-pt-map:
907    * @buffer: the object which received the signal
908    * @pt: the pt
909    *
910    * Request the payload type as #GstCaps for @pt.
911    */
912   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
913       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
914       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
915           request_pt_map), NULL, NULL, NULL, GST_TYPE_CAPS, 1, G_TYPE_UINT);
916   /**
917    * GstRtpJitterBuffer::handle-sync:
918    * @buffer: the object which received the signal
919    * @struct: a GstStructure containing sync values.
920    *
921    * Be notified of new sync values.
922    */
923   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
924       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
925       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
926           handle_sync), NULL, NULL, NULL,
927       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
928 
929   /**
930    * GstRtpJitterBuffer::on-npt-stop:
931    * @buffer: the object which received the signal
932    *
933    * Signal that the jitterbuffer has pushed the RTP packet that corresponds to
934    * the npt-stop position.
935    */
936   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
937       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
938       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
939           on_npt_stop), NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
940 
941   /**
942    * GstRtpJitterBuffer::clear-pt-map:
943    * @buffer: the object which received the signal
944    *
945    * Invalidate the clock-rate as obtained with the
946    * #GstRtpJitterBuffer::request-pt-map signal.
947    */
948   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
949       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
950       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
951       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
952       NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
953 
954   /**
955    * GstRtpJitterBuffer::set-active:
956    * @buffer: the object which received the signal
957    *
958    * Start pushing out packets with the given base time. This signal is only
959    * useful in buffering mode.
960    *
961    * Returns: the time of the last pushed packet.
962    */
963   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
964       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
965       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
966       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
967       NULL, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN, G_TYPE_UINT64);
968 
969   gstelement_class->change_state =
970       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
971   gstelement_class->request_new_pad =
972       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
973   gstelement_class->release_pad =
974       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
975   gstelement_class->provide_clock =
976       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
977   gstelement_class->set_clock =
978       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
979 
980   gst_element_class_add_static_pad_template (gstelement_class,
981       &gst_rtp_jitter_buffer_src_template);
982   gst_element_class_add_static_pad_template (gstelement_class,
983       &gst_rtp_jitter_buffer_sink_template);
984   gst_element_class_add_static_pad_template (gstelement_class,
985       &gst_rtp_jitter_buffer_sink_rtcp_template);
986 
987   gst_element_class_set_static_metadata (gstelement_class,
988       "RTP packet jitter-buffer", "Filter/Network/RTP",
989       "A buffer that deals with network jitter and other transmission faults",
990       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
991       "Wim Taymans <wim.taymans@gmail.com>");
992 
993   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
994   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
995 
996   GST_DEBUG_CATEGORY_INIT
997       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
998   GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_jitter_buffer_chain_rtcp);
999 
1000   gst_type_mark_as_plugin_api (RTP_TYPE_JITTER_BUFFER_MODE, 0);
1001 }
1002 
1003 static void
gst_rtp_jitter_buffer_init(GstRtpJitterBuffer * jitterbuffer)1004 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
1005 {
1006   GstRtpJitterBufferPrivate *priv;
1007 
1008   priv = gst_rtp_jitter_buffer_get_instance_private (jitterbuffer);
1009   jitterbuffer->priv = priv;
1010 
1011   priv->latency_ms = DEFAULT_LATENCY_MS;
1012   priv->latency_ns = priv->latency_ms * GST_MSECOND;
1013   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
1014   priv->ts_offset = DEFAULT_TS_OFFSET;
1015   priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
1016   priv->do_lost = DEFAULT_DO_LOST;
1017   priv->post_drop_messages = DEFAULT_POST_DROP_MESSAGES;
1018   priv->drop_messages_interval_ms = DEFAULT_DROP_MESSAGES_INTERVAL_MS;
1019   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
1020   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
1021   priv->rtx_delay = DEFAULT_RTX_DELAY;
1022   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
1023   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
1024   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
1025   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
1026   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
1027   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
1028   priv->rtx_deadline_ms = DEFAULT_RTX_DEADLINE;
1029   priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT;
1030   priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
1031   priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
1032   priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
1033   priv->faststart_min_packets = DEFAULT_FASTSTART_MIN_PACKETS;
1034 
1035   priv->ts_offset_remainder = 0;
1036   priv->last_dts = -1;
1037   priv->last_pts = -1;
1038   priv->last_rtptime = -1;
1039   priv->avg_jitter = 0;
1040   priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
1041   priv->num_too_late = 0;
1042   priv->num_drop_on_latency = 0;
1043   priv->segment_seqnum = GST_SEQNUM_INVALID;
1044   priv->timers = rtp_timer_queue_new ();
1045   priv->rtx_stats_timers = rtp_timer_queue_new ();
1046   priv->jbuf = rtp_jitter_buffer_new ();
1047   g_mutex_init (&priv->jbuf_lock);
1048   g_cond_init (&priv->jbuf_queue);
1049   g_cond_init (&priv->jbuf_timer);
1050   g_cond_init (&priv->jbuf_event);
1051   g_cond_init (&priv->jbuf_query);
1052   g_queue_init (&priv->gap_packets);
1053   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1054 
1055   /* reset skew detection initially */
1056   rtp_jitter_buffer_reset_skew (priv->jbuf);
1057   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
1058   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1059   priv->active = TRUE;
1060 
1061   priv->srcpad =
1062       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
1063       "src");
1064 
1065   gst_pad_set_activatemode_function (priv->srcpad,
1066       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
1067   gst_pad_set_query_function (priv->srcpad,
1068       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
1069   gst_pad_set_event_function (priv->srcpad,
1070       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
1071 
1072   priv->sinkpad =
1073       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
1074       "sink");
1075 
1076   gst_pad_set_chain_function (priv->sinkpad,
1077       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
1078   gst_pad_set_chain_list_function (priv->sinkpad,
1079       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain_list));
1080   gst_pad_set_event_function (priv->sinkpad,
1081       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
1082   gst_pad_set_query_function (priv->sinkpad,
1083       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
1084 
1085   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
1086   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
1087 
1088   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
1089 }
1090 
1091 static void
free_item_and_retain_sticky_events(RTPJitterBufferItem * item,gpointer user_data)1092 free_item_and_retain_sticky_events (RTPJitterBufferItem * item,
1093     gpointer user_data)
1094 {
1095   GList **l = user_data;
1096 
1097   if (item->data && item->type == ITEM_TYPE_EVENT
1098       && GST_EVENT_IS_STICKY (item->data)) {
1099     *l = g_list_prepend (*l, item->data);
1100     item->data = NULL;
1101   }
1102 
1103   rtp_jitter_buffer_free_item (item);
1104 }
1105 
1106 static void
gst_rtp_jitter_buffer_finalize(GObject * object)1107 gst_rtp_jitter_buffer_finalize (GObject * object)
1108 {
1109   GstRtpJitterBuffer *jitterbuffer;
1110   GstRtpJitterBufferPrivate *priv;
1111 
1112   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1113   priv = jitterbuffer->priv;
1114 
1115   g_object_unref (priv->timers);
1116   g_object_unref (priv->rtx_stats_timers);
1117   g_mutex_clear (&priv->jbuf_lock);
1118   g_cond_clear (&priv->jbuf_queue);
1119   g_cond_clear (&priv->jbuf_timer);
1120   g_cond_clear (&priv->jbuf_event);
1121   g_cond_clear (&priv->jbuf_query);
1122 
1123   rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
1124   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1125   g_queue_clear (&priv->gap_packets);
1126   g_object_unref (priv->jbuf);
1127 
1128   G_OBJECT_CLASS (parent_class)->finalize (object);
1129 }
1130 
1131 static GstIterator *
gst_rtp_jitter_buffer_iterate_internal_links(GstPad * pad,GstObject * parent)1132 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
1133 {
1134   GstRtpJitterBuffer *jitterbuffer;
1135   GstPad *otherpad = NULL;
1136   GstIterator *it = NULL;
1137   GValue val = { 0, };
1138 
1139   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1140 
1141   if (pad == jitterbuffer->priv->sinkpad) {
1142     otherpad = jitterbuffer->priv->srcpad;
1143   } else if (pad == jitterbuffer->priv->srcpad) {
1144     otherpad = jitterbuffer->priv->sinkpad;
1145   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
1146     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1147   }
1148 
1149   if (it == NULL) {
1150     g_value_init (&val, GST_TYPE_PAD);
1151     g_value_set_object (&val, otherpad);
1152     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1153     g_value_unset (&val);
1154   }
1155 
1156   return it;
1157 }
1158 
1159 static GstPad *
create_rtcp_sink(GstRtpJitterBuffer * jitterbuffer)1160 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1161 {
1162   GstRtpJitterBufferPrivate *priv;
1163 
1164   priv = jitterbuffer->priv;
1165 
1166   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
1167 
1168   priv->rtcpsinkpad =
1169       gst_pad_new_from_static_template
1170       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
1171   gst_pad_set_chain_function (priv->rtcpsinkpad,
1172       gst_rtp_jitter_buffer_chain_rtcp);
1173   gst_pad_set_event_function (priv->rtcpsinkpad,
1174       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
1175   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
1176       gst_rtp_jitter_buffer_iterate_internal_links);
1177   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
1178   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1179 
1180   return priv->rtcpsinkpad;
1181 }
1182 
1183 static void
remove_rtcp_sink(GstRtpJitterBuffer * jitterbuffer)1184 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1185 {
1186   GstRtpJitterBufferPrivate *priv;
1187 
1188   priv = jitterbuffer->priv;
1189 
1190   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
1191 
1192   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1193 
1194   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1195   priv->rtcpsinkpad = NULL;
1196 }
1197 
1198 static GstPad *
gst_rtp_jitter_buffer_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * filter)1199 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1200     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1201 {
1202   GstRtpJitterBuffer *jitterbuffer;
1203   GstElementClass *klass;
1204   GstPad *result;
1205   GstRtpJitterBufferPrivate *priv;
1206 
1207   g_return_val_if_fail (templ != NULL, NULL);
1208   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1209 
1210   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1211   priv = jitterbuffer->priv;
1212   klass = GST_ELEMENT_GET_CLASS (element);
1213 
1214   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1215 
1216   /* figure out the template */
1217   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1218     if (priv->rtcpsinkpad != NULL)
1219       goto exists;
1220 
1221     result = create_rtcp_sink (jitterbuffer);
1222   } else
1223     goto wrong_template;
1224 
1225   return result;
1226 
1227   /* ERRORS */
1228 wrong_template:
1229   {
1230     g_warning ("rtpjitterbuffer: this is not our template");
1231     return NULL;
1232   }
1233 exists:
1234   {
1235     g_warning ("rtpjitterbuffer: pad already requested");
1236     return NULL;
1237   }
1238 }
1239 
1240 static void
gst_rtp_jitter_buffer_release_pad(GstElement * element,GstPad * pad)1241 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1242 {
1243   GstRtpJitterBuffer *jitterbuffer;
1244   GstRtpJitterBufferPrivate *priv;
1245 
1246   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1247   g_return_if_fail (GST_IS_PAD (pad));
1248 
1249   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1250   priv = jitterbuffer->priv;
1251 
1252   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1253 
1254   if (priv->rtcpsinkpad == pad) {
1255     remove_rtcp_sink (jitterbuffer);
1256   } else
1257     goto wrong_pad;
1258 
1259   return;
1260 
1261   /* ERRORS */
1262 wrong_pad:
1263   {
1264     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1265     return;
1266   }
1267 }
1268 
1269 static GstClock *
gst_rtp_jitter_buffer_provide_clock(GstElement * element)1270 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1271 {
1272   return gst_system_clock_obtain ();
1273 }
1274 
1275 static gboolean
gst_rtp_jitter_buffer_set_clock(GstElement * element,GstClock * clock)1276 gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
1277 {
1278   GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1279 
1280   rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
1281 
1282   return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
1283 }
1284 
1285 static void
gst_rtp_jitter_buffer_clear_pt_map(GstRtpJitterBuffer * jitterbuffer)1286 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1287 {
1288   GstRtpJitterBufferPrivate *priv;
1289 
1290   priv = jitterbuffer->priv;
1291 
1292   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1293 
1294   JBUF_LOCK (priv);
1295   priv->clock_rate = -1;
1296   /* do not clear current content, but refresh state for new arrival */
1297   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1298   rtp_jitter_buffer_reset_skew (priv->jbuf);
1299   JBUF_UNLOCK (priv);
1300 }
1301 
1302 static GstClockTime
gst_rtp_jitter_buffer_set_active(GstRtpJitterBuffer * jbuf,gboolean active,guint64 offset)1303 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1304     guint64 offset)
1305 {
1306   GstRtpJitterBufferPrivate *priv;
1307   GstClockTime last_out;
1308   RTPJitterBufferItem *item;
1309 
1310   priv = jbuf->priv;
1311 
1312   JBUF_LOCK (priv);
1313   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1314       active, GST_TIME_ARGS (offset));
1315 
1316   if (active != priv->active) {
1317     /* add the amount of time spent in paused to the output offset. All
1318      * outgoing buffers will have this offset applied to their timestamps in
1319      * order to make them arrive in time in the sink. */
1320     priv->out_offset = offset;
1321     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1322         GST_TIME_ARGS (priv->out_offset));
1323     priv->active = active;
1324     JBUF_SIGNAL_EVENT (priv);
1325   }
1326   if (!active) {
1327     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1328   }
1329   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1330     /* head buffer timestamp and offset gives our output time */
1331     last_out = item->pts + priv->ts_offset;
1332   } else {
1333     /* use last known time when the buffer is empty */
1334     last_out = priv->last_out_time;
1335   }
1336   JBUF_UNLOCK (priv);
1337 
1338   return last_out;
1339 }
1340 
1341 static GstCaps *
gst_rtp_jitter_buffer_getcaps(GstPad * pad,GstCaps * filter)1342 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1343 {
1344   GstRtpJitterBuffer *jitterbuffer;
1345   GstRtpJitterBufferPrivate *priv;
1346   GstPad *other;
1347   GstCaps *caps;
1348   GstCaps *templ;
1349 
1350   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1351   priv = jitterbuffer->priv;
1352 
1353   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1354 
1355   caps = gst_pad_peer_query_caps (other, filter);
1356 
1357   templ = gst_pad_get_pad_template_caps (pad);
1358   if (caps == NULL) {
1359     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1360     caps = templ;
1361   } else {
1362     GstCaps *intersect;
1363 
1364     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1365 
1366     intersect = gst_caps_intersect (caps, templ);
1367     gst_caps_unref (caps);
1368     gst_caps_unref (templ);
1369 
1370     caps = intersect;
1371   }
1372   gst_object_unref (jitterbuffer);
1373 
1374   return caps;
1375 }
1376 
1377 /*
1378  * Must be called with JBUF_LOCK held
1379  */
1380 
1381 static gboolean
gst_jitter_buffer_sink_parse_caps(GstRtpJitterBuffer * jitterbuffer,GstCaps * caps,gint pt)1382 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1383     GstCaps * caps, gint pt)
1384 {
1385   GstRtpJitterBufferPrivate *priv;
1386   GstStructure *caps_struct;
1387   guint val;
1388   gint payload = -1;
1389   GstClockTime tval;
1390   const gchar *ts_refclk, *mediaclk;
1391 
1392   priv = jitterbuffer->priv;
1393 
1394   /* first parse the caps */
1395   caps_struct = gst_caps_get_structure (caps, 0);
1396 
1397   GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps);
1398 
1399   if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1
1400       && payload != pt) {
1401     GST_ERROR_OBJECT (jitterbuffer,
1402         "Got caps with wrong payload type (got %d, expected %d)", pt, payload);
1403     return FALSE;
1404   }
1405 
1406   if (payload != -1) {
1407     GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload);
1408     priv->last_pt = payload;
1409   }
1410 
1411   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1412    * measure the amount of data in the buffer */
1413   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1414     goto error;
1415 
1416   if (priv->clock_rate <= 0)
1417     goto wrong_rate;
1418 
1419   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1420 
1421   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1422 
1423   gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
1424 
1425   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1426    * can use this to track the amount of time elapsed on the sender. */
1427   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1428     priv->clock_base = val;
1429   else
1430     priv->clock_base = -1;
1431 
1432   priv->ext_timestamp = priv->clock_base;
1433 
1434   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1435       priv->clock_base);
1436 
1437   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1438     /* first expected seqnum, only update when we didn't have a previous base. */
1439     if (priv->next_in_seqnum == -1)
1440       priv->next_in_seqnum = val;
1441     if (priv->next_seqnum == -1) {
1442       priv->next_seqnum = val;
1443       JBUF_SIGNAL_EVENT (priv);
1444     }
1445     priv->seqnum_base = val;
1446   } else {
1447     priv->seqnum_base = -1;
1448   }
1449 
1450   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1451 
1452   /* the start and stop times. The seqnum-base corresponds to the start time. We
1453    * will keep track of the seqnums on the output and when we reach the one
1454    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1455   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1456     priv->npt_start = tval;
1457   else
1458     priv->npt_start = 0;
1459 
1460   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1461     priv->npt_stop = tval;
1462   else
1463     priv->npt_stop = -1;
1464 
1465   GST_DEBUG_OBJECT (jitterbuffer,
1466       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1467       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1468 
1469   if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
1470     GstClock *clock = NULL;
1471     guint64 clock_offset = -1;
1472 
1473     GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
1474         ts_refclk);
1475 
1476     if (g_str_has_prefix (ts_refclk, "ntp=")) {
1477       if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
1478         GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
1479       } else {
1480         const gchar *host, *portstr;
1481         gchar *hostname;
1482         guint port;
1483 
1484         host = ts_refclk + sizeof ("ntp=") - 1;
1485         if (host[0] == '[') {
1486           /* IPv6 */
1487           portstr = strchr (host, ']');
1488           if (portstr && portstr[1] == ':')
1489             portstr = portstr + 1;
1490           else
1491             portstr = NULL;
1492         } else {
1493           portstr = strrchr (host, ':');
1494         }
1495 
1496 
1497         if (!portstr || sscanf (portstr, ":%u", &port) != 1)
1498           port = 123;
1499 
1500         if (portstr)
1501           hostname = g_strndup (host, (portstr - host));
1502         else
1503           hostname = g_strdup (host);
1504 
1505         clock = gst_ntp_clock_new (NULL, hostname, port, 0);
1506         g_free (hostname);
1507       }
1508     } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
1509       const gchar *domainstr =
1510           ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
1511       guint domain;
1512 
1513       if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
1514         domain = 0;
1515 
1516       clock = gst_ptp_clock_new (NULL, domain);
1517     } else {
1518       GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
1519     }
1520 
1521     if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
1522       GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
1523 
1524       if (!g_str_has_prefix (mediaclk, "direct=") ||
1525           !g_ascii_string_to_unsigned (&mediaclk[7], 10, 0, G_MAXUINT64,
1526               &clock_offset, NULL))
1527         GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
1528       if (strstr (mediaclk, "rate=") != NULL) {
1529         GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
1530         clock_offset = -1;
1531       }
1532     }
1533 
1534     rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
1535   } else {
1536     rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
1537   }
1538 
1539   return TRUE;
1540 
1541   /* ERRORS */
1542 error:
1543   {
1544     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1545     return FALSE;
1546   }
1547 wrong_rate:
1548   {
1549     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1550     return FALSE;
1551   }
1552 }
1553 
1554 static void
gst_rtp_jitter_buffer_flush_start(GstRtpJitterBuffer * jitterbuffer)1555 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1556 {
1557   GstRtpJitterBufferPrivate *priv;
1558 
1559   priv = jitterbuffer->priv;
1560 
1561   JBUF_LOCK (priv);
1562   /* mark ourselves as flushing */
1563   priv->srcresult = GST_FLOW_FLUSHING;
1564   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1565   /* this unblocks any waiting pops on the src pad task */
1566   JBUF_SIGNAL_EVENT (priv);
1567   JBUF_SIGNAL_QUERY (priv, FALSE);
1568   JBUF_SIGNAL_QUEUE (priv);
1569   JBUF_UNLOCK (priv);
1570 }
1571 
1572 static void
gst_rtp_jitter_buffer_flush_stop(GstRtpJitterBuffer * jitterbuffer)1573 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1574 {
1575   GstRtpJitterBufferPrivate *priv;
1576 
1577   priv = jitterbuffer->priv;
1578 
1579   JBUF_LOCK (priv);
1580   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1581   /* Mark as non flushing */
1582   priv->srcresult = GST_FLOW_OK;
1583   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1584   priv->last_popped_seqnum = -1;
1585   priv->last_out_time = GST_CLOCK_TIME_NONE;
1586   priv->next_seqnum = -1;
1587   priv->seqnum_base = -1;
1588   priv->ips_rtptime = -1;
1589   priv->ips_pts = GST_CLOCK_TIME_NONE;
1590   priv->packet_spacing = 0;
1591   priv->next_in_seqnum = -1;
1592   priv->clock_rate = -1;
1593   priv->last_pt = -1;
1594   priv->eos = FALSE;
1595   priv->estimated_eos = -1;
1596   priv->last_elapsed = 0;
1597   priv->ext_timestamp = -1;
1598   priv->avg_jitter = 0;
1599   priv->last_dts = -1;
1600   priv->last_rtptime = -1;
1601   priv->last_in_pts = 0;
1602   priv->equidistant = 0;
1603   priv->segment_seqnum = GST_SEQNUM_INVALID;
1604   priv->last_drop_msg_timestamp = GST_CLOCK_TIME_NONE;
1605   priv->num_too_late = 0;
1606   priv->num_drop_on_latency = 0;
1607   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1608   rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
1609   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1610   rtp_jitter_buffer_reset_skew (priv->jbuf);
1611   rtp_timer_queue_remove_all (priv->timers);
1612   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1613   g_queue_clear (&priv->gap_packets);
1614   JBUF_UNLOCK (priv);
1615 }
1616 
1617 static gboolean
gst_rtp_jitter_buffer_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1618 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1619     GstPadMode mode, gboolean active)
1620 {
1621   gboolean result;
1622   GstRtpJitterBuffer *jitterbuffer = NULL;
1623 
1624   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1625 
1626   switch (mode) {
1627     case GST_PAD_MODE_PUSH:
1628       if (active) {
1629         /* allow data processing */
1630         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1631 
1632         /* start pushing out buffers */
1633         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1634         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1635             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1636       } else {
1637         /* make sure all data processing stops ASAP */
1638         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1639 
1640         /* NOTE this will hardlock if the state change is called from the src pad
1641          * task thread because we will _join() the thread. */
1642         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1643         result = gst_pad_stop_task (pad);
1644       }
1645       break;
1646     default:
1647       result = FALSE;
1648       break;
1649   }
1650   return result;
1651 }
1652 
1653 static GstStateChangeReturn
gst_rtp_jitter_buffer_change_state(GstElement * element,GstStateChange transition)1654 gst_rtp_jitter_buffer_change_state (GstElement * element,
1655     GstStateChange transition)
1656 {
1657   GstRtpJitterBuffer *jitterbuffer;
1658   GstRtpJitterBufferPrivate *priv;
1659   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1660 
1661   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1662   priv = jitterbuffer->priv;
1663 
1664   switch (transition) {
1665     case GST_STATE_CHANGE_NULL_TO_READY:
1666       break;
1667     case GST_STATE_CHANGE_READY_TO_PAUSED:
1668       JBUF_LOCK (priv);
1669       /* reset negotiated values */
1670       priv->clock_rate = -1;
1671       priv->clock_base = -1;
1672       priv->peer_latency = 0;
1673       priv->last_pt = -1;
1674       /* block until we go to PLAYING */
1675       priv->blocked = TRUE;
1676       priv->timer_running = TRUE;
1677       priv->srcresult = GST_FLOW_OK;
1678       priv->timer_thread =
1679           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1680       JBUF_UNLOCK (priv);
1681       break;
1682     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1683       JBUF_LOCK (priv);
1684       /* unblock to allow streaming in PLAYING */
1685       priv->blocked = FALSE;
1686       JBUF_SIGNAL_EVENT (priv);
1687       JBUF_SIGNAL_TIMER (priv);
1688       JBUF_UNLOCK (priv);
1689       break;
1690     default:
1691       break;
1692   }
1693 
1694   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1695 
1696   switch (transition) {
1697     case GST_STATE_CHANGE_READY_TO_PAUSED:
1698       /* we are a live element because we sync to the clock, which we can only
1699        * do in the PLAYING state */
1700       if (ret != GST_STATE_CHANGE_FAILURE)
1701         ret = GST_STATE_CHANGE_NO_PREROLL;
1702       break;
1703     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1704       JBUF_LOCK (priv);
1705       /* block to stop streaming when PAUSED */
1706       priv->blocked = TRUE;
1707       unschedule_current_timer (jitterbuffer);
1708       JBUF_UNLOCK (priv);
1709       if (ret != GST_STATE_CHANGE_FAILURE)
1710         ret = GST_STATE_CHANGE_NO_PREROLL;
1711       break;
1712     case GST_STATE_CHANGE_PAUSED_TO_READY:
1713       JBUF_LOCK (priv);
1714       gst_buffer_replace (&priv->last_sr, NULL);
1715       priv->timer_running = FALSE;
1716       priv->srcresult = GST_FLOW_FLUSHING;
1717       unschedule_current_timer (jitterbuffer);
1718       JBUF_SIGNAL_TIMER (priv);
1719       JBUF_SIGNAL_QUERY (priv, FALSE);
1720       JBUF_SIGNAL_QUEUE (priv);
1721       JBUF_UNLOCK (priv);
1722       g_thread_join (priv->timer_thread);
1723       priv->timer_thread = NULL;
1724       break;
1725     case GST_STATE_CHANGE_READY_TO_NULL:
1726       break;
1727     default:
1728       break;
1729   }
1730 
1731   return ret;
1732 }
1733 
1734 static gboolean
gst_rtp_jitter_buffer_src_event(GstPad * pad,GstObject * parent,GstEvent * event)1735 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1736     GstEvent * event)
1737 {
1738   gboolean ret = TRUE;
1739   GstRtpJitterBuffer *jitterbuffer;
1740   GstRtpJitterBufferPrivate *priv;
1741 
1742   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1743   priv = jitterbuffer->priv;
1744 
1745   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1746 
1747   switch (GST_EVENT_TYPE (event)) {
1748     case GST_EVENT_LATENCY:
1749     {
1750       GstClockTime latency;
1751 
1752       gst_event_parse_latency (event, &latency);
1753 
1754       GST_DEBUG_OBJECT (jitterbuffer,
1755           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1756 
1757       JBUF_LOCK (priv);
1758       /* adjust the overall buffer delay to the total pipeline latency in
1759        * buffering mode because if downstream consumes too fast (because of
1760        * large latency or queues, we would start rebuffering again. */
1761       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1762           RTP_JITTER_BUFFER_MODE_BUFFER) {
1763         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1764       }
1765       JBUF_UNLOCK (priv);
1766 
1767       ret = gst_pad_push_event (priv->sinkpad, event);
1768       break;
1769     }
1770     default:
1771       ret = gst_pad_push_event (priv->sinkpad, event);
1772       break;
1773   }
1774 
1775   return ret;
1776 }
1777 
1778 /* handles and stores the event in the jitterbuffer, must be called with
1779  * LOCK */
1780 static gboolean
queue_event(GstRtpJitterBuffer * jitterbuffer,GstEvent * event)1781 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1782 {
1783   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1784   gboolean head;
1785 
1786   switch (GST_EVENT_TYPE (event)) {
1787     case GST_EVENT_CAPS:
1788     {
1789       GstCaps *caps;
1790 
1791       gst_event_parse_caps (event, &caps);
1792       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1);
1793       break;
1794     }
1795     case GST_EVENT_SEGMENT:
1796     {
1797       GstSegment segment;
1798       gst_event_copy_segment (event, &segment);
1799 
1800       priv->segment_seqnum = gst_event_get_seqnum (event);
1801 
1802       /* we need time for now */
1803       if (segment.format != GST_FORMAT_TIME) {
1804         GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
1805         gst_event_unref (event);
1806 
1807         gst_segment_init (&segment, GST_FORMAT_TIME);
1808         event = gst_event_new_segment (&segment);
1809         gst_event_set_seqnum (event, priv->segment_seqnum);
1810       }
1811 
1812       priv->segment = segment;
1813       break;
1814     }
1815     case GST_EVENT_EOS:
1816       priv->eos = TRUE;
1817       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1818       break;
1819     default:
1820       break;
1821   }
1822 
1823   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1824   head = rtp_jitter_buffer_append_event (priv->jbuf, event);
1825   if (head || priv->eos)
1826     JBUF_SIGNAL_EVENT (priv);
1827 
1828   return TRUE;
1829 }
1830 
1831 static gboolean
gst_rtp_jitter_buffer_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)1832 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1833     GstEvent * event)
1834 {
1835   gboolean ret = TRUE;
1836   GstRtpJitterBuffer *jitterbuffer;
1837   GstRtpJitterBufferPrivate *priv;
1838 
1839   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1840   priv = jitterbuffer->priv;
1841 
1842   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1843 
1844   switch (GST_EVENT_TYPE (event)) {
1845     case GST_EVENT_FLUSH_START:
1846       ret = gst_pad_push_event (priv->srcpad, event);
1847       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1848       /* wait for the loop to go into PAUSED */
1849       gst_pad_pause_task (priv->srcpad);
1850       break;
1851     case GST_EVENT_FLUSH_STOP:
1852       ret = gst_pad_push_event (priv->srcpad, event);
1853       ret =
1854           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1855           GST_PAD_MODE_PUSH, TRUE);
1856       break;
1857     default:
1858       if (GST_EVENT_IS_SERIALIZED (event)) {
1859         /* serialized events go in the queue */
1860         JBUF_LOCK (priv);
1861         if (priv->srcresult != GST_FLOW_OK) {
1862           /* Errors in sticky event pushing are no problem and ignored here
1863            * as they will cause more meaningful errors during data flow.
1864            * For EOS events, that are not followed by data flow, we still
1865            * return FALSE here though.
1866            */
1867           if (!GST_EVENT_IS_STICKY (event) ||
1868               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1869             goto out_flow_error;
1870         }
1871         /* refuse more events on EOS */
1872         if (priv->eos)
1873           goto out_eos;
1874         ret = queue_event (jitterbuffer, event);
1875         JBUF_UNLOCK (priv);
1876       } else {
1877         /* non-serialized events are forwarded downstream immediately */
1878         ret = gst_pad_push_event (priv->srcpad, event);
1879       }
1880       break;
1881   }
1882   return ret;
1883 
1884   /* ERRORS */
1885 out_flow_error:
1886   {
1887     GST_DEBUG_OBJECT (jitterbuffer,
1888         "refusing event, we have a downstream flow error: %s",
1889         gst_flow_get_name (priv->srcresult));
1890     JBUF_UNLOCK (priv);
1891     gst_event_unref (event);
1892     return FALSE;
1893   }
1894 out_eos:
1895   {
1896     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1897     JBUF_UNLOCK (priv);
1898     gst_event_unref (event);
1899     return FALSE;
1900   }
1901 }
1902 
1903 static gboolean
gst_rtp_jitter_buffer_sink_rtcp_event(GstPad * pad,GstObject * parent,GstEvent * event)1904 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1905     GstEvent * event)
1906 {
1907   gboolean ret = TRUE;
1908   GstRtpJitterBuffer *jitterbuffer;
1909 
1910   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1911 
1912   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1913 
1914   switch (GST_EVENT_TYPE (event)) {
1915     case GST_EVENT_FLUSH_START:
1916       gst_event_unref (event);
1917       break;
1918     case GST_EVENT_FLUSH_STOP:
1919       gst_event_unref (event);
1920       break;
1921     default:
1922       ret = gst_pad_event_default (pad, parent, event);
1923       break;
1924   }
1925 
1926   return ret;
1927 }
1928 
1929 /*
1930  * Must be called with JBUF_LOCK held, will release the LOCK when emitting the
1931  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1932  * GST_FLOW_FLUSHING when the element is shutting down. On success
1933  * GST_FLOW_OK is returned.
1934  */
1935 static GstFlowReturn
gst_rtp_jitter_buffer_get_clock_rate(GstRtpJitterBuffer * jitterbuffer,guint8 pt)1936 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1937     guint8 pt)
1938 {
1939   GValue ret = { 0 };
1940   GValue args[2] = { {0}, {0} };
1941   GstCaps *caps;
1942   gboolean res;
1943 
1944   g_value_init (&args[0], GST_TYPE_ELEMENT);
1945   g_value_set_object (&args[0], jitterbuffer);
1946   g_value_init (&args[1], G_TYPE_UINT);
1947   g_value_set_uint (&args[1], pt);
1948 
1949   g_value_init (&ret, GST_TYPE_CAPS);
1950   g_value_set_boxed (&ret, NULL);
1951 
1952   JBUF_UNLOCK (jitterbuffer->priv);
1953   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1954       &ret);
1955   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1956 
1957   g_value_unset (&args[0]);
1958   g_value_unset (&args[1]);
1959   caps = (GstCaps *) g_value_dup_boxed (&ret);
1960   g_value_unset (&ret);
1961   if (!caps)
1962     goto no_caps;
1963 
1964   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
1965   gst_caps_unref (caps);
1966 
1967   if (G_UNLIKELY (!res))
1968     goto parse_failed;
1969 
1970   return GST_FLOW_OK;
1971 
1972   /* ERRORS */
1973 no_caps:
1974   {
1975     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1976     return GST_FLOW_ERROR;
1977   }
1978 out_flushing:
1979   {
1980     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1981     return GST_FLOW_FLUSHING;
1982   }
1983 parse_failed:
1984   {
1985     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1986     return GST_FLOW_ERROR;
1987   }
1988 }
1989 
1990 /* call with jbuf lock held */
1991 static GstMessage *
check_buffering_percent(GstRtpJitterBuffer * jitterbuffer,gint percent)1992 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1993 {
1994   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1995   GstMessage *message = NULL;
1996 
1997   if (percent == -1)
1998     return NULL;
1999 
2000   /* Post a buffering message */
2001   if (priv->last_percent != percent) {
2002     priv->last_percent = percent;
2003     message =
2004         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
2005     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
2006   }
2007 
2008   return message;
2009 }
2010 
2011 /* call with jbuf lock held */
2012 static GstMessage *
new_drop_message(GstRtpJitterBuffer * jitterbuffer,guint seqnum,GstClockTime timestamp,DropMessageReason reason)2013 new_drop_message (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
2014     GstClockTime timestamp, DropMessageReason reason)
2015 {
2016 
2017   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2018   GstMessage *drop_msg = NULL;
2019   GstStructure *s;
2020   GstClockTime current_time;
2021   GstClockTime time_diff;
2022   const gchar *reason_str;
2023 
2024   current_time = get_current_running_time (jitterbuffer);
2025   time_diff = current_time - priv->last_drop_msg_timestamp;
2026 
2027   if (reason == REASON_TOO_LATE) {
2028     priv->num_too_late++;
2029     reason_str = "too-late";
2030   } else if (reason == REASON_DROP_ON_LATENCY) {
2031     priv->num_drop_on_latency++;
2032     reason_str = "drop-on-latency";
2033   } else {
2034     GST_WARNING_OBJECT (jitterbuffer, "Invalid reason for drop message");
2035     return drop_msg;
2036   }
2037 
2038   /* Only create new drop_msg if time since last drop_msg is larger that
2039    * that the set interval, or if it is the first drop message posted */
2040   if ((time_diff >= priv->drop_messages_interval_ms * GST_MSECOND) ||
2041       (priv->last_drop_msg_timestamp == GST_CLOCK_TIME_NONE)) {
2042 
2043     s = gst_structure_new ("drop-msg",
2044         "seqnum", G_TYPE_UINT, seqnum,
2045         "timestamp", GST_TYPE_CLOCK_TIME, timestamp,
2046         "reason", G_TYPE_STRING, reason_str,
2047         "num-too-late", G_TYPE_UINT, priv->num_too_late,
2048         "num-drop-on-latency", G_TYPE_UINT, priv->num_drop_on_latency, NULL);
2049 
2050     priv->last_drop_msg_timestamp = current_time;
2051     priv->num_too_late = 0;
2052     priv->num_drop_on_latency = 0;
2053     drop_msg = gst_message_new_element (GST_OBJECT (jitterbuffer), s);
2054   }
2055   return drop_msg;
2056 }
2057 
2058 
2059 static inline GstClockTimeDiff
timeout_offset(GstRtpJitterBuffer * jitterbuffer)2060 timeout_offset (GstRtpJitterBuffer * jitterbuffer)
2061 {
2062   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2063   return priv->ts_offset + priv->out_offset + priv->latency_ns;
2064 }
2065 
2066 static inline GstClockTime
get_pts_timeout(const RtpTimer * timer)2067 get_pts_timeout (const RtpTimer * timer)
2068 {
2069   if (timer->timeout == -1)
2070     return -1;
2071 
2072   return timer->timeout - timer->offset;
2073 }
2074 
2075 static inline gboolean
safe_add(guint64 * res,guint64 val,gint64 offset)2076 safe_add (guint64 * res, guint64 val, gint64 offset)
2077 {
2078   if (val <= G_MAXINT64) {
2079     gint64 tmp = (gint64) val + offset;
2080     if (tmp >= 0) {
2081       *res = tmp;
2082       return TRUE;
2083     }
2084     return FALSE;
2085   }
2086   /* From here, val > G_MAXINT64 */
2087 
2088   /* Negative value */
2089   if (offset < 0 && val < -offset)
2090     return FALSE;
2091 
2092   *res = val + offset;
2093   return TRUE;
2094 }
2095 
2096 static void
update_timer_offsets(GstRtpJitterBuffer * jitterbuffer)2097 update_timer_offsets (GstRtpJitterBuffer * jitterbuffer)
2098 {
2099   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2100   RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
2101   GstClockTimeDiff new_offset = timeout_offset (jitterbuffer);
2102 
2103   while (test) {
2104     if (test->type != RTP_TIMER_EXPECTED) {
2105       GstClockTime pts = get_pts_timeout (test);
2106       if (safe_add (&test->timeout, pts, new_offset)) {
2107         test->offset = new_offset;
2108       } else {
2109         GST_DEBUG_OBJECT (jitterbuffer,
2110             "Invalidating timeout (pts lower than new offset)");
2111         test->timeout = GST_CLOCK_TIME_NONE;
2112         test->offset = 0;
2113       }
2114       /* as we apply the offset on all timers, the order of timers won't
2115        * change and we can skip updating the timer queue */
2116     }
2117 
2118     test = rtp_timer_get_next (test);
2119   }
2120 }
2121 
2122 static void
update_offset(GstRtpJitterBuffer * jitterbuffer)2123 update_offset (GstRtpJitterBuffer * jitterbuffer)
2124 {
2125   GstRtpJitterBufferPrivate *priv;
2126 
2127   priv = jitterbuffer->priv;
2128 
2129   if (priv->ts_offset_remainder != 0) {
2130     GST_DEBUG ("adjustment %" G_GUINT64_FORMAT " remain %" G_GINT64_FORMAT
2131         " off %" G_GINT64_FORMAT, priv->max_ts_offset_adjustment,
2132         priv->ts_offset_remainder, priv->ts_offset);
2133     if (ABS (priv->ts_offset_remainder) > priv->max_ts_offset_adjustment) {
2134       if (priv->ts_offset_remainder > 0) {
2135         priv->ts_offset += priv->max_ts_offset_adjustment;
2136         priv->ts_offset_remainder -= priv->max_ts_offset_adjustment;
2137       } else {
2138         priv->ts_offset -= priv->max_ts_offset_adjustment;
2139         priv->ts_offset_remainder += priv->max_ts_offset_adjustment;
2140       }
2141     } else {
2142       priv->ts_offset += priv->ts_offset_remainder;
2143       priv->ts_offset_remainder = 0;
2144     }
2145 
2146     update_timer_offsets (jitterbuffer);
2147   }
2148 }
2149 
2150 static GstClockTime
apply_offset(GstRtpJitterBuffer * jitterbuffer,GstClockTime timestamp)2151 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
2152 {
2153   GstRtpJitterBufferPrivate *priv;
2154 
2155   priv = jitterbuffer->priv;
2156 
2157   if (timestamp == -1)
2158     return -1;
2159 
2160   /* apply the timestamp offset, this is used for inter stream sync */
2161   if (!safe_add (&timestamp, timestamp, priv->ts_offset))
2162     timestamp = 0;
2163   /* add the offset, this is used when buffering */
2164   timestamp += priv->out_offset;
2165 
2166   return timestamp;
2167 }
2168 
2169 static void
unschedule_current_timer(GstRtpJitterBuffer * jitterbuffer)2170 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
2171 {
2172   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2173 
2174   if (priv->clock_id) {
2175     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
2176     gst_clock_id_unschedule (priv->clock_id);
2177     priv->clock_id = NULL;
2178   }
2179 }
2180 
2181 static void
update_current_timer(GstRtpJitterBuffer * jitterbuffer)2182 update_current_timer (GstRtpJitterBuffer * jitterbuffer)
2183 {
2184   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2185   RtpTimer *timer;
2186 
2187   timer = rtp_timer_queue_peek_earliest (priv->timers);
2188 
2189   /* we never need to wakeup the timer thread when there is no more timers, if
2190    * it was waiting on a clock id, it will simply do later and then wait on
2191    * the conditions */
2192   if (timer == NULL) {
2193     GST_DEBUG_OBJECT (jitterbuffer, "no more timers");
2194     return;
2195   }
2196 
2197   GST_DEBUG_OBJECT (jitterbuffer, "waiting till %" GST_TIME_FORMAT
2198       " and earliest timeout is at %" GST_TIME_FORMAT,
2199       GST_TIME_ARGS (priv->timer_timeout), GST_TIME_ARGS (timer->timeout));
2200 
2201   /* wakeup the timer thread in case the timer queue was empty */
2202   JBUF_SIGNAL_TIMER (priv);
2203 
2204   /* no need to wait if the current wait is earlier or later */
2205   if (timer->timeout != -1 && timer->timeout >= priv->timer_timeout)
2206     return;
2207 
2208   /* for other cases, force a reschedule of the timer thread */
2209   unschedule_current_timer (jitterbuffer);
2210 }
2211 
2212 /* get the extra delay to wait before sending RTX */
2213 static GstClockTime
get_rtx_delay(GstRtpJitterBufferPrivate * priv)2214 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
2215 {
2216   GstClockTime delay;
2217 
2218   if (priv->rtx_delay == -1) {
2219     /* the maximum delay for any RTX-packet is given by the latency, since
2220        anything after that is considered lost. For various calulcations,
2221        (given large avg_jitter and/or packet_spacing), the resulting delay
2222        could exceed the configured latency, ending up issuing an RTX-request
2223        that would never arrive in time. To help this we cap the delay
2224        for any RTX with the last possible time it could still arrive in time. */
2225     GstClockTime delay_max = (priv->latency_ns > priv->avg_rtx_rtt) ?
2226         priv->latency_ns - priv->avg_rtx_rtt : priv->latency_ns;
2227 
2228     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
2229       delay = DEFAULT_AUTO_RTX_DELAY;
2230     } else {
2231       /* jitter is in nanoseconds, maximum of 2x jitter and half the
2232        * packet spacing is a good margin */
2233       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
2234     }
2235 
2236     delay = MIN (delay_max, delay);
2237   } else {
2238     delay = priv->rtx_delay * GST_MSECOND;
2239   }
2240   if (priv->rtx_min_delay > 0)
2241     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
2242 
2243   return delay;
2244 }
2245 
2246 /* we just received a packet with seqnum and dts.
2247  *
2248  * First check for old seqnum that we are still expecting. If the gap with the
2249  * current seqnum is too big, unschedule the timeouts.
2250  *
2251  * If we have a valid packet spacing estimate we can set a timer for when we
2252  * should receive the next packet.
2253  * If we don't have a valid estimate, we remove any timer we might have
2254  * had for this packet.
2255  */
2256 static void
update_rtx_timers(GstRtpJitterBuffer * jitterbuffer,guint16 seqnum,GstClockTime dts,GstClockTime pts,gboolean do_next_seqnum,gboolean is_rtx,RtpTimer * timer)2257 update_rtx_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
2258     GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum,
2259     gboolean is_rtx, RtpTimer * timer)
2260 {
2261   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2262   gboolean is_stats_timer = FALSE;
2263 
2264   if (timer && rtp_timer_queue_find (priv->rtx_stats_timers, timer->seqnum))
2265     is_stats_timer = TRUE;
2266 
2267   /* schedule immediatly expected timer which exceed the maximum RTX delay
2268    * reorder configuration */
2269   if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
2270     RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
2271     while (test) {
2272       gint gap;
2273 
2274       /* filter the timer type to speed up this loop */
2275       if (test->type != RTP_TIMER_EXPECTED) {
2276         test = rtp_timer_get_next (test);
2277         continue;
2278       }
2279 
2280       gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2281 
2282       GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
2283           test->type, test->seqnum, seqnum, gap);
2284 
2285       /* if this expected packet have a smaller gap then the configured one,
2286        * then earlier timer are not expected to have bigger gap as the timer
2287        * queue is ordered */
2288       if (gap <= priv->rtx_delay_reorder)
2289         break;
2290 
2291       /* max gap, we exceeded the max reorder distance and we don't expect the
2292        * missing packet to be this reordered */
2293       if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED)
2294         rtp_timer_queue_update_timer (priv->timers, test, test->seqnum,
2295             -1, 0, 0, FALSE);
2296 
2297       test = rtp_timer_get_next (test);
2298     }
2299   }
2300 
2301   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
2302       && priv->rtx_next_seqnum;
2303 
2304   if (timer && timer->type != RTP_TIMER_DEADLINE) {
2305     if (timer->num_rtx_retry > 0) {
2306       if (is_rtx) {
2307         update_rtx_stats (jitterbuffer, timer, dts, TRUE);
2308         /* don't try to estimate the next seqnum because this is a retransmitted
2309          * packet and it probably did not arrive with the expected packet
2310          * spacing. */
2311         do_next_seqnum = FALSE;
2312       }
2313 
2314       if (!is_stats_timer && (!is_rtx || timer->num_rtx_retry > 1)) {
2315         RtpTimer *stats_timer = rtp_timer_dup (timer);
2316         /* Store timer in order to record stats when/if the retransmitted
2317          * packet arrives. We should also store timer information if we've
2318          * requested retransmission more than once since we may receive
2319          * several retransmitted packets. For accuracy we should update the
2320          * stats also when the redundant retransmitted packets arrives. */
2321         stats_timer->timeout = pts + priv->rtx_stats_timeout * GST_MSECOND;
2322         stats_timer->type = RTP_TIMER_EXPECTED;
2323         rtp_timer_queue_insert (priv->rtx_stats_timers, stats_timer);
2324       }
2325     }
2326   }
2327 
2328   if (do_next_seqnum && pts != GST_CLOCK_TIME_NONE) {
2329     GstClockTime next_expected_pts, delay;
2330 
2331     /* calculate expected arrival time of the next seqnum */
2332     next_expected_pts = pts + priv->packet_spacing;
2333 
2334     delay = get_rtx_delay (priv);
2335 
2336     /* and update/install timer for next seqnum */
2337     GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, next_expected_pts %"
2338         GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", est packet duration %"
2339         GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum,
2340         GST_TIME_ARGS (next_expected_pts), GST_TIME_ARGS (delay),
2341         GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));
2342 
2343     if (timer && !is_stats_timer) {
2344       timer->type = RTP_TIMER_EXPECTED;
2345       rtp_timer_queue_update_timer (priv->timers, timer, priv->next_in_seqnum,
2346           next_expected_pts, delay, 0, TRUE);
2347     } else {
2348       rtp_timer_queue_set_expected (priv->timers, priv->next_in_seqnum,
2349           next_expected_pts, delay, priv->packet_spacing);
2350     }
2351   } else if (timer && timer->type != RTP_TIMER_DEADLINE && !is_stats_timer) {
2352     /* if we had a timer, remove it, we don't know when to expect the next
2353      * packet. */
2354     rtp_timer_queue_unschedule (priv->timers, timer);
2355     rtp_timer_free (timer);
2356   }
2357 }
2358 
2359 static void
calculate_packet_spacing(GstRtpJitterBuffer * jitterbuffer,guint32 rtptime,GstClockTime pts)2360 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2361     GstClockTime pts)
2362 {
2363   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2364 
2365   /* we need consecutive seqnums with a different
2366    * rtptime to estimate the packet spacing. */
2367   if (priv->ips_rtptime != rtptime) {
2368     /* rtptime changed, check pts diff */
2369     if (priv->ips_pts != -1 && pts != -1 && pts > priv->ips_pts) {
2370       GstClockTime new_packet_spacing = pts - priv->ips_pts;
2371       GstClockTime old_packet_spacing = priv->packet_spacing;
2372 
2373       /* Biased towards bigger packet spacings to prevent
2374        * too many unneeded retransmission requests for next
2375        * packets that just arrive a little later than we would
2376        * expect */
2377       if (old_packet_spacing > new_packet_spacing)
2378         priv->packet_spacing =
2379             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2380       else if (old_packet_spacing > 0)
2381         priv->packet_spacing =
2382             (3 * new_packet_spacing + old_packet_spacing) / 4;
2383       else
2384         priv->packet_spacing = new_packet_spacing;
2385 
2386       GST_DEBUG_OBJECT (jitterbuffer,
2387           "new packet spacing %" GST_TIME_FORMAT
2388           " old packet spacing %" GST_TIME_FORMAT
2389           " combined to %" GST_TIME_FORMAT,
2390           GST_TIME_ARGS (new_packet_spacing),
2391           GST_TIME_ARGS (old_packet_spacing),
2392           GST_TIME_ARGS (priv->packet_spacing));
2393     }
2394     priv->ips_rtptime = rtptime;
2395     priv->ips_pts = pts;
2396   }
2397 }
2398 
2399 static void
insert_lost_event(GstRtpJitterBuffer * jitterbuffer,guint16 seqnum,guint lost_packets,GstClockTime timestamp,GstClockTime duration,guint num_rtx_retry)2400 insert_lost_event (GstRtpJitterBuffer * jitterbuffer,
2401     guint16 seqnum, guint lost_packets, GstClockTime timestamp,
2402     GstClockTime duration, guint num_rtx_retry)
2403 {
2404   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2405   GstEvent *event = NULL;
2406   guint next_in_seqnum;
2407 
2408   /* we had a gap and thus we lost some packets. Create an event for this.  */
2409   if (lost_packets > 1)
2410     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2411         seqnum + lost_packets - 1);
2412   else
2413     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2414 
2415   priv->num_lost += lost_packets;
2416   priv->num_rtx_failed += num_rtx_retry;
2417 
2418   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
2419 
2420   /* we now only accept seqnum bigger than this */
2421   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
2422     priv->next_in_seqnum = next_in_seqnum;
2423     priv->last_in_pts = timestamp;
2424   }
2425 
2426   /* Avoid creating events if we don't need it. Note that we still need to create
2427    * the lost *ITEM* since it will be used to notify the outgoing thread of
2428    * lost items (so that we can set discont flags and such) */
2429   if (priv->do_lost) {
2430     /* create packet lost event */
2431     if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2432       duration = priv->packet_spacing;
2433     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2434         gst_structure_new ("GstRTPPacketLost",
2435             "seqnum", G_TYPE_UINT, (guint) seqnum,
2436             "timestamp", G_TYPE_UINT64, timestamp,
2437             "duration", G_TYPE_UINT64, duration,
2438             "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2439   }
2440   if (rtp_jitter_buffer_append_lost_event (priv->jbuf,
2441           event, seqnum, lost_packets))
2442     JBUF_SIGNAL_EVENT (priv);
2443 }
2444 
2445 static void
gst_rtp_jitter_buffer_handle_missing_packets(GstRtpJitterBuffer * jitterbuffer,guint32 missing_seqnum,guint16 current_seqnum,GstClockTime pts,gint gap,GstClockTime now)2446 gst_rtp_jitter_buffer_handle_missing_packets (GstRtpJitterBuffer * jitterbuffer,
2447     guint32 missing_seqnum, guint16 current_seqnum, GstClockTime pts, gint gap,
2448     GstClockTime now)
2449 {
2450   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2451   GstClockTime est_pkt_duration, est_pts;
2452   gboolean equidistant = priv->equidistant > 0;
2453   GstClockTime last_in_pts = priv->last_in_pts;
2454   GstClockTimeDiff offset = timeout_offset (jitterbuffer);
2455   GstClockTime rtx_delay = get_rtx_delay (priv);
2456   guint16 remaining_gap;
2457   GstClockTimeDiff remaining_duration;
2458   GstClockTimeDiff remainder_duration;
2459   guint i;
2460 
2461   GST_DEBUG_OBJECT (jitterbuffer,
2462       "Missing packets: (#%u->#%u), gap %d, pts %" GST_TIME_FORMAT
2463       ", last-pts %" GST_TIME_FORMAT,
2464       missing_seqnum, current_seqnum - 1, gap, GST_TIME_ARGS (pts),
2465       GST_TIME_ARGS (last_in_pts));
2466 
2467   if (equidistant) {
2468     GstClockTimeDiff total_duration;
2469     gboolean too_late;
2470 
2471     /* the total duration spanned by the missing packets */
2472     total_duration = MAX (0, GST_CLOCK_DIFF (last_in_pts, pts));
2473 
2474     /* interpolate between the current time and the last time based on
2475      * number of packets we are missing, this is the estimated duration
2476      * for the missing packet based on equidistant packet spacing. */
2477     est_pkt_duration = total_duration / (gap + 1);
2478 
2479     /* if we have valid packet-spacing, use that */
2480     if (total_duration > 0 && priv->packet_spacing) {
2481       est_pkt_duration = priv->packet_spacing;
2482     }
2483 
2484     est_pts = last_in_pts + est_pkt_duration;
2485     GST_DEBUG_OBJECT (jitterbuffer, "estimated missing packet pts %"
2486         GST_TIME_FORMAT " and duration %" GST_TIME_FORMAT,
2487         GST_TIME_ARGS (est_pts), GST_TIME_ARGS (est_pkt_duration));
2488 
2489     /* a packet is considered too late if our estimated pts plus all
2490        applicable offsets are in the past */
2491     too_late = now > (est_pts + offset);
2492 
2493     /* Here we optimistically try to save any packets that could potentially
2494        be saved by making sure we create lost/rtx timers for them, and for
2495        the rest that could not possibly be saved, we create a "multi-lost"
2496        event immediately containing the missing duration and sequence numbers */
2497     if (too_late) {
2498       guint lost_packets;
2499       GstClockTime lost_duration;
2500       GstClockTimeDiff gap_time;
2501       guint max_saveable_packets = 0;
2502       GstClockTime max_saveable_duration;
2503       GstClockTime saveable_duration;
2504 
2505       /* gap time represents the total duration of all missing packets */
2506       gap_time = MAX (0, GST_CLOCK_DIFF (est_pts, pts));
2507 
2508       /* based on the estimated packet duration, we
2509          can figure out how many packets we could possibly save */
2510       if (est_pkt_duration)
2511         max_saveable_packets = offset / est_pkt_duration;
2512 
2513       /* and say that the amount of lost packet is the sequence-number
2514          gap minus these saveable packets, but at least 1 */
2515       lost_packets = MAX (1, (gint) gap - (gint) max_saveable_packets);
2516 
2517       /* now we know how many packets we can possibly save */
2518       max_saveable_packets = gap - lost_packets;
2519 
2520       /* we convert that to time */
2521       max_saveable_duration = max_saveable_packets * est_pkt_duration;
2522 
2523       /* determine the actual amount of time we can save */
2524       saveable_duration = MIN (max_saveable_duration, gap_time);
2525 
2526       /* and we now have the duration we need to fill */
2527       lost_duration = GST_CLOCK_DIFF (saveable_duration, gap_time);
2528 
2529       /* this multi-lost-packet event will be inserted directly into the packet-queue
2530          for immediate processing */
2531       if (lost_packets > 0) {
2532         RtpTimer *timer;
2533         GstClockTime timestamp = apply_offset (jitterbuffer, est_pts);
2534 
2535         GST_INFO_OBJECT (jitterbuffer, "lost event for %d packet(s) (#%d->#%d) "
2536             "for duration %" GST_TIME_FORMAT, lost_packets, missing_seqnum,
2537             missing_seqnum + lost_packets - 1, GST_TIME_ARGS (lost_duration));
2538 
2539         insert_lost_event (jitterbuffer, missing_seqnum, lost_packets,
2540             timestamp, lost_duration, 0);
2541 
2542         timer = rtp_timer_queue_find (priv->timers, missing_seqnum);
2543         if (timer && timer->type != RTP_TIMER_DEADLINE) {
2544           if (timer->queued)
2545             rtp_timer_queue_unschedule (priv->timers, timer);
2546           GST_DEBUG_OBJECT (jitterbuffer, "removing timer for seqnum #%u",
2547               missing_seqnum);
2548           rtp_timer_free (timer);
2549         }
2550 
2551         missing_seqnum += lost_packets;
2552         est_pts += lost_duration;
2553       }
2554     }
2555 
2556   } else {
2557     /* If we cannot assume equidistant packet spacing, the only thing we now
2558      * for sure is that the missing packets have expected pts not later than
2559      * the last received pts. */
2560     est_pkt_duration = 0;
2561     est_pts = pts;
2562   }
2563 
2564   /* Figure out how many more packets we are missing. */
2565   remaining_gap = current_seqnum - missing_seqnum;
2566   /* and how much time these packets represent */
2567   remaining_duration = MAX (0, GST_CLOCK_DIFF (est_pts, pts));
2568   /* Given the calculated packet-duration (packet spacing when equidistant),
2569      the remainder is what we are left with after subtracting the ideal time
2570      for the gap */
2571   remainder_duration =
2572       MAX (0, GST_CLOCK_DIFF (est_pkt_duration * remaining_gap,
2573           remaining_duration));
2574 
2575   GST_DEBUG_OBJECT (jitterbuffer, "remaining gap of %u, with "
2576       "duration %" GST_TIME_FORMAT " gives remainder duration %"
2577       GST_STIME_FORMAT, remaining_gap, GST_TIME_ARGS (remaining_duration),
2578       GST_STIME_ARGS (remainder_duration));
2579 
2580   for (i = 0; i < remaining_gap; i++) {
2581     GstClockTime duration = est_pkt_duration;
2582     /* we add the remainder on the first packet */
2583     if (i == 0)
2584       duration += remainder_duration;
2585 
2586     /* clip duration to what is actually left */
2587     remaining_duration = MAX (0, GST_CLOCK_DIFF (est_pts, pts));
2588     duration = MIN (duration, remaining_duration);
2589 
2590     if (priv->do_retransmission) {
2591       RtpTimer *timer = rtp_timer_queue_find (priv->timers, missing_seqnum);
2592 
2593       /* if we had a timer for the missing packet, update it. */
2594       if (timer && timer->type == RTP_TIMER_EXPECTED) {
2595         timer->duration = duration;
2596         if (timer->timeout > (est_pts + rtx_delay) && timer->num_rtx_retry == 0) {
2597           rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
2598               est_pts, rtx_delay, 0, TRUE);
2599           GST_DEBUG_OBJECT (jitterbuffer, "Update RTX timer(s) #%u, "
2600               "pts %" GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT
2601               ", duration %" GST_TIME_FORMAT,
2602               missing_seqnum, GST_TIME_ARGS (est_pts),
2603               GST_TIME_ARGS (rtx_delay), GST_TIME_ARGS (duration));
2604         }
2605       } else {
2606         GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer(s) #%u, "
2607             "pts %" GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT
2608             ", duration %" GST_TIME_FORMAT,
2609             missing_seqnum, GST_TIME_ARGS (est_pts),
2610             GST_TIME_ARGS (rtx_delay), GST_TIME_ARGS (duration));
2611         rtp_timer_queue_set_expected (priv->timers, missing_seqnum, est_pts,
2612             rtx_delay, duration);
2613       }
2614     } else {
2615       GST_INFO_OBJECT (jitterbuffer,
2616           "Add Lost timer for #%u, pts %" GST_TIME_FORMAT
2617           ", duration %" GST_TIME_FORMAT ", offset %" GST_STIME_FORMAT,
2618           missing_seqnum, GST_TIME_ARGS (est_pts),
2619           GST_TIME_ARGS (duration), GST_STIME_ARGS (offset));
2620       rtp_timer_queue_set_lost (priv->timers, missing_seqnum, est_pts,
2621           duration, offset);
2622     }
2623 
2624     missing_seqnum++;
2625     est_pts += duration;
2626   }
2627 }
2628 
2629 static void
calculate_jitter(GstRtpJitterBuffer * jitterbuffer,GstClockTime dts,guint32 rtptime)2630 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2631     guint32 rtptime)
2632 {
2633   gint32 rtpdiff;
2634   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2635   GstRtpJitterBufferPrivate *priv;
2636 
2637   priv = jitterbuffer->priv;
2638 
2639   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2640     goto no_time;
2641 
2642   if (priv->last_dts != -1)
2643     dtsdiff = dts - priv->last_dts;
2644   else
2645     dtsdiff = 0;
2646 
2647   if (priv->last_rtptime != -1)
2648     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2649   else
2650     rtpdiff = 0;
2651 
2652   /* Guess whether stream currently uses equidistant packet spacing. If we
2653    * often see identical timestamps it means the packets are not
2654    * equidistant. */
2655   if (rtptime == priv->last_rtptime)
2656     priv->equidistant -= 2;
2657   else
2658     priv->equidistant += 1;
2659   priv->equidistant = CLAMP (priv->equidistant, -7, 7);
2660 
2661   priv->last_dts = dts;
2662   priv->last_rtptime = rtptime;
2663 
2664   if (rtpdiff > 0)
2665     rtpdiffns =
2666         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2667   else
2668     rtpdiffns =
2669         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2670 
2671   diff = ABS (dtsdiff - rtpdiffns);
2672 
2673   /* jitter is stored in nanoseconds */
2674   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2675 
2676   GST_LOG_OBJECT (jitterbuffer,
2677       "dtsdiff %" GST_STIME_FORMAT " rtptime %" GST_STIME_FORMAT
2678       ", clock-rate %d, diff %" GST_STIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2679       GST_STIME_ARGS (dtsdiff), GST_STIME_ARGS (rtpdiffns), priv->clock_rate,
2680       GST_STIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2681 
2682   return;
2683 
2684   /* ERRORS */
2685 no_time:
2686   {
2687     GST_DEBUG_OBJECT (jitterbuffer,
2688         "no dts or no clock-rate, can't calculate jitter");
2689     return;
2690   }
2691 }
2692 
2693 static gint
compare_buffer_seqnum(GstBuffer * a,GstBuffer * b,gpointer user_data)2694 compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
2695 {
2696   GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
2697   GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
2698   guint seq_a, seq_b;
2699 
2700   gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
2701   seq_a = gst_rtp_buffer_get_seq (&rtp_a);
2702   gst_rtp_buffer_unmap (&rtp_a);
2703 
2704   gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
2705   seq_b = gst_rtp_buffer_get_seq (&rtp_b);
2706   gst_rtp_buffer_unmap (&rtp_b);
2707 
2708   return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
2709 }
2710 
2711 static gboolean
handle_big_gap_buffer(GstRtpJitterBuffer * jitterbuffer,GstBuffer * buffer,guint8 pt,guint16 seqnum,gint gap,guint max_dropout,guint max_misorder)2712 handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, GstBuffer * buffer,
2713     guint8 pt, guint16 seqnum, gint gap, guint max_dropout, guint max_misorder)
2714 {
2715   GstRtpJitterBufferPrivate *priv;
2716   guint gap_packets_length;
2717   gboolean reset = FALSE;
2718   gboolean future = gap > 0;
2719 
2720   priv = jitterbuffer->priv;
2721 
2722   if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
2723     GList *l;
2724     guint32 prev_gap_seq = -1;
2725     gboolean all_consecutive = TRUE;
2726 
2727     g_queue_insert_sorted (&priv->gap_packets, buffer,
2728         (GCompareDataFunc) compare_buffer_seqnum, NULL);
2729 
2730     for (l = priv->gap_packets.head; l; l = l->next) {
2731       GstBuffer *gap_buffer = l->data;
2732       GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2733       guint32 gap_seq;
2734 
2735       gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2736 
2737       all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
2738 
2739       gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
2740       if (prev_gap_seq == -1)
2741         prev_gap_seq = gap_seq;
2742       else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
2743         all_consecutive = FALSE;
2744       else
2745         prev_gap_seq = gap_seq;
2746 
2747       gst_rtp_buffer_unmap (&gap_rtp);
2748       if (!all_consecutive)
2749         break;
2750     }
2751 
2752     if (all_consecutive && gap_packets_length > 3) {
2753       GST_DEBUG_OBJECT (jitterbuffer,
2754           "buffer too %s %d < %d, got 5 consecutive ones - reset",
2755           (future ? "new" : "old"), gap,
2756           (future ? max_dropout : -max_misorder));
2757       reset = TRUE;
2758     } else if (!all_consecutive) {
2759       g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2760       g_queue_clear (&priv->gap_packets);
2761       GST_DEBUG_OBJECT (jitterbuffer,
2762           "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
2763           (future ? "new" : "old"), gap,
2764           (future ? max_dropout : -max_misorder));
2765       buffer = NULL;
2766     } else {
2767       GST_DEBUG_OBJECT (jitterbuffer,
2768           "buffer too %s %d < %d, got %u consecutive ones - waiting",
2769           (future ? "new" : "old"), gap,
2770           (future ? max_dropout : -max_misorder), gap_packets_length + 1);
2771       buffer = NULL;
2772     }
2773   } else {
2774     GST_DEBUG_OBJECT (jitterbuffer,
2775         "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
2776         gap, -max_misorder);
2777     g_queue_push_tail (&priv->gap_packets, buffer);
2778     buffer = NULL;
2779   }
2780 
2781   return reset;
2782 }
2783 
2784 static GstClockTime
get_current_running_time(GstRtpJitterBuffer * jitterbuffer)2785 get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
2786 {
2787   GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
2788   GstClockTime running_time = GST_CLOCK_TIME_NONE;
2789 
2790   if (clock) {
2791     GstClockTime base_time =
2792         gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
2793     GstClockTime clock_time = gst_clock_get_time (clock);
2794 
2795     if (clock_time > base_time)
2796       running_time = clock_time - base_time;
2797     else
2798       running_time = 0;
2799 
2800     gst_object_unref (clock);
2801   }
2802 
2803   return running_time;
2804 }
2805 
2806 static GstFlowReturn
gst_rtp_jitter_buffer_reset(GstRtpJitterBuffer * jitterbuffer,GstPad * pad,GstObject * parent,guint16 seqnum)2807 gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
2808     GstPad * pad, GstObject * parent, guint16 seqnum)
2809 {
2810   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2811   GstFlowReturn ret = GST_FLOW_OK;
2812   GList *events = NULL, *l;
2813   GList *buffers;
2814 
2815   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2816   rtp_jitter_buffer_flush (priv->jbuf,
2817       (GFunc) free_item_and_retain_sticky_events, &events);
2818   rtp_jitter_buffer_reset_skew (priv->jbuf);
2819   rtp_timer_queue_remove_all (priv->timers);
2820   priv->discont = TRUE;
2821   priv->last_popped_seqnum = -1;
2822 
2823   if (priv->gap_packets.head) {
2824     GstBuffer *gap_buffer = priv->gap_packets.head->data;
2825     GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2826 
2827     gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2828     priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
2829     gst_rtp_buffer_unmap (&gap_rtp);
2830   } else {
2831     priv->next_seqnum = seqnum;
2832   }
2833 
2834   priv->last_in_pts = -1;
2835   priv->next_in_seqnum = -1;
2836 
2837   /* Insert all sticky events again in order, otherwise we would
2838    * potentially loose STREAM_START, CAPS or SEGMENT events
2839    */
2840   events = g_list_reverse (events);
2841   for (l = events; l; l = l->next) {
2842     rtp_jitter_buffer_append_event (priv->jbuf, l->data);
2843   }
2844   g_list_free (events);
2845 
2846   JBUF_SIGNAL_EVENT (priv);
2847 
2848   /* reset spacing estimation when gap */
2849   priv->ips_rtptime = -1;
2850   priv->ips_pts = GST_CLOCK_TIME_NONE;
2851 
2852   buffers = g_list_copy (priv->gap_packets.head);
2853   g_queue_clear (&priv->gap_packets);
2854 
2855   priv->ips_rtptime = -1;
2856   priv->ips_pts = GST_CLOCK_TIME_NONE;
2857   JBUF_UNLOCK (jitterbuffer->priv);
2858 
2859   for (l = buffers; l; l = l->next) {
2860     ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
2861     l->data = NULL;
2862     if (ret != GST_FLOW_OK) {
2863       l = l->next;
2864       break;
2865     }
2866   }
2867   for (; l; l = l->next)
2868     gst_buffer_unref (l->data);
2869   g_list_free (buffers);
2870 
2871   return ret;
2872 }
2873 
2874 static gboolean
gst_rtp_jitter_buffer_fast_start(GstRtpJitterBuffer * jitterbuffer)2875 gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
2876 {
2877   GstRtpJitterBufferPrivate *priv;
2878   RTPJitterBufferItem *item;
2879   RtpTimer *timer;
2880 
2881   priv = jitterbuffer->priv;
2882 
2883   if (priv->faststart_min_packets == 0)
2884     return FALSE;
2885 
2886   item = rtp_jitter_buffer_peek (priv->jbuf);
2887   if (!item)
2888     return FALSE;
2889 
2890   timer = rtp_timer_queue_find (priv->timers, item->seqnum);
2891   if (!timer || timer->type != RTP_TIMER_DEADLINE)
2892     return FALSE;
2893 
2894   if (rtp_jitter_buffer_can_fast_start (priv->jbuf,
2895           priv->faststart_min_packets)) {
2896     GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
2897         priv->faststart_min_packets);
2898     timer->timeout = -1;
2899     rtp_timer_queue_reschedule (priv->timers, timer);
2900     return TRUE;
2901   }
2902 
2903   return FALSE;
2904 }
2905 
2906 static GstFlowReturn
gst_rtp_jitter_buffer_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2907 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2908     GstBuffer * buffer)
2909 {
2910   GstRtpJitterBuffer *jitterbuffer;
2911   GstRtpJitterBufferPrivate *priv;
2912   guint16 seqnum;
2913   guint32 expected, rtptime;
2914   GstFlowReturn ret = GST_FLOW_OK;
2915   GstClockTime now;
2916   GstClockTime dts, pts;
2917   guint64 latency_ts;
2918   gboolean head;
2919   gboolean duplicate;
2920   gint percent = -1;
2921   guint8 pt;
2922   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2923   gboolean do_next_seqnum = FALSE;
2924   GstMessage *msg = NULL;
2925   GstMessage *drop_msg = NULL;
2926   gboolean estimated_dts = FALSE;
2927   gint32 packet_rate, max_dropout, max_misorder;
2928   RtpTimer *timer = NULL;
2929   gboolean is_rtx;
2930 
2931   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2932 
2933   priv = jitterbuffer->priv;
2934 
2935   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2936     goto invalid_buffer;
2937 
2938   pt = gst_rtp_buffer_get_payload_type (&rtp);
2939   seqnum = gst_rtp_buffer_get_seq (&rtp);
2940   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2941   gst_rtp_buffer_unmap (&rtp);
2942 
2943   is_rtx = GST_BUFFER_IS_RETRANSMISSION (buffer);
2944   now = get_current_running_time (jitterbuffer);
2945 
2946   /* make sure we have PTS and DTS set */
2947   pts = GST_BUFFER_PTS (buffer);
2948   dts = GST_BUFFER_DTS (buffer);
2949   if (dts == -1)
2950     dts = pts;
2951   else if (pts == -1)
2952     pts = dts;
2953 
2954   if (dts == -1) {
2955     /* If we have no DTS here, i.e. no capture time, get one from the
2956      * clock now to have something to calculate with in the future. */
2957     dts = now;
2958     pts = dts;
2959 
2960     /* Remember that we estimated the DTS if we are running already
2961      * and this is not our first packet (or first packet after a reset).
2962      * If it's the first packet, we somehow must generate a timestamp for
2963      * everything, otherwise we can't calculate any times
2964      */
2965     estimated_dts = (priv->next_in_seqnum != -1);
2966   } else {
2967     /* take the DTS of the buffer. This is the time when the packet was
2968      * received and is used to calculate jitter and clock skew. We will adjust
2969      * this DTS with the smoothed value after processing it in the
2970      * jitterbuffer and assign it as the PTS. */
2971     /* bring to running time */
2972     dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2973   }
2974 
2975   GST_DEBUG_OBJECT (jitterbuffer,
2976       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
2977       seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer), is_rtx);
2978 
2979   JBUF_LOCK_CHECK (priv, out_flushing);
2980 
2981   if (G_UNLIKELY (priv->last_pt != pt)) {
2982     GstCaps *caps;
2983 
2984     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2985         pt);
2986 
2987     priv->last_pt = pt;
2988     /* reset clock-rate so that we get a new one */
2989     priv->clock_rate = -1;
2990 
2991     /* Try to get the clock-rate from the caps first if we can. If there are no
2992      * caps we must fire the signal to get the clock-rate. */
2993     if ((caps = gst_pad_get_current_caps (pad))) {
2994       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
2995       gst_caps_unref (caps);
2996     }
2997   }
2998 
2999   if (G_UNLIKELY (priv->clock_rate == -1)) {
3000     /* no clock rate given on the caps, try to get one with the signal */
3001     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
3002             pt) == GST_FLOW_FLUSHING)
3003       goto out_flushing;
3004 
3005     if (G_UNLIKELY (priv->clock_rate == -1))
3006       goto no_clock_rate;
3007 
3008     gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
3009   }
3010 
3011   /* don't accept more data on EOS */
3012   if (G_UNLIKELY (priv->eos))
3013     goto have_eos;
3014 
3015   if (!is_rtx)
3016     calculate_jitter (jitterbuffer, dts, rtptime);
3017 
3018   if (priv->seqnum_base != -1) {
3019     gint gap;
3020 
3021     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
3022 
3023     if (gap < 0) {
3024       GST_DEBUG_OBJECT (jitterbuffer,
3025           "packet seqnum #%d before seqnum-base #%d", seqnum,
3026           priv->seqnum_base);
3027       gst_buffer_unref (buffer);
3028       goto finished;
3029     } else if (gap > 16384) {
3030       /* From now on don't compare against the seqnum base anymore as
3031        * at some point in the future we will wrap around and also that
3032        * much reordering is very unlikely */
3033       priv->seqnum_base = -1;
3034     }
3035   }
3036 
3037   expected = priv->next_in_seqnum;
3038 
3039   /* don't update packet-rate based on RTX, as those arrive highly unregularly */
3040   if (!is_rtx) {
3041     packet_rate = gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx,
3042         seqnum, rtptime);
3043     GST_TRACE_OBJECT (jitterbuffer, "updated packet_rate: %d", packet_rate);
3044   }
3045   max_dropout =
3046       gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
3047       priv->max_dropout_time);
3048   max_misorder =
3049       gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
3050       priv->max_misorder_time);
3051   GST_TRACE_OBJECT (jitterbuffer, "max_dropout: %d, max_misorder: %d",
3052       max_dropout, max_misorder);
3053 
3054   timer = rtp_timer_queue_find (priv->timers, seqnum);
3055   if (is_rtx) {
3056     if (G_UNLIKELY (!priv->do_retransmission))
3057       goto unsolicited_rtx;
3058 
3059     if (!timer)
3060       timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum);
3061 
3062     /* If the first buffer is an (old) rtx, e.g. from before a reset, or
3063      * already lost, ignore it */
3064     if (!timer || expected == -1)
3065       goto unsolicited_rtx;
3066   }
3067 
3068   /* now check against our expected seqnum */
3069   if (G_UNLIKELY (expected == -1)) {
3070     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3071 
3072     /* calculate a pts based on rtptime and arrival time (dts) */
3073     pts =
3074         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3075         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)),
3076         0, FALSE);
3077 
3078     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pts))) {
3079       /* A valid timestamp cannot be calculated, discard packet */
3080       goto discard_invalid;
3081     }
3082 
3083     /* we don't know what the next_in_seqnum should be, wait for the last
3084      * possible moment to push this buffer, maybe we get an earlier seqnum
3085      * while we wait */
3086     rtp_timer_queue_set_deadline (priv->timers, seqnum, pts,
3087         timeout_offset (jitterbuffer));
3088 
3089     do_next_seqnum = TRUE;
3090     /* take rtptime and pts to calculate packet spacing */
3091     priv->ips_rtptime = rtptime;
3092     priv->ips_pts = pts;
3093 
3094   } else {
3095     gint gap;
3096     /* now calculate gap */
3097     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
3098     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
3099         expected, seqnum, gap);
3100 
3101     if (G_UNLIKELY (gap > 0 &&
3102             rtp_timer_queue_length (priv->timers) >= max_dropout)) {
3103       /* If we have timers for more than RTP_MAX_DROPOUT packets
3104        * pending this means that we have a huge gap overall. We can
3105        * reset the jitterbuffer at this point because there's
3106        * just too much data missing to be able to do anything
3107        * sensible with the past data. Just try again from the
3108        * next packet */
3109       GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
3110           rtp_timer_queue_length (priv->timers), max_dropout);
3111       g_queue_insert_sorted (&priv->gap_packets, buffer,
3112           (GCompareDataFunc) compare_buffer_seqnum, NULL);
3113       return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3114     }
3115 
3116     /* Special handling of large gaps */
3117     if (!is_rtx && ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout))) {
3118       gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum,
3119           gap, max_dropout, max_misorder);
3120       if (reset) {
3121         return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3122       } else {
3123         GST_DEBUG_OBJECT (jitterbuffer,
3124             "Had big gap, waiting for more consecutive packets");
3125         goto finished;
3126       }
3127     }
3128 
3129     /* We had no huge gap, let's drop all the gap packets */
3130     GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
3131     g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3132     g_queue_clear (&priv->gap_packets);
3133 
3134     /* calculate a pts based on rtptime and arrival time (dts) */
3135     /* If we estimated the DTS, don't consider it in the clock skew calculations */
3136     pts =
3137         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3138         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)),
3139         gap, is_rtx);
3140 
3141     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pts))) {
3142       /* A valid timestamp cannot be calculated, discard packet */
3143       goto discard_invalid;
3144     }
3145 
3146     if (G_LIKELY (gap == 0)) {
3147       /* packet is expected */
3148       calculate_packet_spacing (jitterbuffer, rtptime, pts);
3149       do_next_seqnum = TRUE;
3150     } else {
3151 
3152       /* we have a gap */
3153       if (gap > 0) {
3154         GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
3155         /* fill in the gap with EXPECTED timers */
3156         gst_rtp_jitter_buffer_handle_missing_packets (jitterbuffer, expected,
3157             seqnum, pts, gap, now);
3158         do_next_seqnum = TRUE;
3159       } else {
3160         GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
3161         do_next_seqnum = FALSE;
3162       }
3163 
3164       /* reset spacing estimation when gap */
3165       priv->ips_rtptime = -1;
3166       priv->ips_pts = GST_CLOCK_TIME_NONE;
3167     }
3168   }
3169 
3170   if (do_next_seqnum) {
3171     priv->last_in_pts = pts;
3172     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
3173   }
3174 
3175   if (is_rtx)
3176     timer->num_rtx_received++;
3177 
3178   /* At 2^15, we would detect a seqnum rollover too early, therefore
3179    * limit the queue size. But let's not limit it to a number that is
3180    * too small to avoid emptying it needlessly if there is a spurious huge
3181    * sequence number, let's allow at least 10k packets in any case. */
3182   while (rtp_jitter_buffer_is_full (priv->jbuf) &&
3183       priv->srcresult == GST_FLOW_OK) {
3184     RtpTimer *timer = rtp_timer_queue_peek_earliest (priv->timers);
3185     while (timer) {
3186       timer->timeout = -1;
3187       if (timer->type == RTP_TIMER_DEADLINE)
3188         break;
3189       timer = rtp_timer_get_next (timer);
3190     }
3191 
3192     update_current_timer (jitterbuffer);
3193     JBUF_WAIT_QUEUE (priv);
3194     if (priv->srcresult != GST_FLOW_OK)
3195       goto out_flushing;
3196   }
3197 
3198   /* let's check if this buffer is too late, we can only accept packets with
3199    * bigger seqnum than the one we last pushed. */
3200   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
3201     gint gap;
3202 
3203     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
3204 
3205     /* priv->last_popped_seqnum >= seqnum, we're too late. */
3206     if (G_UNLIKELY (gap <= 0)) {
3207       if (priv->do_retransmission) {
3208         if (is_rtx && timer) {
3209           update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3210           /* Only count the retranmitted packet too late if it has been
3211            * considered lost. If the original packet arrived before the
3212            * retransmitted we just count it as a duplicate. */
3213           if (timer->type != RTP_TIMER_LOST)
3214             goto rtx_duplicate;
3215         }
3216       }
3217       goto too_late;
3218     }
3219   }
3220 
3221   /* let's drop oldest packet if the queue is already full and drop-on-latency
3222    * is set. We can only do this when there actually is a latency. When no
3223    * latency is set, we just pump it in the queue and let the other end push it
3224    * out as fast as possible. */
3225   if (priv->latency_ms && priv->drop_on_latency) {
3226     latency_ts =
3227         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
3228 
3229     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
3230       RTPJitterBufferItem *old_item;
3231 
3232       old_item = rtp_jitter_buffer_peek (priv->jbuf);
3233 
3234       if (IS_DROPABLE (old_item)) {
3235         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3236         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
3237             old_item);
3238         priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
3239         if (priv->post_drop_messages) {
3240           drop_msg =
3241               new_drop_message (jitterbuffer, old_item->seqnum, old_item->pts,
3242               REASON_DROP_ON_LATENCY);
3243         }
3244         rtp_jitter_buffer_free_item (old_item);
3245       }
3246       /* we might have removed some head buffers, signal the pushing thread to
3247        * see if it can push now */
3248       JBUF_SIGNAL_EVENT (priv);
3249     }
3250   }
3251 
3252   /* If we estimated the DTS, don't consider it in the clock skew calculations
3253    * later. The code above always sets dts to pts or the other way around if
3254    * any of those is valid in the buffer, so we know that if we estimated the
3255    * dts that both are unknown */
3256   head = rtp_jitter_buffer_append_buffer (priv->jbuf, buffer,
3257       estimated_dts ? GST_CLOCK_TIME_NONE : dts, pts, seqnum, rtptime,
3258       &duplicate, &percent);
3259 
3260   /* now insert the packet into the queue in sorted order. This function returns
3261    * FALSE if a packet with the same seqnum was already in the queue, meaning we
3262    * have a duplicate. */
3263   if (G_UNLIKELY (duplicate)) {
3264     if (is_rtx && timer)
3265       update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3266     goto duplicate;
3267   }
3268 
3269   /* Trigger fast start if needed */
3270   if (gst_rtp_jitter_buffer_fast_start (jitterbuffer))
3271     head = TRUE;
3272 
3273   /* update rtx timers */
3274   if (priv->do_retransmission)
3275     update_rtx_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum, is_rtx,
3276         timer);
3277 
3278   /* we had an unhandled SR, handle it now */
3279   if (priv->last_sr)
3280     do_handle_sync (jitterbuffer);
3281 
3282   if (G_UNLIKELY (head)) {
3283     /* signal addition of new buffer when the _loop is waiting. */
3284     if (G_LIKELY (priv->active))
3285       JBUF_SIGNAL_EVENT (priv);
3286   }
3287 
3288   GST_DEBUG_OBJECT (jitterbuffer,
3289       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
3290       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
3291 
3292   msg = check_buffering_percent (jitterbuffer, percent);
3293 
3294 finished:
3295   update_current_timer (jitterbuffer);
3296   JBUF_UNLOCK (priv);
3297 
3298   if (msg)
3299     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3300   if (drop_msg)
3301     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), drop_msg);
3302 
3303   return ret;
3304 
3305   /* ERRORS */
3306 invalid_buffer:
3307   {
3308     /* this is not fatal but should be filtered earlier */
3309     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3310         ("Received invalid RTP payload, dropping"));
3311     gst_buffer_unref (buffer);
3312     return GST_FLOW_OK;
3313   }
3314 no_clock_rate:
3315   {
3316     GST_WARNING_OBJECT (jitterbuffer,
3317         "No clock-rate in caps!, dropping buffer");
3318     gst_buffer_unref (buffer);
3319     goto finished;
3320   }
3321 out_flushing:
3322   {
3323     ret = priv->srcresult;
3324     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
3325     gst_buffer_unref (buffer);
3326     goto finished;
3327   }
3328 have_eos:
3329   {
3330     ret = GST_FLOW_EOS;
3331     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
3332     gst_buffer_unref (buffer);
3333     goto finished;
3334   }
3335 too_late:
3336   {
3337     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
3338         " popped, dropping", seqnum, priv->last_popped_seqnum);
3339     priv->num_late++;
3340     if (priv->post_drop_messages) {
3341       drop_msg = new_drop_message (jitterbuffer, seqnum, pts, REASON_TOO_LATE);
3342     }
3343     gst_buffer_unref (buffer);
3344     goto finished;
3345   }
3346 duplicate:
3347   {
3348     GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
3349         seqnum);
3350     priv->num_duplicates++;
3351     goto finished;
3352   }
3353 rtx_duplicate:
3354   {
3355     GST_DEBUG_OBJECT (jitterbuffer,
3356         "Duplicate RTX packet #%d detected, dropping", seqnum);
3357     priv->num_duplicates++;
3358     gst_buffer_unref (buffer);
3359     goto finished;
3360   }
3361 unsolicited_rtx:
3362   {
3363     GST_DEBUG_OBJECT (jitterbuffer,
3364         "Unsolicited RTX packet #%d detected, dropping", seqnum);
3365     gst_buffer_unref (buffer);
3366     goto finished;
3367   }
3368 discard_invalid:
3369   {
3370     GST_DEBUG_OBJECT (jitterbuffer,
3371         "cannot calculate a valid pts for #%d (rtx: %d), discard",
3372         seqnum, is_rtx);
3373     gst_buffer_unref (buffer);
3374     goto finished;
3375   }
3376 }
3377 
3378 /* FIXME: hopefully we can do something more efficient here, especially when
3379  * all packets are in order and/or outside of the currently cached range.
3380  * Still worthwhile to have it, avoids taking/releasing object lock and pad
3381  * stream lock for every single buffer in the default chain_list fallback. */
3382 static GstFlowReturn
gst_rtp_jitter_buffer_chain_list(GstPad * pad,GstObject * parent,GstBufferList * buffer_list)3383 gst_rtp_jitter_buffer_chain_list (GstPad * pad, GstObject * parent,
3384     GstBufferList * buffer_list)
3385 {
3386   GstFlowReturn flow_ret = GST_FLOW_OK;
3387   guint i, n;
3388 
3389   n = gst_buffer_list_length (buffer_list);
3390   for (i = 0; i < n; ++i) {
3391     GstBuffer *buf = gst_buffer_list_get (buffer_list, i);
3392 
3393     flow_ret = gst_rtp_jitter_buffer_chain (pad, parent, gst_buffer_ref (buf));
3394 
3395     if (flow_ret != GST_FLOW_OK)
3396       break;
3397   }
3398   gst_buffer_list_unref (buffer_list);
3399 
3400   return flow_ret;
3401 }
3402 
3403 static GstClockTime
compute_elapsed(GstRtpJitterBuffer * jitterbuffer,RTPJitterBufferItem * item)3404 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
3405 {
3406   guint64 ext_time, elapsed;
3407   guint32 rtp_time;
3408   GstRtpJitterBufferPrivate *priv;
3409 
3410   priv = jitterbuffer->priv;
3411   rtp_time = item->rtptime;
3412 
3413   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
3414       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
3415 
3416   ext_time = priv->ext_timestamp;
3417   ext_time = gst_rtp_buffer_ext_timestamp (&ext_time, rtp_time);
3418   if (ext_time < priv->ext_timestamp) {
3419     ext_time = priv->ext_timestamp;
3420   } else {
3421     priv->ext_timestamp = ext_time;
3422   }
3423 
3424   if (ext_time > priv->clock_base)
3425     elapsed = ext_time - priv->clock_base;
3426   else
3427     elapsed = 0;
3428 
3429   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
3430   return elapsed;
3431 }
3432 
3433 static void
update_estimated_eos(GstRtpJitterBuffer * jitterbuffer,RTPJitterBufferItem * item)3434 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
3435     RTPJitterBufferItem * item)
3436 {
3437   guint64 total, elapsed, left, estimated;
3438   GstClockTime out_time;
3439   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3440 
3441   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
3442       || priv->clock_base == -1 || priv->clock_rate <= 0)
3443     return;
3444 
3445   /* compute the elapsed time */
3446   elapsed = compute_elapsed (jitterbuffer, item);
3447 
3448   /* do nothing if elapsed time doesn't increment */
3449   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
3450     return;
3451 
3452   priv->last_elapsed = elapsed;
3453 
3454   /* this is the total time we need to play */
3455   total = priv->npt_stop - priv->npt_start;
3456   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
3457       GST_TIME_ARGS (total));
3458 
3459   /* this is how much time there is left */
3460   if (total > elapsed)
3461     left = total - elapsed;
3462   else
3463     left = 0;
3464 
3465   /* if we have less time left that the size of the buffer, we will not
3466    * be able to keep it filled, disabled buffering then */
3467   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
3468     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
3469         ", disable buffering close to EOS", GST_TIME_ARGS (left));
3470     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
3471   }
3472 
3473   /* this is the current time as running-time */
3474   out_time = item->pts;
3475 
3476   if (elapsed > 0)
3477     estimated = gst_util_uint64_scale (out_time, total, elapsed);
3478   else {
3479     /* if there is almost nothing left,
3480      * we may never advance enough to end up in the above case */
3481     if (total < GST_SECOND)
3482       estimated = GST_SECOND;
3483     else
3484       estimated = -1;
3485   }
3486   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
3487       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
3488 
3489   if (estimated != -1 && priv->estimated_eos != estimated) {
3490     rtp_timer_queue_set_eos (priv->timers, estimated,
3491         timeout_offset (jitterbuffer));
3492     priv->estimated_eos = estimated;
3493   }
3494 }
3495 
3496 /* take a buffer from the queue and push it */
3497 static GstFlowReturn
pop_and_push_next(GstRtpJitterBuffer * jitterbuffer,guint seqnum)3498 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
3499 {
3500   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3501   GstFlowReturn result = GST_FLOW_OK;
3502   RTPJitterBufferItem *item;
3503   GstBuffer *outbuf = NULL;
3504   GstEvent *outevent = NULL;
3505   GstQuery *outquery = NULL;
3506   GstClockTime dts, pts;
3507   gint percent = -1;
3508   gboolean do_push = TRUE;
3509   guint type;
3510   GstMessage *msg;
3511 
3512   /* when we get here we are ready to pop and push the buffer */
3513   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3514   type = item->type;
3515 
3516   switch (type) {
3517     case ITEM_TYPE_BUFFER:
3518 
3519       /* we need to make writable to change the flags and timestamps */
3520       outbuf = gst_buffer_make_writable (item->data);
3521 
3522       if (G_UNLIKELY (priv->discont)) {
3523         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
3524          * into the jitterbuffer so we can modify now. */
3525         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
3526         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
3527         priv->discont = FALSE;
3528       }
3529       if (G_UNLIKELY (priv->ts_discont)) {
3530         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
3531         priv->ts_discont = FALSE;
3532       }
3533 
3534       dts =
3535           gst_segment_position_from_running_time (&priv->segment,
3536           GST_FORMAT_TIME, item->dts);
3537       pts =
3538           gst_segment_position_from_running_time (&priv->segment,
3539           GST_FORMAT_TIME, item->pts);
3540 
3541       /* if this is a new frame, check if ts_offset needs to be updated */
3542       if (pts != priv->last_pts) {
3543         update_offset (jitterbuffer);
3544       }
3545 
3546       /* apply timestamp with offset to buffer now */
3547       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
3548       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
3549 
3550       /* update the elapsed time when we need to check against the npt stop time. */
3551       update_estimated_eos (jitterbuffer, item);
3552 
3553       priv->last_pts = pts;
3554       priv->last_out_time = GST_BUFFER_PTS (outbuf);
3555       break;
3556     case ITEM_TYPE_LOST:
3557       priv->discont = TRUE;
3558       if (!priv->do_lost)
3559         do_push = FALSE;
3560       /* FALLTHROUGH */
3561     case ITEM_TYPE_EVENT:
3562       outevent = item->data;
3563       break;
3564     case ITEM_TYPE_QUERY:
3565       outquery = item->data;
3566       break;
3567   }
3568 
3569   /* now we are ready to push the buffer. Save the seqnum and release the lock
3570    * so the other end can push stuff in the queue again. */
3571   if (seqnum != -1) {
3572     priv->last_popped_seqnum = seqnum;
3573     priv->next_seqnum = (seqnum + item->count) & 0xffff;
3574   }
3575   msg = check_buffering_percent (jitterbuffer, percent);
3576 
3577   if (type == ITEM_TYPE_EVENT && outevent &&
3578       GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3579     g_assert (priv->eos);
3580     while (rtp_timer_queue_length (priv->timers) > 0) {
3581       /* Stopping timers */
3582       unschedule_current_timer (jitterbuffer);
3583       JBUF_WAIT_TIMER (priv);
3584     }
3585   }
3586 
3587   JBUF_UNLOCK (priv);
3588 
3589   item->data = NULL;
3590   rtp_jitter_buffer_free_item (item);
3591 
3592   if (msg)
3593     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3594 
3595   switch (type) {
3596     case ITEM_TYPE_BUFFER:
3597       /* push buffer */
3598       GST_DEBUG_OBJECT (jitterbuffer,
3599           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
3600           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
3601           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
3602       priv->num_pushed++;
3603       GST_BUFFER_DTS (outbuf) = GST_CLOCK_TIME_NONE;
3604       result = gst_pad_push (priv->srcpad, outbuf);
3605 
3606       JBUF_LOCK_CHECK (priv, out_flushing);
3607       break;
3608     case ITEM_TYPE_LOST:
3609     case ITEM_TYPE_EVENT:
3610       /* We got not enough consecutive packets with a huge gap, we can
3611        * as well just drop them here now on EOS */
3612       if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3613         GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
3614         g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3615         g_queue_clear (&priv->gap_packets);
3616       }
3617 
3618       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
3619           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
3620 
3621       if (do_push)
3622         gst_pad_push_event (priv->srcpad, outevent);
3623       else if (outevent)
3624         gst_event_unref (outevent);
3625 
3626       result = GST_FLOW_OK;
3627 
3628       JBUF_LOCK_CHECK (priv, out_flushing);
3629       break;
3630     case ITEM_TYPE_QUERY:
3631     {
3632       gboolean res;
3633 
3634       res = gst_pad_peer_query (priv->srcpad, outquery);
3635 
3636       JBUF_LOCK_CHECK (priv, out_flushing);
3637       result = GST_FLOW_OK;
3638       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
3639       JBUF_SIGNAL_QUERY (priv, res);
3640       break;
3641     }
3642   }
3643   return result;
3644 
3645   /* ERRORS */
3646 out_flushing:
3647   {
3648     return priv->srcresult;
3649   }
3650 }
3651 
3652 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
3653 
3654 /* Peek a buffer and compare the seqnum to the expected seqnum.
3655  * If all is fine, the buffer is pushed.
3656  * If something is wrong, we wait for some event
3657  */
3658 static GstFlowReturn
handle_next_buffer(GstRtpJitterBuffer * jitterbuffer)3659 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
3660 {
3661   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3662   GstFlowReturn result;
3663   RTPJitterBufferItem *item;
3664   guint seqnum;
3665   guint32 next_seqnum;
3666 
3667   /* only push buffers when PLAYING and active and not buffering */
3668   if (priv->blocked || !priv->active ||
3669       rtp_jitter_buffer_is_buffering (priv->jbuf)) {
3670     return GST_FLOW_WAIT;
3671   }
3672 
3673   /* peek a buffer, we're just looking at the sequence number.
3674    * If all is fine, we'll pop and push it. If the sequence number is wrong we
3675    * wait for a timeout or something to change.
3676    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
3677   item = rtp_jitter_buffer_peek (priv->jbuf);
3678   if (item == NULL) {
3679     goto wait;
3680   }
3681 
3682   /* get the seqnum and the next expected seqnum */
3683   seqnum = item->seqnum;
3684   if (seqnum == -1) {
3685     return pop_and_push_next (jitterbuffer, seqnum);
3686   }
3687 
3688   next_seqnum = priv->next_seqnum;
3689 
3690   /* get the gap between this and the previous packet. If we don't know the
3691    * previous packet seqnum assume no gap. */
3692   if (G_UNLIKELY (next_seqnum == -1)) {
3693     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3694     /* we don't know what the next_seqnum should be, the chain function should
3695      * have scheduled a DEADLINE timer that will increment next_seqnum when it
3696      * fires, so wait for that */
3697     result = GST_FLOW_WAIT;
3698   } else {
3699     gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
3700 
3701     if (G_LIKELY (gap == 0)) {
3702       /* no missing packet, pop and push */
3703       result = pop_and_push_next (jitterbuffer, seqnum);
3704     } else if (G_UNLIKELY (gap < 0)) {
3705       /* if we have a packet that we already pushed or considered dropped, pop it
3706        * off and get the next packet */
3707       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
3708           seqnum, next_seqnum);
3709       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
3710       rtp_jitter_buffer_free_item (item);
3711       result = GST_FLOW_OK;
3712     } else {
3713       /* the chain function has scheduled timers to request retransmission or
3714        * when to consider the packet lost, wait for that */
3715       GST_DEBUG_OBJECT (jitterbuffer,
3716           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
3717           next_seqnum, seqnum, gap);
3718       /* if we have reached EOS, just keep processing */
3719       /* Also do the same if we block input because the JB is full */
3720       if (priv->eos || rtp_jitter_buffer_is_full (priv->jbuf)) {
3721         result = pop_and_push_next (jitterbuffer, seqnum);
3722         result = GST_FLOW_OK;
3723       } else {
3724         result = GST_FLOW_WAIT;
3725       }
3726     }
3727   }
3728 
3729   return result;
3730 
3731 wait:
3732   {
3733     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
3734     if (priv->eos) {
3735       return GST_FLOW_EOS;
3736     } else {
3737       return GST_FLOW_WAIT;
3738     }
3739   }
3740 }
3741 
3742 static GstClockTime
get_rtx_retry_timeout(GstRtpJitterBufferPrivate * priv)3743 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
3744 {
3745   GstClockTime rtx_retry_timeout;
3746   GstClockTime rtx_min_retry_timeout;
3747 
3748   if (priv->rtx_retry_timeout == -1) {
3749     if (priv->avg_rtx_rtt == 0)
3750       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
3751     else
3752       /* we want to ask for a retransmission after we waited for a
3753        * complete RTT and the additional jitter */
3754       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
3755   } else {
3756     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
3757   }
3758   /* make sure we don't retry too often. On very low latency networks,
3759    * the RTT and jitter can be very low. */
3760   if (priv->rtx_min_retry_timeout == -1) {
3761     rtx_min_retry_timeout = priv->packet_spacing;
3762   } else {
3763     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
3764   }
3765   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
3766 
3767   return rtx_retry_timeout;
3768 }
3769 
3770 static GstClockTime
get_rtx_retry_period(GstRtpJitterBufferPrivate * priv,GstClockTime rtx_retry_timeout)3771 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
3772     GstClockTime rtx_retry_timeout)
3773 {
3774   GstClockTime rtx_retry_period;
3775 
3776   if (priv->rtx_retry_period == -1) {
3777     /* we retry up to the configured jitterbuffer size but leaving some
3778      * room for the retransmission to arrive in time */
3779     if (rtx_retry_timeout > priv->latency_ns) {
3780       rtx_retry_period = 0;
3781     } else {
3782       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
3783     }
3784   } else {
3785     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
3786   }
3787   return rtx_retry_period;
3788 }
3789 
3790 /*
3791   1. For *larger* rtx-rtt, weigh a new measurement as before (1/8th)
3792   2. For *smaller* rtx-rtt, be a bit more conservative and weigh a bit less (1/16th)
3793   3. For very large measurements (> avg * 2), consider them "outliers"
3794      and count them a lot less (1/48th)
3795 */
3796 static void
update_avg_rtx_rtt(GstRtpJitterBufferPrivate * priv,GstClockTime rtt)3797 update_avg_rtx_rtt (GstRtpJitterBufferPrivate * priv, GstClockTime rtt)
3798 {
3799   gint weight;
3800 
3801   if (priv->avg_rtx_rtt == 0) {
3802     priv->avg_rtx_rtt = rtt;
3803     return;
3804   }
3805 
3806   if (rtt > 2 * priv->avg_rtx_rtt)
3807     weight = 48;
3808   else if (rtt > priv->avg_rtx_rtt)
3809     weight = 8;
3810   else
3811     weight = 16;
3812 
3813   priv->avg_rtx_rtt = (rtt + (weight - 1) * priv->avg_rtx_rtt) / weight;
3814 }
3815 
3816 static void
update_rtx_stats(GstRtpJitterBuffer * jitterbuffer,const RtpTimer * timer,GstClockTime dts,gboolean success)3817 update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer,
3818     GstClockTime dts, gboolean success)
3819 {
3820   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3821   GstClockTime delay;
3822 
3823   if (success) {
3824     /* we scheduled a retry for this packet and now we have it */
3825     priv->num_rtx_success++;
3826     /* all the previous retry attempts failed */
3827     priv->num_rtx_failed += timer->num_rtx_retry - 1;
3828   } else {
3829     /* All retries failed or was too late */
3830     priv->num_rtx_failed += timer->num_rtx_retry;
3831   }
3832 
3833   /* number of retries before (hopefully) receiving the packet */
3834   if (priv->avg_rtx_num == 0.0)
3835     priv->avg_rtx_num = timer->num_rtx_retry;
3836   else
3837     priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
3838 
3839   /* Calculate the delay between retransmission request and receiving this
3840    * packet. We have a valid delay if and only if this packet is a response to
3841    * our last request. If not we don't know if this is a response to an
3842    * earlier request and delay could be way off. For RTT is more important
3843    * with correct values than to update for every packet. */
3844   if (timer->num_rtx_retry == timer->num_rtx_received &&
3845       dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) {
3846     delay = dts - timer->rtx_last;
3847     update_avg_rtx_rtt (priv, delay);
3848   } else {
3849     delay = 0;
3850   }
3851 
3852   GST_LOG_OBJECT (jitterbuffer,
3853       "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %"
3854       G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %"
3855       G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %"
3856       GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success,
3857       priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
3858       priv->avg_rtx_num, GST_TIME_ARGS (delay),
3859       GST_TIME_ARGS (priv->avg_rtx_rtt));
3860 }
3861 
3862 /* the timeout for when we expected a packet expired */
3863 static gboolean
do_expected_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now,GQueue * events)3864 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
3865     GstClockTime now, GQueue * events)
3866 {
3867   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3868   GstEvent *event;
3869   guint delay, delay_ms, avg_rtx_rtt_ms;
3870   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
3871   guint rtx_deadline_ms;
3872   GstClockTime rtx_retry_period;
3873   GstClockTime rtx_retry_timeout;
3874   GstClock *clock;
3875   GstClockTimeDiff offset = 0;
3876   GstClockTime timeout;
3877 
3878   GST_DEBUG_OBJECT (jitterbuffer, "expected #%d didn't arrive, now %"
3879       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
3880 
3881   rtx_retry_timeout = get_rtx_retry_timeout (priv);
3882   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
3883 
3884   /* delay expresses how late this packet is currently */
3885   delay = now - timer->rtx_base;
3886 
3887   delay_ms = GST_TIME_AS_MSECONDS (delay);
3888   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
3889   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
3890   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
3891   rtx_deadline_ms =
3892       priv->rtx_deadline_ms != -1 ? priv->rtx_deadline_ms : priv->latency_ms;
3893 
3894   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
3895       gst_structure_new ("GstRTPRetransmissionRequest",
3896           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
3897           "running-time", G_TYPE_UINT64, timer->rtx_base,
3898           "delay", G_TYPE_UINT, delay_ms,
3899           "retry", G_TYPE_UINT, timer->num_rtx_retry,
3900           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
3901           "period", G_TYPE_UINT, rtx_retry_period_ms,
3902           "deadline", G_TYPE_UINT, rtx_deadline_ms,
3903           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
3904           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
3905   g_queue_push_tail (events, event);
3906   GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
3907 
3908   priv->num_rtx_requests++;
3909   timer->num_rtx_retry++;
3910 
3911   GST_OBJECT_LOCK (jitterbuffer);
3912   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
3913     timer->rtx_last = gst_clock_get_time (clock);
3914     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
3915   } else {
3916     timer->rtx_last = now;
3917   }
3918   GST_OBJECT_UNLOCK (jitterbuffer);
3919 
3920   /*
3921      Calculate the timeout for the next retransmission attempt:
3922      We have just successfully sent one RTX request, and we need to
3923      find out when to schedule the next one.
3924 
3925      The rtx_retry_timeout tells us the logical timeout between RTX
3926      requests based on things like round-trip time, jitter and packet spacing,
3927      and is how long we are going to wait before attempting another RTX packet
3928    */
3929   timeout = timer->rtx_last + rtx_retry_timeout;
3930   GST_DEBUG_OBJECT (jitterbuffer,
3931       "timer #%i new timeout %" GST_TIME_FORMAT ", rtx retry timeout %"
3932       GST_TIME_FORMAT ", num_retry %u", timer->seqnum, GST_TIME_ARGS (timeout),
3933       GST_TIME_ARGS (rtx_retry_timeout), timer->num_rtx_retry);
3934   if ((priv->rtx_max_retries != -1
3935           && timer->num_rtx_retry >= priv->rtx_max_retries)
3936       || (timeout > timer->rtx_base + rtx_retry_period)) {
3937     /* too many retransmission request, we now convert the timer
3938      * to a lost timer, leave the num_rtx_retry as it is for stats */
3939     timer->type = RTP_TIMER_LOST;
3940     timeout = timer->rtx_base;
3941     offset = timeout_offset (jitterbuffer);
3942     GST_DEBUG_OBJECT (jitterbuffer, "reschedule #%i as LOST timer for %"
3943         GST_TIME_FORMAT, timer->seqnum,
3944         GST_TIME_ARGS (timer->rtx_base + offset));
3945   }
3946   rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
3947       timeout, 0, offset, FALSE);
3948 
3949   return FALSE;
3950 }
3951 
3952 /* a packet is lost */
3953 static gboolean
do_lost_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now)3954 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
3955     GstClockTime now)
3956 {
3957   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3958   GstClockTime timestamp;
3959 
3960   timestamp = apply_offset (jitterbuffer, get_pts_timeout (timer));
3961   insert_lost_event (jitterbuffer, timer->seqnum, 1, timestamp,
3962       timer->duration, timer->num_rtx_retry);
3963 
3964   if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
3965     /* Store info to update stats if the packet arrives too late */
3966     timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND;
3967     timer->type = RTP_TIMER_LOST;
3968     rtp_timer_queue_insert (priv->rtx_stats_timers, timer);
3969   } else {
3970     rtp_timer_free (timer);
3971   }
3972 
3973   return TRUE;
3974 }
3975 
3976 static gboolean
do_eos_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now)3977 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
3978     GstClockTime now)
3979 {
3980   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3981 
3982   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3983   rtp_timer_free (timer);
3984   if (!priv->eos) {
3985     GstEvent *event;
3986 
3987     /* there was no EOS in the buffer, put one in there now */
3988     event = gst_event_new_eos ();
3989     if (priv->segment_seqnum != GST_SEQNUM_INVALID)
3990       gst_event_set_seqnum (event, priv->segment_seqnum);
3991     queue_event (jitterbuffer, event);
3992   }
3993   JBUF_SIGNAL_EVENT (priv);
3994 
3995   return TRUE;
3996 }
3997 
3998 static gboolean
do_deadline_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now)3999 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
4000     GstClockTime now)
4001 {
4002   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4003 
4004   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
4005 
4006   /* timer seqnum might have been obsoleted by caps seqnum-base,
4007    * only mess with current ongoing seqnum if still unknown */
4008   if (priv->next_seqnum == -1)
4009     priv->next_seqnum = timer->seqnum;
4010   rtp_timer_free (timer);
4011   JBUF_SIGNAL_EVENT (priv);
4012 
4013   return TRUE;
4014 }
4015 
4016 static gboolean
do_timeout(GstRtpJitterBuffer * jitterbuffer,RtpTimer * timer,GstClockTime now,GQueue * events)4017 do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
4018     GstClockTime now, GQueue * events)
4019 {
4020   gboolean removed = FALSE;
4021 
4022   switch (timer->type) {
4023     case RTP_TIMER_EXPECTED:
4024       removed = do_expected_timeout (jitterbuffer, timer, now, events);
4025       break;
4026     case RTP_TIMER_LOST:
4027       removed = do_lost_timeout (jitterbuffer, timer, now);
4028       break;
4029     case RTP_TIMER_DEADLINE:
4030       removed = do_deadline_timeout (jitterbuffer, timer, now);
4031       break;
4032     case RTP_TIMER_EOS:
4033       removed = do_eos_timeout (jitterbuffer, timer, now);
4034       break;
4035   }
4036   return removed;
4037 }
4038 
4039 static void
push_rtx_events_unlocked(GstRtpJitterBuffer * jitterbuffer,GQueue * events)4040 push_rtx_events_unlocked (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
4041 {
4042   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4043   GstEvent *event;
4044 
4045   while ((event = (GstEvent *) g_queue_pop_head (events)))
4046     gst_pad_push_event (priv->sinkpad, event);
4047 }
4048 
4049 /* called with JBUF lock
4050  *
4051  * Pushes all events in @events queue.
4052  *
4053  * Returns: %TRUE if the timer thread is not longer running
4054  */
4055 static void
push_rtx_events(GstRtpJitterBuffer * jitterbuffer,GQueue * events)4056 push_rtx_events (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
4057 {
4058   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4059 
4060   if (events->length == 0)
4061     return;
4062 
4063   JBUF_UNLOCK (priv);
4064   push_rtx_events_unlocked (jitterbuffer, events);
4065   JBUF_LOCK (priv);
4066 }
4067 
4068 /* called when we need to wait for the next timeout.
4069  *
4070  * We loop over the array of recorded timeouts and wait for the earliest one.
4071  * When it timed out, do the logic associated with the timer.
4072  *
4073  * If there are no timers, we wait on a gcond until something new happens.
4074  */
4075 static void
wait_next_timeout(GstRtpJitterBuffer * jitterbuffer)4076 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
4077 {
4078   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4079   GstClockTime now = 0;
4080 
4081   JBUF_LOCK (priv);
4082   while (priv->timer_running) {
4083     RtpTimer *timer = NULL;
4084     GQueue events = G_QUEUE_INIT;
4085 
4086     /* don't produce data in paused */
4087     while (priv->blocked) {
4088       JBUF_WAIT_TIMER (priv);
4089       if (!priv->timer_running)
4090         goto stopping;
4091     }
4092 
4093     /* If we have a clock, update "now" now with the very
4094      * latest running time we have. If timers are unscheduled below we
4095      * otherwise wouldn't update now (it's only updated when timers
4096      * expire), and also for the very first loop iteration now would
4097      * otherwise always be 0
4098      */
4099     GST_OBJECT_LOCK (jitterbuffer);
4100     if (priv->eos) {
4101       now = GST_CLOCK_TIME_NONE;
4102     } else if (GST_ELEMENT_CLOCK (jitterbuffer)) {
4103       now =
4104           gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
4105           GST_ELEMENT_CAST (jitterbuffer)->base_time;
4106     }
4107     GST_OBJECT_UNLOCK (jitterbuffer);
4108 
4109     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
4110         GST_TIME_ARGS (now));
4111 
4112     /* Clear expired rtx-stats timers */
4113     if (priv->do_retransmission)
4114       rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
4115 
4116     /* Iterate expired "normal" timers */
4117     while ((timer = rtp_timer_queue_pop_until (priv->timers, now)))
4118       do_timeout (jitterbuffer, timer, now, &events);
4119 
4120     timer = rtp_timer_queue_peek_earliest (priv->timers);
4121     if (timer) {
4122       GstClock *clock;
4123       GstClockTime sync_time;
4124       GstClockID id;
4125       GstClockReturn ret;
4126       GstClockTimeDiff clock_jitter;
4127 
4128       /* we poped all immediate and due timer, so this should just never
4129        * happens */
4130       g_assert (GST_CLOCK_TIME_IS_VALID (timer->timeout));
4131 
4132       GST_OBJECT_LOCK (jitterbuffer);
4133       clock = GST_ELEMENT_CLOCK (jitterbuffer);
4134       if (!clock) {
4135         GST_OBJECT_UNLOCK (jitterbuffer);
4136         /* let's just push if there is no clock */
4137         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
4138         now = timer->timeout;
4139         push_rtx_events (jitterbuffer, &events);
4140         continue;
4141       }
4142 
4143       /* prepare for sync against clock */
4144       sync_time = timer->timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
4145       /* add latency of peer to get input time */
4146       sync_time += priv->peer_latency;
4147 
4148       GST_DEBUG_OBJECT (jitterbuffer, "timer #%i sync to timestamp %"
4149           GST_TIME_FORMAT " with sync time %" GST_TIME_FORMAT, timer->seqnum,
4150           GST_TIME_ARGS (get_pts_timeout (timer)), GST_TIME_ARGS (sync_time));
4151 
4152       /* create an entry for the clock */
4153       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
4154       priv->timer_timeout = timer->timeout;
4155       priv->timer_seqnum = timer->seqnum;
4156       GST_OBJECT_UNLOCK (jitterbuffer);
4157 
4158       /* release the lock so that the other end can push stuff or unlock */
4159       JBUF_UNLOCK (priv);
4160 
4161       push_rtx_events_unlocked (jitterbuffer, &events);
4162 
4163       ret = gst_clock_id_wait (id, &clock_jitter);
4164 
4165       JBUF_LOCK (priv);
4166 
4167       if (!priv->timer_running) {
4168         g_queue_clear_full (&events, (GDestroyNotify) gst_event_unref);
4169         gst_clock_id_unref (id);
4170         priv->clock_id = NULL;
4171         break;
4172       }
4173 
4174       if (ret != GST_CLOCK_UNSCHEDULED) {
4175         now = priv->timer_timeout + MAX (clock_jitter, 0);
4176         GST_DEBUG_OBJECT (jitterbuffer,
4177             "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
4178             GST_STIME_ARGS (clock_jitter));
4179       } else {
4180         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
4181       }
4182 
4183       /* and free the entry */
4184       gst_clock_id_unref (id);
4185       priv->clock_id = NULL;
4186     } else {
4187       push_rtx_events_unlocked (jitterbuffer, &events);
4188 
4189       /* when draining the timers, the pusher thread will reuse our
4190        * condition to wait for completion. Signal that thread before
4191        * sleeping again here */
4192       if (priv->eos)
4193         JBUF_SIGNAL_TIMER (priv);
4194 
4195       /* no timers, wait for activity */
4196       JBUF_WAIT_TIMER (priv);
4197     }
4198   }
4199 stopping:
4200   JBUF_UNLOCK (priv);
4201 
4202   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
4203   return;
4204 }
4205 
4206 /*
4207  * This function implements the main pushing loop on the source pad.
4208  *
4209  * It first tries to push as many buffers as possible. If there is a seqnum
4210  * mismatch, we wait for the next timeouts.
4211  */
4212 static void
gst_rtp_jitter_buffer_loop(GstRtpJitterBuffer * jitterbuffer)4213 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
4214 {
4215   GstRtpJitterBufferPrivate *priv;
4216   GstFlowReturn result = GST_FLOW_OK;
4217 
4218   priv = jitterbuffer->priv;
4219 
4220   JBUF_LOCK_CHECK (priv, flushing);
4221   do {
4222     result = handle_next_buffer (jitterbuffer);
4223     if (G_LIKELY (result == GST_FLOW_WAIT)) {
4224       /* now wait for the next event */
4225       JBUF_SIGNAL_QUEUE (priv);
4226       JBUF_WAIT_EVENT (priv, flushing);
4227       result = GST_FLOW_OK;
4228     }
4229   } while (result == GST_FLOW_OK);
4230   /* store result for upstream */
4231   priv->srcresult = result;
4232   /* if we get here we need to pause */
4233   goto pause;
4234 
4235   /* ERRORS */
4236 flushing:
4237   {
4238     result = priv->srcresult;
4239     goto pause;
4240   }
4241 pause:
4242   {
4243     GstEvent *event;
4244 
4245     JBUF_SIGNAL_QUERY (priv, FALSE);
4246     JBUF_UNLOCK (priv);
4247 
4248     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
4249         gst_flow_get_name (result));
4250     gst_pad_pause_task (priv->srcpad);
4251     if (result == GST_FLOW_EOS) {
4252       event = gst_event_new_eos ();
4253       if (priv->segment_seqnum != GST_SEQNUM_INVALID)
4254         gst_event_set_seqnum (event, priv->segment_seqnum);
4255       gst_pad_push_event (priv->srcpad, event);
4256     }
4257     return;
4258   }
4259 }
4260 
4261 /* collect the info from the latest RTCP packet and the jitterbuffer sync, do
4262  * some sanity checks and then emit the handle-sync signal with the parameters.
4263  * This function must be called with the LOCK */
4264 static void
do_handle_sync(GstRtpJitterBuffer * jitterbuffer)4265 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
4266 {
4267   GstRtpJitterBufferPrivate *priv;
4268   guint64 base_rtptime, base_time;
4269   guint32 clock_rate;
4270   guint64 last_rtptime;
4271   guint64 clock_base;
4272   guint64 ext_rtptime, diff;
4273   gboolean valid = TRUE, keep = FALSE;
4274 
4275   priv = jitterbuffer->priv;
4276 
4277   /* get the last values from the jitterbuffer */
4278   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
4279       &clock_rate, &last_rtptime);
4280 
4281   clock_base = priv->clock_base;
4282   ext_rtptime = priv->ext_rtptime;
4283 
4284   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
4285       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
4286       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
4287       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
4288 
4289   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
4290     /* we keep this SR packet for later. When we get a valid RTP packet the
4291      * above values will be set and we can try to use the SR packet */
4292     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
4293     keep = TRUE;
4294   } else {
4295     /* we can't accept anything that happened before we did the last resync */
4296     if (base_rtptime > ext_rtptime) {
4297       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
4298       valid = FALSE;
4299     } else {
4300       /* the SR RTP timestamp must be something close to what we last observed
4301        * in the jitterbuffer */
4302       if (ext_rtptime > last_rtptime) {
4303         /* check how far ahead it is to our RTP timestamps */
4304         diff = ext_rtptime - last_rtptime;
4305         /* if bigger than 1 second, we drop it */
4306         if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
4307             diff >
4308             gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
4309                 clock_rate, 1000)) {
4310           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
4311           /* should drop this, but some RTSP servers end up with bogus
4312            * way too ahead RTCP packet when repeated PAUSE/PLAY,
4313            * so still trigger rptbin sync but invalidate RTCP data
4314            * (sync might use other methods) */
4315           ext_rtptime = -1;
4316         }
4317         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
4318             G_GUINT64_FORMAT, last_rtptime, diff);
4319       }
4320     }
4321   }
4322 
4323   if (keep) {
4324     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
4325   } else if (valid) {
4326     GstStructure *s;
4327 
4328     s = gst_structure_new ("application/x-rtp-sync",
4329         "base-rtptime", G_TYPE_UINT64, base_rtptime,
4330         "base-time", G_TYPE_UINT64, base_time,
4331         "clock-rate", G_TYPE_UINT, clock_rate,
4332         "clock-base", G_TYPE_UINT64, clock_base,
4333         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
4334         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
4335 
4336     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
4337     gst_buffer_replace (&priv->last_sr, NULL);
4338     JBUF_UNLOCK (priv);
4339     g_signal_emit (jitterbuffer,
4340         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
4341     JBUF_LOCK (priv);
4342     gst_structure_free (s);
4343   } else {
4344     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
4345     gst_buffer_replace (&priv->last_sr, NULL);
4346   }
4347 }
4348 
4349 static GstFlowReturn
gst_rtp_jitter_buffer_chain_rtcp(GstPad * pad,GstObject * parent,GstBuffer * buffer)4350 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
4351     GstBuffer * buffer)
4352 {
4353   GstRtpJitterBuffer *jitterbuffer;
4354   GstRtpJitterBufferPrivate *priv;
4355   GstFlowReturn ret = GST_FLOW_OK;
4356   guint32 ssrc;
4357   GstRTCPPacket packet;
4358   guint64 ext_rtptime;
4359   guint32 rtptime;
4360   GstRTCPBuffer rtcp = { NULL, };
4361 
4362   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4363 
4364   if (G_UNLIKELY (!gst_rtcp_buffer_validate_reduced (buffer)))
4365     goto invalid_buffer;
4366 
4367   priv = jitterbuffer->priv;
4368 
4369   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
4370 
4371   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
4372     goto empty_buffer;
4373 
4374   /* first packet must be SR or RR or else the validate would have failed */
4375   switch (gst_rtcp_packet_get_type (&packet)) {
4376     case GST_RTCP_TYPE_SR:
4377       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
4378           NULL, NULL);
4379       break;
4380     default:
4381       goto ignore_buffer;
4382   }
4383   gst_rtcp_buffer_unmap (&rtcp);
4384 
4385   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
4386 
4387   JBUF_LOCK (priv);
4388   /* convert the RTP timestamp to our extended timestamp, using the same offset
4389    * we used in the jitterbuffer */
4390   ext_rtptime = priv->jbuf->ext_rtptime;
4391   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
4392 
4393   priv->ext_rtptime = ext_rtptime;
4394   gst_buffer_replace (&priv->last_sr, buffer);
4395 
4396   do_handle_sync (jitterbuffer);
4397   JBUF_UNLOCK (priv);
4398 
4399 done:
4400   gst_buffer_unref (buffer);
4401 
4402   return ret;
4403 
4404 invalid_buffer:
4405   {
4406     /* this is not fatal but should be filtered earlier */
4407     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4408         ("Received invalid RTCP payload, dropping"));
4409     ret = GST_FLOW_OK;
4410     goto done;
4411   }
4412 empty_buffer:
4413   {
4414     /* this is not fatal but should be filtered earlier */
4415     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4416         ("Received empty RTCP payload, dropping"));
4417     gst_rtcp_buffer_unmap (&rtcp);
4418     ret = GST_FLOW_OK;
4419     goto done;
4420   }
4421 ignore_buffer:
4422   {
4423     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
4424     gst_rtcp_buffer_unmap (&rtcp);
4425     ret = GST_FLOW_OK;
4426     goto done;
4427   }
4428 }
4429 
4430 static gboolean
gst_rtp_jitter_buffer_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)4431 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
4432     GstQuery * query)
4433 {
4434   gboolean res = FALSE;
4435   GstRtpJitterBuffer *jitterbuffer;
4436   GstRtpJitterBufferPrivate *priv;
4437 
4438   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4439   priv = jitterbuffer->priv;
4440 
4441   switch (GST_QUERY_TYPE (query)) {
4442     case GST_QUERY_CAPS:
4443     {
4444       GstCaps *filter, *caps;
4445 
4446       gst_query_parse_caps (query, &filter);
4447       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4448       gst_query_set_caps_result (query, caps);
4449       gst_caps_unref (caps);
4450       res = TRUE;
4451       break;
4452     }
4453     default:
4454       if (GST_QUERY_IS_SERIALIZED (query)) {
4455         JBUF_LOCK_CHECK (priv, out_flushing);
4456         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
4457             RTP_JITTER_BUFFER_MODE_BUFFER) {
4458           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
4459           if (rtp_jitter_buffer_append_query (priv->jbuf, query))
4460             JBUF_SIGNAL_EVENT (priv);
4461           JBUF_WAIT_QUERY (priv, out_flushing);
4462           res = priv->last_query;
4463         } else {
4464           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
4465           res = FALSE;
4466         }
4467         JBUF_UNLOCK (priv);
4468       } else {
4469         res = gst_pad_query_default (pad, parent, query);
4470       }
4471       break;
4472   }
4473   return res;
4474   /* ERRORS */
4475 out_flushing:
4476   {
4477     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
4478     JBUF_UNLOCK (priv);
4479     return FALSE;
4480   }
4481 
4482 }
4483 
4484 static gboolean
gst_rtp_jitter_buffer_src_query(GstPad * pad,GstObject * parent,GstQuery * query)4485 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
4486     GstQuery * query)
4487 {
4488   GstRtpJitterBuffer *jitterbuffer;
4489   GstRtpJitterBufferPrivate *priv;
4490   gboolean res = FALSE;
4491 
4492   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4493   priv = jitterbuffer->priv;
4494 
4495   switch (GST_QUERY_TYPE (query)) {
4496     case GST_QUERY_LATENCY:
4497     {
4498       /* We need to send the query upstream and add the returned latency to our
4499        * own */
4500       GstClockTime min_latency, max_latency;
4501       gboolean us_live;
4502       GstClockTime our_latency;
4503 
4504       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
4505         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
4506 
4507         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
4508             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4509             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4510 
4511         /* store this so that we can safely sync on the peer buffers. */
4512         JBUF_LOCK (priv);
4513         priv->peer_latency = min_latency;
4514         our_latency = priv->latency_ns;
4515         JBUF_UNLOCK (priv);
4516 
4517         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
4518             GST_TIME_ARGS (our_latency));
4519 
4520         /* we add some latency but can buffer an infinite amount of time */
4521         min_latency += our_latency;
4522         max_latency = -1;
4523 
4524         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
4525             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4526             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4527 
4528         gst_query_set_latency (query, TRUE, min_latency, max_latency);
4529       }
4530       break;
4531     }
4532     case GST_QUERY_POSITION:
4533     {
4534       GstClockTime start, last_out;
4535       GstFormat fmt;
4536 
4537       gst_query_parse_position (query, &fmt, NULL);
4538       if (fmt != GST_FORMAT_TIME) {
4539         res = gst_pad_query_default (pad, parent, query);
4540         break;
4541       }
4542 
4543       JBUF_LOCK (priv);
4544       start = priv->npt_start;
4545       last_out = priv->last_out_time;
4546       JBUF_UNLOCK (priv);
4547 
4548       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
4549           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
4550           GST_TIME_ARGS (last_out));
4551 
4552       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
4553         /* bring 0-based outgoing time to stream time */
4554         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
4555         res = TRUE;
4556       } else {
4557         res = gst_pad_query_default (pad, parent, query);
4558       }
4559       break;
4560     }
4561     case GST_QUERY_CAPS:
4562     {
4563       GstCaps *filter, *caps;
4564 
4565       gst_query_parse_caps (query, &filter);
4566       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4567       gst_query_set_caps_result (query, caps);
4568       gst_caps_unref (caps);
4569       res = TRUE;
4570       break;
4571     }
4572     default:
4573       res = gst_pad_query_default (pad, parent, query);
4574       break;
4575   }
4576 
4577   return res;
4578 }
4579 
4580 static void
gst_rtp_jitter_buffer_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)4581 gst_rtp_jitter_buffer_set_property (GObject * object,
4582     guint prop_id, const GValue * value, GParamSpec * pspec)
4583 {
4584   GstRtpJitterBuffer *jitterbuffer;
4585   GstRtpJitterBufferPrivate *priv;
4586 
4587   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4588   priv = jitterbuffer->priv;
4589 
4590   switch (prop_id) {
4591     case PROP_LATENCY:
4592     {
4593       guint new_latency, old_latency;
4594 
4595       new_latency = g_value_get_uint (value);
4596 
4597       JBUF_LOCK (priv);
4598       old_latency = priv->latency_ms;
4599       priv->latency_ms = new_latency;
4600       priv->latency_ns = priv->latency_ms * GST_MSECOND;
4601       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
4602       JBUF_UNLOCK (priv);
4603 
4604       /* post message if latency changed, this will inform the parent pipeline
4605        * that a latency reconfiguration is possible/needed. */
4606       if (new_latency != old_latency) {
4607         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
4608             GST_TIME_ARGS (new_latency * GST_MSECOND));
4609 
4610         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
4611             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
4612       }
4613       break;
4614     }
4615     case PROP_DROP_ON_LATENCY:
4616       JBUF_LOCK (priv);
4617       priv->drop_on_latency = g_value_get_boolean (value);
4618       JBUF_UNLOCK (priv);
4619       break;
4620     case PROP_TS_OFFSET:
4621       JBUF_LOCK (priv);
4622       if (priv->max_ts_offset_adjustment != 0) {
4623         gint64 new_offset = g_value_get_int64 (value);
4624 
4625         if (new_offset > priv->ts_offset) {
4626           priv->ts_offset_remainder = new_offset - priv->ts_offset;
4627         } else {
4628           priv->ts_offset_remainder = -(priv->ts_offset - new_offset);
4629         }
4630       } else {
4631         priv->ts_offset = g_value_get_int64 (value);
4632         priv->ts_offset_remainder = 0;
4633         update_timer_offsets (jitterbuffer);
4634       }
4635       priv->ts_discont = TRUE;
4636       JBUF_UNLOCK (priv);
4637       break;
4638     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4639       JBUF_LOCK (priv);
4640       priv->max_ts_offset_adjustment = g_value_get_uint64 (value);
4641       JBUF_UNLOCK (priv);
4642       break;
4643     case PROP_DO_LOST:
4644       JBUF_LOCK (priv);
4645       priv->do_lost = g_value_get_boolean (value);
4646       JBUF_UNLOCK (priv);
4647       break;
4648     case PROP_POST_DROP_MESSAGES:
4649       JBUF_LOCK (priv);
4650       priv->post_drop_messages = g_value_get_boolean (value);
4651       JBUF_UNLOCK (priv);
4652       break;
4653     case PROP_DROP_MESSAGES_INTERVAL:
4654       JBUF_LOCK (priv);
4655       priv->drop_messages_interval_ms = g_value_get_uint (value);
4656       JBUF_UNLOCK (priv);
4657       break;
4658     case PROP_MODE:
4659       JBUF_LOCK (priv);
4660       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
4661       JBUF_UNLOCK (priv);
4662       break;
4663     case PROP_DO_RETRANSMISSION:
4664       JBUF_LOCK (priv);
4665       priv->do_retransmission = g_value_get_boolean (value);
4666       JBUF_UNLOCK (priv);
4667       break;
4668     case PROP_RTX_NEXT_SEQNUM:
4669       JBUF_LOCK (priv);
4670       priv->rtx_next_seqnum = g_value_get_boolean (value);
4671       JBUF_UNLOCK (priv);
4672       break;
4673     case PROP_RTX_DELAY:
4674       JBUF_LOCK (priv);
4675       priv->rtx_delay = g_value_get_int (value);
4676       JBUF_UNLOCK (priv);
4677       break;
4678     case PROP_RTX_MIN_DELAY:
4679       JBUF_LOCK (priv);
4680       priv->rtx_min_delay = g_value_get_uint (value);
4681       JBUF_UNLOCK (priv);
4682       break;
4683     case PROP_RTX_DELAY_REORDER:
4684       JBUF_LOCK (priv);
4685       priv->rtx_delay_reorder = g_value_get_int (value);
4686       JBUF_UNLOCK (priv);
4687       break;
4688     case PROP_RTX_RETRY_TIMEOUT:
4689       JBUF_LOCK (priv);
4690       priv->rtx_retry_timeout = g_value_get_int (value);
4691       JBUF_UNLOCK (priv);
4692       break;
4693     case PROP_RTX_MIN_RETRY_TIMEOUT:
4694       JBUF_LOCK (priv);
4695       priv->rtx_min_retry_timeout = g_value_get_int (value);
4696       JBUF_UNLOCK (priv);
4697       break;
4698     case PROP_RTX_RETRY_PERIOD:
4699       JBUF_LOCK (priv);
4700       priv->rtx_retry_period = g_value_get_int (value);
4701       JBUF_UNLOCK (priv);
4702       break;
4703     case PROP_RTX_MAX_RETRIES:
4704       JBUF_LOCK (priv);
4705       priv->rtx_max_retries = g_value_get_int (value);
4706       JBUF_UNLOCK (priv);
4707       break;
4708     case PROP_RTX_DEADLINE:
4709       JBUF_LOCK (priv);
4710       priv->rtx_deadline_ms = g_value_get_int (value);
4711       JBUF_UNLOCK (priv);
4712       break;
4713     case PROP_RTX_STATS_TIMEOUT:
4714       JBUF_LOCK (priv);
4715       priv->rtx_stats_timeout = g_value_get_uint (value);
4716       JBUF_UNLOCK (priv);
4717       break;
4718     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4719       JBUF_LOCK (priv);
4720       priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
4721       JBUF_UNLOCK (priv);
4722       break;
4723     case PROP_MAX_DROPOUT_TIME:
4724       JBUF_LOCK (priv);
4725       priv->max_dropout_time = g_value_get_uint (value);
4726       JBUF_UNLOCK (priv);
4727       break;
4728     case PROP_MAX_MISORDER_TIME:
4729       JBUF_LOCK (priv);
4730       priv->max_misorder_time = g_value_get_uint (value);
4731       JBUF_UNLOCK (priv);
4732       break;
4733     case PROP_RFC7273_SYNC:
4734       JBUF_LOCK (priv);
4735       rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
4736           g_value_get_boolean (value));
4737       JBUF_UNLOCK (priv);
4738       break;
4739     case PROP_FASTSTART_MIN_PACKETS:
4740       JBUF_LOCK (priv);
4741       priv->faststart_min_packets = g_value_get_uint (value);
4742       JBUF_UNLOCK (priv);
4743       break;
4744     default:
4745       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4746       break;
4747   }
4748 }
4749 
4750 static void
gst_rtp_jitter_buffer_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)4751 gst_rtp_jitter_buffer_get_property (GObject * object,
4752     guint prop_id, GValue * value, GParamSpec * pspec)
4753 {
4754   GstRtpJitterBuffer *jitterbuffer;
4755   GstRtpJitterBufferPrivate *priv;
4756 
4757   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4758   priv = jitterbuffer->priv;
4759 
4760   switch (prop_id) {
4761     case PROP_LATENCY:
4762       JBUF_LOCK (priv);
4763       g_value_set_uint (value, priv->latency_ms);
4764       JBUF_UNLOCK (priv);
4765       break;
4766     case PROP_DROP_ON_LATENCY:
4767       JBUF_LOCK (priv);
4768       g_value_set_boolean (value, priv->drop_on_latency);
4769       JBUF_UNLOCK (priv);
4770       break;
4771     case PROP_TS_OFFSET:
4772       JBUF_LOCK (priv);
4773       g_value_set_int64 (value, priv->ts_offset);
4774       JBUF_UNLOCK (priv);
4775       break;
4776     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4777       JBUF_LOCK (priv);
4778       g_value_set_uint64 (value, priv->max_ts_offset_adjustment);
4779       JBUF_UNLOCK (priv);
4780       break;
4781     case PROP_DO_LOST:
4782       JBUF_LOCK (priv);
4783       g_value_set_boolean (value, priv->do_lost);
4784       JBUF_UNLOCK (priv);
4785       break;
4786     case PROP_POST_DROP_MESSAGES:
4787       JBUF_LOCK (priv);
4788       g_value_set_boolean (value, priv->post_drop_messages);
4789       JBUF_UNLOCK (priv);
4790       break;
4791     case PROP_DROP_MESSAGES_INTERVAL:
4792       JBUF_LOCK (priv);
4793       g_value_set_uint (value, priv->drop_messages_interval_ms);
4794       JBUF_UNLOCK (priv);
4795       break;
4796     case PROP_MODE:
4797       JBUF_LOCK (priv);
4798       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
4799       JBUF_UNLOCK (priv);
4800       break;
4801     case PROP_PERCENT:
4802     {
4803       gint percent;
4804 
4805       JBUF_LOCK (priv);
4806       if (priv->srcresult != GST_FLOW_OK)
4807         percent = 100;
4808       else
4809         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
4810 
4811       g_value_set_int (value, percent);
4812       JBUF_UNLOCK (priv);
4813       break;
4814     }
4815     case PROP_DO_RETRANSMISSION:
4816       JBUF_LOCK (priv);
4817       g_value_set_boolean (value, priv->do_retransmission);
4818       JBUF_UNLOCK (priv);
4819       break;
4820     case PROP_RTX_NEXT_SEQNUM:
4821       JBUF_LOCK (priv);
4822       g_value_set_boolean (value, priv->rtx_next_seqnum);
4823       JBUF_UNLOCK (priv);
4824       break;
4825     case PROP_RTX_DELAY:
4826       JBUF_LOCK (priv);
4827       g_value_set_int (value, priv->rtx_delay);
4828       JBUF_UNLOCK (priv);
4829       break;
4830     case PROP_RTX_MIN_DELAY:
4831       JBUF_LOCK (priv);
4832       g_value_set_uint (value, priv->rtx_min_delay);
4833       JBUF_UNLOCK (priv);
4834       break;
4835     case PROP_RTX_DELAY_REORDER:
4836       JBUF_LOCK (priv);
4837       g_value_set_int (value, priv->rtx_delay_reorder);
4838       JBUF_UNLOCK (priv);
4839       break;
4840     case PROP_RTX_RETRY_TIMEOUT:
4841       JBUF_LOCK (priv);
4842       g_value_set_int (value, priv->rtx_retry_timeout);
4843       JBUF_UNLOCK (priv);
4844       break;
4845     case PROP_RTX_MIN_RETRY_TIMEOUT:
4846       JBUF_LOCK (priv);
4847       g_value_set_int (value, priv->rtx_min_retry_timeout);
4848       JBUF_UNLOCK (priv);
4849       break;
4850     case PROP_RTX_RETRY_PERIOD:
4851       JBUF_LOCK (priv);
4852       g_value_set_int (value, priv->rtx_retry_period);
4853       JBUF_UNLOCK (priv);
4854       break;
4855     case PROP_RTX_MAX_RETRIES:
4856       JBUF_LOCK (priv);
4857       g_value_set_int (value, priv->rtx_max_retries);
4858       JBUF_UNLOCK (priv);
4859       break;
4860     case PROP_RTX_DEADLINE:
4861       JBUF_LOCK (priv);
4862       g_value_set_int (value, priv->rtx_deadline_ms);
4863       JBUF_UNLOCK (priv);
4864       break;
4865     case PROP_RTX_STATS_TIMEOUT:
4866       JBUF_LOCK (priv);
4867       g_value_set_uint (value, priv->rtx_stats_timeout);
4868       JBUF_UNLOCK (priv);
4869       break;
4870     case PROP_STATS:
4871       g_value_take_boxed (value,
4872           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
4873       break;
4874     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4875       JBUF_LOCK (priv);
4876       g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
4877       JBUF_UNLOCK (priv);
4878       break;
4879     case PROP_MAX_DROPOUT_TIME:
4880       JBUF_LOCK (priv);
4881       g_value_set_uint (value, priv->max_dropout_time);
4882       JBUF_UNLOCK (priv);
4883       break;
4884     case PROP_MAX_MISORDER_TIME:
4885       JBUF_LOCK (priv);
4886       g_value_set_uint (value, priv->max_misorder_time);
4887       JBUF_UNLOCK (priv);
4888       break;
4889     case PROP_RFC7273_SYNC:
4890       JBUF_LOCK (priv);
4891       g_value_set_boolean (value,
4892           rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
4893       JBUF_UNLOCK (priv);
4894       break;
4895     case PROP_FASTSTART_MIN_PACKETS:
4896       JBUF_LOCK (priv);
4897       g_value_set_uint (value, priv->faststart_min_packets);
4898       JBUF_UNLOCK (priv);
4899       break;
4900     default:
4901       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4902       break;
4903   }
4904 }
4905 
4906 static GstStructure *
gst_rtp_jitter_buffer_create_stats(GstRtpJitterBuffer * jbuf)4907 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
4908 {
4909   GstRtpJitterBufferPrivate *priv = jbuf->priv;
4910   GstStructure *s;
4911 
4912   JBUF_LOCK (priv);
4913   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
4914       "num-pushed", G_TYPE_UINT64, priv->num_pushed,
4915       "num-lost", G_TYPE_UINT64, priv->num_lost,
4916       "num-late", G_TYPE_UINT64, priv->num_late,
4917       "num-duplicates", G_TYPE_UINT64, priv->num_duplicates,
4918       "avg-jitter", G_TYPE_UINT64, priv->avg_jitter,
4919       "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests,
4920       "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success,
4921       "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num,
4922       "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL);
4923   JBUF_UNLOCK (priv);
4924 
4925   return s;
4926 }
4927