• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/job_task_source_old.h"
6 
7 #include <bit>
8 #include <type_traits>
9 #include <utility>
10 
11 #include "base/check_op.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/notreached.h"
16 #include "base/task/common/checked_lock.h"
17 #include "base/task/task_features.h"
18 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
19 #include "base/template_util.h"
20 #include "base/threading/thread_restrictions.h"
21 #include "base/time/time.h"
22 #include "base/time/time_override.h"
23 #include "base/trace_event/base_tracing.h"
24 
25 namespace base {
26 namespace internal {
27 
28 namespace {
29 
30 // Capped to allow assigning task_ids from a bitfield.
31 constexpr size_t kMaxWorkersPerJob = 32;
32 static_assert(
33     kMaxWorkersPerJob <=
34         std::numeric_limits<
35             std::invoke_result<decltype(&JobDelegate::GetTaskId),
36                                JobDelegate>::type>::max(),
37     "AcquireTaskId return type isn't big enough to fit kMaxWorkersPerJob");
38 
39 }  // namespace
40 
41 JobTaskSourceOld::State::State() = default;
42 JobTaskSourceOld::State::~State() = default;
43 
Cancel()44 JobTaskSourceOld::State::Value JobTaskSourceOld::State::Cancel() {
45   return {value_.fetch_or(kCanceledMask, std::memory_order_relaxed)};
46 }
47 
DecrementWorkerCount()48 JobTaskSourceOld::State::Value JobTaskSourceOld::State::DecrementWorkerCount() {
49   const uint32_t value_before_sub =
50       value_.fetch_sub(kWorkerCountIncrement, std::memory_order_relaxed);
51   DCHECK((value_before_sub >> kWorkerCountBitOffset) > 0);
52   return {value_before_sub};
53 }
54 
IncrementWorkerCount()55 JobTaskSourceOld::State::Value JobTaskSourceOld::State::IncrementWorkerCount() {
56   uint32_t value_before_add =
57       value_.fetch_add(kWorkerCountIncrement, std::memory_order_relaxed);
58   // The worker count must not overflow a uint8_t.
59   DCHECK((value_before_add >> kWorkerCountBitOffset) < ((1 << 8) - 1));
60   return {value_before_add};
61 }
62 
Load() const63 JobTaskSourceOld::State::Value JobTaskSourceOld::State::Load() const {
64   return {value_.load(std::memory_order_relaxed)};
65 }
66 
67 JobTaskSourceOld::JoinFlag::JoinFlag() = default;
68 JobTaskSourceOld::JoinFlag::~JoinFlag() = default;
69 
Reset()70 void JobTaskSourceOld::JoinFlag::Reset() {
71   value_.store(kNotWaiting, std::memory_order_relaxed);
72 }
73 
SetWaiting()74 void JobTaskSourceOld::JoinFlag::SetWaiting() {
75   value_.store(kWaitingForWorkerToYield, std::memory_order_relaxed);
76 }
77 
ShouldWorkerYield()78 bool JobTaskSourceOld::JoinFlag::ShouldWorkerYield() {
79   // The fetch_and() sets the state to kWaitingForWorkerToSignal if it was
80   // previously kWaitingForWorkerToYield, otherwise it leaves it unchanged.
81   return value_.fetch_and(kWaitingForWorkerToSignal,
82                           std::memory_order_relaxed) ==
83          kWaitingForWorkerToYield;
84 }
85 
ShouldWorkerSignal()86 bool JobTaskSourceOld::JoinFlag::ShouldWorkerSignal() {
87   return value_.exchange(kNotWaiting, std::memory_order_relaxed) != kNotWaiting;
88 }
89 
JobTaskSourceOld(const Location & from_here,const TaskTraits & traits,RepeatingCallback<void (JobDelegate *)> worker_task,MaxConcurrencyCallback max_concurrency_callback,PooledTaskRunnerDelegate * delegate)90 JobTaskSourceOld::JobTaskSourceOld(
91     const Location& from_here,
92     const TaskTraits& traits,
93     RepeatingCallback<void(JobDelegate*)> worker_task,
94     MaxConcurrencyCallback max_concurrency_callback,
95     PooledTaskRunnerDelegate* delegate)
96     : JobTaskSource(traits, nullptr, TaskSourceExecutionMode::kJob),
97       max_concurrency_callback_(std::move(max_concurrency_callback)),
98       worker_task_(std::move(worker_task)),
99       primary_task_(base::BindRepeating(
100           [](JobTaskSourceOld* self) {
101             CheckedLock::AssertNoLockHeldOnCurrentThread();
102             // Each worker task has its own delegate with associated state.
103             JobDelegate job_delegate{self, self->delegate_};
104             self->worker_task_.Run(&job_delegate);
105           },
106           base::Unretained(this))),
107       task_metadata_(from_here),
108       ready_time_(TimeTicks::Now()),
109       delegate_(delegate) {
110   DCHECK(delegate_);
111 }
112 
~JobTaskSourceOld()113 JobTaskSourceOld::~JobTaskSourceOld() {
114   // Make sure there's no outstanding active run operation left.
115   DCHECK_EQ(state_.Load().worker_count(), 0U);
116 }
117 
GetExecutionEnvironment()118 ExecutionEnvironment JobTaskSourceOld::GetExecutionEnvironment() {
119   return {SequenceToken::Create(), nullptr};
120 }
121 
WillEnqueue(int sequence_num,TaskAnnotator & annotator)122 void JobTaskSourceOld::WillEnqueue(int sequence_num, TaskAnnotator& annotator) {
123   if (task_metadata_.sequence_num != -1) {
124     // WillEnqueue() was already called.
125     return;
126   }
127   task_metadata_.sequence_num = sequence_num;
128   annotator.WillQueueTask("ThreadPool_PostJob", &task_metadata_);
129 }
130 
WillJoin()131 bool JobTaskSourceOld::WillJoin() {
132   TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
133   CheckedAutoLock auto_lock(worker_lock_);
134   DCHECK(!worker_released_condition_);  // This may only be called once.
135   worker_released_condition_ = worker_lock_.CreateConditionVariable();
136   // Prevent wait from triggering a ScopedBlockingCall as this would cause
137   // |ThreadGroup::lock_| to be acquired, causing lock inversion.
138   worker_released_condition_->declare_only_used_while_idle();
139   const auto state_before_add = state_.IncrementWorkerCount();
140 
141   if (!state_before_add.is_canceled() &&
142       state_before_add.worker_count() <
143           GetMaxConcurrency(state_before_add.worker_count())) {
144     return true;
145   }
146   return WaitForParticipationOpportunity();
147 }
148 
RunJoinTask()149 bool JobTaskSourceOld::RunJoinTask() {
150   JobDelegate job_delegate{this, nullptr};
151   worker_task_.Run(&job_delegate);
152 
153   // It is safe to read |state_| without a lock since this variable is atomic
154   // and the call to GetMaxConcurrency() is used for a best effort early exit.
155   // Stale values will only cause WaitForParticipationOpportunity() to be
156   // called.
157   const auto state = TS_UNCHECKED_READ(state_).Load();
158   // The condition is slightly different from the one in WillJoin() since we're
159   // using |state| that was already incremented to include the joining thread.
160   if (!state.is_canceled() &&
161       state.worker_count() <= GetMaxConcurrency(state.worker_count() - 1)) {
162     return true;
163   }
164 
165   TRACE_EVENT0("base", "Job.WaitForParticipationOpportunity");
166   CheckedAutoLock auto_lock(worker_lock_);
167   return WaitForParticipationOpportunity();
168 }
169 
Cancel(TaskSource::Transaction * transaction)170 void JobTaskSourceOld::Cancel(TaskSource::Transaction* transaction) {
171   // Sets the kCanceledMask bit on |state_| so that further calls to
172   // WillRunTask() never succeed. std::memory_order_relaxed without a lock is
173   // safe because this task source never needs to be re-enqueued after Cancel().
174   TS_UNCHECKED_READ(state_).Cancel();
175 }
176 
177 // EXCLUSIVE_LOCK_REQUIRED(worker_lock_)
WaitForParticipationOpportunity()178 bool JobTaskSourceOld::WaitForParticipationOpportunity() {
179   DCHECK(!join_flag_.IsWaiting());
180 
181   // std::memory_order_relaxed is sufficient because no other state is
182   // synchronized with |state_| outside of |lock_|.
183   auto state = state_.Load();
184   // |worker_count - 1| to exclude the joining thread which is not active.
185   size_t max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
186 
187   // Wait until either:
188   //  A) |worker_count| is below or equal to max concurrency and state is not
189   //  canceled.
190   //  B) All other workers returned and |worker_count| is 1.
191   while (!((state.worker_count() <= max_concurrency && !state.is_canceled()) ||
192            state.worker_count() == 1)) {
193     // std::memory_order_relaxed is sufficient because no other state is
194     // synchronized with |join_flag_| outside of |lock_|.
195     join_flag_.SetWaiting();
196 
197     // To avoid unnecessarily waiting, if either condition A) or B) change
198     // |lock_| is taken and |worker_released_condition_| signaled if necessary:
199     // 1- In DidProcessTask(), after worker count is decremented.
200     // 2- In NotifyConcurrencyIncrease(), following a max_concurrency increase.
201     worker_released_condition_->Wait();
202     state = state_.Load();
203     // |worker_count - 1| to exclude the joining thread which is not active.
204     max_concurrency = GetMaxConcurrency(state.worker_count() - 1);
205   }
206   // It's possible though unlikely that the joining thread got a participation
207   // opportunity without a worker signaling.
208   join_flag_.Reset();
209 
210   // Case A:
211   if (state.worker_count() <= max_concurrency && !state.is_canceled()) {
212     return true;
213   }
214   // Case B:
215   // Only the joining thread remains.
216   DCHECK_EQ(state.worker_count(), 1U);
217   DCHECK(state.is_canceled() || max_concurrency == 0U);
218   state_.DecrementWorkerCount();
219   // Prevent subsequent accesses to user callbacks.
220   state_.Cancel();
221   return false;
222 }
223 
WillRunTask()224 TaskSource::RunStatus JobTaskSourceOld::WillRunTask() {
225   CheckedAutoLock auto_lock(worker_lock_);
226   auto state_before_add = state_.Load();
227 
228   // Don't allow this worker to run the task if either:
229   //   A) |state_| was canceled.
230   //   B) |worker_count| is already at |max_concurrency|.
231   //   C) |max_concurrency| was lowered below or to |worker_count|.
232   // Case A:
233   if (state_before_add.is_canceled()) {
234     return RunStatus::kDisallowed;
235   }
236 
237   const size_t max_concurrency =
238       GetMaxConcurrency(state_before_add.worker_count());
239   if (state_before_add.worker_count() < max_concurrency) {
240     state_before_add = state_.IncrementWorkerCount();
241   }
242   const size_t worker_count_before_add = state_before_add.worker_count();
243   // Case B) or C):
244   if (worker_count_before_add >= max_concurrency) {
245     return RunStatus::kDisallowed;
246   }
247 
248   DCHECK_LT(worker_count_before_add, max_concurrency);
249   return max_concurrency == worker_count_before_add + 1
250              ? RunStatus::kAllowedSaturated
251              : RunStatus::kAllowedNotSaturated;
252 }
253 
GetRemainingConcurrency() const254 size_t JobTaskSourceOld::GetRemainingConcurrency() const {
255   // It is safe to read |state_| without a lock since this variable is atomic,
256   // and no other state is synchronized with GetRemainingConcurrency().
257   const auto state = TS_UNCHECKED_READ(state_).Load();
258   if (state.is_canceled()) {
259     return 0;
260   }
261   const size_t max_concurrency = GetMaxConcurrency(state.worker_count());
262   // Avoid underflows.
263   if (state.worker_count() > max_concurrency) {
264     return 0;
265   }
266   return max_concurrency - state.worker_count();
267 }
268 
IsActive() const269 bool JobTaskSourceOld::IsActive() const {
270   CheckedAutoLock auto_lock(worker_lock_);
271   auto state = state_.Load();
272   return GetMaxConcurrency(state.worker_count()) != 0 ||
273          state.worker_count() != 0;
274 }
275 
GetWorkerCount() const276 size_t JobTaskSourceOld::GetWorkerCount() const {
277   return TS_UNCHECKED_READ(state_).Load().worker_count();
278 }
279 
NotifyConcurrencyIncrease()280 bool JobTaskSourceOld::NotifyConcurrencyIncrease() {
281   // Avoid unnecessary locks when NotifyConcurrencyIncrease() is spuriously
282   // called.
283   if (GetRemainingConcurrency() == 0) {
284     return true;
285   }
286 
287   {
288     // Lock is taken to access |join_flag_| below and signal
289     // |worker_released_condition_|.
290     CheckedAutoLock auto_lock(worker_lock_);
291     if (join_flag_.ShouldWorkerSignal()) {
292       worker_released_condition_->Signal();
293     }
294   }
295 
296   // Make sure the task source is in the queue if not already.
297   // Caveat: it's possible but unlikely that the task source has already reached
298   // its intended concurrency and doesn't need to be enqueued if there
299   // previously were too many worker. For simplicity, the task source is always
300   // enqueued and will get discarded if already saturated when it is popped from
301   // the priority queue.
302   return delegate_->EnqueueJobTaskSource(this);
303 }
304 
GetMaxConcurrency() const305 size_t JobTaskSourceOld::GetMaxConcurrency() const {
306   return GetMaxConcurrency(TS_UNCHECKED_READ(state_).Load().worker_count());
307 }
308 
GetMaxConcurrency(size_t worker_count) const309 size_t JobTaskSourceOld::GetMaxConcurrency(size_t worker_count) const {
310   return std::min(max_concurrency_callback_.Run(worker_count),
311                   kMaxWorkersPerJob);
312 }
313 
AcquireTaskId()314 uint8_t JobTaskSourceOld::AcquireTaskId() {
315   static_assert(kMaxWorkersPerJob <= sizeof(assigned_task_ids_) * 8,
316                 "TaskId bitfield isn't big enough to fit kMaxWorkersPerJob.");
317   uint32_t assigned_task_ids =
318       assigned_task_ids_.load(std::memory_order_relaxed);
319   uint32_t new_assigned_task_ids = 0;
320   int task_id = 0;
321   // memory_order_acquire on success, matched with memory_order_release in
322   // ReleaseTaskId() so that operations done by previous threads that had
323   // the same task_id become visible to the current thread.
324   do {
325     // Count trailing one bits. This is the id of the right-most 0-bit in
326     // |assigned_task_ids|.
327     task_id = std::countr_one(assigned_task_ids);
328     new_assigned_task_ids = assigned_task_ids | (uint32_t(1) << task_id);
329   } while (!assigned_task_ids_.compare_exchange_weak(
330       assigned_task_ids, new_assigned_task_ids, std::memory_order_acquire,
331       std::memory_order_relaxed));
332   return static_cast<uint8_t>(task_id);
333 }
334 
ReleaseTaskId(uint8_t task_id)335 void JobTaskSourceOld::ReleaseTaskId(uint8_t task_id) {
336   // memory_order_release to match AcquireTaskId().
337   uint32_t previous_task_ids = assigned_task_ids_.fetch_and(
338       ~(uint32_t(1) << task_id), std::memory_order_release);
339   DCHECK(previous_task_ids & (uint32_t(1) << task_id));
340 }
341 
ShouldYield()342 bool JobTaskSourceOld::ShouldYield() {
343   // It is safe to read |join_flag_| and |state_| without a lock since these
344   // variables are atomic, keeping in mind that threads may not immediately see
345   // the new value when it is updated.
346   return TS_UNCHECKED_READ(join_flag_).ShouldWorkerYield() ||
347          TS_UNCHECKED_READ(state_).Load().is_canceled();
348 }
349 
GetDelegate() const350 PooledTaskRunnerDelegate* JobTaskSourceOld::GetDelegate() const {
351   return delegate_;
352 }
353 
TakeTask(TaskSource::Transaction * transaction)354 Task JobTaskSourceOld::TakeTask(TaskSource::Transaction* transaction) {
355   // JobTaskSource members are not lock-protected so no need to acquire a lock
356   // if |transaction| is nullptr.
357   DCHECK_GT(TS_UNCHECKED_READ(state_).Load().worker_count(), 0U);
358   DCHECK(primary_task_);
359   return {task_metadata_, primary_task_};
360 }
361 
DidProcessTask(TaskSource::Transaction *)362 bool JobTaskSourceOld::DidProcessTask(
363     TaskSource::Transaction* /*transaction*/) {
364   // Lock is needed to access |join_flag_| below and signal
365   // |worker_released_condition_|.
366   CheckedAutoLock auto_lock(worker_lock_);
367   const auto state_before_sub = state_.DecrementWorkerCount();
368 
369   if (join_flag_.ShouldWorkerSignal()) {
370     worker_released_condition_->Signal();
371   }
372 
373   // A canceled task source should never get re-enqueued.
374   if (state_before_sub.is_canceled()) {
375     return false;
376   }
377 
378   DCHECK_GT(state_before_sub.worker_count(), 0U);
379 
380   // Re-enqueue the TaskSource if the task ran and the worker count is below the
381   // max concurrency.
382   // |worker_count - 1| to exclude the returning thread.
383   return state_before_sub.worker_count() <=
384          GetMaxConcurrency(state_before_sub.worker_count() - 1);
385 }
386 
387 // This is a no-op and should always return true.
WillReEnqueue(TimeTicks now,TaskSource::Transaction *)388 bool JobTaskSourceOld::WillReEnqueue(TimeTicks now,
389                                      TaskSource::Transaction* /*transaction*/) {
390   return true;
391 }
392 
393 // This is a no-op.
OnBecomeReady()394 bool JobTaskSourceOld::OnBecomeReady() {
395   return false;
396 }
397 
GetSortKey() const398 TaskSourceSortKey JobTaskSourceOld::GetSortKey() const {
399   return TaskSourceSortKey(priority_racy(), ready_time_,
400                            TS_UNCHECKED_READ(state_).Load().worker_count());
401 }
402 
403 // This function isn't expected to be called since a job is never delayed.
404 // However, the class still needs to provide an override.
GetDelayedSortKey() const405 TimeTicks JobTaskSourceOld::GetDelayedSortKey() const {
406   return TimeTicks();
407 }
408 
409 // This function isn't expected to be called since a job is never delayed.
410 // However, the class still needs to provide an override.
HasReadyTasks(TimeTicks now) const411 bool JobTaskSourceOld::HasReadyTasks(TimeTicks now) const {
412   NOTREACHED();
413   return true;
414 }
415 
Clear(TaskSource::Transaction * transaction)416 absl::optional<Task> JobTaskSourceOld::Clear(
417     TaskSource::Transaction* transaction) {
418   Cancel();
419 
420   // Nothing is cleared since other workers might still racily run tasks. For
421   // simplicity, the destructor will take care of it once all references are
422   // released.
423   return absl::nullopt;
424 }
425 
426 }  // namespace internal
427 }  // namespace base
428