1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/delayed_task_manager.h"
6
7 #include <algorithm>
8
9 #include "base/check.h"
10 #include "base/feature_list.h"
11 #include "base/functional/bind.h"
12 #include "base/task/common/checked_lock.h"
13 #include "base/task/sequenced_task_runner.h"
14 #include "base/task/task_features.h"
15 #include "base/task/task_runner.h"
16 #include "base/task/thread_pool/task.h"
17 #include "third_party/abseil-cpp/absl/types/optional.h"
18
19 namespace base {
20 namespace internal {
21
22 DelayedTaskManager::DelayedTask::DelayedTask() = default;
23
DelayedTask(Task task,PostTaskNowCallback callback,scoped_refptr<TaskRunner> task_runner)24 DelayedTaskManager::DelayedTask::DelayedTask(
25 Task task,
26 PostTaskNowCallback callback,
27 scoped_refptr<TaskRunner> task_runner)
28 : task(std::move(task)),
29 callback(std::move(callback)),
30 task_runner(std::move(task_runner)) {}
31
32 DelayedTaskManager::DelayedTask::DelayedTask(
33 DelayedTaskManager::DelayedTask&& other) = default;
34
35 DelayedTaskManager::DelayedTask::~DelayedTask() = default;
36
37 DelayedTaskManager::DelayedTask& DelayedTaskManager::DelayedTask::operator=(
38 DelayedTaskManager::DelayedTask&& other) = default;
39
operator >(const DelayedTask & other) const40 bool DelayedTaskManager::DelayedTask::operator>(
41 const DelayedTask& other) const {
42 TimeTicks latest_delayed_run_time = task.latest_delayed_run_time();
43 TimeTicks other_latest_delayed_run_time =
44 other.task.latest_delayed_run_time();
45 return std::tie(latest_delayed_run_time, task.sequence_num) >
46 std::tie(other_latest_delayed_run_time, other.task.sequence_num);
47 }
48
DelayedTaskManager(const TickClock * tick_clock)49 DelayedTaskManager::DelayedTaskManager(const TickClock* tick_clock)
50 : process_ripe_tasks_closure_(
51 BindRepeating(&DelayedTaskManager::ProcessRipeTasks,
52 Unretained(this))),
53 schedule_process_ripe_tasks_closure_(BindRepeating(
54 &DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread,
55 Unretained(this))),
56 tick_clock_(tick_clock) {
57 DETACH_FROM_SEQUENCE(sequence_checker_);
58 DCHECK(tick_clock_);
59 }
60
~DelayedTaskManager()61 DelayedTaskManager::~DelayedTaskManager() {
62 delayed_task_handle_.CancelTask();
63 }
64
Start(scoped_refptr<SequencedTaskRunner> service_thread_task_runner)65 void DelayedTaskManager::Start(
66 scoped_refptr<SequencedTaskRunner> service_thread_task_runner) {
67 DCHECK(service_thread_task_runner);
68
69 TimeTicks process_ripe_tasks_time;
70 subtle::DelayPolicy delay_policy;
71 {
72 CheckedAutoLock auto_lock(queue_lock_);
73 DCHECK(!service_thread_task_runner_);
74 service_thread_task_runner_ = std::move(service_thread_task_runner);
75 align_wake_ups_ = FeatureList::IsEnabled(kAlignWakeUps);
76 std::tie(process_ripe_tasks_time, delay_policy) =
77 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
78 }
79 if (!process_ripe_tasks_time.is_max()) {
80 service_thread_task_runner_->PostTask(FROM_HERE,
81 schedule_process_ripe_tasks_closure_);
82 }
83 }
84
AddDelayedTask(Task task,PostTaskNowCallback post_task_now_callback,scoped_refptr<TaskRunner> task_runner)85 void DelayedTaskManager::AddDelayedTask(
86 Task task,
87 PostTaskNowCallback post_task_now_callback,
88 scoped_refptr<TaskRunner> task_runner) {
89 DCHECK(task.task);
90 DCHECK(!task.delayed_run_time.is_null());
91 DCHECK(!task.queue_time.is_null());
92
93 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
94 // for details.
95 CHECK(task.task);
96 TimeTicks process_ripe_tasks_time;
97 subtle::DelayPolicy delay_policy;
98 {
99 CheckedAutoLock auto_lock(queue_lock_);
100 auto [old_process_ripe_tasks_time, old_delay_policy] =
101 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
102 delayed_task_queue_.insert(DelayedTask(std::move(task),
103 std::move(post_task_now_callback),
104 std::move(task_runner)));
105 // Not started or already shutdown.
106 if (service_thread_task_runner_ == nullptr)
107 return;
108
109 std::tie(process_ripe_tasks_time, delay_policy) =
110 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
111 // The next invocation of ProcessRipeTasks() doesn't need to change.
112 if (old_process_ripe_tasks_time == process_ripe_tasks_time &&
113 old_delay_policy == delay_policy) {
114 return;
115 }
116 }
117 if (!process_ripe_tasks_time.is_max()) {
118 service_thread_task_runner_->PostTask(FROM_HERE,
119 schedule_process_ripe_tasks_closure_);
120 }
121 }
122
ProcessRipeTasks()123 void DelayedTaskManager::ProcessRipeTasks() {
124 std::vector<DelayedTask> ripe_delayed_tasks;
125 TimeTicks process_ripe_tasks_time;
126
127 {
128 CheckedAutoLock auto_lock(queue_lock_);
129
130 // Already shutdown.
131 if (!service_thread_task_runner_)
132 return;
133
134 const TimeTicks now = tick_clock_->NowTicks();
135 // A delayed task is ripe if it reached its delayed run time or if it is
136 // canceled. If it is canceled, schedule its deletion on the correct
137 // sequence now rather than in the future, to minimize CPU wake ups and save
138 // power.
139 while (!delayed_task_queue_.empty() &&
140 (delayed_task_queue_.top().task.earliest_delayed_run_time() <= now ||
141 !delayed_task_queue_.top().task.task.MaybeValid())) {
142 // The const_cast on top is okay since the DelayedTask is
143 // transactionally being popped from |delayed_task_queue_| right after
144 // and the move doesn't alter the sort order.
145 ripe_delayed_tasks.push_back(
146 std::move(const_cast<DelayedTask&>(delayed_task_queue_.top())));
147 delayed_task_queue_.pop();
148 }
149 std::tie(process_ripe_tasks_time, std::ignore) =
150 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
151 }
152 if (!process_ripe_tasks_time.is_max()) {
153 if (service_thread_task_runner_->RunsTasksInCurrentSequence()) {
154 ScheduleProcessRipeTasksOnServiceThread();
155 } else {
156 // ProcessRipeTasks may be called on another thread under tests.
157 service_thread_task_runner_->PostTask(
158 FROM_HERE, schedule_process_ripe_tasks_closure_);
159 }
160 }
161
162 for (auto& delayed_task : ripe_delayed_tasks) {
163 std::move(delayed_task.callback).Run(std::move(delayed_task.task));
164 }
165 }
166
NextScheduledRunTime() const167 absl::optional<TimeTicks> DelayedTaskManager::NextScheduledRunTime() const {
168 CheckedAutoLock auto_lock(queue_lock_);
169 if (delayed_task_queue_.empty())
170 return absl::nullopt;
171 return delayed_task_queue_.top().task.delayed_run_time;
172 }
173
TopTaskDelayPolicyForTesting() const174 subtle::DelayPolicy DelayedTaskManager::TopTaskDelayPolicyForTesting() const {
175 CheckedAutoLock auto_lock(queue_lock_);
176 return delayed_task_queue_.top().task.delay_policy;
177 }
178
Shutdown()179 void DelayedTaskManager::Shutdown() {
180 scoped_refptr<SequencedTaskRunner> service_thread_task_runner;
181
182 {
183 CheckedAutoLock auto_lock(queue_lock_);
184 // Prevent delayed tasks from being posted or processed after this.
185 service_thread_task_runner = service_thread_task_runner_;
186 }
187
188 if (service_thread_task_runner) {
189 // Cancel our delayed task on the service thread. This cannot be done from
190 // ~DelayedTaskManager because the delayed task handle is sequence-affine.
191 service_thread_task_runner->PostTask(
192 FROM_HERE,
193 base::BindOnce(
194 [](DelayedTaskManager* manager) {
195 DCHECK_CALLED_ON_VALID_SEQUENCE(manager->sequence_checker_);
196 manager->delayed_task_handle_.CancelTask();
197 },
198 // Unretained() is safe because the caller must flush tasks posted
199 // to the service thread before deleting `this`.
200 Unretained(this)));
201 }
202 }
203
204 std::pair<TimeTicks, subtle::DelayPolicy> DelayedTaskManager::
GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired()205 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired() {
206 queue_lock_.AssertAcquired();
207 if (delayed_task_queue_.empty()) {
208 return std::make_pair(TimeTicks::Max(),
209 subtle::DelayPolicy::kFlexibleNoSooner);
210 }
211
212 const DelayedTask& ripest_delayed_task = delayed_task_queue_.top();
213 subtle::DelayPolicy delay_policy = ripest_delayed_task.task.delay_policy;
214 return std::make_pair(ripest_delayed_task.task.delayed_run_time,
215 delay_policy);
216 }
217
ScheduleProcessRipeTasksOnServiceThread()218 void DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread() {
219 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
220
221 TimeTicks process_ripe_tasks_time;
222 subtle::DelayPolicy delay_policy;
223 {
224 CheckedAutoLock auto_lock(queue_lock_);
225 std::tie(process_ripe_tasks_time, delay_policy) =
226 GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
227 }
228 DCHECK(!process_ripe_tasks_time.is_null());
229 if (process_ripe_tasks_time.is_max())
230 return;
231 delayed_task_handle_.CancelTask();
232 delayed_task_handle_ =
233 service_thread_task_runner_->PostCancelableDelayedTaskAt(
234 subtle::PostDelayedTaskPassKey(), FROM_HERE,
235 process_ripe_tasks_closure_, process_ripe_tasks_time, delay_policy);
236 }
237
238 } // namespace internal
239 } // namespace base
240