/*
 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10 
#include "webrtc/modules/pacing/include/paced_sender.h"

#include <assert.h>

#include <algorithm>
#include <list>
#include <map>
#include <set>

#include "webrtc/modules/interface/module_common_types.h"
#include "webrtc/system_wrappers/interface/clock.h"
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
#include "webrtc/system_wrappers/interface/trace_event.h"
22 
namespace {
// Spacing, in milliseconds, between two consecutive packet bursts.
const int kMinPacketLimitMs = 5;

// Ceiling, in milliseconds, applied to the measured process interval so that
// a long pause between Process() calls cannot translate into a huge budget.
const int kMaxIntervalTimeMs = 30;

// Longest time, in microseconds, the head of the queue may wait while nothing
// is being sent, regardless of buffer state. In practice this only matters at
// low bitrates (less than 320 kbits/s).
const int kMaxQueueTimeWithoutSendingUs = 30 * 1000;

}  // namespace
37 
38 namespace webrtc {
39 namespace paced_sender {
// Plain record describing one packet waiting in the pacer queue.
struct Packet {
  uint32_t ssrc;             // Stream identifier the packet belongs to.
  uint16_t sequence_number;  // Sequence number within that stream.
  int64_t capture_time_ms;   // Capture timestamp supplied by the caller.
  int64_t enqueue_time_ms;   // Time at which the packet was queued.
  int bytes;                 // Packet size in bytes.
  bool retransmission;       // True if the packet is a retransmission.

  // Stores every argument in the field of the corresponding name; no
  // validation is performed.
  Packet(uint32_t ssrc,
         uint16_t seq_number,
         int64_t capture_time_ms,
         int64_t enqueue_time_ms,
         int length_in_bytes,
         bool retransmission)
      : ssrc(ssrc),
        sequence_number(seq_number),
        capture_time_ms(capture_time_ms),
        enqueue_time_ms(enqueue_time_ms),
        bytes(length_in_bytes),
        retransmission(retransmission) {
  }
};
60 
61 // STL list style class which prevents duplicates in the list.
62 class PacketList {
63  public:
PacketList()64   PacketList() {};
65 
empty() const66   bool empty() const {
67     return packet_list_.empty();
68   }
69 
front() const70   Packet front() const {
71     return packet_list_.front();
72   }
73 
pop_front()74   void pop_front() {
75     Packet& packet = packet_list_.front();
76     uint16_t sequence_number = packet.sequence_number;
77     uint32_t ssrc = packet.ssrc;
78     packet_list_.pop_front();
79     sequence_number_set_[ssrc].erase(sequence_number);
80   }
81 
push_back(const Packet & packet)82   void push_back(const Packet& packet) {
83     if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
84         sequence_number_set_[packet.ssrc].end()) {
85       // Don't insert duplicates.
86       packet_list_.push_back(packet);
87       sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
88     }
89   }
90 
91  private:
92   std::list<Packet> packet_list_;
93   std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
94 };
95 
// Byte budget for one pacing interval at a configurable target bitrate.
// The budget may go negative (overuse); that debt is carried into the next
// interval, whereas unused budget is discarded.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps), bytes_remaining_(0) {
  }

  // Changes the target bitrate; the current budget is left untouched.
  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
  }

  // Starts a new interval of |delta_time_ms| milliseconds.
  void IncreaseBudget(int delta_time_ms) {
    // kbit/s * ms = bits; divide by 8 to get bytes.
    const int interval_bytes = target_rate_kbps_ * delta_time_ms / 8;
    // Only a deficit survives into the new interval; a surplus is dropped.
    const int carried_debt = bytes_remaining_ < 0 ? bytes_remaining_ : 0;
    bytes_remaining_ = carried_debt + interval_bytes;
  }

  // Charges |bytes| against the budget. The resulting debt is capped at the
  // number of bytes the target rate produces in 500 ms.
  void UseBudget(int bytes) {
    const int max_debt = -500 * target_rate_kbps_ / 8;
    const int after_use = bytes_remaining_ - bytes;
    bytes_remaining_ = after_use < max_debt ? max_debt : after_use;
  }

  // Bytes left in the current interval; negative when overused.
  int bytes_remaining() const { return bytes_remaining_; }

 private:
  int target_rate_kbps_;
  int bytes_remaining_;
};
128 }  // namespace paced_sender
129 
130 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
131 
PacedSender(Clock * clock,Callback * callback,int max_bitrate_kbps,int min_bitrate_kbps)132 PacedSender::PacedSender(Clock* clock,
133                          Callback* callback,
134                          int max_bitrate_kbps,
135                          int min_bitrate_kbps)
136     : clock_(clock),
137       callback_(callback),
138       critsect_(CriticalSectionWrapper::CreateCriticalSection()),
139       enabled_(true),
140       paused_(false),
141       max_queue_length_ms_(kDefaultMaxQueueLengthMs),
142       media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
143       padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
144       time_last_update_us_(clock->TimeInMicroseconds()),
145       capture_time_ms_last_queued_(0),
146       capture_time_ms_last_sent_(0),
147       high_priority_packets_(new paced_sender::PacketList),
148       normal_priority_packets_(new paced_sender::PacketList),
149       low_priority_packets_(new paced_sender::PacketList) {
150   UpdateBytesPerInterval(kMinPacketLimitMs);
151 }
152 
~PacedSender()153 PacedSender::~PacedSender() {}
154 
Pause()155 void PacedSender::Pause() {
156   CriticalSectionScoped cs(critsect_.get());
157   paused_ = true;
158 }
159 
Resume()160 void PacedSender::Resume() {
161   CriticalSectionScoped cs(critsect_.get());
162   paused_ = false;
163 }
164 
SetStatus(bool enable)165 void PacedSender::SetStatus(bool enable) {
166   CriticalSectionScoped cs(critsect_.get());
167   enabled_ = enable;
168 }
169 
Enabled() const170 bool PacedSender::Enabled() const {
171   CriticalSectionScoped cs(critsect_.get());
172   return enabled_;
173 }
174 
UpdateBitrate(int max_bitrate_kbps,int min_bitrate_kbps)175 void PacedSender::UpdateBitrate(int max_bitrate_kbps,
176                                 int min_bitrate_kbps) {
177   CriticalSectionScoped cs(critsect_.get());
178   media_budget_->set_target_rate_kbps(max_bitrate_kbps);
179   padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
180 }
181 
SendPacket(Priority priority,uint32_t ssrc,uint16_t sequence_number,int64_t capture_time_ms,int bytes,bool retransmission)182 bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
183     uint16_t sequence_number, int64_t capture_time_ms, int bytes,
184     bool retransmission) {
185   CriticalSectionScoped cs(critsect_.get());
186 
187   if (!enabled_) {
188     return true;  // We can send now.
189   }
190   if (capture_time_ms < 0) {
191     capture_time_ms = clock_->TimeInMilliseconds();
192   }
193   if (priority != kHighPriority &&
194       capture_time_ms > capture_time_ms_last_queued_) {
195     capture_time_ms_last_queued_ = capture_time_ms;
196     TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
197                              "capture_time_ms", capture_time_ms);
198   }
199   paced_sender::PacketList* packet_list = NULL;
200   switch (priority) {
201     case kHighPriority:
202       packet_list = high_priority_packets_.get();
203       break;
204     case kNormalPriority:
205       packet_list = normal_priority_packets_.get();
206       break;
207     case kLowPriority:
208       packet_list = low_priority_packets_.get();
209       break;
210   }
211   packet_list->push_back(paced_sender::Packet(ssrc,
212                                               sequence_number,
213                                               capture_time_ms,
214                                               clock_->TimeInMilliseconds(),
215                                               bytes,
216                                               retransmission));
217   return false;
218 }
219 
set_max_queue_length_ms(int max_queue_length_ms)220 void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
221   CriticalSectionScoped cs(critsect_.get());
222   max_queue_length_ms_ = max_queue_length_ms;
223 }
224 
QueueInMs() const225 int PacedSender::QueueInMs() const {
226   CriticalSectionScoped cs(critsect_.get());
227   int64_t now_ms = clock_->TimeInMilliseconds();
228   int64_t oldest_packet_enqueue_time = now_ms;
229   if (!high_priority_packets_->empty()) {
230     oldest_packet_enqueue_time =
231         std::min(oldest_packet_enqueue_time,
232                  high_priority_packets_->front().enqueue_time_ms);
233   }
234   if (!normal_priority_packets_->empty()) {
235     oldest_packet_enqueue_time =
236         std::min(oldest_packet_enqueue_time,
237                  normal_priority_packets_->front().enqueue_time_ms);
238   }
239   if (!low_priority_packets_->empty()) {
240     oldest_packet_enqueue_time =
241         std::min(oldest_packet_enqueue_time,
242                  low_priority_packets_->front().enqueue_time_ms);
243   }
244   return now_ms - oldest_packet_enqueue_time;
245 }
246 
TimeUntilNextProcess()247 int32_t PacedSender::TimeUntilNextProcess() {
248   CriticalSectionScoped cs(critsect_.get());
249   int64_t elapsed_time_ms = (clock_->TimeInMicroseconds() -
250       time_last_update_us_ + 500) / 1000;
251   if (elapsed_time_ms <= 0) {
252     return kMinPacketLimitMs;
253   }
254   if (elapsed_time_ms >= kMinPacketLimitMs) {
255     return 0;
256   }
257   return kMinPacketLimitMs - elapsed_time_ms;
258 }
259 
Process()260 int32_t PacedSender::Process() {
261   int64_t now_us = clock_->TimeInMicroseconds();
262   CriticalSectionScoped cs(critsect_.get());
263   int elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
264   time_last_update_us_ = now_us;
265   if (!enabled_) {
266     return 0;
267   }
268   if (!paused_) {
269     if (elapsed_time_ms > 0) {
270       uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
271       UpdateBytesPerInterval(delta_time_ms);
272     }
273     paced_sender::PacketList* packet_list;
274     while (ShouldSendNextPacket(&packet_list)) {
275       if (!SendPacketFromList(packet_list))
276         return 0;
277     }
278     if (high_priority_packets_->empty() &&
279         normal_priority_packets_->empty() &&
280         low_priority_packets_->empty() &&
281         padding_budget_->bytes_remaining() > 0) {
282       int padding_needed = padding_budget_->bytes_remaining();
283       critsect_->Leave();
284       int bytes_sent = callback_->TimeToSendPadding(padding_needed);
285       critsect_->Enter();
286       media_budget_->UseBudget(bytes_sent);
287       padding_budget_->UseBudget(bytes_sent);
288     }
289   }
290   return 0;
291 }
292 
SendPacketFromList(paced_sender::PacketList * packet_list)293 bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
294     EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
295   paced_sender::Packet packet = GetNextPacketFromList(packet_list);
296   critsect_->Leave();
297 
298   const bool success = callback_->TimeToSendPacket(packet.ssrc,
299                                                    packet.sequence_number,
300                                                    packet.capture_time_ms,
301                                                    packet.retransmission);
302   critsect_->Enter();
303   // If packet cannot be sent then keep it in packet list and exit early.
304   // There's no need to send more packets.
305   if (!success) {
306     return false;
307   }
308   packet_list->pop_front();
309   const bool last_packet =
310       packet_list->empty() ||
311       packet_list->front().capture_time_ms > packet.capture_time_ms;
312   if (packet_list != high_priority_packets_.get()) {
313     if (packet.capture_time_ms > capture_time_ms_last_sent_) {
314       capture_time_ms_last_sent_ = packet.capture_time_ms;
315     } else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
316                last_packet) {
317       TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
318     }
319   }
320   return true;
321 }
322 
UpdateBytesPerInterval(uint32_t delta_time_ms)323 void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
324   media_budget_->IncreaseBudget(delta_time_ms);
325   padding_budget_->IncreaseBudget(delta_time_ms);
326 }
327 
ShouldSendNextPacket(paced_sender::PacketList ** packet_list)328 bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
329   *packet_list = NULL;
330   if (media_budget_->bytes_remaining() <= 0) {
331     // All bytes consumed for this interval.
332     // Check if we have not sent in a too long time.
333     if (clock_->TimeInMicroseconds() - time_last_send_us_ >
334         kMaxQueueTimeWithoutSendingUs) {
335       if (!high_priority_packets_->empty()) {
336         *packet_list = high_priority_packets_.get();
337         return true;
338       }
339       if (!normal_priority_packets_->empty()) {
340         *packet_list = normal_priority_packets_.get();
341         return true;
342       }
343     }
344     // Send any old packets to avoid queuing for too long.
345     if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
346       int64_t high_priority_capture_time = -1;
347       if (!high_priority_packets_->empty()) {
348         high_priority_capture_time =
349             high_priority_packets_->front().capture_time_ms;
350         *packet_list = high_priority_packets_.get();
351       }
352       if (!normal_priority_packets_->empty() &&
353           (high_priority_capture_time == -1 ||
354            high_priority_capture_time >
355                normal_priority_packets_->front().capture_time_ms)) {
356         *packet_list = normal_priority_packets_.get();
357       }
358       if (*packet_list)
359         return true;
360     }
361     return false;
362   }
363   if (!high_priority_packets_->empty()) {
364     *packet_list = high_priority_packets_.get();
365     return true;
366   }
367   if (!normal_priority_packets_->empty()) {
368     *packet_list = normal_priority_packets_.get();
369     return true;
370   }
371   if (!low_priority_packets_->empty()) {
372     *packet_list = low_priority_packets_.get();
373     return true;
374   }
375   return false;
376 }
377 
GetNextPacketFromList(paced_sender::PacketList * packets)378 paced_sender::Packet PacedSender::GetNextPacketFromList(
379     paced_sender::PacketList* packets) {
380   paced_sender::Packet packet = packets->front();
381   UpdateMediaBytesSent(packet.bytes);
382   return packet;
383 }
384 
UpdateMediaBytesSent(int num_bytes)385 void PacedSender::UpdateMediaBytesSent(int num_bytes) {
386   time_last_send_us_ = clock_->TimeInMicroseconds();
387   media_budget_->UseBudget(num_bytes);
388   padding_budget_->UseBudget(num_bytes);
389 }
390 
391 }  // namespace webrtc
392