1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/task_tracker.h"
6
7 #include <atomic>
8 #include <string>
9 #include <utility>
10
11 #include "base/base_switches.h"
12 #include "base/command_line.h"
13 #include "base/compiler_specific.h"
14 #include "base/debug/alias.h"
15 #include "base/functional/callback.h"
16 #include "base/json/json_writer.h"
17 #include "base/logging.h"
18 #include "base/memory/ptr_util.h"
19 #include "base/metrics/histogram_macros.h"
20 #include "base/notreached.h"
21 #include "base/sequence_token.h"
22 #include "base/strings/string_util.h"
23 #include "base/synchronization/condition_variable.h"
24 #include "base/synchronization/waitable_event.h"
25 #include "base/task/scoped_set_task_priority_for_current_thread.h"
26 #include "base/task/sequenced_task_runner.h"
27 #include "base/task/single_thread_task_runner.h"
28 #include "base/threading/sequence_local_storage_map.h"
29 #include "base/threading/thread_restrictions.h"
30 #include "base/time/time.h"
31 #include "base/trace_event/base_tracing.h"
32 #include "base/values.h"
33 #include "build/build_config.h"
34 #include "third_party/abseil-cpp/absl/base/attributes.h"
35 #include "third_party/abseil-cpp/absl/types/optional.h"
36
37 namespace base {
38 namespace internal {
39
40 namespace {
41
42 #if BUILDFLAG(ENABLE_BASE_TRACING)
43 using perfetto::protos::pbzero::ChromeThreadPoolTask;
44 using perfetto::protos::pbzero::ChromeTrackEvent;
45 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
46
47 constexpr const char* kExecutionModeString[] = {"parallel", "sequenced",
48 "single thread", "job"};
49 static_assert(
50 std::size(kExecutionModeString) ==
51 static_cast<size_t>(TaskSourceExecutionMode::kMax) + 1,
52 "Array kExecutionModeString is out of sync with TaskSourceExecutionMode.");
53
HasLogBestEffortTasksSwitch()54 bool HasLogBestEffortTasksSwitch() {
55 // The CommandLine might not be initialized if ThreadPool is initialized in a
56 // dynamic library which doesn't have access to argc/argv.
57 return CommandLine::InitializedForCurrentProcess() &&
58 CommandLine::ForCurrentProcess()->HasSwitch(
59 switches::kLogBestEffortTasks);
60 }
61
62 #if BUILDFLAG(ENABLE_BASE_TRACING)
TaskPriorityToProto(TaskPriority priority)63 ChromeThreadPoolTask::Priority TaskPriorityToProto(TaskPriority priority) {
64 switch (priority) {
65 case TaskPriority::BEST_EFFORT:
66 return ChromeThreadPoolTask::PRIORITY_BEST_EFFORT;
67 case TaskPriority::USER_VISIBLE:
68 return ChromeThreadPoolTask::PRIORITY_USER_VISIBLE;
69 case TaskPriority::USER_BLOCKING:
70 return ChromeThreadPoolTask::PRIORITY_USER_BLOCKING;
71 }
72 }
73
ExecutionModeToProto(TaskSourceExecutionMode mode)74 ChromeThreadPoolTask::ExecutionMode ExecutionModeToProto(
75 TaskSourceExecutionMode mode) {
76 switch (mode) {
77 case TaskSourceExecutionMode::kParallel:
78 return ChromeThreadPoolTask::EXECUTION_MODE_PARALLEL;
79 case TaskSourceExecutionMode::kSequenced:
80 return ChromeThreadPoolTask::EXECUTION_MODE_SEQUENCED;
81 case TaskSourceExecutionMode::kSingleThread:
82 return ChromeThreadPoolTask::EXECUTION_MODE_SINGLE_THREAD;
83 case TaskSourceExecutionMode::kJob:
84 return ChromeThreadPoolTask::EXECUTION_MODE_JOB;
85 }
86 }
87
ShutdownBehaviorToProto(TaskShutdownBehavior shutdown_behavior)88 ChromeThreadPoolTask::ShutdownBehavior ShutdownBehaviorToProto(
89 TaskShutdownBehavior shutdown_behavior) {
90 switch (shutdown_behavior) {
91 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
92 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_CONTINUE_ON_SHUTDOWN;
93 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
94 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_SKIP_ON_SHUTDOWN;
95 case TaskShutdownBehavior::BLOCK_SHUTDOWN:
96 return ChromeThreadPoolTask::SHUTDOWN_BEHAVIOR_BLOCK_SHUTDOWN;
97 }
98 }
99 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
100
EmitThreadPoolTraceEventMetadata(perfetto::EventContext & ctx,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)101 auto EmitThreadPoolTraceEventMetadata(perfetto::EventContext& ctx,
102 const TaskTraits& traits,
103 TaskSource* task_source,
104 const SequenceToken& token) {
105 #if BUILDFLAG(ENABLE_BASE_TRACING)
106 // Other parameters are included only when "scheduler" category is enabled.
107 const uint8_t* scheduler_category_enabled =
108 TRACE_EVENT_API_GET_CATEGORY_GROUP_ENABLED("scheduler");
109
110 if (!*scheduler_category_enabled)
111 return;
112 auto* task = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
113 ->set_thread_pool_task();
114 task->set_task_priority(TaskPriorityToProto(traits.priority()));
115 task->set_execution_mode(ExecutionModeToProto(task_source->execution_mode()));
116 task->set_shutdown_behavior(
117 ShutdownBehaviorToProto(traits.shutdown_behavior()));
118 if (token.IsValid())
119 task->set_sequence_token(token.ToInternalValue());
120 #endif // BUILDFLAG(ENABLE_BASE_TRACING)
121 }
122
// Per-thread flag toggled by TaskTracker::BeginFizzlingBlockShutdownTasks() /
// TaskTracker::EndFizzlingBlockShutdownTasks(). While it is true on the
// posting thread, TaskTracker::WillPostTask() rejects tasks posted after
// shutdown has started instead of hitting its "posted after shutdown
// completed" DCHECK.
ABSL_CONST_INIT thread_local bool fizzle_block_shutdown_tasks = false;
124
125 } // namespace
126
127 // Atomic internal state used by TaskTracker to track items that are blocking
128 // Shutdown. An "item" consist of either:
129 // - A running SKIP_ON_SHUTDOWN task
130 // - A queued/running BLOCK_SHUTDOWN TaskSource.
131 // Sequential consistency shouldn't be assumed from these calls (i.e. a thread
132 // reading |HasShutdownStarted() == true| isn't guaranteed to see all writes
133 // made before |StartShutdown()| on the thread that invoked it).
134 class TaskTracker::State {
135 public:
136 State() = default;
137 State(const State&) = delete;
138 State& operator=(const State&) = delete;
139
140 // Sets a flag indicating that shutdown has started. Returns true if there are
141 // items blocking shutdown. Can only be called once.
StartShutdown()142 bool StartShutdown() {
143 const auto new_value =
144 subtle::NoBarrier_AtomicIncrement(&bits_, kShutdownHasStartedMask);
145
146 // Check that the "shutdown has started" bit isn't zero. This would happen
147 // if it was incremented twice.
148 DCHECK(new_value & kShutdownHasStartedMask);
149
150 const auto num_items_blocking_shutdown =
151 new_value >> kNumItemsBlockingShutdownBitOffset;
152 return num_items_blocking_shutdown != 0;
153 }
154
155 // Returns true if shutdown has started.
HasShutdownStarted() const156 bool HasShutdownStarted() const {
157 return subtle::NoBarrier_Load(&bits_) & kShutdownHasStartedMask;
158 }
159
160 // Returns true if there are items blocking shutdown.
AreItemsBlockingShutdown() const161 bool AreItemsBlockingShutdown() const {
162 const auto num_items_blocking_shutdown =
163 subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
164 DCHECK_GE(num_items_blocking_shutdown, 0);
165 return num_items_blocking_shutdown != 0;
166 }
167
168 // Increments the number of items blocking shutdown. Returns true if
169 // shutdown has started.
IncrementNumItemsBlockingShutdown()170 bool IncrementNumItemsBlockingShutdown() {
171 #if DCHECK_IS_ON()
172 // Verify that no overflow will occur.
173 const auto num_items_blocking_shutdown =
174 subtle::NoBarrier_Load(&bits_) >> kNumItemsBlockingShutdownBitOffset;
175 DCHECK_LT(num_items_blocking_shutdown,
176 std::numeric_limits<subtle::Atomic32>::max() -
177 kNumItemsBlockingShutdownIncrement);
178 #endif
179
180 const auto new_bits = subtle::NoBarrier_AtomicIncrement(
181 &bits_, kNumItemsBlockingShutdownIncrement);
182 return new_bits & kShutdownHasStartedMask;
183 }
184
185 // Decrements the number of items blocking shutdown. Returns true if shutdown
186 // has started and the number of tasks blocking shutdown becomes zero.
DecrementNumItemsBlockingShutdown()187 bool DecrementNumItemsBlockingShutdown() {
188 const auto new_bits = subtle::NoBarrier_AtomicIncrement(
189 &bits_, -kNumItemsBlockingShutdownIncrement);
190 const bool shutdown_has_started = new_bits & kShutdownHasStartedMask;
191 const auto num_items_blocking_shutdown =
192 new_bits >> kNumItemsBlockingShutdownBitOffset;
193 DCHECK_GE(num_items_blocking_shutdown, 0);
194 return shutdown_has_started && num_items_blocking_shutdown == 0;
195 }
196
197 private:
198 static constexpr subtle::Atomic32 kShutdownHasStartedMask = 1;
199 static constexpr subtle::Atomic32 kNumItemsBlockingShutdownBitOffset = 1;
200 static constexpr subtle::Atomic32 kNumItemsBlockingShutdownIncrement =
201 1 << kNumItemsBlockingShutdownBitOffset;
202
203 // The LSB indicates whether shutdown has started. The other bits count the
204 // number of items blocking shutdown.
205 // No barriers are required to read/write |bits_| as this class is only used
206 // as an atomic state checker, it doesn't provide sequential consistency
207 // guarantees w.r.t. external state. Sequencing of the TaskTracker::State
208 // operations themselves is guaranteed by the AtomicIncrement RMW (read-
209 // modify-write) semantics however. For example, if two threads are racing to
210 // call IncrementNumItemsBlockingShutdown() and StartShutdown() respectively,
211 // either the first thread will win and the StartShutdown() call will see the
212 // blocking task or the second thread will win and
213 // IncrementNumItemsBlockingShutdown() will know that shutdown has started.
214 subtle::Atomic32 bits_ = 0;
215 };
216
// Builds a tracker that initially allows every priority to run
// (CanRunPolicy::kAll). The kLogBestEffortTasks switch is sampled once here
// and cached for the lifetime of the tracker.
TaskTracker::TaskTracker()
    : has_log_best_effort_tasks_switch_(HasLogBestEffortTasksSwitch()),
      state_(new State),
      can_run_policy_(CanRunPolicy::kAll),
      flush_cv_(flush_lock_.CreateConditionVariable()),
      // NOTE(review): passing |&flush_lock_| presumably registers it as the
      // lock that may be held while acquiring |shutdown_lock_| - confirm
      // against CheckedLock.
      shutdown_lock_(&flush_lock_),
      tracked_ref_factory_(this) {
  // |flush_cv_| is only waited upon in FlushForTesting(), avoid instantiating a
  // ScopedBlockingCallWithBaseSyncPrimitives from test threads intentionally
  // idling themselves to wait on the ThreadPool.
  flush_cv_->declare_only_used_while_idle();
}

TaskTracker::~TaskTracker() = default;
231
StartShutdown()232 void TaskTracker::StartShutdown() {
233 CheckedAutoLock auto_lock(shutdown_lock_);
234
235 // This method can only be called once.
236 DCHECK(!shutdown_event_);
237 DCHECK(!state_->HasShutdownStarted());
238
239 shutdown_event_ = std::make_unique<WaitableEvent>();
240
241 const bool tasks_are_blocking_shutdown = state_->StartShutdown();
242
243 // From now, if a thread causes the number of tasks blocking shutdown to
244 // become zero, it will call OnBlockingShutdownTasksComplete().
245
246 if (!tasks_are_blocking_shutdown) {
247 // If another thread posts a BLOCK_SHUTDOWN task at this moment, it will
248 // block until this method releases |shutdown_lock_|. Then, it will fail
249 // DCHECK(!shutdown_event_->IsSignaled()). This is the desired behavior
250 // because posting a BLOCK_SHUTDOWN task after StartShutdown() when no
251 // tasks are blocking shutdown isn't allowed.
252 shutdown_event_->Signal();
253 return;
254 }
255 }
256
CompleteShutdown()257 void TaskTracker::CompleteShutdown() {
258 // It is safe to access |shutdown_event_| without holding |lock_| because the
259 // pointer never changes after being set by StartShutdown(), which must
260 // happen-before this.
261 DCHECK(TS_UNCHECKED_READ(shutdown_event_));
262
263 {
264 base::ScopedAllowBaseSyncPrimitives allow_wait;
265 // Allow tests to wait for and introduce logging about the shutdown tasks
266 // before we block this thread.
267 BeginCompleteShutdown(*TS_UNCHECKED_READ(shutdown_event_));
268 // Now block the thread until all tasks are done.
269 TS_UNCHECKED_READ(shutdown_event_)->Wait();
270 }
271
272 // Unblock FlushForTesting() and perform the FlushAsyncForTesting callback
273 // when shutdown completes.
274 {
275 CheckedAutoLock auto_lock(flush_lock_);
276 flush_cv_->Broadcast();
277 }
278 InvokeFlushCallbacksForTesting();
279 }
280
FlushForTesting()281 void TaskTracker::FlushForTesting() {
282 AssertFlushForTestingAllowed();
283 CheckedAutoLock auto_lock(flush_lock_);
284 while (num_incomplete_task_sources_.load(std::memory_order_acquire) != 0 &&
285 !IsShutdownComplete()) {
286 flush_cv_->Wait();
287 }
288 }
289
FlushAsyncForTesting(OnceClosure flush_callback)290 void TaskTracker::FlushAsyncForTesting(OnceClosure flush_callback) {
291 DCHECK(flush_callback);
292 {
293 CheckedAutoLock auto_lock(flush_lock_);
294 flush_callbacks_for_testing_.push_back(std::move(flush_callback));
295 }
296
297 if (num_incomplete_task_sources_.load(std::memory_order_acquire) == 0 ||
298 IsShutdownComplete()) {
299 InvokeFlushCallbacksForTesting();
300 }
301 }
302
SetCanRunPolicy(CanRunPolicy can_run_policy)303 void TaskTracker::SetCanRunPolicy(CanRunPolicy can_run_policy) {
304 can_run_policy_.store(can_run_policy);
305 }
306
WillPostTask(Task * task,TaskShutdownBehavior shutdown_behavior)307 bool TaskTracker::WillPostTask(Task* task,
308 TaskShutdownBehavior shutdown_behavior) {
309 DCHECK(task);
310 DCHECK(task->task);
311
312 if (state_->HasShutdownStarted()) {
313 // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
314 // started and the task is not delayed.
315 if (shutdown_behavior != TaskShutdownBehavior::BLOCK_SHUTDOWN ||
316 !task->delayed_run_time.is_null() || fizzle_block_shutdown_tasks) {
317 return false;
318 }
319
320 // A BLOCK_SHUTDOWN task posted after shutdown has completed without setting
321 // `fizzle_block_shutdown_tasks` is an ordering bug. This aims to catch
322 // those early.
323 CheckedAutoLock auto_lock(shutdown_lock_);
324 DCHECK(shutdown_event_);
325 DCHECK(!shutdown_event_->IsSignaled())
326 << "posted_from: " << task->posted_from.ToString();
327 }
328
329 // TODO(scheduler-dev): Record the task traits here.
330 task_annotator_.WillQueueTask("ThreadPool_PostTask", task);
331
332 return true;
333 }
334
WillPostTaskNow(const Task & task,TaskPriority priority) const335 bool TaskTracker::WillPostTaskNow(const Task& task,
336 TaskPriority priority) const {
337 // Delayed tasks's TaskShutdownBehavior is implicitly capped at
338 // SKIP_ON_SHUTDOWN. i.e. it cannot BLOCK_SHUTDOWN, TaskTracker will not wait
339 // for a delayed task in a BLOCK_SHUTDOWN TaskSource and will also skip
340 // delayed tasks that happen to become ripe during shutdown.
341 if (!task.delayed_run_time.is_null() && state_->HasShutdownStarted())
342 return false;
343
344 if (has_log_best_effort_tasks_switch_ &&
345 priority == TaskPriority::BEST_EFFORT) {
346 // A TaskPriority::BEST_EFFORT task is being posted.
347 LOG(INFO) << task.posted_from.ToString();
348 }
349 return true;
350 }
351
RegisterTaskSource(scoped_refptr<TaskSource> task_source)352 RegisteredTaskSource TaskTracker::RegisterTaskSource(
353 scoped_refptr<TaskSource> task_source) {
354 DCHECK(task_source);
355
356 TaskShutdownBehavior shutdown_behavior = task_source->shutdown_behavior();
357 if (!BeforeQueueTaskSource(shutdown_behavior))
358 return nullptr;
359
360 num_incomplete_task_sources_.fetch_add(1, std::memory_order_relaxed);
361 return RegisteredTaskSource(std::move(task_source), this);
362 }
363
CanRunPriority(TaskPriority priority) const364 bool TaskTracker::CanRunPriority(TaskPriority priority) const {
365 auto can_run_policy = can_run_policy_.load();
366
367 if (can_run_policy == CanRunPolicy::kAll)
368 return true;
369
370 if (can_run_policy == CanRunPolicy::kForegroundOnly &&
371 priority >= TaskPriority::USER_VISIBLE) {
372 return true;
373 }
374
375 return false;
376 }
377
RunAndPopNextTask(RegisteredTaskSource task_source)378 RegisteredTaskSource TaskTracker::RunAndPopNextTask(
379 RegisteredTaskSource task_source) {
380 DCHECK(task_source);
381
382 const bool should_run_tasks = BeforeRunTask(task_source->shutdown_behavior());
383
384 // Run the next task in |task_source|.
385 absl::optional<Task> task;
386 TaskTraits traits;
387 {
388 auto transaction = task_source->BeginTransaction();
389 task = should_run_tasks ? task_source.TakeTask(&transaction)
390 : task_source.Clear(&transaction);
391 traits = transaction.traits();
392 }
393
394 if (task) {
395 // Skip delayed tasks if shutdown started.
396 if (!task->delayed_run_time.is_null() && state_->HasShutdownStarted())
397 task->task = base::DoNothingWithBoundArgs(std::move(task->task));
398
399 // Run the |task| (whether it's a worker task or the Clear() closure).
400 RunTask(std::move(task.value()), task_source.get(), traits);
401 }
402 if (should_run_tasks)
403 AfterRunTask(task_source->shutdown_behavior());
404 const bool task_source_must_be_queued = task_source.DidProcessTask();
405 // |task_source| should be reenqueued iff requested by DidProcessTask().
406 if (task_source_must_be_queued)
407 return task_source;
408 return nullptr;
409 }
410
HasShutdownStarted() const411 bool TaskTracker::HasShutdownStarted() const {
412 return state_->HasShutdownStarted();
413 }
414
IsShutdownComplete() const415 bool TaskTracker::IsShutdownComplete() const {
416 CheckedAutoLock auto_lock(shutdown_lock_);
417 return shutdown_event_ && shutdown_event_->IsSignaled();
418 }
419
// Sets the thread-local |fizzle_block_shutdown_tasks| flag on the calling
// thread. While set, WillPostTask() rejects tasks posted from this thread
// after shutdown has started, including BLOCK_SHUTDOWN ones.
void TaskTracker::BeginFizzlingBlockShutdownTasks() {
  fizzle_block_shutdown_tasks = true;
}
423
// Clears the thread-local |fizzle_block_shutdown_tasks| flag on the calling
// thread, undoing BeginFizzlingBlockShutdownTasks().
void TaskTracker::EndFizzlingBlockShutdownTasks() {
  fizzle_block_shutdown_tasks = false;
}
427
RunTask(Task task,TaskSource * task_source,const TaskTraits & traits)428 void TaskTracker::RunTask(Task task,
429 TaskSource* task_source,
430 const TaskTraits& traits) {
431 DCHECK(task_source);
432
433 const auto environment = task_source->GetExecutionEnvironment();
434
435 absl::optional<ScopedDisallowSingleton> disallow_singleton;
436 absl::optional<ScopedDisallowBlocking> disallow_blocking;
437 absl::optional<ScopedDisallowBaseSyncPrimitives> disallow_sync_primitives;
438 if (traits.shutdown_behavior() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
439 disallow_singleton.emplace();
440 if (!traits.may_block())
441 disallow_blocking.emplace();
442 if (!traits.with_base_sync_primitives())
443 disallow_sync_primitives.emplace();
444
445 {
446 DCHECK(environment.token.IsValid());
447 ScopedSetSequenceTokenForCurrentThread
448 scoped_set_sequence_token_for_current_thread(environment.token);
449 ScopedSetTaskPriorityForCurrentThread
450 scoped_set_task_priority_for_current_thread(traits.priority());
451
452 // Local storage map used if none is provided by |environment|.
453 absl::optional<SequenceLocalStorageMap> local_storage_map;
454 if (!environment.sequence_local_storage)
455 local_storage_map.emplace();
456
457 ScopedSetSequenceLocalStorageMapForCurrentThread
458 scoped_set_sequence_local_storage_map_for_current_thread(
459 environment.sequence_local_storage
460 ? environment.sequence_local_storage.get()
461 : &local_storage_map.value());
462
463 // Set up TaskRunner CurrentDefaultHandle as expected for the scope of the
464 // task.
465 absl::optional<SequencedTaskRunner::CurrentDefaultHandle>
466 sequenced_task_runner_current_default_handle;
467 absl::optional<SingleThreadTaskRunner::CurrentDefaultHandle>
468 single_thread_task_runner_current_default_handle;
469 switch (task_source->execution_mode()) {
470 case TaskSourceExecutionMode::kJob:
471 case TaskSourceExecutionMode::kParallel:
472 break;
473 case TaskSourceExecutionMode::kSequenced:
474 DCHECK(task_source->task_runner());
475 sequenced_task_runner_current_default_handle.emplace(
476 static_cast<SequencedTaskRunner*>(task_source->task_runner()));
477 break;
478 case TaskSourceExecutionMode::kSingleThread:
479 DCHECK(task_source->task_runner());
480 single_thread_task_runner_current_default_handle.emplace(
481 static_cast<SingleThreadTaskRunner*>(task_source->task_runner()));
482 break;
483 }
484
485 RunTaskWithShutdownBehavior(task, traits, task_source, environment.token);
486
487 // Make sure the arguments bound to the callback are deleted within the
488 // scope in which the callback runs.
489 task.task = OnceClosure();
490 }
491 }
492
// Hook invoked by CompleteShutdown() with the shutdown event right before the
// calling thread starts waiting on it. |shutdown_event| is unused here.
void TaskTracker::BeginCompleteShutdown(base::WaitableEvent& shutdown_event) {
  // Do nothing in production, tests may override this.
}
496
HasIncompleteTaskSourcesForTesting() const497 bool TaskTracker::HasIncompleteTaskSourcesForTesting() const {
498 return num_incomplete_task_sources_.load(std::memory_order_acquire) != 0;
499 }
500
BeforeQueueTaskSource(TaskShutdownBehavior shutdown_behavior)501 bool TaskTracker::BeforeQueueTaskSource(
502 TaskShutdownBehavior shutdown_behavior) {
503 if (shutdown_behavior == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
504 // BLOCK_SHUTDOWN task sources block shutdown between the moment they are
505 // queued and the moment their last task completes its execution.
506 const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
507
508 if (shutdown_started) {
509 // A BLOCK_SHUTDOWN task posted after shutdown has completed is an
510 // ordering bug. This aims to catch those early.
511 CheckedAutoLock auto_lock(shutdown_lock_);
512 DCHECK(shutdown_event_);
513 DCHECK(!shutdown_event_->IsSignaled());
514 }
515
516 return true;
517 }
518
519 // A non BLOCK_SHUTDOWN task is allowed to be posted iff shutdown hasn't
520 // started.
521 return !state_->HasShutdownStarted();
522 }
523
BeforeRunTask(TaskShutdownBehavior shutdown_behavior)524 bool TaskTracker::BeforeRunTask(TaskShutdownBehavior shutdown_behavior) {
525 switch (shutdown_behavior) {
526 case TaskShutdownBehavior::BLOCK_SHUTDOWN: {
527 // The number of tasks blocking shutdown has been incremented when the
528 // task was posted.
529 DCHECK(state_->AreItemsBlockingShutdown());
530
531 // Trying to run a BLOCK_SHUTDOWN task after shutdown has completed is
532 // unexpected as it either shouldn't have been posted if shutdown
533 // completed or should be blocking shutdown if it was posted before it
534 // did.
535 DCHECK(!state_->HasShutdownStarted() || !IsShutdownComplete());
536
537 return true;
538 }
539
540 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN: {
541 // SKIP_ON_SHUTDOWN tasks block shutdown while they are running.
542 const bool shutdown_started = state_->IncrementNumItemsBlockingShutdown();
543
544 if (shutdown_started) {
545 // The SKIP_ON_SHUTDOWN task isn't allowed to run during shutdown.
546 // Decrement the number of tasks blocking shutdown that was wrongly
547 // incremented.
548 DecrementNumItemsBlockingShutdown();
549 return false;
550 }
551
552 return true;
553 }
554
555 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN: {
556 return !state_->HasShutdownStarted();
557 }
558 }
559
560 NOTREACHED();
561 return false;
562 }
563
AfterRunTask(TaskShutdownBehavior shutdown_behavior)564 void TaskTracker::AfterRunTask(TaskShutdownBehavior shutdown_behavior) {
565 if (shutdown_behavior == TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
566 DecrementNumItemsBlockingShutdown();
567 }
568 }
569
UnregisterTaskSource(scoped_refptr<TaskSource> task_source)570 scoped_refptr<TaskSource> TaskTracker::UnregisterTaskSource(
571 scoped_refptr<TaskSource> task_source) {
572 DCHECK(task_source);
573 if (task_source->shutdown_behavior() ==
574 TaskShutdownBehavior::BLOCK_SHUTDOWN) {
575 DecrementNumItemsBlockingShutdown();
576 }
577 DecrementNumIncompleteTaskSources();
578 return task_source;
579 }
580
DecrementNumItemsBlockingShutdown()581 void TaskTracker::DecrementNumItemsBlockingShutdown() {
582 const bool shutdown_started_and_no_items_block_shutdown =
583 state_->DecrementNumItemsBlockingShutdown();
584 if (!shutdown_started_and_no_items_block_shutdown)
585 return;
586
587 CheckedAutoLock auto_lock(shutdown_lock_);
588 DCHECK(shutdown_event_);
589 shutdown_event_->Signal();
590 }
591
DecrementNumIncompleteTaskSources()592 void TaskTracker::DecrementNumIncompleteTaskSources() {
593 const auto prev_num_incomplete_task_sources =
594 num_incomplete_task_sources_.fetch_sub(1);
595 DCHECK_GE(prev_num_incomplete_task_sources, 1);
596 if (prev_num_incomplete_task_sources == 1) {
597 {
598 CheckedAutoLock auto_lock(flush_lock_);
599 flush_cv_->Broadcast();
600 }
601 InvokeFlushCallbacksForTesting();
602 }
603 }
604
InvokeFlushCallbacksForTesting()605 void TaskTracker::InvokeFlushCallbacksForTesting() {
606 base::circular_deque<OnceClosure> flush_callbacks;
607 {
608 CheckedAutoLock auto_lock(flush_lock_);
609 flush_callbacks = std::move(flush_callbacks_for_testing_);
610 }
611 for (auto& flush_callback : flush_callbacks)
612 std::move(flush_callback).Run();
613 }
614
// Runs |task| through RunTaskImpl(). Called by RunTaskWithShutdownBehavior()
// for CONTINUE_ON_SHUTDOWN tasks.
// NOTE(review): NOINLINE + NO_CODE_FOLDING() presumably keep this function as
// a distinct frame so the shutdown behavior is visible in call stacks -
// confirm.
NOINLINE void TaskTracker::RunContinueOnShutdown(Task& task,
                                                 const TaskTraits& traits,
                                                 TaskSource* task_source,
                                                 const SequenceToken& token) {
  NO_CODE_FOLDING();
  RunTaskImpl(task, traits, task_source, token);
}
622
// Runs |task| through RunTaskImpl(). Called by RunTaskWithShutdownBehavior()
// for SKIP_ON_SHUTDOWN tasks.
// NOTE(review): NOINLINE + NO_CODE_FOLDING() presumably keep this function as
// a distinct frame so the shutdown behavior is visible in call stacks -
// confirm.
NOINLINE void TaskTracker::RunSkipOnShutdown(Task& task,
                                             const TaskTraits& traits,
                                             TaskSource* task_source,
                                             const SequenceToken& token) {
  NO_CODE_FOLDING();
  RunTaskImpl(task, traits, task_source, token);
}
630
// Runs |task| through RunTaskImpl(). Called by RunTaskWithShutdownBehavior()
// for BLOCK_SHUTDOWN tasks.
// NOTE(review): NOINLINE + NO_CODE_FOLDING() presumably keep this function as
// a distinct frame so the shutdown behavior is visible in call stacks -
// confirm.
NOINLINE void TaskTracker::RunBlockShutdown(Task& task,
                                            const TaskTraits& traits,
                                            TaskSource* task_source,
                                            const SequenceToken& token) {
  NO_CODE_FOLDING();
  RunTaskImpl(task, traits, task_source, token);
}
638
RunTaskImpl(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)639 void TaskTracker::RunTaskImpl(Task& task,
640 const TaskTraits& traits,
641 TaskSource* task_source,
642 const SequenceToken& token) {
643 task_annotator_.RunTask(
644 "ThreadPool_RunTask", task, [&](perfetto::EventContext& ctx) {
645 EmitThreadPoolTraceEventMetadata(ctx, traits, task_source, token);
646 });
647 }
648
RunTaskWithShutdownBehavior(Task & task,const TaskTraits & traits,TaskSource * task_source,const SequenceToken & token)649 void TaskTracker::RunTaskWithShutdownBehavior(Task& task,
650 const TaskTraits& traits,
651 TaskSource* task_source,
652 const SequenceToken& token) {
653 switch (traits.shutdown_behavior()) {
654 case TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN:
655 RunContinueOnShutdown(task, traits, task_source, token);
656 return;
657 case TaskShutdownBehavior::SKIP_ON_SHUTDOWN:
658 RunSkipOnShutdown(task, traits, task_source, token);
659 return;
660 case TaskShutdownBehavior::BLOCK_SHUTDOWN:
661 RunBlockShutdown(task, traits, task_source, token);
662 return;
663 }
664 }
665
666 } // namespace internal
667 } // namespace base
668