1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/sequence.h"
6
7 #include <utility>
8
9 #include "base/check.h"
10 #include "base/critical_closure.h"
11 #include "base/feature_list.h"
12 #include "base/functional/bind.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/task/task_features.h"
15 #include "base/time/time.h"
16
17 namespace base {
18 namespace internal {
19
20 namespace {
21
22 // Asserts that a lock is acquired and annotates the scope such that
23 // base/thread_annotations.h can recognize that the lock is acquired.
24 class SCOPED_LOCKABLE AnnotateLockAcquired {
25 public:
26 explicit AnnotateLockAcquired(const CheckedLock& lock)
EXCLUSIVE_LOCK_FUNCTION(lock)27 EXCLUSIVE_LOCK_FUNCTION(lock)
28 : acquired_lock_(lock) {
29 acquired_lock_->AssertAcquired();
30 }
31
UNLOCK_FUNCTION()32 ~AnnotateLockAcquired() UNLOCK_FUNCTION() {
33 acquired_lock_->AssertAcquired();
34 }
35
36 private:
37 const raw_ref<const CheckedLock> acquired_lock_;
38 };
39
MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,Task & task)40 void MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,
41 Task& task) {
42 switch (shutdown_behavior) {
43 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
44 // Nothing to do.
45 break;
46 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
47 // MakeCriticalClosure() is arguably useful for SKIP_ON_SHUTDOWN, possibly
48 // in combination with is_immediate=false. However, SKIP_ON_SHUTDOWN is
49 // the default and hence the theoretical benefits don't warrant the
50 // performance implications.
51 break;
52 case TaskShutdownBehavior::BLOCK_SHUTDOWN:
53 task.task =
54 MakeCriticalClosure(task.posted_from, std::move(task.task),
55 /*is_immediate=*/task.delayed_run_time.is_null());
56 break;
57 }
58 }
59
60 } // namespace
61
Transaction(Sequence * sequence)62 Sequence::Transaction::Transaction(Sequence* sequence)
63 : TaskSource::Transaction(sequence) {}
64
65 Sequence::Transaction::Transaction(Sequence::Transaction&& other) = default;
66
67 Sequence::Transaction::~Transaction() = default;
68
WillPushImmediateTask()69 bool Sequence::Transaction::WillPushImmediateTask() {
70 // In a Transaction.
71 AnnotateLockAcquired annotate(sequence()->lock_);
72
73 bool was_immediate =
74 sequence()->is_immediate_.exchange(true, std::memory_order_relaxed);
75 return !was_immediate;
76 }
77
PushImmediateTask(Task task)78 void Sequence::Transaction::PushImmediateTask(Task task) {
79 // In a Transaction.
80 AnnotateLockAcquired annotate(sequence()->lock_);
81
82 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
83 // for details.
84 CHECK(task.task);
85 DCHECK(!task.queue_time.is_null());
86 DCHECK(sequence()->is_immediate_.load(std::memory_order_relaxed));
87
88 bool was_unretained = sequence()->IsEmpty() && !sequence()->has_worker_;
89 bool queue_was_empty = sequence()->queue_.empty();
90
91 MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
92
93 sequence()->queue_.push(std::move(task));
94
95 if (queue_was_empty)
96 sequence()->UpdateReadyTimes();
97
98 // AddRef() matched by manual Release() when the sequence has no more tasks
99 // to run (in DidProcessTask() or Clear()).
100 if (was_unretained && sequence()->task_runner())
101 sequence()->task_runner()->AddRef();
102 }
103
PushDelayedTask(Task task)104 bool Sequence::Transaction::PushDelayedTask(Task task) {
105 // In a Transaction.
106 AnnotateLockAcquired annotate(sequence()->lock_);
107
108 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
109 // for details.
110 CHECK(task.task);
111 DCHECK(!task.queue_time.is_null());
112 DCHECK(!task.delayed_run_time.is_null());
113
114 bool top_will_change = sequence()->DelayedSortKeyWillChange(task);
115 bool was_empty = sequence()->IsEmpty();
116
117 MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
118
119 sequence()->delayed_queue_.insert(std::move(task));
120
121 if (sequence()->queue_.empty())
122 sequence()->UpdateReadyTimes();
123
124 // AddRef() matched by manual Release() when the sequence has no more tasks
125 // to run (in DidProcessTask() or Clear()).
126 if (was_empty && !sequence()->has_worker_ && sequence()->task_runner())
127 sequence()->task_runner()->AddRef();
128
129 return top_will_change;
130 }
131
132 // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
133 // not be the first task eligible to run, but tasks will always become ripe
134 // before their latest_delayed_run_time().
operator ()(const Task & lhs,const Task & rhs) const135 bool Sequence::DelayedTaskGreater::operator()(const Task& lhs,
136 const Task& rhs) const {
137 TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
138 TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
139 return std::tie(lhs_latest_delayed_run_time, lhs.sequence_num) >
140 std::tie(rhs_latest_delayed_run_time, rhs.sequence_num);
141 }
142
WillRunTask()143 TaskSource::RunStatus Sequence::WillRunTask() {
144 // There should never be a second call to WillRunTask() before DidProcessTask
145 // since the RunStatus is always marked a saturated.
146 DCHECK(!has_worker_);
147
148 // It's ok to access |has_worker_| outside of a Transaction since
149 // WillRunTask() is externally synchronized, always called in sequence with
150 // TakeTask() and DidProcessTask() and only called if HasReadyTasks(), which
151 // means it won't race with Push[Immediate/Delayed]Task().
152 has_worker_ = true;
153 return RunStatus::kAllowedSaturated;
154 }
155
OnBecomeReady()156 bool Sequence::OnBecomeReady() {
157 DCHECK(!has_worker_);
158 // std::memory_order_relaxed is sufficient because no other state is
159 // synchronized with |is_immediate_| outside of |lock_|.
160 return !is_immediate_.exchange(true, std::memory_order_relaxed);
161 }
162
GetRemainingConcurrency() const163 size_t Sequence::GetRemainingConcurrency() const {
164 return 1;
165 }
166
TakeNextImmediateTask()167 Task Sequence::TakeNextImmediateTask() {
168 Task next_task = std::move(queue_.front());
169 queue_.pop();
170 return next_task;
171 }
172
TakeEarliestTask()173 Task Sequence::TakeEarliestTask() {
174 if (queue_.empty())
175 return delayed_queue_.take_top();
176
177 if (delayed_queue_.empty())
178 return TakeNextImmediateTask();
179
180 // Both queues contain at least a task. Decide from which one the task should
181 // be taken.
182 if (queue_.front().queue_time <=
183 delayed_queue_.top().latest_delayed_run_time())
184 return TakeNextImmediateTask();
185
186 return delayed_queue_.take_top();
187 }
188
UpdateReadyTimes()189 void Sequence::UpdateReadyTimes() {
190 DCHECK(!IsEmpty());
191 if (queue_.empty()) {
192 latest_ready_time_.store(delayed_queue_.top().latest_delayed_run_time(),
193 std::memory_order_relaxed);
194 earliest_ready_time_.store(delayed_queue_.top().earliest_delayed_run_time(),
195 std::memory_order_relaxed);
196 return;
197 }
198
199 if (delayed_queue_.empty()) {
200 latest_ready_time_.store(queue_.front().queue_time,
201 std::memory_order_relaxed);
202 } else {
203 latest_ready_time_.store(
204 std::min(queue_.front().queue_time,
205 delayed_queue_.top().latest_delayed_run_time()),
206 std::memory_order_relaxed);
207 }
208 earliest_ready_time_.store(TimeTicks(), std::memory_order_relaxed);
209 }
210
TakeTask(TaskSource::Transaction * transaction)211 Task Sequence::TakeTask(TaskSource::Transaction* transaction) {
212 CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
213 AnnotateLockAcquired annotate(lock_);
214
215 DCHECK(has_worker_);
216 DCHECK(is_immediate_.load(std::memory_order_relaxed));
217 DCHECK(!queue_.empty() || !delayed_queue_.empty());
218
219 auto next_task = TakeEarliestTask();
220
221 if (!IsEmpty())
222 UpdateReadyTimes();
223
224 return next_task;
225 }
226
DidProcessTask(TaskSource::Transaction * transaction)227 bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) {
228 CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
229 AnnotateLockAcquired annotate(lock_);
230
231 // There should never be a call to DidProcessTask without an associated
232 // WillRunTask().
233 DCHECK(has_worker_);
234 has_worker_ = false;
235
236 // See comment on TaskSource::task_runner_ for lifetime management details.
237 if (IsEmpty()) {
238 is_immediate_.store(false, std::memory_order_relaxed);
239 ReleaseTaskRunner();
240 return false;
241 }
242
243 // Let the caller re-enqueue this non-empty Sequence regardless of
244 // |run_result| so it can continue churning through this Sequence's tasks and
245 // skip/delete them in the proper scope.
246 return true;
247 }
248
WillReEnqueue(TimeTicks now,TaskSource::Transaction * transaction)249 bool Sequence::WillReEnqueue(TimeTicks now,
250 TaskSource::Transaction* transaction) {
251 CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
252 AnnotateLockAcquired annotate(lock_);
253
254 // This should always be called from a worker thread and it will be
255 // called after DidProcessTask().
256 DCHECK(is_immediate_.load(std::memory_order_relaxed));
257
258 bool has_ready_tasks = HasReadyTasks(now);
259 if (!has_ready_tasks)
260 is_immediate_.store(false, std::memory_order_relaxed);
261
262 return has_ready_tasks;
263 }
264
DelayedSortKeyWillChange(const Task & delayed_task) const265 bool Sequence::DelayedSortKeyWillChange(const Task& delayed_task) const {
266 // If sequence has already been picked up by a worker or moved, no need to
267 // proceed further here.
268 if (is_immediate_.load(std::memory_order_relaxed)) {
269 return false;
270 }
271
272 if (IsEmpty()) {
273 return true;
274 }
275
276 return delayed_task.latest_delayed_run_time() <
277 delayed_queue_.top().latest_delayed_run_time();
278 }
279
HasReadyTasks(TimeTicks now) const280 bool Sequence::HasReadyTasks(TimeTicks now) const {
281 return now >= TS_UNCHECKED_READ(earliest_ready_time_)
282 .load(std::memory_order_relaxed);
283 }
284
HasImmediateTasks() const285 bool Sequence::HasImmediateTasks() const {
286 return !queue_.empty();
287 }
288
GetSortKey() const289 TaskSourceSortKey Sequence::GetSortKey() const {
290 return TaskSourceSortKey(
291 priority_racy(),
292 TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed));
293 }
294
GetDelayedSortKey() const295 TimeTicks Sequence::GetDelayedSortKey() const {
296 return TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed);
297 }
298
Clear(TaskSource::Transaction * transaction)299 Task Sequence::Clear(TaskSource::Transaction* transaction) {
300 CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
301 AnnotateLockAcquired annotate(lock_);
302
303 // See comment on TaskSource::task_runner_ for lifetime management details.
304 if (!IsEmpty() && !has_worker_) {
305 ReleaseTaskRunner();
306 }
307
308 return Task(
309 FROM_HERE,
310 base::BindOnce(
311 [](base::queue<Task> queue,
312 base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue) {
313 while (!queue.empty())
314 queue.pop();
315
316 while (!delayed_queue.empty())
317 delayed_queue.pop();
318 },
319 std::move(queue_), std::move(delayed_queue_)),
320 TimeTicks(), TimeDelta());
321 }
322
ReleaseTaskRunner()323 void Sequence::ReleaseTaskRunner() {
324 if (!task_runner())
325 return;
326 // No member access after this point, releasing |task_runner()| might delete
327 // |this|.
328 task_runner()->Release();
329 }
330
Sequence(const TaskTraits & traits,TaskRunner * task_runner,TaskSourceExecutionMode execution_mode)331 Sequence::Sequence(const TaskTraits& traits,
332 TaskRunner* task_runner,
333 TaskSourceExecutionMode execution_mode)
334 : TaskSource(traits, task_runner, execution_mode) {}
335
336 Sequence::~Sequence() = default;
337
BeginTransaction()338 Sequence::Transaction Sequence::BeginTransaction() {
339 return Transaction(this);
340 }
341
GetExecutionEnvironment()342 ExecutionEnvironment Sequence::GetExecutionEnvironment() {
343 return {token_, &sequence_local_storage_};
344 }
345
IsEmpty() const346 bool Sequence::IsEmpty() const {
347 return queue_.empty() && delayed_queue_.empty();
348 }
349
350 } // namespace internal
351 } // namespace base
352