1 // Copyright 2016 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_H_ 6 #define BASE_TASK_THREAD_POOL_THREAD_GROUP_H_ 7 8 #include <memory> 9 #include <vector> 10 11 #include "base/base_export.h" 12 #include "base/memory/raw_ptr.h" 13 #include "base/task/common/checked_lock.h" 14 #include "base/task/thread_pool/priority_queue.h" 15 #include "base/task/thread_pool/task.h" 16 #include "base/task/thread_pool/task_source.h" 17 #include "base/task/thread_pool/tracked_ref.h" 18 #include "build/build_config.h" 19 #include "third_party/abseil-cpp/absl/types/optional.h" 20 21 #if BUILDFLAG(IS_WIN) 22 #include "base/win/scoped_windows_thread_environment.h" 23 #endif 24 25 namespace base { 26 namespace internal { 27 28 class TaskTracker; 29 30 // Interface and base implementation for a thread group. A thread group is a 31 // subset of the threads in the thread pool (see GetThreadGroupForTraits() for 32 // thread group selection logic when posting tasks and creating task runners). 33 class BASE_EXPORT ThreadGroup { 34 public: 35 // Delegate interface for ThreadGroup. 36 class BASE_EXPORT Delegate { 37 public: 38 virtual ~Delegate() = default; 39 40 // Invoked when a TaskSource with |traits| is non-empty after the 41 // ThreadGroup has run a task from it. The implementation must return the 42 // thread group in which the TaskSource should be reenqueued. 43 virtual ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) = 0; 44 }; 45 46 enum class WorkerEnvironment { 47 // No special worker environment required. 48 NONE, 49 #if BUILDFLAG(IS_WIN) 50 // Initialize a COM MTA on the worker. 51 COM_MTA, 52 #endif // BUILDFLAG(IS_WIN) 53 }; 54 55 ThreadGroup(const ThreadGroup&) = delete; 56 ThreadGroup& operator=(const ThreadGroup&) = delete; 57 virtual ~ThreadGroup(); 58 59 // Registers the thread group in TLS. 60 void BindToCurrentThread(); 61 62 // Resets the thread group in TLS. 63 void UnbindFromCurrentThread(); 64 65 // Returns true if the thread group is registered in TLS. 66 bool IsBoundToCurrentThread() const; 67 68 // Removes |task_source| from |priority_queue_|. Returns a 69 // RegisteredTaskSource that evaluats to true if successful, or false if 70 // |task_source| is not currently in |priority_queue_|, such as when a worker 71 // is running a task from it. 72 RegisteredTaskSource RemoveTaskSource(const TaskSource& task_source); 73 74 // Updates the position of the TaskSource in |transaction| in this 75 // ThreadGroup's PriorityQueue based on the TaskSource's current traits. 76 // 77 // Implementations should instantiate a concrete ScopedCommandsExecutor and 78 // invoke UpdateSortKeyImpl(). 79 virtual void UpdateSortKey(TaskSource::Transaction transaction) = 0; 80 81 // Pushes the TaskSource in |transaction_with_task_source| into this 82 // ThreadGroup's PriorityQueue and wakes up workers as appropriate. 83 // 84 // Implementations should instantiate a concrete ScopedCommandsExecutor and 85 // invoke PushTaskSourceAndWakeUpWorkersImpl(). 86 virtual void PushTaskSourceAndWakeUpWorkers( 87 RegisteredTaskSourceAndTransaction transaction_with_task_source) = 0; 88 89 // Move all task sources except the ones with TaskPriority::USER_BLOCKING, 90 // from this ThreadGroup's PriorityQueue to the |destination_thread_group|'s. 91 void HandoffNonUserBlockingTaskSourcesToOtherThreadGroup( 92 ThreadGroup* destination_thread_group); 93 94 // Returns true if a task with |sort_key| running in this thread group should 95 // return ASAP, either because its priority is not allowed to run or because 96 // work of higher priority is pending. Thread-safe but may return an outdated 97 // result (if a task unnecessarily yields due to this, it will simply be 98 // re-scheduled). 99 bool ShouldYield(TaskSourceSortKey sort_key); 100 101 // Prevents new tasks from starting to run and waits for currently running 102 // tasks to complete their execution. It is guaranteed that no thread will do 103 // work on behalf of this ThreadGroup after this returns. It is 104 // invalid to post a task once this is called. TaskTracker::Flush() can be 105 // called before this to complete existing tasks, which might otherwise post a 106 // task during JoinForTesting(). This can only be called once. 107 virtual void JoinForTesting() = 0; 108 109 // Returns the maximum number of non-blocked tasks that can run concurrently 110 // in this ThreadGroup. 111 // 112 // TODO(fdoray): Remove this method. https://crbug.com/687264 113 virtual size_t GetMaxConcurrentNonBlockedTasksDeprecated() const = 0; 114 115 // Wakes up workers as appropriate for the new CanRunPolicy policy. Must be 116 // called after an update to CanRunPolicy in TaskTracker. 117 virtual void DidUpdateCanRunPolicy() = 0; 118 119 virtual void OnShutdownStarted() = 0; 120 121 // Returns true if a thread group is registered in TLS. Used by diagnostic 122 // code to check whether it's inside a ThreadPool task. 123 static bool CurrentThreadHasGroup(); 124 125 protected: 126 // Derived classes must implement a ScopedCommandsExecutor that derives from 127 // this to perform operations at the end of a scope, when all locks have been 128 // released. 129 class BaseScopedCommandsExecutor { 130 public: 131 BaseScopedCommandsExecutor(const BaseScopedCommandsExecutor&) = delete; 132 BaseScopedCommandsExecutor& operator=(const BaseScopedCommandsExecutor&) = 133 delete; 134 135 void ScheduleReleaseTaskSource(RegisteredTaskSource task_source); 136 137 protected: 138 BaseScopedCommandsExecutor(); 139 ~BaseScopedCommandsExecutor(); 140 141 private: 142 std::vector<RegisteredTaskSource> task_sources_to_release_; 143 }; 144 145 // Allows a task source to be pushed to a ThreadGroup's PriorityQueue at the 146 // end of a scope, when all locks have been released. 147 class ScopedReenqueueExecutor { 148 public: 149 ScopedReenqueueExecutor(); 150 ScopedReenqueueExecutor(const ScopedReenqueueExecutor&) = delete; 151 ScopedReenqueueExecutor& operator=(const ScopedReenqueueExecutor&) = delete; 152 ~ScopedReenqueueExecutor(); 153 154 // A RegisteredTaskSourceAndTransaction and the ThreadGroup in which it 155 // should be enqueued. 156 void SchedulePushTaskSourceAndWakeUpWorkers( 157 RegisteredTaskSourceAndTransaction transaction_with_task_source, 158 ThreadGroup* destination_thread_group); 159 160 private: 161 // A RegisteredTaskSourceAndTransaction and the thread group in which it 162 // should be enqueued. 163 absl::optional<RegisteredTaskSourceAndTransaction> 164 transaction_with_task_source_; 165 raw_ptr<ThreadGroup> destination_thread_group_ = nullptr; 166 }; 167 168 ThreadGroup(TrackedRef<TaskTracker> task_tracker, 169 TrackedRef<Delegate> delegate); 170 171 #if BUILDFLAG(IS_WIN) 172 static std::unique_ptr<win::ScopedWindowsThreadEnvironment> 173 GetScopedWindowsThreadEnvironment(WorkerEnvironment environment); 174 #endif 175 176 const TrackedRef<TaskTracker> task_tracker_; 177 const TrackedRef<Delegate> delegate_; 178 179 void Start(); 180 181 // Returns the number of workers required of workers to run all queued 182 // BEST_EFFORT task sources allowed to run by the current CanRunPolicy. 183 size_t GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const 184 EXCLUSIVE_LOCKS_REQUIRED(lock_); 185 186 // Returns the number of workers required to run all queued 187 // USER_VISIBLE/USER_BLOCKING task sources allowed to run by the current 188 // CanRunPolicy. 189 size_t GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const 190 EXCLUSIVE_LOCKS_REQUIRED(lock_); 191 192 // Ensures that there are enough workers to run queued task sources. 193 // |executor| is forwarded from the one received in 194 // PushTaskSourceAndWakeUpWorkersImpl() 195 virtual void EnsureEnoughWorkersLockRequired( 196 BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_) = 0; 197 198 // Reenqueues a |transaction_with_task_source| from which a Task just ran in 199 // the current ThreadGroup into the appropriate ThreadGroup. 200 void ReEnqueueTaskSourceLockRequired( 201 BaseScopedCommandsExecutor* workers_executor, 202 ScopedReenqueueExecutor* reenqueue_executor, 203 RegisteredTaskSourceAndTransaction transaction_with_task_source) 204 EXCLUSIVE_LOCKS_REQUIRED(lock_); 205 206 // Returns the next task source from |priority_queue_| if permitted to run and 207 // pops |priority_queue_| if the task source returned no longer needs to be 208 // queued (reached its maximum concurrency). Otherwise returns nullptr and 209 // pops |priority_queue_| so this can be called again. 210 RegisteredTaskSource TakeRegisteredTaskSource( 211 BaseScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_); 212 213 // Must be invoked by implementations of the corresponding non-Impl() methods. 214 void UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor, 215 TaskSource::Transaction transaction); 216 void PushTaskSourceAndWakeUpWorkersImpl( 217 BaseScopedCommandsExecutor* executor, 218 RegisteredTaskSourceAndTransaction transaction_with_task_source); 219 220 // Synchronizes accesses to all members of this class which are neither const, 221 // atomic, nor immutable after start. Since this lock is a bottleneck to post 222 // and schedule work, only simple data structure manipulations are allowed 223 // within its scope (no thread creation or wake up). 224 mutable CheckedLock lock_{}; 225 GUARDED_BY(lock_)226 bool disable_fair_scheduling_ GUARDED_BY(lock_){false}; 227 228 // PriorityQueue from which all threads of this ThreadGroup get work. 229 PriorityQueue priority_queue_ GUARDED_BY(lock_); 230 231 struct YieldSortKey { 232 TaskPriority priority; 233 uint8_t worker_count; 234 }; 235 // Sort key which compares greater than or equal to any other sort key. 236 static constexpr YieldSortKey kMaxYieldSortKey = {TaskPriority::BEST_EFFORT, 237 0U}; 238 239 // When the thread group is at or above capacity and has pending work, this is 240 // set to contain the priority and worker count of the next TaskSource to 241 // schedule, or kMaxYieldSortKey otherwise. This is used to decide whether a 242 // TaskSource should yield. Once ShouldYield() returns true, it is reset to 243 // kMaxYieldSortKey to prevent additional from unnecessary yielding. This is 244 // expected to be always kept up-to-date by derived classes when |lock_| is 245 // released. It is annotated as GUARDED_BY(lock_) because it is always updated 246 // under the lock (to avoid races with other state during the update) but it 247 // is nonetheless always safe to read it without the lock (since it's atomic). GUARDED_BY(lock_)248 std::atomic<YieldSortKey> max_allowed_sort_key_ GUARDED_BY(lock_){ 249 kMaxYieldSortKey}; 250 251 // If |replacement_thread_group_| is non-null, this ThreadGroup is invalid and 252 // all task sources should be scheduled on |replacement_thread_group_|. Used 253 // to support the UseNativeThreadPool experiment. 254 raw_ptr<ThreadGroup> replacement_thread_group_ = nullptr; 255 }; 256 257 } // namespace internal 258 } // namespace base 259 260 #endif // BASE_TASK_THREAD_POOL_THREAD_GROUP_H_ 261