1 // Copyright 2019 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_OLD_H_ 6 #define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_OLD_H_ 7 8 #include <stddef.h> 9 10 #include <atomic> 11 #include <limits> 12 #include <memory> 13 #include <utility> 14 15 #include "base/base_export.h" 16 #include "base/functional/callback.h" 17 #include "base/memory/raw_ptr.h" 18 #include "base/synchronization/condition_variable.h" 19 #include "base/task/common/checked_lock.h" 20 #include "base/task/post_job.h" 21 #include "base/task/task_traits.h" 22 #include "base/task/thread_pool/job_task_source_interface.h" 23 #include "base/task/thread_pool/task.h" 24 #include "base/task/thread_pool/task_source.h" 25 #include "base/task/thread_pool/task_source_sort_key.h" 26 27 namespace base { 28 namespace internal { 29 30 class PooledTaskRunnerDelegate; 31 32 // A JobTaskSource generates many Tasks from a single RepeatingClosure. 33 // 34 // Derived classes control the intended concurrency with GetMaxConcurrency(). 35 class BASE_EXPORT JobTaskSourceOld : public JobTaskSource { 36 public: 37 JobTaskSourceOld(const Location& from_here, 38 const TaskTraits& traits, 39 RepeatingCallback<void(JobDelegate*)> worker_task, 40 MaxConcurrencyCallback max_concurrency_callback, 41 PooledTaskRunnerDelegate* delegate); 42 JobTaskSourceOld(const JobTaskSource&) = delete; 43 JobTaskSourceOld& operator=(const JobTaskSourceOld&) = delete; 44 45 // Called before the task source is enqueued to initialize task metadata. 46 void WillEnqueue(int sequence_num, TaskAnnotator& annotator) override; 47 48 // Notifies this task source that max concurrency was increased, and the 49 // number of worker should be adjusted. 50 bool NotifyConcurrencyIncrease() override; 51 52 // Informs this JobTaskSource that the current thread would like to join and 53 // contribute to running |worker_task|. Returns true if the joining thread can 54 // contribute (RunJoinTask() can be called), or false if joining was completed 55 // and all other workers returned because either there's no work remaining or 56 // Job was cancelled. 57 bool WillJoin() override; 58 59 // Contributes to running |worker_task| and returns true if the joining thread 60 // can contribute again (RunJoinTask() can be called again), or false if 61 // joining was completed and all other workers returned because either there's 62 // no work remaining or Job was cancelled. This should be called only after 63 // WillJoin() or RunJoinTask() previously returned true. 64 bool RunJoinTask() override; 65 66 // Cancels this JobTaskSource, causing all workers to yield and WillRunTask() 67 // to return RunStatus::kDisallowed. 68 void Cancel(TaskSource::Transaction* transaction = nullptr) override; 69 70 // TaskSource: 71 ExecutionEnvironment GetExecutionEnvironment() override; 72 size_t GetRemainingConcurrency() const override; 73 TaskSourceSortKey GetSortKey() const override; 74 TimeTicks GetDelayedSortKey() const override; 75 bool HasReadyTasks(TimeTicks now) const override; 76 77 bool IsActive() const override; 78 size_t GetWorkerCount() const override; 79 80 // Returns the maximum number of tasks from this TaskSource that can run 81 // concurrently. 82 size_t GetMaxConcurrency() const override; 83 84 uint8_t AcquireTaskId() override; 85 void ReleaseTaskId(uint8_t task_id) override; 86 87 // Returns true if a worker should return from the worker task on the current 88 // thread ASAP. 89 bool ShouldYield() override; 90 91 PooledTaskRunnerDelegate* GetDelegate() const override; 92 93 private: 94 // Atomic internal state to track the number of workers running a task from 95 // this JobTaskSource and whether this JobTaskSource is canceled. All 96 // operations are performed with std::memory_order_relaxed as State is only 97 // ever modified under a lock or read atomically (optimistic read). 98 class State { 99 public: 100 static constexpr uint32_t kCanceledMask = 1; 101 static constexpr int kWorkerCountBitOffset = 1; 102 static constexpr uint32_t kWorkerCountIncrement = 1 103 << kWorkerCountBitOffset; 104 105 struct Value { worker_countValue106 uint8_t worker_count() const { 107 return static_cast<uint8_t>(value >> kWorkerCountBitOffset); 108 } 109 // Returns true if canceled. is_canceledValue110 bool is_canceled() const { return value & kCanceledMask; } 111 112 uint32_t value; 113 }; 114 115 State(); 116 ~State(); 117 118 // Sets as canceled. Returns the state 119 // before the operation. 120 Value Cancel(); 121 122 // Increments the worker count by 1. Returns the state before the operation. 123 Value IncrementWorkerCount(); 124 125 // Decrements the worker count by 1. Returns the state before the operation. 126 Value DecrementWorkerCount(); 127 128 // Loads and returns the state. 129 Value Load() const; 130 131 private: 132 std::atomic<uint32_t> value_{0}; 133 }; 134 135 // Atomic flag that indicates if the joining thread is currently waiting on 136 // another worker to yield or to signal. 137 class JoinFlag { 138 public: 139 static constexpr uint32_t kNotWaiting = 0; 140 static constexpr uint32_t kWaitingForWorkerToSignal = 1; 141 static constexpr uint32_t kWaitingForWorkerToYield = 3; 142 // kWaitingForWorkerToYield is 3 because the impl relies on the following 143 // property. 144 static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) == 145 kWaitingForWorkerToSignal, 146 ""); 147 148 JoinFlag(); 149 ~JoinFlag(); 150 151 // Returns true if the status is not kNotWaiting, using 152 // std::memory_order_relaxed. IsWaiting()153 bool IsWaiting() { 154 return value_.load(std::memory_order_relaxed) != kNotWaiting; 155 } 156 157 // Resets the status as kNotWaiting using std::memory_order_relaxed. 158 void Reset(); 159 160 // Sets the status as kWaitingForWorkerToYield using 161 // std::memory_order_relaxed. 162 void SetWaiting(); 163 164 // If the flag is kWaitingForWorkerToYield, returns true indicating that the 165 // worker should yield, and atomically updates to kWaitingForWorkerToSignal 166 // (using std::memory_order_relaxed) to ensure that a single worker yields 167 // in response to SetWaiting(). 168 bool ShouldWorkerYield(); 169 170 // If the flag is kWaiting*, returns true indicating that the worker should 171 // signal, and atomically updates to kNotWaiting (using 172 // std::memory_order_relaxed) to ensure that a single worker signals in 173 // response to SetWaiting(). 174 bool ShouldWorkerSignal(); 175 176 private: 177 std::atomic<uint32_t> value_{kNotWaiting}; 178 }; 179 180 ~JobTaskSourceOld() override; 181 182 // Called from the joining thread. Waits for the worker count to be below or 183 // equal to max concurrency (will happen when a worker calls 184 // DidProcessTask()). Returns true if the joining thread should run a task, or 185 // false if joining was completed and all other workers returned because 186 // either there's no work remaining or Job was cancelled. 187 bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_); 188 189 size_t GetMaxConcurrency(size_t worker_count) const; 190 191 // TaskSource: 192 RunStatus WillRunTask() override; 193 Task TakeTask(TaskSource::Transaction* transaction) override; 194 absl::optional<Task> Clear(TaskSource::Transaction* transaction) override; 195 bool DidProcessTask(TaskSource::Transaction* transaction) override; 196 bool WillReEnqueue(TimeTicks now, 197 TaskSource::Transaction* transaction) override; 198 bool OnBecomeReady() override; 199 200 // Synchronizes access to workers state. 201 mutable CheckedLock worker_lock_{UniversalSuccessor()}; 202 203 // Current atomic state (atomic despite the lock to allow optimistic reads 204 // and cancellation without the lock). 205 State state_ GUARDED_BY(worker_lock_); 206 // Normally, |join_flag_| is protected by |lock_|, except in ShouldYield() 207 // hence the use of atomics. 208 JoinFlag join_flag_ GUARDED_BY(worker_lock_); 209 // Signaled when |join_flag_| is kWaiting* and a worker returns. 210 std::unique_ptr<ConditionVariable> worker_released_condition_ 211 GUARDED_BY(worker_lock_); 212 213 std::atomic<uint32_t> assigned_task_ids_{0}; 214 215 RepeatingCallback<size_t(size_t)> max_concurrency_callback_; 216 217 // Worker task set by the job owner. 218 RepeatingCallback<void(JobDelegate*)> worker_task_; 219 // Task returned from TakeTask(), that calls |worker_task_| internally. 220 RepeatingClosure primary_task_; 221 222 TaskMetadata task_metadata_; 223 224 const TimeTicks ready_time_; 225 raw_ptr<PooledTaskRunnerDelegate, LeakedDanglingUntriaged> delegate_; 226 }; 227 228 } // namespace internal 229 } // namespace base 230 231 #endif // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_OLD_H_ 232