1 // Copyright 2020 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cast/streaming/sender_packet_router.h"
6
7 #include <algorithm>
8 #include <utility>
9
10 #include "cast/streaming/constants.h"
11 #include "cast/streaming/packet_util.h"
12 #include "util/chrono_helpers.h"
13 #include "util/osp_logging.h"
14 #include "util/saturate_cast.h"
15 #include "util/stringprintf.h"
16
17 namespace openscreen {
18 namespace cast {
19
SenderPacketRouter(Environment * environment,int max_burst_bitrate)20 SenderPacketRouter::SenderPacketRouter(Environment* environment,
21 int max_burst_bitrate)
22 : SenderPacketRouter(
23 environment,
24 ComputeMaxPacketsPerBurst(max_burst_bitrate,
25 environment->GetMaxPacketSize(),
26 kDefaultBurstInterval),
27 kDefaultBurstInterval) {}
28
SenderPacketRouter(Environment * environment,int max_packets_per_burst,milliseconds burst_interval)29 SenderPacketRouter::SenderPacketRouter(Environment* environment,
30 int max_packets_per_burst,
31 milliseconds burst_interval)
32 : BandwidthEstimator(max_packets_per_burst,
33 burst_interval,
34 environment->now()),
35 environment_(environment),
36 packet_buffer_size_(environment->GetMaxPacketSize()),
37 packet_buffer_(new uint8_t[packet_buffer_size_]),
38 max_packets_per_burst_(max_packets_per_burst),
39 burst_interval_(burst_interval),
40 max_burst_bitrate_(ComputeMaxBurstBitrate(packet_buffer_size_,
41 max_packets_per_burst_,
42 burst_interval_)),
43 alarm_(environment_->now_function(), environment_->task_runner()) {
44 OSP_DCHECK(environment_);
45 OSP_DCHECK_GT(packet_buffer_size_, kRequiredNetworkPacketSize);
46 }
47
~SenderPacketRouter()48 SenderPacketRouter::~SenderPacketRouter() {
49 OSP_DCHECK(senders_.empty());
50 }
51
OnSenderCreated(Ssrc receiver_ssrc,Sender * sender)52 void SenderPacketRouter::OnSenderCreated(Ssrc receiver_ssrc, Sender* sender) {
53 OSP_DCHECK(FindEntry(receiver_ssrc) == senders_.end());
54 senders_.push_back(SenderEntry{receiver_ssrc, sender, kNever, kNever});
55
56 if (senders_.size() == 1) {
57 environment_->ConsumeIncomingPackets(this);
58 } else {
59 // Sort the list of Senders so that they are iterated in priority order.
60 std::sort(senders_.begin(), senders_.end());
61 }
62 }
63
OnSenderDestroyed(Ssrc receiver_ssrc)64 void SenderPacketRouter::OnSenderDestroyed(Ssrc receiver_ssrc) {
65 const auto it = FindEntry(receiver_ssrc);
66 OSP_DCHECK(it != senders_.end());
67 senders_.erase(it);
68
69 // If there are no longer any Senders, suspend receiving RTCP packets.
70 if (senders_.empty()) {
71 environment_->DropIncomingPackets();
72 }
73 }
74
RequestRtcpSend(Ssrc receiver_ssrc)75 void SenderPacketRouter::RequestRtcpSend(Ssrc receiver_ssrc) {
76 const auto it = FindEntry(receiver_ssrc);
77 OSP_DCHECK(it != senders_.end());
78 it->next_rtcp_send_time = Alarm::kImmediately;
79 ScheduleNextBurst();
80 }
81
RequestRtpSend(Ssrc receiver_ssrc)82 void SenderPacketRouter::RequestRtpSend(Ssrc receiver_ssrc) {
83 const auto it = FindEntry(receiver_ssrc);
84 OSP_DCHECK(it != senders_.end());
85 it->next_rtp_send_time = Alarm::kImmediately;
86 ScheduleNextBurst();
87 }
88
OnReceivedPacket(const IPEndpoint & source,Clock::time_point arrival_time,std::vector<uint8_t> packet)89 void SenderPacketRouter::OnReceivedPacket(const IPEndpoint& source,
90 Clock::time_point arrival_time,
91 std::vector<uint8_t> packet) {
92 // If the packet did not come from the expected endpoint, ignore it.
93 OSP_DCHECK_NE(source.port, uint16_t{0});
94 if (source != environment_->remote_endpoint()) {
95 return;
96 }
97
98 // Determine which Sender to dispatch the packet to. Senders may only receive
99 // RTCP packets from Receivers. Log a warning containing a pretty-printed dump
100 // if the packet is not an RTCP packet.
101 const std::pair<ApparentPacketType, Ssrc> seems_like =
102 InspectPacketForRouting(packet);
103 if (seems_like.first != ApparentPacketType::RTCP) {
104 constexpr int kMaxPartiaHexDumpSize = 96;
105 OSP_LOG_WARN << "UNKNOWN packet of " << packet.size()
106 << " bytes. Partial hex dump: "
107 << HexEncode(absl::Span<const uint8_t>(packet).subspan(
108 0, kMaxPartiaHexDumpSize));
109 return;
110 }
111 const auto it = FindEntry(seems_like.second);
112 if (it != senders_.end()) {
113 it->sender->OnReceivedRtcpPacket(arrival_time, std::move(packet));
114 }
115 }
116
FindEntry(Ssrc receiver_ssrc)117 SenderPacketRouter::SenderEntries::iterator SenderPacketRouter::FindEntry(
118 Ssrc receiver_ssrc) {
119 return std::find_if(senders_.begin(), senders_.end(),
120 [receiver_ssrc](const SenderEntry& entry) {
121 return entry.receiver_ssrc == receiver_ssrc;
122 });
123 }
124
ScheduleNextBurst()125 void SenderPacketRouter::ScheduleNextBurst() {
126 // Determine the next burst time by scanning for the earliest of the
127 // next-scheduled send times for each Sender.
128 const Clock::time_point earliest_allowed_burst_time =
129 last_burst_time_ + burst_interval_;
130 Clock::time_point next_burst_time = kNever;
131 for (const SenderEntry& entry : senders_) {
132 const auto next_send_time =
133 std::min(entry.next_rtcp_send_time, entry.next_rtp_send_time);
134 if (next_send_time >= next_burst_time) {
135 continue;
136 }
137 if (next_send_time <= earliest_allowed_burst_time) {
138 next_burst_time = earliest_allowed_burst_time;
139 // No need to continue, since |next_burst_time| cannot become any earlier.
140 break;
141 }
142 next_burst_time = next_send_time;
143 }
144
145 // Schedule the alarm for the next burst time unless none of the Senders has
146 // anything to send.
147 if (next_burst_time == kNever) {
148 alarm_.Cancel();
149 } else {
150 alarm_.Schedule([this] { SendBurstOfPackets(); }, next_burst_time);
151 }
152 }
153
SendBurstOfPackets()154 void SenderPacketRouter::SendBurstOfPackets() {
155 // Treat RTCP packets as "critical priority," and so there is no upper limit
156 // on the number to send. Practically, this will always be limited by the
157 // number of Senders; so, this won't be a huge number of packets.
158 const Clock::time_point burst_time = environment_->now();
159 const int num_rtcp_packets_sent = SendJustTheRtcpPackets(burst_time);
160 // Now send all the RTP packets, up to the maximum number allowed in a burst.
161 // Higher priority Senders' RTP packets are sent first.
162 const int num_rtp_packets_sent = SendJustTheRtpPackets(
163 burst_time, max_packets_per_burst_ - num_rtcp_packets_sent);
164 last_burst_time_ = burst_time;
165
166 BandwidthEstimator::OnBurstComplete(
167 num_rtcp_packets_sent + num_rtp_packets_sent, burst_time);
168
169 ScheduleNextBurst();
170 }
171
SendJustTheRtcpPackets(Clock::time_point send_time)172 int SenderPacketRouter::SendJustTheRtcpPackets(Clock::time_point send_time) {
173 int num_sent = 0;
174 for (SenderEntry& entry : senders_) {
175 if (entry.next_rtcp_send_time > send_time) {
176 continue;
177 }
178
179 // Note: Only one RTCP packet is sent from the same Sender in the same
180 // burst. This is because RTCP packets are supposed to always contain the
181 // most up-to-date Sender state. Having multiple RTCP packets in the same
182 // burst would mean that all but the last one are old/irrelevant snapshots
183 // of Sender state, and this would just thrash/confuse the Receiver.
184 const absl::Span<uint8_t> packet =
185 entry.sender->GetRtcpPacketForImmediateSend(
186 send_time,
187 absl::Span<uint8_t>(packet_buffer_.get(), packet_buffer_size_));
188 if (!packet.empty()) {
189 environment_->SendPacket(packet);
190 entry.next_rtcp_send_time = send_time + kRtcpReportInterval;
191 ++num_sent;
192 }
193 }
194
195 return num_sent;
196 }
197
SendJustTheRtpPackets(Clock::time_point send_time,int num_packets_to_send)198 int SenderPacketRouter::SendJustTheRtpPackets(Clock::time_point send_time,
199 int num_packets_to_send) {
200 int num_sent = 0;
201 for (SenderEntry& entry : senders_) {
202 if (num_sent >= num_packets_to_send) {
203 break;
204 }
205 if (entry.next_rtp_send_time > send_time) {
206 continue;
207 }
208
209 for (; num_sent < num_packets_to_send; ++num_sent) {
210 const absl::Span<uint8_t> packet =
211 entry.sender->GetRtpPacketForImmediateSend(
212 send_time,
213 absl::Span<uint8_t>(packet_buffer_.get(), packet_buffer_size_));
214 if (packet.empty()) {
215 break;
216 }
217 environment_->SendPacket(packet);
218 }
219 entry.next_rtp_send_time = entry.sender->GetRtpResumeTime();
220 }
221
222 return num_sent;
223 }
224
225 namespace {
226 constexpr int kBitsPerByte = 8;
227 constexpr auto kOneSecondInMilliseconds = to_milliseconds(seconds(1));
228 } // namespace
229
230 // static
ComputeMaxPacketsPerBurst(int max_burst_bitrate,int packet_size,milliseconds burst_interval)231 int SenderPacketRouter::ComputeMaxPacketsPerBurst(int max_burst_bitrate,
232 int packet_size,
233 milliseconds burst_interval) {
234 OSP_DCHECK_GT(max_burst_bitrate, 0);
235 OSP_DCHECK_GT(packet_size, 0);
236 OSP_DCHECK_GT(burst_interval, milliseconds(0));
237 OSP_DCHECK_LE(burst_interval, kOneSecondInMilliseconds);
238
239 const int max_packets_per_second =
240 max_burst_bitrate / kBitsPerByte / packet_size;
241 const int bursts_per_second = kOneSecondInMilliseconds / burst_interval;
242 return std::max(max_packets_per_second / bursts_per_second, 1);
243 }
244
245 // static
ComputeMaxBurstBitrate(int packet_size,int max_packets_per_burst,milliseconds burst_interval)246 int SenderPacketRouter::ComputeMaxBurstBitrate(int packet_size,
247 int max_packets_per_burst,
248 milliseconds burst_interval) {
249 OSP_DCHECK_GT(packet_size, 0);
250 OSP_DCHECK_GT(max_packets_per_burst, 0);
251 OSP_DCHECK_GT(burst_interval, milliseconds(0));
252 OSP_DCHECK_LE(burst_interval, kOneSecondInMilliseconds);
253
254 const int64_t max_bits_per_burst =
255 int64_t{packet_size} * kBitsPerByte * max_packets_per_burst;
256 const int bursts_per_second = kOneSecondInMilliseconds / burst_interval;
257 return saturate_cast<int>(max_bits_per_burst * bursts_per_second);
258 }
259
260 SenderPacketRouter::Sender::~Sender() = default;
261
262 // static
263 constexpr int SenderPacketRouter::kDefaultMaxBurstBitrate;
264 // static
265 constexpr milliseconds SenderPacketRouter::kDefaultBurstInterval;
266 // static
267 constexpr Clock::time_point SenderPacketRouter::kNever;
268
269 } // namespace cast
270 } // namespace openscreen
271