// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
#define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_

#include <stddef.h>

#include <atomic>
#include <cstdint>
#include <utility>

#include "base/base_export.h"
#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/common/checked_lock.h"
#include "base/task/common/task_annotator.h"
#include "base/task/post_job.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/job_task_source_interface.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_source_sort_key.h"

namespace base::internal {

class PooledTaskRunnerDelegate;

// A JobTaskSource generates many Tasks from a single RepeatingClosure.
//
// Derived classes control the intended concurrency with GetMaxConcurrency().
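//
// Clients do not use this class directly; jobs are normally created through
// base::PostJob() (see base/task/post_job.h), which wires a worker task and a
// max concurrency callback into a job task source. A minimal usage sketch
// follows; WorkerTask, MaxConcurrency and the work-queue helpers are
// hypothetical and shown for illustration only:
//
//   // Runs work items until none remain or the scheduler asks this worker
//   // to yield.
//   void WorkerTask(base::JobDelegate* delegate) {
//     while (!delegate->ShouldYield()) {
//       auto work_item = TakeWorkItem();  // Hypothetical thread-safe queue.
//       if (!work_item)
//         return;
//       ProcessWorkItem(std::move(work_item));
//     }
//   }
//
//   // Upper bound on useful parallelism, invoked with the number of workers
//   // currently assigned to the job.
//   size_t MaxConcurrency(size_t /*worker_count*/) {
//     return PendingWorkItemCount();  // Hypothetical.
//   }
//
//   base::JobHandle handle =
//       base::PostJob(FROM_HERE, {base::TaskPriority::USER_VISIBLE},
//                     base::BindRepeating(&WorkerTask),
//                     base::BindRepeating(&MaxConcurrency));
//   handle.Join();  // Or handle.Cancel().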
class BASE_EXPORT JobTaskSourceNew : public JobTaskSource {
 public:
  JobTaskSourceNew(const Location& from_here,
                   const TaskTraits& traits,
                   RepeatingCallback<void(JobDelegate*)> worker_task,
                   MaxConcurrencyCallback max_concurrency_callback,
                   PooledTaskRunnerDelegate* delegate);
  JobTaskSourceNew(const JobTaskSourceNew&) = delete;
  JobTaskSourceNew& operator=(const JobTaskSourceNew&) = delete;

  // Called before the task source is enqueued to initialize task metadata.
  void WillEnqueue(int sequence_num, TaskAnnotator& annotator) override;

  // Notifies this task source that max concurrency increased. Returns false
  // iff an attempt to enqueue the task source failed.
  bool NotifyConcurrencyIncrease() override;

  // Informs this JobTaskSource that the current thread would like to join and
  // contribute to running |worker_task|. Returns true if the joining thread
  // can contribute (RunJoinTask() can be called), or false if joining was
  // completed and all other workers returned because either there's no work
  // remaining or the Job was cancelled.
  bool WillJoin() override;

  // Contributes to running |worker_task| and returns true if the joining
  // thread can contribute again (RunJoinTask() can be called again), or false
  // if joining was completed and all other workers returned because either
  // there's no work remaining or the Job was cancelled. This should be called
  // only after WillJoin() or RunJoinTask() previously returned true.
  bool RunJoinTask() override;

  // Cancels this JobTaskSource, causing all workers to yield and
  // WillRunTask() to return RunStatus::kDisallowed.
  void Cancel(TaskSource::Transaction* transaction = nullptr) override;

  // TaskSource:
  ExecutionEnvironment GetExecutionEnvironment() override;
  size_t GetRemainingConcurrency() const override;
  TaskSourceSortKey GetSortKey() const override;
  TimeTicks GetDelayedSortKey() const override;
  bool HasReadyTasks(TimeTicks now) const override;

  bool IsActive() const override;
  size_t GetWorkerCount() const override;

  // Returns the maximum number of tasks from this TaskSource that can run
  // concurrently.
  size_t GetMaxConcurrency() const override;

  uint8_t AcquireTaskId() override;
  void ReleaseTaskId(uint8_t task_id) override;

  // Returns true if a worker should return from the worker task on the
  // current thread ASAP.
  bool ShouldYield() override;

  PooledTaskRunnerDelegate* GetDelegate() const override;

 private:
  // Atomic variable to track job state.
  class State {
   public:
    // When set, the job is canceled.
    static constexpr uint32_t kCanceledMask = 1 << 0;
    // When set, the Join()'ing thread wants to be signaled when the worker
    // count is decremented or capacity is created by a max concurrency
    // increase.
    static constexpr uint32_t kSignalJoinMask = 1 << 1;
    // When set, the job is queued. Note: The job may be queued when this is
    // not set, see details in JobTaskSourceNew::State::ExitWillRunTask().
    static constexpr uint32_t kQueuedMask = 1 << 2;
    // When set, WillRunTask() is not running *or* WillRunTask() is running
    // and there was a request to keep the job queued (via
    // `ShouldQueueUponCapacityIncrease()` or `WillReenqueue()`).
    static constexpr uint32_t kOutsideWillRunTaskOrMustReenqueueMask = 1 << 3;
    // Offset for the number of workers running the job.
    static constexpr int kWorkerCountBitOffset = 4;
    static constexpr uint32_t kWorkerCountIncrement = 1
                                                      << kWorkerCountBitOffset;

    struct Value {
      uint8_t worker_count() const {
        return static_cast<uint8_t>(value >> kWorkerCountBitOffset);
      }
      bool canceled() const { return value & kCanceledMask; }
      bool signal_join() const { return value & kSignalJoinMask; }
      bool queued() const { return value & kQueuedMask; }
      bool outside_will_run_task_or_must_reenqueue() const {
        return value & kOutsideWillRunTaskOrMustReenqueueMask;
      }

      uint32_t value;
    };
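
    // For example, with 3 workers running a job that is queued, not canceled,
    // and currently outside WillRunTask(), the packed value is
    //   (3 << kWorkerCountBitOffset) | kQueuedMask |
    //       kOutsideWillRunTaskOrMustReenqueueMask,
    // i.e. 0b0011'1100: Value{0b0011'1100}.worker_count() == 3, queued() and
    // outside_will_run_task_or_must_reenqueue() return true, and canceled()
    // returns false.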

    State();
    ~State();

    // Marks the job as canceled. Returns the state before the operation.
    Value Cancel();

    // Increments the worker count by 1. Returns the state before the
    // operation.
    //
    // This requires holding `increment_worker_count_lock()`, to allow
    // WaitForParticipationOpportunity() to check the worker count and apply
    // changes with a guarantee that it hasn't been incremented in between
    // (the worker count could still be decremented while the lock is held).
    Value IncrementWorkerCount()
        EXCLUSIVE_LOCKS_REQUIRED(increment_worker_count_lock());

    // Decrements the worker count by 1. Returns the state before the
    // operation.
    Value DecrementWorkerCount();

    // Requests to signal the Join()'ing thread when the worker count is
    // decremented or capacity is created by increasing "max concurrency".
    // Returns the state before the operation.
    Value RequestSignalJoin();

    // Returns whether the Join()'ing thread should be signaled when the
    // worker count is decremented or capacity is created by increasing "max
    // concurrency". Resets the bit so that this won't return true until
    // `RequestSignalJoin()` is called again.
    bool FetchAndResetRequestSignalJoin();

    // Indicates that max capacity was increased above the number of workers.
    // Returns true iff the job should be queued.
    bool ShouldQueueUponCapacityIncrease();

    // Indicates that WillRunTask() was entered. Returns the previous state.
    Value EnterWillRunTask();

    // Indicates that WillRunTask() will exit. `saturated` is true iff
    // `WillRunTask()` determined that max concurrency is reached. Returns
    // true iff `ShouldQueueUponCapacityIncrease()` or `WillReenqueue()` was
    // invoked since `EnterWillRunTask()`.
    bool ExitWillRunTask(bool saturated);

    // Indicates that `DidProcessTask()` decided to re-enqueue the job. If
    // this returns false, the caller shouldn't re-enqueue the job (another
    // worker currently in `WillRunTask()` will request that it remains in
    // the queue).
    bool WillReenqueue();

    // Loads and returns the state.
    Value Load() const;

    // Returns a lock that must be held to call `IncrementWorkerCount()`.
    CheckedLock& increment_worker_count_lock() {
      return increment_worker_count_lock_;
    }

   private:
    std::atomic<uint32_t> value_{kOutsideWillRunTaskOrMustReenqueueMask};
    CheckedLock increment_worker_count_lock_{UniversalSuccessor()};
  };

  ~JobTaskSourceNew() override;

  // Called from the joining thread. Waits for the worker count to be below
  // or equal to max concurrency (may happen when "max concurrency" increases
  // or the worker count is decremented). Returns true if the joining thread
  // should run a task, or false if joining was completed and all other
  // workers returned because either there's no work remaining or the Job was
  // cancelled.
  bool WaitForParticipationOpportunity();

  size_t GetMaxConcurrency(size_t worker_count) const;

  // TaskSource:
  RunStatus WillRunTask() override;
  Task TakeTask(TaskSource::Transaction* transaction) override;
  absl::optional<Task> Clear(TaskSource::Transaction* transaction) override;
  bool DidProcessTask(TaskSource::Transaction* transaction) override;
  bool WillReEnqueue(TimeTicks now,
                     TaskSource::Transaction* transaction) override;
  bool OnBecomeReady() override;

  State state_;

  // Signaled when the joining thread wants to participate and capacity is
  // created by increasing "max concurrency" or decrementing the worker count.
  WaitableEvent join_event_{WaitableEvent::ResetPolicy::AUTOMATIC};

  std::atomic<uint32_t> assigned_task_ids_{0};

  RepeatingCallback<size_t(size_t)> max_concurrency_callback_;

  // Worker task set by the job owner.
  RepeatingCallback<void(JobDelegate*)> worker_task_;
  // Task returned from TakeTask(), that calls |worker_task_| internally.
  RepeatingClosure primary_task_;

  TaskMetadata task_metadata_;

  const TimeTicks ready_time_;
  raw_ptr<PooledTaskRunnerDelegate, LeakedDanglingUntriaged> delegate_;
};

}  // namespace base::internal

#endif  // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_