// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_OLD_H_
#define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_OLD_H_

#include <stddef.h>

#include <atomic>
#include <limits>
#include <memory>
#include <utility>

#include "base/base_export.h"
#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/task/common/checked_lock.h"
#include "base/task/post_job.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/job_task_source_interface.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/task_source.h"
#include "base/task/thread_pool/task_source_sort_key.h"

namespace base {
namespace internal {

class PooledTaskRunnerDelegate;

// A JobTaskSource generates many Tasks from a single RepeatingClosure.
//
// Derived classes control the intended concurrency with GetMaxConcurrency().
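//
// An illustrative way a job reaches this class is via base::PostJob() from
// base/task/post_job.h (a sketch: the PostJob() API is real, but the lambda
// bodies and the work-item helpers below are hypothetical):
//
//   auto handle = base::PostJob(
//       FROM_HERE, {base::TaskPriority::USER_VISIBLE},
//       base::BindRepeating([](base::JobDelegate* delegate) {
//         // Process work items until none remain or the scheduler asks this
//         // worker to yield (see ShouldYield() below).
//         while (!delegate->ShouldYield()) {
//           if (!TakeAndProcessWorkItem())  // Hypothetical helper.
//             return;
//         }
//       }),
//       base::BindRepeating([](size_t /*worker_count*/) -> size_t {
//         return GetRemainingWorkItemCount();  // Hypothetical helper.
//       }));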
class BASE_EXPORT JobTaskSourceOld : public JobTaskSource {
 public:
  JobTaskSourceOld(const Location& from_here,
                   const TaskTraits& traits,
                   RepeatingCallback<void(JobDelegate*)> worker_task,
                   MaxConcurrencyCallback max_concurrency_callback,
                   PooledTaskRunnerDelegate* delegate);
  JobTaskSourceOld(const JobTaskSourceOld&) = delete;
  JobTaskSourceOld& operator=(const JobTaskSourceOld&) = delete;

  // Called before the task source is enqueued to initialize task metadata.
  void WillEnqueue(int sequence_num, TaskAnnotator& annotator) override;

  // Notifies this task source that max concurrency was increased, and that
  // the number of workers should be adjusted.
  bool NotifyConcurrencyIncrease() override;

  // Informs this JobTaskSource that the current thread would like to join and
  // contribute to running |worker_task|. Returns true if the joining thread
  // can contribute (RunJoinTask() can be called), or false if joining was
  // completed and all other workers returned because either there's no work
  // remaining or the Job was cancelled.
  bool WillJoin() override;

  // Contributes to running |worker_task| and returns true if the joining
  // thread can contribute again (RunJoinTask() can be called again), or false
  // if joining was completed and all other workers returned because either
  // there's no work remaining or the Job was cancelled. This should be called
  // only after WillJoin() or RunJoinTask() previously returned true.
  bool RunJoinTask() override;
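
  // An illustrative join sequence built from the two methods above (a sketch;
  // the real joining logic lives in the implementation, not in this header):
  //
  //   if (job_task_source->WillJoin()) {
  //     while (job_task_source->RunJoinTask()) {}
  //   }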

  // Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
  // to return RunStatus::kDisallowed.
  void Cancel(TaskSource::Transaction* transaction = nullptr) override;

  // TaskSource:
  ExecutionEnvironment GetExecutionEnvironment() override;
  size_t GetRemainingConcurrency() const override;
  TaskSourceSortKey GetSortKey() const override;
  TimeTicks GetDelayedSortKey() const override;
  bool HasReadyTasks(TimeTicks now) const override;

  bool IsActive() const override;
  size_t GetWorkerCount() const override;

  // Returns the maximum number of tasks from this TaskSource that can run
  // concurrently.
  size_t GetMaxConcurrency() const override;

  uint8_t AcquireTaskId() override;
  void ReleaseTaskId(uint8_t task_id) override;

  // Returns true if a worker should return from the worker task on the current
  // thread ASAP.
  bool ShouldYield() override;

  PooledTaskRunnerDelegate* GetDelegate() const override;

 private:
  // Atomic internal state to track the number of workers running a task from
  // this JobTaskSource and whether this JobTaskSource is canceled. All
  // operations are performed with std::memory_order_relaxed as State is only
  // ever modified under a lock or read atomically (optimistic read).
  class State {
   public:
    static constexpr uint32_t kCanceledMask = 1;
    static constexpr int kWorkerCountBitOffset = 1;
    static constexpr uint32_t kWorkerCountIncrement = 1
                                                      << kWorkerCountBitOffset;
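
    // Illustrative bit layout, derived from the constants above:
    //   bit 0        canceled flag (kCanceledMask)
    //   bits 1..31   worker count, in steps of kWorkerCountIncrement
    // e.g. a value of 0b101 decodes to worker_count() == 2 and
    // is_canceled() == true.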

    struct Value {
      uint8_t worker_count() const {
        return static_cast<uint8_t>(value >> kWorkerCountBitOffset);
      }
      // Returns true if canceled.
      bool is_canceled() const { return value & kCanceledMask; }

      uint32_t value;
    };

    State();
    ~State();

    // Sets the canceled bit. Returns the state before the operation.
    Value Cancel();

    // Increments the worker count by 1. Returns the state before the
    // operation.
    Value IncrementWorkerCount();

    // Decrements the worker count by 1. Returns the state before the
    // operation.
    Value DecrementWorkerCount();

    // Loads and returns the state.
    Value Load() const;

   private:
    std::atomic<uint32_t> value_{0};
  };

  // Atomic flag that indicates if the joining thread is currently waiting on
  // another worker to yield or to signal.
  class JoinFlag {
   public:
    static constexpr uint32_t kNotWaiting = 0;
    static constexpr uint32_t kWaitingForWorkerToSignal = 1;
    static constexpr uint32_t kWaitingForWorkerToYield = 3;
    // kWaitingForWorkerToYield is 3 because the impl relies on the following
    // property.
    static_assert((kWaitingForWorkerToYield & kWaitingForWorkerToSignal) ==
                      kWaitingForWorkerToSignal,
                  "");
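
    // An illustrative state machine, pieced together from the comments on the
    // methods below:
    //   SetWaiting():          -> kWaitingForWorkerToYield
    //   ShouldWorkerYield():   kWaitingForWorkerToYield ->
    //                            kWaitingForWorkerToSignal (returns true, so
    //                            a single worker yields)
    //   ShouldWorkerSignal():  kWaiting* -> kNotWaiting (returns true, so a
    //                            single worker signals)
    //   Reset():               -> kNotWaiting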

    JoinFlag();
    ~JoinFlag();

    // Returns true if the status is not kNotWaiting, using
    // std::memory_order_relaxed.
    bool IsWaiting() {
      return value_.load(std::memory_order_relaxed) != kNotWaiting;
    }

    // Resets the status to kNotWaiting using std::memory_order_relaxed.
    void Reset();

    // Sets the status to kWaitingForWorkerToYield using
    // std::memory_order_relaxed.
    void SetWaiting();

    // If the flag is kWaitingForWorkerToYield, returns true indicating that
    // the worker should yield, and atomically updates to
    // kWaitingForWorkerToSignal (using std::memory_order_relaxed) to ensure
    // that a single worker yields in response to SetWaiting().
    bool ShouldWorkerYield();

    // If the flag is kWaiting*, returns true indicating that the worker should
    // signal, and atomically updates to kNotWaiting (using
    // std::memory_order_relaxed) to ensure that a single worker signals in
    // response to SetWaiting().
    bool ShouldWorkerSignal();

   private:
    std::atomic<uint32_t> value_{kNotWaiting};
  };

  ~JobTaskSourceOld() override;

  // Called from the joining thread. Waits for the worker count to be below or
  // equal to max concurrency (will happen when a worker calls
  // DidProcessTask()). Returns true if the joining thread should run a task,
  // or false if joining was completed and all other workers returned because
  // either there's no work remaining or the Job was cancelled.
  bool WaitForParticipationOpportunity() EXCLUSIVE_LOCKS_REQUIRED(worker_lock_);

  size_t GetMaxConcurrency(size_t worker_count) const;

  // TaskSource:
  RunStatus WillRunTask() override;
  Task TakeTask(TaskSource::Transaction* transaction) override;
  absl::optional<Task> Clear(TaskSource::Transaction* transaction) override;
  bool DidProcessTask(TaskSource::Transaction* transaction) override;
  bool WillReEnqueue(TimeTicks now,
                     TaskSource::Transaction* transaction) override;
  bool OnBecomeReady() override;

  // Synchronizes access to worker state.
  mutable CheckedLock worker_lock_{UniversalSuccessor()};

  // Current atomic state (atomic despite the lock to allow optimistic reads
  // and cancellation without the lock).
  State state_ GUARDED_BY(worker_lock_);
  // Normally, |join_flag_| is protected by |worker_lock_|, except in
  // ShouldYield(), hence the use of atomics.
  JoinFlag join_flag_ GUARDED_BY(worker_lock_);
  // Signaled when |join_flag_| is kWaiting* and a worker returns.
  std::unique_ptr<ConditionVariable> worker_released_condition_
      GUARDED_BY(worker_lock_);

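  // Bitfield of task ids vended by AcquireTaskId()/ReleaseTaskId(); a set
  // bit i means task id i is currently in use (an inference from these
  // declarations, not a documented contract).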
  std::atomic<uint32_t> assigned_task_ids_{0};

  RepeatingCallback<size_t(size_t)> max_concurrency_callback_;

  // Worker task set by the job owner.
  RepeatingCallback<void(JobDelegate*)> worker_task_;
  // Task returned from TakeTask(), which calls |worker_task_| internally.
  RepeatingClosure primary_task_;

  TaskMetadata task_metadata_;

  const TimeTicks ready_time_;
  raw_ptr<PooledTaskRunnerDelegate, LeakedDanglingUntriaged> delegate_;
};

}  // namespace internal
}  // namespace base

#endif  // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_OLD_H_