1 // Copyright 2016 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_H_ 6 #define BASE_TASK_THREAD_POOL_THREAD_GROUP_H_ 7 8 #include <memory> 9 #include <vector> 10 11 #include "base/base_export.h" 12 #include "base/memory/raw_ptr.h" 13 #include "base/task/common/checked_lock.h" 14 #include "base/task/thread_pool/priority_queue.h" 15 #include "base/task/thread_pool/task.h" 16 #include "base/task/thread_pool/task_source.h" 17 #include "base/task/thread_pool/tracked_ref.h" 18 #include "build/build_config.h" 19 #include "third_party/abseil-cpp/absl/types/optional.h" 20 21 #if BUILDFLAG(IS_WIN) 22 #include "base/win/scoped_windows_thread_environment.h" 23 #endif 24 25 namespace base { 26 namespace internal { 27 28 class TaskTracker; 29 30 // Interface and base implementation for a thread group. A thread group is a 31 // subset of the threads in the thread pool (see GetThreadGroupForTraits() for 32 // thread group selection logic when posting tasks and creating task runners). 33 class BASE_EXPORT ThreadGroup { 34 public: 35 // Delegate interface for ThreadGroup. 36 class BASE_EXPORT Delegate { 37 public: 38 virtual ~Delegate() = default; 39 40 // Invoked when a TaskSource with |traits| is non-empty after the 41 // ThreadGroup has run a task from it. The implementation must return the 42 // thread group in which the TaskSource should be reenqueued. 43 virtual ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) = 0; 44 }; 45 46 enum class WorkerEnvironment { 47 // No special worker environment required. 48 NONE, 49 #if BUILDFLAG(IS_WIN) 50 // Initialize a COM MTA on the worker. 51 COM_MTA, 52 #endif // BUILDFLAG(IS_WIN) 53 }; 54 55 ThreadGroup(const ThreadGroup&) = delete; 56 ThreadGroup& operator=(const ThreadGroup&) = delete; 57 virtual ~ThreadGroup(); 58 59 // Registers the thread group in TLS. 60 void BindToCurrentThread(); 61 62 // Resets the thread group in TLS. 63 void UnbindFromCurrentThread(); 64 65 // Returns true if the thread group is registered in TLS. 66 bool IsBoundToCurrentThread() const; 67 68 // Removes |task_source| from |priority_queue_|. Returns a 69 // RegisteredTaskSource that evaluats to true if successful, or false if 70 // |task_source| is not currently in |priority_queue_|, such as when a worker 71 // is running a task from it. 72 RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source); 73 74 // Updates the position of the TaskSource in |transaction| in this 75 // ThreadGroup's PriorityQueue based on the TaskSource's current traits. 76 // 77 // Implementations should instantiate a concrete ScopedCommandsExecutor and 78 // invoke UpdateSortKeyImpl(). 79 virtual void UpdateSortKey(TaskSource::Transaction transaction) = 0; 80 81 // Pushes the TaskSource in |transaction_with_task_source| into this 82 // ThreadGroup's PriorityQueue and wakes up workers as appropriate. 83 // 84 // Implementations should instantiate a concrete ScopedCommandsExecutor and 85 // invoke PushTaskSourceAndWakeUpWorkersImpl(). 86 virtual void PushTaskSourceAndWakeUpWorkers( 87 TransactionWithRegisteredTaskSource transaction_with_task_source) = 0; 88 89 // Removes all task sources from this ThreadGroup's PriorityQueue and enqueues 90 // them in another |destination_thread_group|. After this method is called, 91 // any task sources posted to this ThreadGroup will be forwarded to 92 // |destination_thread_group|. 93 // 94 // TODO(crbug.com/756547): Remove this method once the UseNativeThreadPool 95 // experiment is complete. 96 void InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup( 97 ThreadGroup* destination_thread_group); 98 99 // Move all task sources except the ones with TaskPriority::USER_BLOCKING, 100 // from this ThreadGroup's PriorityQueue to the |destination_thread_group|'s. 101 void HandoffNonUserBlockingTaskSourcesToOtherThreadGroup( 102 ThreadGroup* destination_thread_group); 103 104 // Returns true if a task with |sort_key| running in this thread group should 105 // return ASAP, either because its priority is not allowed to run or because 106 // work of higher priority is pending. Thread-safe but may return an outdated 107 // result (if a task unnecessarily yields due to this, it will simply be 108 // re-scheduled). 109 bool ShouldYield(TaskSourceSortKey sort_key); 110 111 // Prevents new tasks from starting to run and waits for currently running 112 // tasks to complete their execution. It is guaranteed that no thread will do 113 // work on behalf of this ThreadGroup after this returns. It is 114 // invalid to post a task once this is called. TaskTracker::Flush() can be 115 // called before this to complete existing tasks, which might otherwise post a 116 // task during JoinForTesting(). This can only be called once. 117 virtual void JoinForTesting() = 0; 118 119 // Returns the maximum number of non-blocked tasks that can run concurrently 120 // in this ThreadGroup. 121 // 122 // TODO(fdoray): Remove this method. https://crbug.com/687264 123 virtual size_t GetMaxConcurrentNonBlockedTasksDeprecated() const = 0; 124 125 // Wakes up workers as appropriate for the new CanRunPolicy policy. Must be 126 // called after an update to CanRunPolicy in TaskTracker. 127 virtual void DidUpdateCanRunPolicy() = 0; 128 129 virtual void OnShutdownStarted() = 0; 130 131 // Returns true if a thread group is registered in TLS. Used by diagnostic 132 // code to check whether it's inside a ThreadPool task. 133 static bool CurrentThreadHasGroup(); 134 135 protected: 136 // Derived classes must implement a ScopedCommandsExecutor that derives from 137 // this to perform operations at the end of a scope, when all locks have been 138 // released. 139 class BaseScopedCommandsExecutor { 140 public: 141 BaseScopedCommandsExecutor(const BaseScopedCommandsExecutor&) = delete; 142 BaseScopedCommandsExecutor& operator=(const BaseScopedCommandsExecutor&) = 143 delete; 144 145 void ScheduleReleaseTaskSource(RegisteredTaskSource task_source); 146 147 protected: 148 BaseScopedCommandsExecutor(); 149 ~BaseScopedCommandsExecutor(); 150 151 private: 152 std::vector<RegisteredTaskSource> task_sources_to_release_; 153 }; 154 155 // Allows a task source to be pushed to a ThreadGroup's PriorityQueue at the 156 // end of a scope, when all locks have been released. 157 class ScopedReenqueueExecutor { 158 public: 159 ScopedReenqueueExecutor(); 160 ScopedReenqueueExecutor(const ScopedReenqueueExecutor&) = delete; 161 ScopedReenqueueExecutor& operator=(const ScopedReenqueueExecutor&) = delete; 162 ~ScopedReenqueueExecutor(); 163 164 // A TransactionWithRegisteredTaskSource and the ThreadGroup in which it 165 // should be enqueued. 166 void SchedulePushTaskSourceAndWakeUpWorkers( 167 TransactionWithRegisteredTaskSource transaction_with_task_source, 168 ThreadGroup* destination_thread_group); 169 170 private: 171 // A TransactionWithRegisteredTaskSource and the thread group in which it 172 // should be enqueued. 173 absl::optional<TransactionWithRegisteredTaskSource> 174 transaction_with_task_source_; 175 raw_ptr<ThreadGroup> destination_thread_group_ = nullptr; 176 }; 177 178 // |predecessor_thread_group| is a ThreadGroup whose lock can be acquired 179 // before the constructed ThreadGroup's lock. This is necessary to move all 180 // task sources from |predecessor_thread_group| to the constructed ThreadGroup 181 // and support the UseNativeThreadPool experiment. 182 // 183 // TODO(crbug.com/756547): Remove |predecessor_thread_group| once the 184 // experiment is complete. 185 ThreadGroup(TrackedRef<TaskTracker> task_tracker, 186 TrackedRef<Delegate> delegate, 187 ThreadGroup* predecessor_thread_group = nullptr); 188 189 #if BUILDFLAG(IS_WIN) 190 static std::unique_ptr<win::ScopedWindowsThreadEnvironment> 191 GetScopedWindowsThreadEnvironment(WorkerEnvironment environment); 192 #endif 193 194 const TrackedRef<TaskTracker> task_tracker_; 195 const TrackedRef<Delegate> delegate_; 196 197 void Start(); 198 199 // Returns the number of workers required of workers to run all queued 200 // BEST_EFFORT task sources allowed to run by the current CanRunPolicy. 201 size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const 202 EXCLUSIVE_LOCKS_REQUIRED(lock_); 203 204 // Returns the number of workers required to run all queued 205 // USER_VISIBLE/USER_BLOCKING task sources allowed to run by the current 206 // CanRunPolicy. 207 size_t GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const 208 EXCLUSIVE_LOCKS_REQUIRED(lock_); 209 210 // Ensures that there are enough workers to run queued task sources. 211 // |executor| is forwarded from the one received in 212 // PushTaskSourceAndWakeUpWorkersImpl() 213 virtual void EnsureEnoughWorkersLockRequired( 214 BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0; 215 216 // Reenqueues a |transaction_with_task_source| from which a Task just ran in 217 // the current ThreadGroup into the appropriate ThreadGroup. 218 void ReEnqueueTaskSourceLockRequired( 219 BaseScopedCommandsExecutor* workers_executor, 220 ScopedReenqueueExecutor* reenqueue_executor, 221 TransactionWithRegisteredTaskSource transaction_with_task_source) 222 EXCLUSIVE_LOCKS_REQUIRED(lock_); 223 224 // Returns the next task source from |priority_queue_| if permitted to run and 225 // pops |priority_queue_| if the task source returned no longer needs to be 226 // queued (reached its maximum concurrency). Otherwise returns nullptr and 227 // pops |priority_queue_| so this can be called again. 228 RegisteredTaskSource TakeRegisteredTaskSource( 229 BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_); 230 231 // Must be invoked by implementations of the corresponding non-Impl() methods. 232 void UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor, 233 TaskSource::Transaction transaction); 234 void PushTaskSourceAndWakeUpWorkersImpl( 235 BaseScopedCommandsExecutor* executor, 236 TransactionWithRegisteredTaskSource transaction_with_task_source); 237 238 // Synchronizes accesses to all members of this class which are neither const, 239 // atomic, nor immutable after start. Since this lock is a bottleneck to post 240 // and schedule work, only simple data structure manipulations are allowed 241 // within its scope (no thread creation or wake up). 242 mutable CheckedLock lock_; 243 GUARDED_BY(lock_)244 bool disable_fair_scheduling_ GUARDED_BY(lock_){false}; 245 246 // PriorityQueue from which all threads of this ThreadGroup get work. 247 PriorityQueue priority_queue_ GUARDED_BY(lock_); 248 249 struct YieldSortKey { 250 TaskPriority priority; 251 uint8_t worker_count; 252 }; 253 // Sort key which compares greater than or equal to any other sort key. 254 static constexpr YieldSortKey kMaxYieldSortKey = {TaskPriority::BEST_EFFORT, 255 0U}; 256 257 // When the thread group is at or above capacity and has pending work, this is 258 // set to contain the priority and worker count of the next TaskSource to 259 // schedule, or kMaxYieldSortKey otherwise. This is used to decide whether a 260 // TaskSource should yield. Once ShouldYield() returns true, it is reset to 261 // kMaxYieldSortKey to prevent additional from unnecessary yielding. This is 262 // expected to be always kept up-to-date by derived classes when |lock_| is 263 // released. It is annotated as GUARDED_BY(lock_) because it is always updated 264 // under the lock (to avoid races with other state during the update) but it 265 // is nonetheless always safe to read it without the lock (since it's atomic). GUARDED_BY(lock_)266 std::atomic<YieldSortKey> max_allowed_sort_key_ GUARDED_BY(lock_){ 267 kMaxYieldSortKey}; 268 269 // If |replacement_thread_group_| is non-null, this ThreadGroup is invalid and 270 // all task sources should be scheduled on |replacement_thread_group_|. Used 271 // to support the UseNativeThreadPool experiment. 272 raw_ptr<ThreadGroup> replacement_thread_group_ = nullptr; 273 }; 274 275 } // namespace internal 276 } // namespace base 277 278 #endif // BASE_TASK_THREAD_POOL_THREAD_GROUP_H_ 279