// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_H_
#define BASE_TASK_THREAD_POOL_THREAD_GROUP_H_

#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <memory>
#include <vector>

#include "base/base_export.h"
#include "base/memory/raw_ptr.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/priority_queue.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/tracked_ref.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

#if BUILDFLAG(IS_WIN)
#include "base/win/scoped_windows_thread_environment.h"
#endif

namespace base {
namespace internal {

class TaskTracker;

// Interface and base implementation for a thread group. A thread group is a
// subset of the threads in the thread pool (see GetThreadGroupForTraits() for
// thread group selection logic when posting tasks and creating task runners).
class BASE_EXPORT ThreadGroup {
 public:
  // Delegate interface for ThreadGroup.
  class BASE_EXPORT Delegate {
   public:
    virtual ~Delegate() = default;

    // Invoked when a TaskSource with |traits| is non-empty after the
    // ThreadGroup has run a task from it. The implementation must return the
    // thread group in which the TaskSource should be reenqueued.
    virtual ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) = 0;
  };
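
  // Illustrative sketch (not part of the interface; names below are
  // hypothetical): a minimal Delegate that always reenqueues task sources in
  // a single group could look like this:
  //
  //   class SingleGroupDelegate : public ThreadGroup::Delegate {
  //    public:
  //     explicit SingleGroupDelegate(ThreadGroup* group) : group_(group) {}
  //
  //     // Always reenqueue in the same group, regardless of |traits|.
  //     ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
  //       return group_;
  //     }
  //
  //    private:
  //     const raw_ptr<ThreadGroup> group_;
  //   };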

  enum class WorkerEnvironment {
    // No special worker environment required.
    NONE,
#if BUILDFLAG(IS_WIN)
    // Initialize a COM MTA on the worker.
    COM_MTA,
#endif  // BUILDFLAG(IS_WIN)
  };

  ThreadGroup(const ThreadGroup&) = delete;
  ThreadGroup& operator=(const ThreadGroup&) = delete;
  virtual ~ThreadGroup();

  // Registers the thread group in TLS.
  void BindToCurrentThread();

  // Resets the thread group in TLS.
  void UnbindFromCurrentThread();

  // Returns true if the thread group is registered in TLS.
  bool IsBoundToCurrentThread() const;

  // Removes |task_source| from |priority_queue_|. Returns a
  // RegisteredTaskSource that evaluates to true if successful, or false if
  // |task_source| is not currently in |priority_queue_|, such as when a worker
  // is running a task from it.
  RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source);

  // Updates the position of the TaskSource in |transaction| in this
  // ThreadGroup's PriorityQueue based on the TaskSource's current traits.
  //
  // Implementations should instantiate a concrete ScopedCommandsExecutor and
  // invoke UpdateSortKeyImpl().
  virtual void UpdateSortKey(TaskSource::Transaction transaction) = 0;

  // Pushes the TaskSource in |transaction_with_task_source| into this
  // ThreadGroup's PriorityQueue and wakes up workers as appropriate.
  //
  // Implementations should instantiate a concrete ScopedCommandsExecutor and
  // invoke PushTaskSourceAndWakeUpWorkersImpl().
  virtual void PushTaskSourceAndWakeUpWorkers(
      TransactionWithRegisteredTaskSource transaction_with_task_source) = 0;
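
  // Illustrative sketch (hypothetical derived class and executor): an override
  // typically creates a concrete executor on the stack, so that side effects
  // such as worker wake-ups happen when the executor is destroyed, after
  // |lock_| has been released:
  //
  //   void MyThreadGroup::PushTaskSourceAndWakeUpWorkers(
  //       TransactionWithRegisteredTaskSource transaction_with_task_source) {
  //     MyScopedCommandsExecutor executor(this);
  //     PushTaskSourceAndWakeUpWorkersImpl(
  //         &executor, std::move(transaction_with_task_source));
  //   }  // |executor| applies its scheduled commands here, outside the lock.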

  // Removes all task sources from this ThreadGroup's PriorityQueue and enqueues
  // them in |destination_thread_group|. After this method is called, any task
  // sources posted to this ThreadGroup will be forwarded to
  // |destination_thread_group|.
  //
  // TODO(crbug.com/756547): Remove this method once the UseNativeThreadPool
  // experiment is complete.
  void InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(
      ThreadGroup* destination_thread_group);

  // Moves all task sources, except those with TaskPriority::USER_BLOCKING,
  // from this ThreadGroup's PriorityQueue to |destination_thread_group|'s.
  void HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
      ThreadGroup* destination_thread_group);

  // Returns true if a task with |sort_key| running in this thread group should
  // return ASAP, either because its priority is not allowed to run or because
  // work of higher priority is pending. Thread-safe but may return an outdated
  // result (if a task unnecessarily yields due to this, it will simply be
  // re-scheduled).
  bool ShouldYield(TaskSourceSortKey sort_key);
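
  // Illustrative sketch (hypothetical call site; HasMoreWork()/DoSomeWork()
  // are stand-ins): a long-running task can poll this to yield cooperatively:
  //
  //   while (HasMoreWork()) {
  //     DoSomeWork();
  //     if (thread_group->ShouldYield(current_sort_key))
  //       return;  // Stop early; the task source will be re-scheduled.
  //   }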

  // Prevents new tasks from starting to run and waits for currently running
  // tasks to complete their execution. It is guaranteed that no thread will do
  // work on behalf of this ThreadGroup after this returns. It is
  // invalid to post a task once this is called. TaskTracker::Flush() can be
  // called before this to complete existing tasks, which might otherwise post a
  // task during JoinForTesting(). This can only be called once.
  virtual void JoinForTesting() = 0;

  // Returns the maximum number of non-blocked tasks that can run concurrently
  // in this ThreadGroup.
  //
  // TODO(fdoray): Remove this method. https://crbug.com/687264
  virtual size_t GetMaxConcurrentNonBlockedTasksDeprecated() const = 0;

  // Wakes up workers as appropriate for the new CanRunPolicy. Must be called
  // after an update to CanRunPolicy in TaskTracker.
  virtual void DidUpdateCanRunPolicy() = 0;

  // Invoked when shutdown of the ThreadPool has started.
  virtual void OnShutdownStarted() = 0;

  // Returns true if a thread group is registered in TLS. Used by diagnostic
  // code to check whether it's inside a ThreadPool task.
  static bool CurrentThreadHasGroup();

 protected:
  // Derived classes must implement a ScopedCommandsExecutor that derives from
  // this to perform operations at the end of a scope, when all locks have been
  // released.
  class BaseScopedCommandsExecutor {
   public:
    BaseScopedCommandsExecutor(const BaseScopedCommandsExecutor&) = delete;
    BaseScopedCommandsExecutor& operator=(const BaseScopedCommandsExecutor&) =
        delete;

    void ScheduleReleaseTaskSource(RegisteredTaskSource task_source);

   protected:
    BaseScopedCommandsExecutor();
    ~BaseScopedCommandsExecutor();

   private:
    std::vector<RegisteredTaskSource> task_sources_to_release_;
  };
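
  // Illustrative sketch (hypothetical derived executor; MyWorker is a
  // stand-in): a concrete executor records commands while |lock_| is held and
  // applies them in its destructor, once the lock has been released:
  //
  //   class MyScopedCommandsExecutor : public BaseScopedCommandsExecutor {
  //    public:
  //     ~MyScopedCommandsExecutor() {
  //       // No locks are held here, so waking up workers is safe.
  //       for (MyWorker* worker : workers_to_wake_up_)
  //         worker->WakeUp();
  //     }
  //
  //     void ScheduleWakeUp(MyWorker* worker) {
  //       workers_to_wake_up_.push_back(worker);
  //     }
  //
  //    private:
  //     std::vector<MyWorker*> workers_to_wake_up_;
  //   };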

  // Allows a task source to be pushed to a ThreadGroup's PriorityQueue at the
  // end of a scope, when all locks have been released.
  class ScopedReenqueueExecutor {
   public:
    ScopedReenqueueExecutor();
    ScopedReenqueueExecutor(const ScopedReenqueueExecutor&) = delete;
    ScopedReenqueueExecutor& operator=(const ScopedReenqueueExecutor&) = delete;
    ~ScopedReenqueueExecutor();

    // Schedules |transaction_with_task_source| to be enqueued in
    // |destination_thread_group| at the end of the scope.
    void SchedulePushTaskSourceAndWakeUpWorkers(
        TransactionWithRegisteredTaskSource transaction_with_task_source,
        ThreadGroup* destination_thread_group);

   private:
    // A TransactionWithRegisteredTaskSource and the thread group in which it
    // should be enqueued.
    absl::optional<TransactionWithRegisteredTaskSource>
        transaction_with_task_source_;
    raw_ptr<ThreadGroup> destination_thread_group_ = nullptr;
  };

  // |predecessor_thread_group| is a ThreadGroup whose lock can be acquired
  // before the constructed ThreadGroup's lock. This is necessary to move all
  // task sources from |predecessor_thread_group| to the constructed ThreadGroup
  // and support the UseNativeThreadPool experiment.
  //
  // TODO(crbug.com/756547): Remove |predecessor_thread_group| once the
  // experiment is complete.
  ThreadGroup(TrackedRef<TaskTracker> task_tracker,
              TrackedRef<Delegate> delegate,
              ThreadGroup* predecessor_thread_group = nullptr);

#if BUILDFLAG(IS_WIN)
  static std::unique_ptr<win::ScopedWindowsThreadEnvironment>
  GetScopedWindowsThreadEnvironment(WorkerEnvironment environment);
#endif
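
  // Illustrative usage (Windows-only; |worker_environment| is a stand-in for
  // the value passed to the worker): the returned environment is typically
  // created at the top of a worker's main function and kept alive until the
  // thread exits, so that the COM MTA (if requested) stays initialized:
  //
  //   std::unique_ptr<win::ScopedWindowsThreadEnvironment> environment =
  //       GetScopedWindowsThreadEnvironment(worker_environment);
  //   // ... run tasks while |environment| is in scope ...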

  const TrackedRef<TaskTracker> task_tracker_;
  const TrackedRef<Delegate> delegate_;

  // Starts this ThreadGroup; must be called once before tasks run.
  void Start();

  // Returns the number of additional workers required to run all queued
  // BEST_EFFORT task sources allowed to run by the current CanRunPolicy.
  size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns the number of additional workers required to run all queued
  // USER_VISIBLE/USER_BLOCKING task sources allowed to run by the current
  // CanRunPolicy.
  size_t GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Ensures that there are enough workers to run queued task sources.
  // |executor| is forwarded from the one received in
  // PushTaskSourceAndWakeUpWorkersImpl().
  virtual void EnsureEnoughWorkersLockRequired(
      BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0;

  // Reenqueues a |transaction_with_task_source| from which a Task just ran in
  // the current ThreadGroup into the appropriate ThreadGroup.
  void ReEnqueueTaskSourceLockRequired(
      BaseScopedCommandsExecutor* workers_executor,
      ScopedReenqueueExecutor* reenqueue_executor,
      TransactionWithRegisteredTaskSource transaction_with_task_source)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns the next task source from |priority_queue_| if permitted to run and
  // pops |priority_queue_| if the task source returned no longer needs to be
  // queued (reached its maximum concurrency). Otherwise returns nullptr and
  // pops |priority_queue_| so this can be called again.
  RegisteredTaskSource TakeRegisteredTaskSource(
      BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Must be invoked by implementations of the corresponding non-Impl() methods.
  void UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
                         TaskSource::Transaction transaction);
  void PushTaskSourceAndWakeUpWorkersImpl(
      BaseScopedCommandsExecutor* executor,
      TransactionWithRegisteredTaskSource transaction_with_task_source);

  // Synchronizes accesses to all members of this class which are neither const,
  // atomic, nor immutable after start. Since this lock is a bottleneck to post
  // and schedule work, only simple data structure manipulations are allowed
  // within its scope (no thread creation or wake up).
  mutable CheckedLock lock_;

  bool disable_fair_scheduling_ GUARDED_BY(lock_){false};

  // PriorityQueue from which all threads of this ThreadGroup get work.
  PriorityQueue priority_queue_ GUARDED_BY(lock_);

  struct YieldSortKey {
    TaskPriority priority;
    uint8_t worker_count;
  };
  // Sort key which compares greater than or equal to any other sort key.
  static constexpr YieldSortKey kMaxYieldSortKey = {TaskPriority::BEST_EFFORT,
                                                    0U};

  // When the thread group is at or above capacity and has pending work, this is
  // set to contain the priority and worker count of the next TaskSource to
  // schedule, or kMaxYieldSortKey otherwise. This is used to decide whether a
  // TaskSource should yield. Once ShouldYield() returns true, it is reset to
  // kMaxYieldSortKey to prevent additional unnecessary yielding. This is
  // expected to always be kept up-to-date by derived classes when |lock_| is
  // released. It is annotated as GUARDED_BY(lock_) because it is always updated
  // under the lock (to avoid races with other state during the update) but it
  // is nonetheless always safe to read it without the lock (since it's atomic).
  std::atomic<YieldSortKey> max_allowed_sort_key_ GUARDED_BY(lock_){
      kMaxYieldSortKey};

  // If |replacement_thread_group_| is non-null, this ThreadGroup is invalid and
  // all task sources should be scheduled on |replacement_thread_group_|. Used
  // to support the UseNativeThreadPool experiment.
  raw_ptr<ThreadGroup> replacement_thread_group_ = nullptr;
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_THREAD_POOL_THREAD_GROUP_H_