• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group.h"
6 
7 #include <utility>
8 
9 #include "base/check.h"
10 #include "base/feature_list.h"
11 #include "base/functional/bind.h"
12 #include "base/functional/callback_helpers.h"
13 #include "base/task/task_features.h"
14 #include "base/task/thread_pool/task_tracker.h"
15 #include "build/build_config.h"
16 #include "third_party/abseil-cpp/absl/base/attributes.h"
17 
18 #if BUILDFLAG(IS_WIN)
19 #include "base/win/com_init_check_hook.h"
20 #include "base/win/scoped_winrt_initializer.h"
21 #endif
22 
23 namespace base {
24 namespace internal {
25 
26 namespace {
27 
28 // ThreadGroup that owns the current thread, if any.
29 ABSL_CONST_INIT thread_local const ThreadGroup* current_thread_group = nullptr;
30 
31 }  // namespace
32 
33 constexpr ThreadGroup::YieldSortKey ThreadGroup::kMaxYieldSortKey;
34 
ScheduleReleaseTaskSource(RegisteredTaskSource task_source)35 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleReleaseTaskSource(
36     RegisteredTaskSource task_source) {
37   task_sources_to_release_.push_back(std::move(task_source));
38 }
39 
40 ThreadGroup::BaseScopedCommandsExecutor::BaseScopedCommandsExecutor() = default;
41 
~BaseScopedCommandsExecutor()42 ThreadGroup::BaseScopedCommandsExecutor::~BaseScopedCommandsExecutor() {
43   CheckedLock::AssertNoLockHeldOnCurrentThread();
44 }
45 
46 ThreadGroup::ScopedReenqueueExecutor::ScopedReenqueueExecutor() = default;
47 
~ScopedReenqueueExecutor()48 ThreadGroup::ScopedReenqueueExecutor::~ScopedReenqueueExecutor() {
49   if (destination_thread_group_) {
50     destination_thread_group_->PushTaskSourceAndWakeUpWorkers(
51         std::move(transaction_with_task_source_.value()));
52   }
53 }
54 
55 void ThreadGroup::ScopedReenqueueExecutor::
SchedulePushTaskSourceAndWakeUpWorkers(RegisteredTaskSourceAndTransaction transaction_with_task_source,ThreadGroup * destination_thread_group)56     SchedulePushTaskSourceAndWakeUpWorkers(
57         RegisteredTaskSourceAndTransaction transaction_with_task_source,
58         ThreadGroup* destination_thread_group) {
59   DCHECK(destination_thread_group);
60   DCHECK(!destination_thread_group_);
61   DCHECK(!transaction_with_task_source_);
62   transaction_with_task_source_.emplace(
63       std::move(transaction_with_task_source));
64   destination_thread_group_ = destination_thread_group;
65 }
66 
ThreadGroup(TrackedRef<TaskTracker> task_tracker,TrackedRef<Delegate> delegate)67 ThreadGroup::ThreadGroup(TrackedRef<TaskTracker> task_tracker,
68                          TrackedRef<Delegate> delegate)
69     : task_tracker_(std::move(task_tracker)), delegate_(std::move(delegate)) {
70   DCHECK(task_tracker_);
71 }
72 
73 ThreadGroup::~ThreadGroup() = default;
74 
BindToCurrentThread()75 void ThreadGroup::BindToCurrentThread() {
76   DCHECK(!CurrentThreadHasGroup());
77   current_thread_group = this;
78 }
79 
UnbindFromCurrentThread()80 void ThreadGroup::UnbindFromCurrentThread() {
81   DCHECK(IsBoundToCurrentThread());
82   current_thread_group = nullptr;
83 }
84 
IsBoundToCurrentThread() const85 bool ThreadGroup::IsBoundToCurrentThread() const {
86   return current_thread_group == this;
87 }
88 
Start()89 void ThreadGroup::Start() {
90   CheckedAutoLock auto_lock(lock_);
91 }
92 
93 size_t
GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const94 ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired()
95     const {
96   // For simplicity, only 1 worker is assigned to each task source regardless of
97   // its max concurrency, with the exception of the top task source.
98   const size_t num_queued =
99       priority_queue_.GetNumTaskSourcesWithPriority(TaskPriority::BEST_EFFORT);
100   if (num_queued == 0 ||
101       !task_tracker_->CanRunPriority(TaskPriority::BEST_EFFORT)) {
102     return 0U;
103   }
104   if (priority_queue_.PeekSortKey().priority() == TaskPriority::BEST_EFFORT) {
105     // Assign the correct number of workers for the top TaskSource (-1 for the
106     // worker that is already accounted for in |num_queued|).
107     return std::max<size_t>(
108         1, num_queued +
109                priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
110   }
111   return num_queued;
112 }
113 
114 size_t
GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const115 ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired()
116     const {
117   // For simplicity, only 1 worker is assigned to each task source regardless of
118   // its max concurrency, with the exception of the top task source.
119   const size_t num_queued = priority_queue_.GetNumTaskSourcesWithPriority(
120                                 TaskPriority::USER_VISIBLE) +
121                             priority_queue_.GetNumTaskSourcesWithPriority(
122                                 TaskPriority::USER_BLOCKING);
123   if (num_queued == 0 ||
124       !task_tracker_->CanRunPriority(TaskPriority::HIGHEST)) {
125     return 0U;
126   }
127   auto priority = priority_queue_.PeekSortKey().priority();
128   if (priority == TaskPriority::USER_VISIBLE ||
129       priority == TaskPriority::USER_BLOCKING) {
130     // Assign the correct number of workers for the top TaskSource (-1 for the
131     // worker that is already accounted for in |num_queued|).
132     return std::max<size_t>(
133         1, num_queued +
134                priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
135   }
136   return num_queued;
137 }
138 
RemoveTaskSource(const TaskSource & task_source)139 RegisteredTaskSource ThreadGroup::RemoveTaskSource(
140     const TaskSource& task_source) {
141   CheckedAutoLock auto_lock(lock_);
142   return priority_queue_.RemoveTaskSource(task_source);
143 }
144 
ReEnqueueTaskSourceLockRequired(BaseScopedCommandsExecutor * workers_executor,ScopedReenqueueExecutor * reenqueue_executor,RegisteredTaskSourceAndTransaction transaction_with_task_source)145 void ThreadGroup::ReEnqueueTaskSourceLockRequired(
146     BaseScopedCommandsExecutor* workers_executor,
147     ScopedReenqueueExecutor* reenqueue_executor,
148     RegisteredTaskSourceAndTransaction transaction_with_task_source) {
149   // Decide in which thread group the TaskSource should be reenqueued.
150   ThreadGroup* destination_thread_group = delegate_->GetThreadGroupForTraits(
151       transaction_with_task_source.transaction.traits());
152 
153   bool push_to_immediate_queue =
154       transaction_with_task_source.task_source.WillReEnqueue(
155           TimeTicks::Now(), &transaction_with_task_source.transaction);
156 
157   if (destination_thread_group == this) {
158     // Another worker that was running a task from this task source may have
159     // reenqueued it already, in which case its heap_handle will be valid. It
160     // shouldn't be queued twice so the task source registration is released.
161     if (transaction_with_task_source.task_source->immediate_heap_handle()
162             .IsValid()) {
163       workers_executor->ScheduleReleaseTaskSource(
164           std::move(transaction_with_task_source.task_source));
165     } else {
166       // If the TaskSource should be reenqueued in the current thread group,
167       // reenqueue it inside the scope of the lock.
168       if (push_to_immediate_queue) {
169         auto sort_key = transaction_with_task_source.task_source->GetSortKey();
170         // When moving |task_source| into |priority_queue_|, it may be destroyed
171         // on another thread as soon as |lock_| is released, since we're no
172         // longer holding a reference to it. To prevent UAF, release
173         // |transaction| before moving |task_source|. Ref. crbug.com/1412008
174         transaction_with_task_source.transaction.Release();
175         priority_queue_.Push(
176             std::move(transaction_with_task_source.task_source), sort_key);
177       }
178     }
179     // This is called unconditionally to ensure there are always workers to run
180     // task sources in the queue. Some ThreadGroup implementations only invoke
181     // TakeRegisteredTaskSource() once per wake up and hence this is required to
182     // avoid races that could leave a task source stranded in the queue with no
183     // active workers.
184     EnsureEnoughWorkersLockRequired(workers_executor);
185   } else {
186     // Otherwise, schedule a reenqueue after releasing the lock.
187     reenqueue_executor->SchedulePushTaskSourceAndWakeUpWorkers(
188         std::move(transaction_with_task_source), destination_thread_group);
189   }
190 }
191 
TakeRegisteredTaskSource(BaseScopedCommandsExecutor * executor)192 RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource(
193     BaseScopedCommandsExecutor* executor) {
194   DCHECK(!priority_queue_.IsEmpty());
195 
196   auto run_status = priority_queue_.PeekTaskSource().WillRunTask();
197 
198   if (run_status == TaskSource::RunStatus::kDisallowed) {
199     executor->ScheduleReleaseTaskSource(priority_queue_.PopTaskSource());
200     return nullptr;
201   }
202 
203   if (run_status == TaskSource::RunStatus::kAllowedSaturated)
204     return priority_queue_.PopTaskSource();
205 
206   // If the TaskSource isn't saturated, check whether TaskTracker allows it to
207   // remain in the PriorityQueue.
208   // The canonical way of doing this is to pop the task source to return, call
209   // RegisterTaskSource() to get an additional RegisteredTaskSource, and
210   // reenqueue that task source if valid. Instead, it is cheaper and equivalent
211   // to peek the task source, call RegisterTaskSource() to get an additional
212   // RegisteredTaskSource to replace if valid, and only pop |priority_queue_|
213   // otherwise.
214   RegisteredTaskSource task_source =
215       task_tracker_->RegisterTaskSource(priority_queue_.PeekTaskSource().get());
216   if (!task_source)
217     return priority_queue_.PopTaskSource();
218   // Replace the top task_source and then update the queue.
219   std::swap(priority_queue_.PeekTaskSource(), task_source);
220   priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey());
221   return task_source;
222 }
223 
UpdateSortKeyImpl(BaseScopedCommandsExecutor * executor,TaskSource::Transaction transaction)224 void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
225                                     TaskSource::Transaction transaction) {
226   CheckedAutoLock auto_lock(lock_);
227   priority_queue_.UpdateSortKey(*transaction.task_source(),
228                                 transaction.task_source()->GetSortKey());
229   EnsureEnoughWorkersLockRequired(executor);
230 }
231 
PushTaskSourceAndWakeUpWorkersImpl(BaseScopedCommandsExecutor * executor,RegisteredTaskSourceAndTransaction transaction_with_task_source)232 void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
233     BaseScopedCommandsExecutor* executor,
234     RegisteredTaskSourceAndTransaction transaction_with_task_source) {
235   CheckedAutoLock auto_lock(lock_);
236   DCHECK(!replacement_thread_group_);
237   DCHECK_EQ(delegate_->GetThreadGroupForTraits(
238                 transaction_with_task_source.transaction.traits()),
239             this);
240   if (transaction_with_task_source.task_source->immediate_heap_handle()
241           .IsValid()) {
242     // If the task source changed group, it is possible that multiple concurrent
243     // workers try to enqueue it. Only the first enqueue should succeed.
244     executor->ScheduleReleaseTaskSource(
245         std::move(transaction_with_task_source.task_source));
246     return;
247   }
248   auto sort_key = transaction_with_task_source.task_source->GetSortKey();
249   // When moving |task_source| into |priority_queue_|, it may be destroyed
250   // on another thread as soon as |lock_| is released, since we're no longer
251   // holding a reference to it. To prevent UAF, release |transaction| before
252   // moving |task_source|. Ref. crbug.com/1412008
253   transaction_with_task_source.transaction.Release();
254   priority_queue_.Push(std::move(transaction_with_task_source.task_source),
255                        sort_key);
256   EnsureEnoughWorkersLockRequired(executor);
257 }
258 
HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(ThreadGroup * destination_thread_group)259 void ThreadGroup::HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
260     ThreadGroup* destination_thread_group) {
261   PriorityQueue new_priority_queue;
262   TaskSourceSortKey top_sort_key;
263   {
264     // This works because all USER_BLOCKING tasks are at the front of the queue.
265     CheckedAutoLock current_thread_group_lock(lock_);
266     while (!priority_queue_.IsEmpty() &&
267            (top_sort_key = priority_queue_.PeekSortKey()).priority() ==
268                TaskPriority::USER_BLOCKING) {
269       new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key);
270     }
271     new_priority_queue.swap(priority_queue_);
272   }
273   {
274     CheckedAutoLock destination_thread_group_lock(
275         destination_thread_group->lock_);
276     while (!new_priority_queue.IsEmpty()) {
277       top_sort_key = new_priority_queue.PeekSortKey();
278       destination_thread_group->priority_queue_.Push(
279           new_priority_queue.PopTaskSource(), top_sort_key);
280     }
281   }
282 }
283 
ShouldYield(TaskSourceSortKey sort_key)284 bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) {
285   DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free());
286 
287   if (!task_tracker_->CanRunPriority(sort_key.priority()))
288     return true;
289   // It is safe to read |max_allowed_sort_key_| without a lock since this
290   // variable is atomic, keeping in mind that threads may not immediately see
291   // the new value when it is updated.
292   auto max_allowed_sort_key =
293       TS_UNCHECKED_READ(max_allowed_sort_key_).load(std::memory_order_relaxed);
294 
295   // To reduce unnecessary yielding, a task will never yield to a BEST_EFFORT
296   // task regardless of its worker_count.
297   if (sort_key.priority() > max_allowed_sort_key.priority ||
298       max_allowed_sort_key.priority == TaskPriority::BEST_EFFORT) {
299     return false;
300   }
301   // Otherwise, a task only yields to a task of equal priority if its
302   // worker_count would be greater still after yielding, e.g. a job with 1
303   // worker doesn't yield to a job with 0 workers.
304   if (sort_key.priority() == max_allowed_sort_key.priority &&
305       sort_key.worker_count() <= max_allowed_sort_key.worker_count + 1) {
306     return false;
307   }
308 
309   // Reset |max_allowed_sort_key_| so that only one thread should yield at a
310   // time for a given task.
311   max_allowed_sort_key =
312       TS_UNCHECKED_READ(max_allowed_sort_key_)
313           .exchange(kMaxYieldSortKey, std::memory_order_relaxed);
314   // Another thread might have decided to yield and racily reset
315   // |max_allowed_sort_key_|, in which case this thread doesn't yield.
316   return max_allowed_sort_key.priority != TaskPriority::BEST_EFFORT;
317 }
318 
319 #if BUILDFLAG(IS_WIN)
320 // static
321 std::unique_ptr<win::ScopedWindowsThreadEnvironment>
GetScopedWindowsThreadEnvironment(WorkerEnvironment environment)322 ThreadGroup::GetScopedWindowsThreadEnvironment(WorkerEnvironment environment) {
323   std::unique_ptr<win::ScopedWindowsThreadEnvironment> scoped_environment;
324   if (environment == WorkerEnvironment::COM_MTA) {
325     scoped_environment = std::make_unique<win::ScopedWinrtInitializer>();
326 
327     // TODO(crbug.com/1498668): rollback the change or replace it with a CHECK
328     // before closing the bug.
329     DUMP_WILL_BE_CHECK(scoped_environment->Succeeded());
330   }
331 
332   DCHECK(!scoped_environment || scoped_environment->Succeeded());
333   return scoped_environment;
334 }
335 #endif
336 
337 // static
CurrentThreadHasGroup()338 bool ThreadGroup::CurrentThreadHasGroup() {
339   return current_thread_group != nullptr;
340 }
341 
342 }  // namespace internal
343 }  // namespace base
344