1 /*
2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/pacing/round_robin_packet_queue.h"
12
13 #include <algorithm>
14 #include <cstdint>
15 #include <utility>
16
17 #include "absl/strings/match.h"
18 #include "rtc_base/checks.h"
19
20 namespace webrtc {
21 namespace {
22 static constexpr DataSize kMaxLeadingSize = DataSize::Bytes(1400);
23 }
24
25 RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
26 default;
27 RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
28
QueuedPacket(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::multiset<Timestamp>::iterator enqueue_time_it,std::unique_ptr<RtpPacketToSend> packet)29 RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
30 int priority,
31 Timestamp enqueue_time,
32 uint64_t enqueue_order,
33 std::multiset<Timestamp>::iterator enqueue_time_it,
34 std::unique_ptr<RtpPacketToSend> packet)
35 : priority_(priority),
36 enqueue_time_(enqueue_time),
37 enqueue_order_(enqueue_order),
38 is_retransmission_(packet->packet_type() ==
39 RtpPacketMediaType::kRetransmission),
40 enqueue_time_it_(enqueue_time_it),
41 owned_packet_(packet.release()) {}
42
operator <(const RoundRobinPacketQueue::QueuedPacket & other) const43 bool RoundRobinPacketQueue::QueuedPacket::operator<(
44 const RoundRobinPacketQueue::QueuedPacket& other) const {
45 if (priority_ != other.priority_)
46 return priority_ > other.priority_;
47 if (is_retransmission_ != other.is_retransmission_)
48 return other.is_retransmission_;
49
50 return enqueue_order_ > other.enqueue_order_;
51 }
52
Priority() const53 int RoundRobinPacketQueue::QueuedPacket::Priority() const {
54 return priority_;
55 }
56
Type() const57 RtpPacketMediaType RoundRobinPacketQueue::QueuedPacket::Type() const {
58 return *owned_packet_->packet_type();
59 }
60
Ssrc() const61 uint32_t RoundRobinPacketQueue::QueuedPacket::Ssrc() const {
62 return owned_packet_->Ssrc();
63 }
64
EnqueueTime() const65 Timestamp RoundRobinPacketQueue::QueuedPacket::EnqueueTime() const {
66 return enqueue_time_;
67 }
68
IsRetransmission() const69 bool RoundRobinPacketQueue::QueuedPacket::IsRetransmission() const {
70 return Type() == RtpPacketMediaType::kRetransmission;
71 }
72
EnqueueOrder() const73 uint64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const {
74 return enqueue_order_;
75 }
76
RtpPacket() const77 RtpPacketToSend* RoundRobinPacketQueue::QueuedPacket::RtpPacket() const {
78 return owned_packet_;
79 }
80
UpdateEnqueueTimeIterator(std::multiset<Timestamp>::iterator it)81 void RoundRobinPacketQueue::QueuedPacket::UpdateEnqueueTimeIterator(
82 std::multiset<Timestamp>::iterator it) {
83 enqueue_time_it_ = it;
84 }
85
86 std::multiset<Timestamp>::iterator
EnqueueTimeIterator() const87 RoundRobinPacketQueue::QueuedPacket::EnqueueTimeIterator() const {
88 return enqueue_time_it_;
89 }
90
SubtractPauseTime(TimeDelta pause_time_sum)91 void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime(
92 TimeDelta pause_time_sum) {
93 enqueue_time_ -= pause_time_sum;
94 }
95
96 RoundRobinPacketQueue::PriorityPacketQueue::const_iterator
begin() const97 RoundRobinPacketQueue::PriorityPacketQueue::begin() const {
98 return c.begin();
99 }
100
101 RoundRobinPacketQueue::PriorityPacketQueue::const_iterator
end() const102 RoundRobinPacketQueue::PriorityPacketQueue::end() const {
103 return c.end();
104 }
105
Stream()106 RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {}
107 RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
108 RoundRobinPacketQueue::Stream::~Stream() = default;
109
IsEnabled(const WebRtcKeyValueConfig * field_trials,const char * name)110 bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) {
111 if (!field_trials) {
112 return false;
113 }
114 return absl::StartsWith(field_trials->Lookup(name), "Enabled");
115 }
116
RoundRobinPacketQueue(Timestamp start_time,const WebRtcKeyValueConfig * field_trials)117 RoundRobinPacketQueue::RoundRobinPacketQueue(
118 Timestamp start_time,
119 const WebRtcKeyValueConfig* field_trials)
120 : transport_overhead_per_packet_(DataSize::Zero()),
121 time_last_updated_(start_time),
122 paused_(false),
123 size_packets_(0),
124 size_(DataSize::Zero()),
125 max_size_(kMaxLeadingSize),
126 queue_time_sum_(TimeDelta::Zero()),
127 pause_time_sum_(TimeDelta::Zero()),
128 include_overhead_(false) {}
129
~RoundRobinPacketQueue()130 RoundRobinPacketQueue::~RoundRobinPacketQueue() {
131 // Make sure to release any packets owned by raw pointer in QueuedPacket.
132 while (!Empty()) {
133 Pop();
134 }
135 }
136
Push(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::unique_ptr<RtpPacketToSend> packet)137 void RoundRobinPacketQueue::Push(int priority,
138 Timestamp enqueue_time,
139 uint64_t enqueue_order,
140 std::unique_ptr<RtpPacketToSend> packet) {
141 RTC_DCHECK(packet->packet_type().has_value());
142 if (size_packets_ == 0) {
143 // Single packet fast-path.
144 single_packet_queue_.emplace(
145 QueuedPacket(priority, enqueue_time, enqueue_order,
146 enqueue_times_.end(), std::move(packet)));
147 UpdateQueueTime(enqueue_time);
148 single_packet_queue_->SubtractPauseTime(pause_time_sum_);
149 size_packets_ = 1;
150 size_ += PacketSize(*single_packet_queue_);
151 } else {
152 MaybePromoteSinglePacketToNormalQueue();
153 Push(QueuedPacket(priority, enqueue_time, enqueue_order,
154 enqueue_times_.insert(enqueue_time), std::move(packet)));
155 }
156 }
157
Pop()158 std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
159 if (single_packet_queue_.has_value()) {
160 RTC_DCHECK(stream_priorities_.empty());
161 std::unique_ptr<RtpPacketToSend> rtp_packet(
162 single_packet_queue_->RtpPacket());
163 single_packet_queue_.reset();
164 queue_time_sum_ = TimeDelta::Zero();
165 size_packets_ = 0;
166 size_ = DataSize::Zero();
167 return rtp_packet;
168 }
169
170 RTC_DCHECK(!Empty());
171 Stream* stream = GetHighestPriorityStream();
172 const QueuedPacket& queued_packet = stream->packet_queue.top();
173
174 stream_priorities_.erase(stream->priority_it);
175
176 // Calculate the total amount of time spent by this packet in the queue
177 // while in a non-paused state. Note that the |pause_time_sum_ms_| was
178 // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
179 // by subtracting it now we effectively remove the time spent in in the
180 // queue while in a paused state.
181 TimeDelta time_in_non_paused_state =
182 time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;
183 queue_time_sum_ -= time_in_non_paused_state;
184
185 RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());
186 enqueue_times_.erase(queued_packet.EnqueueTimeIterator());
187
188 // Update |bytes| of this stream. The general idea is that the stream that
189 // has sent the least amount of bytes should have the highest priority.
190 // The problem with that is if streams send with different rates, in which
191 // case a "budget" will be built up for the stream sending at the lower
192 // rate. To avoid building a too large budget we limit |bytes| to be within
193 // kMaxLeading bytes of the stream that has sent the most amount of bytes.
194 DataSize packet_size = PacketSize(queued_packet);
195 stream->size =
196 std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);
197 max_size_ = std::max(max_size_, stream->size);
198
199 size_ -= packet_size;
200 size_packets_ -= 1;
201 RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
202
203 std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());
204 stream->packet_queue.pop();
205
206 // If there are packets left to be sent, schedule the stream again.
207 RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
208 if (stream->packet_queue.empty()) {
209 stream->priority_it = stream_priorities_.end();
210 } else {
211 int priority = stream->packet_queue.top().Priority();
212 stream->priority_it = stream_priorities_.emplace(
213 StreamPrioKey(priority, stream->size), stream->ssrc);
214 }
215
216 return rtp_packet;
217 }
218
Empty() const219 bool RoundRobinPacketQueue::Empty() const {
220 if (size_packets_ == 0) {
221 RTC_DCHECK(!single_packet_queue_.has_value() && stream_priorities_.empty());
222 return true;
223 }
224 RTC_DCHECK(single_packet_queue_.has_value() || !stream_priorities_.empty());
225 return false;
226 }
227
SizeInPackets() const228 size_t RoundRobinPacketQueue::SizeInPackets() const {
229 return size_packets_;
230 }
231
Size() const232 DataSize RoundRobinPacketQueue::Size() const {
233 return size_;
234 }
235
LeadingAudioPacketEnqueueTime() const236 absl::optional<Timestamp> RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime()
237 const {
238 if (single_packet_queue_.has_value()) {
239 if (single_packet_queue_->Type() == RtpPacketMediaType::kAudio) {
240 return single_packet_queue_->EnqueueTime();
241 }
242 return absl::nullopt;
243 }
244
245 if (stream_priorities_.empty()) {
246 return absl::nullopt;
247 }
248 uint32_t ssrc = stream_priorities_.begin()->second;
249
250 const auto& top_packet = streams_.find(ssrc)->second.packet_queue.top();
251 if (top_packet.Type() == RtpPacketMediaType::kAudio) {
252 return top_packet.EnqueueTime();
253 }
254 return absl::nullopt;
255 }
256
OldestEnqueueTime() const257 Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {
258 if (single_packet_queue_.has_value()) {
259 return single_packet_queue_->EnqueueTime();
260 }
261
262 if (Empty())
263 return Timestamp::MinusInfinity();
264 RTC_CHECK(!enqueue_times_.empty());
265 return *enqueue_times_.begin();
266 }
267
UpdateQueueTime(Timestamp now)268 void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) {
269 RTC_CHECK_GE(now, time_last_updated_);
270 if (now == time_last_updated_)
271 return;
272
273 TimeDelta delta = now - time_last_updated_;
274
275 if (paused_) {
276 pause_time_sum_ += delta;
277 } else {
278 queue_time_sum_ += TimeDelta::Micros(delta.us() * size_packets_);
279 }
280
281 time_last_updated_ = now;
282 }
283
SetPauseState(bool paused,Timestamp now)284 void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) {
285 if (paused_ == paused)
286 return;
287 UpdateQueueTime(now);
288 paused_ = paused;
289 }
290
SetIncludeOverhead()291 void RoundRobinPacketQueue::SetIncludeOverhead() {
292 MaybePromoteSinglePacketToNormalQueue();
293 include_overhead_ = true;
294 // We need to update the size to reflect overhead for existing packets.
295 for (const auto& stream : streams_) {
296 for (const QueuedPacket& packet : stream.second.packet_queue) {
297 size_ += DataSize::Bytes(packet.RtpPacket()->headers_size()) +
298 transport_overhead_per_packet_;
299 }
300 }
301 }
302
SetTransportOverhead(DataSize overhead_per_packet)303 void RoundRobinPacketQueue::SetTransportOverhead(DataSize overhead_per_packet) {
304 MaybePromoteSinglePacketToNormalQueue();
305 if (include_overhead_) {
306 DataSize previous_overhead = transport_overhead_per_packet_;
307 // We need to update the size to reflect overhead for existing packets.
308 for (const auto& stream : streams_) {
309 int packets = stream.second.packet_queue.size();
310 size_ -= packets * previous_overhead;
311 size_ += packets * overhead_per_packet;
312 }
313 }
314 transport_overhead_per_packet_ = overhead_per_packet;
315 }
316
AverageQueueTime() const317 TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {
318 if (Empty())
319 return TimeDelta::Zero();
320 return queue_time_sum_ / size_packets_;
321 }
322
Push(QueuedPacket packet)323 void RoundRobinPacketQueue::Push(QueuedPacket packet) {
324 auto stream_info_it = streams_.find(packet.Ssrc());
325 if (stream_info_it == streams_.end()) {
326 stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
327 stream_info_it->second.priority_it = stream_priorities_.end();
328 stream_info_it->second.ssrc = packet.Ssrc();
329 }
330
331 Stream* stream = &stream_info_it->second;
332
333 if (stream->priority_it == stream_priorities_.end()) {
334 // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
335 RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
336 stream->priority_it = stream_priorities_.emplace(
337 StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
338 } else if (packet.Priority() < stream->priority_it->first.priority) {
339 // If the priority of this SSRC increased, remove the outdated StreamPrioKey
340 // and insert a new one with the new priority. Note that |priority_| uses
341 // lower ordinal for higher priority.
342 stream_priorities_.erase(stream->priority_it);
343 stream->priority_it = stream_priorities_.emplace(
344 StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
345 }
346 RTC_CHECK(stream->priority_it != stream_priorities_.end());
347
348 if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
349 // Promotion from single-packet queue. Just add to enqueue times.
350 packet.UpdateEnqueueTimeIterator(
351 enqueue_times_.insert(packet.EnqueueTime()));
352 } else {
353 // In order to figure out how much time a packet has spent in the queue
354 // while not in a paused state, we subtract the total amount of time the
355 // queue has been paused so far, and when the packet is popped we subtract
356 // the total amount of time the queue has been paused at that moment. This
357 // way we subtract the total amount of time the packet has spent in the
358 // queue while in a paused state.
359 UpdateQueueTime(packet.EnqueueTime());
360 packet.SubtractPauseTime(pause_time_sum_);
361
362 size_packets_ += 1;
363 size_ += PacketSize(packet);
364 }
365
366 stream->packet_queue.push(packet);
367 }
368
PacketSize(const QueuedPacket & packet) const369 DataSize RoundRobinPacketQueue::PacketSize(const QueuedPacket& packet) const {
370 DataSize packet_size = DataSize::Bytes(packet.RtpPacket()->payload_size() +
371 packet.RtpPacket()->padding_size());
372 if (include_overhead_) {
373 packet_size += DataSize::Bytes(packet.RtpPacket()->headers_size()) +
374 transport_overhead_per_packet_;
375 }
376 return packet_size;
377 }
378
MaybePromoteSinglePacketToNormalQueue()379 void RoundRobinPacketQueue::MaybePromoteSinglePacketToNormalQueue() {
380 if (single_packet_queue_.has_value()) {
381 Push(*single_packet_queue_);
382 single_packet_queue_.reset();
383 }
384 }
385
386 RoundRobinPacketQueue::Stream*
GetHighestPriorityStream()387 RoundRobinPacketQueue::GetHighestPriorityStream() {
388 RTC_CHECK(!stream_priorities_.empty());
389 uint32_t ssrc = stream_priorities_.begin()->second;
390
391 auto stream_info_it = streams_.find(ssrc);
392 RTC_CHECK(stream_info_it != streams_.end());
393 RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
394 RTC_CHECK(!stream_info_it->second.packet_queue.empty());
395 return &stream_info_it->second;
396 }
397
IsSsrcScheduled(uint32_t ssrc) const398 bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
399 for (const auto& scheduled_stream : stream_priorities_) {
400 if (scheduled_stream.second == ssrc)
401 return true;
402 }
403 return false;
404 }
405
406 } // namespace webrtc
407