1 // Copyright 2019 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_POST_JOB_H_ 6 #define BASE_TASK_POST_JOB_H_ 7 8 #include <limits> 9 10 #include "base/base_export.h" 11 #include "base/dcheck_is_on.h" 12 #include "base/functional/callback.h" 13 #include "base/location.h" 14 #include "base/memory/raw_ptr.h" 15 16 namespace base { 17 namespace internal { 18 class JobTaskSource; 19 class PooledTaskRunnerDelegate; 20 } 21 22 class TaskTraits; 23 enum class TaskPriority : uint8_t; 24 25 // Delegate that's passed to Job's worker task, providing an entry point to 26 // communicate with the scheduler. To prevent deadlocks, JobDelegate methods 27 // should never be called while holding a user lock. 28 class BASE_EXPORT JobDelegate { 29 public: 30 // A JobDelegate is instantiated for each worker task that is run. 31 // |task_source| is the task source whose worker task is running with this 32 // delegate and |pooled_task_runner_delegate| is used by ShouldYield() to 33 // check whether the pool wants this worker task to yield (null if this worker 34 // should never yield -- e.g. when the main thread is a worker). 35 JobDelegate(internal::JobTaskSource* task_source, 36 internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate); 37 38 JobDelegate(const JobDelegate&) = delete; 39 JobDelegate& operator=(const JobDelegate&) = delete; 40 41 ~JobDelegate(); 42 43 // Returns true if this thread *must* return from the worker task on the 44 // current thread ASAP. Workers should periodically invoke ShouldYield (or 45 // YieldIfNeeded()) as often as is reasonable. 46 bool ShouldYield(); 47 48 // If ShouldYield(), this will pause the current thread (allowing it to be 49 // replaced in the pool); no-ops otherwise. If it pauses, it will resume and 50 // return from this call whenever higher priority work completes. 51 // Prefer ShouldYield() over this (only use YieldIfNeeded() when unwinding 52 // the stack is not possible). 53 void YieldIfNeeded(); 54 55 // Notifies the scheduler that max concurrency was increased, and the number 56 // of worker should be adjusted accordingly. See PostJob() for more details. 57 void NotifyConcurrencyIncrease(); 58 59 // Returns a task_id unique among threads currently running this job, such 60 // that GetTaskId() < worker count. To achieve this, the same task_id may be 61 // reused by a different thread after a worker_task returns. 62 uint8_t GetTaskId(); 63 64 // Returns true if the current task is called from the thread currently 65 // running JobHandle::Join(). IsJoiningThread()66 bool IsJoiningThread() const { 67 return pooled_task_runner_delegate_ == nullptr; 68 } 69 70 private: 71 static constexpr uint8_t kInvalidTaskId = std::numeric_limits<uint8_t>::max(); 72 73 const raw_ptr<internal::JobTaskSource> task_source_; 74 const raw_ptr<internal::PooledTaskRunnerDelegate> 75 pooled_task_runner_delegate_; 76 uint8_t task_id_ = kInvalidTaskId; 77 78 #if DCHECK_IS_ON() 79 // Value returned by the last call to ShouldYield(). 80 bool last_should_yield_ = false; 81 #endif 82 }; 83 84 // Handle returned when posting a Job. Provides methods to control execution of 85 // the posted Job. To prevent deadlocks, JobHandle methods should never be 86 // called while holding a user lock. 87 class BASE_EXPORT JobHandle { 88 public: 89 JobHandle(); 90 91 JobHandle(const JobHandle&) = delete; 92 JobHandle& operator=(const JobHandle&) = delete; 93 94 // A job must either be joined, canceled or detached before the JobHandle is 95 // destroyed. 96 ~JobHandle(); 97 98 JobHandle(JobHandle&&); 99 JobHandle& operator=(JobHandle&&); 100 101 // Returns true if associated with a Job. 102 explicit operator bool() const { return task_source_ != nullptr; } 103 104 // Returns true if there's any work pending or any worker running. 105 bool IsActive() const; 106 107 // Update this Job's priority. 108 void UpdatePriority(TaskPriority new_priority); 109 110 // Notifies the scheduler that max concurrency was increased, and the number 111 // of workers should be adjusted accordingly. See PostJob() for more details. 112 void NotifyConcurrencyIncrease(); 113 114 // Contributes to the job on this thread. Doesn't return until all tasks have 115 // completed and max concurrency becomes 0. This also promotes this Job's 116 // priority to be at least as high as the calling thread's priority. When 117 // called immediately, prefer CreateJob(...).Join() over PostJob(...).Join() 118 // to avoid having too many workers scheduled for executing the workload. 119 void Join(); 120 121 // Forces all existing workers to yield ASAP. Waits until they have all 122 // returned from the Job's callback before returning. 123 void Cancel(); 124 125 // Forces all existing workers to yield ASAP but doesn’t wait for them. 126 // Warning, this is dangerous if the Job's callback is bound to or has access 127 // to state which may be deleted after this call. 128 void CancelAndDetach(); 129 130 // Can be invoked before ~JobHandle() to avoid waiting on the job completing. 131 void Detach(); 132 133 private: 134 friend class internal::JobTaskSource; 135 136 explicit JobHandle(scoped_refptr<internal::JobTaskSource> task_source); 137 138 scoped_refptr<internal::JobTaskSource> task_source_; 139 }; 140 141 // Callback used in PostJob() to control the maximum number of threads calling 142 // the worker task concurrently. 143 144 // Returns the maximum number of threads which may call a job's worker task 145 // concurrently. |worker_count| is the number of threads currently assigned to 146 // this job which some callers may need to determine their return value. 147 using MaxConcurrencyCallback = 148 RepeatingCallback<size_t(size_t /*worker_count*/)>; 149 150 // Posts a repeating |worker_task| with specific |traits| to run in parallel on 151 // base::ThreadPool. 152 // Returns a JobHandle associated with the Job, which can be joined, canceled or 153 // detached. 154 // ThreadPool APIs, including PostJob() and methods of the returned JobHandle, 155 // must never be called while holding a lock that could be acquired by 156 // |worker_task| or |max_concurrency_callback| -- that could result in a 157 // deadlock. This is because [1] |max_concurrency_callback| may be invoked while 158 // holding internal ThreadPool lock (A), hence |max_concurrency_callback| can 159 // only use a lock (B) if that lock is *never* held while calling back into a 160 // ThreadPool entry point from any thread (A=>B/B=>A deadlock) and [2] 161 // |worker_task| or |max_concurrency_callback| is invoked synchronously from 162 // JobHandle::Join() (A=>JobHandle::Join()=>A deadlock). 163 // To avoid scheduling overhead, |worker_task| should do as much work as 164 // possible in a loop when invoked, and JobDelegate::ShouldYield() should be 165 // periodically invoked to conditionally exit and let the scheduler prioritize 166 // work. 167 // 168 // A canonical implementation of |worker_task| looks like: 169 // void WorkerTask(JobDelegate* job_delegate) { 170 // while (!job_delegate->ShouldYield()) { 171 // auto work_item = worker_queue.TakeWorkItem(); // Smallest unit of work. 172 // if (!work_item) 173 // return: 174 // ProcessWork(work_item); 175 // } 176 // } 177 // 178 // |max_concurrency_callback| controls the maximum number of threads calling 179 // |worker_task| concurrently. |worker_task| is only invoked if the number of 180 // threads previously running |worker_task| was less than the value returned by 181 // |max_concurrency_callback|. In general, |max_concurrency_callback| should 182 // return the latest number of incomplete work items (smallest unit of work) 183 // left to processed. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must* 184 // be invoked shortly after |max_concurrency_callback| starts returning a value 185 // larger than previously returned values. This usually happens when new work 186 // items are added and the API user wants additional threads to invoke 187 // |worker_task| concurrently. The callbacks may be called concurrently on any 188 // thread until the job is complete. If the job handle is detached, the 189 // callbacks may still be called, so they must not access global state that 190 // could be destroyed. 191 // 192 // |traits| requirements: 193 // - base::ThreadPolicy must be specified if the priority of the task runner 194 // will ever be increased from BEST_EFFORT. 195 JobHandle BASE_EXPORT PostJob(const Location& from_here, 196 const TaskTraits& traits, 197 RepeatingCallback<void(JobDelegate*)> worker_task, 198 MaxConcurrencyCallback max_concurrency_callback); 199 200 // Creates and returns a JobHandle associated with a Job. Unlike PostJob(), this 201 // doesn't immediately schedules |worker_task| to run on base::ThreadPool 202 // workers; the Job is then scheduled by calling either 203 // NotifyConcurrencyIncrease() or Join(). 204 JobHandle BASE_EXPORT 205 CreateJob(const Location& from_here, 206 const TaskTraits& traits, 207 RepeatingCallback<void(JobDelegate*)> worker_task, 208 MaxConcurrencyCallback max_concurrency_callback); 209 210 } // namespace base 211 212 #endif // BASE_TASK_POST_JOB_H_ 213