1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/task_tracker.h"
6
7 #include <atomic>
8 #include <optional>
9 #include <string>
10 #include <utility>
11
12 #include "base/base_switches.h"
13 #include "base/command_line.h"
14 #include "base/compiler_specific.h"
15 #include "base/debug/alias.h"
16 #include "base/functional/callback.h"
17 #include "base/json/json_writer.h"
18 #include "base/logging.h"
19 #include "base/memory/ptr_util.h"
20 #include "base/metrics/histogram_macros.h"
21 #include "base/notreached.h"
22 #include "base/sequence_token.h"
23 #include "base/strings/string_util.h"
24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/waitable_event.h"
26 #include "base/task/scoped_set_task_priority_for_current_thread.h"
27 #include "base/task/sequenced_task_runner.h"
28 #include "base/task/single_thread_task_runner.h"
29 #include "base/task/thread_pool/job_task_source.h"
30 #include "base/task/thread_pool/task_source.h"
31 #include "base/threading/sequence_local_storage_map.h"
32 #include "base/threading/thread_restrictions.h"
33 #include "base/time/time.h"
34 #include "base/trace_event/base_tracing.h"
35 #include "base/values.h"
36 #include "build/build_config.h"
37
38 namespace base {
39 namespace internal {
40
41 namespace {
42
43 #if BUILDFLAG(ENABLE_BASE_TRACING)
44 using perfetto::protos::pbzero::ChromeThreadPoolTask;
45 using perfetto::protos::pbzero::ChromeTrackEvent;
46 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
47
48 constexpr const char* kExecutionModeString[] = {"parallel", "sequenced",
49 "single thread", "job"};
50 static_assert(
51 std::size(kExecutionModeString) ==
52 static_cast<size_t>(TaskSourceExecutionMode::kMax) + 1,
53 "Array kExecutionModeString is out of sync with TaskSourceExecutionMode.");
54
HasLogBestEffortTasksSwitch()55 bool HasLogBestEffortTasksSwitch() {
56 // The CommandLine might not be initialized if ThreadPool is initialized in a
57 // dynamic library which doesn't have access to argc/argv.
58 return CommandLine::InitializedForCurrentProcess() &&
59 CommandLine::ForCurrentProcess()->HasSwitch(
60 switches::kLogBestEffortTasks);
61 }
62
63 #if BUILDFLAG(ENABLE_BASE_TRACING)
TaskPriorityToProto(TaskPriority priority)64 ChromeThreadPoolTask::Priority TaskPriorityToProto(TaskPriority priority) {
65 switch (priority) {
66 case TaskPriority::BEST_EFFORT:
67 return ChromeThreadPoolTask::PRIORITY_BEST_EFFORT;
68 case TaskPriority::USER_VISIBLE:
69 return ChromeThreadPoolTask::PRIORITY_USER_VISIBLE;
70 case TaskPriority::USER_BLOCKING:
71 return ChromeThreadPoolTask::PRIORITY_USER_BLOCKING;
72 }
73 }
74
ExecutionModeToProto(TaskSourceExecutionMode mode)75 ChromeThreadPoolTask::ExecutionMode ExecutionModeToProto(
76 TaskSourceExecutionMode mode) {
77 switch (mode) {
78 case TaskSourceExecutionMode::kParallel:
79 return ChromeThreadPoolTask::EXECUTION_MODE_PARALLEL;
80 case TaskSourceExecutionMode::kSequenced:
81 return ChromeThreadPoolTask::EXECUTION_MODE_SEQUENCED;
82 case TaskSourceExecutionMode::kSingleThread:
83 return ChromeThreadPoolTask::EXECUTION_MODE_SINGLE_THREAD;
84 case TaskSourceExecutionMode::kJob:
85 return ChromeThreadPoolTask::EXECUTION_MODE_JOB;
86 }
87 }
88
ShutdownBehaviorToProto(TaskShutdownBehavior shutdown_behavior)89 ChromeThreadPoolTask::ShutdownBehavior ShutdownBehaviorToProto(
90 TaskShutdownBehavior shutdown_behavior) {
91 switch (shutdown_behavior) {
92 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
93 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_CONTINUE_ON_SHUTDOWN;
94 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
95 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_SKIP_ON_SHUTDOWN;
96 case TaskShutdownBehavior::BLOCK_SHUTDOWN:
97 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_BLOCK_SHUTDOWN;
98 }
99 }
100 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
101
EmitThreadPoolTraceEventMetadata(perfetto::EventContext & ctx,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)102 auto EmitThreadPoolTraceEventMetadata(perfetto::EventContext& ctx,
103 const TaskTraits& traits,
104 TaskSource* task_source,
105 const SequenceToken& token) {
106 #if BUILDFLAG(ENABLE_BASE_TRACING)
107 // Other parameters are included only when "scheduler" category is enabled.
108 const uint8_t* scheduler_category_enabled =
109 TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("scheduler");
110
111 if (!*scheduler_category_enabled)
112 return;
113 auto* task = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
114 ->set_thread_pool_task();
115 task->set_task_priority(TaskPriorityToProto(traits.priority()));
116 task->set_execution_mode(ExecutionModeToProto(task_source->execution_mode()));
117 task->set_shutdown_behavior(
118 ShutdownBehaviorToProto(traits.shutdown_behavior()));
119 if (token.IsValid())
120 task->set_sequence_token(token.ToInternalValue());
121 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
122 }
123
124 // If this is greater than 0 on a given thread, it will ignore the DCHECK which
125 // prevents posting BLOCK_SHUTDOWN tasks after shutdown. There are cases where
126 // posting back to a BLOCK_SHUTDOWN sequence is a coincidence rather than part
127 // of a shutdown blocking series of tasks, this prevents racy DCHECKs in those
128 // cases.
129 constinit thread_local int fizzle_block_shutdown_tasks_ref = 0;
130
131 } // namespace
132
133 // Atomic internal state used by TaskTracker to track items that are blocking
134 // Shutdown. An "item" consist of either:
135 // - A running SKIP_ON_SHUTDOWN task
136 // - A queued/running BLOCK_SHUTDOWN TaskSource.
137 // Sequential consistency shouldn't be assumed from these calls (i.e. a thread
138 // reading |HasShutdownStarted() == true| isn't guaranteed to see all writes
139 // made before |StartShutdown()| on the thread that invoked it).
140 class TaskTracker::State {
141 public:
142 State() = default;
143 State(const State&) = delete;
144 State& operator=(const State&) = delete;
145
146 // Sets a flag indicating that shutdown has started. Returns true if there are
147 // items blocking shutdown. Can only be called once.
StartShutdown()148 bool StartShutdown() {
149 const auto new_value =
150 subtle::NoBarrier_AtomicIncrement(&bits_, kShutdownHasStartedMask);
151
152 // Check that the "shutdown has started" bit isn't zero. This would happen
153 // if it was incremented twice.
154 DCHECK(new_value & kShutdownHasStartedMask);
155
156 const auto num_items_blocking_shutdown =
157 new_value >> kNumItemsBlockingShutdownBitOffset;
158 return num_items_blocking_shutdown != 0;
159 }
160
161 // Returns true if shutdown has started.
HasShutdownStarted() const162 bool HasShutdownStarted() const {
163 return subtle::NoBarrier_Load(&bits_) & kShutdownHasStartedMask;
164 }
165
166 // Returns true if there are items blocking shutdown.
AreItemsBlockingShutdown() const167 bool AreItemsBlockingShutdown() const {
168 const auto num_items_blocking_shutdown =
169 subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
170 DCHECK_GE(num_items_blocking_shutdown, 0);
171 return num_items_blocking_shutdown != 0;
172 }
173
174 // Increments the number of items blocking shutdown. Returns true if
175 // shutdown has started.
IncrementNumItemsBlockingShutdown()176 bool IncrementNumItemsBlockingShutdown() {
177 #if DCHECK_IS_ON()
178 // Verify that no overflow will occur.
179 const auto num_items_blocking_shutdown =
180 subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
181 DCHECK_LT(num_items_blocking_shutdown,
182 std::numeric_limits<subtle::Atomic32>::max() -
183 kNumItemsBlockingShutdownIncrement);
184 #endif
185
186 const auto new_bits = subtle::NoBarrier_AtomicIncrement(
187 &bits_, kNumItemsBlockingShutdownIncrement);
188 return new_bits & kShutdownHasStartedMask;
189 }
190
191 // Decrements the number of items blocking shutdown. Returns true if shutdown
192 // has started and the number of tasks blocking shutdown becomes zero.
DecrementNumItemsBlockingShutdown()193 bool DecrementNumItemsBlockingShutdown() {
194 const auto new_bits = subtle::NoBarrier_AtomicIncrement(
195 &bits_, -kNumItemsBlockingShutdownIncrement);
196 const bool shutdown_has_started = new_bits & kShutdownHasStartedMask;
197 const auto num_items_blocking_shutdown =
198 new_bits >> kNumItemsBlockingShutdownBitOffset;
199 DCHECK_GE(num_items_blocking_shutdown, 0);
200 return shutdown_has_started && num_items_blocking_shutdown == 0;
201 }
202
203 private:
204 static constexpr subtle::Atomic32 kShutdownHasStartedMask = 1;
205 static constexpr subtle::Atomic32 kNumItemsBlockingShutdownBitOffset = 1;
206 static constexpr subtle::Atomic32 kNumItemsBlockingShutdownIncrement =
207 1 << kNumItemsBlockingShutdownBitOffset;
208
209 // The LSB indicates whether shutdown has started. The other bits count the
210 // number of items blocking shutdown.
211 // No barriers are required to read/write |bits_| as this class is only used
212 // as an atomic state checker, it doesn't provide sequential consistency
213 // guarantees w.r.t. external state. Sequencing of the TaskTracker::State
214 // operations themselves is guaranteed by the AtomicIncrement RMW (read-
215 // modify-write) semantics however. For example, if two threads are racing to
216 // call IncrementNumItemsBlockingShutdown() and StartShutdown() respectively,
217 // either the first thread will win and the StartShutdown() call will see the
218 // blocking task or the second thread will win and
219 // IncrementNumItemsBlockingShutdown() will know that shutdown has started.
220 subtle::Atomic32 bits_ = 0;
221 };
222
TaskTracker()223 TaskTracker::TaskTracker()
224 : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
225 state_(new State),
226 can_run_policy_(CanRunPolicy::kAll),
227 flush_cv_(flush_lock_.CreateConditionVariable()),
228 shutdown_lock_(&flush_lock_),
229 tracked_ref_factory_(this) {
230 // |flush_cv_| is only waited upon in FlushForTesting(), avoid instantiating a
231 // ScopedBlockingCallWithBaseSyncPrimitives from test threads intentionally
232 // idling themselves to wait on the ThreadPool.
233 flush_cv_.declare_only_used_while_idle();
234 }
235
236 TaskTracker::~TaskTracker() = default;
237
StartShutdown()238 void TaskTracker::StartShutdown() {
239 CheckedAutoLock auto_lock(shutdown_lock_);
240
241 // This method can only be called once.
242 DCHECK(!shutdown_event_);
243 DCHECK(!state_->HasShutdownStarted());
244
245 shutdown_event_.emplace();
246
247 const bool tasks_are_blocking_shutdown = state_->StartShutdown();
248
249 // From now, if a thread causes the number of tasks blocking shutdown to
250 // become zero, it will call OnBlockingShutdownTasksComplete().
251
252 if (!tasks_are_blocking_shutdown) {
253 // If another thread posts a BLOCK_SHUTDOWN task at this moment, it will
254 // block until this method releases |shutdown_lock_|. Then, it will fail
255 // DCHECK(!shutdown_event_->IsSignaled()). This is the desired behavior
256 // because posting a BLOCK_SHUTDOWN task after StartShutdown() when no
257 // tasks are blocking shutdown isn't allowed.
258 shutdown_event_->Signal();
259 return;
260 }
261 }
262
CompleteShutdown()263 void TaskTracker::CompleteShutdown() {
264 // It is safe to access |shutdown_event_| without holding |lock_| because the
265 // pointer never changes after being set by StartShutdown(), which must
266 // happen-before this.
267 DCHECK(TS_UNCHECKED_READ(shutdown_event_));
268
269 {
270 base::ScopedAllowBaseSyncPrimitives allow_wait;
271 // Allow tests to wait for and introduce logging about the shutdown tasks
272 // before we block this thread.
273 BeginCompleteShutdown(*TS_UNCHECKED_READ(shutdown_event_));
274 // Now block the thread until all tasks are done.
275 TS_UNCHECKED_READ(shutdown_event_)->Wait();
276 }
277
278 // Unblock FlushForTesting() and perform the FlushAsyncForTesting callback
279 // when shutdown completes.
280 {
281 CheckedAutoLock auto_lock(flush_lock_);
282 flush_cv_.Broadcast();
283 }
284 InvokeFlushCallbacksForTesting();
285 }
286
FlushForTesting()287 void TaskTracker::FlushForTesting() {
288 AssertFlushForTestingAllowed();
289 CheckedAutoLock auto_lock(flush_lock_);
290 while (num_incomplete_task_sources_.load(std::memory_order_acquire) != 0 &&
291 !IsShutdownComplete()) {
292 flush_cv_.Wait();
293 }
294 }
295
FlushAsyncForTesting(OnceClosure flush_callback)296 void TaskTracker::FlushAsyncForTesting(OnceClosure flush_callback) {
297 DCHECK(flush_callback);
298 {
299 CheckedAutoLock auto_lock(flush_lock_);
300 flush_callbacks_for_testing_.push_back(std::move(flush_callback));
301 }
302
303 if (num_incomplete_task_sources_.load(std::memory_order_acquire) == 0 ||
304 IsShutdownComplete()) {
305 InvokeFlushCallbacksForTesting();
306 }
307 }
308
SetCanRunPolicy(CanRunPolicy can_run_policy)309 void TaskTracker::SetCanRunPolicy(CanRunPolicy can_run_policy) {
310 can_run_policy_.store(can_run_policy);
311 }
312
WillEnqueueJob(JobTaskSource * task_source)313 void TaskTracker::WillEnqueueJob(JobTaskSource* task_source) {
314 task_source->WillEnqueue(sequence_nums_.GetNext(), task_annotator_);
315 }
316
WillPostTask(Task * task,TaskShutdownBehavior shutdown_behavior)317 bool TaskTracker::WillPostTask(Task* task,
318 TaskShutdownBehavior shutdown_behavior) {
319 DCHECK(task);
320 DCHECK(task->task);
321
322 task->sequence_num = sequence_nums_.GetNext();
323 if (state_->HasShutdownStarted()) {
324 // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
325 // started and the task is not delayed.
326 if (shutdown_behavior != TaskShutdownBehavior::BLOCK_SHUTDOWN ||
327 !task->delayed_run_time.is_null() ||
328 fizzle_block_shutdown_tasks_ref > 0) {
329 return false;
330 }
331
332 // A BLOCK_SHUTDOWN task posted after shutdown has completed is an ordering
333 // bug. This aims to catch those early. In some cases it's a racy
334 // coincidence (i.e. posting back to a BLOCK_SHUTDOWN sequence from a task
335 // that wasn't itself guaranteed to finish before shutdown), in those cases
336 // a ScopedFizzleBlockShutdownTasks can bump
337 // `fizzle_block_shutdown_tasks_ref` to bypass this DCHECK.
338 CheckedAutoLock auto_lock(shutdown_lock_);
339 DCHECK(shutdown_event_);
340 DCHECK(!shutdown_event_->IsSignaled())
341 << "posted_from: " << task->posted_from.ToString();
342 }
343
344 // TODO(scheduler-dev): Record the task traits here.
345 task_annotator_.WillQueueTask("ThreadPool_PostTask", task);
346
347 return true;
348 }
349
WillPostTaskNow(const Task & task,TaskPriority priority) const350 bool TaskTracker::WillPostTaskNow(const Task& task,
351 TaskPriority priority) const {
352 // Delayed tasks's TaskShutdownBehavior is implicitly capped at
353 // SKIP_ON_SHUTDOWN. i.e. it cannot BLOCK_SHUTDOWN, TaskTracker will not wait
354 // for a delayed task in a BLOCK_SHUTDOWN TaskSource and will also skip
355 // delayed tasks that happen to become ripe during shutdown.
356 if (!task.delayed_run_time.is_null() && state_->HasShutdownStarted())
357 return false;
358
359 if (has_log_best_effort_tasks_switch_ &&
360 priority == TaskPriority::BEST_EFFORT) {
361 // A TaskPriority::BEST_EFFORT task is being posted.
362 LOG(INFO) << task.posted_from.ToString();
363 }
364 return true;
365 }
366
RegisterTaskSource(scoped_refptr<TaskSource> task_source)367 RegisteredTaskSource TaskTracker::RegisterTaskSource(
368 scoped_refptr<TaskSource> task_source) {
369 DCHECK(task_source);
370
371 TaskShutdownBehavior shutdown_behavior = task_source->shutdown_behavior();
372 if (!BeforeQueueTaskSource(shutdown_behavior))
373 return nullptr;
374
375 num_incomplete_task_sources_.fetch_add(1, std::memory_order_relaxed);
376 return RegisteredTaskSource(std::move(task_source), this);
377 }
378
CanRunPriority(TaskPriority priority) const379 bool TaskTracker::CanRunPriority(TaskPriority priority) const {
380 auto can_run_policy = can_run_policy_.load();
381
382 if (can_run_policy == CanRunPolicy::kAll)
383 return true;
384
385 if (can_run_policy == CanRunPolicy::kForegroundOnly &&
386 priority >= TaskPriority::USER_VISIBLE) {
387 return true;
388 }
389
390 return false;
391 }
392
RunAndPopNextTask(RegisteredTaskSource task_source)393 RegisteredTaskSource TaskTracker::RunAndPopNextTask(
394 RegisteredTaskSource task_source) {
395 DCHECK(task_source);
396
397 const bool should_run_tasks = BeforeRunTask(task_source->shutdown_behavior());
398
399 // Run the next task in |task_source|.
400 std::optional<Task> task;
401 TaskTraits traits;
402 {
403 auto transaction = task_source->BeginTransaction();
404 task = should_run_tasks ? task_source.TakeTask(&transaction)
405 : task_source.Clear(&transaction);
406 traits = transaction.traits();
407 }
408
409 if (task) {
410 // Skip delayed tasks if shutdown started.
411 if (!task->delayed_run_time.is_null() && state_->HasShutdownStarted())
412 task->task = base::DoNothingWithBoundArgs(std::move(task->task));
413
414 // Run the |task| (whether it's a worker task or the Clear() closure).
415 RunTask(std::move(task.value()), task_source.get(), traits);
416 }
417 if (should_run_tasks)
418 AfterRunTask(task_source->shutdown_behavior());
419
420 const bool task_source_must_be_queued = task_source.DidProcessTask();
421 // |task_source| should be reenqueued iff requested by DidProcessTask().
422 if (task_source_must_be_queued)
423 return task_source;
424 return nullptr;
425 }
426
HasShutdownStarted() const427 bool TaskTracker::HasShutdownStarted() const {
428 return state_->HasShutdownStarted();
429 }
430
IsShutdownComplete() const431 bool TaskTracker::IsShutdownComplete() const {
432 CheckedAutoLock auto_lock(shutdown_lock_);
433 return shutdown_event_ && shutdown_event_->IsSignaled();
434 }
435
BeginFizzlingBlockShutdownTasks()436 void TaskTracker::BeginFizzlingBlockShutdownTasks() {
437 ++fizzle_block_shutdown_tasks_ref;
438 }
439
EndFizzlingBlockShutdownTasks()440 void TaskTracker::EndFizzlingBlockShutdownTasks() {
441 CHECK_GE(--fizzle_block_shutdown_tasks_ref, 0);
442 }
443
RunTask(Task task,TaskSource * task_source,const TaskTraits & traits)444 void TaskTracker::RunTask(Task task,
445 TaskSource* task_source,
446 const TaskTraits& traits) {
447 DCHECK(task_source);
448
449 const auto environment = task_source->GetExecutionEnvironment();
450
451 struct BlockShutdownTaskFizzler {
452 BlockShutdownTaskFizzler() {
453 // Nothing outside RunTask should be bumping
454 // `fizzle_block_shutdown_tasks_ref`.
455 DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
456 ++fizzle_block_shutdown_tasks_ref;
457 }
458 ~BlockShutdownTaskFizzler() {
459 --fizzle_block_shutdown_tasks_ref;
460 // The refs should be balanced after running the task.
461 DCHECK_EQ(fizzle_block_shutdown_tasks_ref, 0);
462 }
463 };
464 std::optional<ScopedDisallowSingleton> disallow_singleton;
465 std::optional<ScopedDisallowBlocking> disallow_blocking;
466 std::optional<ScopedDisallowBaseSyncPrimitives> disallow_sync_primitives;
467 std::optional<BlockShutdownTaskFizzler> fizzle_block_shutdown_tasks;
468 if (traits.shutdown_behavior() ==
469 TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
470 disallow_singleton.emplace();
471 fizzle_block_shutdown_tasks.emplace();
472 }
473 if (!traits.may_block())
474 disallow_blocking.emplace();
475 if (!traits.with_base_sync_primitives())
476 disallow_sync_primitives.emplace();
477
478 {
479 DCHECK(environment.token.IsValid());
480 TaskScope task_scope(environment.token,
481 /* is_thread_bound=*/task_source->execution_mode() ==
482 TaskSourceExecutionMode::kSingleThread);
483 ScopedSetTaskPriorityForCurrentThread
484 scoped_set_task_priority_for_current_thread(traits.priority());
485
486 // Local storage map used if none is provided by |environment|.
487 std::optional<SequenceLocalStorageMap> local_storage_map;
488 if (!environment.sequence_local_storage)
489 local_storage_map.emplace();
490
491 ScopedSetSequenceLocalStorageMapForCurrentThread
492 scoped_set_sequence_local_storage_map_for_current_thread(
493 environment.sequence_local_storage
494 ? environment.sequence_local_storage
495 : &local_storage_map.value());
496
497 // Set up TaskRunner CurrentDefaultHandle as expected for the scope of the
498 // task.
499 std::optional<SequencedTaskRunner::CurrentDefaultHandle>
500 sequenced_task_runner_current_default_handle;
501 std::optional<SingleThreadTaskRunner::CurrentDefaultHandle>
502 single_thread_task_runner_current_default_handle;
503 if (environment.sequenced_task_runner) {
504 DCHECK_EQ(TaskSourceExecutionMode::kSequenced,
505 task_source->execution_mode());
506 sequenced_task_runner_current_default_handle.emplace(
507 environment.sequenced_task_runner);
508 } else if (environment.single_thread_task_runner) {
509 DCHECK_EQ(TaskSourceExecutionMode::kSingleThread,
510 task_source->execution_mode());
511 single_thread_task_runner_current_default_handle.emplace(
512 environment.single_thread_task_runner);
513 } else {
514 DCHECK_NE(TaskSourceExecutionMode::kSequenced,
515 task_source->execution_mode());
516 DCHECK_NE(TaskSourceExecutionMode::kSingleThread,
517 task_source->execution_mode());
518 }
519
520 RunTaskWithShutdownBehavior(task, traits, task_source, environment.token);
521
522 // Make sure the arguments bound to the callback are deleted within the
523 // scope in which the callback runs.
524 task.task = OnceClosure();
525 }
526 }
527
BeginCompleteShutdown(base::WaitableEvent & shutdown_event)528 void TaskTracker::BeginCompleteShutdown(base::WaitableEvent& shutdown_event) {
529 // Do nothing in production, tests may override this.
530 }
531
HasIncompleteTaskSourcesForTesting() const532 bool TaskTracker::HasIncompleteTaskSourcesForTesting() const {
533 return num_incomplete_task_sources_.load(std::memory_order_acquire) != 0;
534 }
535
BeforeQueueTaskSource(TaskShutdownBehavior shutdown_behavior)536 bool TaskTracker::BeforeQueueTaskSource(
537 TaskShutdownBehavior shutdown_behavior) {
538 if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
539 // BLOCK_SHUTDOWN task sources block shutdown between the moment they are
540 // queued and the moment their last task completes its execution.
541 const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
542
543 if (shutdown_started) {
544 // A BLOCK_SHUTDOWN task posted after shutdown has completed is an
545 // ordering bug. This aims to catch those early.
546 CheckedAutoLock auto_lock(shutdown_lock_);
547 DCHECK(shutdown_event_);
548 DCHECK(!shutdown_event_->IsSignaled());
549 }
550
551 return true;
552 }
553
554 // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
555 // started.
556 return !state_->HasShutdownStarted();
557 }
558
BeforeRunTask(TaskShutdownBehavior shutdown_behavior)559 bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
560 switch (shutdown_behavior) {
561 case TaskShutdownBehavior::BLOCK_SHUTDOWN: {
562 // The number of tasks blocking shutdown has been incremented when the
563 // task was posted.
564 DCHECK(state_->AreItemsBlockingShutdown());
565
566 // Trying to run a BLOCK_SHUTDOWN task after shutdown has completed is
567 // unexpected as it either shouldn't have been posted if shutdown
568 // completed or should be blocking shutdown if it was posted before it
569 // did.
570 DCHECK(!state_->HasShutdownStarted() || !IsShutdownComplete());
571
572 return true;
573 }
574
575 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN: {
576 // SKIP_ON_SHUTDOWN tasks block shutdown while they are running.
577 const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
578
579 if (shutdown_started) {
580 // The SKIP_ON_SHUTDOWN task isn't allowed to run during shutdown.
581 // Decrement the number of tasks blocking shutdown that was wrongly
582 // incremented.
583 DecrementNumItemsBlockingShutdown();
584 return false;
585 }
586
587 return true;
588 }
589
590 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN: {
591 return !state_->HasShutdownStarted();
592 }
593 }
594
595 NOTREACHED();
596 }
597
AfterRunTask(TaskShutdownBehavior shutdown_behavior)598 void TaskTracker::AfterRunTask(TaskShutdownBehavior shutdown_behavior) {
599 if (shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
600 DecrementNumItemsBlockingShutdown();
601 }
602 }
603
UnregisterTaskSource(scoped_refptr<TaskSource> task_source)604 scoped_refptr<TaskSource> TaskTracker::UnregisterTaskSource(
605 scoped_refptr<TaskSource> task_source) {
606 DCHECK(task_source);
607 if (task_source->shutdown_behavior() ==
608 TaskShutdownBehavior::BLOCK_SHUTDOWN) {
609 DecrementNumItemsBlockingShutdown();
610 }
611 DecrementNumIncompleteTaskSources();
612 return task_source;
613 }
614
DecrementNumItemsBlockingShutdown()615 void TaskTracker::DecrementNumItemsBlockingShutdown() {
616 const bool shutdown_started_and_no_items_block_shutdown =
617 state_->DecrementNumItemsBlockingShutdown();
618 if (!shutdown_started_and_no_items_block_shutdown)
619 return;
620
621 CheckedAutoLock auto_lock(shutdown_lock_);
622 DCHECK(shutdown_event_);
623 shutdown_event_->Signal();
624 }
625
DecrementNumIncompleteTaskSources()626 void TaskTracker::DecrementNumIncompleteTaskSources() {
627 const auto prev_num_incomplete_task_sources =
628 num_incomplete_task_sources_.fetch_sub(1);
629 DCHECK_GE(prev_num_incomplete_task_sources, 1);
630 if (prev_num_incomplete_task_sources == 1) {
631 {
632 CheckedAutoLock auto_lock(flush_lock_);
633 flush_cv_.Broadcast();
634 }
635 InvokeFlushCallbacksForTesting();
636 }
637 }
638
InvokeFlushCallbacksForTesting()639 void TaskTracker::InvokeFlushCallbacksForTesting() {
640 base::circular_deque<OnceClosure> flush_callbacks;
641 {
642 CheckedAutoLock auto_lock(flush_lock_);
643 flush_callbacks = std::move(flush_callbacks_for_testing_);
644 }
645 for (auto& flush_callback : flush_callbacks)
646 std::move(flush_callback).Run();
647 }
648
RunContinueOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)649 NOINLINE void TaskTracker::RunContinueOnShutdown(Task& task,
650 const TaskTraits& traits,
651 TaskSource* task_source,
652 const SequenceToken& token) {
653 NO_CODE_FOLDING();
654 RunTaskImpl(task, traits, task_source, token);
655 }
656
RunSkipOnShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)657 NOINLINE void TaskTracker::RunSkipOnShutdown(Task& task,
658 const TaskTraits& traits,
659 TaskSource* task_source,
660 const SequenceToken& token) {
661 NO_CODE_FOLDING();
662 RunTaskImpl(task, traits, task_source, token);
663 }
664
RunBlockShutdown(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)665 NOINLINE void TaskTracker::RunBlockShutdown(Task& task,
666 const TaskTraits& traits,
667 TaskSource* task_source,
668 const SequenceToken& token) {
669 NO_CODE_FOLDING();
670 RunTaskImpl(task, traits, task_source, token);
671 }
672
RunTaskImpl(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)673 void TaskTracker::RunTaskImpl(Task& task,
674 const TaskTraits& traits,
675 TaskSource* task_source,
676 const SequenceToken& token) {
677 task_annotator_.RunTask(
678 "ThreadPool_RunTask", task, [&](perfetto::EventContext& ctx) {
679 EmitThreadPoolTraceEventMetadata(ctx, traits, task_source, token);
680 });
681 }
682
RunTaskWithShutdownBehavior(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)683 void TaskTracker::RunTaskWithShutdownBehavior(Task& task,
684 const TaskTraits& traits,
685 TaskSource* task_source,
686 const SequenceToken& token) {
687 switch (traits.shutdown_behavior()) {
688 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
689 RunContinueOnShutdown(task, traits, task_source, token);
690 return;
691 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
692 RunSkipOnShutdown(task, traits, task_source, token);
693 return;
694 case TaskShutdownBehavior::BLOCK_SHUTDOWN:
695 RunBlockShutdown(task, traits, task_source, token);
696 return;
697 }
698 }
699
700 } // namespace internal
701 } // namespace base
702