// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group.h"

#include <string_view>
#include <utility>

#include "base/check.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/task_tracker.h"
#include "build/build_config.h"

#if BUILDFLAG(IS_WIN)
#include "base/win/com_init_check_hook.h"
#include "base/win/scoped_winrt_initializer.h"
#endif

namespace base {
namespace internal {

namespace {

constexpr size_t kMaxNumberOfWorkers = 256;

// In a background thread group:
// - Blocking calls take more time than in a foreground thread group.
// - We want to minimize impact on foreground work, not maximize execution
//   throughput.
// For these reasons, the timeout to increase the maximum number of concurrent
// tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
// infinite because execution throughput should not be reduced forever if a task
// blocks forever.
//
// TODO(fdoray): On platforms without background thread groups, blocking in a
// BEST_EFFORT task should:
// 1. Increment the maximum number of concurrent tasks after a *short* timeout,
//    to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
// 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
//    *long* timeout, because we only want to allow more BEST_EFFORT tasks to
//    be scheduled concurrently when we believe that a BEST_EFFORT task is
//    blocked forever.
// Currently, only 1. is true as the configuration is per thread group.
// TODO(crbug.com/40612168): Fix racy condition when MayBlockThreshold ==
// BlockedWorkersPoll.
constexpr TimeDelta kForegroundMayBlockThreshold = Milliseconds(1000);
constexpr TimeDelta kForegroundBlockedWorkersPoll = Milliseconds(1200);
constexpr TimeDelta kBackgroundMayBlockThreshold = Seconds(10);
constexpr TimeDelta kBackgroundBlockedWorkersPoll = Seconds(12);
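
// Concretely (an illustrative reading of the constants above, per the comment
// preceding them): a foreground thread group may raise its concurrency limit
// roughly 1 s after a task enters a MAY_BLOCK ScopedBlockingCall and
// re-examines blocked workers about every 1.2 s, while a background group
// waits ~10 s / ~12 s, trading reactivity for less interference with
// foreground work.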

// ThreadGroup that owns the current thread, if any.
constinit thread_local const ThreadGroup* current_thread_group = nullptr;

}  // namespace

constexpr ThreadGroup::YieldSortKey ThreadGroup::kMaxYieldSortKey;

void ThreadGroup::BaseScopedCommandsExecutor::ScheduleReleaseTaskSource(
    RegisteredTaskSource task_source) {
  task_sources_to_release_.push_back(std::move(task_source));
}

void ThreadGroup::BaseScopedCommandsExecutor::ScheduleAdjustMaxTasks() {
  DCHECK(!must_schedule_adjust_max_tasks_);
  must_schedule_adjust_max_tasks_ = true;
}

void ThreadGroup::BaseScopedCommandsExecutor::ScheduleStart(
    scoped_refptr<WorkerThread> worker) {
  workers_to_start_.emplace_back(std::move(worker));
}

ThreadGroup::BaseScopedCommandsExecutor::BaseScopedCommandsExecutor(
    ThreadGroup* outer)
    : outer_(outer) {}

ThreadGroup::BaseScopedCommandsExecutor::~BaseScopedCommandsExecutor() {
  CheckedLock::AssertNoLockHeldOnCurrentThread();
  Flush();
}

void ThreadGroup::BaseScopedCommandsExecutor::Flush() {
  // Start workers. This happens after wake-ups (which are implemented by
  // subclasses and thus run in their destructors, i.e. before this) to prevent
  // the case where a worker enters its main function, is descheduled because
  // it wasn't woken up yet, and is woken up immediately after.
  for (auto worker : workers_to_start_) {
    worker->Start(outer_->after_start().service_thread_task_runner,
                  outer_->after_start().worker_thread_observer);
    if (outer_->worker_started_for_testing_) {
      outer_->worker_started_for_testing_->Wait();
    }
  }
  workers_to_start_.clear();

  if (must_schedule_adjust_max_tasks_) {
    outer_->ScheduleAdjustMaxTasks();
  }
}

ThreadGroup::ScopedReenqueueExecutor::ScopedReenqueueExecutor() = default;

ThreadGroup::ScopedReenqueueExecutor::~ScopedReenqueueExecutor() {
  if (destination_thread_group_) {
    destination_thread_group_->PushTaskSourceAndWakeUpWorkers(
        std::move(transaction_with_task_source_.value()));
  }
}

void ThreadGroup::ScopedReenqueueExecutor::
    SchedulePushTaskSourceAndWakeUpWorkers(
        RegisteredTaskSourceAndTransaction transaction_with_task_source,
        ThreadGroup* destination_thread_group) {
  DCHECK(destination_thread_group);
  DCHECK(!destination_thread_group_);
  DCHECK(!transaction_with_task_source_);
  transaction_with_task_source_.emplace(
      std::move(transaction_with_task_source));
  destination_thread_group_ = destination_thread_group;
}

ThreadGroup::ThreadGroup(std::string_view histogram_label,
                         std::string_view thread_group_label,
                         ThreadType thread_type_hint,
                         TrackedRef<TaskTracker> task_tracker,
                         TrackedRef<Delegate> delegate)
    : task_tracker_(std::move(task_tracker)),
      delegate_(std::move(delegate)),
      histogram_label_(histogram_label),
      thread_group_label_(thread_group_label),
      thread_type_hint_(thread_type_hint),
      idle_workers_set_cv_for_testing_(lock_.CreateConditionVariable()) {
  DCHECK(!thread_group_label_.empty());
}

void ThreadGroup::StartImpl(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    std::optional<TimeDelta> may_block_threshold) {
  if (synchronous_thread_start_for_testing) {
    worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
    // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
    // WaitableEvent or it defeats the purpose of having threads start without
    // externally visible side-effects.
    worker_started_for_testing_->declare_only_used_while_idle();
  }

  in_start().may_block_threshold =
      may_block_threshold ? may_block_threshold.value()
                          : (thread_type_hint_ != ThreadType::kBackground
                                 ? kForegroundMayBlockThreshold
                                 : kBackgroundMayBlockThreshold);
  in_start().blocked_workers_poll_period =
      thread_type_hint_ != ThreadType::kBackground
          ? kForegroundBlockedWorkersPoll
          : kBackgroundBlockedWorkersPoll;

  CheckedAutoLock auto_lock(lock_);

  max_tasks_ = max_tasks;
  baseline_max_tasks_ = max_tasks;
  DCHECK_GE(max_tasks_, 1U);
  in_start().initial_max_tasks = std::min(max_tasks_, kMaxNumberOfWorkers);
  max_best_effort_tasks_ = max_best_effort_tasks;
  in_start().suggested_reclaim_time = suggested_reclaim_time;
  in_start().worker_environment = worker_environment;
  in_start().service_thread_task_runner = std::move(service_thread_task_runner);
  in_start().worker_thread_observer = worker_thread_observer;

#if DCHECK_IS_ON()
  in_start().initialized = true;
#endif
}

ThreadGroup::~ThreadGroup() = default;

void ThreadGroup::BindToCurrentThread() {
  DCHECK(!CurrentThreadHasGroup());
  current_thread_group = this;
}

void ThreadGroup::UnbindFromCurrentThread() {
  DCHECK(IsBoundToCurrentThread());
  current_thread_group = nullptr;
}

bool ThreadGroup::IsBoundToCurrentThread() const {
  return current_thread_group == this;
}

void ThreadGroup::SetMaxTasks(size_t max_tasks) {
  CheckedAutoLock auto_lock(lock_);
  size_t extra_tasks = max_tasks_ - baseline_max_tasks_;
  baseline_max_tasks_ = std::min(max_tasks, after_start().initial_max_tasks);
  max_tasks_ = baseline_max_tasks_ + extra_tasks;
}
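
// E.g. (illustrative values): with baseline_max_tasks_ == 4 and
// max_tasks_ == 6 (two extra tasks granted for blocked workers),
// SetMaxTasks(8) preserves the two extra tasks: baseline_max_tasks_ becomes
// min(8, initial_max_tasks) and max_tasks_ becomes that baseline + 2.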

void ThreadGroup::ResetMaxTasks() {
  SetMaxTasks(after_start().initial_max_tasks);
}

size_t
ThreadGroup::GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired()
    const {
  // For simplicity, only 1 worker is assigned to each task source regardless of
  // its max concurrency, with the exception of the top task source.
  const size_t num_queued =
      priority_queue_.GetNumTaskSourcesWithPriority(TaskPriority::BEST_EFFORT);
  if (num_queued == 0 ||
      !task_tracker_->CanRunPriority(TaskPriority::BEST_EFFORT)) {
    return 0U;
  }
  if (priority_queue_.PeekSortKey().priority() == TaskPriority::BEST_EFFORT) {
    // Assign the correct number of workers for the top TaskSource (-1 for the
    // worker that is already accounted for in |num_queued|).
    return std::max<size_t>(
        1, num_queued +
               priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
  }
  return num_queued;
}
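
// E.g. (illustrative): with 3 queued BEST_EFFORT task sources, the top of the
// queue being one of them and reporting a remaining concurrency of 4, this
// returns 3 + 4 - 1 = 6 additional workers.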

size_t
ThreadGroup::GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired()
    const {
  // For simplicity, only 1 worker is assigned to each task source regardless of
  // its max concurrency, with the exception of the top task source.
  const size_t num_queued = priority_queue_.GetNumTaskSourcesWithPriority(
                                TaskPriority::USER_VISIBLE) +
                            priority_queue_.GetNumTaskSourcesWithPriority(
                                TaskPriority::USER_BLOCKING);
  if (num_queued == 0 ||
      !task_tracker_->CanRunPriority(TaskPriority::HIGHEST)) {
    return 0U;
  }
  auto priority = priority_queue_.PeekSortKey().priority();
  if (priority == TaskPriority::USER_VISIBLE ||
      priority == TaskPriority::USER_BLOCKING) {
    // Assign the correct number of workers for the top TaskSource (-1 for the
    // worker that is already accounted for in |num_queued|).
    return std::max<size_t>(
        1, num_queued +
               priority_queue_.PeekTaskSource()->GetRemainingConcurrency() - 1);
  }
  return num_queued;
}

RegisteredTaskSource ThreadGroup::RemoveTaskSource(
    const TaskSource& task_source) {
  CheckedAutoLock auto_lock(lock_);
  return priority_queue_.RemoveTaskSource(task_source);
}

void ThreadGroup::ReEnqueueTaskSourceLockRequired(
    BaseScopedCommandsExecutor* workers_executor,
    ScopedReenqueueExecutor* reenqueue_executor,
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  // Decide in which thread group the TaskSource should be reenqueued.
  ThreadGroup* destination_thread_group = delegate_->GetThreadGroupForTraits(
      transaction_with_task_source.transaction.traits());

  bool push_to_immediate_queue =
      transaction_with_task_source.task_source.WillReEnqueue(
          TimeTicks::Now(), &transaction_with_task_source.transaction);

  if (destination_thread_group == this) {
    // Another worker that was running a task from this task source may have
    // reenqueued it already, in which case its heap_handle will be valid. It
    // shouldn't be queued twice so the task source registration is released.
    if (transaction_with_task_source.task_source->immediate_heap_handle()
            .IsValid()) {
      workers_executor->ScheduleReleaseTaskSource(
          std::move(transaction_with_task_source.task_source));
    } else {
      // If the TaskSource should be reenqueued in the current thread group,
      // reenqueue it inside the scope of the lock.
      if (push_to_immediate_queue) {
        auto sort_key = transaction_with_task_source.task_source->GetSortKey();
        // When moving |task_source| into |priority_queue_|, it may be destroyed
        // on another thread as soon as |lock_| is released, since we're no
        // longer holding a reference to it. To prevent UAF, release
        // |transaction| before moving |task_source|. Ref. crbug.com/1412008
        transaction_with_task_source.transaction.Release();
        priority_queue_.Push(
            std::move(transaction_with_task_source.task_source), sort_key);
      }
    }
    // This is called unconditionally to ensure there are always workers to run
    // task sources in the queue. Some ThreadGroup implementations only invoke
    // TakeRegisteredTaskSource() once per wake up and hence this is required to
    // avoid races that could leave a task source stranded in the queue with no
    // active workers.
    EnsureEnoughWorkersLockRequired(workers_executor);
  } else {
    // Otherwise, schedule a reenqueue after releasing the lock.
    reenqueue_executor->SchedulePushTaskSourceAndWakeUpWorkers(
        std::move(transaction_with_task_source), destination_thread_group);
  }
}

RegisteredTaskSource ThreadGroup::TakeRegisteredTaskSource(
    BaseScopedCommandsExecutor* executor) {
  DCHECK(!priority_queue_.IsEmpty());

  auto run_status = priority_queue_.PeekTaskSource().WillRunTask();

  if (run_status == TaskSource::RunStatus::kDisallowed) {
    executor->ScheduleReleaseTaskSource(priority_queue_.PopTaskSource());
    return nullptr;
  }

  if (run_status == TaskSource::RunStatus::kAllowedSaturated) {
    return priority_queue_.PopTaskSource();
  }

  // If the TaskSource isn't saturated, check whether TaskTracker allows it to
  // remain in the PriorityQueue.
  // The canonical way of doing this is to pop the task source to return, call
  // RegisterTaskSource() to get an additional RegisteredTaskSource, and
  // reenqueue that task source if valid. Instead, it is cheaper and equivalent
  // to peek the task source, call RegisterTaskSource() to get an additional
  // RegisteredTaskSource with which to replace it if valid, and to pop
  // |priority_queue_| only when registration fails.
  RegisteredTaskSource task_source =
      task_tracker_->RegisterTaskSource(priority_queue_.PeekTaskSource().get());
  if (!task_source) {
    return priority_queue_.PopTaskSource();
  }
  // Replace the top task_source and then update the queue.
  std::swap(priority_queue_.PeekTaskSource(), task_source);
  priority_queue_.UpdateSortKey(*task_source.get(), task_source->GetSortKey());
  return task_source;
}
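
// Note: after the swap above, the queue's top entry holds the freshly
// registered RegisteredTaskSource for the same underlying TaskSource, while
// the previously queued registration is what gets returned to the caller.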

void ThreadGroup::UpdateSortKeyImpl(BaseScopedCommandsExecutor* executor,
                                    TaskSource::Transaction transaction) {
  CheckedAutoLock auto_lock(lock_);
  priority_queue_.UpdateSortKey(*transaction.task_source(),
                                transaction.task_source()->GetSortKey());
  EnsureEnoughWorkersLockRequired(executor);
}

void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
    BaseScopedCommandsExecutor* executor,
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  DCHECK_EQ(delegate_->GetThreadGroupForTraits(
                transaction_with_task_source.transaction.traits()),
            this);
  CheckedAutoLock lock(lock_);
  if (transaction_with_task_source.task_source->immediate_heap_handle()
          .IsValid()) {
    // If the task source changed group, it is possible that multiple concurrent
    // workers try to enqueue it. Only the first enqueue should succeed.
    executor->ScheduleReleaseTaskSource(
        std::move(transaction_with_task_source.task_source));
    return;
  }
  auto sort_key = transaction_with_task_source.task_source->GetSortKey();
  // When moving |task_source| into |priority_queue_|, it may be destroyed
  // on another thread as soon as |lock_| is released, since we're no longer
  // holding a reference to it. To prevent UAF, release |transaction| before
  // moving |task_source|. Ref. crbug.com/1412008
  transaction_with_task_source.transaction.Release();
  priority_queue_.Push(std::move(transaction_with_task_source.task_source),
                       sort_key);
  EnsureEnoughWorkersLockRequired(executor);
}

void ThreadGroup::EnqueueAllTaskSources(PriorityQueue* new_priority_queue) {
  CheckedAutoLock lock(lock_);
  while (!new_priority_queue->IsEmpty()) {
    TaskSourceSortKey top_sort_key = new_priority_queue->PeekSortKey();
    RegisteredTaskSource task_source = new_priority_queue->PopTaskSource();
    priority_queue_.Push(std::move(task_source), top_sort_key);
  }
}

void ThreadGroup::HandoffAllTaskSourcesToOtherThreadGroup(
    ThreadGroup* destination_thread_group) {
  PriorityQueue new_priority_queue;
  {
    CheckedAutoLock current_thread_group_lock(lock_);
    new_priority_queue.swap(priority_queue_);
  }
  destination_thread_group->EnqueueAllTaskSources(&new_priority_queue);
}

void ThreadGroup::HandoffNonUserBlockingTaskSourcesToOtherThreadGroup(
    ThreadGroup* destination_thread_group) {
  PriorityQueue new_priority_queue;
  TaskSourceSortKey top_sort_key;
  {
    // This works because all USER_BLOCKING tasks are at the front of the queue.
    CheckedAutoLock current_thread_group_lock(lock_);
    while (!priority_queue_.IsEmpty() &&
           (top_sort_key = priority_queue_.PeekSortKey()).priority() ==
               TaskPriority::USER_BLOCKING) {
      new_priority_queue.Push(priority_queue_.PopTaskSource(), top_sort_key);
    }
    new_priority_queue.swap(priority_queue_);
  }
  destination_thread_group->EnqueueAllTaskSources(&new_priority_queue);
}
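
// E.g. (illustrative): if |priority_queue_| holds [UB, UB, UV, BE] from front
// to back, the loop above moves the two USER_BLOCKING sources into
// |new_priority_queue|, the swap then puts them back into |priority_queue_|,
// and the USER_VISIBLE and BEST_EFFORT sources are handed off to
// |destination_thread_group|.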

bool ThreadGroup::ShouldYield(TaskSourceSortKey sort_key) {
  DCHECK(TS_UNCHECKED_READ(max_allowed_sort_key_).is_lock_free());

  if (!task_tracker_->CanRunPriority(sort_key.priority())) {
    return true;
  }
  // It is safe to read |max_allowed_sort_key_| without a lock since this
  // variable is atomic, keeping in mind that threads may not immediately see
  // the new value when it is updated.
  auto max_allowed_sort_key =
      TS_UNCHECKED_READ(max_allowed_sort_key_).load(std::memory_order_relaxed);

  // To reduce unnecessary yielding, a task will never yield to a BEST_EFFORT
  // task regardless of its worker_count.
  if (sort_key.priority() > max_allowed_sort_key.priority ||
      max_allowed_sort_key.priority == TaskPriority::BEST_EFFORT) {
    return false;
  }
  // Otherwise, a task only yields to a task of equal priority if its
  // worker_count would be greater still after yielding, e.g. a job with 1
  // worker doesn't yield to a job with 0 workers.
  if (sort_key.priority() == max_allowed_sort_key.priority &&
      sort_key.worker_count() <= max_allowed_sort_key.worker_count + 1) {
    return false;
  }

  // Reset |max_allowed_sort_key_| so that only one thread yields at a time for
  // a given task.
  max_allowed_sort_key =
      TS_UNCHECKED_READ(max_allowed_sort_key_)
          .exchange(kMaxYieldSortKey, std::memory_order_relaxed);
  // Another thread might have decided to yield and racily reset
  // |max_allowed_sort_key_|, in which case this thread doesn't yield.
  return max_allowed_sort_key.priority != TaskPriority::BEST_EFFORT;
}
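
// E.g. (illustrative): with |max_allowed_sort_key_| == {USER_VISIBLE,
// worker_count = 2}, a USER_VISIBLE job currently using 4 workers yields
// (4 > 2 + 1) while one using 3 workers does not, and a USER_BLOCKING task
// never yields here because its priority is higher than USER_VISIBLE.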

#if BUILDFLAG(IS_WIN)
// static
std::unique_ptr<win::ScopedWindowsThreadEnvironment>
ThreadGroup::GetScopedWindowsThreadEnvironment(WorkerEnvironment environment) {
  std::unique_ptr<win::ScopedWindowsThreadEnvironment> scoped_environment;
  if (environment == WorkerEnvironment::COM_MTA) {
    scoped_environment = std::make_unique<win::ScopedWinrtInitializer>();
  }
  // Continuing execution with an uninitialized apartment may lead to broken
  // program invariants later on.
  CHECK(!scoped_environment || scoped_environment->Succeeded());
  return scoped_environment;
}
#endif

// static
bool ThreadGroup::CurrentThreadHasGroup() {
  return current_thread_group != nullptr;
}

size_t ThreadGroup::GetMaxTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_tasks_;
}

size_t ThreadGroup::GetMaxBestEffortTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_best_effort_tasks_;
}

void ThreadGroup::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
  // Make sure workers do not clean up while watching the idle count.
  AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);

  while (NumberOfIdleWorkersLockRequiredForTesting() < n) {
    idle_workers_set_cv_for_testing_.Wait();
  }
}

void ThreadGroup::WaitForWorkersIdleForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

#if DCHECK_IS_ON()
  DCHECK(!some_workers_cleaned_up_for_testing_)
      << "Workers detached prior to waiting for a specific number of idle "
         "workers. Doing the wait under such conditions is flaky. Consider "
         "setting the suggested reclaim time to TimeDelta::Max() in Start().";
#endif

  WaitForWorkersIdleLockRequiredForTesting(n);
}

void ThreadGroup::WaitForAllWorkersIdleForTesting() {
  CheckedAutoLock auto_lock(lock_);
  WaitForWorkersIdleLockRequiredForTesting(workers_.size());
}

void ThreadGroup::WaitForWorkersCleanedUpForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

  if (!num_workers_cleaned_up_for_testing_cv_) {
    lock_.CreateConditionVariableAndEmplace(
        num_workers_cleaned_up_for_testing_cv_);
  }

  while (num_workers_cleaned_up_for_testing_ < n) {
    num_workers_cleaned_up_for_testing_cv_->Wait();
  }

  num_workers_cleaned_up_for_testing_ = 0;
}

size_t ThreadGroup::GetMaxConcurrentNonBlockedTasksDeprecated() const {
#if DCHECK_IS_ON()
  CheckedAutoLock auto_lock(lock_);
  DCHECK_NE(after_start().initial_max_tasks, 0U)
      << "GetMaxConcurrentNonBlockedTasksDeprecated() should only be called "
      << "after the thread group has started.";
#endif
  return after_start().initial_max_tasks;
}

size_t ThreadGroup::NumberOfWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return workers_.size();
}

size_t ThreadGroup::NumberOfIdleWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return NumberOfIdleWorkersLockRequiredForTesting();
}

size_t ThreadGroup::GetDesiredNumAwakeWorkersLockRequired() const {
  // Number of BEST_EFFORT task sources that are running or queued and allowed
  // to run by the CanRunPolicy.
  const size_t num_running_or_queued_can_run_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();

  const size_t workers_for_best_effort_task_sources =
      std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
                        max_best_effort_tasks_),
               num_running_best_effort_tasks_);

  // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
  const size_t num_running_or_queued_foreground_task_sources =
      (num_running_tasks_ - num_running_best_effort_tasks_) +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();

  const size_t workers_for_foreground_task_sources =
      num_running_or_queued_foreground_task_sources;

  return std::min({workers_for_best_effort_task_sources +
                       workers_for_foreground_task_sources,
                   max_tasks_, kMaxNumberOfWorkers});
}
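
// E.g. (illustrative): with 1 running BEST_EFFORT task, 3 additional
// BEST_EFFORT workers desired, max_best_effort_tasks_ == 2, 3 running
// foreground tasks, 2 additional foreground workers desired and
// max_tasks_ == 6, this returns min({max(min(1 + 3, 2), 1) + (3 + 2), 6, 256})
// == min({7, 6, 256}) == 6 desired awake workers.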

void ThreadGroup::MaybeScheduleAdjustMaxTasksLockRequired(
    BaseScopedCommandsExecutor* executor) {
  if (!adjust_max_tasks_posted_ &&
      ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
    executor->ScheduleAdjustMaxTasks();
    adjust_max_tasks_posted_ = true;
  }
}

bool ThreadGroup::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
  // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
  // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
  // enough to accommodate all queued and running task sources and an idle
  // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
  // - When (1) is false: No worker would be created or woken up if the
  //   concurrency limits were increased, so there is no hurry to increase them.
  // - When (2) is false: The concurrency limits could not be increased by
  //   AdjustMaxTasks().

  const size_t num_running_or_queued_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
  if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ &&
      num_unresolved_best_effort_may_block_ > 0) {
    return true;
  }

  const size_t num_running_or_queued_task_sources =
      num_running_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
  constexpr size_t kIdleWorker = 1;
  return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
         num_unresolved_may_block_ > 0;
}
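
// E.g. (illustrative): with max_tasks_ == 4, 4 running task sources, 1 queued
// task source and 1 unresolved MAY_BLOCK ScopedBlockingCall, the final check
// is 4 + 1 + 1 (idle worker) > 4 with num_unresolved_may_block_ > 0, so an
// AdjustMaxTasks() pass gets scheduled.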

void ThreadGroup::UpdateMinAllowedPriorityLockRequired() {
  if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
    max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
  } else {
    max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
                                 priority_queue_.PeekSortKey().worker_count()},
                                std::memory_order_relaxed);
  }
}
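
// I.e. yielding is only requested while the group is saturated
// (num_running_tasks_ >= max_tasks_) and work is waiting; the queue's top
// sort key then becomes the bar that ShouldYield() compares against.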

void ThreadGroup::DecrementTasksRunningLockRequired(TaskPriority priority) {
  DCHECK_GT(num_running_tasks_, 0U);
  --num_running_tasks_;
  if (priority == TaskPriority::BEST_EFFORT) {
    DCHECK_GT(num_running_best_effort_tasks_, 0U);
    --num_running_best_effort_tasks_;
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::IncrementTasksRunningLockRequired(TaskPriority priority) {
  ++num_running_tasks_;
  DCHECK_LE(num_running_tasks_, max_tasks_);
  DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
  if (priority == TaskPriority::BEST_EFFORT) {
    ++num_running_best_effort_tasks_;
    DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
    DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::DecrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_tasks_, 0U);
  --max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::IncrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::DecrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_best_effort_tasks_, 0U);
  --max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroup::IncrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

ThreadGroup::InitializedInStart::InitializedInStart() = default;
ThreadGroup::InitializedInStart::~InitializedInStart() = default;

}  // namespace internal
}  // namespace base