// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_impl.h"

#include <optional>
#include <string_view>

#include "base/auto_reset.h"
#include "base/metrics/histogram_macros.h"
#include "base/sequence_token.h"
#include "base/strings/stringprintf.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/worker_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"

namespace base {
namespace internal {

namespace {

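// Hard cap on the number of workers in a thread group. Presumably a safety
// limit against unbounded worker creation when max tasks keeps getting
// incremented (e.g. by nested ScopedBlockingCalls).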
constexpr size_t kMaxNumberOfWorkers = 256;

}  // namespace

// Upon destruction, executes actions that control the number of active
// workers. Useful to satisfy locking requirements of these actions.
class ThreadGroupImpl::ScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit ScopedCommandsExecutor(ThreadGroupImpl* outer)
      : BaseScopedCommandsExecutor(outer) {}

  ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
  ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
  ~ScopedCommandsExecutor() override {
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    // Wake up workers.
    for (auto worker : workers_to_wake_up_) {
      worker->WakeUp();
    }
  }

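  // Queues |worker| to be woken up when this executor destructs, i.e. once
  // the thread group's lock is no longer held (see the destructor above).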
  void ScheduleWakeUp(scoped_refptr<WorkerThread> worker) {
    workers_to_wake_up_.emplace_back(std::move(worker));
  }

 private:
  absl::InlinedVector<scoped_refptr<WorkerThread>, 2> workers_to_wake_up_;
};

class ThreadGroupImpl::WorkerDelegate : public WorkerThread::Delegate,
                                        public BlockingObserver {
 public:
  // |outer| owns the worker for which this delegate is constructed. If
  // |is_excess| is true, this worker will be eligible for reclaim.
  explicit WorkerDelegate(TrackedRef<ThreadGroupImpl> outer, bool is_excess);
  WorkerDelegate(const WorkerDelegate&) = delete;
  WorkerDelegate& operator=(const WorkerDelegate&) = delete;

  // WorkerThread::Delegate:
  void OnMainEntry(WorkerThread* worker) override;
  void OnMainExit(WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
                                         WorkerThread* worker) override;
  void RecordUnnecessaryWakeup() override;
  TimeDelta GetSleepTimeout() override;

  // BlockingObserver:
  void BlockingStarted(BlockingType blocking_type) override;
  void BlockingTypeUpgraded() override;
  void BlockingEnded() override;

  // WorkerThread::Delegate:

  // Notifies the worker of shutdown, possibly marking the running task as
  // MAY_BLOCK.
  void OnShutdownStartedLockRequired(BaseScopedCommandsExecutor* executor)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks iff this worker has been within a
  // ScopedBlockingCall for more than |may_block_threshold|.
  void MaybeIncrementMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks.
  void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

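  // Returns the priority of the task the worker is currently running. May
  // only be called while a task is running, since the optional is
  // dereferenced unconditionally.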
  TaskPriority current_task_priority_lock_required() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return *read_any().current_task_priority;
  }

  // Exposed for AnnotateAcquiredLockAlias.
  const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
    return outer_->lock_;
  }

 private:
  // Returns true iff the worker can get work. Cleans up the worker or puts it
  // on the idle set if it can't get work.
  bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
                              WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Calls cleanup on |worker| and removes it from the thread group. Called
  // from GetWork() when no work is available and CanCleanupLockRequired()
  // returns true.
  void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Called in GetWork() when a worker becomes idle.
  void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
                                       WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

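  // Same as GetWork(), but assumes |outer_->lock_| is already held.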
  RegisteredTaskSource GetWorkLockRequired(BaseScopedCommandsExecutor* executor,
                                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Returns true if |worker| is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Only used in DCHECKs.
  template <typename Worker>
  bool ContainsWorker(const std::vector<scoped_refptr<Worker>>& workers,
                      const WorkerThread* worker) {
    auto it = ranges::find_if(
        workers,
        [worker](const scoped_refptr<Worker>& i) { return i.get() == worker; });
    return it != workers.end();
  }

  // Accessed only from the worker thread.
  struct WorkerOnly {
    WorkerOnly();
    ~WorkerOnly();
    // Associated WorkerThread, if any, initialized in OnMainEntry().
    raw_ptr<WorkerThread> worker_thread_;

#if BUILDFLAG(IS_WIN)
    std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
#endif  // BUILDFLAG(IS_WIN)
  } worker_only_;

  // Written from the worker thread, protected by |outer_->lock_|. Read from
  // any thread, protected by |outer_->lock_| when not on the worker thread.
  struct WriteWorkerReadAny {
    // The priority of the task the worker is currently running, if any.
    std::optional<TaskPriority> current_task_priority;
    // The shutdown behavior of the task the worker is currently running, if
    // any.
    std::optional<TaskShutdownBehavior> current_shutdown_behavior;

    // Time when BlockingStarted() was last called. Reset when BlockingEnded()
    // is called.
    TimeTicks blocking_start_time;

    // Whether the worker is currently running a task (i.e. GetWork() has
    // returned a non-empty task source and it hasn't been returned via
    // SwapProcessedTask() yet).
    bool is_running_task() const { return !!current_shutdown_behavior; }
  } write_worker_read_any_;

  WorkerOnly& worker_only() {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return worker_only_;
  }

  WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_any() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_worker() const {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const TrackedRef<ThreadGroupImpl> outer_;

  // Whether the worker is in excess. This must be decided at worker creation
  // time to prevent unnecessarily discarding TLS state, as well as any
  // behavior the OS has learned about a given thread.
  const bool is_excess_;

  // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| were
  // incremented due to a ScopedBlockingCall on the thread.
  bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
  bool incremented_max_best_effort_tasks_since_blocked_
      GUARDED_BY(outer_->lock_) = false;
  // Whether |outer_->max_tasks_| and |outer_->max_best_effort_tasks_| were
  // incremented due to running a CONTINUE_ON_SHUTDOWN task on the thread
  // during shutdown.
  bool incremented_max_tasks_for_shutdown_ GUARDED_BY(outer_->lock_) = false;

  // Verifies that specific calls are always made from the worker thread.
  THREAD_CHECKER(worker_thread_checker_);
};

ThreadGroupImpl::ThreadGroupImpl(std::string_view histogram_label,
                                 std::string_view thread_group_label,
                                 ThreadType thread_type_hint,
                                 TrackedRef<TaskTracker> task_tracker,
                                 TrackedRef<Delegate> delegate)
    : ThreadGroup(histogram_label,
                  thread_group_label,
                  thread_type_hint,
                  std::move(task_tracker),
                  std::move(delegate)),
      tracked_ref_factory_(this) {
  DCHECK(!thread_group_label_.empty());
}
229 
Start(size_t max_tasks,size_t max_best_effort_tasks,TimeDelta suggested_reclaim_time,scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,WorkerThreadObserver * worker_thread_observer,WorkerEnvironment worker_environment,bool synchronous_thread_start_for_testing,std::optional<TimeDelta> may_block_threshold)230 void ThreadGroupImpl::Start(
231     size_t max_tasks,
232     size_t max_best_effort_tasks,
233     TimeDelta suggested_reclaim_time,
234     scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
235     WorkerThreadObserver* worker_thread_observer,
236     WorkerEnvironment worker_environment,
237     bool synchronous_thread_start_for_testing,
238     std::optional<TimeDelta> may_block_threshold) {
239   ThreadGroup::StartImpl(
240       max_tasks, max_best_effort_tasks, suggested_reclaim_time,
241       service_thread_task_runner, worker_thread_observer, worker_environment,
242       synchronous_thread_start_for_testing, may_block_threshold);
243 
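  // Create the initial workers, now that the group's parameters are set.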
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_.empty());
  EnsureEnoughWorkersLockRequired(&executor);
}

ThreadGroupImpl::~ThreadGroupImpl() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases |workers_| should be empty.
  DCHECK(workers_.empty());
}

void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  ScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

ThreadGroupImpl::WorkerDelegate::WorkerDelegate(
    TrackedRef<ThreadGroupImpl> outer,
    bool is_excess)
    : outer_(outer), is_excess_(is_excess) {
  // Bound in OnMainEntry().
  DETACH_FROM_THREAD(worker_thread_checker_);
}

ThreadGroupImpl::WorkerDelegate::WorkerOnly::WorkerOnly() = default;
ThreadGroupImpl::WorkerDelegate::WorkerOnly::~WorkerOnly() = default;

TimeDelta ThreadGroupImpl::WorkerDelegate::GetSleepTimeout() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
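  // A worker that isn't in excess is never reclaimed, so it may sleep
  // indefinitely; only excess workers need to wake up to check for reclaim.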
  if (!is_excess_) {
    return TimeDelta::Max();
  }
  // Sleep for an extra 10% to avoid the following pathological case:
  //   0) A task is running on a timer which matches
  //      |after_start().suggested_reclaim_time|.
  //   1) The timer fires and this worker is created by
  //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
  //      worker was assigned the task.
  //   2) This worker begins sleeping |after_start().suggested_reclaim_time|
  //      (at the front of the idle set).
  //   3) The task assigned to the other worker completes and the worker goes
  //      back in the idle set (this worker may now be second on the idle set;
  //      its GetLastUsedTime() is set to Now()).
  //   4) The sleep in (2) expires. Since (3) was fast, this worker is likely
  //      to have been second on the idle set long enough for
  //      CanCleanupLockRequired() to be satisfied, in which case this worker
  //      is cleaned up.
  //   5) The timer fires at roughly the same time and we're back to (1) if
  //      (4) resulted in a clean up, causing thread churn.
  //
  //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
  //   before (5). In that case (5) will cause (3) and refresh this worker's
  //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
  //   and avoiding churn.
  //
  //   Of course the same problem arises if in (0) the timer matches
  //   |after_start().suggested_reclaim_time * 1.1|, but it's expected that
  //   any timer slower than |after_start().suggested_reclaim_time| will cause
  //   such churn during long idle periods. If this is a problem in practice,
  //   the standby thread configuration and algorithm should be revisited.
  return outer_->after_start().suggested_reclaim_time * 1.1;
}

void ThreadGroupImpl::WorkerDelegate::OnMainEntry(WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  {
#if DCHECK_IS_ON()
    CheckedAutoLock auto_lock(outer_->lock_);
    DCHECK(
        ContainsWorker(outer_->workers_, static_cast<WorkerThread*>(worker)));
#endif
  }

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
      outer_->after_start().worker_environment);
#endif  // BUILDFLAG(IS_WIN)

  PlatformThread::SetName(
      StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));

  outer_->BindToCurrentThread();
  worker_only().worker_thread_ = static_cast<WorkerThread*>(worker);
  SetBlockingObserverForCurrentThread(this);

  if (outer_->worker_started_for_testing_) {
    // When |worker_started_for_testing_| is set, the thread that starts
    // workers should wait for a worker to have started before starting the
    // next one, and there should only be one thread that wakes up workers at
    // a time.
    DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
    outer_->worker_started_for_testing_->Signal();
  }
}

void ThreadGroupImpl::WorkerDelegate::OnMainExit(WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  WorkerThread* worker = static_cast<WorkerThread*>(worker_base);
  {
    bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer_->lock_);

    // |worker| should already have been removed from the idle workers set and
    // |workers_| by the time the thread is about to exit (except when the
    // thread group is no longer going to be used, in which case it's fine for
    // there to be invalid workers in the thread group).
    if (!shutdown_complete && !outer_->join_for_testing_started_) {
      DCHECK(!outer_->idle_workers_set_.Contains(worker));
      DCHECK(!ContainsWorker(outer_->workers_, worker));
    }
  }
#endif

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // BUILDFLAG(IS_WIN)

  // Count cleaned up workers for tests. It's important to do this here
  // instead of at the end of CleanupLockRequired() because some side-effects
  // of cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer_->lock_);
  ++outer_->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer_->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer_->num_workers_cleaned_up_for_testing_cv_) {
    outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
  }
}

bool ThreadGroupImpl::WorkerDelegate::CanGetWorkLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThread* worker = static_cast<WorkerThread*>(worker_base);

  const bool is_on_idle_workers_set = outer_->IsOnIdleSetLockRequired(worker);
  DCHECK_EQ(is_on_idle_workers_set, outer_->idle_workers_set_.Contains(worker));

  // This occurs when WorkerThread::Delegate::WaitForWork() times out (i.e.
  // when the worker wakes up after GetSleepTimeout()).
  if (is_on_idle_workers_set) {
    if (CanCleanupLockRequired(worker)) {
      CleanupLockRequired(executor, worker);
    }
    return false;
  }

  // If too many workers are running, this worker should not get work until
  // tasks are no longer in excess (i.e. until max tasks increases). This
  // ensures that if this worker is in excess, it gets a chance to be cleaned
  // up.
  if (outer_->GetNumAwakeWorkersLockRequired() > outer_->max_tasks_) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return false;
  }

  return true;
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);
  return GetWorkLockRequired(&executor, worker);
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::GetWorkLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(ContainsWorker(outer_->workers_, worker));

  if (!CanGetWorkLockRequired(executor, worker)) {
    return nullptr;
  }

  RegisteredTaskSource task_source;
  TaskPriority priority;
  while (!task_source && !outer_->priority_queue_.IsEmpty()) {
    // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
    // BEST_EFFORT tasks run concurrently.
    priority = outer_->priority_queue_.PeekSortKey().priority();
    if (!outer_->task_tracker_->CanRunPriority(priority) ||
        (priority == TaskPriority::BEST_EFFORT &&
         outer_->num_running_best_effort_tasks_ >=
             outer_->max_best_effort_tasks_)) {
      break;
    }

    task_source = outer_->TakeRegisteredTaskSource(executor);
  }
  if (!task_source) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return nullptr;
  }

  // Running task bookkeeping.
  outer_->IncrementTasksRunningLockRequired(priority);

  write_worker().current_task_priority = priority;
  write_worker().current_shutdown_behavior = task_source->shutdown_behavior();

  // Subtle: This must be after the call to WillRunTask() inside
  // TakeRegisteredTaskSource(), so that any state used by WillRunTask() to
  // determine that the task source must remain in the TaskQueue is also used
  // to determine the desired number of workers. Concretely, this wouldn't
  // work:
  //
  //   Thread 1: GetWork() calls EnsureEnoughWorkers(). No worker woken up
  //             because the queue contains a job with max concurrency = 1 and
  //             the current worker is awake.
  //   Thread 2: Increases the job's max concurrency.
  //             ShouldQueueUponCapacityIncrease() returns false because the
  //             job is already queued.
  //   Thread 1: Calls WillRunTask() on the job. It returns
  //             kAllowedNotSaturated because max concurrency is not reached.
  //             But no extra worker is woken up to run the job!
  outer_->EnsureEnoughWorkersLockRequired(executor);

  return task_source;
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::SwapProcessedTask(
    RegisteredTaskSource task_source,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(read_worker().current_task_priority);
  DCHECK(read_worker().current_shutdown_behavior);

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here
  // since |TaskSource::lock_| is a UniversalPredecessor and must always be
  // acquired prior to acquiring a second lock.
  std::optional<RegisteredTaskSourceAndTransaction>
      transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        RegisteredTaskSourceAndTransaction::FromTaskSource(
            std::move(task_source)));
  }

  // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
  // TaskSources returned by the GetWork() method of |delegate_| until it
  // returns nullptr. Resetting |wake_up_event_| here doesn't break this
  // invariant and avoids a useless loop iteration before going to sleep if
  // WakeUp() is called while this WorkerThread is awake.
  wake_up_event_.Reset();

  ScopedCommandsExecutor workers_executor(outer_.get());
  ScopedReenqueueExecutor reenqueue_executor;
  CheckedAutoLock auto_lock(outer_->lock_);

  // During shutdown, max_tasks may have been incremented in
  // OnShutdownStartedLockRequired().
  if (incremented_max_tasks_for_shutdown_) {
    DCHECK(outer_->shutdown_started_);
    outer_->DecrementMaxTasksLockRequired();
    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      outer_->DecrementMaxBestEffortTasksLockRequired();
    }
    incremented_max_tasks_since_blocked_ = false;
    incremented_max_best_effort_tasks_since_blocked_ = false;
    incremented_max_tasks_for_shutdown_ = false;
  }

  DCHECK(read_worker().blocking_start_time.is_null());
  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

  // Running task bookkeeping.
  outer_->DecrementTasksRunningLockRequired(
      *read_worker().current_task_priority);
  write_worker().current_shutdown_behavior = std::nullopt;
  write_worker().current_task_priority = std::nullopt;

  if (transaction_with_task_source) {
    outer_->ReEnqueueTaskSourceLockRequired(
        &workers_executor, &reenqueue_executor,
        std::move(transaction_with_task_source.value()));
  }

  return GetWorkLockRequired(&workers_executor,
                             static_cast<WorkerThread*>(worker));
}

bool ThreadGroupImpl::WorkerDelegate::CanCleanupLockRequired(
    const WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess_) {
    return false;
  }

  const TimeTicks last_used_time = worker->GetLastUsedTime();
  if (last_used_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - last_used_time <
          outer_->after_start().suggested_reclaim_time) {
    return false;
  }
  if (!outer_->worker_cleanup_disallowed_for_testing_) [[likely]] {
    return true;
  }
  return false;
}

void ThreadGroupImpl::WorkerDelegate::CleanupLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThread* worker = static_cast<WorkerThread*>(worker_base);
  DCHECK(!outer_->join_for_testing_started_);
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  worker->Cleanup();

  if (outer_->IsOnIdleSetLockRequired(worker)) {
    outer_->idle_workers_set_.Remove(worker);
  }

  // Remove the worker from |workers_|.
  auto worker_iter = ranges::find(outer_->workers_, worker);
  CHECK(worker_iter != outer_->workers_.end(), base::NotFatalUntil::M125);
  outer_->workers_.erase(worker_iter);
}

void ThreadGroupImpl::WorkerDelegate::OnWorkerBecomesIdleLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThread* worker = static_cast<WorkerThread*>(worker_base);

  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));

  // Add the worker to the idle set.
  outer_->idle_workers_set_.Insert(worker);
  DCHECK_LE(outer_->idle_workers_set_.Size(), outer_->workers_.size());
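  // Notify any test waiting for workers to become idle.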
  outer_->idle_workers_set_cv_for_testing_.Broadcast();
}

void ThreadGroupImpl::WorkerDelegate::RecordUnnecessaryWakeup() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  base::BooleanHistogram::FactoryGet(
      std::string("ThreadPool.UnnecessaryWakeup.") + outer_->histogram_label_,
      base::Histogram::kUmaTargetedHistogramFlag)
      ->Add(true);

  TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
}

void ThreadGroupImpl::WorkerDelegate::BlockingStarted(
    BlockingType blocking_type) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().worker_thread_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  worker_only().worker_thread_->MaybeUpdateThreadType();

  // WILL_BLOCK is always used when time overrides are active
  // (crbug.com/1038867).
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    blocking_type = BlockingType::WILL_BLOCK;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
  DCHECK(read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();

  if (incremented_max_tasks_for_shutdown_) {
    return;
  }

  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT) {
    ++outer_->num_unresolved_best_effort_may_block_;
  }

  if (blocking_type == BlockingType::WILL_BLOCK) {
    incremented_max_tasks_since_blocked_ = true;
    outer_->IncrementMaxTasksLockRequired();
    outer_->EnsureEnoughWorkersLockRequired(&executor);
  } else {
    ++outer_->num_unresolved_may_block_;
  }

  outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}

void ThreadGroupImpl::WorkerDelegate::BlockingTypeUpgraded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  // Since the blocking type is always WILL_BLOCK when time overrides are
  // active (see BlockingStarted()), it should never be considered "upgraded".
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    return;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope already caused the max tasks to be incremented.
  if (incremented_max_tasks_since_blocked_) {
    return;
  }

  // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope.
  --outer_->num_unresolved_may_block_;

  incremented_max_tasks_since_blocked_ = true;
  outer_->IncrementMaxTasksLockRequired();
  outer_->EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::WorkerDelegate::BlockingEnded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  CheckedAutoLock auto_lock(outer_->lock_);
  DCHECK(!read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = TimeTicks();
  if (!incremented_max_tasks_for_shutdown_) {
    if (incremented_max_tasks_since_blocked_) {
      outer_->DecrementMaxTasksLockRequired();
    } else {
      --outer_->num_unresolved_may_block_;
    }

    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      if (incremented_max_best_effort_tasks_since_blocked_) {
        outer_->DecrementMaxBestEffortTasksLockRequired();
      } else {
        --outer_->num_unresolved_best_effort_may_block_;
      }
    }
  }

  incremented_max_tasks_since_blocked_ = false;
  incremented_max_best_effort_tasks_since_blocked_ = false;
}

// Notifies the worker of shutdown, possibly marking the running task as
// MAY_BLOCK.
void ThreadGroupImpl::WorkerDelegate::OnShutdownStartedLockRequired(
    BaseScopedCommandsExecutor* executor) {
  if (!read_any().is_running_task()) {
    return;
  }
  // Workers running a CONTINUE_ON_SHUTDOWN task are replaced by incrementing
  // max_tasks/max_best_effort_tasks. The effect is reverted in
  // SwapProcessedTask().
  if (*read_any().current_shutdown_behavior ==
      TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
    incremented_max_tasks_for_shutdown_ = true;
    IncrementMaxTasksLockRequired();
  }
}

// Increments max [best effort] tasks iff this worker has been within a
// ScopedBlockingCall for more than |may_block_threshold|.
void ThreadGroupImpl::WorkerDelegate::MaybeIncrementMaxTasksLockRequired()
    EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
  if (read_any().blocking_start_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
          outer_->after_start().may_block_threshold) {
    return;
  }
  IncrementMaxTasksLockRequired();
}

// Increments max [best effort] tasks.
void ThreadGroupImpl::WorkerDelegate::IncrementMaxTasksLockRequired()
    EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
  if (!incremented_max_tasks_since_blocked_) {
    outer_->IncrementMaxTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_tasks_since_blocked_ = true;
      --outer_->num_unresolved_may_block_;
    }
  }
  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
      !incremented_max_best_effort_tasks_since_blocked_) {
    outer_->IncrementMaxBestEffortTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_best_effort_tasks_since_blocked_ = true;
      --outer_->num_unresolved_best_effort_may_block_;
    }
  }
}

void ThreadGroupImpl::JoinForTesting() {
  decltype(workers_) workers_copy;
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_for_testing_started_ = true;

    // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThread::JoinForTesting() without holding |lock_| since
    // WorkerThreads may need to access |workers_|.
    workers_copy = workers_;
  }
  for (const auto& worker : workers_copy) {
    static_cast<WorkerThread*>(worker.get())->JoinForTesting();
  }

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release |workers_| to clear their TrackedRef against |this|.
  workers_.clear();
}

size_t ThreadGroupImpl::NumberOfIdleWorkersLockRequiredForTesting() const {
  return idle_workers_set_.Size();
}

void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers) {
    return;
  }
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);

  if (!idle_workers_set_.IsEmpty()) {
    return;
  }

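  // No need for an idle worker if the group already has |max_tasks_| workers;
  // an extra worker couldn't run a task anyway.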
  if (workers_.size() >= max_tasks_) {
    return;
  }

  scoped_refptr<WorkerThread> new_worker =
      CreateAndRegisterWorkerLockRequired(executor);
  DCHECK(new_worker);
  idle_workers_set_.Insert(new_worker.get());
}

scoped_refptr<WorkerThread>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  DCHECK(!join_for_testing_started_);
  DCHECK_LT(workers_.size(), max_tasks_);
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  DCHECK(idle_workers_set_.IsEmpty());

  // WorkerThread needs |lock_| as a predecessor for its thread lock because
  // in GetWork(), |lock_| is first acquired and then the thread lock is
  // acquired when GetLastUsedTime() is called on the worker by
  // CanGetWorkLockRequired().
  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      thread_type_hint_,
      std::make_unique<WorkerDelegate>(
          tracked_ref_factory_.GetTrackedRef(),
          /*is_excess=*/workers_.size() >= after_start().initial_max_tasks),
      task_tracker_, worker_sequence_num_++, &lock_);

  workers_.push_back(worker);
  executor->ScheduleStart(worker);
  DCHECK_LE(workers_.size(), max_tasks_);

  return worker;
}

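// A worker is "awake" iff it isn't on the idle set.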
size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
  DCHECK_GE(workers_.size(), idle_workers_set_.Size());
  size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
  DCHECK_GE(num_awake_workers, num_running_tasks_);
  return num_awake_workers;
}

void ThreadGroupImpl::DidUpdateCanRunPolicy() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::OnShutdownStarted() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0) {
    return;
  }
  if (join_for_testing_started_) [[unlikely]] {
    return;
  }

  // Start a MAY_BLOCK scope on each worker that is already running a task.
  for (scoped_refptr<WorkerThread>& worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // `WorkerDelegate`s.
    WorkerDelegate* delegate = static_cast<WorkerDelegate*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->OnShutdownStartedLockRequired(&executor);
  }
  EnsureEnoughWorkersLockRequired(&executor);

  shutdown_started_ = true;
}

void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0) {
    return;
  }
  if (join_for_testing_started_) [[unlikely]] {
    return;
  }

  ScopedCommandsExecutor* executor =
      static_cast<ScopedCommandsExecutor*>(base_executor);

  const size_t desired_num_awake_workers =
      GetDesiredNumAwakeWorkersLockRequired();
  const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();

  size_t num_workers_to_wake_up =
      ClampSub(desired_num_awake_workers, num_awake_workers);
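  // Wake up at most 2 workers per invocation. Workers that are woken up and
  // get work call EnsureEnoughWorkersLockRequired() again from
  // GetWorkLockRequired(), so further wake-ups propagate from there without a
  // single caller having to wake every worker itself.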
  num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));

  // Wake up the appropriate number of workers.
  for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
    WorkerThread* worker_to_wakeup = idle_workers_set_.Take();
    DCHECK(worker_to_wakeup);
    executor->ScheduleWakeUp(worker_to_wakeup);
  }

  // In the case where the loop above didn't wake up any worker and we don't
  // have excess workers, the idle worker should be maintained. This happens
  // when called from the last awake worker, or when a recent increase in
  // |max_tasks| now makes it possible to keep an idle worker.
  if (desired_num_awake_workers == num_awake_workers) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
  }

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

bool ThreadGroupImpl::IsOnIdleSetLockRequired(WorkerThread* worker) const {
  // To avoid searching through the idle set: use GetLastUsedTime() not being
  // null (or the worker being directly on top of the idle set) as a proxy for
  // being on the idle set.
  return idle_workers_set_.Peek() == worker ||
         !worker->GetLastUsedTime().is_null();
}

void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
  // |adjust_max_tasks_posted_| can't change before the task posted below runs.
  // Skip the check on NaCl to avoid an unsafe reference acquisition warning.
#if !BUILDFLAG(IS_NACL)
  DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
#endif

  after_start().service_thread_task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
      after_start().blocked_workers_poll_period);
}

void ThreadGroupImpl::AdjustMaxTasks() {
  DCHECK(
      after_start().service_thread_task_runner->RunsTasksInCurrentSequence());

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(adjust_max_tasks_posted_);
  adjust_max_tasks_posted_ = false;

  // Increment max tasks for each worker that has been within a MAY_BLOCK
  // ScopedBlockingCall for more than may_block_threshold.
  for (scoped_refptr<WorkerThread> worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // `WorkerDelegate`s.
    WorkerDelegate* delegate = static_cast<WorkerDelegate*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->MaybeIncrementMaxTasksLockRequired();
  }

  // Wake up workers according to the updated |max_tasks_|. This will also
  // reschedule AdjustMaxTasks() if necessary.
  EnsureEnoughWorkersLockRequired(&executor);
}

}  // namespace internal
}  // namespace base