• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/pooled_single_thread_task_runner_manager.h"
6 
7 #include <memory>
8 #include <string>
9 #include <utility>
10 
11 #include "base/debug/leak_annotations.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/memory/raw_ptr.h"
16 #include "base/ranges/algorithm.h"
17 #include "base/strings/stringprintf.h"
18 #include "base/synchronization/atomic_flag.h"
19 #include "base/task/default_delayed_task_handle_delegate.h"
20 #include "base/task/single_thread_task_runner.h"
21 #include "base/task/task_features.h"
22 #include "base/task/task_traits.h"
23 #include "base/task/thread_pool/delayed_task_manager.h"
24 #include "base/task/thread_pool/priority_queue.h"
25 #include "base/task/thread_pool/sequence.h"
26 #include "base/task/thread_pool/task.h"
27 #include "base/task/thread_pool/task_source.h"
28 #include "base/task/thread_pool/task_tracker.h"
29 #include "base/task/thread_pool/worker_thread.h"
30 #include "base/threading/platform_thread.h"
31 #include "base/time/time.h"
32 #include "build/build_config.h"
33 
34 #if BUILDFLAG(IS_WIN)
35 #include <windows.h>
36 
37 #include "base/win/scoped_com_initializer.h"
38 #endif  // BUILDFLAG(IS_WIN)
39 
40 namespace base {
41 namespace internal {
42 
43 namespace {
44 
// Boolean indicating whether there's a PooledSingleThreadTaskRunnerManager
// instance alive in this process. This variable should only be set when the
// PooledSingleThreadTaskRunnerManager instance is brought up (on the main
// thread; before any tasks are posted) and cleared when the instance is
// brought down (i.e., only when unit tests tear down the task environment and
// never in production). This makes the variable const while worker threads are
// up and as such it doesn't need to be atomic. It is used to tell when a task
// is posted from the main thread after the task environment was brought down in
// unit tests so that PooledSingleThreadTaskRunnerManager bound TaskRunners
// can return false on PostTask, letting such callers know they should complete
// necessary work synchronously. Note: |!g_manager_is_alive| is generally
// equivalent to |!ThreadPoolInstance::Get()| but has the advantage of being
// valid in thread_pool unit tests that don't instantiate a full
// thread pool.
bool g_manager_is_alive = false;

// Whether GetEnvironmentIndexForTraits() may select the UTILITY /
// UTILITY_BLOCKING environments. Computed in
// PooledSingleThreadTaskRunnerManager::Start() and reset to false by the
// manager's destructor.
bool g_use_utility_thread_group = false;
62 
GetEnvironmentIndexForTraits(const TaskTraits & traits)63 size_t GetEnvironmentIndexForTraits(const TaskTraits& traits) {
64   const bool is_background =
65       traits.priority() == TaskPriority::BEST_EFFORT &&
66       traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
67       CanUseBackgroundThreadTypeForWorkerThread();
68   const bool is_utility =
69       !is_background && traits.priority() <= TaskPriority::USER_VISIBLE &&
70       traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
71       g_use_utility_thread_group;
72   if (traits.may_block() || traits.with_base_sync_primitives()) {
73     return is_background ? BACKGROUND_BLOCKING
74            : is_utility  ? UTILITY_BLOCKING
75                          : FOREGROUND_BLOCKING;
76   }
77   return is_background ? BACKGROUND : is_utility ? UTILITY : FOREGROUND;
78 }
79 
80 // Allows for checking the PlatformThread::CurrentRef() against a set
81 // PlatformThreadRef atomically without using locks.
82 class AtomicThreadRefChecker {
83  public:
84   AtomicThreadRefChecker() = default;
85   AtomicThreadRefChecker(const AtomicThreadRefChecker&) = delete;
86   AtomicThreadRefChecker& operator=(const AtomicThreadRefChecker&) = delete;
87   ~AtomicThreadRefChecker() = default;
88 
Set()89   void Set() {
90     thread_ref_ = PlatformThread::CurrentRef();
91     is_set_.Set();
92   }
93 
IsCurrentThreadSameAsSetThread()94   bool IsCurrentThreadSameAsSetThread() {
95     return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
96   }
97 
98  private:
99   AtomicFlag is_set_;
100   PlatformThreadRef thread_ref_;
101 };
102 
103 class WorkerThreadDelegate : public WorkerThread::Delegate {
104  public:
WorkerThreadDelegate(const std::string & thread_name,WorkerThread::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)105   WorkerThreadDelegate(const std::string& thread_name,
106                        WorkerThread::ThreadLabel thread_label,
107                        TrackedRef<TaskTracker> task_tracker)
108       : task_tracker_(std::move(task_tracker)),
109         thread_name_(thread_name),
110         thread_label_(thread_label) {}
111   WorkerThreadDelegate(const WorkerThreadDelegate&) = delete;
112   WorkerThreadDelegate& operator=(const WorkerThreadDelegate&) = delete;
113 
set_worker(WorkerThread * worker)114   void set_worker(WorkerThread* worker) {
115     DCHECK(!worker_);
116     worker_ = worker;
117   }
118 
GetThreadLabel() const119   WorkerThread::ThreadLabel GetThreadLabel() const final {
120     return thread_label_;
121   }
122 
OnMainEntry(WorkerThread *)123   void OnMainEntry(WorkerThread* /* worker */) override {
124     thread_ref_checker_.Set();
125     PlatformThread::SetName(thread_name_);
126   }
127 
GetWork(WorkerThread * worker)128   RegisteredTaskSource GetWork(WorkerThread* worker) override {
129     CheckedAutoLock auto_lock(lock_);
130     DCHECK(worker_awake_);
131 
132     auto task_source = GetWorkLockRequired(worker);
133     if (!task_source) {
134       // The worker will sleep after this returns nullptr.
135       worker_awake_ = false;
136       return nullptr;
137     }
138     auto run_status = task_source.WillRunTask();
139     DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
140     return task_source;
141   }
142 
DidProcessTask(RegisteredTaskSource task_source)143   void DidProcessTask(RegisteredTaskSource task_source) override {
144     if (task_source) {
145       auto task_source_with_transaction =
146           TransactionWithRegisteredTaskSource::FromTaskSource(
147               std::move(task_source));
148       task_source_with_transaction.task_source.WillReEnqueue(
149           TimeTicks::Now(), &task_source_with_transaction.transaction);
150       EnqueueTaskSource(std::move(task_source_with_transaction));
151     }
152   }
153 
GetSleepTimeout()154   TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
155 
PostTaskNow(scoped_refptr<Sequence> sequence,Task task)156   bool PostTaskNow(scoped_refptr<Sequence> sequence, Task task) {
157     auto transaction = sequence->BeginTransaction();
158 
159     // |task| will be pushed to |sequence|, and |sequence| will be queued
160     // to |priority_queue_| iff |sequence_should_be_queued| is true.
161     const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
162     RegisteredTaskSource task_source;
163     if (sequence_should_be_queued) {
164       task_source = task_tracker_->RegisterTaskSource(sequence);
165       // We shouldn't push |task| if we're not allowed to queue |task_source|.
166       if (!task_source)
167         return false;
168     }
169     if (!task_tracker_->WillPostTaskNow(task, transaction.traits().priority()))
170       return false;
171     transaction.PushImmediateTask(std::move(task));
172     if (task_source) {
173       bool should_wakeup =
174           EnqueueTaskSource({std::move(task_source), std::move(transaction)});
175       if (should_wakeup)
176         worker_->WakeUp();
177     }
178     return true;
179   }
180 
RunsTasksInCurrentSequence()181   bool RunsTasksInCurrentSequence() {
182     // We check the thread ref instead of the sequence for the benefit of COM
183     // callbacks which may execute without a sequence context.
184     return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
185   }
186 
OnMainExit(WorkerThread *)187   void OnMainExit(WorkerThread* /* worker */) override {}
188 
DidUpdateCanRunPolicy()189   void DidUpdateCanRunPolicy() {
190     bool should_wakeup = false;
191     {
192       CheckedAutoLock auto_lock(lock_);
193       if (!worker_awake_ && CanRunNextTaskSource()) {
194         should_wakeup = true;
195         worker_awake_ = true;
196       }
197     }
198     if (should_wakeup)
199       worker_->WakeUp();
200   }
201 
EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting()202   void EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting() {
203     CheckedAutoLock auto_lock(lock_);
204     priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
205   }
206 
207  protected:
GetWorkLockRequired(WorkerThread * worker)208   RegisteredTaskSource GetWorkLockRequired(WorkerThread* worker)
209       EXCLUSIVE_LOCKS_REQUIRED(lock_) {
210     if (!CanRunNextTaskSource()) {
211       return nullptr;
212     }
213     return priority_queue_.PopTaskSource();
214   }
215 
task_tracker()216   const TrackedRef<TaskTracker>& task_tracker() { return task_tracker_; }
217 
218   CheckedLock lock_;
219   bool worker_awake_ GUARDED_BY(lock_) = false;
220 
221   const TrackedRef<TaskTracker> task_tracker_;
222 
223  private:
224   // Enqueues a task source in this single-threaded worker's priority queue.
225   // Returns true iff the worker must wakeup, i.e. task source is allowed to run
226   // and the worker was not awake.
EnqueueTaskSource(TransactionWithRegisteredTaskSource transaction_with_task_source)227   bool EnqueueTaskSource(
228       TransactionWithRegisteredTaskSource transaction_with_task_source) {
229     CheckedAutoLock auto_lock(lock_);
230     auto sort_key = transaction_with_task_source.task_source->GetSortKey();
231     // When moving |task_source| into |priority_queue_|, it may be destroyed
232     // on another thread as soon as |lock_| is released, since we're no longer
233     // holding a reference to it. To prevent UAF, release |transaction| before
234     // moving |task_source|. Ref. crbug.com/1412008
235     transaction_with_task_source.transaction.Release();
236     priority_queue_.Push(std::move(transaction_with_task_source.task_source),
237                          sort_key);
238     if (!worker_awake_ && CanRunNextTaskSource()) {
239       worker_awake_ = true;
240       return true;
241     }
242     return false;
243   }
244 
CanRunNextTaskSource()245   bool CanRunNextTaskSource() EXCLUSIVE_LOCKS_REQUIRED(lock_) {
246     return !priority_queue_.IsEmpty() &&
247            task_tracker_->CanRunPriority(
248                priority_queue_.PeekSortKey().priority());
249   }
250 
251   const std::string thread_name_;
252   const WorkerThread::ThreadLabel thread_label_;
253 
254   // The WorkerThread that has |this| as a delegate. Must be set before
255   // starting or posting a task to the WorkerThread, because it's used in
256   // OnMainEntry() and PostTaskNow().
257   raw_ptr<WorkerThread> worker_ = nullptr;
258 
259   PriorityQueue priority_queue_ GUARDED_BY(lock_);
260 
261   AtomicThreadRefChecker thread_ref_checker_;
262 };
263 
264 #if BUILDFLAG(IS_WIN)
265 
266 class WorkerThreadCOMDelegate : public WorkerThreadDelegate {
267  public:
WorkerThreadCOMDelegate(const std::string & thread_name,WorkerThread::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)268   WorkerThreadCOMDelegate(const std::string& thread_name,
269                           WorkerThread::ThreadLabel thread_label,
270                           TrackedRef<TaskTracker> task_tracker)
271       : WorkerThreadDelegate(thread_name,
272                              thread_label,
273                              std::move(task_tracker)) {}
274 
275   WorkerThreadCOMDelegate(const WorkerThreadCOMDelegate&) = delete;
276   WorkerThreadCOMDelegate& operator=(const WorkerThreadCOMDelegate&) = delete;
~WorkerThreadCOMDelegate()277   ~WorkerThreadCOMDelegate() override { DCHECK(!scoped_com_initializer_); }
278 
279   // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)280   void OnMainEntry(WorkerThread* worker) override {
281     WorkerThreadDelegate::OnMainEntry(worker);
282 
283     scoped_com_initializer_ = std::make_unique<win::ScopedCOMInitializer>();
284   }
285 
GetWork(WorkerThread * worker)286   RegisteredTaskSource GetWork(WorkerThread* worker) override {
287     // This scheme below allows us to cover the following scenarios:
288     // * Only WorkerThreadDelegate::GetWork() has work:
289     //   Always return the task source from GetWork().
290     // * Only the Windows Message Queue has work:
291     //   Always return the task source from GetWorkFromWindowsMessageQueue();
292     // * Both WorkerThreadDelegate::GetWork() and the Windows Message Queue
293     //   have work:
294     //   Process task sources from each source round-robin style.
295     CheckedAutoLock auto_lock(lock_);
296 
297     // |worker_awake_| is always set before a call to WakeUp(), but it is
298     // not set when messages are added to the Windows Message Queue. Ensure that
299     // it is set before getting work, to avoid unnecessary wake ups.
300     //
301     // Note: It wouldn't be sufficient to set |worker_awake_| in WaitForWork()
302     // when MsgWaitForMultipleObjectsEx() indicates that it was woken up by a
303     // Windows Message, because of the following scenario:
304     //  T1: PostTask
305     //      Queue task
306     //      Set |worker_awake_| to true
307     //  T2: Woken up by a Windows Message
308     //      Set |worker_awake_| to true
309     //      Run the task posted by T1
310     //      Wait for work
311     //  T1: WakeUp()
312     //  T2: Woken up by Waitable Event
313     //      Does not set |worker_awake_| (wake up not from Windows Message)
314     //      GetWork
315     //      !! Getting work while |worker_awake_| is false !!
316     worker_awake_ = true;
317     RegisteredTaskSource task_source;
318     if (get_work_first_) {
319       task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
320       if (task_source)
321         get_work_first_ = false;
322     }
323 
324     if (!task_source) {
325       CheckedAutoUnlock auto_unlock(lock_);
326       task_source = GetWorkFromWindowsMessageQueue();
327       if (task_source)
328         get_work_first_ = true;
329     }
330 
331     if (!task_source && !get_work_first_) {
332       // This case is important if we checked the Windows Message Queue first
333       // and found there was no work. We don't want to return null immediately
334       // as that could cause the thread to go to sleep while work is waiting via
335       // WorkerThreadDelegate::GetWork().
336       task_source = WorkerThreadDelegate::GetWorkLockRequired(worker);
337     }
338     if (!task_source) {
339       // The worker will sleep after this returns nullptr.
340       worker_awake_ = false;
341       return nullptr;
342     }
343     auto run_status = task_source.WillRunTask();
344     DCHECK_NE(run_status, TaskSource::RunStatus::kDisallowed);
345     return task_source;
346   }
347 
OnMainExit(WorkerThread *)348   void OnMainExit(WorkerThread* /* worker */) override {
349     scoped_com_initializer_.reset();
350   }
351 
WaitForWork(WaitableEvent * wake_up_event)352   void WaitForWork(WaitableEvent* wake_up_event) override {
353     DCHECK(wake_up_event);
354     const TimeDelta sleep_time = GetSleepTimeout();
355     const DWORD milliseconds_wait = checked_cast<DWORD>(
356         sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds());
357     const HANDLE wake_up_event_handle = wake_up_event->handle();
358     MsgWaitForMultipleObjectsEx(1, &wake_up_event_handle, milliseconds_wait,
359                                 QS_ALLINPUT, 0);
360   }
361 
362  private:
GetWorkFromWindowsMessageQueue()363   RegisteredTaskSource GetWorkFromWindowsMessageQueue() {
364     MSG msg;
365     if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
366       Task pump_message_task(FROM_HERE,
367                              BindOnce(
368                                  [](MSG msg) {
369                                    TranslateMessage(&msg);
370                                    DispatchMessage(&msg);
371                                  },
372                                  std::move(msg)),
373                              TimeTicks::Now(), TimeDelta());
374       if (task_tracker()->WillPostTask(
375               &pump_message_task, TaskShutdownBehavior::SKIP_ON_SHUTDOWN)) {
376         auto transaction = message_pump_sequence_->BeginTransaction();
377         const bool sequence_should_be_queued =
378             transaction.WillPushImmediateTask();
379         DCHECK(sequence_should_be_queued)
380             << "GetWorkFromWindowsMessageQueue() does not expect "
381                "queueing of pump tasks.";
382         auto registered_task_source = task_tracker_->RegisterTaskSource(
383             std::move(message_pump_sequence_));
384         if (!registered_task_source)
385           return nullptr;
386         transaction.PushImmediateTask(std::move(pump_message_task));
387         return registered_task_source;
388       } else {
389         // `pump_message_task`'s destructor may run sequence-affine code, so it
390         // must be leaked when `WillPostTask` returns false.
391         auto leak = std::make_unique<Task>(std::move(pump_message_task));
392         ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
393         leak.release();
394       }
395     }
396     return nullptr;
397   }
398 
399   bool get_work_first_ = true;
400   const scoped_refptr<Sequence> message_pump_sequence_ =
401       MakeRefCounted<Sequence>(TaskTraits{MayBlock()},
402                                nullptr,
403                                TaskSourceExecutionMode::kParallel);
404   std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
405 };
406 
407 #endif  // BUILDFLAG(IS_WIN)
408 
409 }  // namespace
410 
411 class PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner
412     : public SingleThreadTaskRunner {
413  public:
414   // Constructs a PooledSingleThreadTaskRunner that indirectly controls the
415   // lifetime of a dedicated |worker| for |traits|.
PooledSingleThreadTaskRunner(PooledSingleThreadTaskRunnerManager * const outer,const TaskTraits & traits,WorkerThread * worker,SingleThreadTaskRunnerThreadMode thread_mode)416   PooledSingleThreadTaskRunner(PooledSingleThreadTaskRunnerManager* const outer,
417                                const TaskTraits& traits,
418                                WorkerThread* worker,
419                                SingleThreadTaskRunnerThreadMode thread_mode)
420       : outer_(outer),
421         worker_(worker),
422         thread_mode_(thread_mode),
423         sequence_(
424             MakeRefCounted<Sequence>(traits,
425                                      this,
426                                      TaskSourceExecutionMode::kSingleThread)) {
427     DCHECK(outer_);
428     DCHECK(worker_);
429   }
430   PooledSingleThreadTaskRunner(const PooledSingleThreadTaskRunner&) = delete;
431   PooledSingleThreadTaskRunner& operator=(const PooledSingleThreadTaskRunner&) =
432       delete;
433 
434   // SingleThreadTaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)435   bool PostDelayedTask(const Location& from_here,
436                        OnceClosure closure,
437                        TimeDelta delay) override {
438     if (!g_manager_is_alive)
439       return false;
440 
441     Task task(from_here, std::move(closure), TimeTicks::Now(), delay,
442               GetDefaultTaskLeeway());
443     return PostTask(std::move(task));
444   }
445 
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & from_here,OnceClosure closure,TimeTicks delayed_run_time,subtle::DelayPolicy delay_policy)446   bool PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,
447                          const Location& from_here,
448                          OnceClosure closure,
449                          TimeTicks delayed_run_time,
450                          subtle::DelayPolicy delay_policy) override {
451     if (!g_manager_is_alive)
452       return false;
453 
454     Task task(from_here, std::move(closure), TimeTicks::Now(), delayed_run_time,
455               GetDefaultTaskLeeway(), delay_policy);
456     return PostTask(std::move(task));
457   }
458 
PostNonNestableDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)459   bool PostNonNestableDelayedTask(const Location& from_here,
460                                   OnceClosure closure,
461                                   TimeDelta delay) override {
462     // Tasks are never nested within the thread pool.
463     return PostDelayedTask(from_here, std::move(closure), delay);
464   }
465 
RunsTasksInCurrentSequence() const466   bool RunsTasksInCurrentSequence() const override {
467     if (!g_manager_is_alive)
468       return false;
469     return GetDelegate()->RunsTasksInCurrentSequence();
470   }
471 
472  private:
~PooledSingleThreadTaskRunner()473   ~PooledSingleThreadTaskRunner() override {
474     // Only unregister if this is a DEDICATED SingleThreadTaskRunner. SHARED
475     // task runner WorkerThreads are managed separately as they are reused.
476     // |g_manager_is_alive| avoids a use-after-free should this
477     // PooledSingleThreadTaskRunner outlive its manager. It is safe to access
478     // |g_manager_is_alive| without synchronization primitives as it is const
479     // for the lifetime of the manager and ~PooledSingleThreadTaskRunner()
480     // either happens prior to the end of JoinForTesting() (which happens-before
481     // manager's destruction) or on main thread after the task environment's
482     // entire destruction (which happens-after the manager's destruction). Yes,
483     // there's a theoretical use case where the last ref to this
484     // PooledSingleThreadTaskRunner is handed to a thread not controlled by
485     // thread_pool and that this ends up causing
486     // ~PooledSingleThreadTaskRunner() to race with
487     // ~PooledSingleThreadTaskRunnerManager() but this is intentionally not
488     // supported (and it doesn't matter in production where we leak the task
489     // environment for such reasons). TSan should catch this weird paradigm
490     // should anyone elect to use it in a unit test and the error would point
491     // here.
492     if (g_manager_is_alive &&
493         thread_mode_ == SingleThreadTaskRunnerThreadMode::DEDICATED) {
494       outer_->UnregisterWorkerThread(worker_);
495     }
496   }
497 
PostTask(Task task)498   bool PostTask(Task task) {
499     if (!outer_->task_tracker_->WillPostTask(&task,
500                                              sequence_->shutdown_behavior())) {
501       // `task`'s destructor may run sequence-affine code, so it must be leaked
502       // when `WillPostTask` returns false.
503       auto leak = std::make_unique<Task>(std::move(task));
504       ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
505       leak.release();
506       return false;
507     }
508 
509     if (task.delayed_run_time.is_null())
510       return GetDelegate()->PostTaskNow(sequence_, std::move(task));
511 
512     // Unretained(GetDelegate()) is safe because this TaskRunner and its
513     // worker are kept alive as long as there are pending Tasks.
514     outer_->delayed_task_manager_->AddDelayedTask(
515         std::move(task),
516         BindOnce(IgnoreResult(&WorkerThreadDelegate::PostTaskNow),
517                  Unretained(GetDelegate()), sequence_),
518         this);
519     return true;
520   }
521 
GetDelegate() const522   WorkerThreadDelegate* GetDelegate() const {
523     return static_cast<WorkerThreadDelegate*>(worker_->delegate());
524   }
525 
526   // Dangling but safe since use is controlled by `g_manager_is_alive`.
527   const raw_ptr<PooledSingleThreadTaskRunnerManager,
528                 DisableDanglingPtrDetection>
529       outer_;
530 
531   const raw_ptr<WorkerThread, DanglingUntriaged> worker_;
532   const SingleThreadTaskRunnerThreadMode thread_mode_;
533   const scoped_refptr<Sequence> sequence_;
534 };
535 
PooledSingleThreadTaskRunnerManager(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)536 PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunnerManager(
537     TrackedRef<TaskTracker> task_tracker,
538     DelayedTaskManager* delayed_task_manager)
539     : task_tracker_(std::move(task_tracker)),
540       delayed_task_manager_(delayed_task_manager) {
541   DCHECK(task_tracker_);
542   DCHECK(delayed_task_manager_);
543 #if BUILDFLAG(IS_WIN)
544   static_assert(std::extent<decltype(shared_com_worker_threads_)>() ==
545                     std::extent<decltype(shared_worker_threads_)>(),
546                 "The size of |shared_com_worker_threads_| must match "
547                 "|shared_worker_threads_|");
548   static_assert(
549       std::extent<
550           std::remove_reference<decltype(shared_com_worker_threads_[0])>>() ==
551           std::extent<
552               std::remove_reference<decltype(shared_worker_threads_[0])>>(),
553       "The size of |shared_com_worker_threads_| must match "
554       "|shared_worker_threads_|");
555 #endif  // BUILDFLAG(IS_WIN)
556   DCHECK(!g_manager_is_alive);
557   g_manager_is_alive = true;
558 }
559 
~PooledSingleThreadTaskRunnerManager()560 PooledSingleThreadTaskRunnerManager::~PooledSingleThreadTaskRunnerManager() {
561   DCHECK(g_manager_is_alive);
562   g_manager_is_alive = false;
563   g_use_utility_thread_group = false;
564 }
565 
Start(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,WorkerThreadObserver * worker_thread_observer)566 void PooledSingleThreadTaskRunnerManager::Start(
567     scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
568     WorkerThreadObserver* worker_thread_observer) {
569   DCHECK(!worker_thread_observer_);
570   worker_thread_observer_ = worker_thread_observer;
571 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
572   DCHECK(io_thread_task_runner);
573   io_thread_task_runner_ = std::move(io_thread_task_runner);
574 #endif  // (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
575 
576   g_use_utility_thread_group = CanUseUtilityThreadTypeForWorkerThread() &&
577                                FeatureList::IsEnabled(kUseUtilityThreadGroup);
578 
579   decltype(workers_) workers_to_start;
580   {
581     CheckedAutoLock auto_lock(lock_);
582     started_ = true;
583     workers_to_start = workers_;
584   }
585 
586   // Start workers that were created before this method was called.
587   // Workers that already need to wake up are already signaled as part of
588   // PooledSingleThreadTaskRunner::PostTaskNow(). As a result, it's
589   // unnecessary to call WakeUp() for each worker (in fact, an extraneous
590   // WakeUp() would be racy and wrong - see https://crbug.com/862582).
591   for (scoped_refptr<WorkerThread> worker : workers_to_start) {
592     worker->Start(io_thread_task_runner_, worker_thread_observer_);
593   }
594 }
595 
DidUpdateCanRunPolicy()596 void PooledSingleThreadTaskRunnerManager::DidUpdateCanRunPolicy() {
597   decltype(workers_) workers_to_update;
598 
599   {
600     CheckedAutoLock auto_lock(lock_);
601     if (!started_)
602       return;
603     workers_to_update = workers_;
604   }
605   // Any worker created after the lock is released will see the latest
606   // CanRunPolicy if tasks are posted to it and thus doesn't need a
607   // DidUpdateCanRunPolicy() notification.
608   for (auto& worker : workers_to_update) {
609     static_cast<WorkerThreadDelegate*>(worker->delegate())
610         ->DidUpdateCanRunPolicy();
611   }
612 }
613 
614 scoped_refptr<SingleThreadTaskRunner>
CreateSingleThreadTaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)615 PooledSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunner(
616     const TaskTraits& traits,
617     SingleThreadTaskRunnerThreadMode thread_mode) {
618   return CreateTaskRunnerImpl<WorkerThreadDelegate>(traits, thread_mode);
619 }
620 
621 #if BUILDFLAG(IS_WIN)
622 scoped_refptr<SingleThreadTaskRunner>
CreateCOMSTATaskRunner(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)623 PooledSingleThreadTaskRunnerManager::CreateCOMSTATaskRunner(
624     const TaskTraits& traits,
625     SingleThreadTaskRunnerThreadMode thread_mode) {
626   return CreateTaskRunnerImpl<WorkerThreadCOMDelegate>(traits, thread_mode);
627 }
628 #endif  // BUILDFLAG(IS_WIN)
629 
630 // static
631 PooledSingleThreadTaskRunnerManager::ContinueOnShutdown
TraitsToContinueOnShutdown(const TaskTraits & traits)632 PooledSingleThreadTaskRunnerManager::TraitsToContinueOnShutdown(
633     const TaskTraits& traits) {
634   if (traits.shutdown_behavior() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
635     return IS_CONTINUE_ON_SHUTDOWN;
636   return IS_NOT_CONTINUE_ON_SHUTDOWN;
637 }
638 
639 template <typename DelegateType>
640 scoped_refptr<PooledSingleThreadTaskRunnerManager::PooledSingleThreadTaskRunner>
CreateTaskRunnerImpl(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)641 PooledSingleThreadTaskRunnerManager::CreateTaskRunnerImpl(
642     const TaskTraits& traits,
643     SingleThreadTaskRunnerThreadMode thread_mode) {
644   DCHECK(thread_mode != SingleThreadTaskRunnerThreadMode::SHARED ||
645          !traits.with_base_sync_primitives())
646       << "Using WithBaseSyncPrimitives() on a shared SingleThreadTaskRunner "
647          "may cause deadlocks. Either reevaluate your usage (e.g. use "
648          "SequencedTaskRunner) or use "
649          "SingleThreadTaskRunnerThreadMode::DEDICATED.";
650   // To simplify the code, |dedicated_worker| is a local only variable that
651   // allows the code to treat both the DEDICATED and SHARED cases similarly for
652   // SingleThreadTaskRunnerThreadMode. In DEDICATED, the scoped_refptr is backed
653   // by a local variable and in SHARED, the scoped_refptr is backed by a member
654   // variable.
655   WorkerThread* dedicated_worker = nullptr;
656   WorkerThread*& worker =
657       thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
658           ? dedicated_worker
659           : GetSharedWorkerThreadForTraits<DelegateType>(traits);
660   bool new_worker = false;
661   bool started;
662   {
663     CheckedAutoLock auto_lock(lock_);
664     if (!worker) {
665       const auto& environment_params =
666           kEnvironmentParams[GetEnvironmentIndexForTraits(traits)];
667       std::string worker_name;
668       if (thread_mode == SingleThreadTaskRunnerThreadMode::SHARED)
669         worker_name += "Shared";
670       worker_name += environment_params.name_suffix;
671       worker = CreateAndRegisterWorkerThread<DelegateType>(
672           worker_name, thread_mode, environment_params.thread_type_hint);
673       new_worker = true;
674     }
675     started = started_;
676   }
677 
678   if (new_worker && started)
679     worker->Start(io_thread_task_runner_, worker_thread_observer_);
680 
681   return MakeRefCounted<PooledSingleThreadTaskRunner>(this, traits, worker,
682                                                       thread_mode);
683 }
684 
JoinForTesting()685 void PooledSingleThreadTaskRunnerManager::JoinForTesting() {
686   decltype(workers_) local_workers;
687   {
688     CheckedAutoLock auto_lock(lock_);
689     local_workers = std::move(workers_);
690   }
691 
692   for (const auto& worker : local_workers) {
693     static_cast<WorkerThreadDelegate*>(worker->delegate())
694         ->EnableFlushPriorityQueueTaskSourcesOnDestroyForTesting();
695     worker->JoinForTesting();
696   }
697 
698   {
699     CheckedAutoLock auto_lock(lock_);
700     DCHECK(workers_.empty())
701         << "New worker(s) unexpectedly registered during join.";
702     workers_ = std::move(local_workers);
703   }
704 
705   // Release shared WorkerThreads at the end so they get joined above. If
706   // this call happens before the joins, the WorkerThreads are effectively
707   // detached and may outlive the PooledSingleThreadTaskRunnerManager.
708   ReleaseSharedWorkerThreads();
709 }
710 
711 template <>
712 std::unique_ptr<WorkerThreadDelegate>
CreateWorkerThreadDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)713 PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
714     WorkerThreadDelegate>(const std::string& name,
715                           int id,
716                           SingleThreadTaskRunnerThreadMode thread_mode) {
717   return std::make_unique<WorkerThreadDelegate>(
718       StringPrintf("ThreadPoolSingleThread%s%d", name.c_str(), id),
719       thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
720           ? WorkerThread::ThreadLabel::DEDICATED
721           : WorkerThread::ThreadLabel::SHARED,
722       task_tracker_);
723 }
724 
725 #if BUILDFLAG(IS_WIN)
726 template <>
727 std::unique_ptr<WorkerThreadDelegate>
CreateWorkerThreadDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)728 PooledSingleThreadTaskRunnerManager::CreateWorkerThreadDelegate<
729     WorkerThreadCOMDelegate>(const std::string& name,
730                              int id,
731                              SingleThreadTaskRunnerThreadMode thread_mode) {
732   return std::make_unique<WorkerThreadCOMDelegate>(
733       StringPrintf("ThreadPoolSingleThreadCOMSTA%s%d", name.c_str(), id),
734       thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
735           ? WorkerThread::ThreadLabel::DEDICATED_COM
736           : WorkerThread::ThreadLabel::SHARED_COM,
737       task_tracker_);
738 }
739 #endif  // BUILDFLAG(IS_WIN)
740 
741 template <typename DelegateType>
742 WorkerThread*
CreateAndRegisterWorkerThread(const std::string & name,SingleThreadTaskRunnerThreadMode thread_mode,ThreadType thread_type_hint)743 PooledSingleThreadTaskRunnerManager::CreateAndRegisterWorkerThread(
744     const std::string& name,
745     SingleThreadTaskRunnerThreadMode thread_mode,
746     ThreadType thread_type_hint) {
747   int id = next_worker_id_++;
748   std::unique_ptr<WorkerThreadDelegate> delegate =
749       CreateWorkerThreadDelegate<DelegateType>(name, id, thread_mode);
750   WorkerThreadDelegate* delegate_raw = delegate.get();
751   scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
752       thread_type_hint, std::move(delegate), task_tracker_, workers_.size());
753   delegate_raw->set_worker(worker.get());
754   workers_.emplace_back(std::move(worker));
755   return workers_.back().get();
756 }
757 
758 template <>
759 WorkerThread*&
GetSharedWorkerThreadForTraits(const TaskTraits & traits)760 PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
761     WorkerThreadDelegate>(const TaskTraits& traits) {
762   return shared_worker_threads_[GetEnvironmentIndexForTraits(traits)]
763                                [TraitsToContinueOnShutdown(traits)];
764 }
765 
766 #if BUILDFLAG(IS_WIN)
767 template <>
768 WorkerThread*&
GetSharedWorkerThreadForTraits(const TaskTraits & traits)769 PooledSingleThreadTaskRunnerManager::GetSharedWorkerThreadForTraits<
770     WorkerThreadCOMDelegate>(const TaskTraits& traits) {
771   return shared_com_worker_threads_[GetEnvironmentIndexForTraits(traits)]
772                                    [TraitsToContinueOnShutdown(traits)];
773 }
774 #endif  // BUILDFLAG(IS_WIN)
775 
UnregisterWorkerThread(WorkerThread * worker)776 void PooledSingleThreadTaskRunnerManager::UnregisterWorkerThread(
777     WorkerThread* worker) {
778   // Cleanup uses a CheckedLock, so call Cleanup() after releasing |lock_|.
779   scoped_refptr<WorkerThread> worker_to_destroy;
780   {
781     CheckedAutoLock auto_lock(lock_);
782 
783     // Skip when joining (the join logic takes care of the rest).
784     if (workers_.empty())
785       return;
786 
787     auto worker_iter = ranges::find(workers_, worker);
788     DCHECK(worker_iter != workers_.end());
789     worker_to_destroy = std::move(*worker_iter);
790     workers_.erase(worker_iter);
791   }
792   worker_to_destroy->Cleanup();
793 }
794 
ReleaseSharedWorkerThreads()795 void PooledSingleThreadTaskRunnerManager::ReleaseSharedWorkerThreads() {
796   decltype(shared_worker_threads_) local_shared_worker_threads;
797 #if BUILDFLAG(IS_WIN)
798   decltype(shared_com_worker_threads_) local_shared_com_worker_threads;
799 #endif
800   {
801     CheckedAutoLock auto_lock(lock_);
802     for (size_t i = 0; i < std::size(shared_worker_threads_); ++i) {
803       for (size_t j = 0; j < std::size(shared_worker_threads_[i]); ++j) {
804         local_shared_worker_threads[i][j] = shared_worker_threads_[i][j];
805         shared_worker_threads_[i][j] = nullptr;
806 #if BUILDFLAG(IS_WIN)
807         local_shared_com_worker_threads[i][j] =
808             shared_com_worker_threads_[i][j];
809         shared_com_worker_threads_[i][j] = nullptr;
810 #endif
811       }
812     }
813   }
814 
815   for (size_t i = 0; i < std::size(local_shared_worker_threads); ++i) {
816     for (size_t j = 0; j < std::size(local_shared_worker_threads[i]); ++j) {
817       if (local_shared_worker_threads[i][j])
818         UnregisterWorkerThread(local_shared_worker_threads[i][j]);
819 #if BUILDFLAG(IS_WIN)
820       if (local_shared_com_worker_threads[i][j])
821         UnregisterWorkerThread(local_shared_com_worker_threads[i][j]);
822 #endif
823     }
824   }
825 }
826 
827 }  // namespace internal
828 }  // namespace base
829