// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
#define BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_

#include <stddef.h>

#include <memory>
#include <string>
#include <vector>

#include "base/base_export.h"
#include "base/check.h"
#include "base/compiler_specific.h"
#include "base/dcheck_is_on.h"
#include "base/gtest_prod_util.h"
#include "base/memory/raw_ptr.h"
#include "base/strings/string_piece.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/thread_group.h"
#include "base/task/thread_pool/tracked_ref.h"
#include "base/task/thread_pool/worker_thread.h"
#include "base/task/thread_pool/worker_thread_set.h"
#include "base/time/time.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

namespace base {

class WorkerThreadObserver;

namespace internal {

class TaskTracker;

// A group of workers that run Tasks.
//
// The thread group doesn't create threads until Start() is called. Tasks can
// be posted at any time but will not run until after Start() is called.
//
// This class is thread-safe.
class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
 public:
  // Constructs a group without workers.
  //
  // |histogram_label| is used to label the thread group's histograms as
  // "ThreadPool." + histogram_name + "." + |histogram_label| + extra suffixes;
  // it must not be empty. |thread_group_label| is used to label the thread
  // group's threads; it must not be empty. |thread_type_hint| is the preferred
  // thread type; the actual thread type depends on shutdown state and platform
  // capabilities. |task_tracker| keeps track of tasks.
  ThreadGroupImpl(StringPiece histogram_label,
                  StringPiece thread_group_label,
                  ThreadType thread_type_hint,
                  TrackedRef<TaskTracker> task_tracker,
                  TrackedRef<Delegate> delegate,
                  ThreadGroup* predecessor_thread_group = nullptr);

  // Creates threads, allowing existing and future tasks to run. The thread
  // group runs at most |max_tasks| / |max_best_effort_tasks| unblocked tasks
  // of any / BEST_EFFORT priority concurrently. It reclaims unused threads
  // after |suggested_reclaim_time|. It uses |service_thread_task_runner| to
  // monitor for blocked tasks and to set up FileDescriptorWatcher on worker
  // threads; it must refer to a Thread with MessagePumpType::IO. If specified,
  // |worker_thread_observer| is notified when a worker enters and exits its
  // main function (the observer must not be destroyed before JoinForTesting()
  // has returned). |worker_environment| specifies the environment in which
  // tasks are executed. |may_block_threshold| is the timeout after which a
  // task in a MAY_BLOCK ScopedBlockingCall is considered blocked (the thread
  // group will choose an appropriate value if none is specified).
  // |synchronous_thread_start_for_testing| is true if this ThreadGroupImpl
  // should synchronously wait for OnMainEntry() after starting each worker.
  // Can only be called once. CHECKs on failure.
  void Start(size_t max_tasks,
             size_t max_best_effort_tasks,
             TimeDelta suggested_reclaim_time,
             scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
             WorkerThreadObserver* worker_thread_observer,
             WorkerEnvironment worker_environment,
             bool synchronous_thread_start_for_testing = false,
             absl::optional<TimeDelta> may_block_threshold =
                 absl::optional<TimeDelta>());
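
  // Illustrative sketch only (not part of this header): a typical embedder or
  // test constructs the group and then starts it roughly as follows. The
  // labels, limits and the GetTrackedRef()-style accessors are assumptions
  // made for the example; how |task_tracker|, |delegate| and
  // |service_thread_task_runner| are actually obtained depends on the
  // embedder (ThreadPoolImpl in production).
  //
  //   ThreadGroupImpl group("Foreground", "Foreground", ThreadType::kDefault,
  //                         task_tracker->GetTrackedRef(),
  //                         delegate->GetTrackedRef());
  //   group.Start(/*max_tasks=*/4, /*max_best_effort_tasks=*/2,
  //               /*suggested_reclaim_time=*/Seconds(30),
  //               service_thread_task_runner,
  //               /*worker_thread_observer=*/nullptr,
  //               ThreadGroup::WorkerEnvironment::NONE);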

  ThreadGroupImpl(const ThreadGroupImpl&) = delete;
  ThreadGroupImpl& operator=(const ThreadGroupImpl&) = delete;

  // Destroying a ThreadGroupImpl returned by Create() is not allowed in
  // production; it is always leaked. In tests, it can only be destroyed after
  // JoinForTesting() has returned.
  ~ThreadGroupImpl() override;

  // ThreadGroup:
  void JoinForTesting() override;
  size_t GetMaxConcurrentNonBlockedTasksDeprecated() const override;
  void DidUpdateCanRunPolicy() override;
  void OnShutdownStarted() override;

  // Waits until at least |n| workers are idle. Note that although workers are
  // disallowed from cleaning up during this call, tests using a custom
  // |suggested_reclaim_time_| need to be careful to invoke this swiftly after
  // unblocking the waited-upon workers: if a worker is already detached by the
  // time this is invoked, it will never make it onto the idle set and this
  // call will hang.
  void WaitForWorkersIdleForTesting(size_t n);

  // Waits until at least |n| workers are idle.
  void WaitForWorkersIdleLockRequiredForTesting(size_t n)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Waits until all workers are idle.
  void WaitForAllWorkersIdleForTesting();

  // Waits until |n| workers have cleaned up (gone through
  // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
  // WaitForWorkersCleanedUpForTesting() (or since Start() if it was never
  // called).
  void WaitForWorkersCleanedUpForTesting(size_t n);

  // Returns the number of workers in this thread group.
  size_t NumberOfWorkersForTesting() const;

  // Returns |max_tasks_| / |max_best_effort_tasks_|.
  size_t GetMaxTasksForTesting() const;
  size_t GetMaxBestEffortTasksForTesting() const;

  // Returns the number of workers that are idle (i.e. not running tasks).
  size_t NumberOfIdleWorkersForTesting() const;

 private:
  class ScopedCommandsExecutor;
  class WorkerThreadDelegateImpl;

  // Friend tests so that they can access |blocked_workers_poll_period| and
  // may_block_threshold().
  friend class ThreadGroupImplBlockingTest;
  friend class ThreadGroupImplMayBlockTest;
  FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
                           ThreadBlockUnblockPremature);
  FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
                           ThreadBlockUnblockPrematureBestEffort);

  // ThreadGroup:
  void UpdateSortKey(TaskSource::Transaction transaction) override;
  void PushTaskSourceAndWakeUpWorkers(
      TransactionWithRegisteredTaskSource transaction_with_task_source)
      override;
  void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor)
      override EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Creates a worker and schedules its start, if needed, to maintain one idle
  // worker, |max_tasks_| permitting.
  void MaintainAtLeastOneIdleWorkerLockRequired(
      ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns true if worker cleanup is permitted.
  bool CanWorkerCleanupForTestingLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Creates a worker, adds it to the thread group, schedules its start and
  // returns it. Cannot be called before Start().
  scoped_refptr<WorkerThread> CreateAndRegisterWorkerLockRequired(
      ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns the number of workers that are awake (i.e. not on the idle set).
  size_t GetNumAwakeWorkersLockRequired() const EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns the desired number of awake workers, given current workload and
  // concurrency limits.
  size_t GetDesiredNumAwakeWorkersLockRequired() const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Examines the list of WorkerThreads and increments |max_tasks_| for each
  // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall
  // for more than BlockedThreshold(). Reschedules a call if necessary.
  void AdjustMaxTasks();

  // Returns the threshold after which the max tasks is increased to
  // compensate for a worker that is within a MAY_BLOCK ScopedBlockingCall.
  TimeDelta may_block_threshold_for_testing() const {
    return after_start().may_block_threshold;
  }

  // Interval at which the service thread checks for workers in this thread
  // group that have been in a MAY_BLOCK ScopedBlockingCall for more than
  // may_block_threshold().
  TimeDelta blocked_workers_poll_period_for_testing() const {
    return after_start().blocked_workers_poll_period;
  }

  // Starts calling AdjustMaxTasks() periodically on
  // |service_thread_task_runner_|.
  void ScheduleAdjustMaxTasks();

  // Schedules AdjustMaxTasks() through |executor| if required.
  void MaybeScheduleAdjustMaxTasksLockRequired(ScopedCommandsExecutor* executor)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Returns true if AdjustMaxTasks() should periodically be called on
  // |service_thread_task_runner_|.
  bool ShouldPeriodicallyAdjustMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Updates the minimum priority allowed to run, below which tasks should
  // yield. This should be called whenever |num_running_tasks_| or |max_tasks_|
  // changes, or when a new task is added to |priority_queue_|.
  void UpdateMinAllowedPriorityLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);

  bool IsOnIdleSetLockRequired(WorkerThread* worker) const
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Increments/decrements the number of tasks of |priority| that are currently
  // running in this thread group. Must be invoked before/after running a task.
  void DecrementTasksRunningLockRequired(TaskPriority priority)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void IncrementTasksRunningLockRequired(TaskPriority priority)
      EXCLUSIVE_LOCKS_REQUIRED(lock_);

  // Increments/decrements the maximum number of [best-effort] tasks that can
  // run in this thread group.
  void DecrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void DecrementMaxBestEffortTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void IncrementMaxBestEffortTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(lock_);
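
  // Illustrative sketch, not production code: the blocking-compensation
  // machinery above reacts to tasks that announce potential blocking via
  // ScopedBlockingCall. A task doing, for example,
  //
  //   base::ScopedBlockingCall scoped_blocking_call(FROM_HERE,
  //                                                 BlockingType::MAY_BLOCK);
  //   // ... synchronous I/O ...
  //
  // for longer than may_block_threshold lets AdjustMaxTasks() increment
  // |max_tasks_| (and |max_best_effort_tasks_| for a BEST_EFFORT task) so that
  // other work is not starved while the worker is blocked.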

  // Values set at Start() and never modified afterwards.
  struct InitializedInStart {
    InitializedInStart();
    ~InitializedInStart();

#if DCHECK_IS_ON()
    // Set after all members of this struct are set.
    bool initialized = false;
#endif

    // Initial value of |max_tasks_|.
    size_t initial_max_tasks = 0;

    // Suggested reclaim time for workers.
    TimeDelta suggested_reclaim_time;
    bool no_worker_reclaim = false;

    // Environment to be initialized per worker.
    WorkerEnvironment worker_environment = WorkerEnvironment::NONE;

    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner;

    // Optional observer notified when a worker enters and exits its main
    // function.
    raw_ptr<WorkerThreadObserver> worker_thread_observer = nullptr;

    // Threshold after which the max tasks is increased to compensate for a
    // worker that is within a MAY_BLOCK ScopedBlockingCall.
    TimeDelta may_block_threshold;

    // The period between calls to AdjustMaxTasks() when the thread group is
    // at capacity.
    TimeDelta blocked_workers_poll_period;
  } initialized_in_start_;

  InitializedInStart& in_start() {
#if DCHECK_IS_ON()
    DCHECK(!initialized_in_start_.initialized);
#endif
    return initialized_in_start_;
  }
  const InitializedInStart& after_start() const {
#if DCHECK_IS_ON()
    DCHECK(initialized_in_start_.initialized);
#endif
    return initialized_in_start_;
  }

  const std::string histogram_label_;
  const std::string thread_group_label_;
  const ThreadType thread_type_hint_;

  // All workers owned by this thread group.
  std::vector<scoped_refptr<WorkerThread>> workers_ GUARDED_BY(lock_);
  size_t worker_sequence_num_ GUARDED_BY(lock_) = 0;

  bool shutdown_started_ GUARDED_BY(lock_) = false;

  // Maximum number of tasks of any priority / BEST_EFFORT priority that can
  // run concurrently in this thread group.
  size_t max_tasks_ GUARDED_BY(lock_) = 0;
  size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0;

  // Number of tasks of any priority / BEST_EFFORT priority that are currently
  // running in this thread group.
  size_t num_running_tasks_ GUARDED_BY(lock_) = 0;
  size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0;

  // Number of workers running a task of any priority / BEST_EFFORT priority
  // that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't
  // caused a max tasks increase yet.
  int num_unresolved_may_block_ GUARDED_BY(lock_) = 0;
  int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0;

  // Ordered set of idle workers; the order uses pointer comparison, which is
  // arbitrary but stable. Initially, all workers are on this set. A worker is
  // removed from the set before its WakeUp() function is called and when it
  // receives work from GetWork() (a worker calls GetWork() when its sleep
  // timeout expires, even if its WakeUp() method hasn't been called). A worker
  // is inserted on this set when it receives nullptr from GetWork().
  WorkerThreadSet idle_workers_set_ GUARDED_BY(lock_);

  // Signaled when a worker is added to the idle workers set.
  std::unique_ptr<ConditionVariable> idle_workers_set_cv_for_testing_
      GUARDED_BY(lock_);

  // Whether an AdjustMaxTasks() task was posted to the service thread.
  bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false;

  // Indicates to the delegates that workers are not permitted to clean up.
  bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false;

  // Counts the number of workers cleaned up (gone through
  // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
  // WaitForWorkersCleanedUpForTesting() (or since Start() if it was never
  // called). |some_workers_cleaned_up_for_testing_| is true if this was ever
  // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
  // specific number of workers being cleaned up via
  // WaitForWorkersCleanedUpForTesting().
  size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0;
#if DCHECK_IS_ON()
  bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false;
#endif

  // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
  // incremented.
  std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
      GUARDED_BY(lock_);

  // Set at the start of JoinForTesting().
  bool join_for_testing_started_ GUARDED_BY(lock_) = false;

  // Nullopt unless |synchronous_thread_start_for_testing| was true at
  // construction. In that case, it's signaled each time
  // WorkerThreadDelegateImpl::OnMainEntry() completes.
  absl::optional<WaitableEvent> worker_started_for_testing_;

  // Ensures recently cleaned up workers (ref.
  // WorkerThreadDelegateImpl::CleanupLockRequired()) have had time to exit, as
  // they hold a raw reference to |this| (and to TaskTracker) which could
  // otherwise result in racy use-after-frees: once cleaned up, they are no
  // longer part of |workers_| and hence are not explicitly joined in
  // JoinForTesting(). See https://crbug.com/810464. Uses AtomicRefCount to
  // make its only public method thread-safe.
  TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_;
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_