1 // Copyright 2019 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_ 6 #define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_ 7 8 #include <stddef.h> 9 10 #include <atomic> 11 #include <limits> 12 #include <memory> 13 #include <utility> 14 15 #include "base/base_export.h" 16 #include "base/functional/callback.h" 17 #include "base/memory/raw_ptr.h" 18 #include "base/synchronization/condition_variable.h" 19 #include "base/task/common/checked_lock.h" 20 #include "base/task/post_job.h" 21 #include "base/task/task_traits.h" 22 #include "base/task/thread_pool/task.h" 23 #include "base/task/thread_pool/task_source.h" 24 #include "base/task/thread_pool/task_source_sort_key.h" 25 26 namespace base { 27 namespace internal { 28 29 class PooledTaskRunnerDelegate; 30 31 // A JobTaskSource generates many Tasks from a single RepeatingClosure. 32 // 33 // Derived classes control the intended concurrency with GetMaxConcurrency(). 34 class BASE_EXPORT JobTaskSource : public TaskSource { 35 public: 36 JobTaskSource(const Location& from_here, 37 const TaskTraits& traits, 38 RepeatingCallback<void(JobDelegate*)> worker_task, 39 MaxConcurrencyCallback max_concurrency_callback, 40 PooledTaskRunnerDelegate* delegate); 41 JobTaskSource(const JobTaskSource&) = delete; 42 JobTaskSource& operator=(const JobTaskSource&) = delete; 43 CreateJobHandle(scoped_refptr<internal::JobTaskSource> task_source)44 static JobHandle CreateJobHandle( 45 scoped_refptr<internal::JobTaskSource> task_source) { 46 return JobHandle(std::move(task_source)); 47 } 48 49 // Notifies this task source that max concurrency was increased, and the 50 // number of worker should be adjusted. 51 void NotifyConcurrencyIncrease(); 52 53 // Informs this JobTaskSource that the current thread would like to join and 54 // contribute to running |worker_task|. Returns true if the joining thread can 55 // contribute (RunJoinTask() can be called), or false if joining was completed 56 // and all other workers returned because either there's no work remaining or 57 // Job was cancelled. 58 bool WillJoin(); 59 60 // Contributes to running |worker_task| and returns true if the joining thread 61 // can contribute again (RunJoinTask() can be called again), or false if 62 // joining was completed and all other workers returned because either there's 63 // no work remaining or Job was cancelled. This should be called only after 64 // WillJoin() or RunJoinTask() previously returned true. 65 bool RunJoinTask(); 66 67 // Cancels this JobTaskSource, causing all workers to yield and WillRunTask() 68 // to return RunStatus::kDisallowed. 69 void Cancel(TaskSource::Transaction* transaction = nullptr); 70 71 // TaskSource: 72 ExecutionEnvironment GetExecutionEnvironment() override; 73 size_t GetRemainingConcurrency() const override; 74 TaskSourceSortKey GetSortKey() const override; 75 TimeTicks GetDelayedSortKey() const override; 76 bool HasReadyTasks(TimeTicks now) const override; 77 78 bool IsActive() const; 79 size_t GetWorkerCount() const; 80 81 // Returns the maximum number of tasks from this TaskSource that can run 82 // concurrently. 83 size_t GetMaxConcurrency() const; 84 85 uint8_t AcquireTaskId(); 86 void ReleaseTaskId(uint8_t task_id); 87 88 // Returns true if a worker should return from the worker task on the current 89 // thread ASAP. 90 bool ShouldYield(); 91 delegate()92 PooledTaskRunnerDelegate* delegate() const { return delegate_; } 93 94 private: 95 // Atomic internal state to track the number of workers running a task from 96 // this JobTaskSource and whether this JobTaskSource is canceled. All 97 // operations are performed with std::memory_order_relaxed as State is only 98 // ever modified under a lock or read atomically (optimistic read). 99 class State { 100 public: 101 static constexpr uint32_t kCanceledMask = 1; 102 static constexpr int kWorkerCountBitOffset = 1; 103 static constexpr uint32_t kWorkerCountIncrement = 1 104 << kWorkerCountBitOffset; 105 106 struct Value { worker_countValue107 uint8_t worker_count() const { 108 return static_cast<uint8_t>(value >> kWorkerCountBitOffset); 109 } 110 // Returns true if canceled. is_canceledValue111 bool is_canceled() const { return value & kCanceledMask; } 112 113 uint32_t value; 114 }; 115 116 State(); 117 ~State(); 118 119 // Sets as canceled. Returns the state 120 // before the operation. 121 Value Cancel(); 122 123 // Increments the worker count by 1. Returns the state before the operation. 124 Value IncrementWorkerCount(); 125 126 // Decrements the worker count by 1. Returns the state before the operation. 127 Value DecrementWorkerCount(); 128 129 // Loads and returns the state. 130 Value Load() const; 131 132 private: 133 std::atomic<uint32_t> value_{0}; 134 }; 135 136 // Atomic flag that indicates if the joining thread is currently waiting on 137 // another worker to yield or to signal. 138 class JoinFlag { 139 public: 140 static constexpr uint32_t kNotWaiting = 0; 141 static constexpr uint32_t kWaitingForWorkerToSignal = 1; 142 static constexpr uint32_t kWaitingForWorkerToYield = 3; 143 // kWaitingForWorkerToYield is 3 because the impl relies on the following 144 // property. 145 static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) == 146 kWaitingForWorkerToSignal, 147 ""); 148 149 JoinFlag(); 150 ~JoinFlag(); 151 152 // Returns true if the status is not kNotWaiting, using 153 // std::memory_order_relaxed. IsWaiting()154 bool IsWaiting() { 155 return value_.load(std::memory_order_relaxed) != kNotWaiting; 156 } 157 158 // Resets the status as kNotWaiting using std::memory_order_relaxed. 159 void Reset(); 160 161 // Sets the status as kWaitingForWorkerToYield using 162 // std::memory_order_relaxed. 163 void SetWaiting(); 164 165 // If the flag is kWaitingForWorkerToYield, returns true indicating that the 166 // worker should yield, and atomically updates to kWaitingForWorkerToSignal 167 // (using std::memory_order_relaxed) to ensure that a single worker yields 168 // in response to SetWaiting(). 169 bool ShouldWorkerYield(); 170 171 // If the flag is kWaiting*, returns true indicating that the worker should 172 // signal, and atomically updates to kNotWaiting (using 173 // std::memory_order_relaxed) to ensure that a single worker signals in 174 // response to SetWaiting(). 175 bool ShouldWorkerSignal(); 176 177 private: 178 std::atomic<uint32_t> value_{kNotWaiting}; 179 }; 180 181 ~JobTaskSource() override; 182 183 // Called from the joining thread. Waits for the worker count to be below or 184 // equal to max concurrency (will happen when a worker calls 185 // DidProcessTask()). Returns true if the joining thread should run a task, or 186 // false if joining was completed and all other workers returned because 187 // either there's no work remaining or Job was cancelled. 188 bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_); 189 190 size_t GetMaxConcurrency(size_t worker_count) const; 191 192 // TaskSource: 193 RunStatus WillRunTask() override; 194 Task TakeTask(TaskSource::Transaction* transaction) override; 195 Task Clear(TaskSource::Transaction* transaction) override; 196 bool DidProcessTask(TaskSource::Transaction* transaction) override; 197 bool WillReEnqueue(TimeTicks now, 198 TaskSource::Transaction* transaction) override; 199 bool OnBecomeReady() override; 200 201 // Synchronizes access to workers state. 202 mutable CheckedLock worker_lock_{UniversalSuccessor()}; 203 204 // Current atomic state (atomic despite the lock to allow optimistic reads 205 // and cancellation without the lock). 206 State state_ GUARDED_BY(worker_lock_); 207 // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield() 208 // hence the use of atomics. 209 JoinFlag join_flag_ GUARDED_BY(worker_lock_); 210 // Signaled when |join_flag_| is kWaiting* and a worker returns. 211 std::unique_ptr<ConditionVariable> worker_released_condition_ 212 GUARDED_BY(worker_lock_); 213 214 std::atomic<uint32_t> assigned_task_ids_{0}; 215 216 const Location from_here_; 217 RepeatingCallback<size_t(size_t)> max_concurrency_callback_; 218 219 // Worker task set by the job owner. 220 RepeatingCallback<void(JobDelegate*)> worker_task_; 221 // Task returned from TakeTask(), that calls |worker_task_| internally. 222 RepeatingClosure primary_task_; 223 224 const TimeTicks ready_time_; 225 raw_ptr<PooledTaskRunnerDelegate> delegate_; 226 }; 227 228 } // namespace internal 229 } // namespace base 230 231 #endif // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_ 232