// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_impl.h"

#include <stddef.h>

#include <algorithm>
#include <type_traits>
#include <utility>

#include "base/atomicops.h"
#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/containers/stack_container.h"
#include "base/feature_list.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/metrics/histogram_macros.h"
#include "base/numerics/clamped_math.h"
#include "base/ranges/algorithm.h"
#include "base/sequence_token.h"
#include "base/strings/string_piece.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/threading/platform_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time_override.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

#if BUILDFLAG(IS_WIN)
#include "base/win/scoped_com_initializer.h"
#include "base/win/scoped_windows_thread_environment.h"
#include "base/win/scoped_winrt_initializer.h"
#include "base/win/windows_version.h"
#endif  // BUILDFLAG(IS_WIN)

namespace base {
namespace internal {

namespace {

constexpr size_t kMaxNumberOfWorkers = 256;

// In a background thread group:
// - Blocking calls take more time than in a foreground thread group.
// - We want to minimize impact on foreground work, not maximize execution
//   throughput.
// For these reasons, the timeout to increase the maximum number of concurrent
// tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
// infinite because execution throughput should not be reduced forever if a task
// blocks forever.
//
// TODO(fdoray): On platforms without background thread groups, blocking in a
// BEST_EFFORT task should:
// 1. Increment the maximum number of concurrent tasks after a *short* timeout,
//    to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
// 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
//    *long* timeout, because we only want to allow more BEST_EFFORT tasks to
//    be scheduled concurrently when we believe that a BEST_EFFORT task is
//    blocked forever.
// Currently, only 1. is true as the configuration is per thread group.
// TODO(https://crbug.com/927755): Fix racy condition when MayBlockThreshold ==
// BlockedWorkersPoll.
constexpr TimeDelta kForegroundMayBlockThreshold = Milliseconds(1000);
constexpr TimeDelta kForegroundBlockedWorkersPoll = Milliseconds(1200);
constexpr TimeDelta kBackgroundMayBlockThreshold = Seconds(10);
constexpr TimeDelta kBackgroundBlockedWorkersPoll = Seconds(12);
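//
// E.g. (hypothetical timeline for a foreground group): a worker enters a
// MAY_BLOCK ScopedBlockingCall at t = 0; AdjustMaxTasks(), polled every
// kForegroundBlockedWorkersPoll (1200 ms), increments the group's max tasks
// the first time it observes that the call has outlasted
// kForegroundMayBlockThreshold (1000 ms).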

// Only used in DCHECKs.
bool ContainsWorker(const std::vector<scoped_refptr<WorkerThread>>& workers,
                    const WorkerThread* worker) {
  auto it =
      ranges::find_if(workers, [worker](const scoped_refptr<WorkerThread>& i) {
        return i.get() == worker;
      });
  return it != workers.end();
}

}  // namespace

// Upon destruction, executes actions that control the number of active workers.
// Useful to satisfy locking requirements of these actions.
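//
// Typical usage (sketch; this pattern appears throughout this file):
//   ScopedCommandsExecutor executor(this);
//   CheckedAutoLock auto_lock(lock_);
//   ...mutate state, e.g. executor.ScheduleWakeUp(worker)...
//   // |lock_| is released, then |executor| is destroyed and flushes the
//   // scheduled wake-ups/starts without holding any lock.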
class ThreadGroupImpl::ScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit ScopedCommandsExecutor(ThreadGroupImpl* outer) : outer_(outer) {}

  ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
  ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
  ~ScopedCommandsExecutor() { FlushImpl(); }

  void ScheduleWakeUp(scoped_refptr<WorkerThread> worker) {
    workers_to_wake_up_.AddWorker(std::move(worker));
  }

  void ScheduleStart(scoped_refptr<WorkerThread> worker) {
    workers_to_start_.AddWorker(std::move(worker));
  }

  void FlushWorkerCreation(CheckedLock* held_lock) {
    if (workers_to_wake_up_.empty() && workers_to_start_.empty())
      return;
    CheckedAutoUnlock auto_unlock(*held_lock);
    FlushImpl();
    workers_to_wake_up_.clear();
    workers_to_start_.clear();
    must_schedule_adjust_max_tasks_ = false;
  }

  void ScheduleAdjustMaxTasks() {
    DCHECK(!must_schedule_adjust_max_tasks_);
    must_schedule_adjust_max_tasks_ = true;
  }

 private:
  class WorkerContainer {
   public:
    WorkerContainer() = default;
    WorkerContainer(const WorkerContainer&) = delete;
    WorkerContainer& operator=(const WorkerContainer&) = delete;

    void AddWorker(scoped_refptr<WorkerThread> worker) {
      if (!worker)
        return;
      if (!first_worker_)
        first_worker_ = std::move(worker);
      else
        additional_workers_.push_back(std::move(worker));
    }

    template <typename Action>
    void ForEachWorker(Action action) {
      if (first_worker_) {
        action(first_worker_.get());
        for (scoped_refptr<WorkerThread> worker : additional_workers_)
          action(worker.get());
      } else {
        DCHECK(additional_workers_.empty());
      }
    }

    bool empty() const { return first_worker_ == nullptr; }

    void clear() {
      first_worker_.reset();
      additional_workers_.clear();
    }

   private:
    // The purpose of |first_worker_| is to avoid a heap allocation by the
    // vector in the case where there is only one worker in the container.
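    // (Comparable in effect to a container with an inline capacity of one,
    // e.g. absl::InlinedVector<scoped_refptr<WorkerThread>, 1>.)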
    scoped_refptr<WorkerThread> first_worker_;
    std::vector<scoped_refptr<WorkerThread>> additional_workers_;
  };

  void FlushImpl() {
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    // Wake up workers.
    workers_to_wake_up_.ForEachWorker(
        [](WorkerThread* worker) { worker->WakeUp(); });

    // Start workers. Happens after wake ups to prevent the case where a worker
    // enters its main function, is descheduled because it wasn't woken up yet,
    // and is woken up immediately after.
    workers_to_start_.ForEachWorker([&](WorkerThread* worker) {
      worker->Start(outer_->after_start().service_thread_task_runner,
                    outer_->after_start().worker_thread_observer);
      if (outer_->worker_started_for_testing_)
        outer_->worker_started_for_testing_->Wait();
    });

    if (must_schedule_adjust_max_tasks_)
      outer_->ScheduleAdjustMaxTasks();
  }

  const raw_ptr<ThreadGroupImpl> outer_;

  WorkerContainer workers_to_wake_up_;
  WorkerContainer workers_to_start_;
  bool must_schedule_adjust_max_tasks_ = false;
};

class ThreadGroupImpl::WorkerThreadDelegateImpl : public WorkerThread::Delegate,
                                                  public BlockingObserver {
 public:
  // |outer| owns the worker for which this delegate is constructed. If
  // |is_excess| is true, this worker will be eligible for reclaim.
  explicit WorkerThreadDelegateImpl(TrackedRef<ThreadGroupImpl> outer,
                                    bool is_excess);
  WorkerThreadDelegateImpl(const WorkerThreadDelegateImpl&) = delete;
  WorkerThreadDelegateImpl& operator=(const WorkerThreadDelegateImpl&) = delete;

  // OnMainExit() handles the thread-affine cleanup; WorkerThreadDelegateImpl
  // can thereafter safely be deleted from any thread.
  ~WorkerThreadDelegateImpl() override = default;

  // WorkerThread::Delegate:
  WorkerThread::ThreadLabel GetThreadLabel() const override;
  void OnMainEntry(WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  void DidProcessTask(RegisteredTaskSource task_source) override;
  TimeDelta GetSleepTimeout() override;
  void OnMainExit(WorkerThread* worker) override;
  void RecordUnnecessaryWakeup() override;

  // BlockingObserver:
  void BlockingStarted(BlockingType blocking_type) override;
  void BlockingTypeUpgraded() override;
  void BlockingEnded() override;

  // Notifies the worker of shutdown, possibly marking the running task as
  // MAY_BLOCK.
  void OnShutdownStartedLockRequired(ScopedCommandsExecutor* executor)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Returns true iff the worker can get work. Cleans up the worker or puts it
  // on the idle set if it can't get work.
  bool CanGetWorkLockRequired(ScopedCommandsExecutor* executor,
                              WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks iff this worker has been within a
  // ScopedBlockingCall for more than |may_block_threshold|.
  void MaybeIncrementMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks.
  void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  TaskPriority current_task_priority_lock_required() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return *read_any().current_task_priority;
  }

  // True if this worker is eligible for reclaim.
  bool is_excess() const { return is_excess_; }

  // Exposed for AnnotateAcquiredLockAlias
  const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
    return outer_->lock_;
  }

 private:
  // Returns true if |worker| is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThread* worker) const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Calls cleanup on |worker| and removes it from the thread group. Called from
  // GetWork() when no work is available and CanCleanupLockRequired() returns
  // true.
  void CleanupLockRequired(ScopedCommandsExecutor* executor,
                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Called in GetWork() when a worker becomes idle.
  void OnWorkerBecomesIdleLockRequired(ScopedCommandsExecutor* executor,
                                       WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Accessed only from the worker thread.
  struct WorkerOnly {
    // Associated WorkerThread, if any, initialized in OnMainEntry().
    raw_ptr<WorkerThread> worker_thread_;

#if BUILDFLAG(IS_WIN)
    std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
#endif  // BUILDFLAG(IS_WIN)
  } worker_only_;

  // Writes from the worker thread protected by |outer_->lock_|. Reads from any
  // thread, protected by |outer_->lock_| when not on the worker thread.
  struct WriteWorkerReadAny {
    // The priority of the task the worker is currently running if any.
    absl::optional<TaskPriority> current_task_priority;
    // The shutdown behavior of the task the worker is currently running if any.
    absl::optional<TaskShutdownBehavior> current_shutdown_behavior;

    // Time when BlockingStarted() was last called. Reset when BlockingEnded()
    // is called.
    TimeTicks blocking_start_time;

    // Whether the worker is currently running a task (i.e. GetWork() has
    // returned a non-empty task source and DidProcessTask() hasn't been called
    // yet).
    bool is_running_task() const { return !!current_shutdown_behavior; }
  } write_worker_read_any_;

  WorkerOnly& worker_only() {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return worker_only_;
  }

  WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_any() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_worker() const {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const TrackedRef<ThreadGroupImpl> outer_;

  const bool is_excess_;

  // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| were
  // incremented due to a ScopedBlockingCall on the thread.
  bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
  bool incremented_max_best_effort_tasks_since_blocked_
      GUARDED_BY(outer_->lock_) = false;
  // Whether |outer_->max_tasks_| and |outer_->max_best_effort_tasks_| were
  // incremented due to running CONTINUE_ON_SHUTDOWN on the thread during
  // shutdown.
  bool incremented_max_tasks_for_shutdown_ GUARDED_BY(outer_->lock_) = false;

  // Verifies that specific calls are always made from the worker thread.
  THREAD_CHECKER(worker_thread_checker_);
};

ThreadGroupImpl::ThreadGroupImpl(StringPiece histogram_label,
                                 StringPiece thread_group_label,
                                 ThreadType thread_type_hint,
                                 TrackedRef<TaskTracker> task_tracker,
                                 TrackedRef<Delegate> delegate,
                                 ThreadGroup* predecessor_thread_group)
    : ThreadGroup(std::move(task_tracker),
                  std::move(delegate),
                  predecessor_thread_group),
      histogram_label_(histogram_label),
      thread_group_label_(thread_group_label),
      thread_type_hint_(thread_type_hint),
      idle_workers_set_cv_for_testing_(lock_.CreateConditionVariable()),
      tracked_ref_factory_(this) {
  DCHECK(!thread_group_label_.empty());
}

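// Sketch of a typical initialization call (hypothetical values):
//   thread_group->Start(/*max_tasks=*/4, /*max_best_effort_tasks=*/2,
//                       /*suggested_reclaim_time=*/Seconds(30),
//                       std::move(service_thread_task_runner),
//                       worker_thread_observer, WorkerEnvironment::NONE,
//                       /*synchronous_thread_start_for_testing=*/false,
//                       /*may_block_threshold=*/absl::nullopt);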
void ThreadGroupImpl::Start(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    absl::optional<TimeDelta> may_block_threshold) {
  ThreadGroup::Start();

  DCHECK(!replacement_thread_group_);

  in_start().no_worker_reclaim = FeatureList::IsEnabled(kNoWorkerThreadReclaim);
  in_start().may_block_threshold =
      may_block_threshold ? may_block_threshold.value()
                          : (thread_type_hint_ != ThreadType::kBackground
                                 ? kForegroundMayBlockThreshold
                                 : kBackgroundMayBlockThreshold);
  in_start().blocked_workers_poll_period =
      thread_type_hint_ != ThreadType::kBackground
          ? kForegroundBlockedWorkersPoll
          : kBackgroundBlockedWorkersPoll;

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  DCHECK(workers_.empty());
  max_tasks_ = max_tasks;
  DCHECK_GE(max_tasks_, 1U);
  in_start().initial_max_tasks = max_tasks_;
  DCHECK_LE(in_start().initial_max_tasks, kMaxNumberOfWorkers);
  max_best_effort_tasks_ = max_best_effort_tasks;
  in_start().suggested_reclaim_time = suggested_reclaim_time;
  in_start().worker_environment = worker_environment;
  in_start().service_thread_task_runner = std::move(service_thread_task_runner);
  in_start().worker_thread_observer = worker_thread_observer;

#if DCHECK_IS_ON()
  in_start().initialized = true;
#endif

  if (synchronous_thread_start_for_testing) {
    worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
    // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
    // WaitableEvent or it defeats the purpose of having threads start without
    // externally visible side-effects.
    worker_started_for_testing_->declare_only_used_while_idle();
  }

  EnsureEnoughWorkersLockRequired(&executor);
}

ThreadGroupImpl::~ThreadGroupImpl() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases |workers_| should be empty.
  DCHECK(workers_.empty());
}

void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
    TransactionWithRegisteredTaskSource transaction_with_task_source) {
  ScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

size_t ThreadGroupImpl::GetMaxConcurrentNonBlockedTasksDeprecated() const {
#if DCHECK_IS_ON()
  CheckedAutoLock auto_lock(lock_);
  DCHECK_NE(after_start().initial_max_tasks, 0U)
      << "GetMaxConcurrentNonBlockedTasksDeprecated() should only be called "
      << "after the thread group has started.";
#endif
  return after_start().initial_max_tasks;
}

void ThreadGroupImpl::WaitForWorkersIdleForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

#if DCHECK_IS_ON()
  DCHECK(!some_workers_cleaned_up_for_testing_)
      << "Workers detached prior to waiting for a specific number of idle "
         "workers. Doing the wait under such conditions is flaky. Consider "
         "setting the suggested reclaim time to TimeDelta::Max() in Start().";
#endif

  WaitForWorkersIdleLockRequiredForTesting(n);
}

void ThreadGroupImpl::WaitForAllWorkersIdleForTesting() {
  CheckedAutoLock auto_lock(lock_);
  WaitForWorkersIdleLockRequiredForTesting(workers_.size());
}

void ThreadGroupImpl::WaitForWorkersCleanedUpForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

  if (!num_workers_cleaned_up_for_testing_cv_)
    num_workers_cleaned_up_for_testing_cv_ = lock_.CreateConditionVariable();

  while (num_workers_cleaned_up_for_testing_ < n)
    num_workers_cleaned_up_for_testing_cv_->Wait();

  num_workers_cleaned_up_for_testing_ = 0;
}

void ThreadGroupImpl::JoinForTesting() {
  decltype(workers_) workers_copy;
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_for_testing_started_ = true;

    // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThread::JoinForTesting() without holding |lock_| since
    // WorkerThreads may need to access |workers_|.
    workers_copy = workers_;
  }
  for (const auto& worker : workers_copy)
    worker->JoinForTesting();

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release |workers_| to clear their TrackedRef against |this|.
  workers_.clear();
}

size_t ThreadGroupImpl::NumberOfWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return workers_.size();
}

size_t ThreadGroupImpl::GetMaxTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_tasks_;
}

size_t ThreadGroupImpl::GetMaxBestEffortTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_best_effort_tasks_;
}

size_t ThreadGroupImpl::NumberOfIdleWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return idle_workers_set_.Size();
}

ThreadGroupImpl::WorkerThreadDelegateImpl::WorkerThreadDelegateImpl(
    TrackedRef<ThreadGroupImpl> outer,
    bool is_excess)
    : outer_(std::move(outer)), is_excess_(is_excess) {
  // Bound in OnMainEntry().
  DETACH_FROM_THREAD(worker_thread_checker_);
}

WorkerThread::ThreadLabel
ThreadGroupImpl::WorkerThreadDelegateImpl::GetThreadLabel() const {
  return WorkerThread::ThreadLabel::POOLED;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainEntry(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  {
#if DCHECK_IS_ON()
    CheckedAutoLock auto_lock(outer_->lock_);
    DCHECK(ContainsWorker(outer_->workers_, worker));
#endif
  }

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
      outer_->after_start().worker_environment);
#endif  // BUILDFLAG(IS_WIN)

  PlatformThread::SetName(
      StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));

  outer_->BindToCurrentThread();
  worker_only().worker_thread_ = worker;
  SetBlockingObserverForCurrentThread(this);

  if (outer_->worker_started_for_testing_) {
    // When |worker_started_for_testing_| is set, the thread that starts workers
    // should wait for a worker to have started before starting the next one,
    // and there should only be one thread that wakes up workers at a time.
    DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
    outer_->worker_started_for_testing_->Signal();
  }
}

RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(ContainsWorker(outer_->workers_, worker));

  // Use this opportunity, before assigning work to this worker, to create/wake
  // additional workers if needed (doing this here allows us to reduce
  // potentially expensive create/wake directly on PostTask()).
  //
  // Note: FlushWorkerCreation() below releases |outer_->lock_|. It is thus
  // important that all other operations come after it to keep this method
  // transactional.
  outer_->EnsureEnoughWorkersLockRequired(&executor);
  executor.FlushWorkerCreation(&outer_->lock_);

  if (!CanGetWorkLockRequired(&executor, worker))
    return nullptr;

  RegisteredTaskSource task_source;
  TaskPriority priority;
  while (!task_source && !outer_->priority_queue_.IsEmpty()) {
    // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
    // BEST_EFFORT tasks run concurrently.
    priority = outer_->priority_queue_.PeekSortKey().priority();
    if (!outer_->task_tracker_->CanRunPriority(priority) ||
        (priority == TaskPriority::BEST_EFFORT &&
         outer_->num_running_best_effort_tasks_ >=
             outer_->max_best_effort_tasks_)) {
      break;
    }

    task_source = outer_->TakeRegisteredTaskSource(&executor);
  }
  if (!task_source) {
    OnWorkerBecomesIdleLockRequired(&executor, worker);
    return nullptr;
  }

  // Running task bookkeeping.
  outer_->IncrementTasksRunningLockRequired(priority);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));
  write_worker().current_task_priority = priority;
  write_worker().current_shutdown_behavior = task_source->shutdown_behavior();

  return task_source;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::DidProcessTask(
    RegisteredTaskSource task_source) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(read_worker().current_task_priority);
  DCHECK(read_worker().current_shutdown_behavior);

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
  // |TaskSource::lock_| is a UniversalPredecessor and must always be acquired
  // prior to acquiring a second lock.
  absl::optional<TransactionWithRegisteredTaskSource>
      transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        TransactionWithRegisteredTaskSource::FromTaskSource(
            std::move(task_source)));
  }

  ScopedCommandsExecutor workers_executor(outer_.get());
  ScopedReenqueueExecutor reenqueue_executor;
  CheckedAutoLock auto_lock(outer_->lock_);

  // During shutdown, max_tasks may have been incremented in
  // OnShutdownStartedLockRequired().
  if (incremented_max_tasks_for_shutdown_) {
    DCHECK(outer_->shutdown_started_);
    outer_->DecrementMaxTasksLockRequired();
    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      outer_->DecrementMaxBestEffortTasksLockRequired();
    }
    incremented_max_tasks_since_blocked_ = false;
    incremented_max_best_effort_tasks_since_blocked_ = false;
    incremented_max_tasks_for_shutdown_ = false;
  }

  DCHECK(read_worker().blocking_start_time.is_null());
  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

  // Running task bookkeeping.
  outer_->DecrementTasksRunningLockRequired(
      *read_worker().current_task_priority);
  write_worker().current_shutdown_behavior = absl::nullopt;
  write_worker().current_task_priority = absl::nullopt;

  if (transaction_with_task_source) {
    outer_->ReEnqueueTaskSourceLockRequired(
        &workers_executor, &reenqueue_executor,
        std::move(transaction_with_task_source.value()));
  }
}

TimeDelta ThreadGroupImpl::WorkerThreadDelegateImpl::GetSleepTimeout() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess())
    return TimeDelta::Max();
  // Sleep for an extra 10% to avoid the following pathological case:
  //   0) A task is running on a timer which matches
  //      |after_start().suggested_reclaim_time|.
  //   1) The timer fires and this worker is created by
  //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
  //      worker was assigned the task.
  //   2) This worker begins sleeping |after_start().suggested_reclaim_time| (at
  //      the front of the idle set).
  //   3) The task assigned to the other worker completes and the worker goes
  //      back in the idle set (this worker may now be second on the idle set;
  //      its GetLastUsedTime() is set to Now()).
  //   4) The sleep in (2) expires. Since (3) was fast this worker is likely to
  //      have been second on the idle set long enough for
  //      CanCleanupLockRequired() to be satisfied in which case this worker is
  //      cleaned up.
  //   5) The timer fires at roughly the same time and we're back to (1) if (4)
  //      resulted in a clean up; causing thread churn.
  //
  //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
  //   before (5). In that case (5) will cause (3) and refresh this worker's
  //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
  //   and avoiding churn.
  //
  //   Of course the same problem arises if in (0) the timer matches
  //   |after_start().suggested_reclaim_time * 1.1| but it's expected that any
  //   timer slower than |after_start().suggested_reclaim_time| will cause such
  //   churn during long idle periods. If this is a problem in practice, the
  //   standby thread configuration and algorithm should be revisited.
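  //
  //   E.g. with a hypothetical |suggested_reclaim_time| of 30 seconds, this
  //   returns a 33 second timeout.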
  return outer_->after_start().suggested_reclaim_time * 1.1;
}

bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanCleanupLockRequired(
    const WorkerThread* worker) const {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess())
    return false;

  const TimeTicks last_used_time = worker->GetLastUsedTime();
  return !last_used_time.is_null() &&
         subtle::TimeTicksNowIgnoringOverride() - last_used_time >=
             outer_->after_start().suggested_reclaim_time &&
         LIKELY(!outer_->worker_cleanup_disallowed_for_testing_);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::CleanupLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK(!outer_->join_for_testing_started_);
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  worker->Cleanup();

  if (outer_->IsOnIdleSetLockRequired(worker))
    outer_->idle_workers_set_.Remove(worker);

  // Remove the worker from |workers_|.
  auto worker_iter = ranges::find(outer_->workers_, worker);
  DCHECK(worker_iter != outer_->workers_.end());
  outer_->workers_.erase(worker_iter);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnWorkerBecomesIdleLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));

  // Add the worker to the idle set.
  outer_->idle_workers_set_.Insert(worker);
  DCHECK_LE(outer_->idle_workers_set_.Size(), outer_->workers_.size());
  outer_->idle_workers_set_cv_for_testing_->Broadcast();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainExit(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  {
    bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer_->lock_);

    // |worker| should already have been removed from the idle workers set and
    // |workers_| by the time the thread is about to exit (except in the cases
    // where the thread group is no longer going to be used, in which case it's
    // fine for there to be invalid workers in the thread group).
    if (!shutdown_complete && !outer_->join_for_testing_started_) {
      DCHECK(!outer_->idle_workers_set_.Contains(worker));
      DCHECK(!ContainsWorker(outer_->workers_, worker));
    }
  }
#endif

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // BUILDFLAG(IS_WIN)

  // Count cleaned up workers for tests. It's important to do this here instead
  // of at the end of CleanupLockRequired() because some side-effects of
  // cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer_->lock_);
  ++outer_->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer_->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer_->num_workers_cleaned_up_for_testing_cv_)
    outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::RecordUnnecessaryWakeup() {
  base::BooleanHistogram::FactoryGet(
      std::string("ThreadPool.UnnecessaryWakeup.") + outer_->histogram_label_,
      base::Histogram::kUmaTargetedHistogramFlag)
      ->Add(true);

  TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
}

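// Blocking-scope bookkeeping, summarized (sketch of the contract implemented
// by the three BlockingObserver methods below): a MAY_BLOCK scope increments
// |num_unresolved_may_block_| and lets AdjustMaxTasks() raise the max tasks
// once the scope outlasts |may_block_threshold|; a WILL_BLOCK scope (or an
// upgrade to one) raises the max tasks immediately. BlockingEnded() reverts
// whichever adjustment was made.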
void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingStarted(
    BlockingType blocking_type) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().worker_thread_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  worker_only().worker_thread_->MaybeUpdateThreadType();

  // WillBlock is always used when time overrides are active. crbug.com/1038867
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    blocking_type = BlockingType::WILL_BLOCK;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
  DCHECK(read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();

  if (incremented_max_tasks_for_shutdown_)
    return;

  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT)
    ++outer_->num_unresolved_best_effort_may_block_;

  if (blocking_type == BlockingType::WILL_BLOCK) {
    incremented_max_tasks_since_blocked_ = true;
    outer_->IncrementMaxTasksLockRequired();
    outer_->EnsureEnoughWorkersLockRequired(&executor);
  } else {
    ++outer_->num_unresolved_may_block_;
  }

  outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingTypeUpgraded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  // With time overrides active, the blocking type is always WILL_BLOCK (see
  // BlockingStarted()), so it should never be considered "upgraded".
  if (base::subtle::ScopedTimeClockOverrides::overrides_active())
    return;

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope already caused the max tasks to be incremented.
  if (incremented_max_tasks_since_blocked_)
    return;

  // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope.
  --outer_->num_unresolved_may_block_;

  incremented_max_tasks_since_blocked_ = true;
  outer_->IncrementMaxTasksLockRequired();
  outer_->EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingEnded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  CheckedAutoLock auto_lock(outer_->lock_);
  DCHECK(!read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = TimeTicks();
  if (!incremented_max_tasks_for_shutdown_) {
    if (incremented_max_tasks_since_blocked_)
      outer_->DecrementMaxTasksLockRequired();
    else
      --outer_->num_unresolved_may_block_;

    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      if (incremented_max_best_effort_tasks_since_blocked_)
        outer_->DecrementMaxBestEffortTasksLockRequired();
      else
        --outer_->num_unresolved_best_effort_may_block_;
    }
  }

  incremented_max_tasks_since_blocked_ = false;
  incremented_max_best_effort_tasks_since_blocked_ = false;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnShutdownStartedLockRequired(
    ScopedCommandsExecutor* executor) {
  if (!read_any().is_running_task())
    return;
  // Workers running a CONTINUE_ON_SHUTDOWN task are replaced by incrementing
  // max_tasks/max_best_effort_tasks. The effect is reverted in
  // DidProcessTask().
  if (*read_any().current_shutdown_behavior ==
      TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
    incremented_max_tasks_for_shutdown_ = true;
    IncrementMaxTasksLockRequired();
  }
}

bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanGetWorkLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  const bool is_on_idle_workers_set = outer_->IsOnIdleSetLockRequired(worker);
  DCHECK_EQ(is_on_idle_workers_set, outer_->idle_workers_set_.Contains(worker));

  if (is_on_idle_workers_set) {
    if (CanCleanupLockRequired(worker))
      CleanupLockRequired(executor, worker);
    return false;
  }

  // If too many workers are running, this worker should not get work until
  // tasks are no longer in excess (i.e. max tasks increases). This ensures
  // that if this worker is in excess, it gets a chance to be cleaned up.
  if (outer_->GetNumAwakeWorkersLockRequired() > outer_->max_tasks_) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return false;
  }

  return true;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::
    MaybeIncrementMaxTasksLockRequired() {
  if (read_any().blocking_start_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
          outer_->after_start().may_block_threshold) {
    return;
  }
  IncrementMaxTasksLockRequired();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::
    IncrementMaxTasksLockRequired() {
  if (!incremented_max_tasks_since_blocked_) {
    outer_->IncrementMaxTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_tasks_since_blocked_ = true;
      --outer_->num_unresolved_may_block_;
    }
  }
  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
      !incremented_max_best_effort_tasks_since_blocked_) {
    outer_->IncrementMaxBestEffortTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_best_effort_tasks_since_blocked_ = true;
      --outer_->num_unresolved_best_effort_may_block_;
    }
  }
}

void ThreadGroupImpl::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
  // Make sure workers do not cleanup while watching the idle count.
  AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);

  while (idle_workers_set_.Size() < n)
    idle_workers_set_cv_for_testing_->Wait();
}

void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers)
    return;
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);

  if (!idle_workers_set_.IsEmpty())
    return;

  if (workers_.size() >= max_tasks_)
    return;

  scoped_refptr<WorkerThread> new_worker =
      CreateAndRegisterWorkerLockRequired(executor);
  DCHECK(new_worker);
  idle_workers_set_.Insert(new_worker.get());
}

scoped_refptr<WorkerThread>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  DCHECK(!join_for_testing_started_);
  DCHECK_LT(workers_.size(), max_tasks_);
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  DCHECK(idle_workers_set_.IsEmpty());

  // WorkerThread needs |lock_| as a predecessor for its thread lock
  // because in WakeUpOneWorker, |lock_| is first acquired and then
  // the thread lock is acquired when WakeUp is called on the worker.
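  // With |no_worker_reclaim| enabled, only workers created beyond
  // |initial_max_tasks| are excess (i.e. eligible for reclaim); otherwise,
  // every worker is excess.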
  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      thread_type_hint_,
      std::make_unique<WorkerThreadDelegateImpl>(
          tracked_ref_factory_.GetTrackedRef(),
          /*is_excess=*/after_start().no_worker_reclaim
              ? workers_.size() >= after_start().initial_max_tasks
              : true),
      task_tracker_, worker_sequence_num_++, &lock_);

  workers_.push_back(worker);
  executor->ScheduleStart(worker);
  DCHECK_LE(workers_.size(), max_tasks_);

  return worker;
}

size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
  DCHECK_GE(workers_.size(), idle_workers_set_.Size());
  size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
  DCHECK_GE(num_awake_workers, num_running_tasks_);
  return num_awake_workers;
}

size_t ThreadGroupImpl::GetDesiredNumAwakeWorkersLockRequired() const {
  // Number of BEST_EFFORT task sources that are running or queued and allowed
  // to run by the CanRunPolicy.
  const size_t num_running_or_queued_can_run_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();

  const size_t workers_for_best_effort_task_sources =
      std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
                        max_best_effort_tasks_),
               num_running_best_effort_tasks_);

  // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
  const size_t num_running_or_queued_foreground_task_sources =
      (num_running_tasks_ - num_running_best_effort_tasks_) +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();

  const size_t workers_for_foreground_task_sources =
      num_running_or_queued_foreground_task_sources;

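  // E.g. (hypothetical numbers): with |max_best_effort_tasks_| == 2, 1 running
  // BEST_EFFORT task and 2 queued BEST_EFFORT task sources, the group wants
  // max(min(1 + 2, 2), 1) == 2 workers for BEST_EFFORT work; that count is
  // added to the foreground count below, clamped by |max_tasks_| and
  // kMaxNumberOfWorkers.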
  return std::min({workers_for_best_effort_task_sources +
                       workers_for_foreground_task_sources,
                   max_tasks_, kMaxNumberOfWorkers});
}

void ThreadGroupImpl::DidUpdateCanRunPolicy() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::OnShutdownStarted() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
    return;

  // Start a MAY_BLOCK scope on each worker that is already running a task.
  for (scoped_refptr<WorkerThread>& worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // WorkerThreadDelegateImpls.
    WorkerThreadDelegateImpl* delegate =
        static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->OnShutdownStartedLockRequired(&executor);
  }
  EnsureEnoughWorkersLockRequired(&executor);

  shutdown_started_ = true;
}

void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
    return;

  ScopedCommandsExecutor* executor =
      static_cast<ScopedCommandsExecutor*>(base_executor);

  const size_t desired_num_awake_workers =
      GetDesiredNumAwakeWorkersLockRequired();
  const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();

  size_t num_workers_to_wake_up =
      ClampSub(desired_num_awake_workers, num_awake_workers);
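  // Cap wake-ups at 2 per call. Woken workers call
  // EnsureEnoughWorkersLockRequired() again from GetWork(), so further
  // wake-ups cascade rather than all being paid for by a single caller.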
  num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));

  // Wake up the appropriate number of workers.
  for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
    WorkerThread* worker_to_wakeup = idle_workers_set_.Take();
    DCHECK(worker_to_wakeup);
    executor->ScheduleWakeUp(worker_to_wakeup);
  }

  // In the case where the loop above didn't wake up any worker and we don't
  // have excess workers, the idle worker should be maintained. This happens
  // when called from the last worker awake, or a recent increase in |max_tasks|
  // now makes it possible to keep an idle worker.
  if (desired_num_awake_workers == num_awake_workers)
    MaintainAtLeastOneIdleWorkerLockRequired(executor);

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

void ThreadGroupImpl::AdjustMaxTasks() {
  DCHECK(
      after_start().service_thread_task_runner->RunsTasksInCurrentSequence());

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(adjust_max_tasks_posted_);
  adjust_max_tasks_posted_ = false;

  // Increment max tasks for each worker that has been within a MAY_BLOCK
  // ScopedBlockingCall for more than may_block_threshold.
  for (scoped_refptr<WorkerThread> worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // WorkerThreadDelegateImpls.
    WorkerThreadDelegateImpl* delegate =
        static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->MaybeIncrementMaxTasksLockRequired();
  }

  // Wake up workers according to the updated |max_tasks_|. This will also
  // reschedule AdjustMaxTasks() if necessary.
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
  // |adjust_max_tasks_posted_| can't change before the task posted below runs.
  // Skip check on NaCl to avoid unsafe reference acquisition warning.
#if !BUILDFLAG(IS_NACL)
  DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
#endif

  after_start().service_thread_task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
      after_start().blocked_workers_poll_period);
}

void ThreadGroupImpl::MaybeScheduleAdjustMaxTasksLockRequired(
    ScopedCommandsExecutor* executor) {
  if (!adjust_max_tasks_posted_ &&
      ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
    executor->ScheduleAdjustMaxTasks();
    adjust_max_tasks_posted_ = true;
  }
}

bool ThreadGroupImpl::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
  // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
  // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
  // enough to accommodate all queued and running task sources and an idle
  // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
  // - When (1) is false: No worker would be created or woken up if the
  //   concurrency limits were increased, so there is no hurry to increase them.
  // - When (2) is false: The concurrency limits could not be increased by
  //   AdjustMaxTasks().
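  //
  // E.g. (hypothetical numbers): with |max_tasks_| == 4, 4 running task
  // sources, 1 queued task source and 1 unresolved MAY_BLOCK
  // ScopedBlockingCall, both (1) and (2) hold (4 + 1 + 1 > 4), so
  // AdjustMaxTasks() should be scheduled.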

  const size_t num_running_or_queued_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
  if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ &&
      num_unresolved_best_effort_may_block_ > 0) {
    return true;
  }

  const size_t num_running_or_queued_task_sources =
      num_running_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
  constexpr size_t kIdleWorker = 1;
  return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
         num_unresolved_may_block_ > 0;
}

void ThreadGroupImpl::UpdateMinAllowedPriorityLockRequired() {
  if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
    max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
  } else {
    max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
                                 priority_queue_.PeekSortKey().worker_count()},
                                std::memory_order_relaxed);
  }
}

bool ThreadGroupImpl::IsOnIdleSetLockRequired(WorkerThread* worker) const {
  // To avoid searching through the idle set: use GetLastUsedTime() not being
  // null (or the worker being directly on top of the idle set) as a proxy for
  // being on the idle set.
  return idle_workers_set_.Peek() == worker ||
         !worker->GetLastUsedTime().is_null();
}

void ThreadGroupImpl::DecrementTasksRunningLockRequired(TaskPriority priority) {
  DCHECK_GT(num_running_tasks_, 0U);
  --num_running_tasks_;
  if (priority == TaskPriority::BEST_EFFORT) {
    DCHECK_GT(num_running_best_effort_tasks_, 0U);
    --num_running_best_effort_tasks_;
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementTasksRunningLockRequired(TaskPriority priority) {
  ++num_running_tasks_;
  DCHECK_LE(num_running_tasks_, max_tasks_);
  DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
  if (priority == TaskPriority::BEST_EFFORT) {
    ++num_running_best_effort_tasks_;
    DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
    DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::DecrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_tasks_, 0U);
  --max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::DecrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_best_effort_tasks_, 0U);
  --max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

ThreadGroupImpl::InitializedInStart::InitializedInStart() = default;
ThreadGroupImpl::InitializedInStart::~InitializedInStart() = default;

}  // namespace internal
}  // namespace base