1 /*
2 * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "modules/pacing/task_queue_paced_sender.h"
12
13 #include <algorithm>
14 #include <utility>
15 #include "absl/memory/memory.h"
16 #include "rtc_base/checks.h"
17 #include "rtc_base/event.h"
18 #include "rtc_base/logging.h"
19 #include "rtc_base/task_utils/to_queued_task.h"
20 #include "rtc_base/trace_event.h"
21
22 namespace webrtc {
23 namespace {
24 // If no calls to MaybeProcessPackets() happen, make sure we update stats
25 // at least every |kMaxTimeBetweenStatsUpdates| as long as the pacer isn't
26 // completely drained.
27 constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
28 // Don't call UpdateStats() more than |kMinTimeBetweenStatsUpdates| apart,
29 // for performance reasons.
30 constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
31 } // namespace
32
TaskQueuePacedSender(Clock * clock,PacketRouter * packet_router,RtcEventLog * event_log,const WebRtcKeyValueConfig * field_trials,TaskQueueFactory * task_queue_factory,TimeDelta hold_back_window)33 TaskQueuePacedSender::TaskQueuePacedSender(
34 Clock* clock,
35 PacketRouter* packet_router,
36 RtcEventLog* event_log,
37 const WebRtcKeyValueConfig* field_trials,
38 TaskQueueFactory* task_queue_factory,
39 TimeDelta hold_back_window)
40 : clock_(clock),
41 hold_back_window_(hold_back_window),
42 pacing_controller_(clock,
43 packet_router,
44 event_log,
45 field_trials,
46 PacingController::ProcessMode::kDynamic),
47 next_process_time_(Timestamp::MinusInfinity()),
48 stats_update_scheduled_(false),
49 last_stats_time_(Timestamp::MinusInfinity()),
50 is_shutdown_(false),
51 task_queue_(task_queue_factory->CreateTaskQueue(
52 "TaskQueuePacedSender",
53 TaskQueueFactory::Priority::NORMAL)) {}
54
~TaskQueuePacedSender()55 TaskQueuePacedSender::~TaskQueuePacedSender() {
56 // Post an immediate task to mark the queue as shutting down.
57 // The rtc::TaskQueue destructor will wait for pending tasks to
58 // complete before continuing.
59 task_queue_.PostTask([&]() {
60 RTC_DCHECK_RUN_ON(&task_queue_);
61 is_shutdown_ = true;
62 });
63 }
64
CreateProbeCluster(DataRate bitrate,int cluster_id)65 void TaskQueuePacedSender::CreateProbeCluster(DataRate bitrate,
66 int cluster_id) {
67 task_queue_.PostTask([this, bitrate, cluster_id]() {
68 RTC_DCHECK_RUN_ON(&task_queue_);
69 pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
70 MaybeProcessPackets(Timestamp::MinusInfinity());
71 });
72 }
73
Pause()74 void TaskQueuePacedSender::Pause() {
75 task_queue_.PostTask([this]() {
76 RTC_DCHECK_RUN_ON(&task_queue_);
77 pacing_controller_.Pause();
78 });
79 }
80
Resume()81 void TaskQueuePacedSender::Resume() {
82 task_queue_.PostTask([this]() {
83 RTC_DCHECK_RUN_ON(&task_queue_);
84 pacing_controller_.Resume();
85 MaybeProcessPackets(Timestamp::MinusInfinity());
86 });
87 }
88
SetCongestionWindow(DataSize congestion_window_size)89 void TaskQueuePacedSender::SetCongestionWindow(
90 DataSize congestion_window_size) {
91 task_queue_.PostTask([this, congestion_window_size]() {
92 RTC_DCHECK_RUN_ON(&task_queue_);
93 pacing_controller_.SetCongestionWindow(congestion_window_size);
94 MaybeProcessPackets(Timestamp::MinusInfinity());
95 });
96 }
97
UpdateOutstandingData(DataSize outstanding_data)98 void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) {
99 if (task_queue_.IsCurrent()) {
100 RTC_DCHECK_RUN_ON(&task_queue_);
101 // Fast path since this can be called once per sent packet while on the
102 // task queue.
103 pacing_controller_.UpdateOutstandingData(outstanding_data);
104 return;
105 }
106
107 task_queue_.PostTask([this, outstanding_data]() {
108 RTC_DCHECK_RUN_ON(&task_queue_);
109 pacing_controller_.UpdateOutstandingData(outstanding_data);
110 MaybeProcessPackets(Timestamp::MinusInfinity());
111 });
112 }
113
SetPacingRates(DataRate pacing_rate,DataRate padding_rate)114 void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
115 DataRate padding_rate) {
116 task_queue_.PostTask([this, pacing_rate, padding_rate]() {
117 RTC_DCHECK_RUN_ON(&task_queue_);
118 pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
119 MaybeProcessPackets(Timestamp::MinusInfinity());
120 });
121 }
122
EnqueuePackets(std::vector<std::unique_ptr<RtpPacketToSend>> packets)123 void TaskQueuePacedSender::EnqueuePackets(
124 std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
125 #if RTC_TRACE_EVENTS_ENABLED
126 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
127 "TaskQueuePacedSender::EnqueuePackets");
128 for (auto& packet : packets) {
129 TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
130 "TaskQueuePacedSender::EnqueuePackets::Loop",
131 "sequence_number", packet->SequenceNumber(), "rtp_timestamp",
132 packet->Timestamp());
133 }
134 #endif
135
136 task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
137 RTC_DCHECK_RUN_ON(&task_queue_);
138 for (auto& packet : packets_) {
139 pacing_controller_.EnqueuePacket(std::move(packet));
140 }
141 MaybeProcessPackets(Timestamp::MinusInfinity());
142 });
143 }
144
SetAccountForAudioPackets(bool account_for_audio)145 void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
146 task_queue_.PostTask([this, account_for_audio]() {
147 RTC_DCHECK_RUN_ON(&task_queue_);
148 pacing_controller_.SetAccountForAudioPackets(account_for_audio);
149 });
150 }
151
SetIncludeOverhead()152 void TaskQueuePacedSender::SetIncludeOverhead() {
153 task_queue_.PostTask([this]() {
154 RTC_DCHECK_RUN_ON(&task_queue_);
155 pacing_controller_.SetIncludeOverhead();
156 });
157 }
158
SetTransportOverhead(DataSize overhead_per_packet)159 void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
160 task_queue_.PostTask([this, overhead_per_packet]() {
161 RTC_DCHECK_RUN_ON(&task_queue_);
162 pacing_controller_.SetTransportOverhead(overhead_per_packet);
163 });
164 }
165
SetQueueTimeLimit(TimeDelta limit)166 void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
167 task_queue_.PostTask([this, limit]() {
168 RTC_DCHECK_RUN_ON(&task_queue_);
169 pacing_controller_.SetQueueTimeLimit(limit);
170 MaybeProcessPackets(Timestamp::MinusInfinity());
171 });
172 }
173
ExpectedQueueTime() const174 TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
175 return GetStats().expected_queue_time;
176 }
177
QueueSizeData() const178 DataSize TaskQueuePacedSender::QueueSizeData() const {
179 return GetStats().queue_size;
180 }
181
FirstSentPacketTime() const182 absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
183 return GetStats().first_sent_packet_time;
184 }
185
OldestPacketWaitTime() const186 TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
187 return GetStats().oldest_packet_wait_time;
188 }
189
OnStatsUpdated(const Stats & stats)190 void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
191 MutexLock lock(&stats_mutex_);
192 current_stats_ = stats;
193 }
194
MaybeProcessPackets(Timestamp scheduled_process_time)195 void TaskQueuePacedSender::MaybeProcessPackets(
196 Timestamp scheduled_process_time) {
197 RTC_DCHECK_RUN_ON(&task_queue_);
198
199 if (is_shutdown_) {
200 return;
201 }
202
203 // Normally, run ProcessPackets() only if this is the scheduled task.
204 // If it is not but it is already time to process and there either is
205 // no scheduled task or the schedule has shifted forward in time, run
206 // anyway and clear any schedule.
207 Timestamp next_process_time = pacing_controller_.NextSendTime();
208 const Timestamp now = clock_->CurrentTime();
209 const bool is_scheduled_call = next_process_time_ == scheduled_process_time;
210 if (is_scheduled_call) {
211 // Indicate no pending scheduled call.
212 next_process_time_ = Timestamp::MinusInfinity();
213 }
214 if (is_scheduled_call ||
215 (now >= next_process_time && (next_process_time_.IsInfinite() ||
216 next_process_time < next_process_time_))) {
217 pacing_controller_.ProcessPackets();
218 next_process_time = pacing_controller_.NextSendTime();
219 }
220
221 const TimeDelta min_sleep = pacing_controller_.IsProbing()
222 ? PacingController::kMinSleepTime
223 : hold_back_window_;
224 next_process_time = std::max(now + min_sleep, next_process_time);
225
226 TimeDelta sleep_time = next_process_time - now;
227 if (next_process_time_.IsMinusInfinity() ||
228 next_process_time <=
229 next_process_time_ - PacingController::kMinSleepTime) {
230 next_process_time_ = next_process_time;
231
232 task_queue_.PostDelayedTask(
233 [this, next_process_time]() { MaybeProcessPackets(next_process_time); },
234 sleep_time.ms<uint32_t>());
235 }
236
237 MaybeUpdateStats(false);
238 }
239
MaybeUpdateStats(bool is_scheduled_call)240 void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
241 if (is_shutdown_) {
242 if (is_scheduled_call) {
243 stats_update_scheduled_ = false;
244 }
245 return;
246 }
247
248 Timestamp now = clock_->CurrentTime();
249 if (is_scheduled_call) {
250 // Allow scheduled task to process packets to clear up an remaining debt
251 // level in an otherwise empty queue.
252 pacing_controller_.ProcessPackets();
253 } else {
254 if (now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
255 // Too frequent unscheduled stats update, return early.
256 return;
257 }
258 }
259
260 Stats new_stats;
261 new_stats.expected_queue_time = pacing_controller_.ExpectedQueueTime();
262 new_stats.first_sent_packet_time = pacing_controller_.FirstSentPacketTime();
263 new_stats.oldest_packet_wait_time = pacing_controller_.OldestPacketWaitTime();
264 new_stats.queue_size = pacing_controller_.QueueSizeData();
265 OnStatsUpdated(new_stats);
266
267 last_stats_time_ = now;
268
269 bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 &&
270 pacing_controller_.CurrentBufferLevel().IsZero();
271
272 // If there's anything interesting to get from the pacer and this is a
273 // scheduled call (or no scheduled call in flight), post a new scheduled stats
274 // update.
275 if (!pacer_drained) {
276 if (!stats_update_scheduled_) {
277 // There is no pending delayed task to update stats, add one.
278 // Treat this call as being scheduled in order to bootstrap scheduling
279 // loop.
280 stats_update_scheduled_ = true;
281 is_scheduled_call = true;
282 }
283
284 // Only if on the scheduled call loop do we want to schedule a new delayed
285 // task.
286 if (is_scheduled_call) {
287 task_queue_.PostDelayedTask(
288 [this]() {
289 RTC_DCHECK_RUN_ON(&task_queue_);
290 MaybeUpdateStats(true);
291 },
292 kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
293 }
294 } else if (is_scheduled_call) {
295 // This is a scheduled call, signing out since there's nothing interesting
296 // left to check.
297 stats_update_scheduled_ = false;
298 }
299 }
300
GetStats() const301 TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
302 MutexLock lock(&stats_mutex_);
303 return current_stats_;
304 }
305
306 } // namespace webrtc
307