// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
#define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_

#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <limits>
#include <memory>
#include <utility>

#include "base/base_export.h"
#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/task/common/checked_lock.h"
#include "base/task/post_job.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_source_sort_key.h"

namespace base {
namespace internal {

class PooledTaskRunnerDelegate;

// A JobTaskSource generates many Tasks from a single RepeatingClosure.
//
// The intended concurrency is provided by |max_concurrency_callback| at
// construction and queried with GetMaxConcurrency().
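//
// Example (an illustrative sketch; jobs are normally created through
// base::PostJob() from base/task/post_job.h, which wraps a JobTaskSource in
// the returned JobHandle):
//
//   JobHandle handle = PostJob(
//       FROM_HERE, {TaskPriority::USER_VISIBLE},
//       BindRepeating([](JobDelegate* delegate) {
//         // Process work items, checking delegate->ShouldYield() often.
//       }),
//       BindRepeating([](size_t /*worker_count*/) -> size_t {
//         return 0;  // Placeholder: number of work items still pending.
//       }));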
class BASE_EXPORT JobTaskSource : public TaskSource {
 public:
  JobTaskSource(const Location& from_here,
                const TaskTraits& traits,
                RepeatingCallback<void(JobDelegate*)> worker_task,
                MaxConcurrencyCallback max_concurrency_callback,
                PooledTaskRunnerDelegate* delegate);
  JobTaskSource(const JobTaskSource&) = delete;
  JobTaskSource& operator=(const JobTaskSource&) = delete;

  static JobHandle CreateJobHandle(
      scoped_refptr<internal::JobTaskSource> task_source) {
    return JobHandle(std::move(task_source));
  }

  // Notifies this task source that max concurrency was increased, and that
  // the number of workers should be adjusted.
  void NotifyConcurrencyIncrease();
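  // (In practice this is typically reached through
  // JobHandle::NotifyConcurrencyIncrease() on the handle that owns this task
  // source.)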

  // Informs this JobTaskSource that the current thread would like to join and
  // contribute to running |worker_task|. Returns true if the joining thread
  // can contribute (RunJoinTask() can be called), or false if joining was
  // completed and all other workers returned because either there's no work
  // remaining or Job was cancelled.
  bool WillJoin();

  // Contributes to running |worker_task| and returns true if the joining
  // thread can contribute again (RunJoinTask() can be called again), or false
  // if joining was completed and all other workers returned because either
  // there's no work remaining or Job was cancelled. This should be called
  // only after WillJoin() or RunJoinTask() previously returned true.
  bool RunJoinTask();
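  // The expected join protocol is thus a loop over these two calls (sketch,
  // mirroring what JobHandle::Join() does):
  //   if (WillJoin()) {
  //     while (RunJoinTask()) {}
  //   }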

  // Cancels this JobTaskSource, causing all workers to yield and
  // WillRunTask() to return RunStatus::kDisallowed.
  void Cancel(TaskSource::Transaction* transaction = nullptr);

  // TaskSource:
  ExecutionEnvironment GetExecutionEnvironment() override;
  size_t GetRemainingConcurrency() const override;
  TaskSourceSortKey GetSortKey() const override;
  TimeTicks GetDelayedSortKey() const override;
  bool HasReadyTasks(TimeTicks now) const override;

  bool IsActive() const;
  size_t GetWorkerCount() const;

  // Returns the maximum number of tasks from this TaskSource that can run
  // concurrently.
  size_t GetMaxConcurrency() const;

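  // Acquires/releases a small task id for a worker. Ids are presumably
  // allocated from the bits of |assigned_task_ids_| below, which bounds
  // concurrently assigned ids to 32; the allocation scheme lives in the .cc.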
  uint8_t AcquireTaskId();
  void ReleaseTaskId(uint8_t task_id);

  // Returns true if a worker should return from the worker task on the
  // current thread ASAP.
  bool ShouldYield();
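  // Worker tasks observe this via JobDelegate::ShouldYield() and should poll
  // it between work items, e.g. (sketch, with TakeItem()/Process() standing
  // in for a caller-defined work source):
  //   void WorkerTask(JobDelegate* delegate) {
  //     while (auto item = TakeItem()) {
  //       if (delegate->ShouldYield())
  //         return;
  //       Process(*item);
  //     }
  //   }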

  PooledTaskRunnerDelegate* delegate() const { return delegate_; }

 private:
  // Atomic internal state to track the number of workers running a task from
  // this JobTaskSource and whether this JobTaskSource is canceled. All
  // operations are performed with std::memory_order_relaxed as State is only
  // ever modified under a lock or read atomically (optimistic read).
  class State {
   public:
    static constexpr uint32_t kCanceledMask = 1;
    static constexpr int kWorkerCountBitOffset = 1;
    static constexpr uint32_t kWorkerCountIncrement = 1
                                                      << kWorkerCountBitOffset;

    struct Value {
      uint8_t worker_count() const {
        return static_cast<uint8_t>(value >> kWorkerCountBitOffset);
      }
      // Returns true if canceled.
      bool is_canceled() const { return value & kCanceledMask; }

      uint32_t value;
    };
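    // Example decoding: a raw |value| of 5 (0b101) has the canceled bit set
    // and encodes a worker count of 2, so worker_count() == 2 and
    // is_canceled() == true.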

    State();
    ~State();

    // Sets the canceled bit. Returns the state before the operation.
    Value Cancel();

    // Increments the worker count by 1. Returns the state before the
    // operation.
    Value IncrementWorkerCount();

    // Decrements the worker count by 1. Returns the state before the
    // operation.
    Value DecrementWorkerCount();

    // Loads and returns the state.
    Value Load() const;

   private:
    std::atomic<uint32_t> value_{0};
  };

  // Atomic flag that indicates if the joining thread is currently waiting on
  // another worker to yield or to signal.
  class JoinFlag {
   public:
    static constexpr uint32_t kNotWaiting = 0;
    static constexpr uint32_t kWaitingForWorkerToSignal = 1;
    static constexpr uint32_t kWaitingForWorkerToYield = 3;
    // kWaitingForWorkerToYield is 3 because the impl relies on the following
    // property.
    static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) ==
                      kWaitingForWorkerToSignal,
                  "");
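    // Intended flow, as implied by the contracts below: the joining thread
    // calls SetWaiting(), storing kWaitingForWorkerToYield. The first worker
    // to check ShouldWorkerYield() downgrades the flag to
    // kWaitingForWorkerToSignal and yields; the next worker to return checks
    // ShouldWorkerSignal(), which resets the flag to kNotWaiting and tells
    // that worker to signal the joining thread.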

    JoinFlag();
    ~JoinFlag();

    // Returns true if the status is not kNotWaiting, using
    // std::memory_order_relaxed.
    bool IsWaiting() {
      return value_.load(std::memory_order_relaxed) != kNotWaiting;
    }

    // Resets the status to kNotWaiting, using std::memory_order_relaxed.
    void Reset();

    // Sets the status to kWaitingForWorkerToYield, using
    // std::memory_order_relaxed.
    void SetWaiting();

    // If the flag is kWaitingForWorkerToYield, returns true indicating that
    // the worker should yield, and atomically updates to
    // kWaitingForWorkerToSignal (using std::memory_order_relaxed) to ensure
    // that a single worker yields in response to SetWaiting().
    bool ShouldWorkerYield();

    // If the flag is kWaiting*, returns true indicating that the worker
    // should signal, and atomically updates to kNotWaiting (using
    // std::memory_order_relaxed) to ensure that a single worker signals in
    // response to SetWaiting().
    bool ShouldWorkerSignal();

   private:
    std::atomic<uint32_t> value_{kNotWaiting};
  };

  ~JobTaskSource() override;

  // Called from the joining thread. Waits for the worker count to be below or
  // equal to max concurrency (will happen when a worker calls
  // DidProcessTask()). Returns true if the joining thread should run a task,
  // or false if joining was completed and all other workers returned because
  // either there's no work remaining or Job was cancelled.
  bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);

  size_t GetMaxConcurrency(size_t worker_count) const;

  // TaskSource:
  RunStatus WillRunTask() override;
  Task TakeTask(TaskSource::Transaction* transaction) override;
  Task Clear(TaskSource::Transaction* transaction) override;
  bool DidProcessTask(TaskSource::Transaction* transaction) override;
  bool WillReEnqueue(TimeTicks now,
                     TaskSource::Transaction* transaction) override;
  bool OnBecomeReady() override;

  // Synchronizes access to workers state.
  mutable CheckedLock worker_lock_{UniversalSuccessor()};

  // Current atomic state (atomic despite the lock to allow optimistic reads
  // and cancellation without the lock).
  State state_ GUARDED_BY(worker_lock_);
  // Normally, |join_flag_| is protected by |worker_lock_|, except in
  // ShouldYield(), hence the use of atomics.
  JoinFlag join_flag_ GUARDED_BY(worker_lock_);
  // Signaled when |join_flag_| is kWaiting* and a worker returns.
  std::unique_ptr<ConditionVariable> worker_released_condition_
      GUARDED_BY(worker_lock_);

  std::atomic<uint32_t> assigned_task_ids_{0};

  const Location from_here_;
  RepeatingCallback<size_t(size_t)> max_concurrency_callback_;

  // Worker task set by the job owner.
  RepeatingCallback<void(JobDelegate*)> worker_task_;
  // Task returned from TakeTask(), that calls |worker_task_| internally.
  RepeatingClosure primary_task_;

  const TimeTicks ready_time_;
  raw_ptr<PooledTaskRunnerDelegate> delegate_;
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_