1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/delayed_task_manager.h"
6
7 #include <algorithm>
8
9 #include "base/check.h"
10 #include "base/feature_list.h"
11 #include "base/functional/bind.h"
12 #include "base/task/common/checked_lock.h"
13 #include "base/task/sequenced_task_runner.h"
14 #include "base/task/task_features.h"
15 #include "base/task/task_runner.h"
16 #include "base/task/thread_pool/task.h"
17 #include "third_party/abseil-cpp/absl/types/optional.h"
18
19 namespace base {
20 namespace internal {
21
22 DelayedTaskManager::DelayedTask::DelayedTask() = default;
23
DelayedTask(Task task,PostTaskNowCallback callback,scoped_refptr<TaskRunner> task_runner)24 DelayedTaskManager::DelayedTask::DelayedTask(
25 Task task,
26 PostTaskNowCallback callback,
27 scoped_refptr<TaskRunner> task_runner)
28 : task(std::move(task)),
29 callback(std::move(callback)),
30 task_runner(std::move(task_runner)) {}
31
32 DelayedTaskManager::DelayedTask::DelayedTask(
33 DelayedTaskManager::DelayedTask&& other) = default;
34
35 DelayedTaskManager::DelayedTask::~DelayedTask() = default;
36
37 DelayedTaskManager::DelayedTask& DelayedTaskManager::DelayedTask::operator=(
38 DelayedTaskManager::DelayedTask&& other) = default;
39
operator >(const DelayedTask & other) const40 bool DelayedTaskManager::DelayedTask::operator>(
41 const DelayedTask& other) const {
42 TimeTicks latest_delayed_run_time = task.latest_delayed_run_time();
43 TimeTicks other_latest_delayed_run_time =
44 other.task.latest_delayed_run_time();
45 return std::tie(latest_delayed_run_time, task.sequence_num) >
46 std::tie(other_latest_delayed_run_time, other.task.sequence_num);
47 }
48
DelayedTaskManager(const TickClock * tick_clock)49 DelayedTaskManager::DelayedTaskManager(const TickClock* tick_clock)
50 : process_ripe_tasks_closure_(
51 BindRepeating(&DelayedTaskManager::ProcessRipeTasks,
52 Unretained(this))),
53 schedule_process_ripe_tasks_closure_(BindRepeating(
54 &DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread,
55 Unretained(this))),
56 tick_clock_(tick_clock) {
57 DETACH_FROM_SEQUENCE(sequence_checker_);
58 DCHECK(tick_clock_);
59 }
60
~DelayedTaskManager()61 DelayedTaskManager::~DelayedTaskManager() {
62 delayed_task_handle_.CancelTask();
63 }
64
Start(scoped_refptr<SequencedTaskRunner> service_thread_task_runner)65 void DelayedTaskManager::Start(
66 scoped_refptr<SequencedTaskRunner> service_thread_task_runner) {
67 DCHECK(service_thread_task_runner);
68
69 TimeTicks process_ripe_tasks_time;
70 subtle::DelayPolicy delay_policy;
71 {
72 CheckedAutoLock auto_lock(queue_lock_);
73 DCHECK(!service_thread_task_runner_);
74 service_thread_task_runner_ = std::move(service_thread_task_runner);
75 align_wake_ups_ = FeatureList::IsEnabled(kAlignWakeUps);
76 max_precise_delay = kMaxPreciseDelay.Get();
77 std::tie(process_ripe_tasks_time, delay_policy) =
78 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
79 }
80 if (!process_ripe_tasks_time.is_max()) {
81 service_thread_task_runner_->PostTask(FROM_HERE,
82 schedule_process_ripe_tasks_closure_);
83 }
84 }
85
AddDelayedTask(Task task,PostTaskNowCallback post_task_now_callback,scoped_refptr<TaskRunner> task_runner)86 void DelayedTaskManager::AddDelayedTask(
87 Task task,
88 PostTaskNowCallback post_task_now_callback,
89 scoped_refptr<TaskRunner> task_runner) {
90 DCHECK(task.task);
91 DCHECK(!task.delayed_run_time.is_null());
92 DCHECK(!task.queue_time.is_null());
93
94 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
95 // for details.
96 CHECK(task.task);
97 TimeTicks process_ripe_tasks_time;
98 subtle::DelayPolicy delay_policy;
99 {
100 CheckedAutoLock auto_lock(queue_lock_);
101 task.delay_policy = subtle::MaybeOverrideDelayPolicy(
102 task.delay_policy, task.delayed_run_time - task.queue_time,
103 max_precise_delay);
104
105 auto [old_process_ripe_tasks_time, old_delay_policy] =
106 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
107 delayed_task_queue_.insert(DelayedTask(std::move(task),
108 std::move(post_task_now_callback),
109 std::move(task_runner)));
110 // Not started or already shutdown.
111 if (service_thread_task_runner_ == nullptr)
112 return;
113
114 std::tie(process_ripe_tasks_time, delay_policy) =
115 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
116 // The next invocation of ProcessRipeTasks() doesn't need to change.
117 if (old_process_ripe_tasks_time == process_ripe_tasks_time &&
118 old_delay_policy == delay_policy) {
119 return;
120 }
121 }
122 if (!process_ripe_tasks_time.is_max()) {
123 service_thread_task_runner_->PostTask(FROM_HERE,
124 schedule_process_ripe_tasks_closure_);
125 }
126 }
127
ProcessRipeTasks()128 void DelayedTaskManager::ProcessRipeTasks() {
129 std::vector<DelayedTask> ripe_delayed_tasks;
130 TimeTicks process_ripe_tasks_time;
131
132 {
133 CheckedAutoLock auto_lock(queue_lock_);
134
135 // Already shutdown.
136 if (!service_thread_task_runner_)
137 return;
138
139 const TimeTicks now = tick_clock_->NowTicks();
140 // A delayed task is ripe if it reached its delayed run time or if it is
141 // canceled. If it is canceled, schedule its deletion on the correct
142 // sequence now rather than in the future, to minimize CPU wake ups and save
143 // power.
144 while (!delayed_task_queue_.empty() &&
145 (delayed_task_queue_.top().task.earliest_delayed_run_time() <= now ||
146 !delayed_task_queue_.top().task.task.MaybeValid())) {
147 // The const_cast on top is okay since the DelayedTask is
148 // transactionally being popped from |delayed_task_queue_| right after
149 // and the move doesn't alter the sort order.
150 ripe_delayed_tasks.push_back(
151 std::move(const_cast<DelayedTask&>(delayed_task_queue_.top())));
152 delayed_task_queue_.pop();
153 }
154 std::tie(process_ripe_tasks_time, std::ignore) =
155 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
156 }
157 if (!process_ripe_tasks_time.is_max()) {
158 if (service_thread_task_runner_->RunsTasksInCurrentSequence()) {
159 ScheduleProcessRipeTasksOnServiceThread();
160 } else {
161 // ProcessRipeTasks may be called on another thread under tests.
162 service_thread_task_runner_->PostTask(
163 FROM_HERE, schedule_process_ripe_tasks_closure_);
164 }
165 }
166
167 for (auto& delayed_task : ripe_delayed_tasks) {
168 std::move(delayed_task.callback).Run(std::move(delayed_task.task));
169 }
170 }
171
NextScheduledRunTime() const172 absl::optional<TimeTicks> DelayedTaskManager::NextScheduledRunTime() const {
173 CheckedAutoLock auto_lock(queue_lock_);
174 if (delayed_task_queue_.empty())
175 return absl::nullopt;
176 return delayed_task_queue_.top().task.delayed_run_time;
177 }
178
TopTaskDelayPolicyForTesting() const179 subtle::DelayPolicy DelayedTaskManager::TopTaskDelayPolicyForTesting() const {
180 CheckedAutoLock auto_lock(queue_lock_);
181 return delayed_task_queue_.top().task.delay_policy;
182 }
183
Shutdown()184 void DelayedTaskManager::Shutdown() {
185 scoped_refptr<SequencedTaskRunner> service_thread_task_runner;
186
187 {
188 CheckedAutoLock auto_lock(queue_lock_);
189 // Prevent delayed tasks from being posted or processed after this.
190 service_thread_task_runner = service_thread_task_runner_;
191 }
192
193 if (service_thread_task_runner) {
194 // Cancel our delayed task on the service thread. This cannot be done from
195 // ~DelayedTaskManager because the delayed task handle is sequence-affine.
196 service_thread_task_runner->PostTask(
197 FROM_HERE,
198 base::BindOnce(
199 [](DelayedTaskManager* manager) {
200 DCHECK_CALLED_ON_VALID_SEQUENCE(manager->sequence_checker_);
201 manager->delayed_task_handle_.CancelTask();
202 },
203 // Unretained() is safe because the caller must flush tasks posted
204 // to the service thread before deleting `this`.
205 Unretained(this)));
206 }
207 }
208
209 std::pair<TimeTicks, subtle::DelayPolicy> DelayedTaskManager::
GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired()210 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired() {
211 queue_lock_.AssertAcquired();
212 if (delayed_task_queue_.empty()) {
213 return std::make_pair(TimeTicks::Max(),
214 subtle::DelayPolicy::kFlexibleNoSooner);
215 }
216
217 const DelayedTask& ripest_delayed_task = delayed_task_queue_.top();
218 subtle::DelayPolicy delay_policy = ripest_delayed_task.task.delay_policy;
219 return std::make_pair(ripest_delayed_task.task.delayed_run_time,
220 delay_policy);
221 }
222
ScheduleProcessRipeTasksOnServiceThread()223 void DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread() {
224 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
225
226 TimeTicks process_ripe_tasks_time;
227 subtle::DelayPolicy delay_policy;
228 {
229 CheckedAutoLock auto_lock(queue_lock_);
230 std::tie(process_ripe_tasks_time, delay_policy) =
231 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
232 }
233 DCHECK(!process_ripe_tasks_time.is_null());
234 if (process_ripe_tasks_time.is_max())
235 return;
236 delayed_task_handle_.CancelTask();
237 delayed_task_handle_ =
238 service_thread_task_runner_->PostCancelableDelayedTaskAt(
239 subtle::PostDelayedTaskPassKey(), FROM_HERE,
240 process_ripe_tasks_closure_, process_ripe_tasks_time, delay_policy);
241 }
242
243 } // namespace internal
244 } // namespace base
245