// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5 #include "base/task/thread_pool/job_task_source.h"
6
7 #include <bit>
8 #include <type_traits>
9 #include <utility>
10
11 #include "base/check_op.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/notreached.h"
16 #include "base/task/common/checked_lock.h"
17 #include "base/task/task_features.h"
18 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "base/time/time.h"
21 #include "base/time/time_override.h"
22 #include "base/trace_event/base_tracing.h"
23
namespace base {
namespace internal {

namespace {

// Capped to allow assigning task_ids from a bitfield.
constexpr size_t kMaxWorkersPerJob = 32;
static_assert(
    kMaxWorkersPerJob <=
        std::numeric_limits<
            std::invoke_result<decltype(&JobDelegate::GetTaskId),
                               JobDelegate>::type>::max(),
    "AcquireTaskId return type isn't big enough to fit kMaxWorkersPerJob");

}  // namespace

JobTaskSource::State::State() = default;
JobTaskSource::State::~State() = default;

JobTaskSource::State::Value JobTaskSource::State::Cancel() {
  return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
}

JobTaskSource::State::Value JobTaskSource::State::DecrementWorkerCount() {
  const uint32_t value_before_sub =
      value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
  DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
  return {value_before_sub};
}

JobTaskSource::State::Value JobTaskSource::State::IncrementWorkerCount() {
  uint32_t value_before_add =
      value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
  // The worker count must not overflow a uint8_t.
  DCHECK((value_before_add >> kWorkerCountBitOffset) < ((1 << 8) - 1));
  return {value_before_add};
}

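// Returns a snapshot of the packed state (canceled bit and worker count) with
// a relaxed load; see call sites for the synchronization they rely on.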
JobTaskSource::State::Value JobTaskSource::State::Load() const {
  return {value_.load(std::memory_order_relaxed)};
}

JobTaskSource::JoinFlag::JoinFlag() = default;
JobTaskSource::JoinFlag::~JoinFlag() = default;

void JobTaskSource::JoinFlag::Reset() {
  value_.store(kNotWaiting, std::memory_order_relaxed);
}

void JobTaskSource::JoinFlag::SetWaiting() {
  value_.store(kWaitingForWorkerToYield, std::memory_order_relaxed);
}

bool JobTaskSource::JoinFlag::ShouldWorkerYield() {
  // The fetch_and() sets the state to kWaitingForWorkerToSignal if it was
  // previously kWaitingForWorkerToYield, otherwise it leaves it unchanged.
  return value_.fetch_and(kWaitingForWorkerToSignal,
                          std::memory_order_relaxed) ==
         kWaitingForWorkerToYield;
}

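// Atomically resets the flag to kNotWaiting and returns true if a joining
// thread was waiting, in which case the caller signals
// |worker_released_condition_|.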
bool JobTaskSource::JoinFlag::ShouldWorkerSignal() {
  return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting;
}

JobTaskSource::JobTaskSource(const Location& from_here,
                             const TaskTraits& traits,
                             RepeatingCallback<void(JobDelegate*)> worker_task,
                             MaxConcurrencyCallback max_concurrency_callback,
                             PooledTaskRunnerDelegate* delegate)
    : TaskSource(traits, TaskSourceExecutionMode::kJob),
      max_concurrency_callback_(std::move(max_concurrency_callback)),
      worker_task_(std::move(worker_task)),
      primary_task_(base::BindRepeating(
          [](JobTaskSource* self) {
            CheckedLock::AssertNoLockHeldOnCurrentThread();
            // Each worker task has its own delegate with associated state.
            JobDelegate job_delegate{self, self->delegate_};
            self->worker_task_.Run(&job_delegate);
          },
          base::Unretained(this))),
      task_metadata_(from_here),
      ready_time_(TimeTicks::Now()),
      delegate_(delegate) {
  DCHECK(delegate_);
  task_metadata_.sequence_num = -1;
}

JobTaskSource::~JobTaskSource() {
  // Make sure there's no outstanding active run operation left.
  DCHECK_EQ(state_.Load().worker_count(), 0U);
}

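// Worker tasks of a job are not sequenced with one another, so each run gets a
// freshly created SequenceToken.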
ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
  return {SequenceToken::Create()};
}

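// Records the sequence number in the task metadata and notifies the
// TaskAnnotator the first time the job is enqueued; later calls are no-ops.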
void JobTaskSource::WillEnqueue(int sequence_num, TaskAnnotator& annotator) {
  if (task_metadata_.sequence_num != -1) {
    // WillEnqueue() was already called.
    return;
  }
  task_metadata_.sequence_num = sequence_num;
  annotator.WillQueueTask("ThreadPool_PostJob", &task_metadata_);
}

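// Called on the joining thread. Counts that thread as a worker and returns
// true if it should run the worker task; returns false once the join can
// complete without further participation.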
bool JobTaskSource::WillJoin() {
  TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
  CheckedAutoLock auto_lock(worker_lock_);
  DCHECK(!worker_released_condition_);  // This may only be called once.
  worker_lock_.CreateConditionVariableAndEmplace(worker_released_condition_);
  // Prevent wait from triggering a ScopedBlockingCall as this would cause
  // |ThreadGroup::lock_| to be acquired, causing lock inversion.
  worker_released_condition_->declare_only_used_while_idle();
  const auto state_before_add = state_.IncrementWorkerCount();

  if (!state_before_add.is_canceled() &&
      state_before_add.worker_count() <
          GetMaxConcurrency(state_before_add.worker_count())) {
    return true;
  }
  return WaitForParticipationOpportunity();
}

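// Runs the worker task once on the joining thread and returns true if that
// thread should keep participating in the job.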
bool JobTaskSource::RunJoinTask() {
  JobDelegate job_delegate{this, nullptr};
  worker_task_.Run(&job_delegate);

  // It is safe to read |state_| without a lock since this variable is atomic
  // and the call to GetMaxConcurrency() is used for a best effort early exit.
  // Stale values will only cause WaitForParticipationOpportunity() to be
  // called.
  const auto state = TS_UNCHECKED_READ(state_).Load();
  // The condition is slightly different from the one in WillJoin() since we're
  // using |state| that was already incremented to include the joining thread.
  if (!state.is_canceled() &&
      state.worker_count() <= GetMaxConcurrency(state.worker_count() - 1)) {
    return true;
  }

  TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
  CheckedAutoLock auto_lock(worker_lock_);
  return WaitForParticipationOpportunity();
}

void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
  // Sets the kCanceledMask bit on |state_| so that further calls to
  // WillRunTask() never succeed. std::memory_order_relaxed without a lock is
  // safe because this task source never needs to be re-enqueued after Cancel().
  TS_UNCHECKED_READ(state_).Cancel();
}

// EXCLUSIVE_LOCK_REQUIRED(worker_lock_)
bool JobTaskSource::WaitForParticipationOpportunity() {
  DCHECK(!join_flag_.IsWaiting());

  // std::memory_order_relaxed is sufficient because no other state is
  // synchronized with |state_| outside of |lock_|.
  auto state = state_.Load();
  // |worker_count - 1| to exclude the joining thread which is not active.
  size_t max_concurrency = GetMaxConcurrency(state.worker_count() - 1);

  // Wait until either:
  // A) |worker_count| is below or equal to max concurrency and state is not
  //    canceled.
  // B) All other workers returned and |worker_count| is 1.
  while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) ||
           state.worker_count() == 1)) {
    // std::memory_order_relaxed is sufficient because no other state is
    // synchronized with |join_flag_| outside of |lock_|.
    join_flag_.SetWaiting();

    // To avoid waiting unnecessarily, whenever either condition A) or B) may
    // have changed, |lock_| is taken and |worker_released_condition_| is
    // signaled if necessary:
    // 1- In DidProcessTask(), after worker count is decremented.
    // 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
    worker_released_condition_->Wait();
    state = state_.Load();
    // |worker_count - 1| to exclude the joining thread which is not active.
    max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
  }
  // It's possible though unlikely that the joining thread got a participation
  // opportunity without a worker signaling.
  join_flag_.Reset();

  // Case A:
  if (state.worker_count() <= max_concurrency && !state.is_canceled()) {
    return true;
  }
  // Case B:
  // Only the joining thread remains.
  DCHECK_EQ(state.worker_count(), 1U);
  DCHECK(state.is_canceled() || max_concurrency == 0U);
  state_.DecrementWorkerCount();
  // Prevent subsequent accesses to user callbacks.
  state_.Cancel();
  return false;
}

TaskSource::RunStatus JobTaskSource::WillRunTask() {
  CheckedAutoLock auto_lock(worker_lock_);
  auto state_before_add = state_.Load();

  // Don't allow this worker to run the task if either:
  // A) |state_| was canceled.
  // B) |worker_count| is already at |max_concurrency|.
  // C) |max_concurrency| was lowered below or to |worker_count|.
  // Case A:
  if (state_before_add.is_canceled()) {
    return RunStatus::kDisallowed;
  }

  const size_t max_concurrency =
      GetMaxConcurrency(state_before_add.worker_count());
  if (state_before_add.worker_count() < max_concurrency) {
    state_before_add = state_.IncrementWorkerCount();
  }
  const size_t worker_count_before_add = state_before_add.worker_count();
  // Case B) or C):
  if (worker_count_before_add >= max_concurrency) {
    return RunStatus::kDisallowed;
  }

  DCHECK_LT(worker_count_before_add, max_concurrency);
  return max_concurrency == worker_count_before_add + 1
             ? RunStatus::kAllowedSaturated
             : RunStatus::kAllowedNotSaturated;
}

size_t JobTaskSource::GetRemainingConcurrency() const {
  // It is safe to read |state_| without a lock since this variable is atomic,
  // and no other state is synchronized with GetRemainingConcurrency().
  const auto state = TS_UNCHECKED_READ(state_).Load();
  if (state.is_canceled()) {
    return 0;
  }
  const size_t max_concurrency = GetMaxConcurrency(state.worker_count());
  // Avoid underflows.
  if (state.worker_count() > max_concurrency) {
    return 0;
  }
  return max_concurrency - state.worker_count();
}

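// The job is still active while it can accept more workers (non-zero max
// concurrency) or while workers are still running its task.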
bool JobTaskSource::IsActive() const {
  CheckedAutoLock auto_lock(worker_lock_);
  auto state = state_.Load();
  return GetMaxConcurrency(state.worker_count()) != 0 ||
         state.worker_count() != 0;
}

size_t JobTaskSource::GetWorkerCount() const {
  return TS_UNCHECKED_READ(state_).Load().worker_count();
}

void JobTaskSource::NotifyConcurrencyIncrease() {
  // Avoid unnecessary locks when NotifyConcurrencyIncrease() is spuriously
  // called.
  if (GetRemainingConcurrency() == 0) {
    return;
  }

  {
    // Lock is taken to access |join_flag_| below and signal
    // |worker_released_condition_|.
    CheckedAutoLock auto_lock(worker_lock_);
    if (join_flag_.ShouldWorkerSignal()) {
      worker_released_condition_->Signal();
    }
  }

  // Make sure the task source is in the queue if not already.
  // Caveat: it's possible but unlikely that the task source has already
  // reached its intended concurrency and doesn't need to be enqueued if there
  // previously were too many workers. For simplicity, the task source is
  // always enqueued and will get discarded if already saturated when it is
  // popped from the priority queue.
  delegate_->EnqueueJobTaskSource(this);
}

size_t JobTaskSource::GetMaxConcurrency() const {
  return GetMaxConcurrency(TS_UNCHECKED_READ(state_).Load().worker_count());
}

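// Clamps the user-provided max concurrency to kMaxWorkersPerJob so that task
// ids fit in the |assigned_task_ids_| bitfield.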
size_t JobTaskSource::GetMaxConcurrency(size_t worker_count) const {
  return std::min(max_concurrency_callback_.Run(worker_count),
                  kMaxWorkersPerJob);
}

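// Assigns the lowest task id not already in use: finds the right-most 0-bit in
// |assigned_task_ids_| and sets it with a compare-and-swap loop.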
uint8_t JobTaskSource::AcquireTaskId() {
  static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
                "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
  uint32_t assigned_task_ids =
      assigned_task_ids_.load(std::memory_order_relaxed);
  uint32_t new_assigned_task_ids = 0;
  int task_id = 0;
  // memory_order_acquire on success, matched with memory_order_release in
  // ReleaseTaskId() so that operations done by previous threads that had
  // the same task_id become visible to the current thread.
  do {
    // Count trailing one bits. This is the id of the right-most 0-bit in
    // |assigned_task_ids|.
    task_id = std::countr_one(assigned_task_ids);
    new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
  } while (!assigned_task_ids_.compare_exchange_weak(
      assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
      std::memory_order_relaxed));
  return static_cast<uint8_t>(task_id);
}

void JobTaskSource::ReleaseTaskId(uint8_t task_id) {
  // memory_order_release to match AcquireTaskId().
  uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
      ~(uint32_t(1) << task_id), std::memory_order_release);
  DCHECK(previous_task_ids & (uint32_t(1) << task_id));
}

bool JobTaskSource::ShouldYield() {
  // It is safe to read |join_flag_| and |state_| without a lock since these
  // variables are atomic, keeping in mind that threads may not immediately see
  // the new value when it is updated.
  return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
         TS_UNCHECKED_READ(state_).Load().is_canceled();
}

Task JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
  // JobTaskSource members are not lock-protected so no need to acquire a lock
  // if |transaction| is nullptr.
  DCHECK_GT(TS_UNCHECKED_READ(state_).Load().worker_count(), 0U);
  DCHECK(primary_task_);
  return {task_metadata_, primary_task_};
}

bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
  // Lock is needed to access |join_flag_| below and signal
  // |worker_released_condition_|.
  CheckedAutoLock auto_lock(worker_lock_);
  const auto state_before_sub = state_.DecrementWorkerCount();

  if (join_flag_.ShouldWorkerSignal()) {
    worker_released_condition_->Signal();
  }

  // A canceled task source should never get re-enqueued.
  if (state_before_sub.is_canceled()) {
    return false;
  }

  DCHECK_GT(state_before_sub.worker_count(), 0U);

  // Re-enqueue the TaskSource if the task ran and the worker count is below
  // the max concurrency.
  // |worker_count - 1| to exclude the returning thread.
  return state_before_sub.worker_count() <=
         GetMaxConcurrency(state_before_sub.worker_count() - 1);
}

// This is a no-op and should always return true.
bool JobTaskSource::WillReEnqueue(TimeTicks now,
                                  TaskSource::Transaction* /*transaction*/) {
  return true;
}

// This is a no-op.
bool JobTaskSource::OnBecomeReady() {
  return false;
}

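// The sort key combines the racy priority, the time the job became ready and
// the current worker count, so that the worker count can be taken into account
// when ordering job sources.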
TaskSourceSortKey JobTaskSource::GetSortKey() const {
  return TaskSourceSortKey(priority_racy(), ready_time_,
                           TS_UNCHECKED_READ(state_).Load().worker_count());
}

// This function isn't expected to be called since a job is never delayed.
// However, the class still needs to provide an override.
TimeTicks JobTaskSource::GetDelayedSortKey() const {
  return TimeTicks();
}

// This function isn't expected to be called since a job is never delayed.
// However, the class still needs to provide an override.
bool JobTaskSource::HasReadyTasks(TimeTicks now) const {
  NOTREACHED();
}

std::optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) {
  Cancel();

  // Nothing is cleared since other workers might still racily run tasks. For
  // simplicity, the destructor will take care of it once all references are
  // released.
  return std::nullopt;
}

}  // namespace internal
}  // namespace base