1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group.h"
6
7 #include <utility>
8
9 #include "base/check.h"
10 #include "base/feature_list.h"
11 #include "base/functional/bind.h"
12 #include "base/functional/callback_helpers.h"
13 #include "base/task/task_features.h"
14 #include "base/task/thread_pool/task_tracker.h"
15 #include "build/build_config.h"
16 #include "third_party/abseil-cpp/absl/base/attributes.h"
17
18 #if BUILDFLAG(IS_WIN)
19 #include "base/win/com_init_check_hook.h"
20 #include "base/win/scoped_winrt_initializer.h"
21 #endif
22
23 namespace base {
24 namespace internal {
25
26 namespace {
27
28 // ThreadGroup that owns the current thread, if any.
29 ABSL_CONST_INIT thread_local const ThreadGroup* current_thread_group = nullptr;
30
31 } // namespace
32
33 constexpr ThreadGroup::YieldSortKey ThreadGroup::kMaxYieldSortKey;
34
ScheduleReleaseTaskSource(RegisteredTaskSource task_source)35 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleReleaseTaskSource(
36 RegisteredTaskSource task_source) {
37 task_sources_to_release_.push_back(std::move(task_source));
38 }
39
40 ThreadGroup::BaseScopedCommandsExecutor::BaseScopedCommandsExecutor() = default;
41
~BaseScopedCommandsExecutor()42 ThreadGroup::BaseScopedCommandsExecutor::~BaseScopedCommandsExecutor() {
43 CheckedLock::AssertNoLockHeldOnCurrentThread();
44 }
45
46 ThreadGroup::ScopedReenqueueExecutor::ScopedReenqueueExecutor() = default;
47
~ScopedReenqueueExecutor()48 ThreadGroup::ScopedReenqueueExecutor::~ScopedReenqueueExecutor() {
49 if (destination_thread_group_) {
50 destination_thread_group_->PushTaskSourceAndWakeUpWorkers(
51 std::move(transaction_with_task_source_.value()));
52 }
53 }
54
55 void ThreadGroup::ScopedReenqueueExecutor::
SchedulePushTaskSourceAndWakeUpWorkers(RegisteredTaskSourceAndTransaction transaction_with_task_source,ThreadGroup * destination_thread_group)56 SchedulePushTaskSourceAndWakeUpWorkers(
57 RegisteredTaskSourceAndTransaction transaction_with_task_source,
58 ThreadGroup* destination_thread_group) {
59 DCHECK(destination_thread_group);
60 DCHECK(!destination_thread_group_);
61 DCHECK(!transaction_with_task_source_);
62 transaction_with_task_source_.emplace(
63 std::move(transaction_with_task_source));
64 destination_thread_group_ = destination_thread_group;
65 }
66
ThreadGroup(TrackedRef<TaskTracker> task_tracker,TrackedRef<Delegate> delegate)67 ThreadGroup::ThreadGroup(TrackedRef<TaskTracker> task_tracker,
68 TrackedRef<Delegate> delegate)
69 : task_tracker_(std::move(task_tracker)), delegate_(std::move(delegate)) {
70 DCHECK(task_tracker_);
71 }
72
73 ThreadGroup::~ThreadGroup() = default;
74
BindToCurrentThread()75 void ThreadGroup::BindToCurrentThread() {
76 DCHECK(!CurrentThreadHasGroup());
77 current_thread_group = this;
78 }
79
UnbindFromCurrentThread()80 void ThreadGroup::UnbindFromCurrentThread() {
81 DCHECK(IsBoundToCurrentThread());
82 current_thread_group = nullptr;
83 }
84
IsBoundToCurrentThread() const85 bool ThreadGroup::IsBoundToCurrentThread() const {
86 return current_thread_group == this;
87 }
88
Start()89 void ThreadGroup::Start() {
90 CheckedAutoLock auto_lock(lock_);
91 }
92
93 size_t
GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const94 ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired()
95 const {
96 // For simplicity, only 1 worker is assigned to each task source regardless of
97 // its max concurrency, with the exception of the top task source.
98 const size_t num_queued =
99 priority_queue_.GetNumTaskSourcesWithPriority(TaskPriority::BEST_EFFORT);
100 if (num_queued == 0 ||
101 !task_tracker_->CanRunPriority(TaskPriority::BEST_EFFORT)) {
102 return 0U;
103 }
104 if (priority_queue_.PeekSortKey().priority() == TaskPriority::BEST_EFFORT) {
105 // Assign the correct number of workers for the top TaskSource (-1 for the
106 // worker that is already accounted for in |num_queued|).
107 return std::max<size_t>(
108 1, num_queued +
109 priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
110 }
111 return num_queued;
112 }
113
114 size_t
GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const115 ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired()
116 const {
117 // For simplicity, only 1 worker is assigned to each task source regardless of
118 // its max concurrency, with the exception of the top task source.
119 const size_t num_queued = priority_queue_.GetNumTaskSourcesWithPriority(
120 TaskPriority::USER_VISIBLE) +
121 priority_queue_.GetNumTaskSourcesWithPriority(
122 TaskPriority::USER_BLOCKING);
123 if (num_queued == 0 ||
124 !task_tracker_->CanRunPriority(TaskPriority::HIGHEST)) {
125 return 0U;
126 }
127 auto priority = priority_queue_.PeekSortKey().priority();
128 if (priority == TaskPriority::USER_VISIBLE ||
129 priority == TaskPriority::USER_BLOCKING) {
130 // Assign the correct number of workers for the top TaskSource (-1 for the
131 // worker that is already accounted for in |num_queued|).
132 return std::max<size_t>(
133 1, num_queued +
134 priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
135 }
136 return num_queued;
137 }
138
RemoveTaskSource(const TaskSource & task_source)139 RegisteredTaskSource ThreadGroup::RemoveTaskSource(
140 const TaskSource& task_source) {
141 CheckedAutoLock auto_lock(lock_);
142 return priority_queue_.RemoveTaskSource(task_source);
143 }
144
ReEnqueueTaskSourceLockRequired(BaseScopedCommandsExecutor * workers_executor,ScopedReenqueueExecutor * reenqueue_executor,RegisteredTaskSourceAndTransaction transaction_with_task_source)145 void ThreadGroup::ReEnqueueTaskSourceLockRequired(
146 BaseScopedCommandsExecutor* workers_executor,
147 ScopedReenqueueExecutor* reenqueue_executor,
148 RegisteredTaskSourceAndTransaction transaction_with_task_source) {
149 // Decide in which thread group the TaskSource should be reenqueued.
150 ThreadGroup* destination_thread_group = delegate_->GetThreadGroupForTraits(
151 transaction_with_task_source.transaction.traits());
152
153 bool push_to_immediate_queue =
154 transaction_with_task_source.task_source.WillReEnqueue(
155 TimeTicks::Now(), &transaction_with_task_source.transaction);
156
157 if (destination_thread_group == this) {
158 // Another worker that was running a task from this task source may have
159 // reenqueued it already, in which case its heap_handle will be valid. It
160 // shouldn't be queued twice so the task source registration is released.
161 if (transaction_with_task_source.task_source->immediate_heap_handle()
162 .IsValid()) {
163 workers_executor->ScheduleReleaseTaskSource(
164 std::move(transaction_with_task_source.task_source));
165 } else {
166 // If the TaskSource should be reenqueued in the current thread group,
167 // reenqueue it inside the scope of the lock.
168 if (push_to_immediate_queue) {
169 auto sort_key = transaction_with_task_source.task_source->GetSortKey();
170 // When moving |task_source| into |priority_queue_|, it may be destroyed
171 // on another thread as soon as |lock_| is released, since we're no
172 // longer holding a reference to it. To prevent UAF, release
173 // |transaction| before moving |task_source|. Ref. crbug.com/1412008
174 transaction_with_task_source.transaction.Release();
175 priority_queue_.Push(
176 std::move(transaction_with_task_source.task_source), sort_key);
177 }
178 }
179 // This is called unconditionally to ensure there are always workers to run
180 // task sources in the queue. Some ThreadGroup implementations only invoke
181 // TakeRegisteredTaskSource() once per wake up and hence this is required to
182 // avoid races that could leave a task source stranded in the queue with no
183 // active workers.
184 EnsureEnoughWorkersLockRequired(workers_executor);
185 } else {
186 // Otherwise, schedule a reenqueue after releasing the lock.
187 reenqueue_executor->SchedulePushTaskSourceAndWakeUpWorkers(
188 std::move(transaction_with_task_source), destination_thread_group);
189 }
190 }
191
TakeRegisteredTaskSource(BaseScopedCommandsExecutor * executor)192 RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource(
193 BaseScopedCommandsExecutor* executor) {
194 DCHECK(!priority_queue_.IsEmpty());
195
196 auto run_status = priority_queue_.PeekTaskSource().WillRunTask();
197
198 if (run_status == TaskSource::RunStatus::kDisallowed) {
199 executor->ScheduleReleaseTaskSource(priority_queue_.PopTaskSource());
200 return nullptr;
201 }
202
203 if (run_status == TaskSource::RunStatus::kAllowedSaturated)
204 return priority_queue_.PopTaskSource();
205
206 // If the TaskSource isn't saturated, check whether TaskTracker allows it to
207 // remain in the PriorityQueue.
208 // The canonical way of doing this is to pop the task source to return, call
209 // RegisterTaskSource() to get an additional RegisteredTaskSource, and
210 // reenqueue that task source if valid. Instead, it is cheaper and equivalent
211 // to peek the task source, call RegisterTaskSource() to get an additional
212 // RegisteredTaskSource to replace if valid, and only pop |priority_queue_|
213 // otherwise.
214 RegisteredTaskSource task_source =
215 task_tracker_->RegisterTaskSource(priority_queue_.PeekTaskSource().get());
216 if (!task_source)
217 return priority_queue_.PopTaskSource();
218 // Replace the top task_source and then update the queue.
219 std::swap(priority_queue_.PeekTaskSource(), task_source);
220 priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey());
221 return task_source;
222 }
223
UpdateSortKeyImpl(BaseScopedCommandsExecutor * executor,TaskSource::Transaction transaction)224 void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
225 TaskSource::Transaction transaction) {
226 CheckedAutoLock auto_lock(lock_);
227 priority_queue_.UpdateSortKey(*transaction.task_source(),
228 transaction.task_source()->GetSortKey());
229 EnsureEnoughWorkersLockRequired(executor);
230 }
231
PushTaskSourceAndWakeUpWorkersImpl(BaseScopedCommandsExecutor * executor,RegisteredTaskSourceAndTransaction transaction_with_task_source)232 void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
233 BaseScopedCommandsExecutor* executor,
234 RegisteredTaskSourceAndTransaction transaction_with_task_source) {
235 CheckedAutoLock auto_lock(lock_);
236 DCHECK(!replacement_thread_group_);
237 DCHECK_EQ(delegate_->GetThreadGroupForTraits(
238 transaction_with_task_source.transaction.traits()),
239 this);
240 if (transaction_with_task_source.task_source->immediate_heap_handle()
241 .IsValid()) {
242 // If the task source changed group, it is possible that multiple concurrent
243 // workers try to enqueue it. Only the first enqueue should succeed.
244 executor->ScheduleReleaseTaskSource(
245 std::move(transaction_with_task_source.task_source));
246 return;
247 }
248 auto sort_key = transaction_with_task_source.task_source->GetSortKey();
249 // When moving |task_source| into |priority_queue_|, it may be destroyed
250 // on another thread as soon as |lock_| is released, since we're no longer
251 // holding a reference to it. To prevent UAF, release |transaction| before
252 // moving |task_source|. Ref. crbug.com/1412008
253 transaction_with_task_source.transaction.Release();
254 priority_queue_.Push(std::move(transaction_with_task_source.task_source),
255 sort_key);
256 EnsureEnoughWorkersLockRequired(executor);
257 }
258
HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(ThreadGroup * destination_thread_group)259 void ThreadGroup::HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
260 ThreadGroup* destination_thread_group) {
261 PriorityQueue new_priority_queue;
262 TaskSourceSortKey top_sort_key;
263 {
264 // This works because all USER_BLOCKING tasks are at the front of the queue.
265 CheckedAutoLock current_thread_group_lock(lock_);
266 while (!priority_queue_.IsEmpty() &&
267 (top_sort_key = priority_queue_.PeekSortKey()).priority() ==
268 TaskPriority::USER_BLOCKING) {
269 new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key);
270 }
271 new_priority_queue.swap(priority_queue_);
272 }
273 {
274 CheckedAutoLock destination_thread_group_lock(
275 destination_thread_group->lock_);
276 while (!new_priority_queue.IsEmpty()) {
277 top_sort_key = new_priority_queue.PeekSortKey();
278 destination_thread_group->priority_queue_.Push(
279 new_priority_queue.PopTaskSource(), top_sort_key);
280 }
281 }
282 }
283
ShouldYield(TaskSourceSortKey sort_key)284 bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) {
285 DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free());
286
287 if (!task_tracker_->CanRunPriority(sort_key.priority()))
288 return true;
289 // It is safe to read |max_allowed_sort_key_| without a lock since this
290 // variable is atomic, keeping in mind that threads may not immediately see
291 // the new value when it is updated.
292 auto max_allowed_sort_key =
293 TS_UNCHECKED_READ(max_allowed_sort_key_).load(std::memory_order_relaxed);
294
295 // To reduce unnecessary yielding, a task will never yield to a BEST_EFFORT
296 // task regardless of its worker_count.
297 if (sort_key.priority() > max_allowed_sort_key.priority ||
298 max_allowed_sort_key.priority == TaskPriority::BEST_EFFORT) {
299 return false;
300 }
301 // Otherwise, a task only yields to a task of equal priority if its
302 // worker_count would be greater still after yielding, e.g. a job with 1
303 // worker doesn't yield to a job with 0 workers.
304 if (sort_key.priority() == max_allowed_sort_key.priority &&
305 sort_key.worker_count() <= max_allowed_sort_key.worker_count + 1) {
306 return false;
307 }
308
309 // Reset |max_allowed_sort_key_| so that only one thread should yield at a
310 // time for a given task.
311 max_allowed_sort_key =
312 TS_UNCHECKED_READ(max_allowed_sort_key_)
313 .exchange(kMaxYieldSortKey, std::memory_order_relaxed);
314 // Another thread might have decided to yield and racily reset
315 // |max_allowed_sort_key_|, in which case this thread doesn't yield.
316 return max_allowed_sort_key.priority != TaskPriority::BEST_EFFORT;
317 }
318
319 #if BUILDFLAG(IS_WIN)
320 // static
321 std::unique_ptr<win::ScopedWindowsThreadEnvironment>
GetScopedWindowsThreadEnvironment(WorkerEnvironment environment)322 ThreadGroup::GetScopedWindowsThreadEnvironment(WorkerEnvironment environment) {
323 std::unique_ptr<win::ScopedWindowsThreadEnvironment> scoped_environment;
324 if (environment == WorkerEnvironment::COM_MTA) {
325 scoped_environment = std::make_unique<win::ScopedWinrtInitializer>();
326
327 // TODO(crbug.com/1498668): rollback the change or replace it with a CHECK
328 // before closing the bug.
329 DUMP_WILL_BE_CHECK(scoped_environment->Succeeded());
330 }
331
332 DCHECK(!scoped_environment || scoped_environment->Succeeded());
333 return scoped_environment;
334 }
335 #endif
336
337 // static
CurrentThreadHasGroup()338 bool ThreadGroup::CurrentThreadHasGroup() {
339 return current_thread_group != nullptr;
340 }
341
342 } // namespace internal
343 } // namespace base
344