• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/job_task_source.h"
6 
7 #include <bit>
8 #include <type_traits>
9 #include <utility>
10 
11 #include "base/check_op.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/notreached.h"
16 #include "base/task/common/checked_lock.h"
17 #include "base/task/task_features.h"
18 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
19 #include "base/threading/thread_restrictions.h"
20 #include "base/time/time.h"
21 #include "base/time/time_override.h"
22 #include "base/trace_event/base_tracing.h"
23 
24 namespace base {
25 namespace internal {
26 
27 namespace {
28 
29 // Capped to allow assigning task_ids from a bitfield.
30 constexpr size_t kMaxWorkersPerJob = 32;
31 static_assert(
32     kMaxWorkersPerJob <=
33         std::numeric_limits<
34             std::invoke_result<decltype(&JobDelegate::GetTaskId),
35                                JobDelegate>::type>::max(),
36     "AcquireTaskId return type isn't big enough to fit kMaxWorkersPerJob");
37 
38 }  // namespace
39 
40 JobTaskSource::State::State() = default;
41 JobTaskSource::State::~State() = default;
42 
Cancel()43 JobTaskSource::State::Value JobTaskSource::State::Cancel() {
44   return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
45 }
46 
DecrementWorkerCount()47 JobTaskSource::State::Value JobTaskSource::State::DecrementWorkerCount() {
48   const uint32_t value_before_sub =
49       value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
50   DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
51   return {value_before_sub};
52 }
53 
IncrementWorkerCount()54 JobTaskSource::State::Value JobTaskSource::State::IncrementWorkerCount() {
55   uint32_t value_before_add =
56       value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
57   // The worker count must not overflow a uint8_t.
58   DCHECK((value_before_add >> kWorkerCountBitOffset) < ((1 << 8) - 1));
59   return {value_before_add};
60 }
61 
Load() const62 JobTaskSource::State::Value JobTaskSource::State::Load() const {
63   return {value_.load(std::memory_order_relaxed)};
64 }
65 
66 JobTaskSource::JoinFlag::JoinFlag() = default;
67 JobTaskSource::JoinFlag::~JoinFlag() = default;
68 
Reset()69 void JobTaskSource::JoinFlag::Reset() {
70   value_.store(kNotWaiting, std::memory_order_relaxed);
71 }
72 
SetWaiting()73 void JobTaskSource::JoinFlag::SetWaiting() {
74   value_.store(kWaitingForWorkerToYield, std::memory_order_relaxed);
75 }
76 
ShouldWorkerYield()77 bool JobTaskSource::JoinFlag::ShouldWorkerYield() {
78   // The fetch_and() sets the state to kWaitingForWorkerToSignal if it was
79   // previously kWaitingForWorkerToYield, otherwise it leaves it unchanged.
80   return value_.fetch_and(kWaitingForWorkerToSignal,
81                           std::memory_order_relaxed) ==
82          kWaitingForWorkerToYield;
83 }
84 
ShouldWorkerSignal()85 bool JobTaskSource::JoinFlag::ShouldWorkerSignal() {
86   return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting;
87 }
88 
JobTaskSource(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback,PooledTaskRunnerDelegate * delegate)89 JobTaskSource::JobTaskSource(const Location& from_here,
90                              const TaskTraits& traits,
91                              RepeatingCallback<void(JobDelegate*)> worker_task,
92                              MaxConcurrencyCallback max_concurrency_callback,
93                              PooledTaskRunnerDelegate* delegate)
94     : TaskSource(traits, TaskSourceExecutionMode::kJob),
95       max_concurrency_callback_(std::move(max_concurrency_callback)),
96       worker_task_(std::move(worker_task)),
97       primary_task_(base::BindRepeating(
98           [](JobTaskSource* self) {
99             CheckedLock::AssertNoLockHeldOnCurrentThread();
100             // Each worker task has its own delegate with associated state.
101             JobDelegate job_delegate{self, self->delegate_};
102             self->worker_task_.Run(&job_delegate);
103           },
104           base::Unretained(this))),
105       task_metadata_(from_here),
106       ready_time_(TimeTicks::Now()),
107       delegate_(delegate) {
108   DCHECK(delegate_);
109   task_metadata_.sequence_num = -1;
110 }
111 
~JobTaskSource()112 JobTaskSource::~JobTaskSource() {
113   // Make sure there's no outstanding active run operation left.
114   DCHECK_EQ(state_.Load().worker_count(), 0U);
115 }
116 
GetExecutionEnvironment()117 ExecutionEnvironment JobTaskSource::GetExecutionEnvironment() {
118   return {SequenceToken::Create()};
119 }
120 
WillEnqueue(int sequence_num,TaskAnnotator & annotator)121 void JobTaskSource::WillEnqueue(int sequence_num, TaskAnnotator& annotator) {
122   if (task_metadata_.sequence_num != -1) {
123     // WillEnqueue() was already called.
124     return;
125   }
126   task_metadata_.sequence_num = sequence_num;
127   annotator.WillQueueTask("ThreadPool_PostJob", &task_metadata_);
128 }
129 
WillJoin()130 bool JobTaskSource::WillJoin() {
131   TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
132   CheckedAutoLock auto_lock(worker_lock_);
133   DCHECK(!worker_released_condition_);  // This may only be called once.
134   worker_lock_.CreateConditionVariableAndEmplace(worker_released_condition_);
135   // Prevent wait from triggering a ScopedBlockingCall as this would cause
136   // |ThreadGroup::lock_| to be acquired, causing lock inversion.
137   worker_released_condition_->declare_only_used_while_idle();
138   const auto state_before_add = state_.IncrementWorkerCount();
139 
140   if (!state_before_add.is_canceled() &&
141       state_before_add.worker_count() <
142           GetMaxConcurrency(state_before_add.worker_count())) {
143     return true;
144   }
145   return WaitForParticipationOpportunity();
146 }
147 
RunJoinTask()148 bool JobTaskSource::RunJoinTask() {
149   JobDelegate job_delegate{this, nullptr};
150   worker_task_.Run(&job_delegate);
151 
152   // It is safe to read |state_| without a lock since this variable is atomic
153   // and the call to GetMaxConcurrency() is used for a best effort early exit.
154   // Stale values will only cause WaitForParticipationOpportunity() to be
155   // called.
156   const auto state = TS_UNCHECKED_READ(state_).Load();
157   // The condition is slightly different from the one in WillJoin() since we're
158   // using |state| that was already incremented to include the joining thread.
159   if (!state.is_canceled() &&
160       state.worker_count() <= GetMaxConcurrency(state.worker_count() - 1)) {
161     return true;
162   }
163 
164   TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
165   CheckedAutoLock auto_lock(worker_lock_);
166   return WaitForParticipationOpportunity();
167 }
168 
Cancel(TaskSource::Transaction * transaction)169 void JobTaskSource::Cancel(TaskSource::Transaction* transaction) {
170   // Sets the kCanceledMask bit on |state_| so that further calls to
171   // WillRunTask() never succeed. std::memory_order_relaxed without a lock is
172   // safe because this task source never needs to be re-enqueued after Cancel().
173   TS_UNCHECKED_READ(state_).Cancel();
174 }
175 
176 // EXCLUSIVE_LOCK_REQUIRED(worker_lock_)
WaitForParticipationOpportunity()177 bool JobTaskSource::WaitForParticipationOpportunity() {
178   DCHECK(!join_flag_.IsWaiting());
179 
180   // std::memory_order_relaxed is sufficient because no other state is
181   // synchronized with |state_| outside of |lock_|.
182   auto state = state_.Load();
183   // |worker_count - 1| to exclude the joining thread which is not active.
184   size_t max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
185 
186   // Wait until either:
187   //  A) |worker_count| is below or equal to max concurrency and state is not
188   //  canceled.
189   //  B) All other workers returned and |worker_count| is 1.
190   while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) ||
191            state.worker_count() == 1)) {
192     // std::memory_order_relaxed is sufficient because no other state is
193     // synchronized with |join_flag_| outside of |lock_|.
194     join_flag_.SetWaiting();
195 
196     // To avoid unnecessarily waiting, if either condition A) or B) change
197     // |lock_| is taken and |worker_released_condition_| signaled if necessary:
198     // 1- In DidProcessTask(), after worker count is decremented.
199     // 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
200     worker_released_condition_->Wait();
201     state = state_.Load();
202     // |worker_count - 1| to exclude the joining thread which is not active.
203     max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
204   }
205   // It's possible though unlikely that the joining thread got a participation
206   // opportunity without a worker signaling.
207   join_flag_.Reset();
208 
209   // Case A:
210   if (state.worker_count() <= max_concurrency && !state.is_canceled()) {
211     return true;
212   }
213   // Case B:
214   // Only the joining thread remains.
215   DCHECK_EQ(state.worker_count(), 1U);
216   DCHECK(state.is_canceled() || max_concurrency == 0U);
217   state_.DecrementWorkerCount();
218   // Prevent subsequent accesses to user callbacks.
219   state_.Cancel();
220   return false;
221 }
222 
WillRunTask()223 TaskSource::RunStatus JobTaskSource::WillRunTask() {
224   CheckedAutoLock auto_lock(worker_lock_);
225   auto state_before_add = state_.Load();
226 
227   // Don't allow this worker to run the task if either:
228   //   A) |state_| was canceled.
229   //   B) |worker_count| is already at |max_concurrency|.
230   //   C) |max_concurrency| was lowered below or to |worker_count|.
231   // Case A:
232   if (state_before_add.is_canceled()) {
233     return RunStatus::kDisallowed;
234   }
235 
236   const size_t max_concurrency =
237       GetMaxConcurrency(state_before_add.worker_count());
238   if (state_before_add.worker_count() < max_concurrency) {
239     state_before_add = state_.IncrementWorkerCount();
240   }
241   const size_t worker_count_before_add = state_before_add.worker_count();
242   // Case B) or C):
243   if (worker_count_before_add >= max_concurrency) {
244     return RunStatus::kDisallowed;
245   }
246 
247   DCHECK_LT(worker_count_before_add, max_concurrency);
248   return max_concurrency == worker_count_before_add + 1
249              ? RunStatus::kAllowedSaturated
250              : RunStatus::kAllowedNotSaturated;
251 }
252 
GetRemainingConcurrency() const253 size_t JobTaskSource::GetRemainingConcurrency() const {
254   // It is safe to read |state_| without a lock since this variable is atomic,
255   // and no other state is synchronized with GetRemainingConcurrency().
256   const auto state = TS_UNCHECKED_READ(state_).Load();
257   if (state.is_canceled()) {
258     return 0;
259   }
260   const size_t max_concurrency = GetMaxConcurrency(state.worker_count());
261   // Avoid underflows.
262   if (state.worker_count() > max_concurrency)
263     return 0;
264   return max_concurrency - state.worker_count();
265 }
266 
IsActive() const267 bool JobTaskSource::IsActive() const {
268   CheckedAutoLock auto_lock(worker_lock_);
269   auto state = state_.Load();
270   return GetMaxConcurrency(state.worker_count()) != 0 ||
271          state.worker_count() != 0;
272 }
273 
GetWorkerCount() const274 size_t JobTaskSource::GetWorkerCount() const {
275   return TS_UNCHECKED_READ(state_).Load().worker_count();
276 }
277 
NotifyConcurrencyIncrease()278 void JobTaskSource::NotifyConcurrencyIncrease() {
279   // Avoid unnecessary locks when NotifyConcurrencyIncrease() is spuriously
280   // called.
281   if (GetRemainingConcurrency() == 0) {
282     return;
283   }
284 
285   {
286     // Lock is taken to access |join_flag_| below and signal
287     // |worker_released_condition_|.
288     CheckedAutoLock auto_lock(worker_lock_);
289     if (join_flag_.ShouldWorkerSignal()) {
290       worker_released_condition_->Signal();
291     }
292   }
293 
294   // Make sure the task source is in the queue if not already.
295   // Caveat: it's possible but unlikely that the task source has already reached
296   // its intended concurrency and doesn't need to be enqueued if there
297   // previously were too many worker. For simplicity, the task source is always
298   // enqueued and will get discarded if already saturated when it is popped from
299   // the priority queue.
300   delegate_->EnqueueJobTaskSource(this);
301 }
302 
GetMaxConcurrency() const303 size_t JobTaskSource::GetMaxConcurrency() const {
304   return GetMaxConcurrency(TS_UNCHECKED_READ(state_).Load().worker_count());
305 }
306 
GetMaxConcurrency(size_t worker_count) const307 size_t JobTaskSource::GetMaxConcurrency(size_t worker_count) const {
308   return std::min(max_concurrency_callback_.Run(worker_count),
309                   kMaxWorkersPerJob);
310 }
311 
AcquireTaskId()312 uint8_t JobTaskSource::AcquireTaskId() {
313   static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
314                 "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
315   uint32_t assigned_task_ids =
316       assigned_task_ids_.load(std::memory_order_relaxed);
317   uint32_t new_assigned_task_ids = 0;
318   int task_id = 0;
319   // memory_order_acquire on success, matched with memory_order_release in
320   // ReleaseTaskId() so that operations done by previous threads that had
321   // the same task_id become visible to the current thread.
322   do {
323     // Count trailing one bits. This is the id of the right-most 0-bit in
324     // |assigned_task_ids|.
325     task_id = std::countr_one(assigned_task_ids);
326     new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
327   } while (!assigned_task_ids_.compare_exchange_weak(
328       assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
329       std::memory_order_relaxed));
330   return static_cast<uint8_t>(task_id);
331 }
332 
ReleaseTaskId(uint8_t task_id)333 void JobTaskSource::ReleaseTaskId(uint8_t task_id) {
334   // memory_order_release to match AcquireTaskId().
335   uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
336       ~(uint32_t(1) << task_id), std::memory_order_release);
337   DCHECK(previous_task_ids & (uint32_t(1) << task_id));
338 }
339 
ShouldYield()340 bool JobTaskSource::ShouldYield() {
341   // It is safe to read |join_flag_| and |state_| without a lock since these
342   // variables are atomic, keeping in mind that threads may not immediately see
343   // the new value when it is updated.
344   return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
345          TS_UNCHECKED_READ(state_).Load().is_canceled();
346 }
347 
TakeTask(TaskSource::Transaction * transaction)348 Task JobTaskSource::TakeTask(TaskSource::Transaction* transaction) {
349   // JobTaskSource members are not lock-protected so no need to acquire a lock
350   // if |transaction| is nullptr.
351   DCHECK_GT(TS_UNCHECKED_READ(state_).Load().worker_count(), 0U);
352   DCHECK(primary_task_);
353   return {task_metadata_, primary_task_};
354 }
355 
DidProcessTask(TaskSource::Transaction *)356 bool JobTaskSource::DidProcessTask(TaskSource::Transaction* /*transaction*/) {
357   // Lock is needed to access |join_flag_| below and signal
358   // |worker_released_condition_|.
359   CheckedAutoLock auto_lock(worker_lock_);
360   const auto state_before_sub = state_.DecrementWorkerCount();
361 
362   if (join_flag_.ShouldWorkerSignal()) {
363     worker_released_condition_->Signal();
364   }
365 
366   // A canceled task source should never get re-enqueued.
367   if (state_before_sub.is_canceled()) {
368     return false;
369   }
370 
371   DCHECK_GT(state_before_sub.worker_count(), 0U);
372 
373   // Re-enqueue the TaskSource if the task ran and the worker count is below the
374   // max concurrency.
375   // |worker_count - 1| to exclude the returning thread.
376   return state_before_sub.worker_count() <=
377          GetMaxConcurrency(state_before_sub.worker_count() - 1);
378 }
379 
380 // This is a no-op and should always return true.
WillReEnqueue(TimeTicks now,TaskSource::Transaction *)381 bool JobTaskSource::WillReEnqueue(TimeTicks now,
382                                   TaskSource::Transaction* /*transaction*/) {
383   return true;
384 }
385 
386 // This is a no-op.
OnBecomeReady()387 bool JobTaskSource::OnBecomeReady() {
388   return false;
389 }
390 
GetSortKey() const391 TaskSourceSortKey JobTaskSource::GetSortKey() const {
392   return TaskSourceSortKey(priority_racy(), ready_time_,
393                            TS_UNCHECKED_READ(state_).Load().worker_count());
394 }
395 
396 // This function isn't expected to be called since a job is never delayed.
397 // However, the class still needs to provide an override.
GetDelayedSortKey() const398 TimeTicks JobTaskSource::GetDelayedSortKey() const {
399   return TimeTicks();
400 }
401 
402 // This function isn't expected to be called since a job is never delayed.
403 // However, the class still needs to provide an override.
HasReadyTasks(TimeTicks now) const404 bool JobTaskSource::HasReadyTasks(TimeTicks now) const {
405   NOTREACHED();
406 }
407 
Clear(TaskSource::Transaction * transaction)408 std::optional<Task> JobTaskSource::Clear(TaskSource::Transaction* transaction) {
409   Cancel();
410 
411   // Nothing is cleared since other workers might still racily run tasks. For
412   // simplicity, the destructor will take care of it once all references are
413   // released.
414   return std::nullopt;
415 }
416 
417 }  // namespace internal
418 }  // namespace base
419