1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/modules/pacing/include/paced_sender.h"
12
13 #include <assert.h>
14
15 #include "webrtc/modules/interface/module_common_types.h"
16 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
17 #include "webrtc/system_wrappers/interface/trace_event.h"
18
namespace {
// Minimum spacing, in milliseconds, between two consecutive packet bursts.
const int kMinPacketLimitMs = 5;

// Largest elapsed time, in milliseconds, that one process call is allowed to
// account for. Protects against a huge budget refill when process has not
// been called for a long time.
const int kMaxIntervalTimeMs = 30;

// Longest time, in milliseconds, that the packet at the head of the queue may
// wait while nothing is being sent, no matter what the buffer state is. In
// practice this only kicks in at low bitrates (below 320 kbits/s).
const int kMaxQueueTimeWithoutSendingMs = 30;

}  // namespace
33
34 namespace webrtc {
35
36 namespace paced_sender {
// Bookkeeping record for one packet waiting in the pacer. Only metadata is
// stored here; the payload itself is not part of this struct.
struct Packet {
  Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
         int64_t enqueue_time_ms, int length_in_bytes, bool retransmission)
      : ssrc_(ssrc),
        sequence_number_(seq_number),
        capture_time_ms_(capture_time_ms),
        enqueue_time_ms_(enqueue_time_ms),
        bytes_(length_in_bytes),
        retransmission_(retransmission) {
  }
  uint32_t ssrc_;             // Stream the packet belongs to.
  uint16_t sequence_number_;  // Sequence number within that stream.
  int64_t capture_time_ms_;   // Capture timestamp in milliseconds.
  int64_t enqueue_time_ms_;   // Time the packet was queued, in milliseconds.
  int bytes_;                 // Packet size in bytes.
  bool retransmission_;       // True if the packet is a retransmission.
};

// STL list style class which prevents duplicates in the list.
//
// A packet counts as a duplicate when a packet with the same SSRC *and* the
// same sequence number is already queued. Sequence numbers are only 16 bits
// wide and are assigned independently per SSRC, so comparing the sequence
// number alone would wrongly drop packets from a second stream.
class PacketList {
 public:
  PacketList() {}

  // Returns true when no packets are queued.
  bool empty() const {
    return packet_list_.empty();
  }

  // Returns a copy of the oldest queued packet. The list must not be empty.
  Packet front() const {
    return packet_list_.front();
  }

  // Removes the oldest queued packet and forgets its duplicate-detection key.
  // Does nothing when the list is empty.
  void pop_front() {
    if (packet_list_.empty()) {
      return;
    }
    const uint64_t key = DuplicateKey(packet_list_.front());
    packet_list_.pop_front();
    queued_keys_.erase(key);
  }

  // Appends |packet| unless a packet with the same SSRC and sequence number
  // is already queued, in which case the call is silently ignored.
  void push_back(const Packet& packet) {
    const uint64_t key = DuplicateKey(packet);
    if (queued_keys_.find(key) == queued_keys_.end()) {
      // Don't insert duplicates.
      packet_list_.push_back(packet);
      queued_keys_.insert(key);
    }
  }

 private:
  // Packs the SSRC (upper bits) and the sequence number (lower 16 bits) into
  // a single value that uniquely identifies a packet for duplicate detection.
  static uint64_t DuplicateKey(const Packet& packet) {
    return (static_cast<uint64_t>(packet.ssrc_) << 16) |
           packet.sequence_number_;
  }

  std::list<Packet> packet_list_;
  std::set<uint64_t> queued_keys_;
};
88
// Byte budget for one pacing interval at a configurable target bitrate.
// A positive balance is bytes that may still be sent; a negative balance is
// debt that is carried over into the following interval.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps), bytes_remaining_(0) {}

  // Changes the rate used by subsequent IncreaseBudget()/UseBudget() calls.
  // The current balance is left untouched.
  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
  }

  // Starts a new interval of |delta_time_ms| milliseconds. Debt from the
  // previous interval is paid off first; unused budget is not carried over.
  void IncreaseBudget(int delta_time_ms) {
    // kbit/s * ms / 8 == bytes.
    const int refill_bytes = target_rate_kbps_ * delta_time_ms / 8;
    const int carried_debt = bytes_remaining_ < 0 ? bytes_remaining_ : 0;
    bytes_remaining_ = carried_debt + refill_bytes;
  }

  // Charges |bytes| against the budget. The balance never drops below the
  // number of bytes corresponding to 500 ms at the target rate.
  void UseBudget(int bytes) {
    const int lowest_allowed = -500 * target_rate_kbps_ / 8;
    const int after_use = bytes_remaining_ - bytes;
    bytes_remaining_ = after_use < lowest_allowed ? lowest_allowed : after_use;
  }

  // Current balance in bytes; negative when the budget has been overused.
  int bytes_remaining() const { return bytes_remaining_; }

 private:
  int target_rate_kbps_;
  int bytes_remaining_;
};
121 } // namespace paced_sender
122
PacedSender(Callback * callback,int max_bitrate_kbps,int min_bitrate_kbps)123 PacedSender::PacedSender(Callback* callback,
124 int max_bitrate_kbps,
125 int min_bitrate_kbps)
126 : callback_(callback),
127 enabled_(true),
128 paused_(false),
129 max_queue_length_ms_(kDefaultMaxQueueLengthMs),
130 critsect_(CriticalSectionWrapper::CreateCriticalSection()),
131 media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
132 padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
133 time_last_update_(TickTime::Now()),
134 capture_time_ms_last_queued_(0),
135 capture_time_ms_last_sent_(0),
136 high_priority_packets_(new paced_sender::PacketList),
137 normal_priority_packets_(new paced_sender::PacketList),
138 low_priority_packets_(new paced_sender::PacketList) {
139 UpdateBytesPerInterval(kMinPacketLimitMs);
140 }
141
~PacedSender()142 PacedSender::~PacedSender() {
143 }
144
Pause()145 void PacedSender::Pause() {
146 CriticalSectionScoped cs(critsect_.get());
147 paused_ = true;
148 }
149
Resume()150 void PacedSender::Resume() {
151 CriticalSectionScoped cs(critsect_.get());
152 paused_ = false;
153 }
154
SetStatus(bool enable)155 void PacedSender::SetStatus(bool enable) {
156 CriticalSectionScoped cs(critsect_.get());
157 enabled_ = enable;
158 }
159
Enabled() const160 bool PacedSender::Enabled() const {
161 CriticalSectionScoped cs(critsect_.get());
162 return enabled_;
163 }
164
UpdateBitrate(int max_bitrate_kbps,int min_bitrate_kbps)165 void PacedSender::UpdateBitrate(int max_bitrate_kbps,
166 int min_bitrate_kbps) {
167 CriticalSectionScoped cs(critsect_.get());
168 media_budget_->set_target_rate_kbps(max_bitrate_kbps);
169 padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
170 }
171
SendPacket(Priority priority,uint32_t ssrc,uint16_t sequence_number,int64_t capture_time_ms,int bytes,bool retransmission)172 bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
173 uint16_t sequence_number, int64_t capture_time_ms, int bytes,
174 bool retransmission) {
175 CriticalSectionScoped cs(critsect_.get());
176
177 if (!enabled_) {
178 return true; // We can send now.
179 }
180 if (capture_time_ms < 0) {
181 capture_time_ms = TickTime::MillisecondTimestamp();
182 }
183 if (priority != kHighPriority &&
184 capture_time_ms > capture_time_ms_last_queued_) {
185 capture_time_ms_last_queued_ = capture_time_ms;
186 TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
187 "capture_time_ms", capture_time_ms);
188 }
189 paced_sender::PacketList* packet_list = NULL;
190 switch (priority) {
191 case kHighPriority:
192 packet_list = high_priority_packets_.get();
193 break;
194 case kNormalPriority:
195 packet_list = normal_priority_packets_.get();
196 break;
197 case kLowPriority:
198 packet_list = low_priority_packets_.get();
199 break;
200 }
201 packet_list->push_back(paced_sender::Packet(ssrc,
202 sequence_number,
203 capture_time_ms,
204 TickTime::MillisecondTimestamp(),
205 bytes,
206 retransmission));
207 return false;
208 }
209
set_max_queue_length_ms(int max_queue_length_ms)210 void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
211 CriticalSectionScoped cs(critsect_.get());
212 max_queue_length_ms_ = max_queue_length_ms;
213 }
214
QueueInMs() const215 int PacedSender::QueueInMs() const {
216 CriticalSectionScoped cs(critsect_.get());
217 int64_t now_ms = TickTime::MillisecondTimestamp();
218 int64_t oldest_packet_enqueue_time = now_ms;
219 if (!high_priority_packets_->empty()) {
220 oldest_packet_enqueue_time = std::min(
221 oldest_packet_enqueue_time,
222 high_priority_packets_->front().enqueue_time_ms_);
223 }
224 if (!normal_priority_packets_->empty()) {
225 oldest_packet_enqueue_time = std::min(
226 oldest_packet_enqueue_time,
227 normal_priority_packets_->front().enqueue_time_ms_);
228 }
229 if (!low_priority_packets_->empty()) {
230 oldest_packet_enqueue_time = std::min(
231 oldest_packet_enqueue_time,
232 low_priority_packets_->front().enqueue_time_ms_);
233 }
234 return now_ms - oldest_packet_enqueue_time;
235 }
236
TimeUntilNextProcess()237 int32_t PacedSender::TimeUntilNextProcess() {
238 CriticalSectionScoped cs(critsect_.get());
239 int64_t elapsed_time_ms =
240 (TickTime::Now() - time_last_update_).Milliseconds();
241 if (elapsed_time_ms <= 0) {
242 return kMinPacketLimitMs;
243 }
244 if (elapsed_time_ms >= kMinPacketLimitMs) {
245 return 0;
246 }
247 return kMinPacketLimitMs - elapsed_time_ms;
248 }
249
Process()250 int32_t PacedSender::Process() {
251 TickTime now = TickTime::Now();
252 CriticalSectionScoped cs(critsect_.get());
253 int elapsed_time_ms = (now - time_last_update_).Milliseconds();
254 time_last_update_ = now;
255 if (!enabled_) {
256 return 0;
257 }
258 if (!paused_) {
259 if (elapsed_time_ms > 0) {
260 uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
261 UpdateBytesPerInterval(delta_time_ms);
262 }
263 paced_sender::PacketList* packet_list;
264 while (ShouldSendNextPacket(&packet_list)) {
265 if (!SendPacketFromList(packet_list))
266 return 0;
267 }
268 if (high_priority_packets_->empty() &&
269 normal_priority_packets_->empty() &&
270 low_priority_packets_->empty() &&
271 padding_budget_->bytes_remaining() > 0) {
272 int padding_needed = padding_budget_->bytes_remaining();
273 critsect_->Leave();
274 int bytes_sent = callback_->TimeToSendPadding(padding_needed);
275 critsect_->Enter();
276 media_budget_->UseBudget(bytes_sent);
277 padding_budget_->UseBudget(bytes_sent);
278 }
279 }
280 return 0;
281 }
282
283 // MUST have critsect_ when calling.
SendPacketFromList(paced_sender::PacketList * packet_list)284 bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
285 EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
286 paced_sender::Packet packet = GetNextPacketFromList(packet_list);
287 critsect_->Leave();
288
289 const bool success = callback_->TimeToSendPacket(packet.ssrc_,
290 packet.sequence_number_,
291 packet.capture_time_ms_,
292 packet.retransmission_);
293 critsect_->Enter();
294 // If packet cannot be sent then keep it in packet list and exit early.
295 // There's no need to send more packets.
296 if (!success) {
297 return false;
298 }
299 packet_list->pop_front();
300 const bool last_packet = packet_list->empty() ||
301 packet_list->front().capture_time_ms_ > packet.capture_time_ms_;
302 if (packet_list != high_priority_packets_.get()) {
303 if (packet.capture_time_ms_ > capture_time_ms_last_sent_) {
304 capture_time_ms_last_sent_ = packet.capture_time_ms_;
305 } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ &&
306 last_packet) {
307 TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend",
308 packet.capture_time_ms_);
309 }
310 }
311 return true;
312 }
313
314 // MUST have critsect_ when calling.
UpdateBytesPerInterval(uint32_t delta_time_ms)315 void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
316 media_budget_->IncreaseBudget(delta_time_ms);
317 padding_budget_->IncreaseBudget(delta_time_ms);
318 }
319
320 // MUST have critsect_ when calling.
ShouldSendNextPacket(paced_sender::PacketList ** packet_list)321 bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
322 *packet_list = NULL;
323 if (media_budget_->bytes_remaining() <= 0) {
324 // All bytes consumed for this interval.
325 // Check if we have not sent in a too long time.
326 if ((TickTime::Now() - time_last_send_).Milliseconds() >
327 kMaxQueueTimeWithoutSendingMs) {
328 if (!high_priority_packets_->empty()) {
329 *packet_list = high_priority_packets_.get();
330 return true;
331 }
332 if (!normal_priority_packets_->empty()) {
333 *packet_list = normal_priority_packets_.get();
334 return true;
335 }
336 }
337 // Send any old packets to avoid queuing for too long.
338 if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
339 int64_t high_priority_capture_time = -1;
340 if (!high_priority_packets_->empty()) {
341 high_priority_capture_time =
342 high_priority_packets_->front().capture_time_ms_;
343 *packet_list = high_priority_packets_.get();
344 }
345 if (!normal_priority_packets_->empty() &&
346 (high_priority_capture_time == -1 || high_priority_capture_time >
347 normal_priority_packets_->front().capture_time_ms_)) {
348 *packet_list = normal_priority_packets_.get();
349 }
350 if (*packet_list)
351 return true;
352 }
353 return false;
354 }
355 if (!high_priority_packets_->empty()) {
356 *packet_list = high_priority_packets_.get();
357 return true;
358 }
359 if (!normal_priority_packets_->empty()) {
360 *packet_list = normal_priority_packets_.get();
361 return true;
362 }
363 if (!low_priority_packets_->empty()) {
364 *packet_list = low_priority_packets_.get();
365 return true;
366 }
367 return false;
368 }
369
GetNextPacketFromList(paced_sender::PacketList * packets)370 paced_sender::Packet PacedSender::GetNextPacketFromList(
371 paced_sender::PacketList* packets) {
372 paced_sender::Packet packet = packets->front();
373 UpdateMediaBytesSent(packet.bytes_);
374 return packet;
375 }
376
377 // MUST have critsect_ when calling.
UpdateMediaBytesSent(int num_bytes)378 void PacedSender::UpdateMediaBytesSent(int num_bytes) {
379 time_last_send_ = TickTime::Now();
380 media_budget_->UseBudget(num_bytes);
381 padding_budget_->UseBudget(num_bytes);
382 }
383
384 } // namespace webrtc
385