1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/pooled_single_thread_task_runner_manager.h"
6
7 #include <memory>
8 #include <string>
9 #include <utility>
10
11 #include "base/debug/leak_annotations.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/memory/raw_ptr.h"
16 #include "base/ranges/algorithm.h"
17 #include "base/strings/stringprintf.h"
18 #include "base/synchronization/atomic_flag.h"
19 #include "base/task/default_delayed_task_handle_delegate.h"
20 #include "base/task/single_thread_task_runner.h"
21 #include "base/task/task_features.h"
22 #include "base/task/task_traits.h"
23 #include "base/task/thread_pool/delayed_task_manager.h"
24 #include "base/task/thread_pool/priority_queue.h"
25 #include "base/task/thread_pool/sequence.h"
26 #include "base/task/thread_pool/task.h"
27 #include "base/task/thread_pool/task_source.h"
28 #include "base/task/thread_pool/task_tracker.h"
29 #include "base/task/thread_pool/worker_thread.h"
30 #include "base/threading/platform_thread.h"
31 #include "base/time/time.h"
32 #include "build/build_config.h"
33
34 #if BUILDFLAG(IS_WIN)
35 #include <windows.h>
36
37 #include "base/win/scoped_com_initializer.h"
38 #endif // BUILDFLAG(IS_WIN)
39
40 namespace base {
41 namespace internal {
42
43 namespace {
44
45 // Boolean indicating whether there's a PooledSingleThreadTaskRunnerManager
46 // instance alive in this process. This variable should only be set when the
47 // PooledSingleThreadTaskRunnerManager instance is brought up (on the main
48 // thread; before any tasks are posted) and decremented when the instance is
49 // brought down (i.e., only when unit tests tear down the task environment and
50 // never in production). This makes the variable const while worker threads are
51 // up and as such it doesn't need to be atomic. It is used to tell when a task
52 // is posted from the main thread after the task environment was brought down in
53 // unit tests so that PooledSingleThreadTaskRunnerManager bound TaskRunners
54 // can return false on PostTask, letting such callers know they should complete
55 // necessary work synchronously. Note: |!g_manager_is_alive| is generally
56 // equivalent to |!ThreadPoolInstance::Get()| but has the advantage of being
57 // valid in thread_pool unit tests that don't instantiate a full
58 // thread pool.
59 bool g_manager_is_alive = false;
60
61 bool g_use_utility_thread_group = false;
62
GetEnvironmentIndexForTraits(const TaskTraits & traits)63 size_t GetEnvironmentIndexForTraits(const TaskTraits& traits) {
64 const bool is_background =
65 traits.priority() == TaskPriority::BEST_EFFORT &&
66 traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
67 CanUseBackgroundThreadTypeForWorkerThread();
68 const bool is_utility =
69 !is_background && traits.priority() <= TaskPriority::USER_VISIBLE &&
70 traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
71 g_use_utility_thread_group;
72 if (traits.may_block() || traits.with_base_sync_primitives()) {
73 return is_background ? BACKGROUND_BLOCKING
74 : is_utility ? UTILITY_BLOCKING
75 : FOREGROUND_BLOCKING;
76 }
77 return is_background ? BACKGROUND : is_utility ? UTILITY : FOREGROUND;
78 }
79
80 // Allows for checking the PlatformThread::CurrentRef() against a set
81 // PlatformThreadRef atomically without using locks.
82 class AtomicThreadRefChecker {
83 public:
84 AtomicThreadRefChecker() = default;
85 AtomicThreadRefChecker(const AtomicThreadRefChecker&) = delete;
86 AtomicThreadRefChecker& operator=(const AtomicThreadRefChecker&) = delete;
87 ~AtomicThreadRefChecker() = default;
88
Set()89 void Set() {
90 thread_ref_ = PlatformThread::CurrentRef();
91 is_set_.Set();
92 }
93
IsCurrentThreadSameAsSetThread()94 bool IsCurrentThreadSameAsSetThread() {
95 return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
96 }
97
98 private:
99 AtomicFlag is_set_;
100 PlatformThreadRef thread_ref_;
101 };
102
103 class WorkerThreadDelegate : public WorkerThread::Delegate {
104 public:
WorkerThreadDelegate(const std::string & thread_name,WorkerThread::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)105 WorkerThreadDelegate(const std::string& thread_name,
106 WorkerThread::ThreadLabel thread_label,
107 TrackedRef<TaskTracker> task_tracker)
108 : task_tracker_(std::move(task_tracker)),
109 thread_name_(thread_name),
110 thread_label_(thread_label) {}
111 WorkerThreadDelegate(const WorkerThreadDelegate&) = delete;
112 WorkerThreadDelegate& operator=(const WorkerThreadDelegate&) = delete;
113
set_worker(WorkerThread * worker)114 void set_worker(WorkerThread* worker) {
115 DCHECK(!worker_);
116 worker_ = worker;
117 }
118
GetThreadLabel() const119 WorkerThread::ThreadLabel GetThreadLabel() const final {
120 return thread_label_;
121 }
122
OnMainEntry(WorkerThread *)123 void OnMainEntry(WorkerThread* /* worker */) override {
124 thread_ref_checker_.Set();
125 PlatformThread::SetName(thread_name_);
126 }
127
GetWork(WorkerThread * worker)128 RegisteredTaskSource GetWork(WorkerThread* worker) override {
129 CheckedAutoLock auto_lock(lock_);
130 DCHECK(worker_awake_);
131
132 auto task_source = GetWorkLockRequired(worker);
133 if (!task_source) {
134 // The worker will sleep after this returns nullptr.
135 worker_awake_ = false;
136 return nullptr;
137 }
138 auto run_status = task_source.WillRunTask();
139 DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
140 return task_source;
141 }
142
DidProcessTask(RegisteredTaskSource task_source)143 void DidProcessTask(RegisteredTaskSource task_source) override {
144 if (task_source) {
145 auto task_source_with_transaction =
146 TransactionWithRegisteredTaskSource::FromTaskSource(
147 std::move(task_source));
148 task_source_with_transaction.task_source.WillReEnqueue(
149 TimeTicks::Now(), &task_source_with_transaction.transaction);
150 EnqueueTaskSource(std::move(task_source_with_transaction));
151 }
152 }
153
GetSleepTimeout()154 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
155
PostTaskNow(scoped_refptr<Sequence> sequence,Task task)156 bool PostTaskNow(scoped_refptr<Sequence> sequence, Task task) {
157 auto transaction = sequence->BeginTransaction();
158
159 // |task| will be pushed to |sequence|, and |sequence| will be queued
160 // to |priority_queue_| iff |sequence_should_be_queued| is true.
161 const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
162 RegisteredTaskSource task_source;
163 if (sequence_should_be_queued) {
164 task_source = task_tracker_->RegisterTaskSource(sequence);
165 // We shouldn't push |task| if we're not allowed to queue |task_source|.
166 if (!task_source)
167 return false;
168 }
169 if (!task_tracker_->WillPostTaskNow(task, transaction.traits().priority()))
170 return false;
171 transaction.PushImmediateTask(std::move(task));
172 if (task_source) {
173 bool should_wakeup =
174 EnqueueTaskSource({std::move(task_source), std::move(transaction)});
175 if (should_wakeup)
176 worker_->WakeUp();
177 }
178 return true;
179 }
180
RunsTasksInCurrentSequence()181 bool RunsTasksInCurrentSequence() {
182 // We check the thread ref instead of the sequence for the benefit of COM
183 // callbacks which may execute without a sequence context.
184 return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
185 }
186
OnMainExit(WorkerThread *)187 void OnMainExit(WorkerThread* /* worker */) override {}
188
DidUpdateCanRunPolicy()189 void DidUpdateCanRunPolicy() {
190 bool should_wakeup = false;
191 {
192 CheckedAutoLock auto_lock(lock_);
193 if (!worker_awake_ && CanRunNextTaskSource()) {
194 should_wakeup = true;
195 worker_awake_ = true;
196 }
197 }
198 if (should_wakeup)
199 worker_->WakeUp();
200 }
201
EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting()202 void EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting() {
203 CheckedAutoLock auto_lock(lock_);
204 priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
205 }
206
207 protected:
GetWorkLockRequired(WorkerThread * worker)208 RegisteredTaskSource GetWorkLockRequired(WorkerThread* worker)
209 EXCLUSIVE_LOCKS_REQUIRED(lock_) {
210 if (!CanRunNextTaskSource()) {
211 return nullptr;
212 }
213 return priority_queue_.PopTaskSource();
214 }
215
task_tracker()216 const TrackedRef<TaskTracker>& task_tracker() { return task_tracker_; }
217
218 CheckedLock lock_;
219 bool worker_awake_ GUARDED_BY(lock_) = false;
220
221 const TrackedRef<TaskTracker> task_tracker_;
222
223 private:
224 // Enqueues a task source in this single-threaded worker's priority queue.
225 // Returns true iff the worker must wakeup, i.e. task source is allowed to run
226 // and the worker was not awake.
EnqueueTaskSource(TransactionWithRegisteredTaskSource transaction_with_task_source)227 bool EnqueueTaskSource(
228 TransactionWithRegisteredTaskSource transaction_with_task_source) {
229 CheckedAutoLock auto_lock(lock_);
230 auto sort_key = transaction_with_task_source.task_source->GetSortKey();
231 // When moving |task_source| into |priority_queue_|, it may be destroyed
232 // on another thread as soon as |lock_| is released, since we're no longer
233 // holding a reference to it. To prevent UAF, release |transaction| before
234 // moving |task_source|. Ref. crbug.com/1412008
235 transaction_with_task_source.transaction.Release();
236 priority_queue_.Push(std::move(transaction_with_task_source.task_source),
237 sort_key);
238 if (!worker_awake_ && CanRunNextTaskSource()) {
239 worker_awake_ = true;
240 return true;
241 }
242 return false;
243 }
244
CanRunNextTaskSource()245 bool CanRunNextTaskSource() EXCLUSIVE_LOCKS_REQUIRED(lock_) {
246 return !priority_queue_.IsEmpty() &&
247 task_tracker_->CanRunPriority(
248 priority_queue_.PeekSortKey().priority());
249 }
250
251 const std::string thread_name_;
252 const WorkerThread::ThreadLabel thread_label_;
253
254 // The WorkerThread that has |this| as a delegate. Must be set before
255 // starting or posting a task to the WorkerThread, because it's used in
256 // OnMainEntry() and PostTaskNow().
257 raw_ptr<WorkerThread> worker_ = nullptr;
258
259 PriorityQueue priority_queue_ GUARDED_BY(lock_);
260
261 AtomicThreadRefChecker thread_ref_checker_;
262 };
263
264 #if BUILDFLAG(IS_WIN)
265
266 class WorkerThreadCOMDelegate : public WorkerThreadDelegate {
267 public:
WorkerThreadCOMDelegate(const std::string & thread_name,WorkerThread::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)268 WorkerThreadCOMDelegate(const std::string& thread_name,
269 WorkerThread::ThreadLabel thread_label,
270 TrackedRef<TaskTracker> task_tracker)
271 : WorkerThreadDelegate(thread_name,
272 thread_label,
273 std::move(task_tracker)) {}
274
275 WorkerThreadCOMDelegate(const WorkerThreadCOMDelegate&) = delete;
276 WorkerThreadCOMDelegate& operator=(const WorkerThreadCOMDelegate&) = delete;
~WorkerThreadCOMDelegate()277 ~WorkerThreadCOMDelegate() override { DCHECK(!scoped_com_initializer_); }
278
279 // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)280 void OnMainEntry(WorkerThread* worker) override {
281 WorkerThreadDelegate::OnMainEntry(worker);
282
283 scoped_com_initializer_ = std::make_unique<win::ScopedCOMInitializer>();
284 }
285
GetWork(WorkerThread * worker)286 RegisteredTaskSource GetWork(WorkerThread* worker) override {
287 // This scheme below allows us to cover the following scenarios:
288 // * Only WorkerThreadDelegate::GetWork() has work:
289 // Always return the task source from GetWork().
290 // * Only the Windows Message Queue has work:
291 // Always return the task source from GetWorkFromWindowsMessageQueue();
292 // * Both WorkerThreadDelegate::GetWork() and the Windows Message Queue
293 // have work:
294 // Process task sources from each source round-robin style.
295 CheckedAutoLock auto_lock(lock_);
296
297 // |worker_awake_| is always set before a call to WakeUp(), but it is
298 // not set when messages are added to the Windows Message Queue. Ensure that
299 // it is set before getting work, to avoid unnecessary wake ups.
300 //
301 // Note: It wouldn't be sufficient to set |worker_awake_| in WaitForWork()
302 // when MsgWaitForMultipleObjectsEx() indicates that it was woken up by a
303 // Windows Message, because of the following scenario:
304 // T1: PostTask
305 // Queue task
306 // Set |worker_awake_| to true
307 // T2: Woken up by a Windows Message
308 // Set |worker_awake_| to true
309 // Run the task posted by T1
310 // Wait for work
311 // T1: WakeUp()
312 // T2: Woken up by Waitable Event
313 // Does not set |worker_awake_| (wake up not from Windows Message)
314 // GetWork
315 // !! Getting work while |worker_awake_| is false !!
316 worker_awake_ = true;
317 RegisteredTaskSource task_source;
318 if (get_work_first_) {
319 task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
320 if (task_source)
321 get_work_first_ = false;
322 }
323
324 if (!task_source) {
325 CheckedAutoUnlock auto_unlock(lock_);
326 task_source = GetWorkFromWindowsMessageQueue();
327 if (task_source)
328 get_work_first_ = true;
329 }
330
331 if (!task_source && !get_work_first_) {
332 // This case is important if we checked the Windows Message Queue first
333 // and found there was no work. We don't want to return null immediately
334 // as that could cause the thread to go to sleep while work is waiting via
335 // WorkerThreadDelegate::GetWork().
336 task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
337 }
338 if (!task_source) {
339 // The worker will sleep after this returns nullptr.
340 worker_awake_ = false;
341 return nullptr;
342 }
343 auto run_status = task_source.WillRunTask();
344 DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
345 return task_source;
346 }
347
OnMainExit(WorkerThread *)348 void OnMainExit(WorkerThread* /* worker */) override {
349 scoped_com_initializer_.reset();
350 }
351
WaitForWork(WaitableEvent * wake_up_event)352 void WaitForWork(WaitableEvent* wake_up_event) override {
353 DCHECK(wake_up_event);
354 const TimeDelta sleep_time = GetSleepTimeout();
355 const DWORD milliseconds_wait = checked_cast<DWORD>(
356 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds());
357 const HANDLE wake_up_event_handle = wake_up_event->handle();
358 MsgWaitForMultipleObjectsEx(1, &wake_up_event_handle, milliseconds_wait,
359 QS_ALLINPUT, 0);
360 }
361
362 private:
GetWorkFromWindowsMessageQueue()363 RegisteredTaskSource GetWorkFromWindowsMessageQueue() {
364 MSG msg;
365 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
366 Task pump_message_task(FROM_HERE,
367 BindOnce(
368 [](MSG msg) {
369 TranslateMessage(&msg);
370 DispatchMessage(&msg);
371 },
372 std::move(msg)),
373 TimeTicks::Now(), TimeDelta());
374 if (task_tracker()->WillPostTask(
375 &pump_message_task, TaskShutdownBehavior::SKIP_ON_SHUTDOWN)) {
376 auto transaction = message_pump_sequence_->BeginTransaction();
377 const bool sequence_should_be_queued =
378 transaction.WillPushImmediateTask();
379 DCHECK(sequence_should_be_queued)
380 << "GetWorkFromWindowsMessageQueue() does not expect "
381 "queueing of pump tasks.";
382 auto registered_task_source = task_tracker_->RegisterTaskSource(
383 std::move(message_pump_sequence_));
384 if (!registered_task_source)
385 return nullptr;
386 transaction.PushImmediateTask(std::move(pump_message_task));
387 return registered_task_source;
388 } else {
389 // `pump_message_task`'s destructor may run sequence-affine code, so it
390 // must be leaked when `WillPostTask` returns false.
391 auto leak = std::make_unique<Task>(std::move(pump_message_task));
392 ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
393 leak.release();
394 }
395 }
396 return nullptr;
397 }
398
399 bool get_work_first_ = true;
400 const scoped_refptr<Sequence> message_pump_sequence_ =
401 MakeRefCounted<Sequence>(TaskTraits{MayBlock()},
402 nullptr,
403 TaskSourceExecutionMode::kParallel);
404 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
405 };
406
407 #endif // BUILDFLAG(IS_WIN)
408
409 } // namespace
410
411 class PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner
412 : public SingleThreadTaskRunner {
413 public:
414 // Constructs a PooledSingleThreadTaskRunner that indirectly controls the
415 // lifetime of a dedicated |worker| for |traits|.
PooledSingleThreadTaskRunner(PooledSingleThreadTaskRunnerManager * const outer,const TaskTraits & traits,WorkerThread * worker,SingleThreadTaskRunnerThreadMode thread_mode)416 PooledSingleThreadTaskRunner(PooledSingleThreadTaskRunnerManager* const outer,
417 const TaskTraits& traits,
418 WorkerThread* worker,
419 SingleThreadTaskRunnerThreadMode thread_mode)
420 : outer_(outer),
421 worker_(worker),
422 thread_mode_(thread_mode),
423 sequence_(
424 MakeRefCounted<Sequence>(traits,
425 this,
426 TaskSourceExecutionMode::kSingleThread)) {
427 DCHECK(outer_);
428 DCHECK(worker_);
429 }
430 PooledSingleThreadTaskRunner(const PooledSingleThreadTaskRunner&) = delete;
431 PooledSingleThreadTaskRunner& operator=(const PooledSingleThreadTaskRunner&) =
432 delete;
433
434 // SingleThreadTaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)435 bool PostDelayedTask(const Location& from_here,
436 OnceClosure closure,
437 TimeDelta delay) override {
438 if (!g_manager_is_alive)
439 return false;
440
441 Task task(from_here, std::move(closure), TimeTicks::Now(), delay,
442 GetDefaultTaskLeeway());
443 return PostTask(std::move(task));
444 }
445
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & from_here,OnceClosure closure,TimeTicks delayed_run_time,subtle::DelayPolicy delay_policy)446 bool PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,
447 const Location& from_here,
448 OnceClosure closure,
449 TimeTicks delayed_run_time,
450 subtle::DelayPolicy delay_policy) override {
451 if (!g_manager_is_alive)
452 return false;
453
454 Task task(from_here, std::move(closure), TimeTicks::Now(), delayed_run_time,
455 GetDefaultTaskLeeway(), delay_policy);
456 return PostTask(std::move(task));
457 }
458
PostNonNestableDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)459 bool PostNonNestableDelayedTask(const Location& from_here,
460 OnceClosure closure,
461 TimeDelta delay) override {
462 // Tasks are never nested within the thread pool.
463 return PostDelayedTask(from_here, std::move(closure), delay);
464 }
465
RunsTasksInCurrentSequence() const466 bool RunsTasksInCurrentSequence() const override {
467 if (!g_manager_is_alive)
468 return false;
469 return GetDelegate()->RunsTasksInCurrentSequence();
470 }
471
472 private:
~PooledSingleThreadTaskRunner()473 ~PooledSingleThreadTaskRunner() override {
474 // Only unregister if this is a DEDICATED SingleThreadTaskRunner. SHARED
475 // task runner WorkerThreads are managed separately as they are reused.
476 // |g_manager_is_alive| avoids a use-after-free should this
477 // PooledSingleThreadTaskRunner outlive its manager. It is safe to access
478 // |g_manager_is_alive| without synchronization primitives as it is const
479 // for the lifetime of the manager and ~PooledSingleThreadTaskRunner()
480 // either happens prior to the end of JoinForTesting() (which happens-before
481 // manager's destruction) or on main thread after the task environment's
482 // entire destruction (which happens-after the manager's destruction). Yes,
483 // there's a theoretical use case where the last ref to this
484 // PooledSingleThreadTaskRunner is handed to a thread not controlled by
485 // thread_pool and that this ends up causing
486 // ~PooledSingleThreadTaskRunner() to race with
487 // ~PooledSingleThreadTaskRunnerManager() but this is intentionally not
488 // supported (and it doesn't matter in production where we leak the task
489 // environment for such reasons). TSan should catch this weird paradigm
490 // should anyone elect to use it in a unit test and the error would point
491 // here.
492 if (g_manager_is_alive &&
493 thread_mode_ == SingleThreadTaskRunnerThreadMode::DEDICATED) {
494 outer_->UnregisterWorkerThread(worker_);
495 }
496 }
497
PostTask(Task task)498 bool PostTask(Task task) {
499 if (!outer_->task_tracker_->WillPostTask(&task,
500 sequence_->shutdown_behavior())) {
501 // `task`'s destructor may run sequence-affine code, so it must be leaked
502 // when `WillPostTask` returns false.
503 auto leak = std::make_unique<Task>(std::move(task));
504 ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
505 leak.release();
506 return false;
507 }
508
509 if (task.delayed_run_time.is_null())
510 return GetDelegate()->PostTaskNow(sequence_, std::move(task));
511
512 // Unretained(GetDelegate()) is safe because this TaskRunner and its
513 // worker are kept alive as long as there are pending Tasks.
514 outer_->delayed_task_manager_->AddDelayedTask(
515 std::move(task),
516 BindOnce(IgnoreResult(&WorkerThreadDelegate::PostTaskNow),
517 Unretained(GetDelegate()), sequence_),
518 this);
519 return true;
520 }
521
GetDelegate() const522 WorkerThreadDelegate* GetDelegate() const {
523 return static_cast<WorkerThreadDelegate*>(worker_->delegate());
524 }
525
526 // Dangling but safe since use is controlled by `g_manager_is_alive`.
527 const raw_ptr<PooledSingleThreadTaskRunnerManager,
528 DisableDanglingPtrDetection>
529 outer_;
530
531 const raw_ptr<WorkerThread, DanglingUntriaged> worker_;
532 const SingleThreadTaskRunnerThreadMode thread_mode_;
533 const scoped_refptr<Sequence> sequence_;
534 };
535
PooledSingleThreadTaskRunnerManager(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)536 PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunnerManager(
537 TrackedRef<TaskTracker> task_tracker,
538 DelayedTaskManager* delayed_task_manager)
539 : task_tracker_(std::move(task_tracker)),
540 delayed_task_manager_(delayed_task_manager) {
541 DCHECK(task_tracker_);
542 DCHECK(delayed_task_manager_);
543 #if BUILDFLAG(IS_WIN)
544 static_assert(std::extent<decltype(shared_com_worker_threads_)>() ==
545 std::extent<decltype(shared_worker_threads_)>(),
546 "The size of |shared_com_worker_threads_| must match "
547 "|shared_worker_threads_|");
548 static_assert(
549 std::extent<
550 std::remove_reference<decltype(shared_com_worker_threads_[0])>>() ==
551 std::extent<
552 std::remove_reference<decltype(shared_worker_threads_[0])>>(),
553 "The size of |shared_com_worker_threads_| must match "
554 "|shared_worker_threads_|");
555 #endif // BUILDFLAG(IS_WIN)
556 DCHECK(!g_manager_is_alive);
557 g_manager_is_alive = true;
558 }
559
~PooledSingleThreadTaskRunnerManager()560 PooledSingleThreadTaskRunnerManager::~PooledSingleThreadTaskRunnerManager() {
561 DCHECK(g_manager_is_alive);
562 g_manager_is_alive = false;
563 g_use_utility_thread_group = false;
564 }
565
Start(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,WorkerThreadObserver * worker_thread_observer)566 void PooledSingleThreadTaskRunnerManager::Start(
567 scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
568 WorkerThreadObserver* worker_thread_observer) {
569 DCHECK(!worker_thread_observer_);
570 worker_thread_observer_ = worker_thread_observer;
571 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
572 DCHECK(io_thread_task_runner);
573 io_thread_task_runner_ = std::move(io_thread_task_runner);
574 #endif // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
575
576 g_use_utility_thread_group = CanUseUtilityThreadTypeForWorkerThread() &&
577 FeatureList::IsEnabled(kUseUtilityThreadGroup);
578
579 decltype(workers_) workers_to_start;
580 {
581 CheckedAutoLock auto_lock(lock_);
582 started_ = true;
583 workers_to_start = workers_;
584 }
585
586 // Start workers that were created before this method was called.
587 // Workers that already need to wake up are already signaled as part of
588 // PooledSingleThreadTaskRunner::PostTaskNow(). As a result, it's
589 // unnecessary to call WakeUp() for each worker (in fact, an extraneous
590 // WakeUp() would be racy and wrong - see https://crbug.com/862582).
591 for (scoped_refptr<WorkerThread> worker : workers_to_start) {
592 worker->Start(io_thread_task_runner_, worker_thread_observer_);
593 }
594 }
595
DidUpdateCanRunPolicy()596 void PooledSingleThreadTaskRunnerManager::DidUpdateCanRunPolicy() {
597 decltype(workers_) workers_to_update;
598
599 {
600 CheckedAutoLock auto_lock(lock_);
601 if (!started_)
602 return;
603 workers_to_update = workers_;
604 }
605 // Any worker created after the lock is released will see the latest
606 // CanRunPolicy if tasks are posted to it and thus doesn't need a
607 // DidUpdateCanRunPolicy() notification.
608 for (auto& worker : workers_to_update) {
609 static_cast<WorkerThreadDelegate*>(worker->delegate())
610 ->DidUpdateCanRunPolicy();
611 }
612 }
613
614 scoped_refptr<SingleThreadTaskRunner>
CreateSingleThreadTaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)615 PooledSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunner(
616 const TaskTraits& traits,
617 SingleThreadTaskRunnerThreadMode thread_mode) {
618 return CreateTaskRunnerImpl<WorkerThreadDelegate>(traits, thread_mode);
619 }
620
621 #if BUILDFLAG(IS_WIN)
622 scoped_refptr<SingleThreadTaskRunner>
CreateCOMSTATaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)623 PooledSingleThreadTaskRunnerManager::CreateCOMSTATaskRunner(
624 const TaskTraits& traits,
625 SingleThreadTaskRunnerThreadMode thread_mode) {
626 return CreateTaskRunnerImpl<WorkerThreadCOMDelegate>(traits, thread_mode);
627 }
628 #endif // BUILDFLAG(IS_WIN)
629
630 // static
631 PooledSingleThreadTaskRunnerManager::ContinueOnShutdown
TraitsToContinueOnShutdown(const TaskTraits & traits)632 PooledSingleThreadTaskRunnerManager::TraitsToContinueOnShutdown(
633 const TaskTraits& traits) {
634 if (traits.shutdown_behavior() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
635 return IS_CONTINUE_ON_SHUTDOWN;
636 return IS_NOT_CONTINUE_ON_SHUTDOWN;
637 }
638
639 template <typename DelegateType>
640 scoped_refptr<PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner>
CreateTaskRunnerImpl(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)641 PooledSingleThreadTaskRunnerManager::CreateTaskRunnerImpl(
642 const TaskTraits& traits,
643 SingleThreadTaskRunnerThreadMode thread_mode) {
644 DCHECK(thread_mode != SingleThreadTaskRunnerThreadMode::SHARED ||
645 !traits.with_base_sync_primitives())
646 << "Using WithBaseSyncPrimitives() on a shared SingleThreadTaskRunner "
647 "may cause deadlocks. Either reevaluate your usage (e.g. use "
648 "SequencedTaskRunner) or use "
649 "SingleThreadTaskRunnerThreadMode::DEDICATED.";
650 // To simplify the code, |dedicated_worker| is a local only variable that
651 // allows the code to treat both the DEDICATED and SHARED cases similarly for
652 // SingleThreadTaskRunnerThreadMode. In DEDICATED, the scoped_refptr is backed
653 // by a local variable and in SHARED, the scoped_refptr is backed by a member
654 // variable.
655 WorkerThread* dedicated_worker = nullptr;
656 WorkerThread*& worker =
657 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
658 ? dedicated_worker
659 : GetSharedWorkerThreadForTraits<DelegateType>(traits);
660 bool new_worker = false;
661 bool started;
662 {
663 CheckedAutoLock auto_lock(lock_);
664 if (!worker) {
665 const auto& environment_params =
666 kEnvironmentParams[GetEnvironmentIndexForTraits(traits)];
667 std::string worker_name;
668 if (thread_mode == SingleThreadTaskRunnerThreadMode::SHARED)
669 worker_name += "Shared";
670 worker_name += environment_params.name_suffix;
671 worker = CreateAndRegisterWorkerThread<DelegateType>(
672 worker_name, thread_mode, environment_params.thread_type_hint);
673 new_worker = true;
674 }
675 started = started_;
676 }
677
678 if (new_worker && started)
679 worker->Start(io_thread_task_runner_, worker_thread_observer_);
680
681 return MakeRefCounted<PooledSingleThreadTaskRunner>(this, traits, worker,
682 thread_mode);
683 }
684
JoinForTesting()685 void PooledSingleThreadTaskRunnerManager::JoinForTesting() {
686 decltype(workers_) local_workers;
687 {
688 CheckedAutoLock auto_lock(lock_);
689 local_workers = std::move(workers_);
690 }
691
692 for (const auto& worker : local_workers) {
693 static_cast<WorkerThreadDelegate*>(worker->delegate())
694 ->EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting();
695 worker->JoinForTesting();
696 }
697
698 {
699 CheckedAutoLock auto_lock(lock_);
700 DCHECK(workers_.empty())
701 << "New worker(s) unexpectedly registered during join.";
702 workers_ = std::move(local_workers);
703 }
704
705 // Release shared WorkerThreads at the end so they get joined above. If
706 // this call happens before the joins, the WorkerThreads are effectively
707 // detached and may outlive the PooledSingleThreadTaskRunnerManager.
708 ReleaseSharedWorkerThreads();
709 }
710
711 template <>
712 std::unique_ptr<WorkerThreadDelegate>
CreateWorkerThreadDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)713 PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
714 WorkerThreadDelegate>(const std::string& name,
715 int id,
716 SingleThreadTaskRunnerThreadMode thread_mode) {
717 return std::make_unique<WorkerThreadDelegate>(
718 StringPrintf("ThreadPoolSingleThread%s%d", name.c_str(), id),
719 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
720 ? WorkerThread::ThreadLabel::DEDICATED
721 : WorkerThread::ThreadLabel::SHARED,
722 task_tracker_);
723 }
724
725 #if BUILDFLAG(IS_WIN)
726 template <>
727 std::unique_ptr<WorkerThreadDelegate>
CreateWorkerThreadDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)728 PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
729 WorkerThreadCOMDelegate>(const std::string& name,
730 int id,
731 SingleThreadTaskRunnerThreadMode thread_mode) {
732 return std::make_unique<WorkerThreadCOMDelegate>(
733 StringPrintf("ThreadPoolSingleThreadCOMSTA%s%d", name.c_str(), id),
734 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
735 ? WorkerThread::ThreadLabel::DEDICATED_COM
736 : WorkerThread::ThreadLabel::SHARED_COM,
737 task_tracker_);
738 }
739 #endif // BUILDFLAG(IS_WIN)
740
741 template <typename DelegateType>
742 WorkerThread*
CreateAndRegisterWorkerThread(const std::string & name,SingleThreadTaskRunnerThreadMode thread_mode,ThreadType thread_type_hint)743 PooledSingleThreadTaskRunnerManager::CreateAndRegisterWorkerThread(
744 const std::string& name,
745 SingleThreadTaskRunnerThreadMode thread_mode,
746 ThreadType thread_type_hint) {
747 int id = next_worker_id_++;
748 std::unique_ptr<WorkerThreadDelegate> delegate =
749 CreateWorkerThreadDelegate<DelegateType>(name, id, thread_mode);
750 WorkerThreadDelegate* delegate_raw = delegate.get();
751 scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
752 thread_type_hint, std::move(delegate), task_tracker_, workers_.size());
753 delegate_raw->set_worker(worker.get());
754 workers_.emplace_back(std::move(worker));
755 return workers_.back().get();
756 }
757
758 template <>
759 WorkerThread*&
GetSharedWorkerThreadForTraits(const TaskTraits & traits)760 PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
761 WorkerThreadDelegate>(const TaskTraits& traits) {
762 return shared_worker_threads_[GetEnvironmentIndexForTraits(traits)]
763 [TraitsToContinueOnShutdown(traits)];
764 }
765
766 #if BUILDFLAG(IS_WIN)
767 template <>
768 WorkerThread*&
GetSharedWorkerThreadForTraits(const TaskTraits & traits)769 PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
770 WorkerThreadCOMDelegate>(const TaskTraits& traits) {
771 return shared_com_worker_threads_[GetEnvironmentIndexForTraits(traits)]
772 [TraitsToContinueOnShutdown(traits)];
773 }
774 #endif // BUILDFLAG(IS_WIN)
775
UnregisterWorkerThread(WorkerThread * worker)776 void PooledSingleThreadTaskRunnerManager::UnregisterWorkerThread(
777 WorkerThread* worker) {
778 // Cleanup uses a CheckedLock, so call Cleanup() after releasing |lock_|.
779 scoped_refptr<WorkerThread> worker_to_destroy;
780 {
781 CheckedAutoLock auto_lock(lock_);
782
783 // Skip when joining (the join logic takes care of the rest).
784 if (workers_.empty())
785 return;
786
787 auto worker_iter = ranges::find(workers_, worker);
788 DCHECK(worker_iter != workers_.end());
789 worker_to_destroy = std::move(*worker_iter);
790 workers_.erase(worker_iter);
791 }
792 worker_to_destroy->Cleanup();
793 }
794
ReleaseSharedWorkerThreads()795 void PooledSingleThreadTaskRunnerManager::ReleaseSharedWorkerThreads() {
796 decltype(shared_worker_threads_) local_shared_worker_threads;
797 #if BUILDFLAG(IS_WIN)
798 decltype(shared_com_worker_threads_) local_shared_com_worker_threads;
799 #endif
800 {
801 CheckedAutoLock auto_lock(lock_);
802 for (size_t i = 0; i < std::size(shared_worker_threads_); ++i) {
803 for (size_t j = 0; j < std::size(shared_worker_threads_[i]); ++j) {
804 local_shared_worker_threads[i][j] = shared_worker_threads_[i][j];
805 shared_worker_threads_[i][j] = nullptr;
806 #if BUILDFLAG(IS_WIN)
807 local_shared_com_worker_threads[i][j] =
808 shared_com_worker_threads_[i][j];
809 shared_com_worker_threads_[i][j] = nullptr;
810 #endif
811 }
812 }
813 }
814
815 for (size_t i = 0; i < std::size(local_shared_worker_threads); ++i) {
816 for (size_t j = 0; j < std::size(local_shared_worker_threads[i]); ++j) {
817 if (local_shared_worker_threads[i][j])
818 UnregisterWorkerThread(local_shared_worker_threads[i][j]);
819 #if BUILDFLAG(IS_WIN)
820 if (local_shared_com_worker_threads[i][j])
821 UnregisterWorkerThread(local_shared_com_worker_threads[i][j]);
822 #endif
823 }
824 }
825 }
826
827 } // namespace internal
828 } // namespace base
829