// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_impl.h"

#include <stddef.h>

#include <algorithm>
#include <type_traits>
#include <utility>

#include "base/atomicops.h"
#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/feature_list.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/metrics/histogram_macros.h"
#include "base/numerics/clamped_math.h"
#include "base/ranges/algorithm.h"
#include "base/sequence_token.h"
#include "base/strings/string_piece.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_features.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/task/thread_pool/worker_thread_waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time_override.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

#if BUILDFLAG(IS_WIN)
#include "base/win/scoped_com_initializer.h"
#include "base/win/scoped_windows_thread_environment.h"
#include "base/win/scoped_winrt_initializer.h"
#include "base/win/windows_version.h"
#endif  // BUILDFLAG(IS_WIN)

namespace base {
namespace internal {

namespace {

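// Hard cap on the number of workers in a thread group. |max_tasks_| can grow
// past its initial value while tasks are in ScopedBlockingCalls, but a group
// never has more than this many workers.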
constexpr size_t kMaxNumberOfWorkers = 256;

// In a background thread group:
// - Blocking calls take more time than in a foreground thread group.
// - We want to minimize impact on foreground work, not maximize execution
//   throughput.
// For these reasons, the timeout to increase the maximum number of concurrent
// tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
// infinite because execution throughput should not be reduced forever if a
// task blocks forever.
//
// TODO(fdoray): On platforms without background thread groups, blocking in a
// BEST_EFFORT task should:
// 1. Increment the maximum number of concurrent tasks after a *short* timeout,
//    to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
// 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
//    *long* timeout, because we only want to allow more BEST_EFFORT tasks to
//    be scheduled concurrently when we believe that a BEST_EFFORT task is
//    blocked forever.
// Currently, only 1. is true as the configuration is per thread group.
// TODO(https://crbug.com/927755): Fix racy condition when MayBlockThreshold ==
// BlockedWorkersPoll.
constexpr TimeDelta kForegroundMayBlockThreshold = Milliseconds(1000);
constexpr TimeDelta kForegroundBlockedWorkersPoll = Milliseconds(1200);
constexpr TimeDelta kBackgroundMayBlockThreshold = Seconds(10);
constexpr TimeDelta kBackgroundBlockedWorkersPoll = Seconds(12);

// Only used in DCHECKs.
bool ContainsWorker(
    const std::vector<scoped_refptr<WorkerThreadWaitableEvent>>& workers,
    const WorkerThreadWaitableEvent* worker) {
  auto it = ranges::find_if(
      workers, [worker](const scoped_refptr<WorkerThreadWaitableEvent>& i) {
        return i.get() == worker;
      });
  return it != workers.end();
}

}  // namespace

// Upon destruction, executes actions that control the number of active
// workers. Useful to satisfy locking requirements of these actions.
class ThreadGroupImpl::ScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit ScopedCommandsExecutor(ThreadGroupImpl* outer) : outer_(outer) {}

  ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
  ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
  ~ScopedCommandsExecutor() { FlushImpl(); }

  void ScheduleWakeUp(scoped_refptr<WorkerThreadWaitableEvent> worker) {
    workers_to_wake_up_.AddWorker(std::move(worker));
  }

  void ScheduleStart(scoped_refptr<WorkerThreadWaitableEvent> worker) {
    workers_to_start_.AddWorker(std::move(worker));
  }

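  // Releases |held_lock| and runs the pending commands immediately (workers
  // cannot be woken up or started while a lock is held), then clears the
  // pending state so that the destructor's flush doesn't replay the same
  // commands.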
  void FlushWorkerCreation(CheckedLock* held_lock) {
    if (workers_to_wake_up_.empty() && workers_to_start_.empty())
      return;
    CheckedAutoUnlock auto_unlock(*held_lock);
    FlushImpl();
    workers_to_wake_up_.clear();
    workers_to_start_.clear();
    must_schedule_adjust_max_tasks_ = false;
  }

  void ScheduleAdjustMaxTasks() {
    DCHECK(!must_schedule_adjust_max_tasks_);
    must_schedule_adjust_max_tasks_ = true;
  }

 private:
  class WorkerContainer {
   public:
    WorkerContainer() = default;
    WorkerContainer(const WorkerContainer&) = delete;
    WorkerContainer& operator=(const WorkerContainer&) = delete;

    void AddWorker(scoped_refptr<WorkerThreadWaitableEvent> worker) {
      if (!worker)
        return;
      if (!first_worker_)
        first_worker_ = std::move(worker);
      else
        additional_workers_.push_back(std::move(worker));
    }

    template <typename Action>
    void ForEachWorker(Action action) {
      if (first_worker_) {
        action(first_worker_.get());
        for (scoped_refptr<WorkerThreadWaitableEvent> worker :
             additional_workers_) {
          action(worker.get());
        }
      } else {
        DCHECK(additional_workers_.empty());
      }
    }

    bool empty() const { return first_worker_ == nullptr; }

    void clear() {
      first_worker_.reset();
      additional_workers_.clear();
    }

   private:
    // The purpose of |first_worker_| is to avoid a heap allocation by the
    // vector in the case where there is only one worker in the container.
    scoped_refptr<WorkerThreadWaitableEvent> first_worker_;
    std::vector<scoped_refptr<WorkerThreadWaitableEvent>> additional_workers_;
  };

  void FlushImpl() {
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    // Wake up workers.
    workers_to_wake_up_.ForEachWorker(
        [](WorkerThreadWaitableEvent* worker) { worker->WakeUp(); });

    // Start workers. Happens after wake ups to prevent the case where a worker
    // enters its main function, is descheduled because it wasn't woken up yet,
    // and is woken up immediately after.
    workers_to_start_.ForEachWorker([&](WorkerThreadWaitableEvent* worker) {
      worker->Start(outer_->after_start().service_thread_task_runner,
                    outer_->after_start().worker_thread_observer);
      if (outer_->worker_started_for_testing_)
        outer_->worker_started_for_testing_->Wait();
    });

    if (must_schedule_adjust_max_tasks_)
      outer_->ScheduleAdjustMaxTasks();
  }

  const raw_ptr<ThreadGroupImpl> outer_;

  WorkerContainer workers_to_wake_up_;
  WorkerContainer workers_to_start_;
  bool must_schedule_adjust_max_tasks_ = false;
};

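// Delegate for the WorkerThreads of a ThreadGroupImpl. Hands out work via
// GetWork()/SwapProcessedTask() and observes the ScopedBlockingCalls made by
// the tasks it runs to adjust the group's concurrency limits.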
class ThreadGroupImpl::WorkerThreadDelegateImpl
    : public WorkerThreadWaitableEvent::Delegate,
      public BlockingObserver {
 public:
  // |outer| owns the worker for which this delegate is constructed. If
  // |is_excess| is true, this worker will be eligible for reclaim.
  explicit WorkerThreadDelegateImpl(TrackedRef<ThreadGroupImpl> outer,
                                    bool is_excess);
  WorkerThreadDelegateImpl(const WorkerThreadDelegateImpl&) = delete;
  WorkerThreadDelegateImpl& operator=(const WorkerThreadDelegateImpl&) = delete;

  // OnMainExit() handles the thread-affine cleanup; WorkerThreadDelegateImpl
  // can thereafter safely be deleted from any thread.
  ~WorkerThreadDelegateImpl() override = default;

  // WorkerThreadWaitableEvent::Delegate:
  WorkerThread::ThreadLabel GetThreadLabel() const override;
  void OnMainEntry(WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
                                         WorkerThread* worker) override;
  TimeDelta GetSleepTimeout() override;
  void OnMainExit(WorkerThread* worker) override;
  void RecordUnnecessaryWakeup() override;

  // BlockingObserver:
  void BlockingStarted(BlockingType blocking_type) override;
  void BlockingTypeUpgraded() override;
  void BlockingEnded() override;

  // Notifies the worker of shutdown, possibly marking the running task as
  // MAY_BLOCK.
  void OnShutdownStartedLockRequired(ScopedCommandsExecutor* executor)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Returns true iff the worker can get work. Cleans up the worker or puts it
  // on the idle set if it can't get work.
  bool CanGetWorkLockRequired(ScopedCommandsExecutor* executor,
                              WorkerThreadWaitableEvent* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks iff this worker has been within a
  // ScopedBlockingCall for more than |may_block_threshold|.
  void MaybeIncrementMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks.
  void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  TaskPriority current_task_priority_lock_required() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return *read_any().current_task_priority;
  }

  // True if this worker is eligible for reclaim.
  bool is_excess() const { return is_excess_; }

  // Exposed for AnnotateAcquiredLockAlias.
  const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
    return outer_->lock_;
  }

 private:
  // Returns true if |worker| is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThreadWaitableEvent* worker) const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Calls cleanup on |worker| and removes it from the thread group. Called
  // from GetWork() when no work is available and CanCleanupLockRequired()
  // returns true.
  void CleanupLockRequired(ScopedCommandsExecutor* executor,
                           WorkerThreadWaitableEvent* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Called in GetWork() when a worker becomes idle.
  void OnWorkerBecomesIdleLockRequired(ScopedCommandsExecutor* executor,
                                       WorkerThreadWaitableEvent* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  RegisteredTaskSource GetWorkLockRequired(ScopedCommandsExecutor* executor,
                                           WorkerThreadWaitableEvent* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Accessed only from the worker thread.
  struct WorkerOnly {
    // Associated WorkerThread, if any, initialized in OnMainEntry().
    raw_ptr<WorkerThreadWaitableEvent> worker_thread_;

#if BUILDFLAG(IS_WIN)
    std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
#endif  // BUILDFLAG(IS_WIN)
  } worker_only_;

  // Writes from the worker thread are protected by |outer_->lock_|. Reads from
  // any thread are protected by |outer_->lock_| when not on the worker thread.
  struct WriteWorkerReadAny {
    // The priority of the task the worker is currently running, if any.
    absl::optional<TaskPriority> current_task_priority;
    // The shutdown behavior of the task the worker is currently running, if
    // any.
    absl::optional<TaskShutdownBehavior> current_shutdown_behavior;

    // Time when BlockingStarted() was last called. Reset when BlockingEnded()
    // is called.
    TimeTicks blocking_start_time;

    // Whether the worker is currently running a task (i.e. GetWork() has
    // returned a non-empty task source and SwapProcessedTask() hasn't been
    // called yet).
    bool is_running_task() const { return !!current_shutdown_behavior; }
  } write_worker_read_any_;

  WorkerOnly& worker_only() {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return worker_only_;
  }

  WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_any() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_worker() const {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const TrackedRef<ThreadGroupImpl> outer_;

  const bool is_excess_;

  // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| were
  // incremented due to a ScopedBlockingCall on the thread.
  bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
  bool incremented_max_best_effort_tasks_since_blocked_
      GUARDED_BY(outer_->lock_) = false;
  // Whether |outer_->max_tasks_| and |outer_->max_best_effort_tasks_| were
  // incremented due to running a CONTINUE_ON_SHUTDOWN task on the thread
  // during shutdown.
  bool incremented_max_tasks_for_shutdown_ GUARDED_BY(outer_->lock_) = false;

  // Verifies that specific calls are always made from the worker thread.
  THREAD_CHECKER(worker_thread_checker_);
};

ThreadGroupImpl::ThreadGroupImpl(StringPiece histogram_label,
                                 StringPiece thread_group_label,
                                 ThreadType thread_type_hint,
                                 TrackedRef<TaskTracker> task_tracker,
                                 TrackedRef<Delegate> delegate)
    : ThreadGroup(std::move(task_tracker), std::move(delegate)),
      histogram_label_(histogram_label),
      thread_group_label_(thread_group_label),
      thread_type_hint_(thread_type_hint),
      idle_workers_set_cv_for_testing_(lock_.CreateConditionVariable()),
      tracked_ref_factory_(this) {
  DCHECK(!thread_group_label_.empty());
}

void ThreadGroupImpl::Start(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    absl::optional<TimeDelta> may_block_threshold) {
  ThreadGroup::Start();

  DCHECK(!replacement_thread_group_);

  in_start().no_worker_reclaim = FeatureList::IsEnabled(kNoWorkerThreadReclaim);
  in_start().may_block_threshold =
      may_block_threshold ? may_block_threshold.value()
                          : (thread_type_hint_ != ThreadType::kBackground
                                 ? kForegroundMayBlockThreshold
                                 : kBackgroundMayBlockThreshold);
  in_start().blocked_workers_poll_period =
      thread_type_hint_ != ThreadType::kBackground
          ? kForegroundBlockedWorkersPoll
          : kBackgroundBlockedWorkersPoll;
  in_start().ensure_enough_workers_at_end_of_get_work =
      base::FeatureList::IsEnabled(kUseNewJobImplementation);

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  DCHECK(workers_.empty());
  max_tasks_ = max_tasks;
  DCHECK_GE(max_tasks_, 1U);
  in_start().initial_max_tasks = max_tasks_;
  DCHECK_LE(in_start().initial_max_tasks, kMaxNumberOfWorkers);
  max_best_effort_tasks_ = max_best_effort_tasks;
  in_start().suggested_reclaim_time = suggested_reclaim_time;
  in_start().worker_environment = worker_environment;
  in_start().service_thread_task_runner = std::move(service_thread_task_runner);
  in_start().worker_thread_observer = worker_thread_observer;

#if DCHECK_IS_ON()
  in_start().initialized = true;
#endif

  if (synchronous_thread_start_for_testing) {
    worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
    // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
    // WaitableEvent or it defeats the purpose of having threads start without
    // externally visible side-effects.
    worker_started_for_testing_->declare_only_used_while_idle();
  }

  EnsureEnoughWorkersLockRequired(&executor);
}

ThreadGroupImpl::~ThreadGroupImpl() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases |workers_| should be empty.
  DCHECK(workers_.empty());
}

void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  ScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

size_t ThreadGroupImpl::GetMaxConcurrentNonBlockedTasksDeprecated() const {
#if DCHECK_IS_ON()
  CheckedAutoLock auto_lock(lock_);
  DCHECK_NE(after_start().initial_max_tasks, 0U)
      << "GetMaxConcurrentTasksDeprecated() should only be called after the "
      << "thread group has started.";
#endif
  return after_start().initial_max_tasks;
}

void ThreadGroupImpl::WaitForWorkersIdleForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

#if DCHECK_IS_ON()
  DCHECK(!some_workers_cleaned_up_for_testing_)
      << "Workers detached prior to waiting for a specific number of idle "
         "workers. Doing the wait under such conditions is flaky. Consider "
         "setting the suggested reclaim time to TimeDelta::Max() in Start().";
#endif

  WaitForWorkersIdleLockRequiredForTesting(n);
}

void ThreadGroupImpl::WaitForAllWorkersIdleForTesting() {
  CheckedAutoLock auto_lock(lock_);
  WaitForWorkersIdleLockRequiredForTesting(workers_.size());
}

void ThreadGroupImpl::WaitForWorkersCleanedUpForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

  if (!num_workers_cleaned_up_for_testing_cv_)
    num_workers_cleaned_up_for_testing_cv_ = lock_.CreateConditionVariable();

  while (num_workers_cleaned_up_for_testing_ < n)
    num_workers_cleaned_up_for_testing_cv_->Wait();

  num_workers_cleaned_up_for_testing_ = 0;
}

void ThreadGroupImpl::JoinForTesting() {
  decltype(workers_) workers_copy;
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_for_testing_started_ = true;

    // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThreadWaitableEvent::JoinForTesting() without holding |lock_| since
    // WorkerThreads may need to access |workers_|.
    workers_copy = workers_;
  }
  for (const auto& worker : workers_copy)
    worker->JoinForTesting();

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release |workers_| to clear their TrackedRef against |this|.
  workers_.clear();
}

size_t ThreadGroupImpl::NumberOfWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return workers_.size();
}

size_t ThreadGroupImpl::GetMaxTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_tasks_;
}

size_t ThreadGroupImpl::GetMaxBestEffortTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_best_effort_tasks_;
}

size_t ThreadGroupImpl::NumberOfIdleWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return idle_workers_set_.Size();
}

ThreadGroupImpl::WorkerThreadDelegateImpl::WorkerThreadDelegateImpl(
    TrackedRef<ThreadGroupImpl> outer,
    bool is_excess)
    : outer_(std::move(outer)), is_excess_(is_excess) {
  // Bound in OnMainEntry().
  DETACH_FROM_THREAD(worker_thread_checker_);
}

WorkerThread::ThreadLabel
ThreadGroupImpl::WorkerThreadDelegateImpl::GetThreadLabel() const {
  return WorkerThread::ThreadLabel::POOLED;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainEntry(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  {
#if DCHECK_IS_ON()
    CheckedAutoLock auto_lock(outer_->lock_);
    DCHECK(ContainsWorker(outer_->workers_,
                          static_cast<WorkerThreadWaitableEvent*>(worker)));
#endif
  }

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
      outer_->after_start().worker_environment);
#endif  // BUILDFLAG(IS_WIN)

  PlatformThread::SetName(
      StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));

  outer_->BindToCurrentThread();
  worker_only().worker_thread_ =
      static_cast<WorkerThreadWaitableEvent*>(worker);
  SetBlockingObserverForCurrentThread(this);

  if (outer_->worker_started_for_testing_) {
    // When |worker_started_for_testing_| is set, the thread that starts
    // workers should wait for a worker to have started before starting the
    // next one, and there should only be one thread that wakes up workers at
    // a time.
    DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
    outer_->worker_started_for_testing_->Signal();
  }
}

RegisteredTaskSource
ThreadGroupImpl::WorkerThreadDelegateImpl::GetWorkLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThreadWaitableEvent* worker) {
  DCHECK(ContainsWorker(outer_->workers_, worker));

  if (!outer_->after_start().ensure_enough_workers_at_end_of_get_work) {
    // Use this opportunity, before assigning work to this worker, to
    // create/wake additional workers if needed (doing this here allows us to
    // reduce potentially expensive create/wake directly on PostTask()).
    //
    // Note: FlushWorkerCreation() below releases |outer_->lock_|. It is thus
    // important that all other operations come after it to keep this method
    // transactional.
    outer_->EnsureEnoughWorkersLockRequired(executor);
    executor->FlushWorkerCreation(&outer_->lock_);
  }

  if (!CanGetWorkLockRequired(executor, worker)) {
    return nullptr;
  }

  RegisteredTaskSource task_source;
  TaskPriority priority;
  while (!task_source && !outer_->priority_queue_.IsEmpty()) {
    // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
    // BEST_EFFORT tasks run concurrently.
    priority = outer_->priority_queue_.PeekSortKey().priority();
    if (!outer_->task_tracker_->CanRunPriority(priority) ||
        (priority == TaskPriority::BEST_EFFORT &&
         outer_->num_running_best_effort_tasks_ >=
             outer_->max_best_effort_tasks_)) {
      break;
    }

    task_source = outer_->TakeRegisteredTaskSource(executor);
  }
  if (!task_source) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return nullptr;
  }

  // Running task bookkeeping.
  outer_->IncrementTasksRunningLockRequired(priority);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));
  write_worker().current_task_priority = priority;
  write_worker().current_shutdown_behavior = task_source->shutdown_behavior();

  if (outer_->after_start().ensure_enough_workers_at_end_of_get_work) {
    // Subtle: This must be after the call to WillRunTask() inside
    // TakeRegisteredTaskSource(), so that any state used by WillRunTask() to
    // determine that the task source must remain in the TaskQueue is also used
    // to determine the desired number of workers. Concretely, this wouldn't
    // work:
    //
    //   Thread 1: GetWork() calls EnsureEnoughWorkers(). No worker woken up
    //             because the queue contains a job with max concurrency = 1 and
    //             the current worker is awake.
    //   Thread 2: Increases the job's max concurrency.
    //             ShouldQueueUponCapacityIncrease() returns false because the
    //             job is already queued.
    //   Thread 1: Calls WillRunTask() on the job. It returns
    //             kAllowedNotSaturated because max concurrency is not reached.
    //             But no extra worker is woken up to run the job!
    outer_->EnsureEnoughWorkersLockRequired(executor);
  }

  return task_source;
}

RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork(
    WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);
  WorkerThreadWaitableEvent* worker =
      static_cast<WorkerThreadWaitableEvent*>(worker_base);

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  return GetWorkLockRequired(&executor, worker);
}

RegisteredTaskSource
ThreadGroupImpl::WorkerThreadDelegateImpl::SwapProcessedTask(
    RegisteredTaskSource task_source,
    WorkerThread* worker_thread) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(read_worker().current_task_priority);
  DCHECK(read_worker().current_shutdown_behavior);

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here
  // as |TaskSource::lock_| is a UniversalPredecessor and must always be
  // acquired prior to acquiring a second lock.
  absl::optional<RegisteredTaskSourceAndTransaction>
      transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        RegisteredTaskSourceAndTransaction::FromTaskSource(
            std::move(task_source)));
  }

  // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
  // TaskSources returned by the GetWork() method of |delegate_| until it
  // returns nullptr. Resetting |wake_up_event_| here doesn't break this
  // invariant and avoids a useless loop iteration before going to sleep if
  // WakeUp() is called while this WorkerThread is awake.
  wake_up_event_.Reset();

  ScopedCommandsExecutor workers_executor(outer_.get());
  ScopedReenqueueExecutor reenqueue_executor;
  CheckedAutoLock auto_lock(outer_->lock_);

  // During shutdown, max_tasks may have been incremented in
  // OnShutdownStartedLockRequired().
  if (incremented_max_tasks_for_shutdown_) {
    DCHECK(outer_->shutdown_started_);
    outer_->DecrementMaxTasksLockRequired();
    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      outer_->DecrementMaxBestEffortTasksLockRequired();
    }
    incremented_max_tasks_since_blocked_ = false;
    incremented_max_best_effort_tasks_since_blocked_ = false;
    incremented_max_tasks_for_shutdown_ = false;
  }

  DCHECK(read_worker().blocking_start_time.is_null());
  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

  // Running task bookkeeping.
  outer_->DecrementTasksRunningLockRequired(
      *read_worker().current_task_priority);
  write_worker().current_shutdown_behavior = absl::nullopt;
  write_worker().current_task_priority = absl::nullopt;

  if (transaction_with_task_source) {
    outer_->ReEnqueueTaskSourceLockRequired(
        &workers_executor, &reenqueue_executor,
        std::move(transaction_with_task_source.value()));
  }

  return GetWorkLockRequired(
      &workers_executor,
      static_cast<WorkerThreadWaitableEvent*>(worker_thread));
}

TimeDelta ThreadGroupImpl::WorkerThreadDelegateImpl::GetSleepTimeout() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess())
    return TimeDelta::Max();
  // Sleep for an extra 10% to avoid the following pathological case:
  //   0) A task is running on a timer which matches
  //      |after_start().suggested_reclaim_time|.
  //   1) The timer fires and this worker is created by
  //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
  //      worker was assigned the task.
  //   2) This worker begins sleeping |after_start().suggested_reclaim_time|
  //      (at the front of the idle set).
  //   3) The task assigned to the other worker completes and the worker goes
  //      back in the idle set (this worker may now be second on the idle set;
  //      its GetLastUsedTime() is set to Now()).
  //   4) The sleep in (2) expires. Since (3) was fast this worker is likely to
  //      have been second on the idle set long enough for
  //      CanCleanupLockRequired() to be satisfied in which case this worker is
  //      cleaned up.
  //   5) The timer fires at roughly the same time and we're back to (1) if (4)
  //      resulted in a clean up; causing thread churn.
  //
  //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
  //   before (5). In that case (5) will cause (3) and refresh this worker's
  //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
  //   and avoiding churn.
  //
  //   Of course the same problem arises if in (0) the timer matches
  //   |after_start().suggested_reclaim_time * 1.1| but it's expected that any
  //   timer slower than |after_start().suggested_reclaim_time| will cause such
  //   churn during long idle periods. If this is a problem in practice, the
  //   standby thread configuration and algorithm should be revisited.
  return outer_->after_start().suggested_reclaim_time * 1.1;
}

bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanCleanupLockRequired(
    const WorkerThreadWaitableEvent* worker) const {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess())
    return false;

  const TimeTicks last_used_time = worker->GetLastUsedTime();
  return !last_used_time.is_null() &&
         subtle::TimeTicksNowIgnoringOverride() - last_used_time >=
             outer_->after_start().suggested_reclaim_time &&
         LIKELY(!outer_->worker_cleanup_disallowed_for_testing_);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::CleanupLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThreadWaitableEvent* worker) {
  DCHECK(!outer_->join_for_testing_started_);
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  worker->Cleanup();

  if (outer_->IsOnIdleSetLockRequired(worker))
    outer_->idle_workers_set_.Remove(worker);

  // Remove the worker from |workers_|.
  auto worker_iter = ranges::find(outer_->workers_, worker);
  DCHECK(worker_iter != outer_->workers_.end());
  outer_->workers_.erase(worker_iter);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnWorkerBecomesIdleLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThreadWaitableEvent* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));

  // Add the worker to the idle set.
  outer_->idle_workers_set_.Insert(worker);
  DCHECK_LE(outer_->idle_workers_set_.Size(), outer_->workers_.size());
  outer_->idle_workers_set_cv_for_testing_->Broadcast();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainExit(
    WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  WorkerThreadWaitableEvent* worker =
      static_cast<WorkerThreadWaitableEvent*>(worker_base);
  {
    bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer_->lock_);

    // |worker| should already have been removed from the idle workers set and
    // |workers_| by the time the thread is about to exit (except when the
    // thread group is no longer going to be used, in which case it's fine for
    // there to be invalid workers in the thread group).
    if (!shutdown_complete && !outer_->join_for_testing_started_) {
      DCHECK(!outer_->idle_workers_set_.Contains(worker));
      DCHECK(!ContainsWorker(outer_->workers_, worker));
    }
  }
#endif

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // BUILDFLAG(IS_WIN)

  // Count cleaned up workers for tests. It's important to do this here
  // instead of at the end of CleanupLockRequired() because some side-effects
  // of cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer_->lock_);
  ++outer_->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer_->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer_->num_workers_cleaned_up_for_testing_cv_)
    outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::RecordUnnecessaryWakeup() {
  base::BooleanHistogram::FactoryGet(
      std::string("ThreadPool.UnnecessaryWakeup.") + outer_->histogram_label_,
      base::Histogram::kUmaTargetedHistogramFlag)
      ->Add(true);

  TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingStarted(
    BlockingType blocking_type) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().worker_thread_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  worker_only().worker_thread_->MaybeUpdateThreadType();

  // WILL_BLOCK is always used when time overrides are active. crbug.com/1038867
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    blocking_type = BlockingType::WILL_BLOCK;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
  DCHECK(read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();

  if (incremented_max_tasks_for_shutdown_)
    return;

  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT)
    ++outer_->num_unresolved_best_effort_may_block_;

  if (blocking_type == BlockingType::WILL_BLOCK) {
    incremented_max_tasks_since_blocked_ = true;
    outer_->IncrementMaxTasksLockRequired();
    outer_->EnsureEnoughWorkersLockRequired(&executor);
  } else {
    ++outer_->num_unresolved_may_block_;
  }

  outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingTypeUpgraded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  // The blocking type is always WILL_BLOCK in this experiment and with time
  // overrides, so it should never be considered "upgraded".
  if (base::subtle::ScopedTimeClockOverrides::overrides_active())
    return;

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope already caused the max tasks to be incremented.
  if (incremented_max_tasks_since_blocked_)
    return;

  // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope.
  --outer_->num_unresolved_may_block_;

  incremented_max_tasks_since_blocked_ = true;
  outer_->IncrementMaxTasksLockRequired();
  outer_->EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingEnded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  CheckedAutoLock auto_lock(outer_->lock_);
  DCHECK(!read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = TimeTicks();
  if (!incremented_max_tasks_for_shutdown_) {
    if (incremented_max_tasks_since_blocked_)
      outer_->DecrementMaxTasksLockRequired();
    else
      --outer_->num_unresolved_may_block_;

    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      if (incremented_max_best_effort_tasks_since_blocked_)
        outer_->DecrementMaxBestEffortTasksLockRequired();
      else
        --outer_->num_unresolved_best_effort_may_block_;
    }
  }

  incremented_max_tasks_since_blocked_ = false;
  incremented_max_best_effort_tasks_since_blocked_ = false;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnShutdownStartedLockRequired(
    ScopedCommandsExecutor* executor) {
  if (!read_any().is_running_task())
    return;
  // Workers running a CONTINUE_ON_SHUTDOWN task are replaced by incrementing
  // max_tasks/max_best_effort_tasks. The effect is reverted in
  // SwapProcessedTask().
  if (*read_any().current_shutdown_behavior ==
      TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
    incremented_max_tasks_for_shutdown_ = true;
    IncrementMaxTasksLockRequired();
  }
}

bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanGetWorkLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThreadWaitableEvent* worker) {
  const bool is_on_idle_workers_set = outer_->IsOnIdleSetLockRequired(worker);
  DCHECK_EQ(is_on_idle_workers_set, outer_->idle_workers_set_.Contains(worker));

  // This occurs when WorkerThread::Delegate::WaitForWork() times out (i.e.
  // when the worker wakes up after GetSleepTimeout()).
  if (is_on_idle_workers_set) {
    if (CanCleanupLockRequired(worker))
      CleanupLockRequired(executor, worker);
    return false;
  }

  // If too many workers are running, this worker should not get work until
  // tasks are no longer in excess (i.e. max tasks increases). This ensures
  // that if this worker is in excess, it gets a chance to be cleaned up.
  if (outer_->GetNumAwakeWorkersLockRequired() > outer_->max_tasks_) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return false;
  }

  return true;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::
    MaybeIncrementMaxTasksLockRequired() {
  if (read_any().blocking_start_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
          outer_->after_start().may_block_threshold) {
    return;
  }
  IncrementMaxTasksLockRequired();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::
    IncrementMaxTasksLockRequired() {
  if (!incremented_max_tasks_since_blocked_) {
    outer_->IncrementMaxTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_tasks_since_blocked_ = true;
      --outer_->num_unresolved_may_block_;
    }
  }
  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
      !incremented_max_best_effort_tasks_since_blocked_) {
    outer_->IncrementMaxBestEffortTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_best_effort_tasks_since_blocked_ = true;
      --outer_->num_unresolved_best_effort_may_block_;
    }
  }
}

void ThreadGroupImpl::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
  // Make sure workers do not cleanup while watching the idle count.
  AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);

  while (idle_workers_set_.Size() < n)
    idle_workers_set_cv_for_testing_->Wait();
}

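// Creates and registers a worker to keep on the idle set, unless a worker is
// already idle, |max_tasks_| has been reached, or the group already has
// kMaxNumberOfWorkers workers.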
void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers)
    return;
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);

  if (!idle_workers_set_.IsEmpty())
    return;

  if (workers_.size() >= max_tasks_)
    return;

  scoped_refptr<WorkerThreadWaitableEvent> new_worker =
      CreateAndRegisterWorkerLockRequired(executor);
  DCHECK(new_worker);
  idle_workers_set_.Insert(new_worker.get());
}

scoped_refptr<WorkerThreadWaitableEvent>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  DCHECK(!join_for_testing_started_);
  DCHECK_LT(workers_.size(), max_tasks_);
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  DCHECK(idle_workers_set_.IsEmpty());

  // WorkerThread needs |lock_| as a predecessor for its thread lock because in
  // GetWork(), |lock_| is first acquired and then the thread lock is acquired
  // when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
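  // With kNoWorkerThreadReclaim enabled, only workers created beyond the
  // initial |max_tasks| are excess (i.e. eligible for reclaim); otherwise,
  // every worker is.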
  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          thread_type_hint_,
          std::make_unique<WorkerThreadDelegateImpl>(
              tracked_ref_factory_.GetTrackedRef(),
              /* is_excess=*/after_start().no_worker_reclaim
                  ? workers_.size() >= after_start().initial_max_tasks
                  : true),
          task_tracker_, worker_sequence_num_++, &lock_);

  workers_.push_back(worker);
  executor->ScheduleStart(worker);
  DCHECK_LE(workers_.size(), max_tasks_);

  return worker;
}

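// Returns the number of workers that are not on the idle set, i.e. workers
// that are either running a task or looking for work in GetWork().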
size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
  DCHECK_GE(workers_.size(), idle_workers_set_.Size());
  size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
  DCHECK_GE(num_awake_workers, num_running_tasks_);
  return num_awake_workers;
}

size_t ThreadGroupImpl::GetDesiredNumAwakeWorkersLockRequired() const {
  // Number of BEST_EFFORT task sources that are running or queued and allowed
  // to run by the CanRunPolicy.
  const size_t num_running_or_queued_can_run_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();

  const size_t workers_for_best_effort_task_sources =
      std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
                        max_best_effort_tasks_),
               num_running_best_effort_tasks_);

  // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
  const size_t num_running_or_queued_foreground_task_sources =
      (num_running_tasks_ - num_running_best_effort_tasks_) +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();

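  // Unlike BEST_EFFORT, foreground task sources have no dedicated cap; the
  // total is clamped to |max_tasks_| below.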
  const size_t workers_for_foreground_task_sources =
      num_running_or_queued_foreground_task_sources;

  return std::min({workers_for_best_effort_task_sources +
                       workers_for_foreground_task_sources,
                   max_tasks_, kMaxNumberOfWorkers});
}

void ThreadGroupImpl::DidUpdateCanRunPolicy() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::OnShutdownStarted() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
    return;

  // Start a MAY_BLOCK scope on each worker that is already running a task.
  for (scoped_refptr<WorkerThreadWaitableEvent>& worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // WorkerThreadDelegateImpls.
    WorkerThreadDelegateImpl* delegate =
        static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->OnShutdownStartedLockRequired(&executor);
  }
  EnsureEnoughWorkersLockRequired(&executor);

  shutdown_started_ = true;
}

void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
    return;

  ScopedCommandsExecutor* executor =
      static_cast<ScopedCommandsExecutor*>(base_executor);

  const size_t desired_num_awake_workers =
      GetDesiredNumAwakeWorkersLockRequired();
  const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();

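  // Wake up at most two workers per call; each awakened worker runs
  // EnsureEnoughWorkersLockRequired() again from GetWork(), so remaining
  // wake-ups happen in a chain rather than all at once while |lock_| is held
  // here.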
  size_t num_workers_to_wake_up =
      ClampSub(desired_num_awake_workers, num_awake_workers);
  num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));

  // Wake up the appropriate number of workers.
  for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
    WorkerThreadWaitableEvent* worker_to_wakeup = idle_workers_set_.Take();
    DCHECK(worker_to_wakeup);
    executor->ScheduleWakeUp(worker_to_wakeup);
  }

  // In the case where the loop above didn't wake up any worker and we don't
  // have excess workers, the idle worker should be maintained. This happens
  // when called from the last worker awake, or a recent increase in
  // |max_tasks| now makes it possible to keep an idle worker.
  if (desired_num_awake_workers == num_awake_workers)
    MaintainAtLeastOneIdleWorkerLockRequired(executor);

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

void ThreadGroupImpl::AdjustMaxTasks() {
  DCHECK(
      after_start().service_thread_task_runner->RunsTasksInCurrentSequence());

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(adjust_max_tasks_posted_);
  adjust_max_tasks_posted_ = false;

  // Increment max tasks for each worker that has been within a MAY_BLOCK
  // ScopedBlockingCall for more than may_block_threshold.
  for (scoped_refptr<WorkerThreadWaitableEvent> worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // WorkerThreadDelegateImpls.
    WorkerThreadDelegateImpl* delegate =
        static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->MaybeIncrementMaxTasksLockRequired();
  }

  // Wake up workers according to the updated |max_tasks_|. This will also
  // reschedule AdjustMaxTasks() if necessary.
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
  // |adjust_max_tasks_posted_| can't change before the task posted below runs.
  // Skip check on NaCl to avoid unsafe reference acquisition warning.
#if !BUILDFLAG(IS_NACL)
  DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
#endif

  after_start().service_thread_task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
      after_start().blocked_workers_poll_period);
}

void ThreadGroupImpl::MaybeScheduleAdjustMaxTasksLockRequired(
    ScopedCommandsExecutor* executor) {
  if (!adjust_max_tasks_posted_ &&
      ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
    executor->ScheduleAdjustMaxTasks();
    adjust_max_tasks_posted_ = true;
  }
}

bool ThreadGroupImpl::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
  // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
  // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
  // enough to accommodate all queued and running task sources and an idle
  // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
  // - When (1) is false: No worker would be created or woken up if the
  //   concurrency limits were increased, so there is no hurry to increase them.
  // - When (2) is false: The concurrency limits could not be increased by
  //   AdjustMaxTasks().

  const size_t num_running_or_queued_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
  if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ &&
      num_unresolved_best_effort_may_block_ > 0) {
    return true;
  }

  const size_t num_running_or_queued_task_sources =
      num_running_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
  constexpr size_t kIdleWorker = 1;
  return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
         num_unresolved_may_block_ > 0;
}

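// Keeps |max_allowed_sort_key_| in sync with the queue: when every
// |max_tasks_| slot is in use and work remains queued, it holds the sort key
// of the head of |priority_queue_|; otherwise it holds kMaxYieldSortKey,
// meaning no running task needs to yield.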
void ThreadGroupImpl::UpdateMinAllowedPriorityLockRequired() {
  if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
    max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
  } else {
    max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
                                 priority_queue_.PeekSortKey().worker_count()},
                                std::memory_order_relaxed);
  }
}

bool ThreadGroupImpl::IsOnIdleSetLockRequired(
    WorkerThreadWaitableEvent* worker) const {
  // To avoid searching through the idle set: use GetLastUsedTime() not being
  // null (or being directly on top of the idle set) as a proxy for being on
  // the idle set.
  return idle_workers_set_.Peek() == worker ||
         !worker->GetLastUsedTime().is_null();
}

void ThreadGroupImpl::DecrementTasksRunningLockRequired(TaskPriority priority) {
  DCHECK_GT(num_running_tasks_, 0U);
  --num_running_tasks_;
  if (priority == TaskPriority::BEST_EFFORT) {
    DCHECK_GT(num_running_best_effort_tasks_, 0U);
    --num_running_best_effort_tasks_;
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementTasksRunningLockRequired(TaskPriority priority) {
  ++num_running_tasks_;
  DCHECK_LE(num_running_tasks_, max_tasks_);
  DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
  if (priority == TaskPriority::BEST_EFFORT) {
    ++num_running_best_effort_tasks_;
    DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
    DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
  }
  UpdateMinAllowedPriorityLockRequired();
}

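// The limits below are only adjusted on behalf of a task that is currently
// running (a ScopedBlockingCall, or a CONTINUE_ON_SHUTDOWN task during
// shutdown), hence the DCHECKs on |num_running_tasks_|.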
void ThreadGroupImpl::DecrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_tasks_, 0U);
  --max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::DecrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_best_effort_tasks_, 0U);
  --max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

ThreadGroupImpl::InitializedInStart::InitializedInStart() = default;
ThreadGroupImpl::InitializedInStart::~InitializedInStart() = default;

}  // namespace internal
}  // namespace base