• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/task_tracker.h"
6 
7 #include <atomic>
8 #include <optional>
9 #include <string>
10 #include <utility>
11 
12 #include "base/base_switches.h"
13 #include "base/command_line.h"
14 #include "base/compiler_specific.h"
15 #include "base/debug/alias.h"
16 #include "base/functional/callback.h"
17 #include "base/json/json_writer.h"
18 #include "base/logging.h"
19 #include "base/memory/ptr_util.h"
20 #include "base/metrics/histogram_macros.h"
21 #include "base/notreached.h"
22 #include "base/sequence_token.h"
23 #include "base/strings/string_util.h"
24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/waitable_event.h"
26 #include "base/task/scoped_set_task_priority_for_current_thread.h"
27 #include "base/task/sequenced_task_runner.h"
28 #include "base/task/single_thread_task_runner.h"
29 #include "base/task/thread_pool/job_task_source.h"
30 #include "base/task/thread_pool/task_source.h"
31 #include "base/threading/sequence_local_storage_map.h"
32 #include "base/threading/thread_restrictions.h"
33 #include "base/time/time.h"
34 #include "base/trace_event/base_tracing.h"
35 #include "base/values.h"
36 #include "build/build_config.h"
37 
38 namespace base {
39 namespace internal {
40 
41 namespace {
42 
43 #if BUILDFLAG(ENABLE_BASE_TRACING)
44 using perfetto::protos::pbzero::ChromeThreadPoolTask;
45 using perfetto::protos::pbzero::ChromeTrackEvent;
46 #endif  // BUILDFLAG(ENABLE_BASE_TRACING)
47 
48 constexpr const char* kExecutionModeString[] = {"parallel", "sequenced",
49                                                 "single thread", "job"};
50 static_assert(
51     std::size(kExecutionModeString) ==
52         static_cast<size_t>(TaskSourceExecutionMode::kMax) + 1,
53     "Array kExecutionModeString is out of sync with TaskSourceExecutionMode.");
54 
HasLogBestEffortTasksSwitch()55 bool HasLogBestEffortTasksSwitch() {
56   // The CommandLine might not be initialized if ThreadPool is initialized in a
57   // dynamic library which doesn't have access to argc/argv.
58   return CommandLine::InitializedForCurrentProcess() &&
59          CommandLine::ForCurrentProcess()->HasSwitch(
60              switches::kLogBestEffortTasks);
61 }
62 
63 #if BUILDFLAG(ENABLE_BASE_TRACING)
TaskPriorityToProto(TaskPriority priority)64 ChromeThreadPoolTask::Priority TaskPriorityToProto(TaskPriority priority) {
65   switch (priority) {
66     case TaskPriority::BEST_EFFORT:
67       return ChromeThreadPoolTask::PRIORITY_BEST_EFFORT;
68     case TaskPriority::USER_VISIBLE:
69       return ChromeThreadPoolTask::PRIORITY_USER_VISIBLE;
70     case TaskPriority::USER_BLOCKING:
71       return ChromeThreadPoolTask::PRIORITY_USER_BLOCKING;
72   }
73 }
74 
ExecutionModeToProto(TaskSourceExecutionMode mode)75 ChromeThreadPoolTask::ExecutionMode ExecutionModeToProto(
76     TaskSourceExecutionMode mode) {
77   switch (mode) {
78     case TaskSourceExecutionMode::kParallel:
79       return ChromeThreadPoolTask::EXECUTION_MODE_PARALLEL;
80     case TaskSourceExecutionMode::kSequenced:
81       return ChromeThreadPoolTask::EXECUTION_MODE_SEQUENCED;
82     case TaskSourceExecutionMode::kSingleThread:
83       return ChromeThreadPoolTask::EXECUTION_MODE_SINGLE_THREAD;
84     case TaskSourceExecutionMode::kJob:
85       return ChromeThreadPoolTask::EXECUTION_MODE_JOB;
86   }
87 }
88 
ShutdownBehaviorToProto(TaskShutdownBehavior shutdown_behavior)89 ChromeThreadPoolTask::ShutdownBehavior ShutdownBehaviorToProto(
90     TaskShutdownBehavior shutdown_behavior) {
91   switch (shutdown_behavior) {
92     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
93       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_CONTINUE_ON_SHUTDOWN;
94     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
95       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_SKIP_ON_SHUTDOWN;
96     case TaskShutdownBehavior::BLOCK_SHUTDOWN:
97       return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_BLOCK_SHUTDOWN;
98   }
99 }
100 #endif  //  BUILDFLAG(ENABLE_BASE_TRACING)
101 
EmitThreadPoolTraceEventMetadata(perfetto::EventContext & ctx,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)102 auto EmitThreadPoolTraceEventMetadata(perfetto::EventContext& ctx,
103                                       const TaskTraits& traits,
104                                       TaskSource* task_source,
105                                       const SequenceToken& token) {
106 #if BUILDFLAG(ENABLE_BASE_TRACING)
107   // Other parameters are included only when "scheduler" category is enabled.
108   const uint8_t* scheduler_category_enabled =
109       TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("scheduler");
110 
111   if (!*scheduler_category_enabled)
112     return;
113   auto* task = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
114                    ->set_thread_pool_task();
115   task->set_task_priority(TaskPriorityToProto(traits.priority()));
116   task->set_execution_mode(ExecutionModeToProto(task_source->execution_mode()));
117   task->set_shutdown_behavior(
118       ShutdownBehaviorToProto(traits.shutdown_behavior()));
119   if (token.IsValid())
120     task->set_sequence_token(token.ToInternalValue());
121 #endif  //  BUILDFLAG(ENABLE_BASE_TRACING)
122 }
123 
124 // If this is greater than 0 on a given thread, it will ignore the DCHECK which
125 // prevents posting BLOCK_SHUTDOWN tasks after shutdown. There are cases where
126 // posting back to a BLOCK_SHUTDOWN sequence is a coincidence rather than part
127 // of a shutdown blocking series of tasks, this prevents racy DCHECKs in those
128 // cases.
129 constinit thread_local int fizzle_block_shutdown_tasks_ref = 0;
130 
131 }  // namespace
132 
133 // Atomic internal state used by TaskTracker to track items that are blocking
134 // Shutdown. An "item" consist of either:
135 // - A running SKIP_ON_SHUTDOWN task
136 // - A queued/running BLOCK_SHUTDOWN TaskSource.
137 // Sequential consistency shouldn't be assumed from these calls (i.e. a thread
138 // reading |HasShutdownStarted() == true| isn't guaranteed to see all writes
139 // made before |StartShutdown()| on the thread that invoked it).
140 class TaskTracker::State {
141  public:
142   State() = default;
143   State(const State&) = delete;
144   State& operator=(const State&) = delete;
145 
146   // Sets a flag indicating that shutdown has started. Returns true if there are
147   // items blocking shutdown. Can only be called once.
StartShutdown()148   bool StartShutdown() {
149     const auto new_value =
150         subtle::NoBarrier_AtomicIncrement(&bits_, kShutdownHasStartedMask);
151 
152     // Check that the "shutdown has started" bit isn't zero. This would happen
153     // if it was incremented twice.
154     DCHECK(new_value & kShutdownHasStartedMask);
155 
156     const auto num_items_blocking_shutdown =
157         new_value >> kNumItemsBlockingShutdownBitOffset;
158     return num_items_blocking_shutdown != 0;
159   }
160 
161   // Returns true if shutdown has started.
HasShutdownStarted() const162   bool HasShutdownStarted() const {
163     return subtle::NoBarrier_Load(&bits_) & kShutdownHasStartedMask;
164   }
165 
166   // Returns true if there are items blocking shutdown.
AreItemsBlockingShutdown() const167   bool AreItemsBlockingShutdown() const {
168     const auto num_items_blocking_shutdown =
169         subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
170     DCHECK_GE(num_items_blocking_shutdown, 0);
171     return num_items_blocking_shutdown != 0;
172   }
173 
174   // Increments the number of items blocking shutdown. Returns true if
175   // shutdown has started.
IncrementNumItemsBlockingShutdown()176   bool IncrementNumItemsBlockingShutdown() {
177 #if DCHECK_IS_ON()
178     // Verify that no overflow will occur.
179     const auto num_items_blocking_shutdown =
180         subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
181     DCHECK_LT(num_items_blocking_shutdown,
182               std::numeric_limits<subtle::Atomic32>::max() -
183                   kNumItemsBlockingShutdownIncrement);
184 #endif
185 
186     const auto new_bits = subtle::NoBarrier_AtomicIncrement(
187         &bits_, kNumItemsBlockingShutdownIncrement);
188     return new_bits & kShutdownHasStartedMask;
189   }
190 
191   // Decrements the number of items blocking shutdown. Returns true if shutdown
192   // has started and the number of tasks blocking shutdown becomes zero.
DecrementNumItemsBlockingShutdown()193   bool DecrementNumItemsBlockingShutdown() {
194     const auto new_bits = subtle::NoBarrier_AtomicIncrement(
195         &bits_, -kNumItemsBlockingShutdownIncrement);
196     const bool shutdown_has_started = new_bits & kShutdownHasStartedMask;
197     const auto num_items_blocking_shutdown =
198         new_bits >> kNumItemsBlockingShutdownBitOffset;
199     DCHECK_GE(num_items_blocking_shutdown, 0);
200     return shutdown_has_started && num_items_blocking_shutdown == 0;
201   }
202 
203  private:
204   static constexpr subtle::Atomic32 kShutdownHasStartedMask = 1;
205   static constexpr subtle::Atomic32 kNumItemsBlockingShutdownBitOffset = 1;
206   static constexpr subtle::Atomic32 kNumItemsBlockingShutdownIncrement =
207       1 << kNumItemsBlockingShutdownBitOffset;
208 
209   // The LSB indicates whether shutdown has started. The other bits count the
210   // number of items blocking shutdown.
211   // No barriers are required to read/write |bits_| as this class is only used
212   // as an atomic state checker, it doesn't provide sequential consistency
213   // guarantees w.r.t. external state. Sequencing of the TaskTracker::State
214   // operations themselves is guaranteed by the AtomicIncrement RMW (read-
215   // modify-write) semantics however. For example, if two threads are racing to
216   // call IncrementNumItemsBlockingShutdown() and StartShutdown() respectively,
217   // either the first thread will win and the StartShutdown() call will see the
218   // blocking task or the second thread will win and
219   // IncrementNumItemsBlockingShutdown() will know that shutdown has started.
220   subtle::Atomic32 bits_ = 0;
221 };
222 
TaskTracker()223 TaskTracker::TaskTracker()
224     : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
225       state_(new State),
226       can_run_policy_(CanRunPolicy::kAll),
227       flush_cv_(flush_lock_.CreateConditionVariable()),
228       shutdown_lock_(&flush_lock_),
229       tracked_ref_factory_(this) {
230   // |flush_cv_| is only waited upon in FlushForTesting(), avoid instantiating a
231   // ScopedBlockingCallWithBaseSyncPrimitives from test threads intentionally
232   // idling themselves to wait on the ThreadPool.
233   flush_cv_.declare_only_used_while_idle();
234 }
235 
236 TaskTracker::~TaskTracker() = default;
237 
StartShutdown()238 void TaskTracker::StartShutdown() {
239   CheckedAutoLock auto_lock(shutdown_lock_);
240 
241   // This method can only be called once.
242   DCHECK(!shutdown_event_);
243   DCHECK(!state_->HasShutdownStarted());
244 
245   shutdown_event_.emplace();
246 
247   const bool tasks_are_blocking_shutdown = state_->StartShutdown();
248 
249   // From now, if a thread causes the number of tasks blocking shutdown to
250   // become zero, it will call OnBlockingShutdownTasksComplete().
251 
252   if (!tasks_are_blocking_shutdown) {
253     // If another thread posts a BLOCK_SHUTDOWN task at this moment, it will
254     // block until this method releases |shutdown_lock_|. Then, it will fail
255     // DCHECK(!shutdown_event_->IsSignaled()). This is the desired behavior
256     // because posting a BLOCK_SHUTDOWN task after StartShutdown() when no
257     // tasks are blocking shutdown isn't allowed.
258     shutdown_event_->Signal();
259     return;
260   }
261 }
262 
CompleteShutdown()263 void TaskTracker::CompleteShutdown() {
264   // It is safe to access |shutdown_event_| without holding |lock_| because the
265   // pointer never changes after being set by StartShutdown(), which must
266   // happen-before this.
267   DCHECK(TS_UNCHECKED_READ(shutdown_event_));
268 
269   {
270     base::ScopedAllowBaseSyncPrimitives allow_wait;
271     // Allow tests to wait for and introduce logging about the shutdown tasks
272     // before we block this thread.
273     BeginCompleteShutdown(*TS_UNCHECKED_READ(shutdown_event_));
274     // Now block the thread until all tasks are done.
275     TS_UNCHECKED_READ(shutdown_event_)->Wait();
276   }
277 
278   // Unblock FlushForTesting() and perform the FlushAsyncForTesting callback
279   // when shutdown completes.
280   {
281     CheckedAutoLock auto_lock(flush_lock_);
282     flush_cv_.Broadcast();
283   }
284   InvokeFlushCallbacksForTesting();
285 }
286 
FlushForTesting()287 void TaskTracker::FlushForTesting() {
288   AssertFlushForTestingAllowed();
289   CheckedAutoLock auto_lock(flush_lock_);
290   while (num_incomplete_task_sources_.load(std::memory_order_acquire) != 0 &&
291          !IsShutdownComplete()) {
292     flush_cv_.Wait();
293   }
294 }
295 
FlushAsyncForTesting(OnceClosure flush_callback)296 void TaskTracker::FlushAsyncForTesting(OnceClosure flush_callback) {
297   DCHECK(flush_callback);
298   {
299     CheckedAutoLock auto_lock(flush_lock_);
300     flush_callbacks_for_testing_.push_back(std::move(flush_callback));
301   }
302 
303   if (num_incomplete_task_sources_.load(std::memory_order_acquire) == 0 ||
304       IsShutdownComplete()) {
305     InvokeFlushCallbacksForTesting();
306   }
307 }
308 
SetCanRunPolicy(CanRunPolicy can_run_policy)309 void TaskTracker::SetCanRunPolicy(CanRunPolicy can_run_policy) {
310   can_run_policy_.store(can_run_policy);
311 }
312 
WillEnqueueJob(JobTaskSource * task_source)313 void TaskTracker::WillEnqueueJob(JobTaskSource* task_source) {
314   task_source->WillEnqueue(sequence_nums_.GetNext(), task_annotator_);
315 }
316 
WillPostTask(Task * task,TaskShutdownBehavior shutdown_behavior)317 bool TaskTracker::WillPostTask(Task* task,
318                                TaskShutdownBehavior shutdown_behavior) {
319   DCHECK(task);
320   DCHECK(task->task);
321 
322   task->sequence_num = sequence_nums_.GetNext();
323   if (state_->HasShutdownStarted()) {
324     // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
325     // started and the task is not delayed.
326     if (shutdown_behavior != TaskShutdownBehavior::BLOCK_SHUTDOWN ||
327         !task->delayed_run_time.is_null() ||
328         fizzle_block_shutdown_tasks_ref > 0) {
329       return false;
330     }
331 
332     // A BLOCK_SHUTDOWN task posted after shutdown has completed is an ordering
333     // bug. This aims to catch those early. In some cases it's a racy
334     // coincidence (i.e. posting back to a BLOCK_SHUTDOWN sequence from a task
335     // that wasn't itself guaranteed to finish before shutdown), in those cases
336     // a ScopedFizzleBlockShutdownTasks can bump
337     // `fizzle_block_shutdown_tasks_ref` to bypass this DCHECK.
338     CheckedAutoLock auto_lock(shutdown_lock_);
339     DCHECK(shutdown_event_);
340     DCHECK(!shutdown_event_->IsSignaled())
341         << "posted_from: " << task->posted_from.ToString();
342   }
343 
344   // TODO(scheduler-dev): Record the task traits here.
345   task_annotator_.WillQueueTask("ThreadPool_PostTask", task);
346 
347   return true;
348 }
349 
WillPostTaskNow(const Task & task,TaskPriority priority) const350 bool TaskTracker::WillPostTaskNow(const Task& task,
351                                   TaskPriority priority) const {
352   // Delayed tasks's TaskShutdownBehavior is implicitly capped at
353   // SKIP_ON_SHUTDOWN. i.e. it cannot BLOCK_SHUTDOWN, TaskTracker will not wait
354   // for a delayed task in a BLOCK_SHUTDOWN TaskSource and will also skip
355   // delayed tasks that happen to become ripe during shutdown.
356   if (!task.delayed_run_time.is_null() && state_->HasShutdownStarted())
357     return false;
358 
359   if (has_log_best_effort_tasks_switch_ &&
360       priority == TaskPriority::BEST_EFFORT) {
361     // A TaskPriority::BEST_EFFORT task is being posted.
362     LOG(INFO) << task.posted_from.ToString();
363   }
364   return true;
365 }
366 
RegisterTaskSource(scoped_refptr<TaskSource> task_source)367 RegisteredTaskSource TaskTracker::RegisterTaskSource(
368     scoped_refptr<TaskSource> task_source) {
369   DCHECK(task_source);
370 
371   TaskShutdownBehavior shutdown_behavior = task_source->shutdown_behavior();
372   if (!BeforeQueueTaskSource(shutdown_behavior))
373     return nullptr;
374 
375   num_incomplete_task_sources_.fetch_add(1, std::memory_order_relaxed);
376   return RegisteredTaskSource(std::move(task_source), this);
377 }
378 
CanRunPriority(TaskPriority priority) const379 bool TaskTracker::CanRunPriority(TaskPriority priority) const {
380   auto can_run_policy = can_run_policy_.load();
381 
382   if (can_run_policy == CanRunPolicy::kAll)
383     return true;
384 
385   if (can_run_policy == CanRunPolicy::kForegroundOnly &&
386       priority >= TaskPriority::USER_VISIBLE) {
387     return true;
388   }
389 
390   return false;
391 }
392 
RunAndPopNextTask(RegisteredTaskSource task_source)393 RegisteredTaskSource TaskTracker::RunAndPopNextTask(
394     RegisteredTaskSource task_source) {
395   DCHECK(task_source);
396 
397   const bool should_run_tasks = BeforeRunTask(task_source->shutdown_behavior());
398 
399   // Run the next task in |task_source|.
400   std::optional<Task> task;
401   TaskTraits traits;
402   {
403     auto transaction = task_source->BeginTransaction();
404     task = should_run_tasks ? task_source.TakeTask(&transaction)
405                             : task_source.Clear(&transaction);
406     traits = transaction.traits();
407   }
408 
409   if (task) {
410     // Skip delayed tasks if shutdown started.
411     if (!task->delayed_run_time.is_null() && state_->HasShutdownStarted())
412       task->task = base::DoNothingWithBoundArgs(std::move(task->task));
413 
414     // Run the |task| (whether it's a worker task or the Clear() closure).
415     RunTask(std::move(task.value()), task_source.get(), traits);
416   }
417   if (should_run_tasks)
418     AfterRunTask(task_source->shutdown_behavior());
419 
420   const bool task_source_must_be_queued = task_source.DidProcessTask();
421   // |task_source| should be reenqueued iff requested by DidProcessTask().
422   if (task_source_must_be_queued)
423     return task_source;
424   return nullptr;
425 }
426 
HasShutdownStarted() const427 bool TaskTracker::HasShutdownStarted() const {
428   return state_->HasShutdownStarted();
429 }
430 
IsShutdownComplete() const431 bool TaskTracker::IsShutdownComplete() const {
432   CheckedAutoLock auto_lock(shutdown_lock_);
433   return shutdown_event_ && shutdown_event_->IsSignaled();
434 }
435 
BeginFizzlingBlockShutdownTasks()436 void TaskTracker::BeginFizzlingBlockShutdownTasks() {
437   ++fizzle_block_shutdown_tasks_ref;
438 }
439 
EndFizzlingBlockShutdownTasks()440 void TaskTracker::EndFizzlingBlockShutdownTasks() {
441   CHECK_GE(--fizzle_block_shutdown_tasks_ref, 0);
442 }
443 
RunTask(Task task,TaskSource * task_source,const TaskTraits & traits)444 void TaskTracker::RunTask(Task task,
445                           TaskSource* task_source,
446                           const TaskTraits& traits) {
447   DCHECK(task_source);
448 
449   const auto environment = task_source->GetExecutionEnvironment();
450 
451   struct BlockShutdownTaskFizzler {
452     BlockShutdownTaskFizzler() {
453       // Nothing outside RunTask should be bumping
454       // `fizzle_block_shutdown_tasks_ref`.
455       DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
456       ++fizzle_block_shutdown_tasks_ref;
457     }
458     ~BlockShutdownTaskFizzler() {
459       --fizzle_block_shutdown_tasks_ref;
460       // The refs should be balanced after running the task.
461       DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
462     }
463   };
464   std::optional<ScopedDisallowSingleton> disallow_singleton;
465   std::optional<ScopedDisallowBlocking> disallow_blocking;
466   std::optional<ScopedDisallowBaseSyncPrimitives> disallow_sync_primitives;
467   std::optional<BlockShutdownTaskFizzler> fizzle_block_shutdown_tasks;
468   if (traits.shutdown_behavior() ==
469       TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
470     disallow_singleton.emplace();
471     fizzle_block_shutdown_tasks.emplace();
472   }
473   if (!traits.may_block())
474     disallow_blocking.emplace();
475   if (!traits.with_base_sync_primitives())
476     disallow_sync_primitives.emplace();
477 
478   {
479     DCHECK(environment.token.IsValid());
480     TaskScope task_scope(environment.token,
481                          /* is_thread_bound=*/task_source->execution_mode() ==
482                              TaskSourceExecutionMode::kSingleThread);
483     ScopedSetTaskPriorityForCurrentThread
484         scoped_set_task_priority_for_current_thread(traits.priority());
485 
486     // Local storage map used if none is provided by |environment|.
487     std::optional<SequenceLocalStorageMap> local_storage_map;
488     if (!environment.sequence_local_storage)
489       local_storage_map.emplace();
490 
491     ScopedSetSequenceLocalStorageMapForCurrentThread
492         scoped_set_sequence_local_storage_map_for_current_thread(
493             environment.sequence_local_storage
494                 ? environment.sequence_local_storage
495                 : &local_storage_map.value());
496 
497     // Set up TaskRunner CurrentDefaultHandle as expected for the scope of the
498     // task.
499     std::optional<SequencedTaskRunner::CurrentDefaultHandle>
500         sequenced_task_runner_current_default_handle;
501     std::optional<SingleThreadTaskRunner::CurrentDefaultHandle>
502         single_thread_task_runner_current_default_handle;
503     if (environment.sequenced_task_runner) {
504       DCHECK_EQ(TaskSourceExecutionMode::kSequenced,
505                 task_source->execution_mode());
506       sequenced_task_runner_current_default_handle.emplace(
507           environment.sequenced_task_runner);
508     } else if (environment.single_thread_task_runner) {
509       DCHECK_EQ(TaskSourceExecutionMode::kSingleThread,
510                 task_source->execution_mode());
511       single_thread_task_runner_current_default_handle.emplace(
512           environment.single_thread_task_runner);
513     } else {
514       DCHECK_NE(TaskSourceExecutionMode::kSequenced,
515                 task_source->execution_mode());
516       DCHECK_NE(TaskSourceExecutionMode::kSingleThread,
517                 task_source->execution_mode());
518     }
519 
520     RunTaskWithShutdownBehavior(task, traits, task_source, environment.token);
521 
522     // Make sure the arguments bound to the callback are deleted within the
523     // scope in which the callback runs.
524     task.task = OnceClosure();
525   }
526 }
527 
BeginCompleteShutdown(base::WaitableEvent & shutdown_event)528 void TaskTracker::BeginCompleteShutdown(base::WaitableEvent& shutdown_event) {
529   // Do nothing in production, tests may override this.
530 }
531 
HasIncompleteTaskSourcesForTesting() const532 bool TaskTracker::HasIncompleteTaskSourcesForTesting() const {
533   return num_incomplete_task_sources_.load(std::memory_order_acquire) != 0;
534 }
535 
BeforeQueueTaskSource(TaskShutdownBehavior shutdown_behavior)536 bool TaskTracker::BeforeQueueTaskSource(
537     TaskShutdownBehavior shutdown_behavior) {
538   if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
539     // BLOCK_SHUTDOWN task sources block shutdown between the moment they are
540     // queued and the moment their last task completes its execution.
541     const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
542 
543     if (shutdown_started) {
544       // A BLOCK_SHUTDOWN task posted after shutdown has completed is an
545       // ordering bug. This aims to catch those early.
546       CheckedAutoLock auto_lock(shutdown_lock_);
547       DCHECK(shutdown_event_);
548       DCHECK(!shutdown_event_->IsSignaled());
549     }
550 
551     return true;
552   }
553 
554   // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
555   // started.
556   return !state_->HasShutdownStarted();
557 }
558 
BeforeRunTask(TaskShutdownBehavior shutdown_behavior)559 bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
560   switch (shutdown_behavior) {
561     case TaskShutdownBehavior::BLOCK_SHUTDOWN: {
562       // The number of tasks blocking shutdown has been incremented when the
563       // task was posted.
564       DCHECK(state_->AreItemsBlockingShutdown());
565 
566       // Trying to run a BLOCK_SHUTDOWN task after shutdown has completed is
567       // unexpected as it either shouldn't have been posted if shutdown
568       // completed or should be blocking shutdown if it was posted before it
569       // did.
570       DCHECK(!state_->HasShutdownStarted() || !IsShutdownComplete());
571 
572       return true;
573     }
574 
575     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN: {
576       // SKIP_ON_SHUTDOWN tasks block shutdown while they are running.
577       const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
578 
579       if (shutdown_started) {
580         // The SKIP_ON_SHUTDOWN task isn't allowed to run during shutdown.
581         // Decrement the number of tasks blocking shutdown that was wrongly
582         // incremented.
583         DecrementNumItemsBlockingShutdown();
584         return false;
585       }
586 
587       return true;
588     }
589 
590     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN: {
591       return !state_->HasShutdownStarted();
592     }
593   }
594 
595   NOTREACHED();
596 }
597 
AfterRunTask(TaskShutdownBehavior shutdown_behavior)598 void TaskTracker::AfterRunTask(TaskShutdownBehavior shutdown_behavior) {
599   if (shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
600     DecrementNumItemsBlockingShutdown();
601   }
602 }
603 
UnregisterTaskSource(scoped_refptr<TaskSource> task_source)604 scoped_refptr<TaskSource> TaskTracker::UnregisterTaskSource(
605     scoped_refptr<TaskSource> task_source) {
606   DCHECK(task_source);
607   if (task_source->shutdown_behavior() ==
608       TaskShutdownBehavior::BLOCK_SHUTDOWN) {
609     DecrementNumItemsBlockingShutdown();
610   }
611   DecrementNumIncompleteTaskSources();
612   return task_source;
613 }
614 
DecrementNumItemsBlockingShutdown()615 void TaskTracker::DecrementNumItemsBlockingShutdown() {
616   const bool shutdown_started_and_no_items_block_shutdown =
617       state_->DecrementNumItemsBlockingShutdown();
618   if (!shutdown_started_and_no_items_block_shutdown)
619     return;
620 
621   CheckedAutoLock auto_lock(shutdown_lock_);
622   DCHECK(shutdown_event_);
623   shutdown_event_->Signal();
624 }
625 
DecrementNumIncompleteTaskSources()626 void TaskTracker::DecrementNumIncompleteTaskSources() {
627   const auto prev_num_incomplete_task_sources =
628       num_incomplete_task_sources_.fetch_sub(1);
629   DCHECK_GE(prev_num_incomplete_task_sources, 1);
630   if (prev_num_incomplete_task_sources == 1) {
631     {
632       CheckedAutoLock auto_lock(flush_lock_);
633       flush_cv_.Broadcast();
634     }
635     InvokeFlushCallbacksForTesting();
636   }
637 }
638 
InvokeFlushCallbacksForTesting()639 void TaskTracker::InvokeFlushCallbacksForTesting() {
640   base::circular_deque<OnceClosure> flush_callbacks;
641   {
642     CheckedAutoLock auto_lock(flush_lock_);
643     flush_callbacks = std::move(flush_callbacks_for_testing_);
644   }
645   for (auto& flush_callback : flush_callbacks)
646     std::move(flush_callback).Run();
647 }
648 
RunContinueOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)649 NOINLINE void TaskTracker::RunContinueOnShutdown(Task& task,
650                                                  const TaskTraits& traits,
651                                                  TaskSource* task_source,
652                                                  const SequenceToken& token) {
653   NO_CODE_FOLDING();
654   RunTaskImpl(task, traits, task_source, token);
655 }
656 
RunSkipOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)657 NOINLINE void TaskTracker::RunSkipOnShutdown(Task& task,
658                                              const TaskTraits& traits,
659                                              TaskSource* task_source,
660                                              const SequenceToken& token) {
661   NO_CODE_FOLDING();
662   RunTaskImpl(task, traits, task_source, token);
663 }
664 
RunBlockShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)665 NOINLINE void TaskTracker::RunBlockShutdown(Task& task,
666                                             const TaskTraits& traits,
667                                             TaskSource* task_source,
668                                             const SequenceToken& token) {
669   NO_CODE_FOLDING();
670   RunTaskImpl(task, traits, task_source, token);
671 }
672 
RunTaskImpl(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)673 void TaskTracker::RunTaskImpl(Task& task,
674                               const TaskTraits& traits,
675                               TaskSource* task_source,
676                               const SequenceToken& token) {
677   task_annotator_.RunTask(
678       "ThreadPool_RunTask", task, [&](perfetto::EventContext& ctx) {
679         EmitThreadPoolTraceEventMetadata(ctx, traits, task_source, token);
680       });
681 }
682 
RunTaskWithShutdownBehavior(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)683 void TaskTracker::RunTaskWithShutdownBehavior(Task& task,
684                                               const TaskTraits& traits,
685                                               TaskSource* task_source,
686                                               const SequenceToken& token) {
687   switch (traits.shutdown_behavior()) {
688     case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
689       RunContinueOnShutdown(task, traits, task_source, token);
690       return;
691     case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
692       RunSkipOnShutdown(task, traits, task_source, token);
693       return;
694     case TaskShutdownBehavior::BLOCK_SHUTDOWN:
695       RunBlockShutdown(task, traits, task_source, token);
696       return;
697   }
698 }
699 
700 }  // namespace internal
701 }  // namespace base
702