1 // Copyright 2016 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_ 6 #define BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_ 7 8 #include <stddef.h> 9 10 #include <memory> 11 #include <string> 12 #include <vector> 13 14 #include "base/base_export.h" 15 #include "base/check.h" 16 #include "base/compiler_specific.h" 17 #include "base/dcheck_is_on.h" 18 #include "base/gtest_prod_util.h" 19 #include "base/memory/raw_ptr.h" 20 #include "base/strings/string_piece.h" 21 #include "base/synchronization/condition_variable.h" 22 #include "base/synchronization/waitable_event.h" 23 #include "base/task/sequenced_task_runner.h" 24 #include "base/task/task_features.h" 25 #include "base/task/thread_pool/task.h" 26 #include "base/task/thread_pool/task_source.h" 27 #include "base/task/thread_pool/thread_group.h" 28 #include "base/task/thread_pool/tracked_ref.h" 29 #include "base/task/thread_pool/worker_thread.h" 30 #include "base/task/thread_pool/worker_thread_set.h" 31 #include "base/time/time.h" 32 #include "third_party/abseil-cpp/absl/types/optional.h" 33 34 namespace base { 35 36 class WorkerThreadObserver; 37 38 namespace internal { 39 40 class TaskTracker; 41 42 // A group of workers that run Tasks. 43 // 44 // The thread group doesn't create threads until Start() is called. Tasks can be 45 // posted at any time but will not run until after Start() is called. 46 // 47 // This class is thread-safe. 48 class BASE_EXPORT ThreadGroupImpl : public ThreadGroup { 49 public: 50 // Constructs a group without workers. 51 // 52 // |histogram_label| is used to label the thread group's histograms as 53 // "ThreadPool." + histogram_name + "." + |histogram_label| + extra suffixes. 54 // It must not be empty. |thread group_label| is used to label the thread 55 // group's threads, it must not be empty. |thread_type_hint| is the preferred 56 // thread type; the actual thread type depends on shutdown state and platform 57 // capabilities. |task_tracker| keeps track of tasks. 58 ThreadGroupImpl(StringPiece histogram_label, 59 StringPiece thread_group_label, 60 ThreadType thread_type_hint, 61 TrackedRef<TaskTracker> task_tracker, 62 TrackedRef<Delegate> delegate); 63 64 // Creates threads, allowing existing and future tasks to run. The thread 65 // group runs at most |max_tasks| / `max_best_effort_tasks` unblocked task 66 // with any / BEST_EFFORT priority concurrently. It reclaims unused threads 67 // after `suggested_reclaim_time`. It uses `service_thread_task_runner` to 68 // monitor for blocked tasks, `service_thread_task_runner` is used to setup 69 // FileDescriptorWatcher on worker threads. It must refer to a Thread with 70 // MessagePumpType::IO. If specified, it notifies |worker_thread_observer| 71 // when a worker enters and exits its main function (the observer must not be 72 // destroyed before JoinForTesting() has returned). |worker_environment| 73 // specifies the environment in which tasks are executed. 74 // |may_block_threshold| is the timeout after which a task in a MAY_BLOCK 75 // ScopedBlockingCall is considered blocked (the thread group will choose an 76 // appropriate value if none is specified). 77 // `synchronous_thread_start_for_testing` is true if this ThreadGroupImpl 78 // should synchronously wait for OnMainEntry() after starting each worker. Can 79 // only be called once. CHECKs on failure. 80 void Start(size_t max_tasks, 81 size_t max_best_effort_tasks, 82 TimeDelta suggested_reclaim_time, 83 scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner, 84 WorkerThreadObserver* worker_thread_observer, 85 WorkerEnvironment worker_environment, 86 bool synchronous_thread_start_for_testing = false, 87 absl::optional<TimeDelta> may_block_threshold = 88 absl::optional<TimeDelta>()); 89 90 ThreadGroupImpl(const ThreadGroupImpl&) = delete; 91 ThreadGroupImpl& operator=(const ThreadGroupImpl&) = delete; 92 // Destroying a ThreadGroupImpl is not allowed in production; it is always 93 // leaked. In tests, it can only be destroyed after JoinForTesting() has 94 // returned. 95 ~ThreadGroupImpl() override; 96 97 // ThreadGroup: 98 void JoinForTesting() override; 99 size_t GetMaxConcurrentNonBlockedTasksDeprecated() const override; 100 void DidUpdateCanRunPolicy() override; 101 void OnShutdownStarted() override; 102 103 // Waits until at least |n| workers are idle. Note that while workers are 104 // disallowed from cleaning up during this call: tests using a custom 105 // |suggested_reclaim_time_| need to be careful to invoke this swiftly after 106 // unblocking the waited upon workers as: if a worker is already detached by 107 // the time this is invoked, it will never make it onto the idle set and 108 // this call will hang. 109 void WaitForWorkersIdleForTesting(size_t n); 110 111 // Waits until at least |n| workers are idle. 112 void WaitForWorkersIdleLockRequiredForTesting(size_t n) 113 EXCLUSIVE_LOCKS_REQUIRED(lock_); 114 115 // Waits until all workers are idle. 116 void WaitForAllWorkersIdleForTesting(); 117 118 // Waits until |n| workers have cleaned up (went through 119 // WorkerThreadDelegateImpl::OnMainExit()) since the last call to 120 // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet). 121 void WaitForWorkersCleanedUpForTesting(size_t n); 122 123 // Returns the number of workers in this thread group. 124 size_t NumberOfWorkersForTesting() const; 125 126 // Returns |max_tasks_|/|max_best_effort_tasks_|. 127 size_t GetMaxTasksForTesting() const; 128 size_t GetMaxBestEffortTasksForTesting() const; 129 130 // Returns the number of workers that are idle (i.e. not running tasks). 131 size_t NumberOfIdleWorkersForTesting() const; 132 133 private: 134 class ScopedCommandsExecutor; 135 class WorkerThreadDelegateImpl; 136 137 // Friend tests so that they can access |blocked_workers_poll_period| and 138 // may_block_threshold(). 139 friend class ThreadGroupImplBlockingTest; 140 friend class ThreadGroupImplMayBlockTest; 141 FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest, 142 ThreadBlockUnblockPremature); 143 FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest, 144 ThreadBlockUnblockPrematureBestEffort); 145 146 // ThreadGroup: 147 void UpdateSortKey(TaskSource::Transaction transaction) override; 148 void PushTaskSourceAndWakeUpWorkers( 149 RegisteredTaskSourceAndTransaction transaction_with_task_source) override; 150 void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor) 151 override EXCLUSIVE_LOCKS_REQUIRED(lock_); 152 153 // Creates a worker and schedules its start, if needed, to maintain one idle 154 // worker, |max_tasks_| permitting. 155 void MaintainAtLeastOneIdleWorkerLockRequired( 156 ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_); 157 158 // Creates a worker, adds it to the thread group, schedules its start and 159 // returns it. Cannot be called before Start(). 160 scoped_refptr<WorkerThreadWaitableEvent> CreateAndRegisterWorkerLockRequired( 161 ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_); 162 163 // Returns the number of workers that are awake (i.e. not on the idle set). 164 size_t GetNumAwakeWorkersLockRequired() const EXCLUSIVE_LOCKS_REQUIRED(lock_); 165 166 // Returns the desired number of awake workers, given current workload and 167 // concurrency limits. 168 size_t GetDesiredNumAwakeWorkersLockRequired() const 169 EXCLUSIVE_LOCKS_REQUIRED(lock_); 170 171 // Examines the list of WorkerThreads and increments |max_tasks_| for each 172 // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall for 173 // more than BlockedThreshold(). Reschedules a call if necessary. 174 void AdjustMaxTasks(); 175 176 // Returns the threshold after which the max tasks is increased to compensate 177 // for a worker that is within a MAY_BLOCK ScopedBlockingCall. may_block_threshold_for_testing()178 TimeDelta may_block_threshold_for_testing() const { 179 return after_start().may_block_threshold; 180 } 181 182 // Interval at which the service thread checks for workers in this thread 183 // group that have been in a MAY_BLOCK ScopedBlockingCall for more than 184 // may_block_threshold(). blocked_workers_poll_period_for_testing()185 TimeDelta blocked_workers_poll_period_for_testing() const { 186 return after_start().blocked_workers_poll_period; 187 } 188 189 // Starts calling AdjustMaxTasks() periodically on 190 // |service_thread_task_runner_|. 191 void ScheduleAdjustMaxTasks(); 192 193 // Schedules AdjustMaxTasks() through |executor| if required. 194 void MaybeScheduleAdjustMaxTasksLockRequired(ScopedCommandsExecutor* executor) 195 EXCLUSIVE_LOCKS_REQUIRED(lock_); 196 197 // Returns true if AdjustMaxTasks() should periodically be called on 198 // |service_thread_task_runner_|. 199 bool ShouldPeriodicallyAdjustMaxTasksLockRequired() 200 EXCLUSIVE_LOCKS_REQUIRED(lock_); 201 202 // Updates the minimum priority allowed to run below which tasks should yield. 203 // This should be called whenever |num_running_tasks_| or |max_tasks| changes, 204 // or when a new task is added to |priority_queue_|. 205 void UpdateMinAllowedPriorityLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_); 206 207 bool IsOnIdleSetLockRequired(WorkerThreadWaitableEvent* worker) const 208 EXCLUSIVE_LOCKS_REQUIRED(lock_); 209 210 // Increments/decrements the number of tasks of |priority| that are currently 211 // running in this thread group. Must be invoked before/after running a task. 212 void DecrementTasksRunningLockRequired(TaskPriority priority) 213 EXCLUSIVE_LOCKS_REQUIRED(lock_); 214 void IncrementTasksRunningLockRequired(TaskPriority priority) 215 EXCLUSIVE_LOCKS_REQUIRED(lock_); 216 217 // Increments/decrements the number of [best effort] tasks that can run in 218 // this thread group. 219 void DecrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_); 220 void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_); 221 void DecrementMaxBestEffortTasksLockRequired() 222 EXCLUSIVE_LOCKS_REQUIRED(lock_); 223 void IncrementMaxBestEffortTasksLockRequired() 224 EXCLUSIVE_LOCKS_REQUIRED(lock_); 225 226 // Values set at Start() and never modified afterwards. 227 struct InitializedInStart { 228 InitializedInStart(); 229 ~InitializedInStart(); 230 231 #if DCHECK_IS_ON() 232 // Set after all members of this struct are set. 233 bool initialized = false; 234 #endif 235 236 // Initial value of |max_tasks_|. 237 size_t initial_max_tasks = 0; 238 239 // Suggested reclaim time for workers. 240 TimeDelta suggested_reclaim_time; 241 bool no_worker_reclaim = false; 242 243 // Environment to be initialized per worker. 244 WorkerEnvironment worker_environment = WorkerEnvironment::NONE; 245 246 scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner; 247 248 // Optional observer notified when a worker enters and exits its main. 249 raw_ptr<WorkerThreadObserver> worker_thread_observer = nullptr; 250 251 // Threshold after which the max tasks is increased to compensate for a 252 // worker that is within a MAY_BLOCK ScopedBlockingCall. 253 TimeDelta may_block_threshold; 254 255 // The period between calls to AdjustMaxTasks() when the thread group is at 256 // capacity. 257 TimeDelta blocked_workers_poll_period; 258 259 // Whether EnsureEnoughWorkersLockRequired() should be called at the end of 260 // GetWork() instead of at the beginning. 261 bool ensure_enough_workers_at_end_of_get_work = false; 262 263 } initialized_in_start_; 264 in_start()265 InitializedInStart& in_start() { 266 #if DCHECK_IS_ON() 267 DCHECK(!initialized_in_start_.initialized); 268 #endif 269 return initialized_in_start_; 270 } after_start()271 const InitializedInStart& after_start() const { 272 #if DCHECK_IS_ON() 273 DCHECK(initialized_in_start_.initialized); 274 #endif 275 return initialized_in_start_; 276 } 277 278 const std::string histogram_label_; 279 const std::string thread_group_label_; 280 const ThreadType thread_type_hint_; 281 282 // All workers owned by this thread group. 283 std::vector<scoped_refptr<WorkerThreadWaitableEvent>> workers_ 284 GUARDED_BY(lock_); 285 size_t worker_sequence_num_ GUARDED_BY(lock_) = 0; 286 287 bool shutdown_started_ GUARDED_BY(lock_) = false; 288 289 // Maximum number of tasks of any priority / BEST_EFFORT priority that can run 290 // concurrently in this thread group. 291 size_t max_tasks_ GUARDED_BY(lock_) = 0; 292 size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0; 293 294 // Number of tasks of any priority / BEST_EFFORT priority that are currently 295 // running in this thread group. 296 size_t num_running_tasks_ GUARDED_BY(lock_) = 0; 297 size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0; 298 299 // Number of workers running a task of any priority / BEST_EFFORT priority 300 // that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't 301 // caused a max tasks increase yet. 302 int num_unresolved_may_block_ GUARDED_BY(lock_) = 0; 303 int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0; 304 305 // Ordered set of idle workers; the order uses pointer comparison, this is 306 // arbitrary but stable. Initially, all workers are on this set. A worker is 307 // removed from the set before its WakeUp() function is called and when it 308 // receives work from GetWork() (a worker calls GetWork() when its sleep 309 // timeout expires, even if its WakeUp() method hasn't been called). A worker 310 // is inserted on this set when it receives nullptr from GetWork(). 311 WorkerThreadSet idle_workers_set_ GUARDED_BY(lock_); 312 313 // Signaled when a worker is added to the idle workers set. 314 std::unique_ptr<ConditionVariable> idle_workers_set_cv_for_testing_ 315 GUARDED_BY(lock_); 316 317 // Whether an AdjustMaxTasks() task was posted to the service thread. 318 bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false; 319 320 // Indicates to the delegates that workers are not permitted to cleanup. 321 bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false; 322 323 // Counts the number of workers cleaned up (went through 324 // WorkerThreadDelegateImpl::OnMainExit()) since the last call to 325 // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet). 326 // |some_workers_cleaned_up_for_testing_| is true if this was ever 327 // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a 328 // specific number of workers being cleaned up via 329 // WaitForWorkersCleanedUpForTesting(). 330 size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0; 331 #if DCHECK_IS_ON() 332 bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false; 333 #endif 334 335 // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is 336 // incremented. 337 std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_ 338 GUARDED_BY(lock_); 339 340 // Set at the start of JoinForTesting(). 341 bool join_for_testing_started_ GUARDED_BY(lock_) = false; 342 343 // Null-opt unless |synchronous_thread_start_for_testing| was true at 344 // construction. In that case, it's signaled each time 345 // WorkerThreadDelegateImpl::OnMainEntry() completes. 346 absl::optional<WaitableEvent> worker_started_for_testing_; 347 348 // Ensures recently cleaned up workers (ref. 349 // WorkerThreadDelegateImpl::CleanupLockRequired()) had time to exit as 350 // they have a raw reference to |this| (and to TaskTracker) which can 351 // otherwise result in racy use-after-frees per no longer being part of 352 // |workers_| and hence not being explicitly joined in JoinForTesting(): 353 // https://crbug.com/810464. Uses AtomicRefCount to make its only public 354 // method thread-safe. 355 TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_; 356 }; 357 358 } // namespace internal 359 } // namespace base 360 361 #endif // BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_ 362