• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
6 #define BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
7 
8 #include <stddef.h>
9 
10 #include "base/base_export.h"
11 #include "base/containers/intrusive_heap.h"
12 #include "base/dcheck_is_on.h"
13 #include "base/memory/raw_ptr.h"
14 #include "base/memory/raw_ptr_exclusion.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/sequence_token.h"
17 #include "base/task/common/checked_lock.h"
18 #include "base/task/task_traits.h"
19 #include "base/task/thread_pool/task.h"
20 #include "base/task/thread_pool/task_source_sort_key.h"
21 #include "base/threading/sequence_local_storage_map.h"
22 #include "base/time/time.h"
23 
24 namespace base {
25 namespace internal {
26 
27 class TaskTracker;
28 
// Execution mode of a TaskSource. Per the TaskSource constructor's contract,
// this is the execution mode of the TaskRunner feeding the TaskSource, and must
// be kParallel when the TaskSource has no TaskRunner.
29 enum class TaskSourceExecutionMode {
30   kParallel,
31   kSequenced,
32   kSingleThread,
33   kJob,
     // Aliases the last enumerator; keep in sync when adding a new value.
34   kMax = kJob,
35 };
36 
// Plain aggregate returned by TaskSource::GetExecutionEnvironment(): a
// SequenceToken together with a pointer to a SequenceLocalStorageMap.
37 struct BASE_EXPORT ExecutionEnvironment {
38   SequenceToken token;
     // Non-owning pointer. NOTE(review): nullability and lifetime of the pointee
     // are not visible in this header -- confirm against the implementations of
     // GetExecutionEnvironment().
39   raw_ptr<SequenceLocalStorageMap> sequence_local_storage;
40 };
41 
42 // A TaskSource is a virtual class that provides a series of Tasks that must be
43 // executed immediately or in the future.
44 //
45 // When a task source has delayed tasks but no immediate tasks, the scheduler
46 // must call OnBecomeReady() after HasReadyTasks(now) == true, which is
47 // guaranteed once now >= GetDelayedSortKey().
48 //
49 // A task source is registered when it's ready to be added to the immediate
50 // queue. A task source is ready to be queued when either:
51 // 1- It has new tasks that can run concurrently as a result of external
52 //    operations, e.g. posting a new immediate task to an empty Sequence or
53 //    increasing max concurrency of a JobTaskSource;
54 // 2- A worker finished running a task from it and both DidProcessTask() and
55 //    WillReEnqueue() returned true;
56 // 3- A worker is about to run a task from it and WillRunTask() returned
57 //    kAllowedNotSaturated; or
58 // 4- A delayed task became ready and OnBecomeReady() returned true.
59 //
60 // A worker may perform the following sequence of operations on a
61 // RegisteredTaskSource after obtaining it from the queue:
62 // 1- Check whether a task can run with WillRunTask() (and register/enqueue the
63 //    task source again if not saturated).
64 // 2- (optional) Iff (1) determined that a task can run, access the next task
65 //    with TakeTask().
66 // 3- (optional) Execute the task.
67 // 4- Inform the task source that a task was processed with DidProcessTask(),
68 //    and re-enqueue the task source iff requested. The task source is ready to
69 //    run immediately iff WillReEnqueue() returns true.
70 // When a task source is registered multiple times, many overlapping chains of
71 // operations may run concurrently, as permitted by WillRunTask(). This allows
72 // tasks from the same task source to run in parallel.
73 // However, the following invariants are kept:
74 // - The number of workers concurrently running tasks never goes over the
75 //   intended concurrency.
76 // - If the task source has more tasks that can run concurrently, it must be
77 //   queued.
78 //
79 // Note: there is a known refcounted-ownership cycle in the ThreadPool
80 // architecture: TaskSource -> TaskRunner -> TaskSource -> ... This is okay so
81 // long as the other owners of TaskSource (PriorityQueue and WorkerThread in
82 // alternation and ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork()
83 // temporarily) keep running it (and taking Tasks from it as a result). A
84 // dangling reference cycle would only occur should they release their reference
85 // to it while it's not empty. In other words, it is only correct for them to
86 // release it when DidProcessTask() returns false.
87 //
88 // This class is thread-safe.
89 class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
90  public:
91   // Indicates whether WillRunTask() allows TakeTask() to be called on a
92   // RegisteredTaskSource.
93   enum class RunStatus {
94     // TakeTask() cannot be called.
95     kDisallowed,
96     // TakeTask() may be called, and the TaskSource has not reached its maximum
97     // concurrency (i.e. the TaskSource still needs to be queued).
98     kAllowedNotSaturated,
99     // TakeTask() may be called, and the TaskSource has reached its maximum
100     // concurrency (i.e. the TaskSource no longer needs to be queued).
101     kAllowedSaturated,
102   };
103 
104   // A Transaction can perform multiple operations atomically on a
105   // TaskSource. While a Transaction is alive, it is guaranteed that nothing
106   // else will access the TaskSource; the TaskSource's lock is held for the
107   // lifetime of the Transaction. No Transaction must be held when ~TaskSource()
108   // is called.
109   class BASE_EXPORT Transaction {
110    public:
        // Move-only: a Transaction can be moved but neither copy-constructed
        // nor copy-assigned.
111     Transaction(Transaction&& other);
112     Transaction(const Transaction&) = delete;
113     Transaction& operator=(const Transaction&) = delete;
114     ~Transaction();
115 
        // True iff |task_source_| is non-null, i.e. this Transaction currently
        // refers to a TaskSource. Note that the conversion is implicit.
116     operator bool() const { return !!task_source_; }
117 
118     // Sets TaskSource priority to |priority|.
119     void UpdatePriority(TaskPriority priority);
120 
121     // Returns the traits of all Tasks in the TaskSource.
        // Dereferences |task_source_| unconditionally, so it must only be
        // called while this Transaction refers to a TaskSource.
traits()122     TaskTraits traits() const { return task_source_->traits_; }
123 
        // Returns the TaskSource this Transaction refers to (may be null, see
        // operator bool()).
task_source()124     TaskSource* task_source() const { return task_source_; }
125 
        // NOTE(review): presumably ends the Transaction before destruction,
        // releasing the TaskSource's lock -- the definition is not in this
        // header; confirm against the .cc file.
126     void Release();
127 
128    protected:
        // Protected so that only TaskSource (a friend) and subclasses can
        // create a Transaction; external code goes through
        // TaskSource::BeginTransaction().
129     explicit Transaction(TaskSource* task_source);
130 
131    private:
132     friend class TaskSource;
133 
134     // This field is not a raw_ptr<> because it was filtered by the rewriter
135     // for: #union
136     RAW_PTR_EXCLUSION TaskSource* task_source_;
137   };
138 
139   // |traits| is metadata that applies to all Tasks in the TaskSource.
140   // |task_runner| is a reference to the TaskRunner feeding this TaskSource.
141   // |task_runner| can be nullptr only for tasks with no TaskRunner, in which
142   // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
143   // execution mode of |task_runner|.
144   TaskSource(const TaskTraits& traits,
145              TaskRunner* task_runner,
146              TaskSourceExecutionMode execution_mode);
147   TaskSource(const TaskSource&) = delete;
148   TaskSource& operator=(const TaskSource&) = delete;
149 
150   // Begins a Transaction. This method cannot be called on a thread which has an
151   // active TaskSource::Transaction.
152   [[nodiscard]] Transaction BeginTransaction();
153 
      // Returns the ExecutionEnvironment (a SequenceToken and a
      // SequenceLocalStorageMap pointer) associated with this TaskSource.
154   virtual ExecutionEnvironment GetExecutionEnvironment() = 0;
155 
156   // Thread-safe but the returned value may immediately be obsolete. As such
157   // this should only be used as a best-effort guess of how many more workers
158   // are needed. This may be called on an empty task source.
159   virtual size_t GetRemainingConcurrency() const = 0;
160 
161   // Returns a TaskSourceSortKey representing the priority of the TaskSource.
162   virtual TaskSourceSortKey GetSortKey() const = 0;
163   // Returns a TimeTicks representing the next delayed run time of the
164   // TaskSource.
165   virtual TimeTicks GetDelayedSortKey() const = 0;
166   // Returns true if there are tasks ready to be executed. Thread-safe but the
167   // returned value may immediately be obsolete.
168   virtual bool HasReadyTasks(TimeTicks now) const = 0;
169   // Returns true if the TaskSource should be moved to the immediate queue
170   // due to ready delayed tasks. Note: Returns false if the TaskSource contains
171   // ready delayed tasks, but expects to already be in the immediate queue.
172   virtual bool OnBecomeReady() = 0;
173 
174   // Support for IntrusiveHeap in ThreadGroup::PriorityQueue.
175   void SetImmediateHeapHandle(const HeapHandle& handle);
176   void ClearImmediateHeapHandle();
      // GetImmediateHeapHandle() and immediate_heap_handle() both return
      // |immediate_pq_heap_handle_|; they are equivalent.
GetImmediateHeapHandle()177   HeapHandle GetImmediateHeapHandle() const {
178     return immediate_pq_heap_handle_;
179   }
180 
immediate_heap_handle()181   HeapHandle immediate_heap_handle() const { return immediate_pq_heap_handle_; }
182 
183   // Support for IntrusiveHeap in ThreadGroup::DelayedPriorityQueue.
184   void SetDelayedHeapHandle(const HeapHandle& handle);
185   void ClearDelayedHeapHandle();
      // GetDelayedHeapHandle() and delayed_heap_handle() both return
      // |delayed_pq_heap_handle_|; they are equivalent.
GetDelayedHeapHandle()186   HeapHandle GetDelayedHeapHandle() const { return delayed_pq_heap_handle_; }
187 
delayed_heap_handle()188   HeapHandle delayed_heap_handle() const { return delayed_pq_heap_handle_; }
189 
190   // Returns the shutdown behavior of all Tasks in the TaskSource. Can be
191   // accessed without a Transaction because it is never mutated.
shutdown_behavior()192   TaskShutdownBehavior shutdown_behavior() const {
193     return traits_.shutdown_behavior();
194   }
195   // Returns a racy priority of the TaskSource. Can be accessed without a
196   // Transaction but may return an outdated result.
      // Reads |priority_racy_| with relaxed memory ordering.
priority_racy()197   TaskPriority priority_racy() const {
198     return priority_racy_.load(std::memory_order_relaxed);
199   }
200   // Returns the thread policy of the TaskSource. Can be accessed without a
201   // Transaction because it is never mutated.
thread_policy()202   ThreadPolicy thread_policy() const { return traits_.thread_policy(); }
203 
204   // A reference to TaskRunner is only retained between
205   // PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns
206   // false, guaranteeing it is safe to dereference this pointer. Otherwise, the
207   // caller should guarantee such TaskRunner still exists before dereferencing.
task_runner()208   TaskRunner* task_runner() const { return task_runner_; }
209 
      // Returns |execution_mode_|. See the constructor comment for how it relates
      // to |task_runner|.
execution_mode()210   TaskSourceExecutionMode execution_mode() const { return execution_mode_; }
211 
      // Test-only. NOTE(review): the behavior is defined in the .cc file and is
      // not visible in this header.
212   void ClearForTesting();
213 
214  protected:
      // Protected: instances are ref-counted (RefCountedThreadSafe is a friend)
      // and are not meant to be deleted directly by callers.
215   virtual ~TaskSource();
216 
      // Returns a RunStatus telling whether TakeTask() may be called and whether
      // the TaskSource still needs to be queued (see RunStatus above).
217   virtual RunStatus WillRunTask() = 0;
218 
219   // Implementations of TakeTask(), DidProcessTask(), WillReEnqueue(), and
220   // Clear() must ensure proper synchronization iff |transaction| is nullptr.
221   virtual Task TakeTask(TaskSource::Transaction* transaction) = 0;
222   virtual bool DidProcessTask(TaskSource::Transaction* transaction) = 0;
223   virtual bool WillReEnqueue(TimeTicks now,
224                              TaskSource::Transaction* transaction) = 0;
225 
226   // This may be called for each outstanding RegisteredTaskSource that's ready.
227   // The implementation needs to support this being called multiple times;
228   // unless it guarantees never to hand-out multiple RegisteredTaskSources that
229   // are concurrently ready.
230   virtual Task Clear(TaskSource::Transaction* transaction) = 0;
231 
232   // Sets TaskSource priority to |priority|.
233   void UpdatePriority(TaskPriority priority);
234 
235   // The TaskTraits of all Tasks in the TaskSource.
236   TaskTraits traits_;
237 
238   // The cached priority for atomic access.
239   std::atomic<TaskPriority> priority_racy_;
240 
241   // Synchronizes access to all members.
      // Exceptions documented in this class: the two heap handles are protected
      // by their respective queue's lock, and |priority_racy_| is atomic.
242   mutable CheckedLock lock_{UniversalPredecessor()};
243 
244  private:
      // RefCountedThreadSafe needs access to the protected destructor;
      // RegisteredTaskSource needs access to the protected virtual methods.
245   friend class RefCountedThreadSafe<TaskSource>;
246   friend class RegisteredTaskSource;
247 
248   // The TaskSource's position in its current PriorityQueue. Access is protected
249   // by the PriorityQueue's lock.
250   HeapHandle immediate_pq_heap_handle_;
251 
252   // The TaskSource's position in its current DelayedPriorityQueue. Access is
253   // protected by the DelayedPriorityQueue's lock.
254   HeapHandle delayed_pq_heap_handle_;
255 
256   // A pointer to the TaskRunner that posts to this TaskSource, if any. The
257   // derived class is responsible for calling AddRef() when a TaskSource from
258   // which no Task is executing becomes non-empty and Release() when
259   // it becomes empty again (e.g. when DidProcessTask() returns false).
260   //
261   // In practice, this pointer is going to become dangling. See task_runner()
262   // comment.
263   raw_ptr<TaskRunner, DisableDanglingPtrDetection> task_runner_;
264 
      // The value returned by execution_mode().
265   TaskSourceExecutionMode execution_mode_;
266 };
267 
268 // Wrapper around TaskSource to signify the intent to queue and run it.
269 // RegisteredTaskSource can only be created with TaskTracker and may only be
270 // used by a single worker at a time. However, the same task source may be
271 // registered several times, spawning multiple RegisteredTaskSources. A
272 // RegisteredTaskSource resets to its initial state when WillRunTask() fails
273 // or after DidProcessTask() and WillReEnqueue(), so it can be used again.
274 class BASE_EXPORT RegisteredTaskSource {
275  public:
      // Move-only type. NOTE(review): the default and nullptr_t constructors
      // presumably produce an empty RegisteredTaskSource (operator bool() false);
      // their definitions are not in this header -- confirm. The nullptr_t
      // constructor is intentionally implicit so `nullptr` converts to this type.
276   RegisteredTaskSource();
277   RegisteredTaskSource(std::nullptr_t);
278   RegisteredTaskSource(RegisteredTaskSource&& other) noexcept;
279   RegisteredTaskSource(const RegisteredTaskSource&) = delete;
280   RegisteredTaskSource& operator=(const RegisteredTaskSource&) = delete;
281   ~RegisteredTaskSource();
282 
      // NOTE(review): unlike the move constructor, move assignment is not
      // declared noexcept.
283   RegisteredTaskSource& operator=(RegisteredTaskSource&& other);
284 
      // True iff a TaskSource is held (|task_source_| is non-null).
285   operator bool() const { return task_source_ != nullptr; }
      // Both accessors return the raw TaskSource pointer held by |task_source_|
      // without transferring ownership.
286   TaskSource* operator->() const { return task_source_.get(); }
get()287   TaskSource* get() const { return task_source_.get(); }
288 
      // Test-only factory; |task_tracker| defaults to nullptr.
289   static RegisteredTaskSource CreateForTesting(
290       scoped_refptr<TaskSource> task_source,
291       TaskTracker* task_tracker = nullptr);
292 
293   // Can only be called if this RegisteredTaskSource is in its initial state.
294   // Returns the underlying task source as a scoped_refptr<TaskSource>.
295   // NOTE(review): this comment used to mention an Optional return type in
296   // preparation for the ThreadPool/TaskQueueManager merge, https://crbug.com/783309
297   scoped_refptr<TaskSource> Unregister();
298 
299   // Informs this TaskSource that the current worker would like to run a Task
300   // from it. Can only be called if in its initial state. Returns a RunStatus
301   // that indicates if the operation is allowed (TakeTask() can be called).
302   TaskSource::RunStatus WillRunTask();
303 
304   // Returns the next task to run from this TaskSource. This should be called
305   // only after WillRunTask() returned RunStatus::kAllowed*. |transaction| is
306   // optional and should only be provided if this operation is already part of
307   // a transaction.
308   [[nodiscard]] Task TakeTask(TaskSource::Transaction* transaction = nullptr);
309 
310   // Must be called after WillRunTask() or once the task was run if TakeTask()
311   // was called. This resets this RegisteredTaskSource to its initial state so
312   // that WillRunTask() may be called again. |transaction| is optional and
313   // should only be provided if this operation is already part of a transaction.
314   // Returns true if the TaskSource should be queued after this operation.
315   bool DidProcessTask(TaskSource::Transaction* transaction = nullptr);
316 
317   // Must be called iff DidProcessTask() previously returned true.
318   // |transaction| is optional and should only be provided if this
319   // operation is already part of a transaction. Returns true if the
320   // TaskSource is ready to run immediately.
321   bool WillReEnqueue(TimeTicks now,
322                      TaskSource::Transaction* transaction = nullptr);
323 
324   // Returns a task that clears this TaskSource to make it empty. |transaction|
325   // is optional and should only be provided if this operation is already part
326   // of a transaction.
327   [[nodiscard]] Task Clear(TaskSource::Transaction* transaction = nullptr);
328 
329  private:
      // Private constructor: only TaskTracker (a friend) and CreateForTesting()
      // can create a non-empty RegisteredTaskSource.
330   friend class TaskTracker;
331   RegisteredTaskSource(scoped_refptr<TaskSource> task_source,
332                        TaskTracker* task_tracker);
333 
      // The state tracking below only exists in DCHECK-enabled builds.
334 #if DCHECK_IS_ON()
335   // Indicates the step of a task execution chain.
336   enum class State {
337     kInitial,       // WillRunTask() may be called.
338     kReady,         // After WillRunTask() returned a valid RunStatus.
339   };
340 
341   State run_step_ = State::kInitial;
342 #endif  // DCHECK_IS_ON()
343 
      // Owning reference to the registered TaskSource; null when empty.
344   scoped_refptr<TaskSource> task_source_;
345   // This field is not a raw_ptr<> because it was filtered by the rewriter for:
346   // #union
347   RAW_PTR_EXCLUSION TaskTracker* task_tracker_ = nullptr;
348 };
349 
350 // A pair of Transaction and RegisteredTaskSource. Useful to carry a
351 // RegisteredTaskSource with an associated Transaction.
352 // TODO(crbug.com/839091): Rename to RegisteredTaskSourceAndTransaction.
353 struct BASE_EXPORT TransactionWithRegisteredTaskSource {
354  public:
      // Takes both parts by value; callers must move them in since both types
      // are move-only.
355   TransactionWithRegisteredTaskSource(RegisteredTaskSource task_source_in,
356                                       TaskSource::Transaction transaction_in);
357 
      // Movable but not copyable, like its two members.
358   TransactionWithRegisteredTaskSource(
359       TransactionWithRegisteredTaskSource&& other) = default;
360   TransactionWithRegisteredTaskSource(
361       const TransactionWithRegisteredTaskSource&) = delete;
362   TransactionWithRegisteredTaskSource& operator=(
363       const TransactionWithRegisteredTaskSource&) = delete;
364   ~TransactionWithRegisteredTaskSource() = default;
365 
      // NOTE(review): presumably begins a Transaction on |task_source_in| and
      // bundles the two; the definition is not in this header -- confirm.
366   static TransactionWithRegisteredTaskSource FromTaskSource(
367       RegisteredTaskSource task_source_in);
368 
      // Declaration order matters: members are destroyed in reverse order, so
      // |transaction| is destroyed before |task_source|.
369   RegisteredTaskSource task_source;
370   TaskSource::Transaction transaction;
371 };
372 
// A pair of a scoped_refptr<TaskSource> and a TaskSource::Transaction, carried
// together as one movable, non-copyable value.
373 struct BASE_EXPORT TaskSourceAndTransaction {
374  public:
      // |transaction_in| is move-only and must be moved in.
375   TaskSourceAndTransaction(scoped_refptr<TaskSource> task_source_in,
376                            TaskSource::Transaction transaction_in);
377 
      // Movable but not copyable; the move constructor and destructor are
      // defined out of line.
378   TaskSourceAndTransaction(TaskSourceAndTransaction&& other);
379   TaskSourceAndTransaction(const TaskSourceAndTransaction&) = delete;
380   TaskSourceAndTransaction& operator=(const TaskSourceAndTransaction&) = delete;
381   ~TaskSourceAndTransaction();
382 
      // NOTE(review): presumably begins a Transaction on |task_source_in| and
      // bundles the two; the definition is not in this header -- confirm.
383   static TaskSourceAndTransaction FromTaskSource(
384       scoped_refptr<TaskSource> task_source_in);
385 
      // Declaration order matters: members are destroyed in reverse order, so
      // |transaction| is destroyed before the |task_source| reference is
      // dropped. TaskSource::Transaction documents that no Transaction may be
      // held when ~TaskSource() runs.
386   scoped_refptr<TaskSource> task_source;
387   TaskSource::Transaction transaction;
388 };
389 
390 }  // namespace internal
391 }  // namespace base
392 
393 #endif  // BASE_TASK_THREAD_POOL_TASK_SOURCE_H_
394