• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
6 #define BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
7 
8 #include <stddef.h>
9 
10 #include <atomic>
11 #include <cstdint>
12 #include <utility>
13 
14 #include "base/base_export.h"
15 #include "base/functional/callback.h"
16 #include "base/memory/raw_ptr.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "base/task/common/checked_lock.h"
19 #include "base/task/common/task_annotator.h"
20 #include "base/task/post_job.h"
21 #include "base/task/task_traits.h"
22 #include "base/task/thread_pool/job_task_source_interface.h"
23 #include "base/task/thread_pool/task.h"
24 #include "base/task/thread_pool/task_source.h"
25 #include "base/task/thread_pool/task_source_sort_key.h"
26 
27 namespace base::internal {
28 
29 class PooledTaskRunnerDelegate;
30 
31 // A JobTaskSource generates many Tasks from a single RepeatingClosure.
32 //
33 // Derived classes control the intended concurrency with GetMaxConcurrency().
34 class BASE_EXPORT JobTaskSourceNew : public JobTaskSource {
35  public:
36   JobTaskSourceNew(const Location& from_here,
37                    const TaskTraits& traits,
38                    RepeatingCallback<void(JobDelegate*)> worker_task,
39                    MaxConcurrencyCallback max_concurrency_callback,
40                    PooledTaskRunnerDelegate* delegate);
41   JobTaskSourceNew(const JobTaskSource&) = delete;
42   JobTaskSourceNew& operator=(const JobTaskSourceNew&) = delete;
43 
44   // Called before the task source is enqueued to initialize task metadata.
45   void WillEnqueue(int sequence_num, TaskAnnotator& annotator) override;
46 
47   // Notifies this task source that max concurrency increased. Returns false iff
48   // there was an unsuccessful attempt to enqueue the task source.
49   bool NotifyConcurrencyIncrease() override;
50 
51   // Informs this JobTaskSource that the current thread would like to join and
52   // contribute to running |worker_task|. Returns true if the joining thread can
53   // contribute (RunJoinTask() can be called), or false if joining was completed
54   // and all other workers returned because either there's no work remaining or
55   // Job was cancelled.
56   bool WillJoin() override;
57 
58   // Contributes to running |worker_task| and returns true if the joining thread
59   // can contribute again (RunJoinTask() can be called again), or false if
60   // joining was completed and all other workers returned because either there's
61   // no work remaining or Job was cancelled. This should be called only after
62   // WillJoin() or RunJoinTask() previously returned true.
63   bool RunJoinTask() override;
64 
65   // Cancels this JobTaskSource, causing all workers to yield and WillRunTask()
66   // to return RunStatus::kDisallowed.
67   void Cancel(TaskSource::Transaction* transaction = nullptr) override;
68 
69   // TaskSource:
70   ExecutionEnvironment GetExecutionEnvironment() override;
71   size_t GetRemainingConcurrency() const override;
72   TaskSourceSortKey GetSortKey() const override;
73   TimeTicks GetDelayedSortKey() const override;
74   bool HasReadyTasks(TimeTicks now) const override;
75 
76   bool IsActive() const override;
77   size_t GetWorkerCount() const override;
78 
79   // Returns the maximum number of tasks from this TaskSource that can run
80   // concurrently.
81   size_t GetMaxConcurrency() const override;
82 
83   uint8_t AcquireTaskId() override;
84   void ReleaseTaskId(uint8_t task_id) override;
85 
86   // Returns true if a worker should return from the worker task on the current
87   // thread ASAP.
88   bool ShouldYield() override;
89 
90   PooledTaskRunnerDelegate* GetDelegate() const override;
91 
92  private:
93   // Atomic variable to track job state.
94   class State {
95    public:
96     // When set, the job is canceled.
97     static constexpr uint32_t kCanceledMask = 1 << 0;
98     // When set, the Join()'ing thread wants to be signaled when worker count
99     // is decremented or capacity is created by a max concurrency increase.
100     static constexpr uint32_t kSignalJoinMask = 1 << 1;
101     // When set, the job is queued. Note: The job may be queued when this is not
102     // set, see details in JobTaskSource::State::ExitWillRunTask().
103     static constexpr uint32_t kQueuedMask = 1 << 2;
104     // When set, WillRunTask() is not running *or* WillRunTask() is running and
105     // there was a request to keep the job queued (via
106     // `ShouldQueueUponCapacityIncrease()` or `WillReenqueue()`).
107     static constexpr uint32_t kOutsideWillRunTaskOrMustReenqueueMask = 1 << 3;
108     // Offset for the number of workers running the job.
109     static constexpr int kWorkerCountBitOffset = 4;
110     static constexpr uint32_t kWorkerCountIncrement = 1
111                                                       << kWorkerCountBitOffset;
112 
113     struct Value {
worker_countValue114       uint8_t worker_count() const {
115         return static_cast<uint8_t>(value >> kWorkerCountBitOffset);
116       }
canceledValue117       bool canceled() const { return value & kCanceledMask; }
signal_joinValue118       bool signal_join() const { return value & kSignalJoinMask; }
queuedValue119       bool queued() const { return value & kQueuedMask; }
outside_will_run_task_or_must_reenqueueValue120       bool outside_will_run_task_or_must_reenqueue() const {
121         return value & kOutsideWillRunTaskOrMustReenqueueMask;
122       }
123 
124       uint32_t value;
125     };
126 
127     State();
128     ~State();
129 
130     // Sets as canceled. Returns the state before the operation.
131     Value Cancel();
132 
133     // Increments the worker count by 1. Returns the state before the operation.
134     //
135     // This requires holding `increment_worker_count_lock()`, to allow
136     // WaitForParticipationOpportunity() to check worker count and apply changes
137     // with a guarantee that it hasn't been incremented in between (worker count
138     // could still be decremented while the lock is held).
139     Value IncrementWorkerCount()
140         EXCLUSIVE_LOCKS_REQUIRED(increment_worker_count_lock());
141 
142     // Decrements the worker count by 1. Returns the state before the operation.
143     Value DecrementWorkerCount();
144 
145     // Requests to signal the Join()'ing thread when worker count is
146     // decremented or capacity is created by increasing "max concurrency".
147     // Returns the state before the operation.
148     Value RequestSignalJoin();
149 
150     // Returns whether the Join()'ing thread should be signaled when worker
151     // count is decremented or capacity is created by increasing "max
152     // concurrency". Resets the bit so that this won't return true until
153     // `RequestSignalJoin()` is called again.
154     bool FetchAndResetRequestSignalJoin();
155 
156     // Indicates that max capacity was increased above the number of workers.
157     // Returns true iff the job should be queued.
158     bool ShouldQueueUponCapacityIncrease();
159 
160     // Indicates that WillRunTask() was entered. Returns the previous state.
161     Value EnterWillRunTask();
162 
163     // Indicates that WillRunTask() will exit. `saturated` is true iff
164     // `WillRunTask()` determined that max concurrency is reached. Returns true
165     // iff `ShouldQueueUponCapacityIncrease()` or `WillQueue()` was invoked
166     // since `EnterWillRunTask()`.
167     bool ExitWillRunTask(bool saturated);
168 
169     // Indicates that `DidProcessTask()` decided to re-enqueue the job. If this
170     // returns false, the job shouldn't re-enqueue the job (another worker
171     // currently in `WillRunTask()` will request that it remains in the queue).
172     bool WillReenqueue();
173 
174     // Loads and returns the state.
175     Value Load() const;
176 
177     // Returns a lock that must be held to call `IncrementWorkerCount()`.
increment_worker_count_lock()178     CheckedLock& increment_worker_count_lock() {
179       return increment_worker_count_lock_;
180     }
181 
182    private:
183     std::atomic<uint32_t> value_{kOutsideWillRunTaskOrMustReenqueueMask};
184     CheckedLock increment_worker_count_lock_{UniversalSuccessor()};
185   };
186 
187   ~JobTaskSourceNew() override;
188 
189   // Called from the joining thread. Waits for the worker count to be below or
190   // equal to max concurrency (may happen when "max concurrency" increases or
191   // the worker count is decremented). Returns true if the joining thread should
192   // run a task, or false if joining was completed and all other workers
193   // returned because either there's no work remaining or Job was cancelled.
194   bool WaitForParticipationOpportunity();
195 
196   size_t GetMaxConcurrency(size_t worker_count) const;
197 
198   // TaskSource:
199   RunStatus WillRunTask() override;
200   Task TakeTask(TaskSource::Transaction* transaction) override;
201   absl::optional<Task> Clear(TaskSource::Transaction* transaction) override;
202   bool DidProcessTask(TaskSource::Transaction* transaction) override;
203   bool WillReEnqueue(TimeTicks now,
204                      TaskSource::Transaction* transaction) override;
205   bool OnBecomeReady() override;
206 
207   State state_;
208 
209   // Signaled when the joining thread wants to particpate and capacity is
210   // created by increasing "max concurrency" or decrementing the worker count.
211   WaitableEvent join_event_{WaitableEvent::ResetPolicy::AUTOMATIC};
212 
213   std::atomic<uint32_t> assigned_task_ids_{0};
214 
215   RepeatingCallback<size_t(size_t)> max_concurrency_callback_;
216 
217   // Worker task set by the job owner.
218   RepeatingCallback<void(JobDelegate*)> worker_task_;
219   // Task returned from TakeTask(), that calls |worker_task_| internally.
220   RepeatingClosure primary_task_;
221 
222   TaskMetadata task_metadata_;
223 
224   const TimeTicks ready_time_;
225   raw_ptr<PooledTaskRunnerDelegate, LeakedDanglingUntriaged> delegate_;
226 };
227 
228 }  // namespace base::internal
229 
230 #endif  // BASE_TASK_THREAD_POOL_JOB_TASK_SOURCE_H_
231