1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/forward_error_correction.h"
12
13 #include <string.h>
14
15 #include <algorithm>
16 #include <utility>
17
18 #include "absl/algorithm/container.h"
19 #include "modules/include/module_common_types_public.h"
20 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
21 #include "modules/rtp_rtcp/source/byte_io.h"
22 #include "modules/rtp_rtcp/source/flexfec_header_reader_writer.h"
23 #include "modules/rtp_rtcp/source/forward_error_correction_internal.h"
24 #include "modules/rtp_rtcp/source/ulpfec_header_reader_writer.h"
25 #include "rtc_base/checks.h"
26 #include "rtc_base/logging.h"
27 #include "rtc_base/numerics/mod_ops.h"
28
29 namespace webrtc {
30
31 namespace {
32 // Transport header size in bytes. Assume UDP/IPv4 as a reasonable minimum.
33 constexpr size_t kTransportOverhead = 28;
34 } // namespace
35
Packet()36 ForwardErrorCorrection::Packet::Packet() : data(0), ref_count_(0) {}
37 ForwardErrorCorrection::Packet::~Packet() = default;
38
AddRef()39 int32_t ForwardErrorCorrection::Packet::AddRef() {
40 return ++ref_count_;
41 }
42
Release()43 int32_t ForwardErrorCorrection::Packet::Release() {
44 int32_t ref_count;
45 ref_count = --ref_count_;
46 if (ref_count == 0)
47 delete this;
48 return ref_count;
49 }
50
51 // This comparator is used to compare std::unique_ptr's pointing to
52 // subclasses of SortablePackets. It needs to be parametric since
53 // the std::unique_ptr's are not covariant w.r.t. the types that
54 // they are pointing to.
55 template <typename S, typename T>
operator ()(const S & first,const T & second)56 bool ForwardErrorCorrection::SortablePacket::LessThan::operator()(
57 const S& first,
58 const T& second) {
59 RTC_DCHECK_EQ(first->ssrc, second->ssrc);
60 return IsNewerSequenceNumber(second->seq_num, first->seq_num);
61 }
62
63 ForwardErrorCorrection::ReceivedPacket::ReceivedPacket() = default;
64 ForwardErrorCorrection::ReceivedPacket::~ReceivedPacket() = default;
65
66 ForwardErrorCorrection::RecoveredPacket::RecoveredPacket() = default;
67 ForwardErrorCorrection::RecoveredPacket::~RecoveredPacket() = default;
68
69 ForwardErrorCorrection::ProtectedPacket::ProtectedPacket() = default;
70 ForwardErrorCorrection::ProtectedPacket::~ProtectedPacket() = default;
71
72 ForwardErrorCorrection::ReceivedFecPacket::ReceivedFecPacket() = default;
73 ForwardErrorCorrection::ReceivedFecPacket::~ReceivedFecPacket() = default;
74
ForwardErrorCorrection(std::unique_ptr<FecHeaderReader> fec_header_reader,std::unique_ptr<FecHeaderWriter> fec_header_writer,uint32_t ssrc,uint32_t protected_media_ssrc)75 ForwardErrorCorrection::ForwardErrorCorrection(
76 std::unique_ptr<FecHeaderReader> fec_header_reader,
77 std::unique_ptr<FecHeaderWriter> fec_header_writer,
78 uint32_t ssrc,
79 uint32_t protected_media_ssrc)
80 : ssrc_(ssrc),
81 protected_media_ssrc_(protected_media_ssrc),
82 fec_header_reader_(std::move(fec_header_reader)),
83 fec_header_writer_(std::move(fec_header_writer)),
84 generated_fec_packets_(fec_header_writer_->MaxFecPackets()),
85 packet_mask_size_(0) {}
86
87 ForwardErrorCorrection::~ForwardErrorCorrection() = default;
88
CreateUlpfec(uint32_t ssrc)89 std::unique_ptr<ForwardErrorCorrection> ForwardErrorCorrection::CreateUlpfec(
90 uint32_t ssrc) {
91 std::unique_ptr<FecHeaderReader> fec_header_reader(new UlpfecHeaderReader());
92 std::unique_ptr<FecHeaderWriter> fec_header_writer(new UlpfecHeaderWriter());
93 return std::unique_ptr<ForwardErrorCorrection>(new ForwardErrorCorrection(
94 std::move(fec_header_reader), std::move(fec_header_writer), ssrc, ssrc));
95 }
96
CreateFlexfec(uint32_t ssrc,uint32_t protected_media_ssrc)97 std::unique_ptr<ForwardErrorCorrection> ForwardErrorCorrection::CreateFlexfec(
98 uint32_t ssrc,
99 uint32_t protected_media_ssrc) {
100 std::unique_ptr<FecHeaderReader> fec_header_reader(new FlexfecHeaderReader());
101 std::unique_ptr<FecHeaderWriter> fec_header_writer(new FlexfecHeaderWriter());
102 return std::unique_ptr<ForwardErrorCorrection>(new ForwardErrorCorrection(
103 std::move(fec_header_reader), std::move(fec_header_writer), ssrc,
104 protected_media_ssrc));
105 }
106
EncodeFec(const PacketList & media_packets,uint8_t protection_factor,int num_important_packets,bool use_unequal_protection,FecMaskType fec_mask_type,std::list<Packet * > * fec_packets)107 int ForwardErrorCorrection::EncodeFec(const PacketList& media_packets,
108 uint8_t protection_factor,
109 int num_important_packets,
110 bool use_unequal_protection,
111 FecMaskType fec_mask_type,
112 std::list<Packet*>* fec_packets) {
113 const size_t num_media_packets = media_packets.size();
114
115 // Sanity check arguments.
116 RTC_DCHECK_GT(num_media_packets, 0);
117 RTC_DCHECK_GE(num_important_packets, 0);
118 RTC_DCHECK_LE(num_important_packets, num_media_packets);
119 RTC_DCHECK(fec_packets->empty());
120 const size_t max_media_packets = fec_header_writer_->MaxMediaPackets();
121 if (num_media_packets > max_media_packets) {
122 RTC_LOG(LS_WARNING) << "Can't protect " << num_media_packets
123 << " media packets per frame. Max is "
124 << max_media_packets << ".";
125 return -1;
126 }
127
128 // Error check the media packets.
129 for (const auto& media_packet : media_packets) {
130 RTC_DCHECK(media_packet);
131 if (media_packet->data.size() < kRtpHeaderSize) {
132 RTC_LOG(LS_WARNING) << "Media packet " << media_packet->data.size()
133 << " bytes "
134 "is smaller than RTP header.";
135 return -1;
136 }
137 // Ensure the FEC packets will fit in a typical MTU.
138 if (media_packet->data.size() + MaxPacketOverhead() + kTransportOverhead >
139 IP_PACKET_SIZE) {
140 RTC_LOG(LS_WARNING) << "Media packet " << media_packet->data.size()
141 << " bytes "
142 "with overhead is larger than "
143 << IP_PACKET_SIZE << " bytes.";
144 }
145 }
146
147 // Prepare generated FEC packets.
148 int num_fec_packets = NumFecPackets(num_media_packets, protection_factor);
149 if (num_fec_packets == 0) {
150 return 0;
151 }
152 for (int i = 0; i < num_fec_packets; ++i) {
153 generated_fec_packets_[i].data.EnsureCapacity(IP_PACKET_SIZE);
154 memset(generated_fec_packets_[i].data.data(), 0, IP_PACKET_SIZE);
155 // Use this as a marker for untouched packets.
156 generated_fec_packets_[i].data.SetSize(0);
157 fec_packets->push_back(&generated_fec_packets_[i]);
158 }
159
160 internal::PacketMaskTable mask_table(fec_mask_type, num_media_packets);
161 packet_mask_size_ = internal::PacketMaskSize(num_media_packets);
162 memset(packet_masks_, 0, num_fec_packets * packet_mask_size_);
163 internal::GeneratePacketMasks(num_media_packets, num_fec_packets,
164 num_important_packets, use_unequal_protection,
165 &mask_table, packet_masks_);
166
167 // Adapt packet masks to missing media packets.
168 int num_mask_bits = InsertZerosInPacketMasks(media_packets, num_fec_packets);
169 if (num_mask_bits < 0) {
170 RTC_LOG(LS_INFO) << "Due to sequence number gaps, cannot protect media "
171 "packets with a single block of FEC packets.";
172 fec_packets->clear();
173 return -1;
174 }
175 packet_mask_size_ = internal::PacketMaskSize(num_mask_bits);
176
177 // Write FEC packets to |generated_fec_packets_|.
178 GenerateFecPayloads(media_packets, num_fec_packets);
179 // TODO(brandtr): Generalize this when multistream protection support is
180 // added.
181 const uint32_t media_ssrc = ParseSsrc(media_packets.front()->data.data());
182 const uint16_t seq_num_base =
183 ParseSequenceNumber(media_packets.front()->data.data());
184 FinalizeFecHeaders(num_fec_packets, media_ssrc, seq_num_base);
185
186 return 0;
187 }
188
NumFecPackets(int num_media_packets,int protection_factor)189 int ForwardErrorCorrection::NumFecPackets(int num_media_packets,
190 int protection_factor) {
191 // Result in Q0 with an unsigned round.
192 int num_fec_packets = (num_media_packets * protection_factor + (1 << 7)) >> 8;
193 // Generate at least one FEC packet if we need protection.
194 if (protection_factor > 0 && num_fec_packets == 0) {
195 num_fec_packets = 1;
196 }
197 RTC_DCHECK_LE(num_fec_packets, num_media_packets);
198 return num_fec_packets;
199 }
200
GenerateFecPayloads(const PacketList & media_packets,size_t num_fec_packets)201 void ForwardErrorCorrection::GenerateFecPayloads(
202 const PacketList& media_packets,
203 size_t num_fec_packets) {
204 RTC_DCHECK(!media_packets.empty());
205 for (size_t i = 0; i < num_fec_packets; ++i) {
206 Packet* const fec_packet = &generated_fec_packets_[i];
207 size_t pkt_mask_idx = i * packet_mask_size_;
208 const size_t min_packet_mask_size = fec_header_writer_->MinPacketMaskSize(
209 &packet_masks_[pkt_mask_idx], packet_mask_size_);
210 const size_t fec_header_size =
211 fec_header_writer_->FecHeaderSize(min_packet_mask_size);
212
213 size_t media_pkt_idx = 0;
214 auto media_packets_it = media_packets.cbegin();
215 uint16_t prev_seq_num =
216 ParseSequenceNumber((*media_packets_it)->data.data());
217 while (media_packets_it != media_packets.end()) {
218 Packet* const media_packet = media_packets_it->get();
219 const uint8_t* media_packet_data = media_packet->data.cdata();
220 // Should |media_packet| be protected by |fec_packet|?
221 if (packet_masks_[pkt_mask_idx] & (1 << (7 - media_pkt_idx))) {
222 size_t media_payload_length =
223 media_packet->data.size() - kRtpHeaderSize;
224
225 bool first_protected_packet = (fec_packet->data.size() == 0);
226 size_t fec_packet_length = fec_header_size + media_payload_length;
227 if (fec_packet_length > fec_packet->data.size()) {
228 // Recall that XORing with zero (which the FEC packets are prefilled
229 // with) is the identity operator, thus all prior XORs are
230 // still correct even though we expand the packet length here.
231 fec_packet->data.SetSize(fec_packet_length);
232 }
233 if (first_protected_packet) {
234 uint8_t* data = fec_packet->data.data();
235 // Write P, X, CC, M, and PT recovery fields.
236 // Note that bits 0, 1, and 16 are overwritten in FinalizeFecHeaders.
237 memcpy(&data[0], &media_packet_data[0], 2);
238 // Write length recovery field. (This is a temporary location for
239 // ULPFEC.)
240 ByteWriter<uint16_t>::WriteBigEndian(&data[2], media_payload_length);
241 // Write timestamp recovery field.
242 memcpy(&data[4], &media_packet_data[4], 4);
243 // Write payload.
244 if (media_payload_length > 0) {
245 memcpy(&data[fec_header_size], &media_packet_data[kRtpHeaderSize],
246 media_payload_length);
247 }
248 } else {
249 XorHeaders(*media_packet, fec_packet);
250 XorPayloads(*media_packet, media_payload_length, fec_header_size,
251 fec_packet);
252 }
253 }
254 media_packets_it++;
255 if (media_packets_it != media_packets.end()) {
256 uint16_t seq_num =
257 ParseSequenceNumber((*media_packets_it)->data.data());
258 media_pkt_idx += static_cast<uint16_t>(seq_num - prev_seq_num);
259 prev_seq_num = seq_num;
260 }
261 pkt_mask_idx += media_pkt_idx / 8;
262 media_pkt_idx %= 8;
263 }
264 RTC_DCHECK_GT(fec_packet->data.size(), 0)
265 << "Packet mask is wrong or poorly designed.";
266 }
267 }
268
InsertZerosInPacketMasks(const PacketList & media_packets,size_t num_fec_packets)269 int ForwardErrorCorrection::InsertZerosInPacketMasks(
270 const PacketList& media_packets,
271 size_t num_fec_packets) {
272 size_t num_media_packets = media_packets.size();
273 if (num_media_packets <= 1) {
274 return num_media_packets;
275 }
276 uint16_t last_seq_num =
277 ParseSequenceNumber(media_packets.back()->data.data());
278 uint16_t first_seq_num =
279 ParseSequenceNumber(media_packets.front()->data.data());
280 size_t total_missing_seq_nums =
281 static_cast<uint16_t>(last_seq_num - first_seq_num) - num_media_packets +
282 1;
283 if (total_missing_seq_nums == 0) {
284 // All sequence numbers are covered by the packet mask.
285 // No zero insertion required.
286 return num_media_packets;
287 }
288 const size_t max_media_packets = fec_header_writer_->MaxMediaPackets();
289 if (total_missing_seq_nums + num_media_packets > max_media_packets) {
290 return -1;
291 }
292 // Allocate the new mask.
293 size_t tmp_packet_mask_size =
294 internal::PacketMaskSize(total_missing_seq_nums + num_media_packets);
295 memset(tmp_packet_masks_, 0, num_fec_packets * tmp_packet_mask_size);
296
297 auto media_packets_it = media_packets.cbegin();
298 uint16_t prev_seq_num = first_seq_num;
299 ++media_packets_it;
300
301 // Insert the first column.
302 internal::CopyColumn(tmp_packet_masks_, tmp_packet_mask_size, packet_masks_,
303 packet_mask_size_, num_fec_packets, 0, 0);
304 size_t new_bit_index = 1;
305 size_t old_bit_index = 1;
306 // Insert zeros in the bit mask for every hole in the sequence.
307 while (media_packets_it != media_packets.end()) {
308 if (new_bit_index == max_media_packets) {
309 // We can only cover up to 48 packets.
310 break;
311 }
312 uint16_t seq_num = ParseSequenceNumber((*media_packets_it)->data.data());
313 const int num_zeros_to_insert =
314 static_cast<uint16_t>(seq_num - prev_seq_num - 1);
315 if (num_zeros_to_insert > 0) {
316 internal::InsertZeroColumns(num_zeros_to_insert, tmp_packet_masks_,
317 tmp_packet_mask_size, num_fec_packets,
318 new_bit_index);
319 }
320 new_bit_index += num_zeros_to_insert;
321 internal::CopyColumn(tmp_packet_masks_, tmp_packet_mask_size, packet_masks_,
322 packet_mask_size_, num_fec_packets, new_bit_index,
323 old_bit_index);
324 ++new_bit_index;
325 ++old_bit_index;
326 prev_seq_num = seq_num;
327 ++media_packets_it;
328 }
329 if (new_bit_index % 8 != 0) {
330 // We didn't fill the last byte. Shift bits to correct position.
331 for (uint16_t row = 0; row < num_fec_packets; ++row) {
332 int new_byte_index = row * tmp_packet_mask_size + new_bit_index / 8;
333 tmp_packet_masks_[new_byte_index] <<= (7 - (new_bit_index % 8));
334 }
335 }
336 // Replace the old mask with the new.
337 memcpy(packet_masks_, tmp_packet_masks_,
338 num_fec_packets * tmp_packet_mask_size);
339 return new_bit_index;
340 }
341
FinalizeFecHeaders(size_t num_fec_packets,uint32_t media_ssrc,uint16_t seq_num_base)342 void ForwardErrorCorrection::FinalizeFecHeaders(size_t num_fec_packets,
343 uint32_t media_ssrc,
344 uint16_t seq_num_base) {
345 for (size_t i = 0; i < num_fec_packets; ++i) {
346 fec_header_writer_->FinalizeFecHeader(
347 media_ssrc, seq_num_base, &packet_masks_[i * packet_mask_size_],
348 packet_mask_size_, &generated_fec_packets_[i]);
349 }
350 }
351
ResetState(RecoveredPacketList * recovered_packets)352 void ForwardErrorCorrection::ResetState(
353 RecoveredPacketList* recovered_packets) {
354 // Free the memory for any existing recovered packets, if the caller hasn't.
355 recovered_packets->clear();
356 received_fec_packets_.clear();
357 }
358
InsertMediaPacket(RecoveredPacketList * recovered_packets,const ReceivedPacket & received_packet)359 void ForwardErrorCorrection::InsertMediaPacket(
360 RecoveredPacketList* recovered_packets,
361 const ReceivedPacket& received_packet) {
362 RTC_DCHECK_EQ(received_packet.ssrc, protected_media_ssrc_);
363
364 // Search for duplicate packets.
365 for (const auto& recovered_packet : *recovered_packets) {
366 RTC_DCHECK_EQ(recovered_packet->ssrc, received_packet.ssrc);
367 if (recovered_packet->seq_num == received_packet.seq_num) {
368 // Duplicate packet, no need to add to list.
369 return;
370 }
371 }
372
373 std::unique_ptr<RecoveredPacket> recovered_packet(new RecoveredPacket());
374 // This "recovered packet" was not recovered using parity packets.
375 recovered_packet->was_recovered = false;
376 // This media packet has already been passed on.
377 recovered_packet->returned = true;
378 recovered_packet->ssrc = received_packet.ssrc;
379 recovered_packet->seq_num = received_packet.seq_num;
380 recovered_packet->pkt = received_packet.pkt;
381 // TODO(holmer): Consider replacing this with a binary search for the right
382 // position, and then just insert the new packet. Would get rid of the sort.
383 RecoveredPacket* recovered_packet_ptr = recovered_packet.get();
384 recovered_packets->push_back(std::move(recovered_packet));
385 recovered_packets->sort(SortablePacket::LessThan());
386 UpdateCoveringFecPackets(*recovered_packet_ptr);
387 }
388
UpdateCoveringFecPackets(const RecoveredPacket & packet)389 void ForwardErrorCorrection::UpdateCoveringFecPackets(
390 const RecoveredPacket& packet) {
391 for (auto& fec_packet : received_fec_packets_) {
392 // Is this FEC packet protecting the media packet |packet|?
393 auto protected_it = absl::c_lower_bound(
394 fec_packet->protected_packets, &packet, SortablePacket::LessThan());
395 if (protected_it != fec_packet->protected_packets.end() &&
396 (*protected_it)->seq_num == packet.seq_num) {
397 // Found an FEC packet which is protecting |packet|.
398 (*protected_it)->pkt = packet.pkt;
399 }
400 }
401 }
402
InsertFecPacket(const RecoveredPacketList & recovered_packets,const ReceivedPacket & received_packet)403 void ForwardErrorCorrection::InsertFecPacket(
404 const RecoveredPacketList& recovered_packets,
405 const ReceivedPacket& received_packet) {
406 RTC_DCHECK_EQ(received_packet.ssrc, ssrc_);
407
408 // Check for duplicate.
409 for (const auto& existing_fec_packet : received_fec_packets_) {
410 RTC_DCHECK_EQ(existing_fec_packet->ssrc, received_packet.ssrc);
411 if (existing_fec_packet->seq_num == received_packet.seq_num) {
412 // Drop duplicate FEC packet data.
413 return;
414 }
415 }
416
417 std::unique_ptr<ReceivedFecPacket> fec_packet(new ReceivedFecPacket());
418 fec_packet->pkt = received_packet.pkt;
419 fec_packet->ssrc = received_packet.ssrc;
420 fec_packet->seq_num = received_packet.seq_num;
421 // Parse ULPFEC/FlexFEC header specific info.
422 bool ret = fec_header_reader_->ReadFecHeader(fec_packet.get());
423 if (!ret) {
424 return;
425 }
426
427 // TODO(brandtr): Update here when we support multistream protection.
428 if (fec_packet->protected_ssrc != protected_media_ssrc_) {
429 RTC_LOG(LS_INFO)
430 << "Received FEC packet is protecting an unknown media SSRC; dropping.";
431 return;
432 }
433
434 if (fec_packet->packet_mask_offset + fec_packet->packet_mask_size >
435 fec_packet->pkt->data.size()) {
436 RTC_LOG(LS_INFO) << "Received corrupted FEC packet; dropping.";
437 return;
438 }
439
440 // Parse packet mask from header and represent as protected packets.
441 for (uint16_t byte_idx = 0; byte_idx < fec_packet->packet_mask_size;
442 ++byte_idx) {
443 uint8_t packet_mask =
444 fec_packet->pkt->data[fec_packet->packet_mask_offset + byte_idx];
445 for (uint16_t bit_idx = 0; bit_idx < 8; ++bit_idx) {
446 if (packet_mask & (1 << (7 - bit_idx))) {
447 std::unique_ptr<ProtectedPacket> protected_packet(
448 new ProtectedPacket());
449 // This wraps naturally with the sequence number.
450 protected_packet->ssrc = protected_media_ssrc_;
451 protected_packet->seq_num = static_cast<uint16_t>(
452 fec_packet->seq_num_base + (byte_idx << 3) + bit_idx);
453 protected_packet->pkt = nullptr;
454 fec_packet->protected_packets.push_back(std::move(protected_packet));
455 }
456 }
457 }
458
459 if (fec_packet->protected_packets.empty()) {
460 // All-zero packet mask; we can discard this FEC packet.
461 RTC_LOG(LS_WARNING) << "Received FEC packet has an all-zero packet mask.";
462 } else {
463 AssignRecoveredPackets(recovered_packets, fec_packet.get());
464 // TODO(holmer): Consider replacing this with a binary search for the right
465 // position, and then just insert the new packet. Would get rid of the sort.
466 received_fec_packets_.push_back(std::move(fec_packet));
467 received_fec_packets_.sort(SortablePacket::LessThan());
468 const size_t max_fec_packets = fec_header_reader_->MaxFecPackets();
469 if (received_fec_packets_.size() > max_fec_packets) {
470 received_fec_packets_.pop_front();
471 }
472 RTC_DCHECK_LE(received_fec_packets_.size(), max_fec_packets);
473 }
474 }
475
AssignRecoveredPackets(const RecoveredPacketList & recovered_packets,ReceivedFecPacket * fec_packet)476 void ForwardErrorCorrection::AssignRecoveredPackets(
477 const RecoveredPacketList& recovered_packets,
478 ReceivedFecPacket* fec_packet) {
479 ProtectedPacketList* protected_packets = &fec_packet->protected_packets;
480 std::vector<RecoveredPacket*> recovered_protected_packets;
481
482 // Find intersection between the (sorted) containers |protected_packets|
483 // and |recovered_packets|, i.e. all protected packets that have already
484 // been recovered. Update the corresponding protected packets to point to
485 // the recovered packets.
486 auto it_p = protected_packets->cbegin();
487 auto it_r = recovered_packets.cbegin();
488 SortablePacket::LessThan less_than;
489 while (it_p != protected_packets->end() && it_r != recovered_packets.end()) {
490 if (less_than(*it_p, *it_r)) {
491 ++it_p;
492 } else if (less_than(*it_r, *it_p)) {
493 ++it_r;
494 } else { // *it_p == *it_r.
495 // This protected packet has already been recovered.
496 (*it_p)->pkt = (*it_r)->pkt;
497 ++it_p;
498 ++it_r;
499 }
500 }
501 }
502
InsertPacket(const ReceivedPacket & received_packet,RecoveredPacketList * recovered_packets)503 void ForwardErrorCorrection::InsertPacket(
504 const ReceivedPacket& received_packet,
505 RecoveredPacketList* recovered_packets) {
506 // Discard old FEC packets such that the sequence numbers in
507 // |received_fec_packets_| span at most 1/2 of the sequence number space.
508 // This is important for keeping |received_fec_packets_| sorted, and may
509 // also reduce the possibility of incorrect decoding due to sequence number
510 // wrap-around.
511 // TODO(marpan/holmer): We should be able to improve detection/discarding of
512 // old FEC packets based on timestamp information or better sequence number
513 // thresholding (e.g., to distinguish between wrap-around and reordering).
514 if (!received_fec_packets_.empty() &&
515 received_packet.ssrc == received_fec_packets_.front()->ssrc) {
516 // It only makes sense to detect wrap-around when |received_packet|
517 // and |front_received_fec_packet| belong to the same sequence number
518 // space, i.e., the same SSRC. This happens when |received_packet|
519 // is a FEC packet, or if |received_packet| is a media packet and
520 // RED+ULPFEC is used.
521 auto it = received_fec_packets_.begin();
522 while (it != received_fec_packets_.end()) {
523 uint16_t seq_num_diff = MinDiff(received_packet.seq_num, (*it)->seq_num);
524 if (seq_num_diff > 0x3fff) {
525 it = received_fec_packets_.erase(it);
526 } else {
527 // No need to keep iterating, since |received_fec_packets_| is sorted.
528 break;
529 }
530 }
531 }
532
533 if (received_packet.is_fec) {
534 InsertFecPacket(*recovered_packets, received_packet);
535 } else {
536 InsertMediaPacket(recovered_packets, received_packet);
537 }
538
539 DiscardOldRecoveredPackets(recovered_packets);
540 }
541
StartPacketRecovery(const ReceivedFecPacket & fec_packet,RecoveredPacket * recovered_packet)542 bool ForwardErrorCorrection::StartPacketRecovery(
543 const ReceivedFecPacket& fec_packet,
544 RecoveredPacket* recovered_packet) {
545 // Ensure pkt is initialized.
546 recovered_packet->pkt = new Packet();
547 // Sanity check packet length.
548 if (fec_packet.pkt->data.size() <
549 fec_packet.fec_header_size + fec_packet.protection_length) {
550 RTC_LOG(LS_WARNING)
551 << "The FEC packet is truncated: it does not contain enough room "
552 "for its own header.";
553 return false;
554 }
555 if (fec_packet.protection_length >
556 std::min(size_t{IP_PACKET_SIZE - kRtpHeaderSize},
557 IP_PACKET_SIZE - fec_packet.fec_header_size)) {
558 RTC_LOG(LS_WARNING) << "Incorrect protection length, dropping FEC packet.";
559 return false;
560 }
561 // Initialize recovered packet data.
562 recovered_packet->pkt->data.EnsureCapacity(IP_PACKET_SIZE);
563 recovered_packet->pkt->data.SetSize(fec_packet.protection_length +
564 kRtpHeaderSize);
565 recovered_packet->returned = false;
566 recovered_packet->was_recovered = true;
567 // Copy bytes corresponding to minimum RTP header size.
568 // Note that the sequence number and SSRC fields will be overwritten
569 // at the end of packet recovery.
570 memcpy(recovered_packet->pkt->data.data(), fec_packet.pkt->data.cdata(),
571 kRtpHeaderSize);
572 // Copy remaining FEC payload.
573 if (fec_packet.protection_length > 0) {
574 memcpy(recovered_packet->pkt->data.data() + kRtpHeaderSize,
575 fec_packet.pkt->data.cdata() + fec_packet.fec_header_size,
576 fec_packet.protection_length);
577 }
578 return true;
579 }
580
FinishPacketRecovery(const ReceivedFecPacket & fec_packet,RecoveredPacket * recovered_packet)581 bool ForwardErrorCorrection::FinishPacketRecovery(
582 const ReceivedFecPacket& fec_packet,
583 RecoveredPacket* recovered_packet) {
584 uint8_t* data = recovered_packet->pkt->data.data();
585 // Set the RTP version to 2.
586 data[0] |= 0x80; // Set the 1st bit.
587 data[0] &= 0xbf; // Clear the 2nd bit.
588 // Recover the packet length, from temporary location.
589 const size_t new_size =
590 ByteReader<uint16_t>::ReadBigEndian(&data[2]) + kRtpHeaderSize;
591 if (new_size > size_t{IP_PACKET_SIZE - kRtpHeaderSize}) {
592 RTC_LOG(LS_WARNING) << "The recovered packet had a length larger than a "
593 "typical IP packet, and is thus dropped.";
594 return false;
595 }
596 recovered_packet->pkt->data.SetSize(new_size);
597 // Set the SN field.
598 ByteWriter<uint16_t>::WriteBigEndian(&data[2], recovered_packet->seq_num);
599 // Set the SSRC field.
600 ByteWriter<uint32_t>::WriteBigEndian(&data[8], fec_packet.protected_ssrc);
601 recovered_packet->ssrc = fec_packet.protected_ssrc;
602 return true;
603 }
604
XorHeaders(const Packet & src,Packet * dst)605 void ForwardErrorCorrection::XorHeaders(const Packet& src, Packet* dst) {
606 uint8_t* dst_data = dst->data.data();
607 const uint8_t* src_data = src.data.cdata();
608 // XOR the first 2 bytes of the header: V, P, X, CC, M, PT fields.
609 dst_data[0] ^= src_data[0];
610 dst_data[1] ^= src_data[1];
611
612 // XOR the length recovery field.
613 uint8_t src_payload_length_network_order[2];
614 ByteWriter<uint16_t>::WriteBigEndian(src_payload_length_network_order,
615 src.data.size() - kRtpHeaderSize);
616 dst_data[2] ^= src_payload_length_network_order[0];
617 dst_data[3] ^= src_payload_length_network_order[1];
618
619 // XOR the 5th to 8th bytes of the header: the timestamp field.
620 dst_data[4] ^= src_data[4];
621 dst_data[5] ^= src_data[5];
622 dst_data[6] ^= src_data[6];
623 dst_data[7] ^= src_data[7];
624
625 // Skip the 9th to 12th bytes of the header.
626 }
627
XorPayloads(const Packet & src,size_t payload_length,size_t dst_offset,Packet * dst)628 void ForwardErrorCorrection::XorPayloads(const Packet& src,
629 size_t payload_length,
630 size_t dst_offset,
631 Packet* dst) {
632 // XOR the payload.
633 RTC_DCHECK_LE(kRtpHeaderSize + payload_length, src.data.size());
634 RTC_DCHECK_LE(dst_offset + payload_length, dst->data.capacity());
635 if (dst_offset + payload_length > dst->data.size()) {
636 dst->data.SetSize(dst_offset + payload_length);
637 }
638 uint8_t* dst_data = dst->data.data();
639 const uint8_t* src_data = src.data.cdata();
640 for (size_t i = 0; i < payload_length; ++i) {
641 dst_data[dst_offset + i] ^= src_data[kRtpHeaderSize + i];
642 }
643 }
644
RecoverPacket(const ReceivedFecPacket & fec_packet,RecoveredPacket * recovered_packet)645 bool ForwardErrorCorrection::RecoverPacket(const ReceivedFecPacket& fec_packet,
646 RecoveredPacket* recovered_packet) {
647 if (!StartPacketRecovery(fec_packet, recovered_packet)) {
648 return false;
649 }
650 for (const auto& protected_packet : fec_packet.protected_packets) {
651 if (protected_packet->pkt == nullptr) {
652 // This is the packet we're recovering.
653 recovered_packet->seq_num = protected_packet->seq_num;
654 } else {
655 XorHeaders(*protected_packet->pkt, recovered_packet->pkt);
656 XorPayloads(*protected_packet->pkt,
657 protected_packet->pkt->data.size() - kRtpHeaderSize,
658 kRtpHeaderSize, recovered_packet->pkt);
659 }
660 }
661 if (!FinishPacketRecovery(fec_packet, recovered_packet)) {
662 return false;
663 }
664 return true;
665 }
666
AttemptRecovery(RecoveredPacketList * recovered_packets)667 void ForwardErrorCorrection::AttemptRecovery(
668 RecoveredPacketList* recovered_packets) {
669 auto fec_packet_it = received_fec_packets_.begin();
670 while (fec_packet_it != received_fec_packets_.end()) {
671 // Search for each FEC packet's protected media packets.
672 int packets_missing = NumCoveredPacketsMissing(**fec_packet_it);
673
674 // We can only recover one packet with an FEC packet.
675 if (packets_missing == 1) {
676 // Recovery possible.
677 std::unique_ptr<RecoveredPacket> recovered_packet(new RecoveredPacket());
678 recovered_packet->pkt = nullptr;
679 if (!RecoverPacket(**fec_packet_it, recovered_packet.get())) {
680 // Can't recover using this packet, drop it.
681 fec_packet_it = received_fec_packets_.erase(fec_packet_it);
682 continue;
683 }
684
685 auto* recovered_packet_ptr = recovered_packet.get();
686 // Add recovered packet to the list of recovered packets and update any
687 // FEC packets covering this packet with a pointer to the data.
688 // TODO(holmer): Consider replacing this with a binary search for the
689 // right position, and then just insert the new packet. Would get rid of
690 // the sort.
691 recovered_packets->push_back(std::move(recovered_packet));
692 recovered_packets->sort(SortablePacket::LessThan());
693 UpdateCoveringFecPackets(*recovered_packet_ptr);
694 DiscardOldRecoveredPackets(recovered_packets);
695 fec_packet_it = received_fec_packets_.erase(fec_packet_it);
696
697 // A packet has been recovered. We need to check the FEC list again, as
698 // this may allow additional packets to be recovered.
699 // Restart for first FEC packet.
700 fec_packet_it = received_fec_packets_.begin();
701 } else if (packets_missing == 0) {
702 // Either all protected packets arrived or have been recovered. We can
703 // discard this FEC packet.
704 fec_packet_it = received_fec_packets_.erase(fec_packet_it);
705 } else {
706 fec_packet_it++;
707 }
708 }
709 }
710
NumCoveredPacketsMissing(const ReceivedFecPacket & fec_packet)711 int ForwardErrorCorrection::NumCoveredPacketsMissing(
712 const ReceivedFecPacket& fec_packet) {
713 int packets_missing = 0;
714 for (const auto& protected_packet : fec_packet.protected_packets) {
715 if (protected_packet->pkt == nullptr) {
716 ++packets_missing;
717 if (packets_missing > 1) {
718 break; // We can't recover more than one packet.
719 }
720 }
721 }
722 return packets_missing;
723 }
724
DiscardOldRecoveredPackets(RecoveredPacketList * recovered_packets)725 void ForwardErrorCorrection::DiscardOldRecoveredPackets(
726 RecoveredPacketList* recovered_packets) {
727 const size_t max_media_packets = fec_header_reader_->MaxMediaPackets();
728 while (recovered_packets->size() > max_media_packets) {
729 recovered_packets->pop_front();
730 }
731 RTC_DCHECK_LE(recovered_packets->size(), max_media_packets);
732 }
733
ParseSequenceNumber(uint8_t * packet)734 uint16_t ForwardErrorCorrection::ParseSequenceNumber(uint8_t* packet) {
735 return (packet[2] << 8) + packet[3];
736 }
737
ParseSsrc(uint8_t * packet)738 uint32_t ForwardErrorCorrection::ParseSsrc(uint8_t* packet) {
739 return (packet[8] << 24) + (packet[9] << 16) + (packet[10] << 8) + packet[11];
740 }
741
DecodeFec(const ReceivedPacket & received_packet,RecoveredPacketList * recovered_packets)742 void ForwardErrorCorrection::DecodeFec(const ReceivedPacket& received_packet,
743 RecoveredPacketList* recovered_packets) {
744 RTC_DCHECK(recovered_packets);
745
746 const size_t max_media_packets = fec_header_reader_->MaxMediaPackets();
747 if (recovered_packets->size() == max_media_packets) {
748 const RecoveredPacket* back_recovered_packet =
749 recovered_packets->back().get();
750
751 if (received_packet.ssrc == back_recovered_packet->ssrc) {
752 const unsigned int seq_num_diff =
753 MinDiff(received_packet.seq_num, back_recovered_packet->seq_num);
754 if (seq_num_diff > max_media_packets) {
755 // A big gap in sequence numbers. The old recovered packets
756 // are now useless, so it's safe to do a reset.
757 RTC_LOG(LS_INFO) << "Big gap in media/ULPFEC sequence numbers. No need "
758 "to keep the old packets in the FEC buffers, thus "
759 "resetting them.";
760 ResetState(recovered_packets);
761 }
762 }
763 }
764
765 InsertPacket(received_packet, recovered_packets);
766 AttemptRecovery(recovered_packets);
767 }
768
MaxPacketOverhead() const769 size_t ForwardErrorCorrection::MaxPacketOverhead() const {
770 return fec_header_writer_->MaxPacketOverhead();
771 }
772
FecHeaderReader(size_t max_media_packets,size_t max_fec_packets)773 FecHeaderReader::FecHeaderReader(size_t max_media_packets,
774 size_t max_fec_packets)
775 : max_media_packets_(max_media_packets),
776 max_fec_packets_(max_fec_packets) {}
777
778 FecHeaderReader::~FecHeaderReader() = default;
779
MaxMediaPackets() const780 size_t FecHeaderReader::MaxMediaPackets() const {
781 return max_media_packets_;
782 }
783
MaxFecPackets() const784 size_t FecHeaderReader::MaxFecPackets() const {
785 return max_fec_packets_;
786 }
787
FecHeaderWriter(size_t max_media_packets,size_t max_fec_packets,size_t max_packet_overhead)788 FecHeaderWriter::FecHeaderWriter(size_t max_media_packets,
789 size_t max_fec_packets,
790 size_t max_packet_overhead)
791 : max_media_packets_(max_media_packets),
792 max_fec_packets_(max_fec_packets),
793 max_packet_overhead_(max_packet_overhead) {}
794
795 FecHeaderWriter::~FecHeaderWriter() = default;
796
MaxMediaPackets() const797 size_t FecHeaderWriter::MaxMediaPackets() const {
798 return max_media_packets_;
799 }
800
MaxFecPackets() const801 size_t FecHeaderWriter::MaxFecPackets() const {
802 return max_fec_packets_;
803 }
804
MaxPacketOverhead() const805 size_t FecHeaderWriter::MaxPacketOverhead() const {
806 return max_packet_overhead_;
807 }
808
809 } // namespace webrtc
810