• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C)  2019 Pexip (http://pexip.com/)
3  *   @author: Havard Graff <havard@pexip.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 #include "rtptwcc.h"
21 #include <gst/rtp/gstrtcpbuffer.h>
22 #include <gst/base/gstbitreader.h>
23 #include <gst/base/gstbitwriter.h>
24 
25 GST_DEBUG_CATEGORY_EXTERN (rtp_session_debug);
26 #define GST_CAT_DEFAULT rtp_session_debug
27 
28 #define TWCC_EXTMAP_STR "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
29 
30 #define REF_TIME_UNIT (64 * GST_MSECOND)
31 #define DELTA_UNIT (250 * GST_USECOND)
32 #define MAX_TS_DELTA (0xff * DELTA_UNIT)
33 
34 #define STATUS_VECTOR_MAX_CAPACITY 14
35 #define STATUS_VECTOR_TWO_BIT_MAX_CAPACITY 7
36 
37 typedef enum
38 {
39   RTP_TWCC_CHUNK_TYPE_RUN_LENGTH = 0,
40   RTP_TWCC_CHUNK_TYPE_STATUS_VECTOR = 1,
41 } RTPTWCCChunkType;
42 
43 typedef struct
44 {
45   guint8 base_seqnum[2];
46   guint8 packet_count[2];
47   guint8 base_time[3];
48   guint8 fb_pkt_count[1];
49 } RTPTWCCHeader;
50 
51 typedef struct
52 {
53   GstClockTime ts;
54   guint16 seqnum;
55 
56   gint64 delta;
57   RTPTWCCPacketStatus status;
58   guint16 missing_run;
59   guint equal_run;
60 } RecvPacket;
61 
62 typedef struct
63 {
64   GstClockTime ts;
65   GstClockTime socket_ts;
66   GstClockTime remote_ts;
67   guint16 seqnum;
68   guint8 pt;
69   guint size;
70   gboolean lost;
71 } SentPacket;
72 
73 struct _RTPTWCCManager
74 {
75   GObject object;
76 
77   guint8 send_ext_id;
78   guint8 recv_ext_id;
79   guint16 send_seqnum;
80 
81   guint mtu;
82   guint max_packets_per_rtcp;
83   GArray *recv_packets;
84 
85   guint64 fb_pkt_count;
86   gint32 last_seqnum;
87 
88   GArray *sent_packets;
89   GArray *parsed_packets;
90   GQueue *rtcp_buffers;
91 
92   guint64 recv_sender_ssrc;
93   guint64 recv_media_ssrc;
94 
95   guint16 expected_recv_seqnum;
96   guint16 packet_count_no_marker;
97 
98   gboolean first_fci_parse;
99   guint16 expected_parsed_seqnum;
100   guint8 expected_parsed_fb_pkt_count;
101 
102   GstClockTime next_feedback_send_time;
103   GstClockTime feedback_interval;
104 };
105 
106 G_DEFINE_TYPE (RTPTWCCManager, rtp_twcc_manager, G_TYPE_OBJECT);
107 
108 static void
rtp_twcc_manager_init(RTPTWCCManager * twcc)109 rtp_twcc_manager_init (RTPTWCCManager * twcc)
110 {
111   twcc->recv_packets = g_array_new (FALSE, FALSE, sizeof (RecvPacket));
112   twcc->sent_packets = g_array_new (FALSE, FALSE, sizeof (SentPacket));
113   twcc->parsed_packets = g_array_new (FALSE, FALSE, sizeof (RecvPacket));
114 
115   twcc->rtcp_buffers = g_queue_new ();
116 
117   twcc->last_seqnum = -1;
118   twcc->recv_media_ssrc = -1;
119   twcc->recv_sender_ssrc = -1;
120 
121   twcc->first_fci_parse = TRUE;
122 
123   twcc->feedback_interval = GST_CLOCK_TIME_NONE;
124   twcc->next_feedback_send_time = GST_CLOCK_TIME_NONE;
125 }
126 
127 static void
rtp_twcc_manager_finalize(GObject * object)128 rtp_twcc_manager_finalize (GObject * object)
129 {
130   RTPTWCCManager *twcc = RTP_TWCC_MANAGER_CAST (object);
131 
132   g_array_unref (twcc->recv_packets);
133   g_array_unref (twcc->sent_packets);
134   g_array_unref (twcc->parsed_packets);
135   g_queue_free_full (twcc->rtcp_buffers, (GDestroyNotify) gst_buffer_unref);
136 
137   G_OBJECT_CLASS (rtp_twcc_manager_parent_class)->finalize (object);
138 }
139 
140 static void
rtp_twcc_manager_class_init(RTPTWCCManagerClass * klass)141 rtp_twcc_manager_class_init (RTPTWCCManagerClass * klass)
142 {
143   GObjectClass *gobject_class = (GObjectClass *) klass;
144   gobject_class->finalize = rtp_twcc_manager_finalize;
145 }
146 
147 RTPTWCCManager *
rtp_twcc_manager_new(guint mtu)148 rtp_twcc_manager_new (guint mtu)
149 {
150   RTPTWCCManager *twcc = g_object_new (RTP_TYPE_TWCC_MANAGER, NULL);
151 
152   rtp_twcc_manager_set_mtu (twcc, mtu);
153 
154   return twcc;
155 }
156 
157 static void
recv_packet_init(RecvPacket * packet,guint16 seqnum,RTPPacketInfo * pinfo)158 recv_packet_init (RecvPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo)
159 {
160   memset (packet, 0, sizeof (RecvPacket));
161   packet->seqnum = seqnum;
162 
163   if (GST_CLOCK_TIME_IS_VALID (pinfo->arrival_time))
164     packet->ts = pinfo->arrival_time;
165   else
166     packet->ts = pinfo->current_time;
167 }
168 
169 static guint8
_get_extmap_id_for_attribute(const GstStructure * s,const gchar * ext_name)170 _get_extmap_id_for_attribute (const GstStructure * s, const gchar * ext_name)
171 {
172   guint i;
173   guint8 extmap_id = 0;
174   guint n_fields = gst_structure_n_fields (s);
175 
176   for (i = 0; i < n_fields; i++) {
177     const gchar *field_name = gst_structure_nth_field_name (s, i);
178     if (g_str_has_prefix (field_name, "extmap-")) {
179       const gchar *str = gst_structure_get_string (s, field_name);
180       if (str && g_strcmp0 (str, ext_name) == 0) {
181         gint64 id = g_ascii_strtoll (field_name + 7, NULL, 10);
182         if (id > 0 && id < 15) {
183           extmap_id = id;
184           break;
185         }
186       }
187     }
188   }
189   return extmap_id;
190 }
191 
192 void
rtp_twcc_manager_parse_recv_ext_id(RTPTWCCManager * twcc,const GstStructure * s)193 rtp_twcc_manager_parse_recv_ext_id (RTPTWCCManager * twcc,
194     const GstStructure * s)
195 {
196   guint8 recv_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
197   if (recv_ext_id > 0) {
198     twcc->recv_ext_id = recv_ext_id;
199     GST_INFO ("TWCC enabled for recv using extension id: %u",
200         twcc->recv_ext_id);
201   }
202 }
203 
204 void
rtp_twcc_manager_parse_send_ext_id(RTPTWCCManager * twcc,const GstStructure * s)205 rtp_twcc_manager_parse_send_ext_id (RTPTWCCManager * twcc,
206     const GstStructure * s)
207 {
208   guint8 send_ext_id = _get_extmap_id_for_attribute (s, TWCC_EXTMAP_STR);
209   if (send_ext_id > 0) {
210     twcc->send_ext_id = send_ext_id;
211     GST_INFO ("TWCC enabled for send using extension id: %u",
212         twcc->send_ext_id);
213   }
214 }
215 
216 void
rtp_twcc_manager_set_mtu(RTPTWCCManager * twcc,guint mtu)217 rtp_twcc_manager_set_mtu (RTPTWCCManager * twcc, guint mtu)
218 {
219   twcc->mtu = mtu;
220 
221   /* the absolute worst case is that 7 packets uses
222      header (4 * 4 * 4) 32 bytes) and
223      packet_chunk 2 bytes +
224      recv_deltas (2 * 7) 14 bytes */
225   twcc->max_packets_per_rtcp = ((twcc->mtu - 32) * 7) / (2 + 14);
226 }
227 
228 void
rtp_twcc_manager_set_feedback_interval(RTPTWCCManager * twcc,GstClockTime feedback_interval)229 rtp_twcc_manager_set_feedback_interval (RTPTWCCManager * twcc,
230     GstClockTime feedback_interval)
231 {
232   twcc->feedback_interval = feedback_interval;
233 }
234 
235 GstClockTime
rtp_twcc_manager_get_feedback_interval(RTPTWCCManager * twcc)236 rtp_twcc_manager_get_feedback_interval (RTPTWCCManager * twcc)
237 {
238   return twcc->feedback_interval;
239 }
240 
241 static gboolean
_get_twcc_seqnum_data(RTPPacketInfo * pinfo,guint8 ext_id,gpointer * data)242 _get_twcc_seqnum_data (RTPPacketInfo * pinfo, guint8 ext_id, gpointer * data)
243 {
244   gboolean ret = FALSE;
245   guint size;
246 
247   if (pinfo->header_ext &&
248       gst_rtp_buffer_get_extension_onebyte_header_from_bytes (pinfo->header_ext,
249           pinfo->header_ext_bit_pattern, ext_id, 0, data, &size)) {
250     if (size == 2)
251       ret = TRUE;
252   }
253   return ret;
254 }
255 
256 static gboolean
sent_packet_init(SentPacket * packet,guint16 seqnum,RTPPacketInfo * pinfo,GstBuffer * buffer)257 sent_packet_init (SentPacket * packet, guint16 seqnum, RTPPacketInfo * pinfo,
258     GstBuffer * buffer)
259 {
260   GstRTPBuffer rtp = { NULL };
261 
262   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
263     goto invalid_packet;
264 
265   packet->seqnum = seqnum;
266   packet->ts = pinfo->current_time;
267   packet->size = gst_rtp_buffer_get_payload_len (&rtp);
268   packet->pt = gst_rtp_buffer_get_payload_type (&rtp);
269   packet->remote_ts = GST_CLOCK_TIME_NONE;
270   packet->socket_ts = GST_CLOCK_TIME_NONE;
271   packet->lost = FALSE;
272 
273   gst_rtp_buffer_unmap (&rtp);
274 
275   return TRUE;
276 
277 invalid_packet:
278   {
279     GST_DEBUG ("invalid RTP packet received");
280     return FALSE;
281   }
282 }
283 
284 static void
_set_twcc_seqnum_data(RTPTWCCManager * twcc,RTPPacketInfo * pinfo,GstBuffer * buf,guint8 ext_id)285 _set_twcc_seqnum_data (RTPTWCCManager * twcc, RTPPacketInfo * pinfo,
286     GstBuffer * buf, guint8 ext_id)
287 {
288   SentPacket packet;
289   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
290   gpointer data;
291 
292   if (gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp)) {
293     if (gst_rtp_buffer_get_extension_onebyte_header (&rtp,
294             ext_id, 0, &data, NULL)) {
295       guint16 seqnum = twcc->send_seqnum++;
296 
297       GST_WRITE_UINT16_BE (data, seqnum);
298       sent_packet_init (&packet, seqnum, pinfo, buf);
299       g_array_append_val (twcc->sent_packets, packet);
300 
301       GST_LOG ("Send: twcc-seqnum: %u, pt: %u, marker: %d, len: %u, ts: %"
302           GST_TIME_FORMAT, seqnum, pinfo->pt, pinfo->marker, packet.size,
303           GST_TIME_ARGS (pinfo->current_time));
304     }
305     gst_rtp_buffer_unmap (&rtp);
306   }
307 }
308 
309 static void
rtp_twcc_manager_set_send_twcc_seqnum(RTPTWCCManager * twcc,RTPPacketInfo * pinfo)310 rtp_twcc_manager_set_send_twcc_seqnum (RTPTWCCManager * twcc,
311     RTPPacketInfo * pinfo)
312 {
313   if (GST_IS_BUFFER_LIST (pinfo->data)) {
314     GstBufferList *list;
315     guint i = 0;
316 
317     pinfo->data = gst_buffer_list_make_writable (pinfo->data);
318 
319     list = GST_BUFFER_LIST (pinfo->data);
320 
321     for (i = 0; i < gst_buffer_list_length (list); i++) {
322       GstBuffer *buffer = gst_buffer_list_get_writable (list, i);
323 
324       _set_twcc_seqnum_data (twcc, pinfo, buffer, twcc->send_ext_id);
325     }
326   } else {
327     pinfo->data = gst_buffer_make_writable (pinfo->data);
328     _set_twcc_seqnum_data (twcc, pinfo, pinfo->data, twcc->send_ext_id);
329   }
330 }
331 
332 static gint32
rtp_twcc_manager_get_recv_twcc_seqnum(RTPTWCCManager * twcc,RTPPacketInfo * pinfo)333 rtp_twcc_manager_get_recv_twcc_seqnum (RTPTWCCManager * twcc,
334     RTPPacketInfo * pinfo)
335 {
336   gint32 val = -1;
337   gpointer data;
338 
339   if (twcc->recv_ext_id == 0) {
340     GST_DEBUG ("Received TWCC packet, but no extension registered; ignoring");
341     return val;
342   }
343 
344   if (_get_twcc_seqnum_data (pinfo, twcc->recv_ext_id, &data)) {
345     val = GST_READ_UINT16_BE (data);
346   }
347 
348   return val;
349 }
350 
351 static gint
_twcc_seqnum_sort(gconstpointer a,gconstpointer b)352 _twcc_seqnum_sort (gconstpointer a, gconstpointer b)
353 {
354   gint32 seqa = ((RecvPacket *) a)->seqnum;
355   gint32 seqb = ((RecvPacket *) b)->seqnum;
356   gint res = seqa - seqb;
357   if (res < -65000)
358     res = 1;
359   if (res > 65000)
360     res = -1;
361   return res;
362 }
363 
364 static void
rtp_twcc_write_recv_deltas(guint8 * fci_data,GArray * twcc_packets)365 rtp_twcc_write_recv_deltas (guint8 * fci_data, GArray * twcc_packets)
366 {
367   guint i;
368   for (i = 0; i < twcc_packets->len; i++) {
369     RecvPacket *pkt = &g_array_index (twcc_packets, RecvPacket, i);
370 
371     if (pkt->status == RTP_TWCC_PACKET_STATUS_SMALL_DELTA) {
372       GST_WRITE_UINT8 (fci_data, pkt->delta);
373       fci_data += 1;
374     } else if (pkt->status == RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA) {
375       GST_WRITE_UINT16_BE (fci_data, pkt->delta);
376       fci_data += 2;
377     }
378   }
379 }
380 
381 static void
rtp_twcc_write_run_length_chunk(GArray * packet_chunks,RTPTWCCPacketStatus status,guint run_length)382 rtp_twcc_write_run_length_chunk (GArray * packet_chunks,
383     RTPTWCCPacketStatus status, guint run_length)
384 {
385   guint written = 0;
386   while (written < run_length) {
387     GstBitWriter writer;
388     guint16 data = 0;
389     guint len = MIN (run_length - written, 8191);
390 
391     GST_LOG ("Writing a run-length of %u with status %u", len, status);
392 
393     gst_bit_writer_init_with_data (&writer, (guint8 *) & data, 2, FALSE);
394     gst_bit_writer_put_bits_uint8 (&writer, RTP_TWCC_CHUNK_TYPE_RUN_LENGTH, 1);
395     gst_bit_writer_put_bits_uint8 (&writer, status, 2);
396     gst_bit_writer_put_bits_uint16 (&writer, len, 13);
397     g_array_append_val (packet_chunks, data);
398     written += len;
399   }
400 }
401 
402 typedef struct
403 {
404   GArray *packet_chunks;
405   GstBitWriter writer;
406   guint16 data;
407   guint symbol_size;
408 } ChunkBitWriter;
409 
410 static void
chunk_bit_writer_reset(ChunkBitWriter * writer)411 chunk_bit_writer_reset (ChunkBitWriter * writer)
412 {
413   writer->data = 0;
414   gst_bit_writer_init_with_data (&writer->writer,
415       (guint8 *) & writer->data, 2, FALSE);
416 
417   gst_bit_writer_put_bits_uint8 (&writer->writer,
418       RTP_TWCC_CHUNK_TYPE_STATUS_VECTOR, 1);
419   /* 1 for 2-bit symbol-size, 0 for 1-bit */
420   gst_bit_writer_put_bits_uint8 (&writer->writer, writer->symbol_size - 1, 1);
421 }
422 
423 static void
chunk_bit_writer_configure(ChunkBitWriter * writer,guint symbol_size)424 chunk_bit_writer_configure (ChunkBitWriter * writer, guint symbol_size)
425 {
426   writer->symbol_size = symbol_size;
427   chunk_bit_writer_reset (writer);
428 }
429 
430 static gboolean
chunk_bit_writer_is_empty(ChunkBitWriter * writer)431 chunk_bit_writer_is_empty (ChunkBitWriter * writer)
432 {
433   return writer->writer.bit_size == 2;
434 }
435 
436 static gboolean
chunk_bit_writer_is_full(ChunkBitWriter * writer)437 chunk_bit_writer_is_full (ChunkBitWriter * writer)
438 {
439   return writer->writer.bit_size == 16;
440 }
441 
442 static guint
chunk_bit_writer_get_available_slots(ChunkBitWriter * writer)443 chunk_bit_writer_get_available_slots (ChunkBitWriter * writer)
444 {
445   return (16 - writer->writer.bit_size) / writer->symbol_size;
446 }
447 
448 static guint
chunk_bit_writer_get_total_slots(ChunkBitWriter * writer)449 chunk_bit_writer_get_total_slots (ChunkBitWriter * writer)
450 {
451   return STATUS_VECTOR_MAX_CAPACITY / writer->symbol_size;
452 }
453 
454 static void
chunk_bit_writer_flush(ChunkBitWriter * writer)455 chunk_bit_writer_flush (ChunkBitWriter * writer)
456 {
457   /* don't append a chunk if no bits have been written */
458   if (!chunk_bit_writer_is_empty (writer)) {
459     g_array_append_val (writer->packet_chunks, writer->data);
460     chunk_bit_writer_reset (writer);
461   }
462 }
463 
464 static void
chunk_bit_writer_init(ChunkBitWriter * writer,GArray * packet_chunks,guint symbol_size)465 chunk_bit_writer_init (ChunkBitWriter * writer,
466     GArray * packet_chunks, guint symbol_size)
467 {
468   writer->packet_chunks = packet_chunks;
469   chunk_bit_writer_configure (writer, symbol_size);
470 }
471 
472 static void
chunk_bit_writer_write(ChunkBitWriter * writer,RTPTWCCPacketStatus status)473 chunk_bit_writer_write (ChunkBitWriter * writer, RTPTWCCPacketStatus status)
474 {
475   gst_bit_writer_put_bits_uint8 (&writer->writer, status, writer->symbol_size);
476   if (chunk_bit_writer_is_full (writer)) {
477     chunk_bit_writer_flush (writer);
478   }
479 }
480 
481 static void
rtp_twcc_write_status_vector_chunk(ChunkBitWriter * writer,RecvPacket * pkt)482 rtp_twcc_write_status_vector_chunk (ChunkBitWriter * writer, RecvPacket * pkt)
483 {
484   if (pkt->missing_run > 0) {
485     guint available = chunk_bit_writer_get_available_slots (writer);
486     guint total = chunk_bit_writer_get_total_slots (writer);
487     guint i;
488 
489     if (pkt->missing_run > (available + total)) {
490       /* here it is better to finish up the current status-chunk and then
491          go for run-length */
492       for (i = 0; i < available; i++) {
493         chunk_bit_writer_write (writer, RTP_TWCC_PACKET_STATUS_NOT_RECV);
494       }
495       rtp_twcc_write_run_length_chunk (writer->packet_chunks,
496           RTP_TWCC_PACKET_STATUS_NOT_RECV, pkt->missing_run - available);
497     } else {
498       for (i = 0; i < pkt->missing_run; i++) {
499         chunk_bit_writer_write (writer, RTP_TWCC_PACKET_STATUS_NOT_RECV);
500       }
501     }
502   }
503 
504   chunk_bit_writer_write (writer, pkt->status);
505 }
506 
507 typedef struct
508 {
509   RecvPacket *equal;
510 } RunLengthHelper;
511 
512 static void
run_lenght_helper_update(RunLengthHelper * rlh,RecvPacket * pkt)513 run_lenght_helper_update (RunLengthHelper * rlh, RecvPacket * pkt)
514 {
515   /* for missing packets we reset */
516   if (pkt->missing_run > 0) {
517     rlh->equal = NULL;
518   }
519 
520   /* all status equal run */
521   if (rlh->equal == NULL) {
522     rlh->equal = pkt;
523     rlh->equal->equal_run = 0;
524   }
525 
526   if (rlh->equal->status == pkt->status) {
527     rlh->equal->equal_run++;
528   } else {
529     rlh->equal = pkt;
530     rlh->equal->equal_run = 1;
531   }
532 }
533 
534 static guint
_get_max_packets_capacity(guint symbol_size)535 _get_max_packets_capacity (guint symbol_size)
536 {
537   if (symbol_size == 2)
538     return STATUS_VECTOR_TWO_BIT_MAX_CAPACITY;
539 
540   return STATUS_VECTOR_MAX_CAPACITY;
541 }
542 
543 static gboolean
_pkt_fits_run_length_chunk(RecvPacket * pkt,guint packets_per_chunks,guint remaining_packets)544 _pkt_fits_run_length_chunk (RecvPacket * pkt, guint packets_per_chunks,
545     guint remaining_packets)
546 {
547   if (pkt->missing_run == 0) {
548     /* we have more or the same equal packets than the ones we can write in to a status chunk */
549     if (pkt->equal_run >= packets_per_chunks)
550       return TRUE;
551 
552     /* we have more than one equal and not enough space for the remainings */
553     if (pkt->equal_run > 1 && remaining_packets > STATUS_VECTOR_MAX_CAPACITY)
554       return TRUE;
555 
556     /* we have all equal packets for the remaining to write */
557     if (pkt->equal_run == remaining_packets)
558       return TRUE;
559   }
560 
561   return FALSE;
562 }
563 
564 static void
rtp_twcc_write_chunks(GArray * packet_chunks,GArray * twcc_packets,guint symbol_size)565 rtp_twcc_write_chunks (GArray * packet_chunks,
566     GArray * twcc_packets, guint symbol_size)
567 {
568   ChunkBitWriter writer;
569   guint i;
570   guint packets_per_chunks = _get_max_packets_capacity (symbol_size);
571 
572   chunk_bit_writer_init (&writer, packet_chunks, symbol_size);
573 
574   for (i = 0; i < twcc_packets->len; i++) {
575     RecvPacket *pkt = &g_array_index (twcc_packets, RecvPacket, i);
576     guint remaining_packets = twcc_packets->len - i;
577 
578     GST_LOG
579         ("About to write pkt: #%u missing_run: %u equal_run: %u status: %u, remaining_packets: %u",
580         pkt->seqnum, pkt->missing_run, pkt->equal_run, pkt->status,
581         remaining_packets);
582 
583     /* we can only start a run-length chunk if the status-chunk is
584        completed */
585     if (chunk_bit_writer_is_empty (&writer)) {
586       /* first write in any preceeding gaps, we use run-length
587          if it would take up more than one chunk (14/7) */
588       if (pkt->missing_run > packets_per_chunks) {
589         rtp_twcc_write_run_length_chunk (packet_chunks,
590             RTP_TWCC_PACKET_STATUS_NOT_RECV, pkt->missing_run);
591       }
592 
593       /* we have a run of the same status, write a run-length chunk and skip
594          to the next point */
595       if (_pkt_fits_run_length_chunk (pkt, packets_per_chunks,
596               remaining_packets)) {
597 
598         rtp_twcc_write_run_length_chunk (packet_chunks,
599             pkt->status, pkt->equal_run);
600         i += pkt->equal_run - 1;
601         continue;
602       }
603     }
604 
605     GST_LOG ("i=%u: Writing a %u-bit vector of status: %u",
606         i, symbol_size, pkt->status);
607     rtp_twcc_write_status_vector_chunk (&writer, pkt);
608   }
609   chunk_bit_writer_flush (&writer);
610 }
611 
612 static void
rtp_twcc_manager_add_fci(RTPTWCCManager * twcc,GstRTCPPacket * packet)613 rtp_twcc_manager_add_fci (RTPTWCCManager * twcc, GstRTCPPacket * packet)
614 {
615   RecvPacket *first, *last, *prev;
616   guint16 packet_count;
617   GstClockTime base_time;
618   GstClockTime ts_rounded;
619   guint i;
620   GArray *packet_chunks = g_array_new (FALSE, FALSE, 2);
621   RTPTWCCHeader header;
622   guint header_size = sizeof (RTPTWCCHeader);
623   guint packet_chunks_size;
624   guint recv_deltas_size = 0;
625   guint16 fci_length;
626   guint16 fci_chunks;
627   guint8 *fci_data;
628   guint8 *fci_data_ptr;
629   RunLengthHelper rlh = { NULL };
630   guint symbol_size = 1;
631   GstClockTimeDiff delta_ts;
632   gint64 delta_ts_rounded;
633   guint8 fb_pkt_count;
634 
635   g_array_sort (twcc->recv_packets, _twcc_seqnum_sort);
636 
637   /* get first and last packet */
638   first = &g_array_index (twcc->recv_packets, RecvPacket, 0);
639   last =
640       &g_array_index (twcc->recv_packets, RecvPacket,
641       twcc->recv_packets->len - 1);
642 
643   packet_count = last->seqnum - first->seqnum + 1;
644   base_time = first->ts / REF_TIME_UNIT;
645   fb_pkt_count = (guint8) (twcc->fb_pkt_count % G_MAXUINT8);
646 
647   GST_WRITE_UINT16_BE (header.base_seqnum, first->seqnum);
648   GST_WRITE_UINT16_BE (header.packet_count, packet_count);
649   GST_WRITE_UINT24_BE (header.base_time, base_time);
650   GST_WRITE_UINT8 (header.fb_pkt_count, fb_pkt_count);
651 
652   base_time *= REF_TIME_UNIT;
653   ts_rounded = base_time;
654 
655   GST_DEBUG ("Created TWCC feedback: base_seqnum: #%u, packet_count: %u, "
656       "base_time %" GST_TIME_FORMAT " fb_pkt_count: %u",
657       first->seqnum, packet_count, GST_TIME_ARGS (base_time), fb_pkt_count);
658 
659   twcc->fb_pkt_count++;
660   twcc->expected_recv_seqnum = first->seqnum + packet_count;
661 
662   /* calculate all deltas and check for gaps etc */
663   prev = first;
664   for (i = 0; i < twcc->recv_packets->len; i++) {
665     RecvPacket *pkt = &g_array_index (twcc->recv_packets, RecvPacket, i);
666     if (i != 0) {
667       pkt->missing_run = pkt->seqnum - prev->seqnum - 1;
668     }
669 
670     delta_ts = GST_CLOCK_DIFF (ts_rounded, pkt->ts);
671     pkt->delta = delta_ts / DELTA_UNIT;
672     delta_ts_rounded = pkt->delta * DELTA_UNIT;
673     ts_rounded += delta_ts_rounded;
674 
675     if (delta_ts_rounded < 0 || delta_ts_rounded > MAX_TS_DELTA) {
676       pkt->status = RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA;
677       recv_deltas_size += 2;
678       symbol_size = 2;
679     } else {
680       pkt->status = RTP_TWCC_PACKET_STATUS_SMALL_DELTA;
681       recv_deltas_size += 1;
682     }
683     run_lenght_helper_update (&rlh, pkt);
684 
685     GST_LOG ("pkt: #%u, ts: %" GST_TIME_FORMAT
686         " ts_rounded: %" GST_TIME_FORMAT
687         " delta_ts: %" GST_STIME_FORMAT
688         " delta_ts_rounded: %" GST_STIME_FORMAT
689         " missing_run: %u, status: %u", pkt->seqnum,
690         GST_TIME_ARGS (pkt->ts), GST_TIME_ARGS (ts_rounded),
691         GST_STIME_ARGS (delta_ts), GST_STIME_ARGS (delta_ts_rounded),
692         pkt->missing_run, pkt->status);
693     prev = pkt;
694   }
695 
696   rtp_twcc_write_chunks (packet_chunks, twcc->recv_packets, symbol_size);
697 
698   packet_chunks_size = packet_chunks->len * 2;
699   fci_length = header_size + packet_chunks_size + recv_deltas_size;
700   fci_chunks = (fci_length - 1) / sizeof (guint32) + 1;
701 
702   if (!gst_rtcp_packet_fb_set_fci_length (packet, fci_chunks)) {
703     GST_ERROR ("Could not fit: %u packets", packet_count);
704     g_assert_not_reached ();
705   }
706 
707   fci_data = gst_rtcp_packet_fb_get_fci (packet);
708   fci_data_ptr = fci_data;
709 
710   memcpy (fci_data_ptr, &header, header_size);
711   fci_data_ptr += header_size;
712 
713   memcpy (fci_data_ptr, packet_chunks->data, packet_chunks_size);
714   fci_data_ptr += packet_chunks_size;
715 
716   rtp_twcc_write_recv_deltas (fci_data_ptr, twcc->recv_packets);
717 
718   GST_MEMDUMP ("twcc-header:", (guint8 *) & header, header_size);
719   GST_MEMDUMP ("packet-chunks:", (guint8 *) packet_chunks->data,
720       packet_chunks_size);
721   GST_MEMDUMP ("full fci:", fci_data, fci_length);
722 
723   g_array_unref (packet_chunks);
724   g_array_set_size (twcc->recv_packets, 0);
725 }
726 
727 static void
rtp_twcc_manager_create_feedback(RTPTWCCManager * twcc)728 rtp_twcc_manager_create_feedback (RTPTWCCManager * twcc)
729 {
730   GstBuffer *buf;
731   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
732   GstRTCPPacket packet;
733 
734   buf = gst_rtcp_buffer_new (twcc->mtu);
735 
736   gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp);
737 
738   gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_RTPFB, &packet);
739 
740   gst_rtcp_packet_fb_set_type (&packet, GST_RTCP_RTPFB_TYPE_TWCC);
741   if (twcc->recv_sender_ssrc != 1)
742     gst_rtcp_packet_fb_set_sender_ssrc (&packet, twcc->recv_sender_ssrc);
743   gst_rtcp_packet_fb_set_media_ssrc (&packet, twcc->recv_media_ssrc);
744 
745   rtp_twcc_manager_add_fci (twcc, &packet);
746 
747   gst_rtcp_buffer_unmap (&rtcp);
748 
749   g_queue_push_tail (twcc->rtcp_buffers, buf);
750 }
751 
752 /* we have calculated a (very pessimistic) max-packets per RTCP feedback,
753    so this is to make sure we don't exceed that */
754 static gboolean
_exceeds_max_packets(RTPTWCCManager * twcc,guint16 seqnum)755 _exceeds_max_packets (RTPTWCCManager * twcc, guint16 seqnum)
756 {
757   if (twcc->recv_packets->len + 1 > twcc->max_packets_per_rtcp)
758     return TRUE;
759 
760   return FALSE;
761 }
762 
763 /* in this case we could have lost the packet with the marker bit,
764    so with a large (30) amount of packets, lost packets and still no marker,
765    we send a feedback anyway */
766 static gboolean
_many_packets_some_lost(RTPTWCCManager * twcc,guint16 seqnum)767 _many_packets_some_lost (RTPTWCCManager * twcc, guint16 seqnum)
768 {
769   RecvPacket *first;
770   guint16 packet_count;
771   guint received_packets = twcc->recv_packets->len;
772   guint lost_packets;
773   if (received_packets == 0)
774     return FALSE;
775 
776   first = &g_array_index (twcc->recv_packets, RecvPacket, 0);
777   packet_count = seqnum - first->seqnum + 1;
778 
779   /* check if we lost half of the threshold */
780   lost_packets = packet_count - received_packets;
781   if (received_packets >= 30 && lost_packets >= 60)
782     return TRUE;
783 
784   /* we have lost the marker bit for some and lost some */
785   if (twcc->packet_count_no_marker >= 10 && lost_packets >= 60)
786     return TRUE;
787 
788   return FALSE;
789 }
790 
791 gboolean
rtp_twcc_manager_recv_packet(RTPTWCCManager * twcc,RTPPacketInfo * pinfo)792 rtp_twcc_manager_recv_packet (RTPTWCCManager * twcc, RTPPacketInfo * pinfo)
793 {
794   gboolean send_feedback = FALSE;
795   RecvPacket packet;
796   gint32 seqnum;
797   gint diff;
798 
799   seqnum = rtp_twcc_manager_get_recv_twcc_seqnum (twcc, pinfo);
800   if (seqnum == -1)
801     return FALSE;
802 
803   /* if this packet would exceed the capacity of our MTU, we create a feedback
804      with the current packets, and start over with this one */
805   if (_exceeds_max_packets (twcc, seqnum)) {
806     GST_INFO ("twcc-seqnum: %u would overflow max packets: %u, create feedback"
807         " with current packets", seqnum, twcc->max_packets_per_rtcp);
808     rtp_twcc_manager_create_feedback (twcc);
809     send_feedback = TRUE;
810   }
811 
812   /* we can have multiple ssrcs here, so just pick the first one */
813   if (twcc->recv_media_ssrc == -1)
814     twcc->recv_media_ssrc = pinfo->ssrc;
815 
816   /* check if we are reordered, and treat it as lost if we already sent
817      a feedback msg with a higher seqnum. If the diff is huge, treat
818      it as a restart of a stream */
819   diff = gst_rtp_buffer_compare_seqnum (twcc->expected_recv_seqnum, seqnum);
820   if (twcc->fb_pkt_count > 0 && diff < 0) {
821     GST_INFO ("Received out of order packet (%u after %u), treating as lost",
822         seqnum, twcc->expected_recv_seqnum);
823     return FALSE;
824   }
825 
826   if (twcc->recv_packets->len > 0) {
827     RecvPacket *last = &g_array_index (twcc->recv_packets, RecvPacket,
828         twcc->recv_packets->len - 1);
829 
830     diff = gst_rtp_buffer_compare_seqnum (last->seqnum, seqnum);
831     if (diff == 0) {
832       GST_INFO ("Received duplicate packet (%u), dropping", seqnum);
833       return FALSE;
834     }
835   }
836 
837   /* store the packet for Transport-wide RTCP feedback message */
838   recv_packet_init (&packet, seqnum, pinfo);
839   g_array_append_val (twcc->recv_packets, packet);
840   twcc->last_seqnum = seqnum;
841 
842   GST_LOG ("Receive: twcc-seqnum: %u, pt: %u, marker: %d, ts: %"
843       GST_TIME_FORMAT, seqnum, pinfo->pt, pinfo->marker,
844       GST_TIME_ARGS (pinfo->arrival_time));
845 
846   if (!pinfo->marker)
847     twcc->packet_count_no_marker++;
848 
849   /* are we sending on an interval, or based on marker bit */
850   if (GST_CLOCK_TIME_IS_VALID (twcc->feedback_interval)) {
851     if (!GST_CLOCK_TIME_IS_VALID (twcc->next_feedback_send_time))
852       twcc->next_feedback_send_time =
853           pinfo->running_time + twcc->feedback_interval;
854 
855     if (pinfo->running_time >= twcc->next_feedback_send_time) {
856       rtp_twcc_manager_create_feedback (twcc);
857       send_feedback = TRUE;
858 
859       while (pinfo->running_time >= twcc->next_feedback_send_time)
860         twcc->next_feedback_send_time += twcc->feedback_interval;
861     }
862   } else if (pinfo->marker || _many_packets_some_lost (twcc, seqnum)) {
863     rtp_twcc_manager_create_feedback (twcc);
864     send_feedback = TRUE;
865 
866     twcc->packet_count_no_marker = 0;
867   }
868 
869   return send_feedback;
870 }
871 
872 static void
_change_rtcp_fb_sender_ssrc(GstBuffer * buf,guint32 sender_ssrc)873 _change_rtcp_fb_sender_ssrc (GstBuffer * buf, guint32 sender_ssrc)
874 {
875   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
876   GstRTCPPacket packet;
877   gst_rtcp_buffer_map (buf, GST_MAP_READWRITE, &rtcp);
878   gst_rtcp_buffer_get_first_packet (&rtcp, &packet);
879   gst_rtcp_packet_fb_set_sender_ssrc (&packet, sender_ssrc);
880   gst_rtcp_buffer_unmap (&rtcp);
881 }
882 
883 GstBuffer *
rtp_twcc_manager_get_feedback(RTPTWCCManager * twcc,guint sender_ssrc)884 rtp_twcc_manager_get_feedback (RTPTWCCManager * twcc, guint sender_ssrc)
885 {
886   GstBuffer *buf;
887   buf = g_queue_pop_head (twcc->rtcp_buffers);
888 
889   if (buf && twcc->recv_sender_ssrc != sender_ssrc) {
890     _change_rtcp_fb_sender_ssrc (buf, sender_ssrc);
891     twcc->recv_sender_ssrc = sender_ssrc;
892   }
893 
894   return buf;
895 }
896 
897 void
rtp_twcc_manager_send_packet(RTPTWCCManager * twcc,RTPPacketInfo * pinfo)898 rtp_twcc_manager_send_packet (RTPTWCCManager * twcc, RTPPacketInfo * pinfo)
899 {
900   if (twcc->send_ext_id == 0)
901     return;
902 
903   rtp_twcc_manager_set_send_twcc_seqnum (twcc, pinfo);
904 }
905 
906 static void
_add_twcc_packet(GArray * twcc_packets,guint16 seqnum,guint status)907 _add_twcc_packet (GArray * twcc_packets, guint16 seqnum, guint status)
908 {
909   RTPTWCCPacket packet;
910   memset (&packet, 0, sizeof (RTPTWCCPacket));
911   packet.local_ts = GST_CLOCK_TIME_NONE;
912   packet.remote_ts = GST_CLOCK_TIME_NONE;
913   packet.local_delta = GST_CLOCK_STIME_NONE;
914   packet.remote_delta = GST_CLOCK_STIME_NONE;
915   packet.delta_delta = GST_CLOCK_STIME_NONE;
916   packet.seqnum = seqnum;
917   packet.status = status;
918   g_array_append_val (twcc_packets, packet);
919 }
920 
921 static guint
_parse_run_length_chunk(GstBitReader * reader,GArray * twcc_packets,guint16 seqnum_offset,guint remaining_packets)922 _parse_run_length_chunk (GstBitReader * reader, GArray * twcc_packets,
923     guint16 seqnum_offset, guint remaining_packets)
924 {
925   guint16 run_length;
926   guint8 status_code;
927   guint i;
928 
929   gst_bit_reader_get_bits_uint8 (reader, &status_code, 2);
930   gst_bit_reader_get_bits_uint16 (reader, &run_length, 13);
931 
932   run_length = MIN (remaining_packets, run_length);
933 
934   for (i = 0; i < run_length; i++) {
935     _add_twcc_packet (twcc_packets, seqnum_offset + i, status_code);
936   }
937 
938   return run_length;
939 }
940 
941 static guint
_parse_status_vector_chunk(GstBitReader * reader,GArray * twcc_packets,guint16 seqnum_offset,guint remaining_packets)942 _parse_status_vector_chunk (GstBitReader * reader, GArray * twcc_packets,
943     guint16 seqnum_offset, guint remaining_packets)
944 {
945   guint8 symbol_size;
946   guint num_bits;
947   guint i;
948 
949   gst_bit_reader_get_bits_uint8 (reader, &symbol_size, 1);
950   symbol_size += 1;
951   num_bits = MIN (remaining_packets, 14 / symbol_size);
952 
953   for (i = 0; i < num_bits; i++) {
954     guint8 status_code;
955     if (gst_bit_reader_get_bits_uint8 (reader, &status_code, symbol_size))
956       _add_twcc_packet (twcc_packets, seqnum_offset + i, status_code);
957   }
958 
959   return num_bits;
960 }
961 
962 /* Remove all locally stored packets that has been reported
963    back to us */
964 static void
_prune_sent_packets(RTPTWCCManager * twcc,GArray * twcc_packets)965 _prune_sent_packets (RTPTWCCManager * twcc, GArray * twcc_packets)
966 {
967   SentPacket *first;
968   RTPTWCCPacket *last;
969   guint16 last_idx;
970 
971   if (twcc_packets->len == 0 || twcc->sent_packets->len == 0)
972     return;
973 
974   first = &g_array_index (twcc->sent_packets, SentPacket, 0);
975   last = &g_array_index (twcc_packets, RTPTWCCPacket, twcc_packets->len - 1);
976 
977   last_idx = last->seqnum - first->seqnum;
978 
979   if (last_idx < twcc->sent_packets->len)
980     g_array_remove_range (twcc->sent_packets, 0, last_idx);
981 }
982 
983 static void
_check_for_lost_packets(RTPTWCCManager * twcc,GArray * twcc_packets,guint16 base_seqnum,guint16 packet_count,guint8 fb_pkt_count)984 _check_for_lost_packets (RTPTWCCManager * twcc, GArray * twcc_packets,
985     guint16 base_seqnum, guint16 packet_count, guint8 fb_pkt_count)
986 {
987   guint packets_lost;
988   gint8 fb_pkt_count_diff;
989   guint i;
990 
991   /* first packet */
992   if (twcc->first_fci_parse) {
993     twcc->first_fci_parse = FALSE;
994     goto done;
995   }
996 
997   fb_pkt_count_diff =
998       (gint8) (fb_pkt_count - twcc->expected_parsed_fb_pkt_count);
999 
1000   /* we have gone backwards, don't reset the expectations,
1001      but process the packet nonetheless */
1002   if (fb_pkt_count_diff < 0) {
1003     GST_DEBUG ("feedback packet count going backwards (%u < %u)",
1004         fb_pkt_count, twcc->expected_parsed_fb_pkt_count);
1005     return;
1006   }
1007 
1008   /* we have jumped forwards, reset expectations, but don't trigger
1009      lost packets in case the missing fb-packet(s) arrive later */
1010   if (fb_pkt_count_diff > 0) {
1011     GST_DEBUG ("feedback packet count jumped ahead (%u > %u)",
1012         fb_pkt_count, twcc->expected_parsed_fb_pkt_count);
1013     goto done;
1014   }
1015 
1016   if (base_seqnum < twcc->expected_parsed_seqnum) {
1017     GST_DEBUG ("twcc seqnum is older than expected  (%u < %u)", base_seqnum,
1018         twcc->expected_parsed_seqnum);
1019     return;
1020   }
1021 
1022   packets_lost = base_seqnum - twcc->expected_parsed_seqnum;
1023   for (i = 0; i < packets_lost; i++) {
1024     _add_twcc_packet (twcc_packets, twcc->expected_parsed_seqnum + i,
1025         RTP_TWCC_PACKET_STATUS_NOT_RECV);
1026   }
1027 
1028 done:
1029   twcc->expected_parsed_seqnum = base_seqnum + packet_count;
1030   twcc->expected_parsed_fb_pkt_count = fb_pkt_count + 1;
1031   return;
1032 }
1033 
1034 GArray *
rtp_twcc_manager_parse_fci(RTPTWCCManager * twcc,guint8 * fci_data,guint fci_length)1035 rtp_twcc_manager_parse_fci (RTPTWCCManager * twcc,
1036     guint8 * fci_data, guint fci_length)
1037 {
1038   GArray *twcc_packets;
1039   guint16 base_seqnum;
1040   guint16 packet_count;
1041   GstClockTime base_time;
1042   GstClockTime ts_rounded;
1043   guint8 fb_pkt_count;
1044   guint packets_parsed = 0;
1045   guint fci_parsed;
1046   guint i;
1047   SentPacket *first_sent_pkt = NULL;
1048 
1049   if (fci_length < 10) {
1050     GST_WARNING ("Malformed TWCC RTCP feedback packet");
1051     return NULL;
1052   }
1053 
1054   base_seqnum = GST_READ_UINT16_BE (&fci_data[0]);
1055   packet_count = GST_READ_UINT16_BE (&fci_data[2]);
1056   base_time = GST_READ_UINT24_BE (&fci_data[4]) * REF_TIME_UNIT;
1057   fb_pkt_count = fci_data[7];
1058 
1059   GST_DEBUG ("Parsed TWCC feedback: base_seqnum: #%u, packet_count: %u, "
1060       "base_time %" GST_TIME_FORMAT " fb_pkt_count: %u",
1061       base_seqnum, packet_count, GST_TIME_ARGS (base_time), fb_pkt_count);
1062 
1063   twcc_packets = g_array_sized_new (FALSE, FALSE,
1064       sizeof (RTPTWCCPacket), packet_count);
1065 
1066   _check_for_lost_packets (twcc, twcc_packets,
1067       base_seqnum, packet_count, fb_pkt_count);
1068 
1069   fci_parsed = 8;
1070   while (packets_parsed < packet_count && (fci_parsed + 1) < fci_length) {
1071     GstBitReader reader = GST_BIT_READER_INIT (&fci_data[fci_parsed], 2);
1072     guint8 chunk_type;
1073     guint seqnum_offset = base_seqnum + packets_parsed;
1074     guint remaining_packets = packet_count - packets_parsed;
1075 
1076     gst_bit_reader_get_bits_uint8 (&reader, &chunk_type, 1);
1077 
1078     if (chunk_type == RTP_TWCC_CHUNK_TYPE_RUN_LENGTH) {
1079       packets_parsed += _parse_run_length_chunk (&reader,
1080           twcc_packets, seqnum_offset, remaining_packets);
1081     } else {
1082       packets_parsed += _parse_status_vector_chunk (&reader,
1083           twcc_packets, seqnum_offset, remaining_packets);
1084     }
1085     fci_parsed += 2;
1086   }
1087 
1088   if (twcc->sent_packets->len > 0)
1089     first_sent_pkt = &g_array_index (twcc->sent_packets, SentPacket, 0);
1090 
1091   ts_rounded = base_time;
1092   for (i = 0; i < twcc_packets->len; i++) {
1093     RTPTWCCPacket *pkt = &g_array_index (twcc_packets, RTPTWCCPacket, i);
1094     gint16 delta = 0;
1095     GstClockTimeDiff delta_ts;
1096 
1097     if (pkt->status == RTP_TWCC_PACKET_STATUS_SMALL_DELTA) {
1098       delta = fci_data[fci_parsed];
1099       fci_parsed += 1;
1100     } else if (pkt->status == RTP_TWCC_PACKET_STATUS_LARGE_NEGATIVE_DELTA) {
1101       delta = GST_READ_UINT16_BE (&fci_data[fci_parsed]);
1102       fci_parsed += 2;
1103     }
1104 
1105     if (fci_parsed > fci_length) {
1106       GST_WARNING ("Malformed TWCC RTCP feedback packet");
1107       g_array_set_size (twcc_packets, 0);
1108       break;
1109     }
1110 
1111     if (pkt->status != RTP_TWCC_PACKET_STATUS_NOT_RECV) {
1112       delta_ts = delta * DELTA_UNIT;
1113       ts_rounded += delta_ts;
1114       pkt->remote_ts = ts_rounded;
1115 
1116       GST_LOG ("pkt: #%u, remote_ts: %" GST_TIME_FORMAT
1117           " delta_ts: %" GST_STIME_FORMAT
1118           " status: %u", pkt->seqnum,
1119           GST_TIME_ARGS (pkt->remote_ts), GST_STIME_ARGS (delta_ts),
1120           pkt->status);
1121     }
1122 
1123     if (first_sent_pkt) {
1124       SentPacket *found = NULL;
1125       guint16 sent_idx = pkt->seqnum - first_sent_pkt->seqnum;
1126       if (sent_idx < twcc->sent_packets->len)
1127         found = &g_array_index (twcc->sent_packets, SentPacket, sent_idx);
1128       if (found && found->seqnum == pkt->seqnum) {
1129         if (GST_CLOCK_TIME_IS_VALID (found->socket_ts)) {
1130           pkt->local_ts = found->socket_ts;
1131         } else {
1132           pkt->local_ts = found->ts;
1133         }
1134         pkt->size = found->size;
1135         pkt->pt = found->pt;
1136 
1137         GST_LOG ("matching pkt: #%u with local_ts: %" GST_TIME_FORMAT
1138             " size: %u", pkt->seqnum, GST_TIME_ARGS (pkt->local_ts), pkt->size);
1139       }
1140     }
1141   }
1142 
1143   _prune_sent_packets (twcc, twcc_packets);
1144 
1145   return twcc_packets;
1146 }
1147