1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/modules/pacing/include/paced_sender.h"
12
13 #include <assert.h>
14
15 #include <map>
16 #include <set>
17
18 #include "webrtc/modules/interface/module_common_types.h"
19 #include "webrtc/system_wrappers/interface/clock.h"
20 #include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
21 #include "webrtc/system_wrappers/interface/trace_event.h"
22
namespace {

// Minimum spacing, in milliseconds, between two packet bursts.
const int kMinPacketLimitMs = 5;

// Cap, in milliseconds, on the interval credited to the budgets in one go;
// guards against a huge refill when Process() has not run for a long time.
const int kMaxIntervalTimeMs = 30;

// Longest time, in microseconds, the head of the queue may wait while nothing
// is being sent, independent of the budget state. In practice only in effect
// at low bitrates (less than 320 kbits/s).
const int kMaxQueueTimeWithoutSendingUs = 30 * 1000;

}  // namespace
37
38 namespace webrtc {
39 namespace paced_sender {
// Plain record describing one queued packet. Only metadata is stored here;
// the struct holds no payload.
struct Packet {
  Packet(uint32_t ssrc_in,
         uint16_t sequence_number_in,
         int64_t capture_time_ms_in,
         int64_t enqueue_time_ms_in,
         int bytes_in,
         bool retransmission_in)
      : ssrc(ssrc_in),
        sequence_number(sequence_number_in),
        capture_time_ms(capture_time_ms_in),
        enqueue_time_ms(enqueue_time_ms_in),
        bytes(bytes_in),
        retransmission(retransmission_in) {}

  uint32_t ssrc;             // Stream the packet belongs to.
  uint16_t sequence_number;  // Sequence number within that stream.
  int64_t capture_time_ms;   // Capture timestamp, in milliseconds.
  int64_t enqueue_time_ms;   // Time the packet was queued, in milliseconds.
  int bytes;                 // Packet length in bytes.
  bool retransmission;       // True when the packet is a retransmission.
};
60
61 // STL list style class which prevents duplicates in the list.
62 class PacketList {
63 public:
PacketList()64 PacketList() {};
65
empty() const66 bool empty() const {
67 return packet_list_.empty();
68 }
69
front() const70 Packet front() const {
71 return packet_list_.front();
72 }
73
pop_front()74 void pop_front() {
75 Packet& packet = packet_list_.front();
76 uint16_t sequence_number = packet.sequence_number;
77 uint32_t ssrc = packet.ssrc;
78 packet_list_.pop_front();
79 sequence_number_set_[ssrc].erase(sequence_number);
80 }
81
push_back(const Packet & packet)82 void push_back(const Packet& packet) {
83 if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
84 sequence_number_set_[packet.ssrc].end()) {
85 // Don't insert duplicates.
86 packet_list_.push_back(packet);
87 sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
88 }
89 }
90
91 private:
92 std::list<Packet> packet_list_;
93 std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
94 };
95
// Byte budget that is refilled from a target bitrate each interval and drawn
// down as bytes are sent.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps),
        bytes_remaining_(0) {}

  // Changes the rate used by subsequent refills and by the overdraft floor.
  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
  }

  // Credits the budget with |delta_time_ms| worth of bytes at the target
  // rate.
  void IncreaseBudget(int delta_time_ms) {
    const int refill_bytes = target_rate_kbps_ * delta_time_ms / 8;
    if (bytes_remaining_ >= 0) {
      // Unused budget from the previous interval is not carried over.
      bytes_remaining_ = refill_bytes;
      return;
    }
    // The previous interval was overdrawn; pay the debt back first.
    bytes_remaining_ += refill_bytes;
  }

  // Deducts |bytes| from the budget. The budget may go negative, but never
  // below 500 ms worth of bytes at the target rate.
  void UseBudget(int bytes) {
    const int lowest_allowed = -500 * target_rate_kbps_ / 8;
    const int after_use = bytes_remaining_ - bytes;
    bytes_remaining_ = after_use < lowest_allowed ? lowest_allowed : after_use;
  }

  int bytes_remaining() const { return bytes_remaining_; }

 private:
  int target_rate_kbps_;
  int bytes_remaining_;
};
128 } // namespace paced_sender
129
130 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
131
PacedSender(Clock * clock,Callback * callback,int max_bitrate_kbps,int min_bitrate_kbps)132 PacedSender::PacedSender(Clock* clock,
133 Callback* callback,
134 int max_bitrate_kbps,
135 int min_bitrate_kbps)
136 : clock_(clock),
137 callback_(callback),
138 critsect_(CriticalSectionWrapper::CreateCriticalSection()),
139 enabled_(true),
140 paused_(false),
141 max_queue_length_ms_(kDefaultMaxQueueLengthMs),
142 media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
143 padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
144 time_last_update_us_(clock->TimeInMicroseconds()),
145 capture_time_ms_last_queued_(0),
146 capture_time_ms_last_sent_(0),
147 high_priority_packets_(new paced_sender::PacketList),
148 normal_priority_packets_(new paced_sender::PacketList),
149 low_priority_packets_(new paced_sender::PacketList) {
150 UpdateBytesPerInterval(kMinPacketLimitMs);
151 }
152
~PacedSender()153 PacedSender::~PacedSender() {}
154
Pause()155 void PacedSender::Pause() {
156 CriticalSectionScoped cs(critsect_.get());
157 paused_ = true;
158 }
159
Resume()160 void PacedSender::Resume() {
161 CriticalSectionScoped cs(critsect_.get());
162 paused_ = false;
163 }
164
SetStatus(bool enable)165 void PacedSender::SetStatus(bool enable) {
166 CriticalSectionScoped cs(critsect_.get());
167 enabled_ = enable;
168 }
169
Enabled() const170 bool PacedSender::Enabled() const {
171 CriticalSectionScoped cs(critsect_.get());
172 return enabled_;
173 }
174
UpdateBitrate(int max_bitrate_kbps,int min_bitrate_kbps)175 void PacedSender::UpdateBitrate(int max_bitrate_kbps,
176 int min_bitrate_kbps) {
177 CriticalSectionScoped cs(critsect_.get());
178 media_budget_->set_target_rate_kbps(max_bitrate_kbps);
179 padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
180 }
181
SendPacket(Priority priority,uint32_t ssrc,uint16_t sequence_number,int64_t capture_time_ms,int bytes,bool retransmission)182 bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
183 uint16_t sequence_number, int64_t capture_time_ms, int bytes,
184 bool retransmission) {
185 CriticalSectionScoped cs(critsect_.get());
186
187 if (!enabled_) {
188 return true; // We can send now.
189 }
190 if (capture_time_ms < 0) {
191 capture_time_ms = clock_->TimeInMilliseconds();
192 }
193 if (priority != kHighPriority &&
194 capture_time_ms > capture_time_ms_last_queued_) {
195 capture_time_ms_last_queued_ = capture_time_ms;
196 TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
197 "capture_time_ms", capture_time_ms);
198 }
199 paced_sender::PacketList* packet_list = NULL;
200 switch (priority) {
201 case kHighPriority:
202 packet_list = high_priority_packets_.get();
203 break;
204 case kNormalPriority:
205 packet_list = normal_priority_packets_.get();
206 break;
207 case kLowPriority:
208 packet_list = low_priority_packets_.get();
209 break;
210 }
211 packet_list->push_back(paced_sender::Packet(ssrc,
212 sequence_number,
213 capture_time_ms,
214 clock_->TimeInMilliseconds(),
215 bytes,
216 retransmission));
217 return false;
218 }
219
set_max_queue_length_ms(int max_queue_length_ms)220 void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
221 CriticalSectionScoped cs(critsect_.get());
222 max_queue_length_ms_ = max_queue_length_ms;
223 }
224
QueueInMs() const225 int PacedSender::QueueInMs() const {
226 CriticalSectionScoped cs(critsect_.get());
227 int64_t now_ms = clock_->TimeInMilliseconds();
228 int64_t oldest_packet_enqueue_time = now_ms;
229 if (!high_priority_packets_->empty()) {
230 oldest_packet_enqueue_time =
231 std::min(oldest_packet_enqueue_time,
232 high_priority_packets_->front().enqueue_time_ms);
233 }
234 if (!normal_priority_packets_->empty()) {
235 oldest_packet_enqueue_time =
236 std::min(oldest_packet_enqueue_time,
237 normal_priority_packets_->front().enqueue_time_ms);
238 }
239 if (!low_priority_packets_->empty()) {
240 oldest_packet_enqueue_time =
241 std::min(oldest_packet_enqueue_time,
242 low_priority_packets_->front().enqueue_time_ms);
243 }
244 return now_ms - oldest_packet_enqueue_time;
245 }
246
TimeUntilNextProcess()247 int32_t PacedSender::TimeUntilNextProcess() {
248 CriticalSectionScoped cs(critsect_.get());
249 int64_t elapsed_time_ms = (clock_->TimeInMicroseconds() -
250 time_last_update_us_ + 500) / 1000;
251 if (elapsed_time_ms <= 0) {
252 return kMinPacketLimitMs;
253 }
254 if (elapsed_time_ms >= kMinPacketLimitMs) {
255 return 0;
256 }
257 return kMinPacketLimitMs - elapsed_time_ms;
258 }
259
Process()260 int32_t PacedSender::Process() {
261 int64_t now_us = clock_->TimeInMicroseconds();
262 CriticalSectionScoped cs(critsect_.get());
263 int elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
264 time_last_update_us_ = now_us;
265 if (!enabled_) {
266 return 0;
267 }
268 if (!paused_) {
269 if (elapsed_time_ms > 0) {
270 uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
271 UpdateBytesPerInterval(delta_time_ms);
272 }
273 paced_sender::PacketList* packet_list;
274 while (ShouldSendNextPacket(&packet_list)) {
275 if (!SendPacketFromList(packet_list))
276 return 0;
277 }
278 if (high_priority_packets_->empty() &&
279 normal_priority_packets_->empty() &&
280 low_priority_packets_->empty() &&
281 padding_budget_->bytes_remaining() > 0) {
282 int padding_needed = padding_budget_->bytes_remaining();
283 critsect_->Leave();
284 int bytes_sent = callback_->TimeToSendPadding(padding_needed);
285 critsect_->Enter();
286 media_budget_->UseBudget(bytes_sent);
287 padding_budget_->UseBudget(bytes_sent);
288 }
289 }
290 return 0;
291 }
292
SendPacketFromList(paced_sender::PacketList * packet_list)293 bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
294 EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
295 paced_sender::Packet packet = GetNextPacketFromList(packet_list);
296 critsect_->Leave();
297
298 const bool success = callback_->TimeToSendPacket(packet.ssrc,
299 packet.sequence_number,
300 packet.capture_time_ms,
301 packet.retransmission);
302 critsect_->Enter();
303 // If packet cannot be sent then keep it in packet list and exit early.
304 // There's no need to send more packets.
305 if (!success) {
306 return false;
307 }
308 packet_list->pop_front();
309 const bool last_packet =
310 packet_list->empty() ||
311 packet_list->front().capture_time_ms > packet.capture_time_ms;
312 if (packet_list != high_priority_packets_.get()) {
313 if (packet.capture_time_ms > capture_time_ms_last_sent_) {
314 capture_time_ms_last_sent_ = packet.capture_time_ms;
315 } else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
316 last_packet) {
317 TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
318 }
319 }
320 return true;
321 }
322
UpdateBytesPerInterval(uint32_t delta_time_ms)323 void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
324 media_budget_->IncreaseBudget(delta_time_ms);
325 padding_budget_->IncreaseBudget(delta_time_ms);
326 }
327
ShouldSendNextPacket(paced_sender::PacketList ** packet_list)328 bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
329 *packet_list = NULL;
330 if (media_budget_->bytes_remaining() <= 0) {
331 // All bytes consumed for this interval.
332 // Check if we have not sent in a too long time.
333 if (clock_->TimeInMicroseconds() - time_last_send_us_ >
334 kMaxQueueTimeWithoutSendingUs) {
335 if (!high_priority_packets_->empty()) {
336 *packet_list = high_priority_packets_.get();
337 return true;
338 }
339 if (!normal_priority_packets_->empty()) {
340 *packet_list = normal_priority_packets_.get();
341 return true;
342 }
343 }
344 // Send any old packets to avoid queuing for too long.
345 if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
346 int64_t high_priority_capture_time = -1;
347 if (!high_priority_packets_->empty()) {
348 high_priority_capture_time =
349 high_priority_packets_->front().capture_time_ms;
350 *packet_list = high_priority_packets_.get();
351 }
352 if (!normal_priority_packets_->empty() &&
353 (high_priority_capture_time == -1 ||
354 high_priority_capture_time >
355 normal_priority_packets_->front().capture_time_ms)) {
356 *packet_list = normal_priority_packets_.get();
357 }
358 if (*packet_list)
359 return true;
360 }
361 return false;
362 }
363 if (!high_priority_packets_->empty()) {
364 *packet_list = high_priority_packets_.get();
365 return true;
366 }
367 if (!normal_priority_packets_->empty()) {
368 *packet_list = normal_priority_packets_.get();
369 return true;
370 }
371 if (!low_priority_packets_->empty()) {
372 *packet_list = low_priority_packets_.get();
373 return true;
374 }
375 return false;
376 }
377
GetNextPacketFromList(paced_sender::PacketList * packets)378 paced_sender::Packet PacedSender::GetNextPacketFromList(
379 paced_sender::PacketList* packets) {
380 paced_sender::Packet packet = packets->front();
381 UpdateMediaBytesSent(packet.bytes);
382 return packet;
383 }
384
UpdateMediaBytesSent(int num_bytes)385 void PacedSender::UpdateMediaBytesSent(int num_bytes) {
386 time_last_send_us_ = clock_->TimeInMicroseconds();
387 media_budget_->UseBudget(num_bytes);
388 padding_budget_->UseBudget(num_bytes);
389 }
390
391 } // namespace webrtc
392