• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "cast/streaming/sender_packet_router.h"
6 
7 #include <algorithm>
8 #include <utility>
9 
10 #include "cast/streaming/constants.h"
11 #include "cast/streaming/packet_util.h"
12 #include "util/chrono_helpers.h"
13 #include "util/osp_logging.h"
14 #include "util/saturate_cast.h"
15 #include "util/stringprintf.h"
16 
17 namespace openscreen {
18 namespace cast {
19 
SenderPacketRouter(Environment * environment,int max_burst_bitrate)20 SenderPacketRouter::SenderPacketRouter(Environment* environment,
21                                        int max_burst_bitrate)
22     : SenderPacketRouter(
23           environment,
24           ComputeMaxPacketsPerBurst(max_burst_bitrate,
25                                     environment->GetMaxPacketSize(),
26                                     kDefaultBurstInterval),
27           kDefaultBurstInterval) {}
28 
SenderPacketRouter(Environment * environment,int max_packets_per_burst,milliseconds burst_interval)29 SenderPacketRouter::SenderPacketRouter(Environment* environment,
30                                        int max_packets_per_burst,
31                                        milliseconds burst_interval)
32     : BandwidthEstimator(max_packets_per_burst,
33                          burst_interval,
34                          environment->now()),
35       environment_(environment),
36       packet_buffer_size_(environment->GetMaxPacketSize()),
37       packet_buffer_(new uint8_t[packet_buffer_size_]),
38       max_packets_per_burst_(max_packets_per_burst),
39       burst_interval_(burst_interval),
40       max_burst_bitrate_(ComputeMaxBurstBitrate(packet_buffer_size_,
41                                                 max_packets_per_burst_,
42                                                 burst_interval_)),
43       alarm_(environment_->now_function(), environment_->task_runner()) {
44   OSP_DCHECK(environment_);
45   OSP_DCHECK_GT(packet_buffer_size_, kRequiredNetworkPacketSize);
46 }
47 
~SenderPacketRouter()48 SenderPacketRouter::~SenderPacketRouter() {
49   OSP_DCHECK(senders_.empty());
50 }
51 
OnSenderCreated(Ssrc receiver_ssrc,Sender * sender)52 void SenderPacketRouter::OnSenderCreated(Ssrc receiver_ssrc, Sender* sender) {
53   OSP_DCHECK(FindEntry(receiver_ssrc) == senders_.end());
54   senders_.push_back(SenderEntry{receiver_ssrc, sender, kNever, kNever});
55 
56   if (senders_.size() == 1) {
57     environment_->ConsumeIncomingPackets(this);
58   } else {
59     // Sort the list of Senders so that they are iterated in priority order.
60     std::sort(senders_.begin(), senders_.end());
61   }
62 }
63 
OnSenderDestroyed(Ssrc receiver_ssrc)64 void SenderPacketRouter::OnSenderDestroyed(Ssrc receiver_ssrc) {
65   const auto it = FindEntry(receiver_ssrc);
66   OSP_DCHECK(it != senders_.end());
67   senders_.erase(it);
68 
69   // If there are no longer any Senders, suspend receiving RTCP packets.
70   if (senders_.empty()) {
71     environment_->DropIncomingPackets();
72   }
73 }
74 
RequestRtcpSend(Ssrc receiver_ssrc)75 void SenderPacketRouter::RequestRtcpSend(Ssrc receiver_ssrc) {
76   const auto it = FindEntry(receiver_ssrc);
77   OSP_DCHECK(it != senders_.end());
78   it->next_rtcp_send_time = Alarm::kImmediately;
79   ScheduleNextBurst();
80 }
81 
RequestRtpSend(Ssrc receiver_ssrc)82 void SenderPacketRouter::RequestRtpSend(Ssrc receiver_ssrc) {
83   const auto it = FindEntry(receiver_ssrc);
84   OSP_DCHECK(it != senders_.end());
85   it->next_rtp_send_time = Alarm::kImmediately;
86   ScheduleNextBurst();
87 }
88 
OnReceivedPacket(const IPEndpoint & source,Clock::time_point arrival_time,std::vector<uint8_t> packet)89 void SenderPacketRouter::OnReceivedPacket(const IPEndpoint& source,
90                                           Clock::time_point arrival_time,
91                                           std::vector<uint8_t> packet) {
92   // If the packet did not come from the expected endpoint, ignore it.
93   OSP_DCHECK_NE(source.port, uint16_t{0});
94   if (source != environment_->remote_endpoint()) {
95     return;
96   }
97 
98   // Determine which Sender to dispatch the packet to. Senders may only receive
99   // RTCP packets from Receivers. Log a warning containing a pretty-printed dump
100   // if the packet is not an RTCP packet.
101   const std::pair<ApparentPacketType, Ssrc> seems_like =
102       InspectPacketForRouting(packet);
103   if (seems_like.first != ApparentPacketType::RTCP) {
104     constexpr int kMaxPartiaHexDumpSize = 96;
105     const std::size_t encode_size =
106         std::min(packet.size(), static_cast<size_t>(kMaxPartiaHexDumpSize));
107     OSP_LOG_WARN << "UNKNOWN packet of " << packet.size()
108                  << " bytes. Partial hex dump: "
109                  << HexEncode(packet.data(), encode_size);
110     return;
111   }
112   const auto it = FindEntry(seems_like.second);
113   if (it != senders_.end()) {
114     it->sender->OnReceivedRtcpPacket(arrival_time, std::move(packet));
115   }
116 }
117 
FindEntry(Ssrc receiver_ssrc)118 SenderPacketRouter::SenderEntries::iterator SenderPacketRouter::FindEntry(
119     Ssrc receiver_ssrc) {
120   return std::find_if(senders_.begin(), senders_.end(),
121                       [receiver_ssrc](const SenderEntry& entry) {
122                         return entry.receiver_ssrc == receiver_ssrc;
123                       });
124 }
125 
ScheduleNextBurst()126 void SenderPacketRouter::ScheduleNextBurst() {
127   // Determine the next burst time by scanning for the earliest of the
128   // next-scheduled send times for each Sender.
129   const Clock::time_point earliest_allowed_burst_time =
130       last_burst_time_ + burst_interval_;
131   Clock::time_point next_burst_time = kNever;
132   for (const SenderEntry& entry : senders_) {
133     const auto next_send_time =
134         std::min(entry.next_rtcp_send_time, entry.next_rtp_send_time);
135     if (next_send_time >= next_burst_time) {
136       continue;
137     }
138     if (next_send_time <= earliest_allowed_burst_time) {
139       next_burst_time = earliest_allowed_burst_time;
140       // No need to continue, since |next_burst_time| cannot become any earlier.
141       break;
142     }
143     next_burst_time = next_send_time;
144   }
145 
146   // Schedule the alarm for the next burst time unless none of the Senders has
147   // anything to send.
148   if (next_burst_time == kNever) {
149     alarm_.Cancel();
150   } else {
151     alarm_.Schedule([this] { SendBurstOfPackets(); }, next_burst_time);
152   }
153 }
154 
SendBurstOfPackets()155 void SenderPacketRouter::SendBurstOfPackets() {
156   // Treat RTCP packets as "critical priority," and so there is no upper limit
157   // on the number to send. Practically, this will always be limited by the
158   // number of Senders; so, this won't be a huge number of packets.
159   const Clock::time_point burst_time = environment_->now();
160   const int num_rtcp_packets_sent = SendJustTheRtcpPackets(burst_time);
161   // Now send all the RTP packets, up to the maximum number allowed in a burst.
162   // Higher priority Senders' RTP packets are sent first.
163   const int num_rtp_packets_sent = SendJustTheRtpPackets(
164       burst_time, max_packets_per_burst_ - num_rtcp_packets_sent);
165   last_burst_time_ = burst_time;
166 
167   BandwidthEstimator::OnBurstComplete(
168       num_rtcp_packets_sent + num_rtp_packets_sent, burst_time);
169 
170   ScheduleNextBurst();
171 }
172 
SendJustTheRtcpPackets(Clock::time_point send_time)173 int SenderPacketRouter::SendJustTheRtcpPackets(Clock::time_point send_time) {
174   int num_sent = 0;
175   for (SenderEntry& entry : senders_) {
176     if (entry.next_rtcp_send_time > send_time) {
177       continue;
178     }
179 
180     // Note: Only one RTCP packet is sent from the same Sender in the same
181     // burst. This is because RTCP packets are supposed to always contain the
182     // most up-to-date Sender state. Having multiple RTCP packets in the same
183     // burst would mean that all but the last one are old/irrelevant snapshots
184     // of Sender state, and this would just thrash/confuse the Receiver.
185     const absl::Span<uint8_t> packet =
186         entry.sender->GetRtcpPacketForImmediateSend(
187             send_time,
188             absl::Span<uint8_t>(packet_buffer_.get(), packet_buffer_size_));
189     if (!packet.empty()) {
190       environment_->SendPacket(packet);
191       entry.next_rtcp_send_time = send_time + kRtcpReportInterval;
192       ++num_sent;
193     }
194   }
195 
196   return num_sent;
197 }
198 
SendJustTheRtpPackets(Clock::time_point send_time,int num_packets_to_send)199 int SenderPacketRouter::SendJustTheRtpPackets(Clock::time_point send_time,
200                                               int num_packets_to_send) {
201   int num_sent = 0;
202   for (SenderEntry& entry : senders_) {
203     if (num_sent >= num_packets_to_send) {
204       break;
205     }
206     if (entry.next_rtp_send_time > send_time) {
207       continue;
208     }
209 
210     for (; num_sent < num_packets_to_send; ++num_sent) {
211       const absl::Span<uint8_t> packet =
212           entry.sender->GetRtpPacketForImmediateSend(
213               send_time,
214               absl::Span<uint8_t>(packet_buffer_.get(), packet_buffer_size_));
215       if (packet.empty()) {
216         break;
217       }
218       environment_->SendPacket(packet);
219     }
220     entry.next_rtp_send_time = entry.sender->GetRtpResumeTime();
221   }
222 
223   return num_sent;
224 }
225 
226 namespace {
227 constexpr int kBitsPerByte = 8;
228 constexpr auto kOneSecondInMilliseconds = to_milliseconds(seconds(1));
229 }  // namespace
230 
231 // static
ComputeMaxPacketsPerBurst(int max_burst_bitrate,int packet_size,milliseconds burst_interval)232 int SenderPacketRouter::ComputeMaxPacketsPerBurst(int max_burst_bitrate,
233                                                   int packet_size,
234                                                   milliseconds burst_interval) {
235   OSP_DCHECK_GT(max_burst_bitrate, 0);
236   OSP_DCHECK_GT(packet_size, 0);
237   OSP_DCHECK_GT(burst_interval, milliseconds(0));
238   OSP_DCHECK_LE(burst_interval, kOneSecondInMilliseconds);
239 
240   const int max_packets_per_second =
241       max_burst_bitrate / kBitsPerByte / packet_size;
242   const int bursts_per_second = kOneSecondInMilliseconds / burst_interval;
243   return std::max(max_packets_per_second / bursts_per_second, 1);
244 }
245 
246 // static
ComputeMaxBurstBitrate(int packet_size,int max_packets_per_burst,milliseconds burst_interval)247 int SenderPacketRouter::ComputeMaxBurstBitrate(int packet_size,
248                                                int max_packets_per_burst,
249                                                milliseconds burst_interval) {
250   OSP_DCHECK_GT(packet_size, 0);
251   OSP_DCHECK_GT(max_packets_per_burst, 0);
252   OSP_DCHECK_GT(burst_interval, milliseconds(0));
253   OSP_DCHECK_LE(burst_interval, kOneSecondInMilliseconds);
254 
255   const int64_t max_bits_per_burst =
256       int64_t{packet_size} * kBitsPerByte * max_packets_per_burst;
257   const int bursts_per_second = kOneSecondInMilliseconds / burst_interval;
258   return saturate_cast<int>(max_bits_per_burst * bursts_per_second);
259 }
260 
261 SenderPacketRouter::Sender::~Sender() = default;
262 
263 // static
264 constexpr int SenderPacketRouter::kDefaultMaxBurstBitrate;
265 // static
266 constexpr milliseconds SenderPacketRouter::kDefaultBurstInterval;
267 // static
268 constexpr Clock::time_point SenderPacketRouter::kNever;
269 
270 }  // namespace cast
271 }  // namespace openscreen
272