• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "modules/pacing/task_queue_paced_sender.h"
12 
13 #include <algorithm>
14 #include <utility>
15 #include "absl/memory/memory.h"
16 #include "rtc_base/checks.h"
17 #include "rtc_base/event.h"
18 #include "rtc_base/logging.h"
19 #include "rtc_base/task_utils/to_queued_task.h"
20 #include "rtc_base/trace_event.h"
21 
22 namespace webrtc {
23 namespace {
24 // If no calls to MaybeProcessPackets() happen, make sure we update stats
25 // at least every |kMaxTimeBetweenStatsUpdates| as long as the pacer isn't
26 // completely drained.
27 constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
28 // Don't call UpdateStats() more than |kMinTimeBetweenStatsUpdates| apart,
29 // for performance reasons.
30 constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
31 }  // namespace
32 
TaskQueuePacedSender(Clock * clock,PacketRouter * packet_router,RtcEventLog * event_log,const WebRtcKeyValueConfig * field_trials,TaskQueueFactory * task_queue_factory,TimeDelta hold_back_window)33 TaskQueuePacedSender::TaskQueuePacedSender(
34     Clock* clock,
35     PacketRouter* packet_router,
36     RtcEventLog* event_log,
37     const WebRtcKeyValueConfig* field_trials,
38     TaskQueueFactory* task_queue_factory,
39     TimeDelta hold_back_window)
40     : clock_(clock),
41       hold_back_window_(hold_back_window),
42       pacing_controller_(clock,
43                          packet_router,
44                          event_log,
45                          field_trials,
46                          PacingController::ProcessMode::kDynamic),
47       next_process_time_(Timestamp::MinusInfinity()),
48       stats_update_scheduled_(false),
49       last_stats_time_(Timestamp::MinusInfinity()),
50       is_shutdown_(false),
51       task_queue_(task_queue_factory->CreateTaskQueue(
52           "TaskQueuePacedSender",
53           TaskQueueFactory::Priority::NORMAL)) {}
54 
~TaskQueuePacedSender()55 TaskQueuePacedSender::~TaskQueuePacedSender() {
56   // Post an immediate task to mark the queue as shutting down.
57   // The rtc::TaskQueue destructor will wait for pending tasks to
58   // complete before continuing.
59   task_queue_.PostTask([&]() {
60     RTC_DCHECK_RUN_ON(&task_queue_);
61     is_shutdown_ = true;
62   });
63 }
64 
CreateProbeCluster(DataRate bitrate,int cluster_id)65 void TaskQueuePacedSender::CreateProbeCluster(DataRate bitrate,
66                                               int cluster_id) {
67   task_queue_.PostTask([this, bitrate, cluster_id]() {
68     RTC_DCHECK_RUN_ON(&task_queue_);
69     pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
70     MaybeProcessPackets(Timestamp::MinusInfinity());
71   });
72 }
73 
Pause()74 void TaskQueuePacedSender::Pause() {
75   task_queue_.PostTask([this]() {
76     RTC_DCHECK_RUN_ON(&task_queue_);
77     pacing_controller_.Pause();
78   });
79 }
80 
Resume()81 void TaskQueuePacedSender::Resume() {
82   task_queue_.PostTask([this]() {
83     RTC_DCHECK_RUN_ON(&task_queue_);
84     pacing_controller_.Resume();
85     MaybeProcessPackets(Timestamp::MinusInfinity());
86   });
87 }
88 
SetCongestionWindow(DataSize congestion_window_size)89 void TaskQueuePacedSender::SetCongestionWindow(
90     DataSize congestion_window_size) {
91   task_queue_.PostTask([this, congestion_window_size]() {
92     RTC_DCHECK_RUN_ON(&task_queue_);
93     pacing_controller_.SetCongestionWindow(congestion_window_size);
94     MaybeProcessPackets(Timestamp::MinusInfinity());
95   });
96 }
97 
UpdateOutstandingData(DataSize outstanding_data)98 void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) {
99   if (task_queue_.IsCurrent()) {
100     RTC_DCHECK_RUN_ON(&task_queue_);
101     // Fast path since this can be called once per sent packet while on the
102     // task queue.
103     pacing_controller_.UpdateOutstandingData(outstanding_data);
104     return;
105   }
106 
107   task_queue_.PostTask([this, outstanding_data]() {
108     RTC_DCHECK_RUN_ON(&task_queue_);
109     pacing_controller_.UpdateOutstandingData(outstanding_data);
110     MaybeProcessPackets(Timestamp::MinusInfinity());
111   });
112 }
113 
SetPacingRates(DataRate pacing_rate,DataRate padding_rate)114 void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
115                                           DataRate padding_rate) {
116   task_queue_.PostTask([this, pacing_rate, padding_rate]() {
117     RTC_DCHECK_RUN_ON(&task_queue_);
118     pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
119     MaybeProcessPackets(Timestamp::MinusInfinity());
120   });
121 }
122 
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)123 void TaskQueuePacedSender::EnqueuePackets(
124     std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
125 #if RTC_TRACE_EVENTS_ENABLED
126   TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
127                "TaskQueuePacedSender::EnqueuePackets");
128   for (auto& packet : packets) {
129     TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
130                  "TaskQueuePacedSender::EnqueuePackets::Loop",
131                  "sequence_number", packet->SequenceNumber(), "rtp_timestamp",
132                  packet->Timestamp());
133   }
134 #endif
135 
136   task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
137     RTC_DCHECK_RUN_ON(&task_queue_);
138     for (auto& packet : packets_) {
139       pacing_controller_.EnqueuePacket(std::move(packet));
140     }
141     MaybeProcessPackets(Timestamp::MinusInfinity());
142   });
143 }
144 
SetAccountForAudioPackets(bool account_for_audio)145 void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
146   task_queue_.PostTask([this, account_for_audio]() {
147     RTC_DCHECK_RUN_ON(&task_queue_);
148     pacing_controller_.SetAccountForAudioPackets(account_for_audio);
149   });
150 }
151 
SetIncludeOverhead()152 void TaskQueuePacedSender::SetIncludeOverhead() {
153   task_queue_.PostTask([this]() {
154     RTC_DCHECK_RUN_ON(&task_queue_);
155     pacing_controller_.SetIncludeOverhead();
156   });
157 }
158 
SetTransportOverhead(DataSize overhead_per_packet)159 void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
160   task_queue_.PostTask([this, overhead_per_packet]() {
161     RTC_DCHECK_RUN_ON(&task_queue_);
162     pacing_controller_.SetTransportOverhead(overhead_per_packet);
163   });
164 }
165 
SetQueueTimeLimit(TimeDelta limit)166 void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
167   task_queue_.PostTask([this, limit]() {
168     RTC_DCHECK_RUN_ON(&task_queue_);
169     pacing_controller_.SetQueueTimeLimit(limit);
170     MaybeProcessPackets(Timestamp::MinusInfinity());
171   });
172 }
173 
ExpectedQueueTime() const174 TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
175   return GetStats().expected_queue_time;
176 }
177 
QueueSizeData() const178 DataSize TaskQueuePacedSender::QueueSizeData() const {
179   return GetStats().queue_size;
180 }
181 
FirstSentPacketTime() const182 absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
183   return GetStats().first_sent_packet_time;
184 }
185 
OldestPacketWaitTime() const186 TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
187   return GetStats().oldest_packet_wait_time;
188 }
189 
OnStatsUpdated(const Stats & stats)190 void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
191   MutexLock lock(&stats_mutex_);
192   current_stats_ = stats;
193 }
194 
MaybeProcessPackets(Timestamp scheduled_process_time)195 void TaskQueuePacedSender::MaybeProcessPackets(
196     Timestamp scheduled_process_time) {
197   RTC_DCHECK_RUN_ON(&task_queue_);
198 
199   if (is_shutdown_) {
200     return;
201   }
202 
203   // Normally, run ProcessPackets() only if this is the scheduled task.
204   // If it is not but it is already time to process and there either is
205   // no scheduled task or the schedule has shifted forward in time, run
206   // anyway and clear any schedule.
207   Timestamp next_process_time = pacing_controller_.NextSendTime();
208   const Timestamp now = clock_->CurrentTime();
209   const bool is_scheduled_call = next_process_time_ == scheduled_process_time;
210   if (is_scheduled_call) {
211     // Indicate no pending scheduled call.
212     next_process_time_ = Timestamp::MinusInfinity();
213   }
214   if (is_scheduled_call ||
215       (now >= next_process_time && (next_process_time_.IsInfinite() ||
216                                     next_process_time < next_process_time_))) {
217     pacing_controller_.ProcessPackets();
218     next_process_time = pacing_controller_.NextSendTime();
219   }
220 
221   const TimeDelta min_sleep = pacing_controller_.IsProbing()
222                                   ? PacingController::kMinSleepTime
223                                   : hold_back_window_;
224   next_process_time = std::max(now + min_sleep, next_process_time);
225 
226   TimeDelta sleep_time = next_process_time - now;
227   if (next_process_time_.IsMinusInfinity() ||
228       next_process_time <=
229           next_process_time_ - PacingController::kMinSleepTime) {
230     next_process_time_ = next_process_time;
231 
232     task_queue_.PostDelayedTask(
233         [this, next_process_time]() { MaybeProcessPackets(next_process_time); },
234         sleep_time.ms<uint32_t>());
235   }
236 
237   MaybeUpdateStats(false);
238 }
239 
MaybeUpdateStats(bool is_scheduled_call)240 void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
241   if (is_shutdown_) {
242     if (is_scheduled_call) {
243       stats_update_scheduled_ = false;
244     }
245     return;
246   }
247 
248   Timestamp now = clock_->CurrentTime();
249   if (is_scheduled_call) {
250     // Allow scheduled task to process packets to clear up an remaining debt
251     // level in an otherwise empty queue.
252     pacing_controller_.ProcessPackets();
253   } else {
254     if (now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
255       // Too frequent unscheduled stats update, return early.
256       return;
257     }
258   }
259 
260   Stats new_stats;
261   new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
262   new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
263   new_stats.oldest_packet_wait_time = pacing_controller_.OldestPacketWaitTime();
264   new_stats.queue_size = pacing_controller_.QueueSizeData();
265   OnStatsUpdated(new_stats);
266 
267   last_stats_time_ = now;
268 
269   bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 &&
270                        pacing_controller_.CurrentBufferLevel().IsZero();
271 
272   // If there's anything interesting to get from the pacer and this is a
273   // scheduled call (or no scheduled call in flight), post a new scheduled stats
274   // update.
275   if (!pacer_drained) {
276     if (!stats_update_scheduled_) {
277       // There is no pending delayed task to update stats, add one.
278       // Treat this call as being scheduled in order to bootstrap scheduling
279       // loop.
280       stats_update_scheduled_ = true;
281       is_scheduled_call = true;
282     }
283 
284     // Only if on the scheduled call loop do we want to schedule a new delayed
285     // task.
286     if (is_scheduled_call) {
287       task_queue_.PostDelayedTask(
288           [this]() {
289             RTC_DCHECK_RUN_ON(&task_queue_);
290             MaybeUpdateStats(true);
291           },
292           kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
293     }
294   } else if (is_scheduled_call) {
295     // This is a scheduled call, signing out since there's nothing interesting
296     // left to check.
297     stats_update_scheduled_ = false;
298   }
299 }
300 
GetStats() const301 TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
302   MutexLock lock(&stats_mutex_);
303   return current_stats_;
304 }
305 
306 }  // namespace webrtc
307