• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/delayed_task_manager.h"
6 
7 #include <algorithm>
8 
9 #include "base/check.h"
10 #include "base/feature_list.h"
11 #include "base/functional/bind.h"
12 #include "base/task/common/checked_lock.h"
13 #include "base/task/sequenced_task_runner.h"
14 #include "base/task/task_features.h"
15 #include "base/task/task_runner.h"
16 #include "base/task/thread_pool/task.h"
17 #include "third_party/abseil-cpp/absl/types/optional.h"
18 
19 namespace base {
20 namespace internal {
21 
22 DelayedTaskManager::DelayedTask::DelayedTask() = default;
23 
DelayedTask(Task task,PostTaskNowCallback callback,scoped_refptr<TaskRunner> task_runner)24 DelayedTaskManager::DelayedTask::DelayedTask(
25     Task task,
26     PostTaskNowCallback callback,
27     scoped_refptr<TaskRunner> task_runner)
28     : task(std::move(task)),
29       callback(std::move(callback)),
30       task_runner(std::move(task_runner)) {}
31 
32 DelayedTaskManager::DelayedTask::DelayedTask(
33     DelayedTaskManager::DelayedTask&& other) = default;
34 
35 DelayedTaskManager::DelayedTask::~DelayedTask() = default;
36 
37 DelayedTaskManager::DelayedTask& DelayedTaskManager::DelayedTask::operator=(
38     DelayedTaskManager::DelayedTask&& other) = default;
39 
operator >(const DelayedTask & other) const40 bool DelayedTaskManager::DelayedTask::operator>(
41     const DelayedTask& other) const {
42   TimeTicks latest_delayed_run_time = task.latest_delayed_run_time();
43   TimeTicks other_latest_delayed_run_time =
44       other.task.latest_delayed_run_time();
45   return std::tie(latest_delayed_run_time, task.sequence_num) >
46          std::tie(other_latest_delayed_run_time, other.task.sequence_num);
47 }
48 
DelayedTaskManager(const TickClock * tick_clock)49 DelayedTaskManager::DelayedTaskManager(const TickClock* tick_clock)
50     : process_ripe_tasks_closure_(
51           BindRepeating(&DelayedTaskManager::ProcessRipeTasks,
52                         Unretained(this))),
53       schedule_process_ripe_tasks_closure_(BindRepeating(
54           &DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread,
55           Unretained(this))),
56       tick_clock_(tick_clock) {
57   DETACH_FROM_SEQUENCE(sequence_checker_);
58   DCHECK(tick_clock_);
59 }
60 
~DelayedTaskManager()61 DelayedTaskManager::~DelayedTaskManager() {
62   delayed_task_handle_.CancelTask();
63 }
64 
Start(scoped_refptr<SequencedTaskRunner> service_thread_task_runner)65 void DelayedTaskManager::Start(
66     scoped_refptr<SequencedTaskRunner> service_thread_task_runner) {
67   DCHECK(service_thread_task_runner);
68 
69   TimeTicks process_ripe_tasks_time;
70   subtle::DelayPolicy delay_policy;
71   {
72     CheckedAutoLock auto_lock(queue_lock_);
73     DCHECK(!service_thread_task_runner_);
74     service_thread_task_runner_ = std::move(service_thread_task_runner);
75     align_wake_ups_ = FeatureList::IsEnabled(kAlignWakeUps);
76     std::tie(process_ripe_tasks_time, delay_policy) =
77         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
78   }
79   if (!process_ripe_tasks_time.is_max()) {
80     service_thread_task_runner_->PostTask(FROM_HERE,
81                                           schedule_process_ripe_tasks_closure_);
82   }
83 }
84 
AddDelayedTask(Task task,PostTaskNowCallback post_task_now_callback,scoped_refptr<TaskRunner> task_runner)85 void DelayedTaskManager::AddDelayedTask(
86     Task task,
87     PostTaskNowCallback post_task_now_callback,
88     scoped_refptr<TaskRunner> task_runner) {
89   DCHECK(task.task);
90   DCHECK(!task.delayed_run_time.is_null());
91   DCHECK(!task.queue_time.is_null());
92 
93   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
94   // for details.
95   CHECK(task.task);
96   TimeTicks process_ripe_tasks_time;
97   subtle::DelayPolicy delay_policy;
98   {
99     CheckedAutoLock auto_lock(queue_lock_);
100     auto [old_process_ripe_tasks_time, old_delay_policy] =
101         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
102     delayed_task_queue_.insert(DelayedTask(std::move(task),
103                                            std::move(post_task_now_callback),
104                                            std::move(task_runner)));
105     // Not started or already shutdown.
106     if (service_thread_task_runner_ == nullptr)
107       return;
108 
109     std::tie(process_ripe_tasks_time, delay_policy) =
110         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
111     // The next invocation of ProcessRipeTasks() doesn't need to change.
112     if (old_process_ripe_tasks_time == process_ripe_tasks_time &&
113         old_delay_policy == delay_policy) {
114       return;
115     }
116   }
117   if (!process_ripe_tasks_time.is_max()) {
118     service_thread_task_runner_->PostTask(FROM_HERE,
119                                           schedule_process_ripe_tasks_closure_);
120   }
121 }
122 
ProcessRipeTasks()123 void DelayedTaskManager::ProcessRipeTasks() {
124   std::vector<DelayedTask> ripe_delayed_tasks;
125   TimeTicks process_ripe_tasks_time;
126 
127   {
128     CheckedAutoLock auto_lock(queue_lock_);
129 
130     // Already shutdown.
131     if (!service_thread_task_runner_)
132       return;
133 
134     const TimeTicks now = tick_clock_->NowTicks();
135     // A delayed task is ripe if it reached its delayed run time or if it is
136     // canceled. If it is canceled, schedule its deletion on the correct
137     // sequence now rather than in the future, to minimize CPU wake ups and save
138     // power.
139     while (!delayed_task_queue_.empty() &&
140            (delayed_task_queue_.top().task.earliest_delayed_run_time() <= now ||
141             !delayed_task_queue_.top().task.task.MaybeValid())) {
142       // The const_cast on top is okay since the DelayedTask is
143       // transactionally being popped from |delayed_task_queue_| right after
144       // and the move doesn't alter the sort order.
145       ripe_delayed_tasks.push_back(
146           std::move(const_cast<DelayedTask&>(delayed_task_queue_.top())));
147       delayed_task_queue_.pop();
148     }
149     std::tie(process_ripe_tasks_time, std::ignore) =
150         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
151   }
152   if (!process_ripe_tasks_time.is_max()) {
153     if (service_thread_task_runner_->RunsTasksInCurrentSequence()) {
154       ScheduleProcessRipeTasksOnServiceThread();
155     } else {
156       // ProcessRipeTasks may be called on another thread under tests.
157       service_thread_task_runner_->PostTask(
158           FROM_HERE, schedule_process_ripe_tasks_closure_);
159     }
160   }
161 
162   for (auto& delayed_task : ripe_delayed_tasks) {
163     std::move(delayed_task.callback).Run(std::move(delayed_task.task));
164   }
165 }
166 
NextScheduledRunTime() const167 absl::optional<TimeTicks> DelayedTaskManager::NextScheduledRunTime() const {
168   CheckedAutoLock auto_lock(queue_lock_);
169   if (delayed_task_queue_.empty())
170     return absl::nullopt;
171   return delayed_task_queue_.top().task.delayed_run_time;
172 }
173 
TopTaskDelayPolicyForTesting() const174 subtle::DelayPolicy DelayedTaskManager::TopTaskDelayPolicyForTesting() const {
175   CheckedAutoLock auto_lock(queue_lock_);
176   return delayed_task_queue_.top().task.delay_policy;
177 }
178 
Shutdown()179 void DelayedTaskManager::Shutdown() {
180   scoped_refptr<SequencedTaskRunner> service_thread_task_runner;
181 
182   {
183     CheckedAutoLock auto_lock(queue_lock_);
184     // Prevent delayed tasks from being posted or processed after this.
185     service_thread_task_runner = service_thread_task_runner_;
186   }
187 
188   if (service_thread_task_runner) {
189     // Cancel our delayed task on the service thread. This cannot be done from
190     // ~DelayedTaskManager because the delayed task handle is sequence-affine.
191     service_thread_task_runner->PostTask(
192         FROM_HERE,
193         base::BindOnce(
194             [](DelayedTaskManager* manager) {
195               DCHECK_CALLED_ON_VALID_SEQUENCE(manager->sequence_checker_);
196               manager->delayed_task_handle_.CancelTask();
197             },
198             // Unretained() is safe because the caller must flush tasks posted
199             // to the service thread before deleting `this`.
200             Unretained(this)));
201   }
202 }
203 
204 std::pair<TimeTicks, subtle::DelayPolicy> DelayedTaskManager::
GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired()205     GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired() {
206   queue_lock_.AssertAcquired();
207   if (delayed_task_queue_.empty()) {
208     return std::make_pair(TimeTicks::Max(),
209                           subtle::DelayPolicy::kFlexibleNoSooner);
210   }
211 
212   const DelayedTask& ripest_delayed_task = delayed_task_queue_.top();
213   subtle::DelayPolicy delay_policy = ripest_delayed_task.task.delay_policy;
214   return std::make_pair(ripest_delayed_task.task.delayed_run_time,
215                         delay_policy);
216 }
217 
ScheduleProcessRipeTasksOnServiceThread()218 void DelayedTaskManager::ScheduleProcessRipeTasksOnServiceThread() {
219   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
220 
221   TimeTicks process_ripe_tasks_time;
222   subtle::DelayPolicy delay_policy;
223   {
224     CheckedAutoLock auto_lock(queue_lock_);
225     std::tie(process_ripe_tasks_time, delay_policy) =
226         GetTimeAndDelayPolicyToScheduleProcessRipeTasksLockRequired();
227   }
228   DCHECK(!process_ripe_tasks_time.is_null());
229   if (process_ripe_tasks_time.is_max())
230     return;
231   delayed_task_handle_.CancelTask();
232   delayed_task_handle_ =
233       service_thread_task_runner_->PostCancelableDelayedTaskAt(
234           subtle::PostDelayedTaskPassKey(), FROM_HERE,
235           process_ripe_tasks_closure_, process_ripe_tasks_time, delay_policy);
236 }
237 
238 }  // namespace internal
239 }  // namespace base
240