• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_POST_JOB_H_
6 #define BASE_TASK_POST_JOB_H_
7 
8 #include <limits>
9 
10 #include "base/base_export.h"
11 #include "base/dcheck_is_on.h"
12 #include "base/functional/callback.h"
13 #include "base/location.h"
14 #include "base/memory/raw_ptr.h"
15 
16 namespace base {
17 namespace internal {
18 class JobTaskSource;
19 class PooledTaskRunnerDelegate;
20 }
21 
22 class TaskTraits;
23 enum class TaskPriority : uint8_t;
24 
25 // Delegate that's passed to Job's worker task, providing an entry point to
26 // communicate with the scheduler. To prevent deadlocks, JobDelegate methods
27 // should never be called while holding a user lock.
28 class BASE_EXPORT JobDelegate {
29  public:
30   // A JobDelegate is instantiated for each worker task that is run.
31   // |task_source| is the task source whose worker task is running with this
32   // delegate and |pooled_task_runner_delegate| is used by ShouldYield() to
33   // check whether the pool wants this worker task to yield (null if this worker
34   // should never yield -- e.g. when the main thread is a worker).
35   JobDelegate(internal::JobTaskSource* task_source,
36               internal::PooledTaskRunnerDelegate* pooled_task_runner_delegate);
37 
38   JobDelegate(const JobDelegate&) = delete;
39   JobDelegate& operator=(const JobDelegate&) = delete;
40 
41   ~JobDelegate();
42 
43   // Returns true if this thread *must* return from the worker task on the
44   // current thread ASAP. Workers should periodically invoke ShouldYield (or
45   // YieldIfNeeded()) as often as is reasonable.
46   bool ShouldYield();
47 
48   // If ShouldYield(), this will pause the current thread (allowing it to be
49   // replaced in the pool); no-ops otherwise. If it pauses, it will resume and
50   // return from this call whenever higher priority work completes.
51   // Prefer ShouldYield() over this (only use YieldIfNeeded() when unwinding
52   // the stack is not possible).
53   void YieldIfNeeded();
54 
55   // Notifies the scheduler that max concurrency was increased, and the number
56   // of worker should be adjusted accordingly. See PostJob() for more details.
57   void NotifyConcurrencyIncrease();
58 
59   // Returns a task_id unique among threads currently running this job, such
60   // that GetTaskId() < worker count. To achieve this, the same task_id may be
61   // reused by a different thread after a worker_task returns.
62   uint8_t GetTaskId();
63 
64   // Returns true if the current task is called from the thread currently
65   // running JobHandle::Join().
IsJoiningThread()66   bool IsJoiningThread() const {
67     return pooled_task_runner_delegate_ == nullptr;
68   }
69 
70  private:
71   static constexpr uint8_t kInvalidTaskId = std::numeric_limits<uint8_t>::max();
72 
73   const raw_ptr<internal::JobTaskSource> task_source_;
74   const raw_ptr<internal::PooledTaskRunnerDelegate>
75       pooled_task_runner_delegate_;
76   uint8_t task_id_ = kInvalidTaskId;
77 
78 #if DCHECK_IS_ON()
79   // Value returned by the last call to ShouldYield().
80   bool last_should_yield_ = false;
81 #endif
82 };
83 
84 // Handle returned when posting a Job. Provides methods to control execution of
85 // the posted Job. To prevent deadlocks, JobHandle methods should never be
86 // called while holding a user lock.
87 class BASE_EXPORT JobHandle {
88  public:
89   JobHandle();
90 
91   JobHandle(const JobHandle&) = delete;
92   JobHandle& operator=(const JobHandle&) = delete;
93 
94   // A job must either be joined, canceled or detached before the JobHandle is
95   // destroyed.
96   ~JobHandle();
97 
98   JobHandle(JobHandle&&);
99   JobHandle& operator=(JobHandle&&);
100 
101   // Returns true if associated with a Job.
102   explicit operator bool() const { return task_source_ != nullptr; }
103 
104   // Returns true if there's any work pending or any worker running.
105   bool IsActive() const;
106 
107   // Updates this Job's priority.
108   void UpdatePriority(TaskPriority new_priority);
109 
110   // Notifies the scheduler that max concurrency was increased, and the number
111   // of workers should be adjusted accordingly. See PostJob() for more details.
112   void NotifyConcurrencyIncrease();
113 
114   // Contributes to the job on this thread. Doesn't return until all tasks have
115   // completed and max concurrency becomes 0. This also promotes this Job's
116   // priority to be at least as high as the calling thread's priority. When
117   // called immediately, prefer CreateJob(...).Join() over PostJob(...).Join()
118   // to avoid having too many workers scheduled for executing the workload.
119   void Join();
120 
121   // Forces all existing workers to yield ASAP. Waits until they have all
122   // returned from the Job's callback before returning.
123   void Cancel();
124 
125   // Forces all existing workers to yield ASAP but doesn't wait for them.
126   // Warning, this is dangerous if the Job's callback is bound to or has access
127   // to state which may be deleted after this call.
128   void CancelAndDetach();
129 
130   // Can be invoked before ~JobHandle() to avoid waiting on the job completing.
131   void Detach();
132 
133  private:
134   friend class internal::JobTaskSource;
135 
136   explicit JobHandle(scoped_refptr<internal::JobTaskSource> task_source);
137 
138   scoped_refptr<internal::JobTaskSource> task_source_;
139 };
140 
141 // Callback used in PostJob() to control the maximum number of threads calling
142 // the worker task concurrently.
143 //
144 // Returns the maximum number of threads which may call a job's worker task
145 // concurrently. |worker_count| is the number of threads currently assigned to
146 // this job, which some callers may need to determine their return value.
147 using MaxConcurrencyCallback =
148     RepeatingCallback<size_t(size_t /*worker_count*/)>;
149 
150 // Posts a repeating |worker_task| with specific |traits| to run in parallel on
151 // base::ThreadPool.
152 // Returns a JobHandle associated with the Job, which can be joined, canceled or
153 // detached.
154 // ThreadPool APIs, including PostJob() and methods of the returned JobHandle,
155 // must never be called while holding a lock that could be acquired by
156 // |worker_task| or |max_concurrency_callback| -- that could result in a
157 // deadlock. This is because [1] |max_concurrency_callback| may be invoked while
158 // holding internal ThreadPool lock (A), hence |max_concurrency_callback| can
159 // only use a lock (B) if that lock is *never* held while calling back into a
160 // ThreadPool entry point from any thread (A=>B/B=>A deadlock) and [2]
161 // |worker_task| or |max_concurrency_callback| is invoked synchronously from
162 // JobHandle::Join() (A=>JobHandle::Join()=>A deadlock).
163 // To avoid scheduling overhead, |worker_task| should do as much work as
164 // possible in a loop when invoked, and JobDelegate::ShouldYield() should be
165 // periodically invoked to conditionally exit and let the scheduler prioritize
166 // work.
167 //
168 // A canonical implementation of |worker_task| looks like:
169 //   void WorkerTask(JobDelegate* job_delegate) {
170 //     while (!job_delegate->ShouldYield()) {
171 //       auto work_item = worker_queue.TakeWorkItem(); // Smallest unit of work.
172 //       if (!work_item)
173 //         return;
174 //       ProcessWork(work_item);
175 //     }
176 //   }
177 //
178 // |max_concurrency_callback| controls the maximum number of threads calling
179 // |worker_task| concurrently. |worker_task| is only invoked if the number of
180 // threads previously running |worker_task| was less than the value returned by
181 // |max_concurrency_callback|. In general, |max_concurrency_callback| should
182 // return the latest number of incomplete work items (smallest unit of work)
183 // left to process. JobHandle/JobDelegate::NotifyConcurrencyIncrease() *must*
184 // be invoked shortly after |max_concurrency_callback| starts returning a value
185 // larger than previously returned values. This usually happens when new work
186 // items are added and the API user wants additional threads to invoke
187 // |worker_task| concurrently. The callbacks may be called concurrently on any
188 // thread until the job is complete. If the job handle is detached, the
189 // callbacks may still be called, so they must not access global state that
190 // could be destroyed.
191 //
192 // |traits| requirements:
193 // - base::ThreadPolicy must be specified if the priority of the task runner
194 //   will ever be increased from BEST_EFFORT.
195 JobHandle BASE_EXPORT PostJob(const Location& from_here,
196                               const TaskTraits& traits,
197                               RepeatingCallback<void(JobDelegate*)> worker_task,
198                               MaxConcurrencyCallback max_concurrency_callback);
199 
200 // Creates and returns a JobHandle associated with a Job. Unlike PostJob(), this
201 // doesn't immediately schedule |worker_task| to run on base::ThreadPool
202 // workers; the Job is then scheduled by calling either
203 // NotifyConcurrencyIncrease() or Join().
204 JobHandle BASE_EXPORT
205 CreateJob(const Location& from_here,
206           const TaskTraits& traits,
207           RepeatingCallback<void(JobDelegate*)> worker_task,
208           MaxConcurrencyCallback max_concurrency_callback);
209 
210 }  // namespace base
211 
212 #endif  // BASE_TASK_POST_JOB_H_
213