1 /*
2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h"
12
13 #include <algorithm>
14 #include <utility>
15
16 #include "absl/algorithm/container.h"
17 #include "absl/memory/memory.h"
18 #include "api/call/transport.h"
19 #include "api/video/video_bitrate_allocation.h"
20 #include "modules/rtp_rtcp/include/receive_statistics.h"
21 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
22 #include "modules/rtp_rtcp/source/rtcp_packet.h"
23 #include "modules/rtp_rtcp/source/rtcp_packet/bye.h"
24 #include "modules/rtp_rtcp/source/rtcp_packet/common_header.h"
25 #include "modules/rtp_rtcp/source/rtcp_packet/extended_reports.h"
26 #include "modules/rtp_rtcp/source/rtcp_packet/fir.h"
27 #include "modules/rtp_rtcp/source/rtcp_packet/nack.h"
28 #include "modules/rtp_rtcp/source/rtcp_packet/pli.h"
29 #include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h"
30 #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
31 #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h"
32 #include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
33 #include "modules/rtp_rtcp/source/time_util.h"
34 #include "rtc_base/checks.h"
35 #include "rtc_base/containers/flat_map.h"
36 #include "rtc_base/logging.h"
37 #include "rtc_base/numerics/divide_round.h"
38 #include "rtc_base/task_utils/repeating_task.h"
39 #include "system_wrappers/include/clock.h"
40
41 namespace webrtc {
42 namespace {
43
44 struct SenderReportTimes {
45 Timestamp local_received_time;
46 NtpTime remote_sent_time;
47 };
48
49 } // namespace
50
51 struct RtcpTransceiverImpl::RemoteSenderState {
52 uint8_t fir_sequence_number = 0;
53 absl::optional<SenderReportTimes> last_received_sender_report;
54 std::vector<MediaReceiverRtcpObserver*> observers;
55 };
56
57 struct RtcpTransceiverImpl::LocalSenderState {
58 uint32_t ssrc;
59 size_t last_num_sent_bytes = 0;
60 // Sequence number of the last FIR message per sender SSRC.
61 flat_map<uint32_t, uint8_t> last_fir;
62 RtpStreamRtcpHandler* handler = nullptr;
63 };
64
65 // Helper to put several RTCP packets into lower layer datagram composing
66 // Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2.
67 // TODO(danilchap): When in compound mode and packets are so many that several
68 // compound RTCP packets need to be generated, ensure each packet is compound.
69 class RtcpTransceiverImpl::PacketSender {
70 public:
PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback,size_t max_packet_size)71 PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback,
72 size_t max_packet_size)
73 : callback_(callback), max_packet_size_(max_packet_size) {
74 RTC_CHECK_LE(max_packet_size, IP_PACKET_SIZE);
75 }
~PacketSender()76 ~PacketSender() { RTC_DCHECK_EQ(index_, 0) << "Unsent rtcp packet."; }
77
78 // Appends a packet to pending compound packet.
79 // Sends rtcp compound packet if buffer was already full and resets buffer.
AppendPacket(const rtcp::RtcpPacket & packet)80 void AppendPacket(const rtcp::RtcpPacket& packet) {
81 packet.Create(buffer_, &index_, max_packet_size_, callback_);
82 }
83
84 // Sends pending rtcp compound packet.
Send()85 void Send() {
86 if (index_ > 0) {
87 callback_(rtc::ArrayView<const uint8_t>(buffer_, index_));
88 index_ = 0;
89 }
90 }
91
IsEmpty() const92 bool IsEmpty() const { return index_ == 0; }
93
94 private:
95 const rtcp::RtcpPacket::PacketReadyCallback callback_;
96 const size_t max_packet_size_;
97 size_t index_ = 0;
98 uint8_t buffer_[IP_PACKET_SIZE];
99 };
100
RtcpTransceiverImpl(const RtcpTransceiverConfig & config)101 RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
102 : config_(config), ready_to_send_(config.initial_ready_to_send) {
103 RTC_CHECK(config_.Validate());
104 if (ready_to_send_ && config_.schedule_periodic_compound_packets) {
105 SchedulePeriodicCompoundPackets(config_.initial_report_delay);
106 }
107 }
108
109 RtcpTransceiverImpl::~RtcpTransceiverImpl() = default;
110
AddMediaReceiverRtcpObserver(uint32_t remote_ssrc,MediaReceiverRtcpObserver * observer)111 void RtcpTransceiverImpl::AddMediaReceiverRtcpObserver(
112 uint32_t remote_ssrc,
113 MediaReceiverRtcpObserver* observer) {
114 if (config_.receive_statistics == nullptr && remote_senders_.empty()) {
115 RTC_LOG(LS_WARNING) << config_.debug_id
116 << "receive statistic is not set. RTCP report blocks "
117 "will not be generated.";
118 }
119 auto& stored = remote_senders_[remote_ssrc].observers;
120 RTC_DCHECK(!absl::c_linear_search(stored, observer));
121 stored.push_back(observer);
122 }
123
RemoveMediaReceiverRtcpObserver(uint32_t remote_ssrc,MediaReceiverRtcpObserver * observer)124 void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver(
125 uint32_t remote_ssrc,
126 MediaReceiverRtcpObserver* observer) {
127 auto remote_sender_it = remote_senders_.find(remote_ssrc);
128 if (remote_sender_it == remote_senders_.end())
129 return;
130 auto& stored = remote_sender_it->second.observers;
131 auto it = absl::c_find(stored, observer);
132 if (it == stored.end())
133 return;
134 stored.erase(it);
135 }
136
AddMediaSender(uint32_t local_ssrc,RtpStreamRtcpHandler * handler)137 bool RtcpTransceiverImpl::AddMediaSender(uint32_t local_ssrc,
138 RtpStreamRtcpHandler* handler) {
139 RTC_DCHECK(handler != nullptr);
140 LocalSenderState state;
141 state.ssrc = local_ssrc;
142 state.handler = handler;
143 local_senders_.push_back(state);
144 auto it = std::prev(local_senders_.end());
145 auto [unused, inserted] = local_senders_by_ssrc_.emplace(local_ssrc, it);
146 if (!inserted) {
147 local_senders_.pop_back();
148 return false;
149 }
150 return true;
151 }
152
RemoveMediaSender(uint32_t local_ssrc)153 bool RtcpTransceiverImpl::RemoveMediaSender(uint32_t local_ssrc) {
154 auto index_it = local_senders_by_ssrc_.find(local_ssrc);
155 if (index_it == local_senders_by_ssrc_.end()) {
156 return false;
157 }
158 local_senders_.erase(index_it->second);
159 local_senders_by_ssrc_.erase(index_it);
160 return true;
161 }
162
SetReadyToSend(bool ready)163 void RtcpTransceiverImpl::SetReadyToSend(bool ready) {
164 if (config_.schedule_periodic_compound_packets) {
165 if (ready_to_send_ && !ready)
166 periodic_task_handle_.Stop();
167
168 if (!ready_to_send_ && ready) // Restart periodic sending.
169 SchedulePeriodicCompoundPackets(config_.report_period / 2);
170 }
171 ready_to_send_ = ready;
172 }
173
ReceivePacket(rtc::ArrayView<const uint8_t> packet,Timestamp now)174 void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet,
175 Timestamp now) {
176 // Report blocks may be spread across multiple sender and receiver reports.
177 std::vector<rtcp::ReportBlock> report_blocks;
178
179 while (!packet.empty()) {
180 rtcp::CommonHeader rtcp_block;
181 if (!rtcp_block.Parse(packet.data(), packet.size()))
182 break;
183
184 HandleReceivedPacket(rtcp_block, now, report_blocks);
185
186 packet = packet.subview(rtcp_block.packet_size());
187 }
188
189 if (!report_blocks.empty()) {
190 ProcessReportBlocks(now, report_blocks);
191 }
192 }
193
SendCompoundPacket()194 void RtcpTransceiverImpl::SendCompoundPacket() {
195 if (!ready_to_send_)
196 return;
197 SendPeriodicCompoundPacket();
198 ReschedulePeriodicCompoundPackets();
199 }
200
SetRemb(int64_t bitrate_bps,std::vector<uint32_t> ssrcs)201 void RtcpTransceiverImpl::SetRemb(int64_t bitrate_bps,
202 std::vector<uint32_t> ssrcs) {
203 RTC_DCHECK_GE(bitrate_bps, 0);
204
205 bool send_now = config_.send_remb_on_change &&
206 (!remb_.has_value() || bitrate_bps != remb_->bitrate_bps());
207 remb_.emplace();
208 remb_->SetSsrcs(std::move(ssrcs));
209 remb_->SetBitrateBps(bitrate_bps);
210 remb_->SetSenderSsrc(config_.feedback_ssrc);
211 // TODO(bugs.webrtc.org/8239): Move logic from PacketRouter for sending remb
212 // immideately on large bitrate change when there is one RtcpTransceiver per
213 // rtp transport.
214 if (send_now) {
215 absl::optional<rtcp::Remb> remb;
216 remb.swap(remb_);
217 SendImmediateFeedback(*remb);
218 remb.swap(remb_);
219 }
220 }
221
UnsetRemb()222 void RtcpTransceiverImpl::UnsetRemb() {
223 remb_.reset();
224 }
225
SendRawPacket(rtc::ArrayView<const uint8_t> packet)226 void RtcpTransceiverImpl::SendRawPacket(rtc::ArrayView<const uint8_t> packet) {
227 if (!ready_to_send_)
228 return;
229 // Unlike other senders, this functions just tries to send packet away and
230 // disregard rtcp_mode, max_packet_size or anything else.
231 // TODO(bugs.webrtc.org/8239): respect config_ by creating the
232 // TransportFeedback inside this class when there is one per rtp transport.
233 config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
234 }
235
SendNack(uint32_t ssrc,std::vector<uint16_t> sequence_numbers)236 void RtcpTransceiverImpl::SendNack(uint32_t ssrc,
237 std::vector<uint16_t> sequence_numbers) {
238 RTC_DCHECK(!sequence_numbers.empty());
239 if (!ready_to_send_)
240 return;
241 rtcp::Nack nack;
242 nack.SetSenderSsrc(config_.feedback_ssrc);
243 nack.SetMediaSsrc(ssrc);
244 nack.SetPacketIds(std::move(sequence_numbers));
245 SendImmediateFeedback(nack);
246 }
247
SendPictureLossIndication(uint32_t ssrc)248 void RtcpTransceiverImpl::SendPictureLossIndication(uint32_t ssrc) {
249 if (!ready_to_send_)
250 return;
251 rtcp::Pli pli;
252 pli.SetSenderSsrc(config_.feedback_ssrc);
253 pli.SetMediaSsrc(ssrc);
254 SendImmediateFeedback(pli);
255 }
256
SendFullIntraRequest(rtc::ArrayView<const uint32_t> ssrcs,bool new_request)257 void RtcpTransceiverImpl::SendFullIntraRequest(
258 rtc::ArrayView<const uint32_t> ssrcs,
259 bool new_request) {
260 RTC_DCHECK(!ssrcs.empty());
261 if (!ready_to_send_)
262 return;
263 rtcp::Fir fir;
264 fir.SetSenderSsrc(config_.feedback_ssrc);
265 for (uint32_t media_ssrc : ssrcs) {
266 uint8_t& command_seq_num = remote_senders_[media_ssrc].fir_sequence_number;
267 if (new_request)
268 command_seq_num += 1;
269 fir.AddRequestTo(media_ssrc, command_seq_num);
270 }
271 SendImmediateFeedback(fir);
272 }
273
HandleReceivedPacket(const rtcp::CommonHeader & rtcp_packet_header,Timestamp now,std::vector<rtcp::ReportBlock> & report_blocks)274 void RtcpTransceiverImpl::HandleReceivedPacket(
275 const rtcp::CommonHeader& rtcp_packet_header,
276 Timestamp now,
277 std::vector<rtcp::ReportBlock>& report_blocks) {
278 switch (rtcp_packet_header.type()) {
279 case rtcp::Bye::kPacketType:
280 HandleBye(rtcp_packet_header);
281 break;
282 case rtcp::SenderReport::kPacketType:
283 HandleSenderReport(rtcp_packet_header, now, report_blocks);
284 break;
285 case rtcp::ReceiverReport::kPacketType:
286 HandleReceiverReport(rtcp_packet_header, report_blocks);
287 break;
288 case rtcp::ExtendedReports::kPacketType:
289 HandleExtendedReports(rtcp_packet_header, now);
290 break;
291 case rtcp::Psfb::kPacketType:
292 HandlePayloadSpecificFeedback(rtcp_packet_header, now);
293 break;
294 case rtcp::Rtpfb::kPacketType:
295 HandleRtpFeedback(rtcp_packet_header, now);
296 break;
297 }
298 }
299
HandleBye(const rtcp::CommonHeader & rtcp_packet_header)300 void RtcpTransceiverImpl::HandleBye(
301 const rtcp::CommonHeader& rtcp_packet_header) {
302 rtcp::Bye bye;
303 if (!bye.Parse(rtcp_packet_header))
304 return;
305 auto remote_sender_it = remote_senders_.find(bye.sender_ssrc());
306 if (remote_sender_it == remote_senders_.end())
307 return;
308 for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
309 observer->OnBye(bye.sender_ssrc());
310 }
311
HandleSenderReport(const rtcp::CommonHeader & rtcp_packet_header,Timestamp now,std::vector<rtcp::ReportBlock> & report_blocks)312 void RtcpTransceiverImpl::HandleSenderReport(
313 const rtcp::CommonHeader& rtcp_packet_header,
314 Timestamp now,
315 std::vector<rtcp::ReportBlock>& report_blocks) {
316 rtcp::SenderReport sender_report;
317 if (!sender_report.Parse(rtcp_packet_header))
318 return;
319 RemoteSenderState& remote_sender =
320 remote_senders_[sender_report.sender_ssrc()];
321 remote_sender.last_received_sender_report = {{now, sender_report.ntp()}};
322 const auto& received_report_blocks = sender_report.report_blocks();
323 CallbackOnReportBlocks(sender_report.sender_ssrc(), received_report_blocks);
324 report_blocks.insert(report_blocks.end(), received_report_blocks.begin(),
325 received_report_blocks.end());
326
327 for (MediaReceiverRtcpObserver* observer : remote_sender.observers)
328 observer->OnSenderReport(sender_report.sender_ssrc(), sender_report.ntp(),
329 sender_report.rtp_timestamp());
330 }
331
HandleReceiverReport(const rtcp::CommonHeader & rtcp_packet_header,std::vector<rtcp::ReportBlock> & report_blocks)332 void RtcpTransceiverImpl::HandleReceiverReport(
333 const rtcp::CommonHeader& rtcp_packet_header,
334 std::vector<rtcp::ReportBlock>& report_blocks) {
335 rtcp::ReceiverReport receiver_report;
336 if (!receiver_report.Parse(rtcp_packet_header)) {
337 return;
338 }
339 const auto& received_report_blocks = receiver_report.report_blocks();
340 CallbackOnReportBlocks(receiver_report.sender_ssrc(), received_report_blocks);
341 report_blocks.insert(report_blocks.end(), received_report_blocks.begin(),
342 received_report_blocks.end());
343 }
344
CallbackOnReportBlocks(uint32_t sender_ssrc,rtc::ArrayView<const rtcp::ReportBlock> report_blocks)345 void RtcpTransceiverImpl::CallbackOnReportBlocks(
346 uint32_t sender_ssrc,
347 rtc::ArrayView<const rtcp::ReportBlock> report_blocks) {
348 if (local_senders_.empty()) {
349 return;
350 }
351 for (const rtcp::ReportBlock& block : report_blocks) {
352 auto sender_it = local_senders_by_ssrc_.find(block.source_ssrc());
353 if (sender_it != local_senders_by_ssrc_.end()) {
354 sender_it->second->handler->OnReportBlock(sender_ssrc, block);
355 }
356 }
357 }
358
HandlePayloadSpecificFeedback(const rtcp::CommonHeader & rtcp_packet_header,Timestamp now)359 void RtcpTransceiverImpl::HandlePayloadSpecificFeedback(
360 const rtcp::CommonHeader& rtcp_packet_header,
361 Timestamp now) {
362 switch (rtcp_packet_header.fmt()) {
363 case rtcp::Fir::kFeedbackMessageType:
364 HandleFir(rtcp_packet_header);
365 break;
366 case rtcp::Pli::kFeedbackMessageType:
367 HandlePli(rtcp_packet_header);
368 break;
369 case rtcp::Psfb::kAfbMessageType:
370 HandleRemb(rtcp_packet_header, now);
371 break;
372 }
373 }
374
HandleFir(const rtcp::CommonHeader & rtcp_packet_header)375 void RtcpTransceiverImpl::HandleFir(
376 const rtcp::CommonHeader& rtcp_packet_header) {
377 rtcp::Fir fir;
378 if (local_senders_.empty() || !fir.Parse(rtcp_packet_header)) {
379 return;
380 }
381 for (const rtcp::Fir::Request& r : fir.requests()) {
382 auto it = local_senders_by_ssrc_.find(r.ssrc);
383 if (it == local_senders_by_ssrc_.end()) {
384 continue;
385 }
386 auto [fir_it, is_new] =
387 it->second->last_fir.emplace(fir.sender_ssrc(), r.seq_nr);
388 if (is_new || fir_it->second != r.seq_nr) {
389 it->second->handler->OnFir(fir.sender_ssrc());
390 fir_it->second = r.seq_nr;
391 }
392 }
393 }
394
HandlePli(const rtcp::CommonHeader & rtcp_packet_header)395 void RtcpTransceiverImpl::HandlePli(
396 const rtcp::CommonHeader& rtcp_packet_header) {
397 rtcp::Pli pli;
398 if (local_senders_.empty() || !pli.Parse(rtcp_packet_header)) {
399 return;
400 }
401 auto it = local_senders_by_ssrc_.find(pli.media_ssrc());
402 if (it != local_senders_by_ssrc_.end()) {
403 it->second->handler->OnPli(pli.sender_ssrc());
404 }
405 }
406
HandleRemb(const rtcp::CommonHeader & rtcp_packet_header,Timestamp now)407 void RtcpTransceiverImpl::HandleRemb(
408 const rtcp::CommonHeader& rtcp_packet_header,
409 Timestamp now) {
410 rtcp::Remb remb;
411 if (config_.network_link_observer == nullptr ||
412 !remb.Parse(rtcp_packet_header)) {
413 return;
414 }
415 config_.network_link_observer->OnReceiverEstimatedMaxBitrate(
416 now, DataRate::BitsPerSec(remb.bitrate_bps()));
417 }
418
HandleRtpFeedback(const rtcp::CommonHeader & rtcp_packet_header,Timestamp now)419 void RtcpTransceiverImpl::HandleRtpFeedback(
420 const rtcp::CommonHeader& rtcp_packet_header,
421 Timestamp now) {
422 switch (rtcp_packet_header.fmt()) {
423 case rtcp::Nack::kFeedbackMessageType:
424 HandleNack(rtcp_packet_header);
425 break;
426 case rtcp::TransportFeedback::kFeedbackMessageType:
427 HandleTransportFeedback(rtcp_packet_header, now);
428 break;
429 }
430 }
431
HandleNack(const rtcp::CommonHeader & rtcp_packet_header)432 void RtcpTransceiverImpl::HandleNack(
433 const rtcp::CommonHeader& rtcp_packet_header) {
434 rtcp::Nack nack;
435 if (local_senders_.empty() || !nack.Parse(rtcp_packet_header)) {
436 return;
437 }
438 auto it = local_senders_by_ssrc_.find(nack.media_ssrc());
439 if (it != local_senders_by_ssrc_.end()) {
440 it->second->handler->OnNack(nack.sender_ssrc(), nack.packet_ids());
441 }
442 }
443
HandleTransportFeedback(const rtcp::CommonHeader & rtcp_packet_header,Timestamp now)444 void RtcpTransceiverImpl::HandleTransportFeedback(
445 const rtcp::CommonHeader& rtcp_packet_header,
446 Timestamp now) {
447 RTC_DCHECK_EQ(rtcp_packet_header.fmt(),
448 rtcp::TransportFeedback::kFeedbackMessageType);
449 if (config_.network_link_observer == nullptr) {
450 return;
451 }
452 rtcp::TransportFeedback feedback;
453 if (feedback.Parse(rtcp_packet_header)) {
454 config_.network_link_observer->OnTransportFeedback(now, feedback);
455 }
456 }
457
HandleExtendedReports(const rtcp::CommonHeader & rtcp_packet_header,Timestamp now)458 void RtcpTransceiverImpl::HandleExtendedReports(
459 const rtcp::CommonHeader& rtcp_packet_header,
460 Timestamp now) {
461 rtcp::ExtendedReports extended_reports;
462 if (!extended_reports.Parse(rtcp_packet_header))
463 return;
464
465 if (config_.reply_to_non_sender_rtt_measurement && extended_reports.rrtr()) {
466 RrtrTimes& rrtr = received_rrtrs_[extended_reports.sender_ssrc()];
467 rrtr.received_remote_mid_ntp_time =
468 CompactNtp(extended_reports.rrtr()->ntp());
469 rrtr.local_receive_mid_ntp_time =
470 CompactNtp(config_.clock->ConvertTimestampToNtpTime(now));
471 }
472
473 if (extended_reports.dlrr())
474 HandleDlrr(extended_reports.dlrr(), now);
475
476 if (extended_reports.target_bitrate())
477 HandleTargetBitrate(*extended_reports.target_bitrate(),
478 extended_reports.sender_ssrc());
479 }
480
HandleDlrr(const rtcp::Dlrr & dlrr,Timestamp now)481 void RtcpTransceiverImpl::HandleDlrr(const rtcp::Dlrr& dlrr, Timestamp now) {
482 if (!config_.non_sender_rtt_measurement ||
483 config_.network_link_observer == nullptr) {
484 return;
485 }
486
487 // Delay and last_rr are transferred using 32bit compact ntp resolution.
488 // Convert packet arrival time to same format through 64bit ntp format.
489 uint32_t receive_time_ntp =
490 CompactNtp(config_.clock->ConvertTimestampToNtpTime(now));
491 for (const rtcp::ReceiveTimeInfo& rti : dlrr.sub_blocks()) {
492 if (rti.ssrc != config_.feedback_ssrc)
493 continue;
494 uint32_t rtt_ntp = receive_time_ntp - rti.delay_since_last_rr - rti.last_rr;
495 TimeDelta rtt = CompactNtpRttToTimeDelta(rtt_ntp);
496 config_.network_link_observer->OnRttUpdate(now, rtt);
497 }
498 }
499
ProcessReportBlocks(Timestamp now,rtc::ArrayView<const rtcp::ReportBlock> report_blocks)500 void RtcpTransceiverImpl::ProcessReportBlocks(
501 Timestamp now,
502 rtc::ArrayView<const rtcp::ReportBlock> report_blocks) {
503 RTC_DCHECK(!report_blocks.empty());
504 if (config_.network_link_observer == nullptr) {
505 return;
506 }
507 // Round trip time calculated from different report blocks suppose to be about
508 // the same, as those blocks should be generated by the same remote sender.
509 // To avoid too many callbacks, this code accumulate multiple rtts into one.
510 TimeDelta rtt_sum = TimeDelta::Zero();
511 size_t num_rtts = 0;
512 uint32_t receive_time_ntp =
513 CompactNtp(config_.clock->ConvertTimestampToNtpTime(now));
514 for (const rtcp::ReportBlock& report_block : report_blocks) {
515 if (report_block.last_sr() == 0) {
516 continue;
517 }
518
519 uint32_t rtt_ntp = receive_time_ntp - report_block.delay_since_last_sr() -
520 report_block.last_sr();
521 rtt_sum += CompactNtpRttToTimeDelta(rtt_ntp);
522 ++num_rtts;
523 }
524 // For backward compatibility, do not report rtt based on report blocks to the
525 // `config_.rtt_observer`
526 if (num_rtts > 0) {
527 config_.network_link_observer->OnRttUpdate(now, rtt_sum / num_rtts);
528 }
529 config_.network_link_observer->OnReportBlocks(now, report_blocks);
530 }
531
HandleTargetBitrate(const rtcp::TargetBitrate & target_bitrate,uint32_t remote_ssrc)532 void RtcpTransceiverImpl::HandleTargetBitrate(
533 const rtcp::TargetBitrate& target_bitrate,
534 uint32_t remote_ssrc) {
535 auto remote_sender_it = remote_senders_.find(remote_ssrc);
536 if (remote_sender_it == remote_senders_.end() ||
537 remote_sender_it->second.observers.empty())
538 return;
539
540 // Convert rtcp::TargetBitrate to VideoBitrateAllocation.
541 VideoBitrateAllocation bitrate_allocation;
542 for (const rtcp::TargetBitrate::BitrateItem& item :
543 target_bitrate.GetTargetBitrates()) {
544 if (item.spatial_layer >= kMaxSpatialLayers ||
545 item.temporal_layer >= kMaxTemporalStreams) {
546 RTC_DLOG(LS_WARNING)
547 << config_.debug_id
548 << "Invalid incoming TargetBitrate with spatial layer "
549 << item.spatial_layer << ", temporal layer " << item.temporal_layer;
550 continue;
551 }
552 bitrate_allocation.SetBitrate(item.spatial_layer, item.temporal_layer,
553 item.target_bitrate_kbps * 1000);
554 }
555
556 for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
557 observer->OnBitrateAllocation(remote_ssrc, bitrate_allocation);
558 }
559
ReschedulePeriodicCompoundPackets()560 void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() {
561 if (!config_.schedule_periodic_compound_packets)
562 return;
563 periodic_task_handle_.Stop();
564 RTC_DCHECK(ready_to_send_);
565 SchedulePeriodicCompoundPackets(config_.report_period);
566 }
567
SchedulePeriodicCompoundPackets(TimeDelta delay)568 void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(TimeDelta delay) {
569 periodic_task_handle_ = RepeatingTaskHandle::DelayedStart(
570 config_.task_queue, delay,
571 [this] {
572 RTC_DCHECK(config_.schedule_periodic_compound_packets);
573 RTC_DCHECK(ready_to_send_);
574 SendPeriodicCompoundPacket();
575 return config_.report_period;
576 },
577 TaskQueueBase::DelayPrecision::kLow, config_.clock);
578 }
579
FillReports(Timestamp now,ReservedBytes reserved,PacketSender & rtcp_sender)580 std::vector<uint32_t> RtcpTransceiverImpl::FillReports(
581 Timestamp now,
582 ReservedBytes reserved,
583 PacketSender& rtcp_sender) {
584 // Sender/receiver reports should be first in the RTCP packet.
585 RTC_DCHECK(rtcp_sender.IsEmpty());
586
587 size_t available_bytes = config_.max_packet_size;
588 if (reserved.per_packet > available_bytes) {
589 // Because reserved.per_packet is unsigned, substracting would underflow and
590 // will not produce desired result.
591 available_bytes = 0;
592 } else {
593 available_bytes -= reserved.per_packet;
594 }
595
596 const size_t sender_report_size_bytes = 28 + reserved.per_sender;
597 const size_t full_sender_report_size_bytes =
598 sender_report_size_bytes +
599 rtcp::SenderReport::kMaxNumberOfReportBlocks * rtcp::ReportBlock::kLength;
600 size_t max_full_sender_reports =
601 available_bytes / full_sender_report_size_bytes;
602 size_t max_report_blocks =
603 max_full_sender_reports * rtcp::SenderReport::kMaxNumberOfReportBlocks;
604 size_t available_bytes_for_last_sender_report =
605 available_bytes - max_full_sender_reports * full_sender_report_size_bytes;
606 if (available_bytes_for_last_sender_report >= sender_report_size_bytes) {
607 max_report_blocks +=
608 (available_bytes_for_last_sender_report - sender_report_size_bytes) /
609 rtcp::ReportBlock::kLength;
610 }
611
612 std::vector<rtcp::ReportBlock> report_blocks =
613 CreateReportBlocks(now, max_report_blocks);
614 // Previous calculation of max number of sender report made space for max
615 // number of report blocks per sender report, but if number of report blocks
616 // is low, more sender reports may fit in.
617 size_t max_sender_reports =
618 (available_bytes - report_blocks.size() * rtcp::ReportBlock::kLength) /
619 sender_report_size_bytes;
620
621 auto last_handled_sender_it = local_senders_.end();
622 auto report_block_it = report_blocks.begin();
623 std::vector<uint32_t> sender_ssrcs;
624 for (auto it = local_senders_.begin();
625 it != local_senders_.end() && sender_ssrcs.size() < max_sender_reports;
626 ++it) {
627 LocalSenderState& rtp_sender = *it;
628 RtpStreamRtcpHandler::RtpStats stats = rtp_sender.handler->SentStats();
629
630 if (stats.num_sent_bytes() < rtp_sender.last_num_sent_bytes) {
631 RTC_LOG(LS_ERROR) << "Inconsistent SR for SSRC " << rtp_sender.ssrc
632 << ". Number of total sent bytes decreased.";
633 rtp_sender.last_num_sent_bytes = 0;
634 }
635 if (stats.num_sent_bytes() == rtp_sender.last_num_sent_bytes) {
636 // Skip because no RTP packet was send for this SSRC since last report.
637 continue;
638 }
639 rtp_sender.last_num_sent_bytes = stats.num_sent_bytes();
640
641 last_handled_sender_it = it;
642 rtcp::SenderReport sender_report;
643 sender_report.SetSenderSsrc(rtp_sender.ssrc);
644 sender_report.SetPacketCount(stats.num_sent_packets());
645 sender_report.SetOctetCount(stats.num_sent_bytes());
646 sender_report.SetNtp(config_.clock->ConvertTimestampToNtpTime(now));
647 RTC_DCHECK_GE(now, stats.last_capture_time());
648 sender_report.SetRtpTimestamp(
649 stats.last_rtp_timestamp() +
650 ((now - stats.last_capture_time()) * stats.last_clock_rate())
651 .seconds());
652 if (report_block_it != report_blocks.end()) {
653 size_t num_blocks =
654 std::min<size_t>(rtcp::SenderReport::kMaxNumberOfReportBlocks,
655 report_blocks.end() - report_block_it);
656 std::vector<rtcp::ReportBlock> sub_blocks(report_block_it,
657 report_block_it + num_blocks);
658 sender_report.SetReportBlocks(std::move(sub_blocks));
659 report_block_it += num_blocks;
660 }
661 rtcp_sender.AppendPacket(sender_report);
662 sender_ssrcs.push_back(rtp_sender.ssrc);
663 }
664 if (last_handled_sender_it != local_senders_.end()) {
665 // Rotate `local_senders_` so that the 1st unhandled sender become first in
666 // the list, and thus will be first to generate rtcp sender report for on
667 // the next call to `FillReports`.
668 local_senders_.splice(local_senders_.end(), local_senders_,
669 local_senders_.begin(),
670 std::next(last_handled_sender_it));
671 }
672
673 // Calculcate number of receiver reports to attach remaining report blocks to.
674 size_t num_receiver_reports =
675 DivideRoundUp(report_blocks.end() - report_block_it,
676 rtcp::ReceiverReport::kMaxNumberOfReportBlocks);
677
678 // In compound mode each RTCP packet has to start with a sender or receiver
679 // report.
680 if (config_.rtcp_mode == RtcpMode::kCompound && sender_ssrcs.empty() &&
681 num_receiver_reports == 0) {
682 num_receiver_reports = 1;
683 }
684
685 uint32_t sender_ssrc =
686 sender_ssrcs.empty() ? config_.feedback_ssrc : sender_ssrcs.front();
687 for (size_t i = 0; i < num_receiver_reports; ++i) {
688 rtcp::ReceiverReport receiver_report;
689 receiver_report.SetSenderSsrc(sender_ssrc);
690 size_t num_blocks =
691 std::min<size_t>(rtcp::ReceiverReport::kMaxNumberOfReportBlocks,
692 report_blocks.end() - report_block_it);
693 std::vector<rtcp::ReportBlock> sub_blocks(report_block_it,
694 report_block_it + num_blocks);
695 receiver_report.SetReportBlocks(std::move(sub_blocks));
696 report_block_it += num_blocks;
697 rtcp_sender.AppendPacket(receiver_report);
698 }
699 // All report blocks should be attached at this point.
700 RTC_DCHECK_EQ(report_blocks.end() - report_block_it, 0);
701 return sender_ssrcs;
702 }
703
CreateCompoundPacket(Timestamp now,size_t reserved_bytes,PacketSender & sender)704 void RtcpTransceiverImpl::CreateCompoundPacket(Timestamp now,
705 size_t reserved_bytes,
706 PacketSender& sender) {
707 RTC_DCHECK(sender.IsEmpty());
708 ReservedBytes reserved = {.per_packet = reserved_bytes};
709 absl::optional<rtcp::Sdes> sdes;
710 if (!config_.cname.empty()) {
711 sdes.emplace();
712 bool added = sdes->AddCName(config_.feedback_ssrc, config_.cname);
713 RTC_DCHECK(added) << "Failed to add CNAME " << config_.cname
714 << " to RTCP SDES packet.";
715 reserved.per_packet += sdes->BlockLength();
716 }
717 if (remb_.has_value()) {
718 reserved.per_packet += remb_->BlockLength();
719 }
720 absl::optional<rtcp::ExtendedReports> xr_with_dlrr;
721 if (!received_rrtrs_.empty()) {
722 RTC_DCHECK(config_.reply_to_non_sender_rtt_measurement);
723 xr_with_dlrr.emplace();
724 uint32_t now_ntp =
725 CompactNtp(config_.clock->ConvertTimestampToNtpTime(now));
726 for (const auto& [ssrc, rrtr_info] : received_rrtrs_) {
727 rtcp::ReceiveTimeInfo reply;
728 reply.ssrc = ssrc;
729 reply.last_rr = rrtr_info.received_remote_mid_ntp_time;
730 reply.delay_since_last_rr =
731 now_ntp - rrtr_info.local_receive_mid_ntp_time;
732 xr_with_dlrr->AddDlrrItem(reply);
733 }
734 if (config_.reply_to_non_sender_rtt_mesaurments_on_all_ssrcs) {
735 reserved.per_sender += xr_with_dlrr->BlockLength();
736 } else {
737 reserved.per_packet += xr_with_dlrr->BlockLength();
738 }
739 }
740 if (config_.non_sender_rtt_measurement) {
741 // It looks like bytes for ExtendedReport header are reserved twice, but in
742 // practice the same RtcpTransceiver won't both produce RRTR (i.e. it is a
743 // receiver-only) and reply to RRTR (i.e. remote participant is a receiver
744 // only). If that happen, then `reserved_bytes` would be slightly larger
745 // than it should, which is not an issue.
746
747 // 4 bytes for common RTCP header + 4 bytes for the ExtenedReports header.
748 reserved.per_packet += (4 + 4 + rtcp::Rrtr::kLength);
749 }
750
751 std::vector<uint32_t> sender_ssrcs = FillReports(now, reserved, sender);
752 bool has_sender_report = !sender_ssrcs.empty();
753 uint32_t sender_ssrc =
754 has_sender_report ? sender_ssrcs.front() : config_.feedback_ssrc;
755
756 if (sdes.has_value() && !sender.IsEmpty()) {
757 sender.AppendPacket(*sdes);
758 }
759 if (remb_.has_value()) {
760 remb_->SetSenderSsrc(sender_ssrc);
761 sender.AppendPacket(*remb_);
762 }
763 if (!has_sender_report && config_.non_sender_rtt_measurement) {
764 rtcp::ExtendedReports xr_with_rrtr;
765 xr_with_rrtr.SetSenderSsrc(config_.feedback_ssrc);
766 rtcp::Rrtr rrtr;
767 rrtr.SetNtp(config_.clock->ConvertTimestampToNtpTime(now));
768 xr_with_rrtr.SetRrtr(rrtr);
769 sender.AppendPacket(xr_with_rrtr);
770 }
771 if (xr_with_dlrr.has_value()) {
772 rtc::ArrayView<const uint32_t> ssrcs(&sender_ssrc, 1);
773 if (config_.reply_to_non_sender_rtt_mesaurments_on_all_ssrcs &&
774 !sender_ssrcs.empty()) {
775 ssrcs = sender_ssrcs;
776 }
777 RTC_DCHECK(!ssrcs.empty());
778 for (uint32_t ssrc : ssrcs) {
779 xr_with_dlrr->SetSenderSsrc(ssrc);
780 sender.AppendPacket(*xr_with_dlrr);
781 }
782 }
783 }
784
SendPeriodicCompoundPacket()785 void RtcpTransceiverImpl::SendPeriodicCompoundPacket() {
786 auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
787 config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
788 };
789 Timestamp now = config_.clock->CurrentTime();
790 PacketSender sender(send_packet, config_.max_packet_size);
791 CreateCompoundPacket(now, /*reserved_bytes=*/0, sender);
792 sender.Send();
793 }
794
SendCombinedRtcpPacket(std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets)795 void RtcpTransceiverImpl::SendCombinedRtcpPacket(
796 std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets) {
797 auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
798 config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
799 };
800 PacketSender sender(send_packet, config_.max_packet_size);
801
802 for (auto& rtcp_packet : rtcp_packets) {
803 rtcp_packet->SetSenderSsrc(config_.feedback_ssrc);
804 sender.AppendPacket(*rtcp_packet);
805 }
806 sender.Send();
807 }
808
SendImmediateFeedback(const rtcp::RtcpPacket & rtcp_packet)809 void RtcpTransceiverImpl::SendImmediateFeedback(
810 const rtcp::RtcpPacket& rtcp_packet) {
811 auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
812 config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
813 };
814 PacketSender sender(send_packet, config_.max_packet_size);
815 // Compound mode requires every sent rtcp packet to be compound, i.e. start
816 // with a sender or receiver report.
817 if (config_.rtcp_mode == RtcpMode::kCompound) {
818 Timestamp now = config_.clock->CurrentTime();
819 CreateCompoundPacket(now, /*reserved_bytes=*/rtcp_packet.BlockLength(),
820 sender);
821 }
822
823 sender.AppendPacket(rtcp_packet);
824 sender.Send();
825
826 // If compound packet was sent, delay (reschedule) the periodic one.
827 if (config_.rtcp_mode == RtcpMode::kCompound)
828 ReschedulePeriodicCompoundPackets();
829 }
830
CreateReportBlocks(Timestamp now,size_t num_max_blocks)831 std::vector<rtcp::ReportBlock> RtcpTransceiverImpl::CreateReportBlocks(
832 Timestamp now,
833 size_t num_max_blocks) {
834 if (!config_.receive_statistics)
835 return {};
836 std::vector<rtcp::ReportBlock> report_blocks =
837 config_.receive_statistics->RtcpReportBlocks(num_max_blocks);
838 uint32_t last_sr = 0;
839 uint32_t last_delay = 0;
840 for (rtcp::ReportBlock& report_block : report_blocks) {
841 auto it = remote_senders_.find(report_block.source_ssrc());
842 if (it == remote_senders_.end() ||
843 !it->second.last_received_sender_report) {
844 continue;
845 }
846 const SenderReportTimes& last_sender_report =
847 *it->second.last_received_sender_report;
848 last_sr = CompactNtp(last_sender_report.remote_sent_time);
849 last_delay =
850 SaturatedToCompactNtp(now - last_sender_report.local_received_time);
851 report_block.SetLastSr(last_sr);
852 report_block.SetDelayLastSr(last_delay);
853 }
854 return report_blocks;
855 }
856
857 } // namespace webrtc
858