1 // Copyright 2016 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ 6 #define BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ 7 8 #include <stddef.h> 9 10 #include <memory> 11 #include <string> 12 #include <vector> 13 14 #include "base/base_export.h" 15 #include "base/containers/stack.h" 16 #include "base/logging.h" 17 #include "base/macros.h" 18 #include "base/memory/ref_counted.h" 19 #include "base/strings/string_piece.h" 20 #include "base/synchronization/atomic_flag.h" 21 #include "base/synchronization/condition_variable.h" 22 #include "base/synchronization/waitable_event.h" 23 #include "base/task_runner.h" 24 #include "base/task_scheduler/priority_queue.h" 25 #include "base/task_scheduler/scheduler_lock.h" 26 #include "base/task_scheduler/scheduler_worker.h" 27 #include "base/task_scheduler/scheduler_worker_pool.h" 28 #include "base/task_scheduler/scheduler_worker_stack.h" 29 #include "base/task_scheduler/sequence.h" 30 #include "base/task_scheduler/task.h" 31 #include "base/task_scheduler/tracked_ref.h" 32 #include "base/time/time.h" 33 #include "build/build_config.h" 34 35 namespace base { 36 37 class HistogramBase; 38 class SchedulerWorkerObserver; 39 class SchedulerWorkerPoolParams; 40 41 namespace internal { 42 43 class DelayedTaskManager; 44 class TaskTracker; 45 46 // A pool of workers that run Tasks. 47 // 48 // The pool doesn't create threads until Start() is called. Tasks can be posted 49 // at any time but will not run until after Start() is called. 50 // 51 // This class is thread-safe. 52 class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool { 53 public: 54 enum class WorkerEnvironment { 55 // No special worker environment required. 56 NONE, 57 #if defined(OS_WIN) 58 // Initialize a COM MTA on the worker. 59 COM_MTA, 60 #endif // defined(OS_WIN) 61 }; 62 63 // Constructs a pool without workers. 64 // 65 // |histogram_label| is used to label the pool's histograms ("TaskScheduler." 66 // + histogram_name + "." + |histogram_label| + extra suffixes), it must not 67 // be empty. |pool_label| is used to label the pool's threads, it must not be 68 // empty. |priority_hint| is the preferred thread priority; the actual thread 69 // priority depends on shutdown state and platform capabilities. 70 // |task_tracker| keeps track of tasks. |delayed_task_manager| handles tasks 71 // posted with a delay. 72 SchedulerWorkerPoolImpl(StringPiece histogram_label, 73 StringPiece pool_label, 74 ThreadPriority priority_hint, 75 TrackedRef<TaskTracker> task_tracker, 76 DelayedTaskManager* delayed_task_manager); 77 78 // Creates workers following the |params| specification, allowing existing and 79 // future tasks to run. The pool will run at most |max_background_tasks| 80 // unblocked TaskPriority::BACKGROUND tasks concurrently. Uses 81 // |service_thread_task_runner| to monitor for blocked threads in the pool. If 82 // specified, |scheduler_worker_observer| will be notified when a worker 83 // enters and exits its main function. It must not be destroyed before 84 // JoinForTesting() has returned (must never be destroyed in production). 85 // |worker_environment| specifies any requested environment to execute the 86 // tasks. Can only be called once. CHECKs on failure. 87 void Start(const SchedulerWorkerPoolParams& params, 88 int max_background_tasks, 89 scoped_refptr<TaskRunner> service_thread_task_runner, 90 SchedulerWorkerObserver* scheduler_worker_observer, 91 WorkerEnvironment worker_environment); 92 93 // Destroying a SchedulerWorkerPoolImpl returned by Create() is not allowed in 94 // production; it is always leaked. In tests, it can only be destroyed after 95 // JoinForTesting() has returned. 96 ~SchedulerWorkerPoolImpl() override; 97 98 // SchedulerWorkerPool: 99 void JoinForTesting() override; 100 num_tasks_before_detach_histogram()101 const HistogramBase* num_tasks_before_detach_histogram() const { 102 return num_tasks_before_detach_histogram_; 103 } 104 num_tasks_between_waits_histogram()105 const HistogramBase* num_tasks_between_waits_histogram() const { 106 return num_tasks_between_waits_histogram_; 107 } 108 109 void GetHistograms(std::vector<const HistogramBase*>* histograms) const; 110 111 // Returns the maximum number of non-blocked tasks that can run concurrently 112 // in this pool. 113 // 114 // TODO(fdoray): Remove this method. https://crbug.com/687264 115 int GetMaxConcurrentNonBlockedTasksDeprecated() const; 116 117 // Waits until at least |n| workers are idle. Note that while workers are 118 // disallowed from cleaning up during this call: tests using a custom 119 // |suggested_reclaim_time_| need to be careful to invoke this swiftly after 120 // unblocking the waited upon workers as: if a worker is already detached by 121 // the time this is invoked, it will never make it onto the idle stack and 122 // this call will hang. 123 void WaitForWorkersIdleForTesting(size_t n); 124 125 // Waits until all workers are idle. 126 void WaitForAllWorkersIdleForTesting(); 127 128 // Waits until |n| workers have cleaned up (since the last call to 129 // WaitForWorkersCleanedUpForTesting() or Start() if it wasn't called yet). 130 void WaitForWorkersCleanedUpForTesting(size_t n); 131 132 // Returns the number of workers in this worker pool. 133 size_t NumberOfWorkersForTesting() const; 134 135 // Returns |max_tasks_|. 136 size_t GetMaxTasksForTesting() const; 137 138 // Returns the number of workers that are idle (i.e. not running tasks). 139 size_t NumberOfIdleWorkersForTesting() const; 140 141 // Sets the MayBlock waiting threshold to TimeDelta::Max(). 142 void MaximizeMayBlockThresholdForTesting(); 143 144 private: 145 class SchedulerWorkerDelegateImpl; 146 147 // Friend tests so that they can access |kBlockedWorkersPollPeriod| and 148 // BlockedThreshold(). 149 friend class TaskSchedulerWorkerPoolBlockingTest; 150 friend class TaskSchedulerWorkerPoolMayBlockTest; 151 152 // The period between calls to AdjustMaxTasks() when the pool is at capacity. 153 // This value was set unscientifically based on intuition and may be adjusted 154 // in the future. 155 static constexpr TimeDelta kBlockedWorkersPollPeriod = 156 TimeDelta::FromMilliseconds(50); 157 158 // SchedulerWorkerPool: 159 void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override; 160 161 // Waits until at least |n| workers are idle. |lock_| must be held to call 162 // this function. 163 void WaitForWorkersIdleLockRequiredForTesting(size_t n); 164 165 // Wakes up the last worker from this worker pool to go idle, if any. 166 void WakeUpOneWorker(); 167 168 // Performs the same action as WakeUpOneWorker() except asserts |lock_| is 169 // acquired rather than acquires it and returns true if worker wakeups are 170 // permitted. 171 bool WakeUpOneWorkerLockRequired(); 172 173 // Adds a worker, if needed, to maintain one idle worker, |max_tasks_| 174 // permitting. 175 void MaintainAtLeastOneIdleWorkerLockRequired(); 176 177 // Adds |worker| to |idle_workers_stack_|. 178 void AddToIdleWorkersStackLockRequired(SchedulerWorker* worker); 179 180 // Removes |worker| from |idle_workers_stack_|. 181 void RemoveFromIdleWorkersStackLockRequired(SchedulerWorker* worker); 182 183 // Returns true if worker cleanup is permitted. 184 bool CanWorkerCleanupForTestingLockRequired(); 185 186 // Tries to add a new SchedulerWorker to the pool. Returns the new 187 // SchedulerWorker on success, nullptr otherwise. Cannot be called before 188 // Start(). Must be called under the protection of |lock_|. 189 SchedulerWorker* CreateRegisterAndStartSchedulerWorkerLockRequired(); 190 191 // Returns the number of workers in the pool that should not run tasks due to 192 // the pool being over capacity. 193 size_t NumberOfExcessWorkersLockRequired() const; 194 195 // Examines the list of SchedulerWorkers and increments |max_tasks_| for each 196 // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall for 197 // more than BlockedThreshold(). 198 void AdjustMaxTasks(); 199 200 // Returns the threshold after which the max tasks is increased to compensate 201 // for a worker that is within a MAY_BLOCK ScopedBlockingCall. 202 TimeDelta MayBlockThreshold() const; 203 204 // Starts calling AdjustMaxTasks() periodically on 205 // |service_thread_task_runner_| if not already requested. 206 void ScheduleAdjustMaxTasksIfNeeded(); 207 208 // Calls AdjustMaxTasks() and schedules it again as necessary. May only be 209 // called from the service thread. 210 void AdjustMaxTasksFunction(); 211 212 // Returns true if AdjustMaxTasks() should periodically be called on 213 // |service_thread_task_runner_|. 214 bool ShouldPeriodicallyAdjustMaxTasksLockRequired(); 215 216 // Increments/decrements the number of tasks that can run in this pool. 217 // |is_running_background_task| indicates whether the worker causing the 218 // change is currently running a TaskPriority::BACKGROUND task. 219 void DecrementMaxTasksLockRequired(bool is_running_background_task); 220 void IncrementMaxTasksLockRequired(bool is_running_background_task); 221 222 const std::string pool_label_; 223 const ThreadPriority priority_hint_; 224 225 // PriorityQueue from which all threads of this worker pool get work. 226 PriorityQueue shared_priority_queue_; 227 228 // Suggested reclaim time for workers. Initialized by Start(). Never modified 229 // afterwards (i.e. can be read without synchronization after Start()). 230 TimeDelta suggested_reclaim_time_; 231 232 SchedulerBackwardCompatibility backward_compatibility_; 233 234 // Synchronizes accesses to |workers_|, |max_tasks_|, |max_background_tasks_|, 235 // |num_running_background_tasks_|, |num_pending_may_block_workers_|, 236 // |idle_workers_stack_|, |idle_workers_stack_cv_for_testing_|, 237 // |num_wake_ups_before_start_|, |cleanup_timestamps_|, |polling_max_tasks_|, 238 // |worker_cleanup_disallowed_for_testing_|, 239 // |num_workers_cleaned_up_for_testing_|, 240 // |SchedulerWorkerDelegateImpl::is_on_idle_workers_stack_|, 241 // |SchedulerWorkerDelegateImpl::incremented_max_tasks_since_blocked_| and 242 // |SchedulerWorkerDelegateImpl::may_block_start_time_|. Has 243 // |shared_priority_queue_|'s lock as its predecessor so that a worker can be 244 // pushed to |idle_workers_stack_| within the scope of a Transaction (more 245 // details in GetWork()). 246 mutable SchedulerLock lock_; 247 248 // All workers owned by this worker pool. 249 std::vector<scoped_refptr<SchedulerWorker>> workers_; 250 251 // The maximum number of tasks that can run concurrently in this pool. Workers 252 // can be added as needed up until there are |max_tasks_| workers. 253 size_t max_tasks_ = 0; 254 255 // Initial value of |max_tasks_| as set in Start(). 256 size_t initial_max_tasks_ = 0; 257 258 // The maximum number of background tasks that can run concurrently in this 259 // pool. 260 int max_background_tasks_ = 0; 261 262 // The number of background tasks that are currently running in this pool. 263 int num_running_background_tasks_ = 0; 264 265 // Number of workers that are within the scope of a MAY_BLOCK 266 // ScopedBlockingCall but haven't caused a max task increase yet. 267 int num_pending_may_block_workers_ = 0; 268 269 // Number of workers that are running a TaskPriority::BACKGROUND task and are 270 // within the scope of a MAY_BLOCK ScopedBlockingCall but haven't caused a max 271 // task increase yet. 272 int num_pending_background_may_block_workers_ = 0; 273 274 // Environment to be initialized per worker. 275 WorkerEnvironment worker_environment_ = WorkerEnvironment::NONE; 276 277 // Stack of idle workers. Initially, all workers are on this stack. A worker 278 // is removed from the stack before its WakeUp() function is called and when 279 // it receives work from GetWork() (a worker calls GetWork() when its sleep 280 // timeout expires, even if its WakeUp() method hasn't been called). A worker 281 // is pushed on this stack when it receives nullptr from GetWork(). 282 SchedulerWorkerStack idle_workers_stack_; 283 284 // Signaled when a worker is added to the idle workers stack. 285 std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_; 286 287 // Number of wake ups that occurred before Start(). Never modified after 288 // Start() (i.e. can be read without synchronization after Start()). 289 int num_wake_ups_before_start_ = 0; 290 291 // Stack that contains the timestamps of when workers get cleaned up. 292 // Timestamps get popped off the stack as new workers are added. 293 base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_; 294 295 // Whether we are currently polling for necessary adjustments to |max_tasks_|. 296 bool polling_max_tasks_ = false; 297 298 // Indicates to the delegates that workers are not permitted to cleanup. 299 bool worker_cleanup_disallowed_for_testing_ = false; 300 301 // Counts the number of workers cleaned up since the last call to 302 // WaitForWorkersCleanedUpForTesting() (or Start() if it wasn't called yet). 303 // |some_workers_cleaned_up_for_testing_| is true if this was ever 304 // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a 305 // specific number of workers being cleaned up via 306 // WaitForWorkersCleanedUpForTesting(). 307 size_t num_workers_cleaned_up_for_testing_ = 0; 308 #if DCHECK_IS_ON() 309 bool some_workers_cleaned_up_for_testing_ = false; 310 #endif 311 312 // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is 313 // incremented. 314 std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_; 315 316 // Used for testing and makes MayBlockThreshold() return the maximum 317 // TimeDelta. 318 AtomicFlag maximum_blocked_threshold_for_testing_; 319 320 #if DCHECK_IS_ON() 321 // Set at the start of JoinForTesting(). 322 AtomicFlag join_for_testing_started_; 323 #endif 324 325 // TaskScheduler.DetachDuration.[worker pool name] histogram. Intentionally 326 // leaked. 327 HistogramBase* const detach_duration_histogram_; 328 329 // TaskScheduler.NumTasksBeforeDetach.[worker pool name] histogram. 330 // Intentionally leaked. 331 HistogramBase* const num_tasks_before_detach_histogram_; 332 333 // TaskScheduler.NumTasksBetweenWaits.[worker pool name] histogram. 334 // Intentionally leaked. 335 HistogramBase* const num_tasks_between_waits_histogram_; 336 337 scoped_refptr<TaskRunner> service_thread_task_runner_; 338 339 // Optional observer notified when a worker enters and exits its main 340 // function. Set in Start() and never modified afterwards. 341 SchedulerWorkerObserver* scheduler_worker_observer_ = nullptr; 342 343 // Ensures recently cleaned up workers (ref. 344 // SchedulerWorkerDelegateImpl::CleanupLockRequired()) had time to exit as 345 // they have a raw reference to |this| (and to TaskTracker) which can 346 // otherwise result in racy use-after-frees per no longer being part of 347 // |workers_| and hence not being explicitly joined in JoinForTesting() : 348 // https://crbug.com/810464. 349 TrackedRefFactory<SchedulerWorkerPoolImpl> tracked_ref_factory_; 350 351 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolImpl); 352 }; 353 354 } // namespace internal 355 } // namespace base 356 357 #endif // BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ 358