1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group.h"
6
7 #include <string_view>
8 #include <utility>
9
10 #include "base/check.h"
11 #include "base/functional/bind.h"
12 #include "base/functional/callback_helpers.h"
13 #include "base/task/task_features.h"
14 #include "base/task/thread_pool/task_tracker.h"
15 #include "build/build_config.h"
16
17 #if BUILDFLAG(IS_WIN)
18 #include "base/win/com_init_check_hook.h"
19 #include "base/win/scoped_winrt_initializer.h"
20 #endif
21
22 namespace base {
23 namespace internal {
24
25 namespace {
26
27 constexpr size_t kMaxNumberOfWorkers = 256;
28
29 // In a background thread group:
30 // - Blocking calls take more time than in a foreground thread group.
31 // - We want to minimize impact on foreground work, not maximize execution
32 // throughput.
33 // For these reasons, the timeout to increase the maximum number of concurrent
34 // tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
35 // infinite because execution throughput should not be reduced forever if a task
36 // blocks forever.
37 //
38 // TODO(fdoray): On platforms without background thread groups, blocking in a
39 // BEST_EFFORT task should:
40 // 1. Increment the maximum number of concurrent tasks after a *short* timeout,
41 // to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
42 // 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
43 // *long* timeout, because we only want to allow more BEST_EFFORT tasks to be
44 // be scheduled concurrently when we believe that a BEST_EFFORT task is
45 // blocked forever.
46 // Currently, only 1. is true as the configuration is per thread group.
47 // TODO(crbug.com/40612168): Fix racy condition when MayBlockThreshold ==
48 // BlockedWorkersPoll.
49 constexpr TimeDelta kForegroundMayBlockThreshold = Milliseconds(1000);
50 constexpr TimeDelta kForegroundBlockedWorkersPoll = Milliseconds(1200);
51 constexpr TimeDelta kBackgroundMayBlockThreshold = Seconds(10);
52 constexpr TimeDelta kBackgroundBlockedWorkersPoll = Seconds(12);
53
54 // ThreadGroup that owns the current thread, if any.
55 constinit thread_local const ThreadGroup* current_thread_group = nullptr;
56
57 } // namespace
58
59 constexpr ThreadGroup::YieldSortKey ThreadGroup::kMaxYieldSortKey;
60
ScheduleReleaseTaskSource(RegisteredTaskSource task_source)61 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleReleaseTaskSource(
62 RegisteredTaskSource task_source) {
63 task_sources_to_release_.push_back(std::move(task_source));
64 }
65
ScheduleAdjustMaxTasks()66 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleAdjustMaxTasks() {
67 DCHECK(!must_schedule_adjust_max_tasks_);
68 must_schedule_adjust_max_tasks_ = true;
69 }
70
ScheduleStart(scoped_refptr<WorkerThread> worker)71 void ThreadGroup::BaseScopedCommandsExecutor::ScheduleStart(
72 scoped_refptr<WorkerThread> worker) {
73 workers_to_start_.emplace_back(std::move(worker));
74 }
75
BaseScopedCommandsExecutor(ThreadGroup * outer)76 ThreadGroup::BaseScopedCommandsExecutor::BaseScopedCommandsExecutor(
77 ThreadGroup* outer)
78 : outer_(outer) {}
79
~BaseScopedCommandsExecutor()80 ThreadGroup::BaseScopedCommandsExecutor::~BaseScopedCommandsExecutor() {
81 CheckedLock::AssertNoLockHeldOnCurrentThread();
82 Flush();
83 }
84
Flush()85 void ThreadGroup::BaseScopedCommandsExecutor::Flush() {
86 // Start workers. Happens after wake ups (implemented by children and thus
87 // called on their destructor, i.e. before this) to prevent the case where a
88 // worker enters its main function, is descheduled because it wasn't woken up
89 // yet, and is woken up immediately after.
90 for (auto worker : workers_to_start_) {
91 worker->Start(outer_->after_start().service_thread_task_runner,
92 outer_->after_start().worker_thread_observer);
93 if (outer_->worker_started_for_testing_) {
94 outer_->worker_started_for_testing_->Wait();
95 }
96 }
97 workers_to_start_.clear();
98
99 if (must_schedule_adjust_max_tasks_) {
100 outer_->ScheduleAdjustMaxTasks();
101 }
102 }
103
104 ThreadGroup::ScopedReenqueueExecutor::ScopedReenqueueExecutor() = default;
105
~ScopedReenqueueExecutor()106 ThreadGroup::ScopedReenqueueExecutor::~ScopedReenqueueExecutor() {
107 if (destination_thread_group_) {
108 destination_thread_group_->PushTaskSourceAndWakeUpWorkers(
109 std::move(transaction_with_task_source_.value()));
110 }
111 }
112
113 void ThreadGroup::ScopedReenqueueExecutor::
SchedulePushTaskSourceAndWakeUpWorkers(RegisteredTaskSourceAndTransaction transaction_with_task_source,ThreadGroup * destination_thread_group)114 SchedulePushTaskSourceAndWakeUpWorkers(
115 RegisteredTaskSourceAndTransaction transaction_with_task_source,
116 ThreadGroup* destination_thread_group) {
117 DCHECK(destination_thread_group);
118 DCHECK(!destination_thread_group_);
119 DCHECK(!transaction_with_task_source_);
120 transaction_with_task_source_.emplace(
121 std::move(transaction_with_task_source));
122 destination_thread_group_ = destination_thread_group;
123 }
124
ThreadGroup(std::string_view histogram_label,std::string_view thread_group_label,ThreadType thread_type_hint,TrackedRef<TaskTracker> task_tracker,TrackedRef<Delegate> delegate)125 ThreadGroup::ThreadGroup(std::string_view histogram_label,
126 std::string_view thread_group_label,
127 ThreadType thread_type_hint,
128 TrackedRef<TaskTracker> task_tracker,
129 TrackedRef<Delegate> delegate)
130 : task_tracker_(std::move(task_tracker)),
131 delegate_(std::move(delegate)),
132 histogram_label_(histogram_label),
133 thread_group_label_(thread_group_label),
134 thread_type_hint_(thread_type_hint),
135 idle_workers_set_cv_for_testing_(lock_.CreateConditionVariable()) {
136 DCHECK(!thread_group_label_.empty());
137 }
138
StartImpl(size_t max_tasks,size_t max_best_effort_tasks,TimeDelta suggested_reclaim_time,scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,WorkerThreadObserver * worker_thread_observer,WorkerEnvironment worker_environment,bool synchronous_thread_start_for_testing,std::optional<TimeDelta> may_block_threshold)139 void ThreadGroup::StartImpl(
140 size_t max_tasks,
141 size_t max_best_effort_tasks,
142 TimeDelta suggested_reclaim_time,
143 scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
144 WorkerThreadObserver* worker_thread_observer,
145 WorkerEnvironment worker_environment,
146 bool synchronous_thread_start_for_testing,
147 std::optional<TimeDelta> may_block_threshold) {
148 if (synchronous_thread_start_for_testing) {
149 worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
150 // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
151 // WaitableEvent or it defeats the purpose of having threads start without
152 // externally visible side-effects.
153 worker_started_for_testing_->declare_only_used_while_idle();
154 }
155
156 in_start().may_block_threshold =
157 may_block_threshold ? may_block_threshold.value()
158 : (thread_type_hint_ != ThreadType::kBackground
159 ? kForegroundMayBlockThreshold
160 : kBackgroundMayBlockThreshold);
161 in_start().blocked_workers_poll_period =
162 thread_type_hint_ != ThreadType::kBackground
163 ? kForegroundBlockedWorkersPoll
164 : kBackgroundBlockedWorkersPoll;
165
166 CheckedAutoLock auto_lock(lock_);
167
168 max_tasks_ = max_tasks;
169 baseline_max_tasks_ = max_tasks;
170 DCHECK_GE(max_tasks_, 1U);
171 in_start().initial_max_tasks = std::min(max_tasks_, kMaxNumberOfWorkers);
172 max_best_effort_tasks_ = max_best_effort_tasks;
173 in_start().suggested_reclaim_time = suggested_reclaim_time;
174 in_start().worker_environment = worker_environment;
175 in_start().service_thread_task_runner = std::move(service_thread_task_runner);
176 in_start().worker_thread_observer = worker_thread_observer;
177
178 #if DCHECK_IS_ON()
179 in_start().initialized = true;
180 #endif
181 }
182
183 ThreadGroup::~ThreadGroup() = default;
184
BindToCurrentThread()185 void ThreadGroup::BindToCurrentThread() {
186 DCHECK(!CurrentThreadHasGroup());
187 current_thread_group = this;
188 }
189
UnbindFromCurrentThread()190 void ThreadGroup::UnbindFromCurrentThread() {
191 DCHECK(IsBoundToCurrentThread());
192 current_thread_group = nullptr;
193 }
194
IsBoundToCurrentThread() const195 bool ThreadGroup::IsBoundToCurrentThread() const {
196 return current_thread_group == this;
197 }
198
SetMaxTasks(size_t max_tasks)199 void ThreadGroup::SetMaxTasks(size_t max_tasks) {
200 CheckedAutoLock auto_lock(lock_);
201 size_t extra_tasks = max_tasks_ - baseline_max_tasks_;
202 baseline_max_tasks_ = std::min(max_tasks, after_start().initial_max_tasks);
203 max_tasks_ = baseline_max_tasks_ + extra_tasks;
204 }
205
ResetMaxTasks()206 void ThreadGroup::ResetMaxTasks() {
207 SetMaxTasks(after_start().initial_max_tasks);
208 }
209
210 size_t
GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() const211 ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired()
212 const {
213 // For simplicity, only 1 worker is assigned to each task source regardless of
214 // its max concurrency, with the exception of the top task source.
215 const size_t num_queued =
216 priority_queue_.GetNumTaskSourcesWithPriority(TaskPriority::BEST_EFFORT);
217 if (num_queued == 0 ||
218 !task_tracker_->CanRunPriority(TaskPriority::BEST_EFFORT)) {
219 return 0U;
220 }
221 if (priority_queue_.PeekSortKey().priority() == TaskPriority::BEST_EFFORT) {
222 // Assign the correct number of workers for the top TaskSource (-1 for the
223 // worker that is already accounted for in |num_queued|).
224 return std::max<size_t>(
225 1, num_queued +
226 priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
227 }
228 return num_queued;
229 }
230
231 size_t
GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired() const232 ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired()
233 const {
234 // For simplicity, only 1 worker is assigned to each task source regardless of
235 // its max concurrency, with the exception of the top task source.
236 const size_t num_queued = priority_queue_.GetNumTaskSourcesWithPriority(
237 TaskPriority::USER_VISIBLE) +
238 priority_queue_.GetNumTaskSourcesWithPriority(
239 TaskPriority::USER_BLOCKING);
240 if (num_queued == 0 ||
241 !task_tracker_->CanRunPriority(TaskPriority::HIGHEST)) {
242 return 0U;
243 }
244 auto priority = priority_queue_.PeekSortKey().priority();
245 if (priority == TaskPriority::USER_VISIBLE ||
246 priority == TaskPriority::USER_BLOCKING) {
247 // Assign the correct number of workers for the top TaskSource (-1 for the
248 // worker that is already accounted for in |num_queued|).
249 return std::max<size_t>(
250 1, num_queued +
251 priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
252 }
253 return num_queued;
254 }
255
RemoveTaskSource(const TaskSource & task_source)256 RegisteredTaskSource ThreadGroup::RemoveTaskSource(
257 const TaskSource& task_source) {
258 CheckedAutoLock auto_lock(lock_);
259 return priority_queue_.RemoveTaskSource(task_source);
260 }
261
ReEnqueueTaskSourceLockRequired(BaseScopedCommandsExecutor * workers_executor,ScopedReenqueueExecutor * reenqueue_executor,RegisteredTaskSourceAndTransaction transaction_with_task_source)262 void ThreadGroup::ReEnqueueTaskSourceLockRequired(
263 BaseScopedCommandsExecutor* workers_executor,
264 ScopedReenqueueExecutor* reenqueue_executor,
265 RegisteredTaskSourceAndTransaction transaction_with_task_source) {
266 // Decide in which thread group the TaskSource should be reenqueued.
267 ThreadGroup* destination_thread_group = delegate_->GetThreadGroupForTraits(
268 transaction_with_task_source.transaction.traits());
269
270 bool push_to_immediate_queue =
271 transaction_with_task_source.task_source.WillReEnqueue(
272 TimeTicks::Now(), &transaction_with_task_source.transaction);
273
274 if (destination_thread_group == this) {
275 // Another worker that was running a task from this task source may have
276 // reenqueued it already, in which case its heap_handle will be valid. It
277 // shouldn't be queued twice so the task source registration is released.
278 if (transaction_with_task_source.task_source->immediate_heap_handle()
279 .IsValid()) {
280 workers_executor->ScheduleReleaseTaskSource(
281 std::move(transaction_with_task_source.task_source));
282 } else {
283 // If the TaskSource should be reenqueued in the current thread group,
284 // reenqueue it inside the scope of the lock.
285 if (push_to_immediate_queue) {
286 auto sort_key = transaction_with_task_source.task_source->GetSortKey();
287 // When moving |task_source| into |priority_queue_|, it may be destroyed
288 // on another thread as soon as |lock_| is released, since we're no
289 // longer holding a reference to it. To prevent UAF, release
290 // |transaction| before moving |task_source|. Ref. crbug.com/1412008
291 transaction_with_task_source.transaction.Release();
292 priority_queue_.Push(
293 std::move(transaction_with_task_source.task_source), sort_key);
294 }
295 }
296 // This is called unconditionally to ensure there are always workers to run
297 // task sources in the queue. Some ThreadGroup implementations only invoke
298 // TakeRegisteredTaskSource() once per wake up and hence this is required to
299 // avoid races that could leave a task source stranded in the queue with no
300 // active workers.
301 EnsureEnoughWorkersLockRequired(workers_executor);
302 } else {
303 // Otherwise, schedule a reenqueue after releasing the lock.
304 reenqueue_executor->SchedulePushTaskSourceAndWakeUpWorkers(
305 std::move(transaction_with_task_source), destination_thread_group);
306 }
307 }
308
TakeRegisteredTaskSource(BaseScopedCommandsExecutor * executor)309 RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource(
310 BaseScopedCommandsExecutor* executor) {
311 DCHECK(!priority_queue_.IsEmpty());
312
313 auto run_status = priority_queue_.PeekTaskSource().WillRunTask();
314
315 if (run_status == TaskSource::RunStatus::kDisallowed) {
316 executor->ScheduleReleaseTaskSource(priority_queue_.PopTaskSource());
317 return nullptr;
318 }
319
320 if (run_status == TaskSource::RunStatus::kAllowedSaturated) {
321 return priority_queue_.PopTaskSource();
322 }
323
324 // If the TaskSource isn't saturated, check whether TaskTracker allows it to
325 // remain in the PriorityQueue.
326 // The canonical way of doing this is to pop the task source to return, call
327 // RegisterTaskSource() to get an additional RegisteredTaskSource, and
328 // reenqueue that task source if valid. Instead, it is cheaper and equivalent
329 // to peek the task source, call RegisterTaskSource() to get an additional
330 // RegisteredTaskSource to replace if valid, and only pop |priority_queue_|
331 // otherwise.
332 RegisteredTaskSource task_source =
333 task_tracker_->RegisterTaskSource(priority_queue_.PeekTaskSource().get());
334 if (!task_source) {
335 return priority_queue_.PopTaskSource();
336 }
337 // Replace the top task_source and then update the queue.
338 std::swap(priority_queue_.PeekTaskSource(), task_source);
339 priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey());
340 return task_source;
341 }
342
UpdateSortKeyImpl(BaseScopedCommandsExecutor * executor,TaskSource::Transaction transaction)343 void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
344 TaskSource::Transaction transaction) {
345 CheckedAutoLock auto_lock(lock_);
346 priority_queue_.UpdateSortKey(*transaction.task_source(),
347 transaction.task_source()->GetSortKey());
348 EnsureEnoughWorkersLockRequired(executor);
349 }
350
PushTaskSourceAndWakeUpWorkersImpl(BaseScopedCommandsExecutor * executor,RegisteredTaskSourceAndTransaction transaction_with_task_source)351 void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
352 BaseScopedCommandsExecutor* executor,
353 RegisteredTaskSourceAndTransaction transaction_with_task_source) {
354 DCHECK_EQ(delegate_->GetThreadGroupForTraits(
355 transaction_with_task_source.transaction.traits()),
356 this);
357 CheckedAutoLock lock(lock_);
358 if (transaction_with_task_source.task_source->immediate_heap_handle()
359 .IsValid()) {
360 // If the task source changed group, it is possible that multiple concurrent
361 // workers try to enqueue it. Only the first enqueue should succeed.
362 executor->ScheduleReleaseTaskSource(
363 std::move(transaction_with_task_source.task_source));
364 return;
365 }
366 auto sort_key = transaction_with_task_source.task_source->GetSortKey();
367 // When moving |task_source| into |priority_queue_|, it may be destroyed
368 // on another thread as soon as |lock_| is released, since we're no longer
369 // holding a reference to it. To prevent UAF, release |transaction| before
370 // moving |task_source|. Ref. crbug.com/1412008
371 transaction_with_task_source.transaction.Release();
372 priority_queue_.Push(std::move(transaction_with_task_source.task_source),
373 sort_key);
374 EnsureEnoughWorkersLockRequired(executor);
375 }
376
EnqueueAllTaskSources(PriorityQueue * new_priority_queue)377 void ThreadGroup::EnqueueAllTaskSources(PriorityQueue* new_priority_queue) {
378 CheckedAutoLock lock(lock_);
379 while (!new_priority_queue->IsEmpty()) {
380 TaskSourceSortKey top_sort_key = new_priority_queue->PeekSortKey();
381 RegisteredTaskSource task_source = new_priority_queue->PopTaskSource();
382 priority_queue_.Push(std::move(task_source), top_sort_key);
383 }
384 }
385
HandoffAllTaskSourcesToOtherThreadGroup(ThreadGroup * destination_thread_group)386 void ThreadGroup::HandoffAllTaskSourcesToOtherThreadGroup(
387 ThreadGroup* destination_thread_group) {
388 PriorityQueue new_priority_queue;
389 TaskSourceSortKey top_sort_key;
390 {
391 CheckedAutoLock current_thread_group_lock(lock_);
392 new_priority_queue.swap(priority_queue_);
393 }
394 destination_thread_group->EnqueueAllTaskSources(&new_priority_queue);
395 }
396
HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(ThreadGroup * destination_thread_group)397 void ThreadGroup::HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
398 ThreadGroup* destination_thread_group) {
399 PriorityQueue new_priority_queue;
400 TaskSourceSortKey top_sort_key;
401 {
402 // This works because all USER_BLOCKING tasks are at the front of the queue.
403 CheckedAutoLock current_thread_group_lock(lock_);
404 while (!priority_queue_.IsEmpty() &&
405 (top_sort_key = priority_queue_.PeekSortKey()).priority() ==
406 TaskPriority::USER_BLOCKING) {
407 new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key);
408 }
409 new_priority_queue.swap(priority_queue_);
410 }
411 destination_thread_group->EnqueueAllTaskSources(&new_priority_queue);
412 }
413
ShouldYield(TaskSourceSortKey sort_key)414 bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) {
415 DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free());
416
417 if (!task_tracker_->CanRunPriority(sort_key.priority()))
418 return true;
419 // It is safe to read |max_allowed_sort_key_| without a lock since this
420 // variable is atomic, keeping in mind that threads may not immediately see
421 // the new value when it is updated.
422 auto max_allowed_sort_key =
423 TS_UNCHECKED_READ(max_allowed_sort_key_).load(std::memory_order_relaxed);
424
425 // To reduce unnecessary yielding, a task will never yield to a BEST_EFFORT
426 // task regardless of its worker_count.
427 if (sort_key.priority() > max_allowed_sort_key.priority ||
428 max_allowed_sort_key.priority == TaskPriority::BEST_EFFORT) {
429 return false;
430 }
431 // Otherwise, a task only yields to a task of equal priority if its
432 // worker_count would be greater still after yielding, e.g. a job with 1
433 // worker doesn't yield to a job with 0 workers.
434 if (sort_key.priority() == max_allowed_sort_key.priority &&
435 sort_key.worker_count() <= max_allowed_sort_key.worker_count + 1) {
436 return false;
437 }
438
439 // Reset |max_allowed_sort_key_| so that only one thread should yield at a
440 // time for a given task.
441 max_allowed_sort_key =
442 TS_UNCHECKED_READ(max_allowed_sort_key_)
443 .exchange(kMaxYieldSortKey, std::memory_order_relaxed);
444 // Another thread might have decided to yield and racily reset
445 // |max_allowed_sort_key_|, in which case this thread doesn't yield.
446 return max_allowed_sort_key.priority != TaskPriority::BEST_EFFORT;
447 }
448
449 #if BUILDFLAG(IS_WIN)
450 // static
451 std::unique_ptr<win::ScopedWindowsThreadEnvironment>
GetScopedWindowsThreadEnvironment(WorkerEnvironment environment)452 ThreadGroup::GetScopedWindowsThreadEnvironment(WorkerEnvironment environment) {
453 std::unique_ptr<win::ScopedWindowsThreadEnvironment> scoped_environment;
454 if (environment == WorkerEnvironment::COM_MTA) {
455 scoped_environment = std::make_unique<win::ScopedWinrtInitializer>();
456 }
457 // Continuing execution with an uninitialized apartment may lead to broken
458 // program invariants later on.
459 CHECK(!scoped_environment || scoped_environment->Succeeded());
460 return scoped_environment;
461 }
462 #endif
463
464 // static
CurrentThreadHasGroup()465 bool ThreadGroup::CurrentThreadHasGroup() {
466 return current_thread_group != nullptr;
467 }
468
GetMaxTasksForTesting() const469 size_t ThreadGroup::GetMaxTasksForTesting() const {
470 CheckedAutoLock auto_lock(lock_);
471 return max_tasks_;
472 }
473
GetMaxBestEffortTasksForTesting() const474 size_t ThreadGroup::GetMaxBestEffortTasksForTesting() const {
475 CheckedAutoLock auto_lock(lock_);
476 return max_best_effort_tasks_;
477 }
478
WaitForWorkersIdleLockRequiredForTesting(size_t n)479 void ThreadGroup::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
480 // Make sure workers do not cleanup while watching the idle count.
481 AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);
482
483 while (NumberOfIdleWorkersLockRequiredForTesting() < n) {
484 idle_workers_set_cv_for_testing_.Wait();
485 }
486 }
487
WaitForWorkersIdleForTesting(size_t n)488 void ThreadGroup::WaitForWorkersIdleForTesting(size_t n) {
489 CheckedAutoLock auto_lock(lock_);
490
491 #if DCHECK_IS_ON()
492 DCHECK(!some_workers_cleaned_up_for_testing_)
493 << "Workers detached prior to waiting for a specific number of idle "
494 "workers. Doing the wait under such conditions is flaky. Consider "
495 "setting the suggested reclaim time to TimeDelta::Max() in Start().";
496 #endif
497
498 WaitForWorkersIdleLockRequiredForTesting(n);
499 }
500
WaitForAllWorkersIdleForTesting()501 void ThreadGroup::WaitForAllWorkersIdleForTesting() {
502 CheckedAutoLock auto_lock(lock_);
503 WaitForWorkersIdleLockRequiredForTesting(workers_.size());
504 }
505
WaitForWorkersCleanedUpForTesting(size_t n)506 void ThreadGroup::WaitForWorkersCleanedUpForTesting(size_t n) {
507 CheckedAutoLock auto_lock(lock_);
508
509 if (!num_workers_cleaned_up_for_testing_cv_) {
510 lock_.CreateConditionVariableAndEmplace(
511 num_workers_cleaned_up_for_testing_cv_);
512 }
513
514 while (num_workers_cleaned_up_for_testing_ < n) {
515 num_workers_cleaned_up_for_testing_cv_->Wait();
516 }
517
518 num_workers_cleaned_up_for_testing_ = 0;
519 }
520
GetMaxConcurrentNonBlockedTasksDeprecated() const521 size_t ThreadGroup::GetMaxConcurrentNonBlockedTasksDeprecated() const {
522 #if DCHECK_IS_ON()
523 CheckedAutoLock auto_lock(lock_);
524 DCHECK_NE(after_start().initial_max_tasks, 0U)
525 << "GetMaxConcurrentTasksDeprecated() should only be called after the "
526 << "thread group has started.";
527 #endif
528 return after_start().initial_max_tasks;
529 }
530
NumberOfWorkersForTesting() const531 size_t ThreadGroup::NumberOfWorkersForTesting() const {
532 CheckedAutoLock auto_lock(lock_);
533 return workers_.size();
534 }
535
NumberOfIdleWorkersForTesting() const536 size_t ThreadGroup::NumberOfIdleWorkersForTesting() const {
537 CheckedAutoLock auto_lock(lock_);
538 return NumberOfIdleWorkersLockRequiredForTesting();
539 }
540
GetDesiredNumAwakeWorkersLockRequired() const541 size_t ThreadGroup::GetDesiredNumAwakeWorkersLockRequired() const {
542 // Number of BEST_EFFORT task sources that are running or queued and allowed
543 // to run by the CanRunPolicy.
544 const size_t num_running_or_queued_can_run_best_effort_task_sources =
545 num_running_best_effort_tasks_ +
546 GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
547
548 const size_t workers_for_best_effort_task_sources =
549 std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
550 max_best_effort_tasks_),
551 num_running_best_effort_tasks_);
552
553 // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
554 const size_t num_running_or_queued_foreground_task_sources =
555 (num_running_tasks_ - num_running_best_effort_tasks_) +
556 GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
557
558 const size_t workers_for_foreground_task_sources =
559 num_running_or_queued_foreground_task_sources;
560
561 return std::min({workers_for_best_effort_task_sources +
562 workers_for_foreground_task_sources,
563 max_tasks_, kMaxNumberOfWorkers});
564 }
565
MaybeScheduleAdjustMaxTasksLockRequired(BaseScopedCommandsExecutor * executor)566 void ThreadGroup::MaybeScheduleAdjustMaxTasksLockRequired(
567 BaseScopedCommandsExecutor* executor) {
568 if (!adjust_max_tasks_posted_ &&
569 ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
570 executor->ScheduleAdjustMaxTasks();
571 adjust_max_tasks_posted_ = true;
572 }
573 }
574
ShouldPeriodicallyAdjustMaxTasksLockRequired()575 bool ThreadGroup::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
576 // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
577 // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
578 // enough to accommodate all queued and running task sources and an idle
579 // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
580 // - When (1) is false: No worker would be created or woken up if the
581 // concurrency limits were increased, so there is no hurry to increase them.
582 // - When (2) is false: The concurrency limits could not be increased by
583 // AdjustMaxTasks().
584
585 const size_t num_running_or_queued_best_effort_task_sources =
586 num_running_best_effort_tasks_ +
587 GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
588 if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ &&
589 num_unresolved_best_effort_may_block_ > 0) {
590 return true;
591 }
592
593 const size_t num_running_or_queued_task_sources =
594 num_running_tasks_ +
595 GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
596 GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
597 constexpr size_t kIdleWorker = 1;
598 return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
599 num_unresolved_may_block_ > 0;
600 }
601
UpdateMinAllowedPriorityLockRequired()602 void ThreadGroup::UpdateMinAllowedPriorityLockRequired() {
603 if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
604 max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
605 } else {
606 max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
607 priority_queue_.PeekSortKey().worker_count()},
608 std::memory_order_relaxed);
609 }
610 }
611
DecrementTasksRunningLockRequired(TaskPriority priority)612 void ThreadGroup::DecrementTasksRunningLockRequired(TaskPriority priority) {
613 DCHECK_GT(num_running_tasks_, 0U);
614 --num_running_tasks_;
615 if (priority == TaskPriority::BEST_EFFORT) {
616 DCHECK_GT(num_running_best_effort_tasks_, 0U);
617 --num_running_best_effort_tasks_;
618 }
619 UpdateMinAllowedPriorityLockRequired();
620 }
621
IncrementTasksRunningLockRequired(TaskPriority priority)622 void ThreadGroup::IncrementTasksRunningLockRequired(TaskPriority priority) {
623 ++num_running_tasks_;
624 DCHECK_LE(num_running_tasks_, max_tasks_);
625 DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
626 if (priority == TaskPriority::BEST_EFFORT) {
627 ++num_running_best_effort_tasks_;
628 DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
629 DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
630 }
631 UpdateMinAllowedPriorityLockRequired();
632 }
633
DecrementMaxTasksLockRequired()634 void ThreadGroup::DecrementMaxTasksLockRequired() {
635 DCHECK_GT(num_running_tasks_, 0U);
636 DCHECK_GT(max_tasks_, 0U);
637 --max_tasks_;
638 UpdateMinAllowedPriorityLockRequired();
639 }
640
IncrementMaxTasksLockRequired()641 void ThreadGroup::IncrementMaxTasksLockRequired() {
642 DCHECK_GT(num_running_tasks_, 0U);
643 ++max_tasks_;
644 UpdateMinAllowedPriorityLockRequired();
645 }
646
DecrementMaxBestEffortTasksLockRequired()647 void ThreadGroup::DecrementMaxBestEffortTasksLockRequired() {
648 DCHECK_GT(num_running_tasks_, 0U);
649 DCHECK_GT(max_best_effort_tasks_, 0U);
650 --max_best_effort_tasks_;
651 UpdateMinAllowedPriorityLockRequired();
652 }
653
IncrementMaxBestEffortTasksLockRequired()654 void ThreadGroup::IncrementMaxBestEffortTasksLockRequired() {
655 DCHECK_GT(num_running_tasks_, 0U);
656 ++max_best_effort_tasks_;
657 UpdateMinAllowedPriorityLockRequired();
658 }
659
660 ThreadGroup::InitializedInStart::InitializedInStart() = default;
661 ThreadGroup::InitializedInStart::~InitializedInStart() = default;
662
663 } // namespace internal
664 } // namespace base
665