• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
#include "webrtc/modules/pacing/include/paced_sender.h"

#include <assert.h>

#include <algorithm>
#include <list>
#include <set>
#include <utility>

#include "webrtc/modules/interface/module_common_types.h"
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
#include "webrtc/system_wrappers/interface/trace_event.h"
18 
namespace {
// Minimum spacing, in milliseconds, between two packet bursts. Also used as
// the nominal process interval.
const int kMinPacketLimitMs = 5;

// Largest interval, in milliseconds, that a single Process() call is allowed
// to credit to the budgets, so that a long gap between calls does not turn
// into one oversized burst.
const int kMaxIntervalTimeMs = 30;

// Longest time, in milliseconds, the pacer may go without sending anything
// while packets are queued, regardless of buffer state. In practice only in
// effect at low bitrates (less than 320 kbits/s).
const int kMaxQueueTimeWithoutSendingMs = 30;

}  // namespace
33 
34 namespace webrtc {
35 
36 namespace paced_sender {
// Bookkeeping record for one queued packet. Only metadata is stored here; the
// fields are handed back to the callback when it is time to send.
struct Packet {
  Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
         int64_t enqueue_time_ms, int length_in_bytes, bool retransmission)
      : ssrc_(ssrc),
        sequence_number_(seq_number),
        capture_time_ms_(capture_time_ms),
        enqueue_time_ms_(enqueue_time_ms),
        bytes_(length_in_bytes),
        retransmission_(retransmission) {
  }
  uint32_t ssrc_;
  uint16_t sequence_number_;
  int64_t capture_time_ms_;
  int64_t enqueue_time_ms_;  // Time at which the packet entered the queue.
  int bytes_;
  bool retransmission_;
};

// STL list style FIFO which prevents duplicates in the list.
//
// A packet is identified by the (ssrc, sequence number) pair. Sequence numbers
// alone are not unique when several SSRCs share one list, so keying on the
// sequence number only would drop a legitimate packet from a second stream.
class PacketList {
 public:
  PacketList() {}

  bool empty() const {
    return packet_list_.empty();
  }

  // Returns a copy of the oldest packet. Must not be called on an empty list.
  Packet front() const {
    return packet_list_.front();
  }

  // Removes the oldest packet and forgets its identity, so the same
  // (ssrc, sequence number) may be queued again later. No-op when empty.
  void pop_front() {
    if (packet_list_.empty()) {
      return;
    }
    const Packet& oldest = packet_list_.front();
    queued_ids_.erase(PacketId(oldest.ssrc_, oldest.sequence_number_));
    packet_list_.pop_front();
  }

  // Appends |packet| unless a packet with the same (ssrc, sequence number) is
  // already queued, in which case the call is silently ignored.
  void push_back(const Packet& packet) {
    const bool is_new =
        queued_ids_.insert(PacketId(packet.ssrc_,
                                    packet.sequence_number_)).second;
    if (is_new) {
      packet_list_.push_back(packet);
    }
  }

 private:
  typedef std::pair<uint32_t, uint16_t> PacketId;

  std::list<Packet> packet_list_;
  // Identities of all packets currently in |packet_list_|.
  std::set<PacketId> queued_ids_;
};
88 
// Byte budget that is refilled per time interval from a target bitrate.
// A negative balance means more was sent than the budget allowed.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps),
        bytes_remaining_(0) {}

  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
  }

  // Credits the budget with |delta_time_ms| worth of bytes at the target rate
  // (kbit/s * ms / 8 = bytes).
  void IncreaseBudget(int delta_time_ms) {
    const int earned_bytes = target_rate_kbps_ * delta_time_ms / 8;
    // Debt from an overused interval is carried over and paid off first;
    // an unused surplus is discarded rather than accumulated.
    const int carried_debt = std::min(bytes_remaining_, 0);
    bytes_remaining_ = carried_debt + earned_bytes;
  }

  // Deducts |bytes| from the budget. The balance is not allowed to drop below
  // the byte equivalent of 500 ms at the target rate.
  void UseBudget(int bytes) {
    const int lowest_allowed = -500 * target_rate_kbps_ / 8;
    const int new_balance = bytes_remaining_ - bytes;
    bytes_remaining_ =
        (new_balance < lowest_allowed) ? lowest_allowed : new_balance;
  }

  int bytes_remaining() const { return bytes_remaining_; }

 private:
  int target_rate_kbps_;
  int bytes_remaining_;
};
121 }  // namespace paced_sender
122 
PacedSender(Callback * callback,int max_bitrate_kbps,int min_bitrate_kbps)123 PacedSender::PacedSender(Callback* callback,
124                          int max_bitrate_kbps,
125                          int min_bitrate_kbps)
126     : callback_(callback),
127       enabled_(true),
128       paused_(false),
129       max_queue_length_ms_(kDefaultMaxQueueLengthMs),
130       critsect_(CriticalSectionWrapper::CreateCriticalSection()),
131       media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
132       padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
133       time_last_update_(TickTime::Now()),
134       capture_time_ms_last_queued_(0),
135       capture_time_ms_last_sent_(0),
136       high_priority_packets_(new paced_sender::PacketList),
137       normal_priority_packets_(new paced_sender::PacketList),
138       low_priority_packets_(new paced_sender::PacketList) {
139   UpdateBytesPerInterval(kMinPacketLimitMs);
140 }
141 
~PacedSender()142 PacedSender::~PacedSender() {
143 }
144 
Pause()145 void PacedSender::Pause() {
146   CriticalSectionScoped cs(critsect_.get());
147   paused_ = true;
148 }
149 
Resume()150 void PacedSender::Resume() {
151   CriticalSectionScoped cs(critsect_.get());
152   paused_ = false;
153 }
154 
SetStatus(bool enable)155 void PacedSender::SetStatus(bool enable) {
156   CriticalSectionScoped cs(critsect_.get());
157   enabled_ = enable;
158 }
159 
Enabled() const160 bool PacedSender::Enabled() const {
161   CriticalSectionScoped cs(critsect_.get());
162   return enabled_;
163 }
164 
UpdateBitrate(int max_bitrate_kbps,int min_bitrate_kbps)165 void PacedSender::UpdateBitrate(int max_bitrate_kbps,
166                                 int min_bitrate_kbps) {
167   CriticalSectionScoped cs(critsect_.get());
168   media_budget_->set_target_rate_kbps(max_bitrate_kbps);
169   padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
170 }
171 
SendPacket(Priority priority,uint32_t ssrc,uint16_t sequence_number,int64_t capture_time_ms,int bytes,bool retransmission)172 bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
173     uint16_t sequence_number, int64_t capture_time_ms, int bytes,
174     bool retransmission) {
175   CriticalSectionScoped cs(critsect_.get());
176 
177   if (!enabled_) {
178     return true;  // We can send now.
179   }
180   if (capture_time_ms < 0) {
181     capture_time_ms = TickTime::MillisecondTimestamp();
182   }
183   if (priority != kHighPriority &&
184       capture_time_ms > capture_time_ms_last_queued_) {
185     capture_time_ms_last_queued_ = capture_time_ms;
186     TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
187                              "capture_time_ms", capture_time_ms);
188   }
189   paced_sender::PacketList* packet_list = NULL;
190   switch (priority) {
191     case kHighPriority:
192       packet_list = high_priority_packets_.get();
193       break;
194     case kNormalPriority:
195       packet_list = normal_priority_packets_.get();
196       break;
197     case kLowPriority:
198       packet_list = low_priority_packets_.get();
199       break;
200   }
201   packet_list->push_back(paced_sender::Packet(ssrc,
202                                               sequence_number,
203                                               capture_time_ms,
204                                               TickTime::MillisecondTimestamp(),
205                                               bytes,
206                                               retransmission));
207   return false;
208 }
209 
set_max_queue_length_ms(int max_queue_length_ms)210 void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
211   CriticalSectionScoped cs(critsect_.get());
212   max_queue_length_ms_ = max_queue_length_ms;
213 }
214 
QueueInMs() const215 int PacedSender::QueueInMs() const {
216   CriticalSectionScoped cs(critsect_.get());
217   int64_t now_ms = TickTime::MillisecondTimestamp();
218   int64_t oldest_packet_enqueue_time = now_ms;
219   if (!high_priority_packets_->empty()) {
220     oldest_packet_enqueue_time = std::min(
221         oldest_packet_enqueue_time,
222         high_priority_packets_->front().enqueue_time_ms_);
223   }
224   if (!normal_priority_packets_->empty()) {
225     oldest_packet_enqueue_time = std::min(
226         oldest_packet_enqueue_time,
227         normal_priority_packets_->front().enqueue_time_ms_);
228   }
229   if (!low_priority_packets_->empty()) {
230     oldest_packet_enqueue_time = std::min(
231         oldest_packet_enqueue_time,
232         low_priority_packets_->front().enqueue_time_ms_);
233   }
234   return now_ms - oldest_packet_enqueue_time;
235 }
236 
TimeUntilNextProcess()237 int32_t PacedSender::TimeUntilNextProcess() {
238   CriticalSectionScoped cs(critsect_.get());
239   int64_t elapsed_time_ms =
240       (TickTime::Now() - time_last_update_).Milliseconds();
241   if (elapsed_time_ms <= 0) {
242     return kMinPacketLimitMs;
243   }
244   if (elapsed_time_ms >= kMinPacketLimitMs) {
245     return 0;
246   }
247   return kMinPacketLimitMs - elapsed_time_ms;
248 }
249 
Process()250 int32_t PacedSender::Process() {
251   TickTime now = TickTime::Now();
252   CriticalSectionScoped cs(critsect_.get());
253   int elapsed_time_ms = (now - time_last_update_).Milliseconds();
254   time_last_update_ = now;
255   if (!enabled_) {
256     return 0;
257   }
258   if (!paused_) {
259     if (elapsed_time_ms > 0) {
260       uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
261       UpdateBytesPerInterval(delta_time_ms);
262     }
263     paced_sender::PacketList* packet_list;
264     while (ShouldSendNextPacket(&packet_list)) {
265       if (!SendPacketFromList(packet_list))
266         return 0;
267     }
268     if (high_priority_packets_->empty() &&
269         normal_priority_packets_->empty() &&
270         low_priority_packets_->empty() &&
271         padding_budget_->bytes_remaining() > 0) {
272       int padding_needed = padding_budget_->bytes_remaining();
273       critsect_->Leave();
274       int bytes_sent = callback_->TimeToSendPadding(padding_needed);
275       critsect_->Enter();
276       media_budget_->UseBudget(bytes_sent);
277       padding_budget_->UseBudget(bytes_sent);
278     }
279   }
280   return 0;
281 }
282 
283 // MUST have critsect_ when calling.
SendPacketFromList(paced_sender::PacketList * packet_list)284 bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
285     EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
286   paced_sender::Packet packet = GetNextPacketFromList(packet_list);
287   critsect_->Leave();
288 
289   const bool success = callback_->TimeToSendPacket(packet.ssrc_,
290                                                    packet.sequence_number_,
291                                                    packet.capture_time_ms_,
292                                                    packet.retransmission_);
293   critsect_->Enter();
294   // If packet cannot be sent then keep it in packet list and exit early.
295   // There's no need to send more packets.
296   if (!success) {
297     return false;
298   }
299   packet_list->pop_front();
300   const bool last_packet = packet_list->empty() ||
301       packet_list->front().capture_time_ms_ > packet.capture_time_ms_;
302   if (packet_list != high_priority_packets_.get()) {
303     if (packet.capture_time_ms_ > capture_time_ms_last_sent_) {
304       capture_time_ms_last_sent_ = packet.capture_time_ms_;
305     } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ &&
306                last_packet) {
307       TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend",
308           packet.capture_time_ms_);
309     }
310   }
311   return true;
312 }
313 
314 // MUST have critsect_ when calling.
UpdateBytesPerInterval(uint32_t delta_time_ms)315 void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
316   media_budget_->IncreaseBudget(delta_time_ms);
317   padding_budget_->IncreaseBudget(delta_time_ms);
318 }
319 
320 // MUST have critsect_ when calling.
ShouldSendNextPacket(paced_sender::PacketList ** packet_list)321 bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
322   *packet_list = NULL;
323   if (media_budget_->bytes_remaining() <= 0) {
324     // All bytes consumed for this interval.
325     // Check if we have not sent in a too long time.
326     if ((TickTime::Now() - time_last_send_).Milliseconds() >
327         kMaxQueueTimeWithoutSendingMs) {
328       if (!high_priority_packets_->empty()) {
329         *packet_list = high_priority_packets_.get();
330         return true;
331       }
332       if (!normal_priority_packets_->empty()) {
333         *packet_list = normal_priority_packets_.get();
334         return true;
335       }
336     }
337     // Send any old packets to avoid queuing for too long.
338     if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
339       int64_t high_priority_capture_time = -1;
340       if (!high_priority_packets_->empty()) {
341         high_priority_capture_time =
342             high_priority_packets_->front().capture_time_ms_;
343         *packet_list = high_priority_packets_.get();
344       }
345       if (!normal_priority_packets_->empty() &&
346           (high_priority_capture_time == -1 || high_priority_capture_time >
347           normal_priority_packets_->front().capture_time_ms_)) {
348         *packet_list = normal_priority_packets_.get();
349       }
350       if (*packet_list)
351         return true;
352     }
353     return false;
354   }
355   if (!high_priority_packets_->empty()) {
356     *packet_list = high_priority_packets_.get();
357     return true;
358   }
359   if (!normal_priority_packets_->empty()) {
360     *packet_list = normal_priority_packets_.get();
361     return true;
362   }
363   if (!low_priority_packets_->empty()) {
364     *packet_list = low_priority_packets_.get();
365     return true;
366   }
367   return false;
368 }
369 
GetNextPacketFromList(paced_sender::PacketList * packets)370 paced_sender::Packet PacedSender::GetNextPacketFromList(
371     paced_sender::PacketList* packets) {
372   paced_sender::Packet packet = packets->front();
373   UpdateMediaBytesSent(packet.bytes_);
374   return packet;
375 }
376 
377 // MUST have critsect_ when calling.
UpdateMediaBytesSent(int num_bytes)378 void PacedSender::UpdateMediaBytesSent(int num_bytes) {
379   time_last_send_ = TickTime::Now();
380   media_budget_->UseBudget(num_bytes);
381   padding_budget_->UseBudget(num_bytes);
382 }
383 
384 }  // namespace webrtc
385