• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/sequence.h"
6 
7 #include <utility>
8 
9 #include "base/check.h"
10 #include "base/critical_closure.h"
11 #include "base/feature_list.h"
12 #include "base/functional/bind.h"
13 #include "base/memory/ptr_util.h"
14 #include "base/task/task_features.h"
15 #include "base/time/time.h"
16 
17 namespace base {
18 namespace internal {
19 
20 namespace {
21 
22 // Asserts that a lock is acquired and annotates the scope such that
23 // base/thread_annotations.h can recognize that the lock is acquired.
24 class SCOPED_LOCKABLE AnnotateLockAcquired {
25  public:
26   explicit AnnotateLockAcquired(const CheckedLock& lock)
EXCLUSIVE_LOCK_FUNCTION(lock)27       EXCLUSIVE_LOCK_FUNCTION(lock)
28       : acquired_lock_(lock) {
29     acquired_lock_->AssertAcquired();
30   }
31 
UNLOCK_FUNCTION()32   ~AnnotateLockAcquired() UNLOCK_FUNCTION() {
33     acquired_lock_->AssertAcquired();
34   }
35 
36  private:
37   const raw_ref<const CheckedLock> acquired_lock_;
38 };
39 
MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,Task & task)40 void MaybeMakeCriticalClosure(TaskShutdownBehavior shutdown_behavior,
41                               Task& task) {
42   switch (shutdown_behavior) {
43     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
44       // Nothing to do.
45       break;
46     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
47       // MakeCriticalClosure() is arguably useful for SKIP_ON_SHUTDOWN, possibly
48       // in combination with is_immediate=false. However, SKIP_ON_SHUTDOWN is
49       // the default and hence the theoretical benefits don't warrant the
50       // performance implications.
51       break;
52     case TaskShutdownBehavior::BLOCK_SHUTDOWN:
53       task.task =
54           MakeCriticalClosure(task.posted_from, std::move(task.task),
55                               /*is_immediate=*/task.delayed_run_time.is_null());
56       break;
57   }
58 }
59 
60 }  // namespace
61 
Transaction(Sequence * sequence)62 Sequence::Transaction::Transaction(Sequence* sequence)
63     : TaskSource::Transaction(sequence) {}
64 
65 Sequence::Transaction::Transaction(Sequence::Transaction&& other) = default;
66 
67 Sequence::Transaction::~Transaction() = default;
68 
WillPushImmediateTask()69 bool Sequence::Transaction::WillPushImmediateTask() {
70   // In a Transaction.
71   AnnotateLockAcquired annotate(sequence()->lock_);
72 
73   bool was_immediate =
74       sequence()->is_immediate_.exchange(true, std::memory_order_relaxed);
75   return !was_immediate;
76 }
77 
PushImmediateTask(Task task)78 void Sequence::Transaction::PushImmediateTask(Task task) {
79   // In a Transaction.
80   AnnotateLockAcquired annotate(sequence()->lock_);
81 
82   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
83   // for details.
84   CHECK(task.task);
85   DCHECK(!task.queue_time.is_null());
86   DCHECK(sequence()->is_immediate_.load(std::memory_order_relaxed));
87 
88   bool was_unretained = sequence()->IsEmpty() && !sequence()->has_worker_;
89   bool queue_was_empty = sequence()->queue_.empty();
90 
91   MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
92 
93   sequence()->queue_.push(std::move(task));
94 
95   if (queue_was_empty)
96     sequence()->UpdateReadyTimes();
97 
98   // AddRef() matched by manual Release() when the sequence has no more tasks
99   // to run (in DidProcessTask() or Clear()).
100   if (was_unretained && sequence()->task_runner())
101     sequence()->task_runner()->AddRef();
102 }
103 
PushDelayedTask(Task task)104 bool Sequence::Transaction::PushDelayedTask(Task task) {
105   // In a Transaction.
106   AnnotateLockAcquired annotate(sequence()->lock_);
107 
108   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
109   // for details.
110   CHECK(task.task);
111   DCHECK(!task.queue_time.is_null());
112   DCHECK(!task.delayed_run_time.is_null());
113 
114   bool top_will_change = sequence()->DelayedSortKeyWillChange(task);
115   bool was_empty = sequence()->IsEmpty();
116 
117   MaybeMakeCriticalClosure(sequence()->traits_.shutdown_behavior(), task);
118 
119   sequence()->delayed_queue_.insert(std::move(task));
120 
121   if (sequence()->queue_.empty())
122     sequence()->UpdateReadyTimes();
123 
124   // AddRef() matched by manual Release() when the sequence has no more tasks
125   // to run (in DidProcessTask() or Clear()).
126   if (was_empty && !sequence()->has_worker_ && sequence()->task_runner())
127     sequence()->task_runner()->AddRef();
128 
129   return top_will_change;
130 }
131 
132 // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
133 // not be the first task eligible to run, but tasks will always become ripe
134 // before their latest_delayed_run_time().
operator ()(const Task & lhs,const Task & rhs) const135 bool Sequence::DelayedTaskGreater::operator()(const Task& lhs,
136                                               const Task& rhs) const {
137   TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
138   TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
139   return std::tie(lhs_latest_delayed_run_time, lhs.sequence_num) >
140          std::tie(rhs_latest_delayed_run_time, rhs.sequence_num);
141 }
142 
WillRunTask()143 TaskSource::RunStatus Sequence::WillRunTask() {
144   // There should never be a second call to WillRunTask() before DidProcessTask
145   // since the RunStatus is always marked a saturated.
146   DCHECK(!has_worker_);
147 
148   // It's ok to access |has_worker_| outside of a Transaction since
149   // WillRunTask() is externally synchronized, always called in sequence with
150   // TakeTask() and DidProcessTask() and only called if HasReadyTasks(), which
151   // means it won't race with Push[Immediate/Delayed]Task().
152   has_worker_ = true;
153   return RunStatus::kAllowedSaturated;
154 }
155 
OnBecomeReady()156 bool Sequence::OnBecomeReady() {
157   DCHECK(!has_worker_);
158   // std::memory_order_relaxed is sufficient because no other state is
159   // synchronized with |is_immediate_| outside of |lock_|.
160   return !is_immediate_.exchange(true, std::memory_order_relaxed);
161 }
162 
GetRemainingConcurrency() const163 size_t Sequence::GetRemainingConcurrency() const {
164   return 1;
165 }
166 
TakeNextImmediateTask()167 Task Sequence::TakeNextImmediateTask() {
168   Task next_task = std::move(queue_.front());
169   queue_.pop();
170   return next_task;
171 }
172 
TakeEarliestTask()173 Task Sequence::TakeEarliestTask() {
174   if (queue_.empty())
175     return delayed_queue_.take_top();
176 
177   if (delayed_queue_.empty())
178     return TakeNextImmediateTask();
179 
180   // Both queues contain at least a task. Decide from which one the task should
181   // be taken.
182   if (queue_.front().queue_time <=
183       delayed_queue_.top().latest_delayed_run_time())
184     return TakeNextImmediateTask();
185 
186   return delayed_queue_.take_top();
187 }
188 
UpdateReadyTimes()189 void Sequence::UpdateReadyTimes() {
190   DCHECK(!IsEmpty());
191   if (queue_.empty()) {
192     latest_ready_time_.store(delayed_queue_.top().latest_delayed_run_time(),
193                              std::memory_order_relaxed);
194     earliest_ready_time_.store(delayed_queue_.top().earliest_delayed_run_time(),
195                                std::memory_order_relaxed);
196     return;
197   }
198 
199   if (delayed_queue_.empty()) {
200     latest_ready_time_.store(queue_.front().queue_time,
201                              std::memory_order_relaxed);
202   } else {
203     latest_ready_time_.store(
204         std::min(queue_.front().queue_time,
205                  delayed_queue_.top().latest_delayed_run_time()),
206         std::memory_order_relaxed);
207   }
208   earliest_ready_time_.store(TimeTicks(), std::memory_order_relaxed);
209 }
210 
TakeTask(TaskSource::Transaction * transaction)211 Task Sequence::TakeTask(TaskSource::Transaction* transaction) {
212   CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
213   AnnotateLockAcquired annotate(lock_);
214 
215   DCHECK(has_worker_);
216   DCHECK(is_immediate_.load(std::memory_order_relaxed));
217   DCHECK(!queue_.empty() || !delayed_queue_.empty());
218 
219   auto next_task = TakeEarliestTask();
220 
221   if (!IsEmpty())
222     UpdateReadyTimes();
223 
224   return next_task;
225 }
226 
DidProcessTask(TaskSource::Transaction * transaction)227 bool Sequence::DidProcessTask(TaskSource::Transaction* transaction) {
228   CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
229   AnnotateLockAcquired annotate(lock_);
230 
231   // There should never be a call to DidProcessTask without an associated
232   // WillRunTask().
233   DCHECK(has_worker_);
234   has_worker_ = false;
235 
236   // See comment on TaskSource::task_runner_ for lifetime management details.
237   if (IsEmpty()) {
238     is_immediate_.store(false, std::memory_order_relaxed);
239     ReleaseTaskRunner();
240     return false;
241   }
242 
243   // Let the caller re-enqueue this non-empty Sequence regardless of
244   // |run_result| so it can continue churning through this Sequence's tasks and
245   // skip/delete them in the proper scope.
246   return true;
247 }
248 
WillReEnqueue(TimeTicks now,TaskSource::Transaction * transaction)249 bool Sequence::WillReEnqueue(TimeTicks now,
250                              TaskSource::Transaction* transaction) {
251   CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
252   AnnotateLockAcquired annotate(lock_);
253 
254   // This should always be called from a worker thread and it will be
255   // called after DidProcessTask().
256   DCHECK(is_immediate_.load(std::memory_order_relaxed));
257 
258   bool has_ready_tasks = HasReadyTasks(now);
259   if (!has_ready_tasks)
260     is_immediate_.store(false, std::memory_order_relaxed);
261 
262   return has_ready_tasks;
263 }
264 
DelayedSortKeyWillChange(const Task & delayed_task) const265 bool Sequence::DelayedSortKeyWillChange(const Task& delayed_task) const {
266   // If sequence has already been picked up by a worker or moved, no need to
267   // proceed further here.
268   if (is_immediate_.load(std::memory_order_relaxed)) {
269     return false;
270   }
271 
272   if (IsEmpty()) {
273     return true;
274   }
275 
276   return delayed_task.latest_delayed_run_time() <
277          delayed_queue_.top().latest_delayed_run_time();
278 }
279 
HasReadyTasks(TimeTicks now) const280 bool Sequence::HasReadyTasks(TimeTicks now) const {
281   return now >= TS_UNCHECKED_READ(earliest_ready_time_)
282                     .load(std::memory_order_relaxed);
283 }
284 
HasImmediateTasks() const285 bool Sequence::HasImmediateTasks() const {
286   return !queue_.empty();
287 }
288 
GetSortKey() const289 TaskSourceSortKey Sequence::GetSortKey() const {
290   return TaskSourceSortKey(
291       priority_racy(),
292       TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed));
293 }
294 
GetDelayedSortKey() const295 TimeTicks Sequence::GetDelayedSortKey() const {
296   return TS_UNCHECKED_READ(latest_ready_time_).load(std::memory_order_relaxed);
297 }
298 
Clear(TaskSource::Transaction * transaction)299 Task Sequence::Clear(TaskSource::Transaction* transaction) {
300   CheckedAutoLockMaybe auto_lock(transaction ? nullptr : &lock_);
301   AnnotateLockAcquired annotate(lock_);
302 
303   // See comment on TaskSource::task_runner_ for lifetime management details.
304   if (!IsEmpty() && !has_worker_) {
305     ReleaseTaskRunner();
306   }
307 
308   return Task(
309       FROM_HERE,
310       base::BindOnce(
311           [](base::queue<Task> queue,
312              base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue) {
313             while (!queue.empty())
314               queue.pop();
315 
316             while (!delayed_queue.empty())
317               delayed_queue.pop();
318           },
319           std::move(queue_), std::move(delayed_queue_)),
320       TimeTicks(), TimeDelta());
321 }
322 
ReleaseTaskRunner()323 void Sequence::ReleaseTaskRunner() {
324   if (!task_runner())
325     return;
326   // No member access after this point, releasing |task_runner()| might delete
327   // |this|.
328   task_runner()->Release();
329 }
330 
Sequence(const TaskTraits & traits,TaskRunner * task_runner,TaskSourceExecutionMode execution_mode)331 Sequence::Sequence(const TaskTraits& traits,
332                    TaskRunner* task_runner,
333                    TaskSourceExecutionMode execution_mode)
334     : TaskSource(traits, task_runner, execution_mode) {}
335 
336 Sequence::~Sequence() = default;
337 
BeginTransaction()338 Sequence::Transaction Sequence::BeginTransaction() {
339   return Transaction(this);
340 }
341 
GetExecutionEnvironment()342 ExecutionEnvironment Sequence::GetExecutionEnvironment() {
343   return {token_, &sequence_local_storage_};
344 }
345 
IsEmpty() const346 bool Sequence::IsEmpty() const {
347   return queue_.empty() && delayed_queue_.empty();
348 }
349 
350 }  // namespace internal
351 }  // namespace base
352