// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group.h"
6 
7 #include <utility>
8 
9 #include "base/feature_list.h"
10 #include "base/functional/bind.h"
11 #include "base/functional/callback_helpers.h"
12 #include "base/task/task_features.h"
13 #include "base/task/thread_pool/task_tracker.h"
14 #include "build/build_config.h"
15 #include "third_party/abseil-cpp/absl/base/attributes.h"
16 
17 #if BUILDFLAG(IS_WIN)
18 #include "base/win/com_init_check_hook.h"
19 #include "base/win/scoped_winrt_initializer.h"
20 #endif
21 
22 namespace base {
23 namespace internal {
24 
25 namespace {
26 
27 // ThreadGroup that owns the current thread, if any.
28 ABSL_CONST_INIT thread_local const ThreadGroup* current_thread_group = nullptr;
29 
30 }  // namespace
31 
32 constexpr ThreadGroup::YieldSortKey ThreadGroup::kMaxYieldSortKey;
33 
ScheduleReleaseTaskSource(RegisteredTaskSource task_source)34 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleReleaseTaskSource(
35     RegisteredTaskSource task_source) {
36   task_sources_to_release_.push_back(std::move(task_source));
37 }
38 
39 ThreadGroup::BaseScopedCommandsExecutor::BaseScopedCommandsExecutor() = default;
40 
~BaseScopedCommandsExecutor()41 ThreadGroup::BaseScopedCommandsExecutor::~BaseScopedCommandsExecutor() {
42   CheckedLock::AssertNoLockHeldOnCurrentThread();
43 }
44 
45 ThreadGroup::ScopedReenqueueExecutor::ScopedReenqueueExecutor() = default;
46 
~ScopedReenqueueExecutor()47 ThreadGroup::ScopedReenqueueExecutor::~ScopedReenqueueExecutor() {
48   if (destination_thread_group_) {
49     destination_thread_group_->PushTaskSourceAndWakeUpWorkers(
50         std::move(transaction_with_task_source_.value()));
51   }
52 }
53 
54 void ThreadGroup::ScopedReenqueueExecutor::
SchedulePushTaskSourceAndWakeUpWorkers(TransactionWithRegisteredTaskSource transaction_with_task_source,ThreadGroup * destination_thread_group)55     SchedulePushTaskSourceAndWakeUpWorkers(
56         TransactionWithRegisteredTaskSource transaction_with_task_source,
57         ThreadGroup* destination_thread_group) {
58   DCHECK(destination_thread_group);
59   DCHECK(!destination_thread_group_);
60   DCHECK(!transaction_with_task_source_);
61   transaction_with_task_source_.emplace(
62       std::move(transaction_with_task_source));
63   destination_thread_group_ = destination_thread_group;
64 }
65 
ThreadGroup(TrackedRef<TaskTracker> task_tracker,TrackedRef<Delegate> delegate,ThreadGroup * predecessor_thread_group)66 ThreadGroup::ThreadGroup(TrackedRef<TaskTracker> task_tracker,
67                          TrackedRef<Delegate> delegate,
68                          ThreadGroup* predecessor_thread_group)
69     : task_tracker_(std::move(task_tracker)),
70       delegate_(std::move(delegate)),
71       lock_(predecessor_thread_group ? &predecessor_thread_group->lock_
72                                      : nullptr) {
73   DCHECK(task_tracker_);
74 }
75 
76 ThreadGroup::~ThreadGroup() = default;
77 
BindToCurrentThread()78 void ThreadGroup::BindToCurrentThread() {
79   DCHECK(!CurrentThreadHasGroup());
80   current_thread_group = this;
81 }
82 
UnbindFromCurrentThread()83 void ThreadGroup::UnbindFromCurrentThread() {
84   DCHECK(IsBoundToCurrentThread());
85   current_thread_group = nullptr;
86 }
87 
IsBoundToCurrentThread() const88 bool ThreadGroup::IsBoundToCurrentThread() const {
89   return current_thread_group == this;
90 }
91 
Start()92 void ThreadGroup::Start() {
93   CheckedAutoLock auto_lock(lock_);
94 }
95 
96 size_t
GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const97 ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired()
98     const {
99   // For simplicity, only 1 worker is assigned to each task source regardless of
100   // its max concurrency, with the exception of the top task source.
101   const size_t num_queued =
102       priority_queue_.GetNumTaskSourcesWithPriority(TaskPriority::BEST_EFFORT);
103   if (num_queued == 0 ||
104       !task_tracker_->CanRunPriority(TaskPriority::BEST_EFFORT)) {
105     return 0U;
106   }
107   if (priority_queue_.PeekSortKey().priority() == TaskPriority::BEST_EFFORT) {
108     // Assign the correct number of workers for the top TaskSource (-1 for the
109     // worker that is already accounted for in |num_queued|).
110     return std::max<size_t>(
111         1, num_queued +
112                priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
113   }
114   return num_queued;
115 }
116 
117 size_t
GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const118 ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired()
119     const {
120   // For simplicity, only 1 worker is assigned to each task source regardless of
121   // its max concurrency, with the exception of the top task source.
122   const size_t num_queued = priority_queue_.GetNumTaskSourcesWithPriority(
123                                 TaskPriority::USER_VISIBLE) +
124                             priority_queue_.GetNumTaskSourcesWithPriority(
125                                 TaskPriority::USER_BLOCKING);
126   if (num_queued == 0 ||
127       !task_tracker_->CanRunPriority(TaskPriority::HIGHEST)) {
128     return 0U;
129   }
130   auto priority = priority_queue_.PeekSortKey().priority();
131   if (priority == TaskPriority::USER_VISIBLE ||
132       priority == TaskPriority::USER_BLOCKING) {
133     // Assign the correct number of workers for the top TaskSource (-1 for the
134     // worker that is already accounted for in |num_queued|).
135     return std::max<size_t>(
136         1, num_queued +
137                priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
138   }
139   return num_queued;
140 }
141 
RemoveTaskSource(const TaskSource & task_source)142 RegisteredTaskSource ThreadGroup::RemoveTaskSource(
143     const TaskSource& task_source) {
144   CheckedAutoLock auto_lock(lock_);
145   return priority_queue_.RemoveTaskSource(task_source);
146 }
147 
ReEnqueueTaskSourceLockRequired(BaseScopedCommandsExecutor * workers_executor,ScopedReenqueueExecutor * reenqueue_executor,TransactionWithRegisteredTaskSource transaction_with_task_source)148 void ThreadGroup::ReEnqueueTaskSourceLockRequired(
149     BaseScopedCommandsExecutor* workers_executor,
150     ScopedReenqueueExecutor* reenqueue_executor,
151     TransactionWithRegisteredTaskSource transaction_with_task_source) {
152   // Decide in which thread group the TaskSource should be reenqueued.
153   ThreadGroup* destination_thread_group = delegate_->GetThreadGroupForTraits(
154       transaction_with_task_source.transaction.traits());
155 
156   bool push_to_immediate_queue =
157       transaction_with_task_source.task_source.WillReEnqueue(
158           TimeTicks::Now(), &transaction_with_task_source.transaction);
159 
160   if (destination_thread_group == this) {
161     // Another worker that was running a task from this task source may have
162     // reenqueued it already, in which case its heap_handle will be valid. It
163     // shouldn't be queued twice so the task source registration is released.
164     if (transaction_with_task_source.task_source->immediate_heap_handle()
165             .IsValid()) {
166       workers_executor->ScheduleReleaseTaskSource(
167           std::move(transaction_with_task_source.task_source));
168     } else {
169       // If the TaskSource should be reenqueued in the current thread group,
170       // reenqueue it inside the scope of the lock.
171       if (push_to_immediate_queue) {
172         auto sort_key = transaction_with_task_source.task_source->GetSortKey();
173         // When moving |task_source| into |priority_queue_|, it may be destroyed
174         // on another thread as soon as |lock_| is released, since we're no
175         // longer holding a reference to it. To prevent UAF, release
176         // |transaction| before moving |task_source|. Ref. crbug.com/1412008
177         transaction_with_task_source.transaction.Release();
178         priority_queue_.Push(
179             std::move(transaction_with_task_source.task_source), sort_key);
180       }
181     }
182     // This is called unconditionally to ensure there are always workers to run
183     // task sources in the queue. Some ThreadGroup implementations only invoke
184     // TakeRegisteredTaskSource() once per wake up and hence this is required to
185     // avoid races that could leave a task source stranded in the queue with no
186     // active workers.
187     EnsureEnoughWorkersLockRequired(workers_executor);
188   } else {
189     // Otherwise, schedule a reenqueue after releasing the lock.
190     reenqueue_executor->SchedulePushTaskSourceAndWakeUpWorkers(
191         std::move(transaction_with_task_source), destination_thread_group);
192   }
193 }
194 
TakeRegisteredTaskSource(BaseScopedCommandsExecutor * executor)195 RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource(
196     BaseScopedCommandsExecutor* executor) {
197   DCHECK(!priority_queue_.IsEmpty());
198 
199   auto run_status = priority_queue_.PeekTaskSource().WillRunTask();
200 
201   if (run_status == TaskSource::RunStatus::kDisallowed) {
202     executor->ScheduleReleaseTaskSource(priority_queue_.PopTaskSource());
203     return nullptr;
204   }
205 
206   if (run_status == TaskSource::RunStatus::kAllowedSaturated)
207     return priority_queue_.PopTaskSource();
208 
209   // If the TaskSource isn't saturated, check whether TaskTracker allows it to
210   // remain in the PriorityQueue.
211   // The canonical way of doing this is to pop the task source to return, call
212   // RegisterTaskSource() to get an additional RegisteredTaskSource, and
213   // reenqueue that task source if valid. Instead, it is cheaper and equivalent
214   // to peek the task source, call RegisterTaskSource() to get an additional
215   // RegisteredTaskSource to replace if valid, and only pop |priority_queue_|
216   // otherwise.
217   RegisteredTaskSource task_source =
218       task_tracker_->RegisterTaskSource(priority_queue_.PeekTaskSource().get());
219   if (!task_source)
220     return priority_queue_.PopTaskSource();
221   // Replace the top task_source and then update the queue.
222   std::swap(priority_queue_.PeekTaskSource(), task_source);
223   priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey());
224   return task_source;
225 }
226 
UpdateSortKeyImpl(BaseScopedCommandsExecutor * executor,TaskSource::Transaction transaction)227 void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
228                                     TaskSource::Transaction transaction) {
229   CheckedAutoLock auto_lock(lock_);
230   priority_queue_.UpdateSortKey(*transaction.task_source(),
231                                 transaction.task_source()->GetSortKey());
232   EnsureEnoughWorkersLockRequired(executor);
233 }
234 
PushTaskSourceAndWakeUpWorkersImpl(BaseScopedCommandsExecutor * executor,TransactionWithRegisteredTaskSource transaction_with_task_source)235 void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
236     BaseScopedCommandsExecutor* executor,
237     TransactionWithRegisteredTaskSource transaction_with_task_source) {
238   CheckedAutoLock auto_lock(lock_);
239   DCHECK(!replacement_thread_group_);
240   DCHECK_EQ(delegate_->GetThreadGroupForTraits(
241                 transaction_with_task_source.transaction.traits()),
242             this);
243   if (transaction_with_task_source.task_source->immediate_heap_handle()
244           .IsValid()) {
245     // If the task source changed group, it is possible that multiple concurrent
246     // workers try to enqueue it. Only the first enqueue should succeed.
247     executor->ScheduleReleaseTaskSource(
248         std::move(transaction_with_task_source.task_source));
249     return;
250   }
251   auto sort_key = transaction_with_task_source.task_source->GetSortKey();
252   // When moving |task_source| into |priority_queue_|, it may be destroyed
253   // on another thread as soon as |lock_| is released, since we're no longer
254   // holding a reference to it. To prevent UAF, release |transaction| before
255   // moving |task_source|. Ref. crbug.com/1412008
256   transaction_with_task_source.transaction.Release();
257   priority_queue_.Push(std::move(transaction_with_task_source.task_source),
258                        sort_key);
259   EnsureEnoughWorkersLockRequired(executor);
260 }
261 
InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(ThreadGroup * destination_thread_group)262 void ThreadGroup::InvalidateAndHandoffAllTaskSourcesToOtherThreadGroup(
263     ThreadGroup* destination_thread_group) {
264   CheckedAutoLock current_thread_group_lock(lock_);
265   CheckedAutoLock destination_thread_group_lock(
266       destination_thread_group->lock_);
267   destination_thread_group->priority_queue_ = std::move(priority_queue_);
268   replacement_thread_group_ = destination_thread_group;
269 }
270 
HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(ThreadGroup * destination_thread_group)271 void ThreadGroup::HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
272     ThreadGroup* destination_thread_group) {
273   CheckedAutoLock current_thread_group_lock(lock_);
274   CheckedAutoLock destination_thread_group_lock(
275       destination_thread_group->lock_);
276   PriorityQueue new_priority_queue;
277   TaskSourceSortKey top_sort_key;
278   // This works because all USER_BLOCKING tasks are at the front of the queue.
279   while (!priority_queue_.IsEmpty() &&
280          (top_sort_key = priority_queue_.PeekSortKey()).priority() ==
281              TaskPriority::USER_BLOCKING) {
282     new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key);
283   }
284   while (!priority_queue_.IsEmpty()) {
285     top_sort_key = priority_queue_.PeekSortKey();
286     destination_thread_group->priority_queue_.Push(
287         priority_queue_.PopTaskSource(), top_sort_key);
288   }
289   priority_queue_ = std::move(new_priority_queue);
290 }
291 
ShouldYield(TaskSourceSortKey sort_key)292 bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) {
293   DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free());
294 
295   if (!task_tracker_->CanRunPriority(sort_key.priority()))
296     return true;
297   // It is safe to read |max_allowed_sort_key_| without a lock since this
298   // variable is atomic, keeping in mind that threads may not immediately see
299   // the new value when it is updated.
300   auto max_allowed_sort_key =
301       TS_UNCHECKED_READ(max_allowed_sort_key_).load(std::memory_order_relaxed);
302 
303   // To reduce unnecessary yielding, a task will never yield to a BEST_EFFORT
304   // task regardless of its worker_count.
305   if (sort_key.priority() > max_allowed_sort_key.priority ||
306       max_allowed_sort_key.priority == TaskPriority::BEST_EFFORT) {
307     return false;
308   }
309   // Otherwise, a task only yields to a task of equal priority if its
310   // worker_count would be greater still after yielding, e.g. a job with 1
311   // worker doesn't yield to a job with 0 workers.
312   if (sort_key.priority() == max_allowed_sort_key.priority &&
313       sort_key.worker_count() <= max_allowed_sort_key.worker_count + 1) {
314     return false;
315   }
316 
317   // Reset |max_allowed_sort_key_| so that only one thread should yield at a
318   // time for a given task.
319   max_allowed_sort_key =
320       TS_UNCHECKED_READ(max_allowed_sort_key_)
321           .exchange(kMaxYieldSortKey, std::memory_order_relaxed);
322   // Another thread might have decided to yield and racily reset
323   // |max_allowed_sort_key_|, in which case this thread doesn't yield.
324   return max_allowed_sort_key.priority != TaskPriority::BEST_EFFORT;
325 }
326 
327 #if BUILDFLAG(IS_WIN)
328 // static
329 std::unique_ptr<win::ScopedWindowsThreadEnvironment>
GetScopedWindowsThreadEnvironment(WorkerEnvironment environment)330 ThreadGroup::GetScopedWindowsThreadEnvironment(WorkerEnvironment environment) {
331   std::unique_ptr<win::ScopedWindowsThreadEnvironment> scoped_environment;
332   if (environment == WorkerEnvironment::COM_MTA) {
333     scoped_environment = std::make_unique<win::ScopedWinrtInitializer>();
334   }
335 
336   DCHECK(!scoped_environment || scoped_environment->Succeeded());
337   return scoped_environment;
338 }
339 #endif
340 
341 // static
CurrentThreadHasGroup()342 bool ThreadGroup::CurrentThreadHasGroup() {
343   return current_thread_group != nullptr;
344 }
345 
346 }  // namespace internal
347 }  // namespace base
348