• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/task_tracker.h"
6 
#include <atomic>
#include <limits>
#include <memory>
#include <string>
#include <utility>
10 
11 #include "base/base_switches.h"
12 #include "base/command_line.h"
13 #include "base/compiler_specific.h"
14 #include "base/debug/alias.h"
15 #include "base/functional/callback.h"
16 #include "base/json/json_writer.h"
17 #include "base/logging.h"
18 #include "base/memory/ptr_util.h"
19 #include "base/metrics/histogram_macros.h"
20 #include "base/notreached.h"
21 #include "base/sequence_token.h"
22 #include "base/strings/string_util.h"
23 #include "base/synchronization/condition_variable.h"
24 #include "base/synchronization/waitable_event.h"
25 #include "base/task/scoped_set_task_priority_for_current_thread.h"
26 #include "base/task/sequenced_task_runner.h"
27 #include "base/task/single_thread_task_runner.h"
28 #include "base/task/thread_pool/job_task_source.h"
29 #include "base/threading/sequence_local_storage_map.h"
30 #include "base/threading/thread_restrictions.h"
31 #include "base/time/time.h"
32 #include "base/trace_event/base_tracing.h"
33 #include "base/values.h"
34 #include "build/build_config.h"
35 #include "third_party/abseil-cpp/absl/base/attributes.h"
36 #include "third_party/abseil-cpp/absl/types/optional.h"
37 
38 namespace base {
39 namespace internal {
40 
41 namespace {
42 
43 #if BUILDFLAG(ENABLE_BASE_TRACING)
44 using perfetto::protos::pbzero::ChromeThreadPoolTask;
45 using perfetto::protos::pbzero::ChromeTrackEvent;
46 #endif  // BUILDFLAG(ENABLE_BASE_TRACING)
47 
48 constexpr const char* kExecutionModeString[] = {"parallel", "sequenced",
49                                                 "single thread", "job"};
50 static_assert(
51     std::size(kExecutionModeString) ==
52         static_cast<size_t>(TaskSourceExecutionMode::kMax) + 1,
53     "Array kExecutionModeString is out of sync with TaskSourceExecutionMode.");
54 
HasLogBestEffortTasksSwitch()55 bool HasLogBestEffortTasksSwitch() {
56   // The CommandLine might not be initialized if ThreadPool is initialized in a
57   // dynamic library which doesn't have access to argc/argv.
58   return CommandLine::InitializedForCurrentProcess() &&
59          CommandLine::ForCurrentProcess()->HasSwitch(
60              switches::kLogBestEffortTasks);
61 }
62 
63 #if BUILDFLAG(ENABLE_BASE_TRACING)
TaskPriorityToProto(TaskPriority priority)64 ChromeThreadPoolTask::Priority TaskPriorityToProto(TaskPriority priority) {
65   switch (priority) {
66     case TaskPriority::BEST_EFFORT:
67       return ChromeThreadPoolTask::PRIORITY_BEST_EFFORT;
68     case TaskPriority::USER_VISIBLE:
69       return ChromeThreadPoolTask::PRIORITY_USER_VISIBLE;
70     case TaskPriority::USER_BLOCKING:
71       return ChromeThreadPoolTask::PRIORITY_USER_BLOCKING;
72   }
73 }
74 
ExecutionModeToProto(TaskSourceExecutionMode mode)75 ChromeThreadPoolTask::ExecutionMode ExecutionModeToProto(
76     TaskSourceExecutionMode mode) {
77   switch (mode) {
78     case TaskSourceExecutionMode::kParallel:
79       return ChromeThreadPoolTask::EXECUTION_MODE_PARALLEL;
80     case TaskSourceExecutionMode::kSequenced:
81       return ChromeThreadPoolTask::EXECUTION_MODE_SEQUENCED;
82     case TaskSourceExecutionMode::kSingleThread:
83       return ChromeThreadPoolTask::EXECUTION_MODE_SINGLE_THREAD;
84     case TaskSourceExecutionMode::kJob:
85       return ChromeThreadPoolTask::EXECUTION_MODE_JOB;
86   }
87 }
88 
ShutdownBehaviorToProto(TaskShutdownBehavior shutdown_behavior)89 ChromeThreadPoolTask::ShutdownBehavior ShutdownBehaviorToProto(
90     TaskShutdownBehavior shutdown_behavior) {
91   switch (shutdown_behavior) {
92     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
93       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_CONTINUE_ON_SHUTDOWN;
94     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
95       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_SKIP_ON_SHUTDOWN;
96     case TaskShutdownBehavior::BLOCK_SHUTDOWN:
97       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_BLOCK_SHUTDOWN;
98   }
99 }
100 #endif  //  BUILDFLAG(ENABLE_BASE_TRACING)
101 
EmitThreadPoolTraceEventMetadata(perfetto::EventContext & ctx,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)102 auto EmitThreadPoolTraceEventMetadata(perfetto::EventContext& ctx,
103                                       const TaskTraits& traits,
104                                       TaskSource* task_source,
105                                       const SequenceToken& token) {
106 #if BUILDFLAG(ENABLE_BASE_TRACING)
107   // Other parameters are included only when "scheduler" category is enabled.
108   const uint8_t* scheduler_category_enabled =
109       TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("scheduler");
110 
111   if (!*scheduler_category_enabled)
112     return;
113   auto* task = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
114                    ->set_thread_pool_task();
115   task->set_task_priority(TaskPriorityToProto(traits.priority()));
116   task->set_execution_mode(ExecutionModeToProto(task_source->execution_mode()));
117   task->set_shutdown_behavior(
118       ShutdownBehaviorToProto(traits.shutdown_behavior()));
119   if (token.IsValid())
120     task->set_sequence_token(token.ToInternalValue());
121 #endif  //  BUILDFLAG(ENABLE_BASE_TRACING)
122 }
123 
124 ABSL_CONST_INIT thread_local bool fizzle_block_shutdown_tasks = false;
125 
126 }  // namespace
127 
128 // Atomic internal state used by TaskTracker to track items that are blocking
129 // Shutdown. An "item" consist of either:
130 // - A running SKIP_ON_SHUTDOWN task
131 // - A queued/running BLOCK_SHUTDOWN TaskSource.
132 // Sequential consistency shouldn't be assumed from these calls (i.e. a thread
133 // reading |HasShutdownStarted() == true| isn't guaranteed to see all writes
134 // made before |StartShutdown()| on the thread that invoked it).
135 class TaskTracker::State {
136  public:
137   State() = default;
138   State(const State&) = delete;
139   State& operator=(const State&) = delete;
140 
141   // Sets a flag indicating that shutdown has started. Returns true if there are
142   // items blocking shutdown. Can only be called once.
StartShutdown()143   bool StartShutdown() {
144     const auto new_value =
145         subtle::NoBarrier_AtomicIncrement(&bits_, kShutdownHasStartedMask);
146 
147     // Check that the "shutdown has started" bit isn't zero. This would happen
148     // if it was incremented twice.
149     DCHECK(new_value & kShutdownHasStartedMask);
150 
151     const auto num_items_blocking_shutdown =
152         new_value >> kNumItemsBlockingShutdownBitOffset;
153     return num_items_blocking_shutdown != 0;
154   }
155 
156   // Returns true if shutdown has started.
HasShutdownStarted() const157   bool HasShutdownStarted() const {
158     return subtle::NoBarrier_Load(&bits_) & kShutdownHasStartedMask;
159   }
160 
161   // Returns true if there are items blocking shutdown.
AreItemsBlockingShutdown() const162   bool AreItemsBlockingShutdown() const {
163     const auto num_items_blocking_shutdown =
164         subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
165     DCHECK_GE(num_items_blocking_shutdown, 0);
166     return num_items_blocking_shutdown != 0;
167   }
168 
169   // Increments the number of items blocking shutdown. Returns true if
170   // shutdown has started.
IncrementNumItemsBlockingShutdown()171   bool IncrementNumItemsBlockingShutdown() {
172 #if DCHECK_IS_ON()
173     // Verify that no overflow will occur.
174     const auto num_items_blocking_shutdown =
175         subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
176     DCHECK_LT(num_items_blocking_shutdown,
177               std::numeric_limits<subtle::Atomic32>::max() -
178                   kNumItemsBlockingShutdownIncrement);
179 #endif
180 
181     const auto new_bits = subtle::NoBarrier_AtomicIncrement(
182         &bits_, kNumItemsBlockingShutdownIncrement);
183     return new_bits & kShutdownHasStartedMask;
184   }
185 
186   // Decrements the number of items blocking shutdown. Returns true if shutdown
187   // has started and the number of tasks blocking shutdown becomes zero.
DecrementNumItemsBlockingShutdown()188   bool DecrementNumItemsBlockingShutdown() {
189     const auto new_bits = subtle::NoBarrier_AtomicIncrement(
190         &bits_, -kNumItemsBlockingShutdownIncrement);
191     const bool shutdown_has_started = new_bits & kShutdownHasStartedMask;
192     const auto num_items_blocking_shutdown =
193         new_bits >> kNumItemsBlockingShutdownBitOffset;
194     DCHECK_GE(num_items_blocking_shutdown, 0);
195     return shutdown_has_started && num_items_blocking_shutdown == 0;
196   }
197 
198  private:
199   static constexpr subtle::Atomic32 kShutdownHasStartedMask = 1;
200   static constexpr subtle::Atomic32 kNumItemsBlockingShutdownBitOffset = 1;
201   static constexpr subtle::Atomic32 kNumItemsBlockingShutdownIncrement =
202       1 << kNumItemsBlockingShutdownBitOffset;
203 
204   // The LSB indicates whether shutdown has started. The other bits count the
205   // number of items blocking shutdown.
206   // No barriers are required to read/write |bits_| as this class is only used
207   // as an atomic state checker, it doesn't provide sequential consistency
208   // guarantees w.r.t. external state. Sequencing of the TaskTracker::State
209   // operations themselves is guaranteed by the AtomicIncrement RMW (read-
210   // modify-write) semantics however. For example, if two threads are racing to
211   // call IncrementNumItemsBlockingShutdown() and StartShutdown() respectively,
212   // either the first thread will win and the StartShutdown() call will see the
213   // blocking task or the second thread will win and
214   // IncrementNumItemsBlockingShutdown() will know that shutdown has started.
215   subtle::Atomic32 bits_ = 0;
216 };
217 
TaskTracker()218 TaskTracker::TaskTracker()
219     : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
220       state_(new State),
221       can_run_policy_(CanRunPolicy::kAll),
222       flush_cv_(flush_lock_.CreateConditionVariable()),
223       shutdown_lock_(&flush_lock_),
224       tracked_ref_factory_(this) {
225   // |flush_cv_| is only waited upon in FlushForTesting(), avoid instantiating a
226   // ScopedBlockingCallWithBaseSyncPrimitives from test threads intentionally
227   // idling themselves to wait on the ThreadPool.
228   flush_cv_->declare_only_used_while_idle();
229 }
230 
231 TaskTracker::~TaskTracker() = default;
232 
StartShutdown()233 void TaskTracker::StartShutdown() {
234   CheckedAutoLock auto_lock(shutdown_lock_);
235 
236   // This method can only be called once.
237   DCHECK(!shutdown_event_);
238   DCHECK(!state_->HasShutdownStarted());
239 
240   shutdown_event_ = std::make_unique<WaitableEvent>();
241 
242   const bool tasks_are_blocking_shutdown = state_->StartShutdown();
243 
244   // From now, if a thread causes the number of tasks blocking shutdown to
245   // become zero, it will call OnBlockingShutdownTasksComplete().
246 
247   if (!tasks_are_blocking_shutdown) {
248     // If another thread posts a BLOCK_SHUTDOWN task at this moment, it will
249     // block until this method releases |shutdown_lock_|. Then, it will fail
250     // DCHECK(!shutdown_event_->IsSignaled()). This is the desired behavior
251     // because posting a BLOCK_SHUTDOWN task after StartShutdown() when no
252     // tasks are blocking shutdown isn't allowed.
253     shutdown_event_->Signal();
254     return;
255   }
256 }
257 
CompleteShutdown()258 void TaskTracker::CompleteShutdown() {
259   // It is safe to access |shutdown_event_| without holding |lock_| because the
260   // pointer never changes after being set by StartShutdown(), which must
261   // happen-before this.
262   DCHECK(TS_UNCHECKED_READ(shutdown_event_));
263 
264   {
265     base::ScopedAllowBaseSyncPrimitives allow_wait;
266     // Allow tests to wait for and introduce logging about the shutdown tasks
267     // before we block this thread.
268     BeginCompleteShutdown(*TS_UNCHECKED_READ(shutdown_event_));
269     // Now block the thread until all tasks are done.
270     TS_UNCHECKED_READ(shutdown_event_)->Wait();
271   }
272 
273   // Unblock FlushForTesting() and perform the FlushAsyncForTesting callback
274   // when shutdown completes.
275   {
276     CheckedAutoLock auto_lock(flush_lock_);
277     flush_cv_->Broadcast();
278   }
279   InvokeFlushCallbacksForTesting();
280 }
281 
FlushForTesting()282 void TaskTracker::FlushForTesting() {
283   AssertFlushForTestingAllowed();
284   CheckedAutoLock auto_lock(flush_lock_);
285   while (num_incomplete_task_sources_.load(std::memory_order_acquire) != 0 &&
286          !IsShutdownComplete()) {
287     flush_cv_->Wait();
288   }
289 }
290 
FlushAsyncForTesting(OnceClosure flush_callback)291 void TaskTracker::FlushAsyncForTesting(OnceClosure flush_callback) {
292   DCHECK(flush_callback);
293   {
294     CheckedAutoLock auto_lock(flush_lock_);
295     flush_callbacks_for_testing_.push_back(std::move(flush_callback));
296   }
297 
298   if (num_incomplete_task_sources_.load(std::memory_order_acquire) == 0 ||
299       IsShutdownComplete()) {
300     InvokeFlushCallbacksForTesting();
301   }
302 }
303 
SetCanRunPolicy(CanRunPolicy can_run_policy)304 void TaskTracker::SetCanRunPolicy(CanRunPolicy can_run_policy) {
305   can_run_policy_.store(can_run_policy);
306 }
307 
WillEnqueueJob(JobTaskSource * task_source)308 void TaskTracker::WillEnqueueJob(JobTaskSource* task_source) {
309   task_source->WillEnqueue(sequence_nums_.GetNext(), task_annotator_);
310 }
311 
WillPostTask(Task * task,TaskShutdownBehavior shutdown_behavior)312 bool TaskTracker::WillPostTask(Task* task,
313                                TaskShutdownBehavior shutdown_behavior) {
314   DCHECK(task);
315   DCHECK(task->task);
316 
317   task->sequence_num = sequence_nums_.GetNext();
318   if (state_->HasShutdownStarted()) {
319     // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
320     // started and the task is not delayed.
321     if (shutdown_behavior != TaskShutdownBehavior::BLOCK_SHUTDOWN ||
322         !task->delayed_run_time.is_null() || fizzle_block_shutdown_tasks) {
323       return false;
324     }
325 
326     // A BLOCK_SHUTDOWN task posted after shutdown has completed without setting
327     // `fizzle_block_shutdown_tasks` is an ordering bug. This aims to catch
328     // those early.
329     CheckedAutoLock auto_lock(shutdown_lock_);
330     DCHECK(shutdown_event_);
331     DCHECK(!shutdown_event_->IsSignaled())
332         << "posted_from: " << task->posted_from.ToString();
333   }
334 
335   // TODO(scheduler-dev): Record the task traits here.
336   task_annotator_.WillQueueTask("ThreadPool_PostTask", task);
337 
338   return true;
339 }
340 
WillPostTaskNow(const Task & task,TaskPriority priority) const341 bool TaskTracker::WillPostTaskNow(const Task& task,
342                                   TaskPriority priority) const {
343   // Delayed tasks's TaskShutdownBehavior is implicitly capped at
344   // SKIP_ON_SHUTDOWN. i.e. it cannot BLOCK_SHUTDOWN, TaskTracker will not wait
345   // for a delayed task in a BLOCK_SHUTDOWN TaskSource and will also skip
346   // delayed tasks that happen to become ripe during shutdown.
347   if (!task.delayed_run_time.is_null() && state_->HasShutdownStarted())
348     return false;
349 
350   if (has_log_best_effort_tasks_switch_ &&
351       priority == TaskPriority::BEST_EFFORT) {
352     // A TaskPriority::BEST_EFFORT task is being posted.
353     LOG(INFO) << task.posted_from.ToString();
354   }
355   return true;
356 }
357 
RegisterTaskSource(scoped_refptr<TaskSource> task_source)358 RegisteredTaskSource TaskTracker::RegisterTaskSource(
359     scoped_refptr<TaskSource> task_source) {
360   DCHECK(task_source);
361 
362   TaskShutdownBehavior shutdown_behavior = task_source->shutdown_behavior();
363   if (!BeforeQueueTaskSource(shutdown_behavior))
364     return nullptr;
365 
366   num_incomplete_task_sources_.fetch_add(1, std::memory_order_relaxed);
367   return RegisteredTaskSource(std::move(task_source), this);
368 }
369 
CanRunPriority(TaskPriority priority) const370 bool TaskTracker::CanRunPriority(TaskPriority priority) const {
371   auto can_run_policy = can_run_policy_.load();
372 
373   if (can_run_policy == CanRunPolicy::kAll)
374     return true;
375 
376   if (can_run_policy == CanRunPolicy::kForegroundOnly &&
377       priority >= TaskPriority::USER_VISIBLE) {
378     return true;
379   }
380 
381   return false;
382 }
383 
RunAndPopNextTask(RegisteredTaskSource task_source)384 RegisteredTaskSource TaskTracker::RunAndPopNextTask(
385     RegisteredTaskSource task_source) {
386   DCHECK(task_source);
387 
388   const bool should_run_tasks = BeforeRunTask(task_source->shutdown_behavior());
389 
390   // Run the next task in |task_source|.
391   absl::optional<Task> task;
392   TaskTraits traits;
393   {
394     auto transaction = task_source->BeginTransaction();
395     task = should_run_tasks ? task_source.TakeTask(&transaction)
396                             : task_source.Clear(&transaction);
397     traits = transaction.traits();
398   }
399 
400   if (task) {
401     // Skip delayed tasks if shutdown started.
402     if (!task->delayed_run_time.is_null() && state_->HasShutdownStarted())
403       task->task = base::DoNothingWithBoundArgs(std::move(task->task));
404 
405     // Run the |task| (whether it's a worker task or the Clear() closure).
406     RunTask(std::move(task.value()), task_source.get(), traits);
407   }
408   if (should_run_tasks)
409     AfterRunTask(task_source->shutdown_behavior());
410   const bool task_source_must_be_queued = task_source.DidProcessTask();
411   // |task_source| should be reenqueued iff requested by DidProcessTask().
412   if (task_source_must_be_queued)
413     return task_source;
414   return nullptr;
415 }
416 
HasShutdownStarted() const417 bool TaskTracker::HasShutdownStarted() const {
418   return state_->HasShutdownStarted();
419 }
420 
IsShutdownComplete() const421 bool TaskTracker::IsShutdownComplete() const {
422   CheckedAutoLock auto_lock(shutdown_lock_);
423   return shutdown_event_ && shutdown_event_->IsSignaled();
424 }
425 
BeginFizzlingBlockShutdownTasks()426 void TaskTracker::BeginFizzlingBlockShutdownTasks() {
427   fizzle_block_shutdown_tasks = true;
428 }
429 
EndFizzlingBlockShutdownTasks()430 void TaskTracker::EndFizzlingBlockShutdownTasks() {
431   fizzle_block_shutdown_tasks = false;
432 }
433 
RunTask(Task task,TaskSource * task_source,const TaskTraits & traits)434 void TaskTracker::RunTask(Task task,
435                           TaskSource* task_source,
436                           const TaskTraits& traits) {
437   DCHECK(task_source);
438 
439   const auto environment = task_source->GetExecutionEnvironment();
440 
441   absl::optional<ScopedDisallowSingleton> disallow_singleton;
442   absl::optional<ScopedDisallowBlocking> disallow_blocking;
443   absl::optional<ScopedDisallowBaseSyncPrimitives> disallow_sync_primitives;
444   if (traits.shutdown_behavior() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
445     disallow_singleton.emplace();
446   if (!traits.may_block())
447     disallow_blocking.emplace();
448   if (!traits.with_base_sync_primitives())
449     disallow_sync_primitives.emplace();
450 
451   {
452     DCHECK(environment.token.IsValid());
453     ScopedSetSequenceTokenForCurrentThread
454         scoped_set_sequence_token_for_current_thread(environment.token);
455     ScopedSetTaskPriorityForCurrentThread
456         scoped_set_task_priority_for_current_thread(traits.priority());
457 
458     // Local storage map used if none is provided by |environment|.
459     absl::optional<SequenceLocalStorageMap> local_storage_map;
460     if (!environment.sequence_local_storage)
461       local_storage_map.emplace();
462 
463     ScopedSetSequenceLocalStorageMapForCurrentThread
464         scoped_set_sequence_local_storage_map_for_current_thread(
465             environment.sequence_local_storage
466                 ? environment.sequence_local_storage.get()
467                 : &local_storage_map.value());
468 
469     // Set up TaskRunner CurrentDefaultHandle as expected for the scope of the
470     // task.
471     absl::optional<SequencedTaskRunner::CurrentDefaultHandle>
472         sequenced_task_runner_current_default_handle;
473     absl::optional<SingleThreadTaskRunner::CurrentDefaultHandle>
474         single_thread_task_runner_current_default_handle;
475     switch (task_source->execution_mode()) {
476       case TaskSourceExecutionMode::kJob:
477       case TaskSourceExecutionMode::kParallel:
478         break;
479       case TaskSourceExecutionMode::kSequenced:
480         DCHECK(task_source->task_runner());
481         sequenced_task_runner_current_default_handle.emplace(
482             static_cast<SequencedTaskRunner*>(task_source->task_runner()));
483         break;
484       case TaskSourceExecutionMode::kSingleThread:
485         DCHECK(task_source->task_runner());
486         single_thread_task_runner_current_default_handle.emplace(
487             static_cast<SingleThreadTaskRunner*>(task_source->task_runner()));
488         break;
489     }
490 
491     RunTaskWithShutdownBehavior(task, traits, task_source, environment.token);
492 
493     // Make sure the arguments bound to the callback are deleted within the
494     // scope in which the callback runs.
495     task.task = OnceClosure();
496   }
497 }
498 
BeginCompleteShutdown(base::WaitableEvent & shutdown_event)499 void TaskTracker::BeginCompleteShutdown(base::WaitableEvent& shutdown_event) {
500   // Do nothing in production, tests may override this.
501 }
502 
HasIncompleteTaskSourcesForTesting() const503 bool TaskTracker::HasIncompleteTaskSourcesForTesting() const {
504   return num_incomplete_task_sources_.load(std::memory_order_acquire) != 0;
505 }
506 
BeforeQueueTaskSource(TaskShutdownBehavior shutdown_behavior)507 bool TaskTracker::BeforeQueueTaskSource(
508     TaskShutdownBehavior shutdown_behavior) {
509   if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
510     // BLOCK_SHUTDOWN task sources block shutdown between the moment they are
511     // queued and the moment their last task completes its execution.
512     const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
513 
514     if (shutdown_started) {
515       // A BLOCK_SHUTDOWN task posted after shutdown has completed is an
516       // ordering bug. This aims to catch those early.
517       CheckedAutoLock auto_lock(shutdown_lock_);
518       DCHECK(shutdown_event_);
519       DCHECK(!shutdown_event_->IsSignaled());
520     }
521 
522     return true;
523   }
524 
525   // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
526   // started.
527   return !state_->HasShutdownStarted();
528 }
529 
BeforeRunTask(TaskShutdownBehavior shutdown_behavior)530 bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
531   switch (shutdown_behavior) {
532     case TaskShutdownBehavior::BLOCK_SHUTDOWN: {
533       // The number of tasks blocking shutdown has been incremented when the
534       // task was posted.
535       DCHECK(state_->AreItemsBlockingShutdown());
536 
537       // Trying to run a BLOCK_SHUTDOWN task after shutdown has completed is
538       // unexpected as it either shouldn't have been posted if shutdown
539       // completed or should be blocking shutdown if it was posted before it
540       // did.
541       DCHECK(!state_->HasShutdownStarted() || !IsShutdownComplete());
542 
543       return true;
544     }
545 
546     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN: {
547       // SKIP_ON_SHUTDOWN tasks block shutdown while they are running.
548       const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
549 
550       if (shutdown_started) {
551         // The SKIP_ON_SHUTDOWN task isn't allowed to run during shutdown.
552         // Decrement the number of tasks blocking shutdown that was wrongly
553         // incremented.
554         DecrementNumItemsBlockingShutdown();
555         return false;
556       }
557 
558       return true;
559     }
560 
561     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN: {
562       return !state_->HasShutdownStarted();
563     }
564   }
565 
566   NOTREACHED();
567   return false;
568 }
569 
AfterRunTask(TaskShutdownBehavior shutdown_behavior)570 void TaskTracker::AfterRunTask(TaskShutdownBehavior shutdown_behavior) {
571   if (shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
572     DecrementNumItemsBlockingShutdown();
573   }
574 }
575 
UnregisterTaskSource(scoped_refptr<TaskSource> task_source)576 scoped_refptr<TaskSource> TaskTracker::UnregisterTaskSource(
577     scoped_refptr<TaskSource> task_source) {
578   DCHECK(task_source);
579   if (task_source->shutdown_behavior() ==
580       TaskShutdownBehavior::BLOCK_SHUTDOWN) {
581     DecrementNumItemsBlockingShutdown();
582   }
583   DecrementNumIncompleteTaskSources();
584   return task_source;
585 }
586 
DecrementNumItemsBlockingShutdown()587 void TaskTracker::DecrementNumItemsBlockingShutdown() {
588   const bool shutdown_started_and_no_items_block_shutdown =
589       state_->DecrementNumItemsBlockingShutdown();
590   if (!shutdown_started_and_no_items_block_shutdown)
591     return;
592 
593   CheckedAutoLock auto_lock(shutdown_lock_);
594   DCHECK(shutdown_event_);
595   shutdown_event_->Signal();
596 }
597 
DecrementNumIncompleteTaskSources()598 void TaskTracker::DecrementNumIncompleteTaskSources() {
599   const auto prev_num_incomplete_task_sources =
600       num_incomplete_task_sources_.fetch_sub(1);
601   DCHECK_GE(prev_num_incomplete_task_sources, 1);
602   if (prev_num_incomplete_task_sources == 1) {
603     {
604       CheckedAutoLock auto_lock(flush_lock_);
605       flush_cv_->Broadcast();
606     }
607     InvokeFlushCallbacksForTesting();
608   }
609 }
610 
InvokeFlushCallbacksForTesting()611 void TaskTracker::InvokeFlushCallbacksForTesting() {
612   base::circular_deque<OnceClosure> flush_callbacks;
613   {
614     CheckedAutoLock auto_lock(flush_lock_);
615     flush_callbacks = std::move(flush_callbacks_for_testing_);
616   }
617   for (auto& flush_callback : flush_callbacks)
618     std::move(flush_callback).Run();
619 }
620 
RunContinueOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)621 NOINLINE void TaskTracker::RunContinueOnShutdown(Task& task,
622                                                  const TaskTraits& traits,
623                                                  TaskSource* task_source,
624                                                  const SequenceToken& token) {
625   NO_CODE_FOLDING();
626   RunTaskImpl(task, traits, task_source, token);
627 }
628 
RunSkipOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)629 NOINLINE void TaskTracker::RunSkipOnShutdown(Task& task,
630                                              const TaskTraits& traits,
631                                              TaskSource* task_source,
632                                              const SequenceToken& token) {
633   NO_CODE_FOLDING();
634   RunTaskImpl(task, traits, task_source, token);
635 }
636 
RunBlockShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)637 NOINLINE void TaskTracker::RunBlockShutdown(Task& task,
638                                             const TaskTraits& traits,
639                                             TaskSource* task_source,
640                                             const SequenceToken& token) {
641   NO_CODE_FOLDING();
642   RunTaskImpl(task, traits, task_source, token);
643 }
644 
RunTaskImpl(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)645 void TaskTracker::RunTaskImpl(Task& task,
646                               const TaskTraits& traits,
647                               TaskSource* task_source,
648                               const SequenceToken& token) {
649   task_annotator_.RunTask(
650       "ThreadPool_RunTask", task, [&](perfetto::EventContext& ctx) {
651         EmitThreadPoolTraceEventMetadata(ctx, traits, task_source, token);
652       });
653 }
654 
RunTaskWithShutdownBehavior(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)655 void TaskTracker::RunTaskWithShutdownBehavior(Task& task,
656                                               const TaskTraits& traits,
657                                               TaskSource* task_source,
658                                               const SequenceToken& token) {
659   switch (traits.shutdown_behavior()) {
660     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
661       RunContinueOnShutdown(task, traits, task_source, token);
662       return;
663     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
664       RunSkipOnShutdown(task, traits, task_source, token);
665       return;
666     case TaskShutdownBehavior::BLOCK_SHUTDOWN:
667       RunBlockShutdown(task, traits, task_source, token);
668       return;
669   }
670 }
671 
672 }  // namespace internal
673 }  // namespace base
674