1 // Copyright 2019 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 6 #define BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 7 8 #include <stddef.h> 9 10 #include "base/base_export.h" 11 #include "base/containers/intrusive_heap.h" 12 #include "base/dcheck_is_on.h" 13 #include "base/memory/raw_ptr.h" 14 #include "base/memory/raw_ptr_exclusion.h" 15 #include "base/memory/ref_counted.h" 16 #include "base/sequence_token.h" 17 #include "base/task/common/checked_lock.h" 18 #include "base/task/task_traits.h" 19 #include "base/task/thread_pool/task.h" 20 #include "base/task/thread_pool/task_source_sort_key.h" 21 #include "base/threading/sequence_local_storage_map.h" 22 #include "base/time/time.h" 23 24 namespace base { 25 namespace internal { 26 27 class TaskTracker; 28 29 enum class TaskSourceExecutionMode { 30 kParallel, 31 kSequenced, 32 kSingleThread, 33 kJob, 34 kMax = kJob, 35 }; 36 37 struct BASE_EXPORT ExecutionEnvironment { 38 SequenceToken token; 39 raw_ptr<SequenceLocalStorageMap> sequence_local_storage; 40 }; 41 42 // A TaskSource is a virtual class that provides a series of Tasks that must be 43 // executed immediately or in the future. 44 // 45 // When a task source has delayed tasks but no immediate tasks, the scheduler 46 // must call OnBecomeReady() after HasReadyTasks(now) == true, which is 47 // guaranteed once now >= GetDelayedSortKey(). 48 // 49 // A task source is registered when it's ready to be added to the immediate 50 // queue. A task source is ready to be queued when either: 51 // 1- It has new tasks that can run concurrently as a result of external 52 // operations, e.g. 
//    posting a new immediate task to an empty Sequence or
//    increasing max concurrency of a JobTaskSource;
// 2- A worker finished running a task from it and both DidProcessTask() and
//    WillReEnqueue() returned true;
// 3- A worker is about to run a task from it and WillRunTask() returned
//    kAllowedNotSaturated; or
// 4- A delayed task became ready and OnBecomeReady() returned true.
//
// A worker may perform the following sequence of operations on a
// RegisteredTaskSource after obtaining it from the queue:
// 1- Check whether a task can run with WillRunTask() (and register/enqueue the
//    task source again if not saturated).
// 2- (optional) Iff (1) determined that a task can run, access the next task
//    with TakeTask().
// 3- (optional) Execute the task.
// 4- Inform the task source that a task was processed with DidProcessTask(),
//    and re-enqueue the task source iff requested. The task source is ready to
//    run immediately iff WillReEnqueue() returns true.
// When a task source is registered multiple times, many overlapping chains of
// operations may run concurrently, as permitted by WillRunTask(). This allows
// tasks from the same task source to run in parallel.
// However, the following invariants are kept:
// - The number of workers concurrently running tasks never goes over the
//   intended concurrency.
// - If the task source has more tasks that can run concurrently, it must be
//   queued.
//
// Note: there is a known refcounted-ownership cycle in the ThreadPool
// architecture: TaskSource -> TaskRunner -> TaskSource -> ... This is okay so
// long as the other owners of TaskSource (PriorityQueue and WorkerThread in
// alternation and ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork()
// temporarily) keep running it (and taking Tasks from it as a result).
A 84 // dangling reference cycle would only occur should they release their reference 85 // to it while it's not empty. In other words, it is only correct for them to 86 // release it when DidProcessTask() returns false. 87 // 88 // This class is thread-safe. 89 class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> { 90 public: 91 // Indicates whether WillRunTask() allows TakeTask() to be called on a 92 // RegisteredTaskSource. 93 enum class RunStatus { 94 // TakeTask() cannot be called. 95 kDisallowed, 96 // TakeTask() may called, and the TaskSource has not reached its maximum 97 // concurrency (i.e. the TaskSource still needs to be queued). 98 kAllowedNotSaturated, 99 // TakeTask() may called, and the TaskSource has reached its maximum 100 // concurrency (i.e. the TaskSource no longer needs to be queued). 101 kAllowedSaturated, 102 }; 103 104 // A Transaction can perform multiple operations atomically on a 105 // TaskSource. While a Transaction is alive, it is guaranteed that nothing 106 // else will access the TaskSource; the TaskSource's lock is held for the 107 // lifetime of the Transaction. No Transaction must be held when ~TaskSource() 108 // is called. 109 class BASE_EXPORT Transaction { 110 public: 111 Transaction(Transaction&& other); 112 Transaction(const Transaction&) = delete; 113 Transaction& operator=(const Transaction&) = delete; 114 ~Transaction(); 115 116 operator bool() const { return !!task_source_; } 117 118 // Sets TaskSource priority to |priority|. 119 void UpdatePriority(TaskPriority priority); 120 121 // Returns the traits of all Tasks in the TaskSource. 
traits()122 TaskTraits traits() const { return task_source_->traits_; } 123 task_source()124 TaskSource* task_source() const { return task_source_; } 125 126 void Release(); 127 128 protected: 129 explicit Transaction(TaskSource* task_source); 130 131 private: 132 friend class TaskSource; 133 134 // This field is not a raw_ptr<> because it was filtered by the rewriter 135 // for: #union 136 RAW_PTR_EXCLUSION TaskSource* task_source_; 137 }; 138 139 // |traits| is metadata that applies to all Tasks in the TaskSource. 140 // |task_runner| is a reference to the TaskRunner feeding this TaskSource. 141 // |task_runner| can be nullptr only for tasks with no TaskRunner, in which 142 // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the 143 // execution mode of |task_runner|. 144 TaskSource(const TaskTraits& traits, 145 TaskRunner* task_runner, 146 TaskSourceExecutionMode execution_mode); 147 TaskSource(const TaskSource&) = delete; 148 TaskSource& operator=(const TaskSource&) = delete; 149 150 // Begins a Transaction. This method cannot be called on a thread which has an 151 // active TaskSource::Transaction. 152 [[nodiscard]] Transaction BeginTransaction(); 153 154 virtual ExecutionEnvironment GetExecutionEnvironment() = 0; 155 156 // Thread-safe but the returned value may immediately be obsolete. As such 157 // this should only be used as a best-effort guess of how many more workers 158 // are needed. This may be called on an empty task source. 159 virtual size_t GetRemainingConcurrency() const = 0; 160 161 // Returns a TaskSourceSortKey representing the priority of the TaskSource. 162 virtual TaskSourceSortKey GetSortKey() const = 0; 163 // Returns a Timeticks object representing the next delayed runtime of the 164 // TaskSource. 165 virtual TimeTicks GetDelayedSortKey() const = 0; 166 // Returns true if there are tasks ready to be executed. Thread-safe but the 167 // returned value may immediately be obsolete. 
168 virtual bool HasReadyTasks(TimeTicks now) const = 0; 169 // Returns true if the TaskSource should be moved to the immediate queue 170 // due to ready delayed tasks. Note: Returns false if the TaskSource contains 171 // ready delayed tasks, but expects to already be in the immediate queue. 172 virtual bool OnBecomeReady() = 0; 173 174 // Support for IntrusiveHeap in ThreadGroup::PriorityQueue. 175 void SetImmediateHeapHandle(const HeapHandle& handle); 176 void ClearImmediateHeapHandle(); GetImmediateHeapHandle()177 HeapHandle GetImmediateHeapHandle() const { 178 return immediate_pq_heap_handle_; 179 } 180 immediate_heap_handle()181 HeapHandle immediate_heap_handle() const { return immediate_pq_heap_handle_; } 182 183 // Support for IntrusiveHeap in ThreadGroup::DelayedPriorityQueue. 184 void SetDelayedHeapHandle(const HeapHandle& handle); 185 void ClearDelayedHeapHandle(); GetDelayedHeapHandle()186 HeapHandle GetDelayedHeapHandle() const { return delayed_pq_heap_handle_; } 187 delayed_heap_handle()188 HeapHandle delayed_heap_handle() const { return delayed_pq_heap_handle_; } 189 190 // Returns the shutdown behavior of all Tasks in the TaskSource. Can be 191 // accessed without a Transaction because it is never mutated. shutdown_behavior()192 TaskShutdownBehavior shutdown_behavior() const { 193 return traits_.shutdown_behavior(); 194 } 195 // Returns a racy priority of the TaskSource. Can be accessed without a 196 // Transaction but may return an outdated result. priority_racy()197 TaskPriority priority_racy() const { 198 return priority_racy_.load(std::memory_order_relaxed); 199 } 200 // Returns the thread policy of the TaskSource. Can be accessed without a 201 // Transaction because it is never mutated. 
thread_policy()202 ThreadPolicy thread_policy() const { return traits_.thread_policy(); } 203 204 // A reference to TaskRunner is only retained between 205 // PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns 206 // false, guaranteeing it is safe to dereference this pointer. Otherwise, the 207 // caller should guarantee such TaskRunner still exists before dereferencing. task_runner()208 TaskRunner* task_runner() const { return task_runner_; } 209 execution_mode()210 TaskSourceExecutionMode execution_mode() const { return execution_mode_; } 211 212 void ClearForTesting(); 213 214 protected: 215 virtual ~TaskSource(); 216 217 virtual RunStatus WillRunTask() = 0; 218 219 // Implementations of TakeTask(), DidProcessTask(), WillReEnqueue(), and 220 // Clear() must ensure proper synchronization iff |transaction| is nullptr. 221 virtual Task TakeTask(TaskSource::Transaction* transaction) = 0; 222 virtual bool DidProcessTask(TaskSource::Transaction* transaction) = 0; 223 virtual bool WillReEnqueue(TimeTicks now, 224 TaskSource::Transaction* transaction) = 0; 225 226 // This may be called for each outstanding RegisteredTaskSource that's ready. 227 // The implementation needs to support this being called multiple times; 228 // unless it guarantees never to hand-out multiple RegisteredTaskSources that 229 // are concurrently ready. 230 virtual Task Clear(TaskSource::Transaction* transaction) = 0; 231 232 // Sets TaskSource priority to |priority|. 233 void UpdatePriority(TaskPriority priority); 234 235 // The TaskTraits of all Tasks in the TaskSource. 236 TaskTraits traits_; 237 238 // The cached priority for atomic access. 239 std::atomic<TaskPriority> priority_racy_; 240 241 // Synchronizes access to all members. 242 mutable CheckedLock lock_{UniversalPredecessor()}; 243 244 private: 245 friend class RefCountedThreadSafe<TaskSource>; 246 friend class RegisteredTaskSource; 247 248 // The TaskSource's position in its current PriorityQueue. 
Access is protected 249 // by the PriorityQueue's lock. 250 HeapHandle immediate_pq_heap_handle_; 251 252 // The TaskSource's position in its current DelayedPriorityQueue. Access is 253 // protected by the DelayedPriorityQueue's lock. 254 HeapHandle delayed_pq_heap_handle_; 255 256 // A pointer to the TaskRunner that posts to this TaskSource, if any. The 257 // derived class is responsible for calling AddRef() when a TaskSource from 258 // which no Task is executing becomes non-empty and Release() when 259 // it becomes empty again (e.g. when DidProcessTask() returns false). 260 // 261 // In practise, this pointer is going to become dangling. See task_runner() 262 // comment. 263 raw_ptr<TaskRunner, DisableDanglingPtrDetection> task_runner_; 264 265 TaskSourceExecutionMode execution_mode_; 266 }; 267 268 // Wrapper around TaskSource to signify the intent to queue and run it. 269 // RegisteredTaskSource can only be created with TaskTracker and may only be 270 // used by a single worker at a time. However, the same task source may be 271 // registered several times, spawning multiple RegisteredTaskSources. A 272 // RegisteredTaskSource resets to its initial state when WillRunTask() fails 273 // or after DidProcessTask() and WillReEnqueue(), so it can be used again. 
274 class BASE_EXPORT RegisteredTaskSource { 275 public: 276 RegisteredTaskSource(); 277 RegisteredTaskSource(std::nullptr_t); 278 RegisteredTaskSource(RegisteredTaskSource&& other) noexcept; 279 RegisteredTaskSource(const RegisteredTaskSource&) = delete; 280 RegisteredTaskSource& operator=(const RegisteredTaskSource&) = delete; 281 ~RegisteredTaskSource(); 282 283 RegisteredTaskSource& operator=(RegisteredTaskSource&& other); 284 285 operator bool() const { return task_source_ != nullptr; } 286 TaskSource* operator->() const { return task_source_.get(); } get()287 TaskSource* get() const { return task_source_.get(); } 288 289 static RegisteredTaskSource CreateForTesting( 290 scoped_refptr<TaskSource> task_source, 291 TaskTracker* task_tracker = nullptr); 292 293 // Can only be called if this RegisteredTaskSource is in its initial state. 294 // Returns the underlying task source. An Optional is used in preparation for 295 // the merge between ThreadPool and TaskQueueManager (in Blink). 296 // https://crbug.com/783309 297 scoped_refptr<TaskSource> Unregister(); 298 299 // Informs this TaskSource that the current worker would like to run a Task 300 // from it. Can only be called if in its initial state. Returns a RunStatus 301 // that indicates if the operation is allowed (TakeTask() can be called). 302 TaskSource::RunStatus WillRunTask(); 303 304 // Returns the next task to run from this TaskSource. This should be called 305 // only after WillRunTask() returned RunStatus::kAllowed*. |transaction| is 306 // optional and should only be provided if this operation is already part of 307 // a transaction. 308 [[nodiscard]] Task TakeTask(TaskSource::Transaction* transaction = nullptr); 309 310 // Must be called after WillRunTask() or once the task was run if TakeTask() 311 // was called. This resets this RegisteredTaskSource to its initial state so 312 // that WillRunTask() may be called again. 
|transaction| is optional and 313 // should only be provided if this operation is already part of a transaction. 314 // Returns true if the TaskSource should be queued after this operation. 315 bool DidProcessTask(TaskSource::Transaction* transaction = nullptr); 316 317 // Must be called iff DidProcessTask() previously returns true . 318 // |transaction| is optional and should only be provided if this 319 // operation is already part of a transaction. Returns true if the 320 // TaskSource is ready to run immediately. 321 bool WillReEnqueue(TimeTicks now, 322 TaskSource::Transaction* transaction = nullptr); 323 324 // Returns a task that clears this TaskSource to make it empty. |transaction| 325 // is optional and should only be provided if this operation is already part 326 // of a transaction. 327 [[nodiscard]] Task Clear(TaskSource::Transaction* transaction = nullptr); 328 329 private: 330 friend class TaskTracker; 331 RegisteredTaskSource(scoped_refptr<TaskSource> task_source, 332 TaskTracker* task_tracker); 333 334 #if DCHECK_IS_ON() 335 // Indicates the step of a task execution chain. 336 enum class State { 337 kInitial, // WillRunTask() may be called. 338 kReady, // After WillRunTask() returned a valid RunStatus. 339 }; 340 341 State run_step_ = State::kInitial; 342 #endif // DCHECK_IS_ON() 343 344 scoped_refptr<TaskSource> task_source_; 345 // This field is not a raw_ptr<> because it was filtered by the rewriter for: 346 // #union 347 RAW_PTR_EXCLUSION TaskTracker* task_tracker_ = nullptr; 348 }; 349 350 // A pair of Transaction and RegisteredTaskSource. Useful to carry a 351 // RegisteredTaskSource with an associated Transaction. 352 // TODO(crbug.com/839091): Rename to RegisteredTaskSourceAndTransaction. 
353 struct BASE_EXPORT TransactionWithRegisteredTaskSource { 354 public: 355 TransactionWithRegisteredTaskSource(RegisteredTaskSource task_source_in, 356 TaskSource::Transaction transaction_in); 357 358 TransactionWithRegisteredTaskSource( 359 TransactionWithRegisteredTaskSource&& other) = default; 360 TransactionWithRegisteredTaskSource( 361 const TransactionWithRegisteredTaskSource&) = delete; 362 TransactionWithRegisteredTaskSource& operator=( 363 const TransactionWithRegisteredTaskSource&) = delete; 364 ~TransactionWithRegisteredTaskSource() = default; 365 366 static TransactionWithRegisteredTaskSource FromTaskSource( 367 RegisteredTaskSource task_source_in); 368 369 RegisteredTaskSource task_source; 370 TaskSource::Transaction transaction; 371 }; 372 373 struct BASE_EXPORT TaskSourceAndTransaction { 374 public: 375 TaskSourceAndTransaction(scoped_refptr<TaskSource> task_source_in, 376 TaskSource::Transaction transaction_in); 377 378 TaskSourceAndTransaction(TaskSourceAndTransaction&& other); 379 TaskSourceAndTransaction(const TaskSourceAndTransaction&) = delete; 380 TaskSourceAndTransaction& operator=(const TaskSourceAndTransaction&) = delete; 381 ~TaskSourceAndTransaction(); 382 383 static TaskSourceAndTransaction FromTaskSource( 384 scoped_refptr<TaskSource> task_source_in); 385 386 scoped_refptr<TaskSource> task_source; 387 TaskSource::Transaction transaction; 388 }; 389 390 } // namespace internal 391 } // namespace base 392 393 #endif // BASE_TASK_THREAD_POOL_TASK_SOURCE_H_ 394