1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group_impl.h"
6
7 #include <optional>
8 #include <string_view>
9
10 #include "base/auto_reset.h"
11 #include "base/metrics/histogram_macros.h"
12 #include "base/sequence_token.h"
13 #include "base/strings/stringprintf.h"
14 #include "base/task/common/checked_lock.h"
15 #include "base/task/thread_pool/worker_thread.h"
16 #include "base/threading/scoped_blocking_call.h"
17 #include "base/threading/scoped_blocking_call_internal.h"
18 #include "base/threading/thread_checker.h"
19 #include "base/time/time_override.h"
20 #include "base/trace_event/base_tracing.h"
21 #include "third_party/abseil-cpp/absl/container/inlined_vector.h"
22
23 namespace base {
24 namespace internal {
25
namespace {

// Hard cap on the number of workers a thread group keeps in |workers_|.
// MaintainAtLeastOneIdleWorkerLockRequired() stops creating workers once
// |workers_.size()| reaches this value.
constexpr size_t kMaxNumberOfWorkers = 256;

}  // namespace
31
32 // Upon destruction, executes actions that control the number of active workers.
33 // Useful to satisfy locking requirements of these actions.
34 class ThreadGroupImpl::ScopedCommandsExecutor
35 : public ThreadGroup::BaseScopedCommandsExecutor {
36 public:
ScopedCommandsExecutor(ThreadGroupImpl * outer)37 explicit ScopedCommandsExecutor(ThreadGroupImpl* outer)
38 : BaseScopedCommandsExecutor(outer) {}
39
40 ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
41 ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
~ScopedCommandsExecutor()42 ~ScopedCommandsExecutor() override {
43 CheckedLock::AssertNoLockHeldOnCurrentThread();
44
45 // Wake up workers.
46 for (auto worker : workers_to_wake_up_) {
47 worker->WakeUp();
48 }
49 }
50
ScheduleWakeUp(scoped_refptr<WorkerThread> worker)51 void ScheduleWakeUp(scoped_refptr<WorkerThread> worker) {
52 workers_to_wake_up_.emplace_back(std::move(worker));
53 }
54
55 private:
56 absl::InlinedVector<scoped_refptr<WorkerThread>, 2> workers_to_wake_up_;
57 };
58
59 class ThreadGroupImpl::WorkerDelegate : public WorkerThread::Delegate,
60 public BlockingObserver {
61 public:
62 // |outer| owns the worker for which this delegate is constructed. If
63 // |is_excess| is true, this worker will be eligible for reclaim.
64 explicit WorkerDelegate(TrackedRef<ThreadGroupImpl> outer, bool is_excess);
65 WorkerDelegate(const WorkerDelegate&) = delete;
66 WorkerDelegate& operator=(const WorkerDelegate&) = delete;
67
68 // WorkerThread::Delegate:
69 void OnMainEntry(WorkerThread* worker) override;
70 void OnMainExit(WorkerThread* worker) override;
71 RegisteredTaskSource GetWork(WorkerThread* worker) override;
72 RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
73 WorkerThread* worker) override;
74 void RecordUnnecessaryWakeup() override;
75 TimeDelta GetSleepTimeout() override;
76
77 // BlockingObserver:
78 void BlockingStarted(BlockingType blocking_type) override;
79 void BlockingTypeUpgraded() override;
80 void BlockingEnded() override;
81
82 // WorkerThread::Delegate:
83
84 // Notifies the worker of shutdown, possibly marking the running task as
85 // MAY_BLOCK.
86 void OnShutdownStartedLockRequired(BaseScopedCommandsExecutor* executor)
87 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
88
89 // Increments max [best effort] tasks iff this worker has been within a
90 // ScopedBlockingCall for more than |may_block_threshold|.
91 void MaybeIncrementMaxTasksLockRequired()
92 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
93
94 // Increments max [best effort] tasks.
95 void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
96
current_task_priority_lock_required() const97 TaskPriority current_task_priority_lock_required() const
98 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
99 return *read_any().current_task_priority;
100 }
101
102 // Exposed for AnnotateAcquiredLockAlias.
lock() const103 const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
104 return outer_->lock_;
105 }
106
107 private:
108 // Returns true iff the worker can get work. Cleans up the worker or puts it
109 // on the idle set if it can't get work.
110 bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
111 WorkerThread* worker)
112 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
113
114 // Calls cleanup on |worker| and removes it from the thread group. Called from
115 // GetWork() when no work is available and CanCleanupLockRequired() returns
116 // true.
117 void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
118 WorkerThread* worker)
119 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
120
121 // Called in GetWork() when a worker becomes idle.
122 void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
123 WorkerThread* worker)
124 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
125
126 RegisteredTaskSource GetWorkLockRequired(BaseScopedCommandsExecutor* executor,
127 WorkerThread* worker)
128 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
129
130 // Returns true if |worker| is allowed to cleanup and remove itself from the
131 // thread group. Called from GetWork() when no work is available.
132 bool CanCleanupLockRequired(const WorkerThread* worker)
133 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);
134
135 // Only used in DCHECKs.
136 template <typename Worker>
ContainsWorker(const std::vector<scoped_refptr<Worker>> & workers,const WorkerThread * worker)137 bool ContainsWorker(const std::vector<scoped_refptr<Worker>>& workers,
138 const WorkerThread* worker) {
139 auto it = ranges::find_if(
140 workers,
141 [worker](const scoped_refptr<Worker>& i) { return i.get() == worker; });
142 return it != workers.end();
143 }
144
145 // Accessed only from the worker thread.
146 struct WorkerOnly {
147 WorkerOnly();
148 ~WorkerOnly();
149 // Associated WorkerThread, if any, initialized in OnMainEntry().
150 raw_ptr<WorkerThread> worker_thread_;
151
152 #if BUILDFLAG(IS_WIN)
153 std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
154 #endif // BUILDFLAG(IS_WIN)
155 } worker_only_;
156
157 // Writes from the worker thread protected by |outer_->lock_|. Reads from any
158 // thread, protected by |outer_->lock_| when not on the worker thread.
159 struct WriteWorkerReadAny {
160 // The priority of the task the worker is currently running if any.
161 std::optional<TaskPriority> current_task_priority;
162 // The shutdown behavior of the task the worker is currently running if any.
163 std::optional<TaskShutdownBehavior> current_shutdown_behavior;
164
165 // Time when MayBlockScopeEntered() was last called. Reset when
166 // BlockingScopeExited() is called.
167 TimeTicks blocking_start_time;
168
169 // Whether the worker is currently running a task (i.e. GetWork() has
170 // returned a non-empty task source and DidProcessTask() hasn't been called
171 // yet).
is_running_taskbase::internal::ThreadGroupImpl::WorkerDelegate::WriteWorkerReadAny172 bool is_running_task() const { return !!current_shutdown_behavior; }
173 } write_worker_read_any_;
174
worker_only()175 WorkerOnly& worker_only() {
176 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
177 return worker_only_;
178 }
179
write_worker()180 WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
181 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
182 return write_worker_read_any_;
183 }
184
read_any() const185 const WriteWorkerReadAny& read_any() const
186 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
187 return write_worker_read_any_;
188 }
189
read_worker() const190 const WriteWorkerReadAny& read_worker() const {
191 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
192 return write_worker_read_any_;
193 }
194
195 const TrackedRef<ThreadGroupImpl> outer_;
196
197 // Whether the worker is in excess. This must be decided at worker creation
198 // time to prevent unnecessarily discarding TLS state, as well as any behavior
199 // the OS has learned about a given thread.
200 const bool is_excess_;
201
202 // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| were
203 // incremented due to a ScopedBlockingCall on the thread.
204 bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
205 bool incremented_max_best_effort_tasks_since_blocked_
206 GUARDED_BY(outer_->lock_) = false;
207 // Whether |outer_->max_tasks_| and |outer_->max_best_effort_tasks_| was
208 // incremented due to running CONTINUE_ON_SHUTDOWN on the thread during
209 // shutdown.
210 bool incremented_max_tasks_for_shutdown_ GUARDED_BY(outer_->lock_) = false;
211
212 // Verifies that specific calls are always made from the worker thread.
213 THREAD_CHECKER(worker_thread_checker_);
214 };
215
ThreadGroupImpl(std::string_view histogram_label,std::string_view thread_group_label,ThreadType thread_type_hint,TrackedRef<TaskTracker> task_tracker,TrackedRef<Delegate> delegate)216 ThreadGroupImpl::ThreadGroupImpl(std::string_view histogram_label,
217 std::string_view thread_group_label,
218 ThreadType thread_type_hint,
219 TrackedRef<TaskTracker> task_tracker,
220 TrackedRef<Delegate> delegate)
221 : ThreadGroup(histogram_label,
222 thread_group_label,
223 thread_type_hint,
224 std::move(task_tracker),
225 std::move(delegate)),
226 tracked_ref_factory_(this) {
227 DCHECK(!thread_group_label_.empty());
228 }
229
Start(size_t max_tasks,size_t max_best_effort_tasks,TimeDelta suggested_reclaim_time,scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,WorkerThreadObserver * worker_thread_observer,WorkerEnvironment worker_environment,bool synchronous_thread_start_for_testing,std::optional<TimeDelta> may_block_threshold)230 void ThreadGroupImpl::Start(
231 size_t max_tasks,
232 size_t max_best_effort_tasks,
233 TimeDelta suggested_reclaim_time,
234 scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
235 WorkerThreadObserver* worker_thread_observer,
236 WorkerEnvironment worker_environment,
237 bool synchronous_thread_start_for_testing,
238 std::optional<TimeDelta> may_block_threshold) {
239 ThreadGroup::StartImpl(
240 max_tasks, max_best_effort_tasks, suggested_reclaim_time,
241 service_thread_task_runner, worker_thread_observer, worker_environment,
242 synchronous_thread_start_for_testing, may_block_threshold);
243
244 ScopedCommandsExecutor executor(this);
245 CheckedAutoLock auto_lock(lock_);
246 DCHECK(workers_.empty());
247 EnsureEnoughWorkersLockRequired(&executor);
248 }
249
~ThreadGroupImpl()250 ThreadGroupImpl::~ThreadGroupImpl() {
251 // ThreadGroup should only ever be deleted:
252 // 1) In tests, after JoinForTesting().
253 // 2) In production, iff initialization failed.
254 // In both cases |workers_| should be empty.
255 DCHECK(workers_.empty());
256 }
257
UpdateSortKey(TaskSource::Transaction transaction)258 void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
259 ScopedCommandsExecutor executor(this);
260 UpdateSortKeyImpl(&executor, std::move(transaction));
261 }
262
PushTaskSourceAndWakeUpWorkers(RegisteredTaskSourceAndTransaction transaction_with_task_source)263 void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
264 RegisteredTaskSourceAndTransaction transaction_with_task_source) {
265 ScopedCommandsExecutor executor(this);
266 PushTaskSourceAndWakeUpWorkersImpl(&executor,
267 std::move(transaction_with_task_source));
268 }
269
WorkerDelegate(TrackedRef<ThreadGroupImpl> outer,bool is_excess)270 ThreadGroupImpl::WorkerDelegate::WorkerDelegate(
271 TrackedRef<ThreadGroupImpl> outer,
272 bool is_excess)
273 : outer_(outer), is_excess_(is_excess) {
274 // Bound in OnMainEntry().
275 DETACH_FROM_THREAD(worker_thread_checker_);
276 }
277
// Out-of-line, defaulted definitions of the WorkerOnly constructor and
// destructor declared inside WorkerDelegate.
ThreadGroupImpl::WorkerDelegate::WorkerOnly::WorkerOnly() = default;
ThreadGroupImpl::WorkerDelegate::WorkerOnly::~WorkerOnly() = default;
280
GetSleepTimeout()281 TimeDelta ThreadGroupImpl::WorkerDelegate::GetSleepTimeout() {
282 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
283 if (!is_excess_) {
284 return TimeDelta::Max();
285 }
286 // Sleep for an extra 10% to avoid the following pathological case:
287 // 0) A task is running on a timer which matches
288 // |after_start().suggested_reclaim_time|.
289 // 1) The timer fires and this worker is created by
290 // MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
291 // worker was assigned the task.
292 // 2) This worker begins sleeping |after_start().suggested_reclaim_time|
293 // (at the front of the idle set).
294 // 3) The task assigned to the other worker completes and the worker goes
295 // back in the idle set (this worker may now second on the idle set;
296 // its GetLastUsedTime() is set to Now()).
297 // 4) The sleep in (2) expires. Since (3) was fast this worker is likely
298 // to have been second on the idle set long enough for
299 // CanCleanupLockRequired() to be satisfied in which case this worker
300 // is cleaned up.
301 // 5) The timer fires at roughly the same time and we're back to (1) if
302 // (4) resulted in a clean up; causing thread churn.
303 //
304 // Sleeping 10% longer in (2) makes it much less likely that (4) occurs
305 // before (5). In that case (5) will cause (3) and refresh this worker's
306 // GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
307 // and avoiding churn.
308 //
309 // Of course the same problem arises if in (0) the timer matches
310 // |after_start().suggested_reclaim_time * 1.1| but it's expected that any
311 // timer slower than |after_start().suggested_reclaim_time| will cause
312 // such churn during long idle periods. If this is a problem in practice,
313 // the standby thread configuration and algorithm should be revisited.
314 return outer_->after_start().suggested_reclaim_time * 1.1;
315 }
316
OnMainEntry(WorkerThread * worker)317 void ThreadGroupImpl::WorkerDelegate::OnMainEntry(WorkerThread* worker) {
318 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
319
320 {
321 #if DCHECK_IS_ON()
322 CheckedAutoLock auto_lock(outer_->lock_);
323 DCHECK(
324 ContainsWorker(outer_->workers_, static_cast<WorkerThread*>(worker)));
325 #endif
326 }
327
328 #if BUILDFLAG(IS_WIN)
329 worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
330 outer_->after_start().worker_environment);
331 #endif // BUILDFLAG(IS_WIN)
332
333 PlatformThread::SetName(
334 StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));
335
336 outer_->BindToCurrentThread();
337 worker_only().worker_thread_ = static_cast<WorkerThread*>(worker);
338 SetBlockingObserverForCurrentThread(this);
339
340 if (outer_->worker_started_for_testing_) {
341 // When |worker_started_for_testing_| is set, the thread that starts workers
342 // should wait for a worker to have started before starting the next one,
343 // and there should only be one thread that wakes up workers at a time.
344 DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
345 outer_->worker_started_for_testing_->Signal();
346 }
347 }
348
OnMainExit(WorkerThread * worker_base)349 void ThreadGroupImpl::WorkerDelegate::OnMainExit(WorkerThread* worker_base) {
350 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
351
352 #if DCHECK_IS_ON()
353 WorkerThread* worker = static_cast<WorkerThread*>(worker_base);
354 {
355 bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
356 CheckedAutoLock auto_lock(outer_->lock_);
357
358 // |worker| should already have been removed from the idle workers set and
359 // |workers_| by the time the thread is about to exit. (except in the
360 // cases where the thread group is no longer going to be used - in which
361 // case, it's fine for there to be invalid workers in the thread group).
362 if (!shutdown_complete && !outer_->join_for_testing_started_) {
363 DCHECK(!outer_->idle_workers_set_.Contains(worker));
364 DCHECK(!ContainsWorker(outer_->workers_, worker));
365 }
366 }
367 #endif
368
369 #if BUILDFLAG(IS_WIN)
370 worker_only().win_thread_environment.reset();
371 #endif // BUILDFLAG(IS_WIN)
372
373 // Count cleaned up workers for tests. It's important to do this here
374 // instead of at the end of CleanupLockRequired() because some side-effects
375 // of cleaning up happen outside the lock (e.g. recording histograms) and
376 // resuming from tests must happen-after that point or checks on the main
377 // thread will be flaky (crbug.com/1047733).
378 CheckedAutoLock auto_lock(outer_->lock_);
379 ++outer_->num_workers_cleaned_up_for_testing_;
380 #if DCHECK_IS_ON()
381 outer_->some_workers_cleaned_up_for_testing_ = true;
382 #endif
383 if (outer_->num_workers_cleaned_up_for_testing_cv_) {
384 outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
385 }
386 }
387
CanGetWorkLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker_base)388 bool ThreadGroupImpl::WorkerDelegate::CanGetWorkLockRequired(
389 BaseScopedCommandsExecutor* executor,
390 WorkerThread* worker_base) {
391 WorkerThread* worker = static_cast<WorkerThread*>(worker_base);
392
393 const bool is_on_idle_workers_set = outer_->IsOnIdleSetLockRequired(worker);
394 DCHECK_EQ(is_on_idle_workers_set, outer_->idle_workers_set_.Contains(worker));
395
396 // This occurs when the when WorkerThread::Delegate::WaitForWork() times out
397 // (i.e. when the worker's wakes up after GetSleepTimeout()).
398 if (is_on_idle_workers_set) {
399 if (CanCleanupLockRequired(worker)) {
400 CleanupLockRequired(executor, worker);
401 }
402 return false;
403 }
404
405 // If too many workers are running, this worker should not get work, until
406 // tasks are no longer in excess (i.e. max tasks increases). This ensures that
407 // if this worker is in excess, it gets a chance to being cleaned up.
408 if (outer_->GetNumAwakeWorkersLockRequired() > outer_->max_tasks_) {
409 OnWorkerBecomesIdleLockRequired(executor, worker);
410 return false;
411 }
412
413 return true;
414 }
415
GetWork(WorkerThread * worker)416 RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::GetWork(
417 WorkerThread* worker) {
418 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
419 DCHECK(!read_worker().current_task_priority);
420 DCHECK(!read_worker().current_shutdown_behavior);
421
422 ScopedCommandsExecutor executor(outer_.get());
423 CheckedAutoLock auto_lock(outer_->lock_);
424 return GetWorkLockRequired(&executor, worker);
425 }
426
GetWorkLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker)427 RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::GetWorkLockRequired(
428 BaseScopedCommandsExecutor* executor,
429 WorkerThread* worker) {
430 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
431 DCHECK(ContainsWorker(outer_->workers_, worker));
432
433 if (!CanGetWorkLockRequired(executor, worker)) {
434 return nullptr;
435 }
436
437 RegisteredTaskSource task_source;
438 TaskPriority priority;
439 while (!task_source && !outer_->priority_queue_.IsEmpty()) {
440 // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
441 // BEST_EFFORT tasks run concurrently.
442 priority = outer_->priority_queue_.PeekSortKey().priority();
443 if (!outer_->task_tracker_->CanRunPriority(priority) ||
444 (priority == TaskPriority::BEST_EFFORT &&
445 outer_->num_running_best_effort_tasks_ >=
446 outer_->max_best_effort_tasks_)) {
447 break;
448 }
449
450 task_source = outer_->TakeRegisteredTaskSource(executor);
451 }
452 if (!task_source) {
453 OnWorkerBecomesIdleLockRequired(executor, worker);
454 return nullptr;
455 }
456
457 // Running task bookkeeping.
458 outer_->IncrementTasksRunningLockRequired(priority);
459
460 write_worker().current_task_priority = priority;
461 write_worker().current_shutdown_behavior = task_source->shutdown_behavior();
462
463 // Subtle: This must be after the call to WillRunTask() inside
464 // TakeRegisteredTaskSource(), so that any state used by WillRunTask() to
465 // determine that the task source must remain in the TaskQueue is also used
466 // to determine the desired number of workers. Concretely, this wouldn't
467 // work:
468 //
469 // Thread 1: GetWork() calls EnsureEnoughWorkers(). No worker woken up
470 // because the queue contains a job with max concurrency = 1 and
471 // the current worker is awake.
472 // Thread 2: Increases the job's max concurrency.
473 // ShouldQueueUponCapacityIncrease() returns false because the
474 // job is already queued.
475 // Thread 1: Calls WillRunTask() on the job. It returns
476 // kAllowedNotSaturated because max concurrency is not reached.
477 // But no extra worker is woken up to run the job!
478 outer_->EnsureEnoughWorkersLockRequired(executor);
479
480 return task_source;
481 }
482
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)483 RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::SwapProcessedTask(
484 RegisteredTaskSource task_source,
485 WorkerThread* worker) {
486 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
487 DCHECK(read_worker().current_task_priority);
488 DCHECK(read_worker().current_shutdown_behavior);
489
490 // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
491 // |TaskSource::lock_| is a UniversalPredecessor and must always be acquired
492 // prior to acquiring a second lock
493 std::optional<RegisteredTaskSourceAndTransaction>
494 transaction_with_task_source;
495 if (task_source) {
496 transaction_with_task_source.emplace(
497 RegisteredTaskSourceAndTransaction::FromTaskSource(
498 std::move(task_source)));
499 }
500
501 // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
502 // TaskSources returned by the GetWork() method of |delegate_| until it
503 // returns nullptr. Resetting |wake_up_event_| here doesn't break this
504 // invariant and avoids a useless loop iteration before going to sleep if
505 // WakeUp() is called while this WorkerThread is awake.
506 wake_up_event_.Reset();
507
508 ScopedCommandsExecutor workers_executor(outer_.get());
509 ScopedReenqueueExecutor reenqueue_executor;
510 CheckedAutoLock auto_lock(outer_->lock_);
511
512 // During shutdown, max_tasks may have been incremented in
513 // OnShutdownStartedLockRequired().
514 if (incremented_max_tasks_for_shutdown_) {
515 DCHECK(outer_->shutdown_started_);
516 outer_->DecrementMaxTasksLockRequired();
517 if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
518 outer_->DecrementMaxBestEffortTasksLockRequired();
519 }
520 incremented_max_tasks_since_blocked_ = false;
521 incremented_max_best_effort_tasks_since_blocked_ = false;
522 incremented_max_tasks_for_shutdown_ = false;
523 }
524
525 DCHECK(read_worker().blocking_start_time.is_null());
526 DCHECK(!incremented_max_tasks_since_blocked_);
527 DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
528
529 // Running task bookkeeping.
530 outer_->DecrementTasksRunningLockRequired(
531 *read_worker().current_task_priority);
532 write_worker().current_shutdown_behavior = std::nullopt;
533 write_worker().current_task_priority = std::nullopt;
534
535 if (transaction_with_task_source) {
536 outer_->ReEnqueueTaskSourceLockRequired(
537 &workers_executor, &reenqueue_executor,
538 std::move(transaction_with_task_source.value()));
539 }
540
541 return GetWorkLockRequired(&workers_executor,
542 static_cast<WorkerThread*>(worker));
543 }
544
CanCleanupLockRequired(const WorkerThread * worker)545 bool ThreadGroupImpl::WorkerDelegate::CanCleanupLockRequired(
546 const WorkerThread* worker) {
547 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
548 if (!is_excess_) {
549 return false;
550 }
551
552 const TimeTicks last_used_time = worker->GetLastUsedTime();
553 if (last_used_time.is_null() ||
554 subtle::TimeTicksNowIgnoringOverride() - last_used_time <
555 outer_->after_start().suggested_reclaim_time) {
556 return false;
557 }
558 if (!outer_->worker_cleanup_disallowed_for_testing_) [[likely]] {
559 return true;
560 }
561 return false;
562 }
563
CleanupLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker_base)564 void ThreadGroupImpl::WorkerDelegate::CleanupLockRequired(
565 BaseScopedCommandsExecutor* executor,
566 WorkerThread* worker_base) {
567 WorkerThread* worker = static_cast<WorkerThread*>(worker_base);
568 DCHECK(!outer_->join_for_testing_started_);
569 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
570
571 worker->Cleanup();
572
573 if (outer_->IsOnIdleSetLockRequired(worker)) {
574 outer_->idle_workers_set_.Remove(worker);
575 }
576
577 // Remove the worker from |workers_|.
578 auto worker_iter = ranges::find(outer_->workers_, worker);
579 CHECK(worker_iter != outer_->workers_.end(), base::NotFatalUntil::M125);
580 outer_->workers_.erase(worker_iter);
581 }
582
OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor * executor,WorkerThread * worker_base)583 void ThreadGroupImpl::WorkerDelegate::OnWorkerBecomesIdleLockRequired(
584 BaseScopedCommandsExecutor* executor,
585 WorkerThread* worker_base) {
586 WorkerThread* worker = static_cast<WorkerThread*>(worker_base);
587
588 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
589 DCHECK(!outer_->idle_workers_set_.Contains(worker));
590
591 // Add the worker to the idle set.
592 outer_->idle_workers_set_.Insert(worker);
593 DCHECK_LE(outer_->idle_workers_set_.Size(), outer_->workers_.size());
594 outer_->idle_workers_set_cv_for_testing_.Broadcast();
595 }
596
RecordUnnecessaryWakeup()597 void ThreadGroupImpl::WorkerDelegate::RecordUnnecessaryWakeup() {
598 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
599
600 base::BooleanHistogram::FactoryGet(
601 std::string("ThreadPool.UnnecessaryWakeup.") + outer_->histogram_label_,
602 base::Histogram::kUmaTargetedHistogramFlag)
603 ->Add(true);
604
605 TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
606 }
607
BlockingStarted(BlockingType blocking_type)608 void ThreadGroupImpl::WorkerDelegate::BlockingStarted(
609 BlockingType blocking_type) {
610 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
611 DCHECK(worker_only().worker_thread_);
612 // Skip if this blocking scope happened outside of a RunTask.
613 if (!read_worker().current_task_priority) {
614 return;
615 }
616
617 worker_only().worker_thread_->MaybeUpdateThreadType();
618
619 // WillBlock is always used when time overrides is active. crbug.com/1038867
620 if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
621 blocking_type = BlockingType::WILL_BLOCK;
622 }
623
624 ScopedCommandsExecutor executor(outer_.get());
625 CheckedAutoLock auto_lock(outer_->lock_);
626
627 DCHECK(!incremented_max_tasks_since_blocked_);
628 DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
629 DCHECK(read_worker().blocking_start_time.is_null());
630 write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();
631
632 if (incremented_max_tasks_for_shutdown_) {
633 return;
634 }
635
636 if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT) {
637 ++outer_->num_unresolved_best_effort_may_block_;
638 }
639
640 if (blocking_type == BlockingType::WILL_BLOCK) {
641 incremented_max_tasks_since_blocked_ = true;
642 outer_->IncrementMaxTasksLockRequired();
643 outer_->EnsureEnoughWorkersLockRequired(&executor);
644 } else {
645 ++outer_->num_unresolved_may_block_;
646 }
647
648 outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
649 }
650
BlockingTypeUpgraded()651 void ThreadGroupImpl::WorkerDelegate::BlockingTypeUpgraded() {
652 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
653 // Skip if this blocking scope happened outside of a RunTask.
654 if (!read_worker().current_task_priority) {
655 return;
656 }
657
658 // The blocking type always being WILL_BLOCK in this experiment and with
659 // time overrides, it should never be considered "upgraded".
660 if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
661 return;
662 }
663
664 ScopedCommandsExecutor executor(outer_.get());
665 CheckedAutoLock auto_lock(outer_->lock_);
666
667 // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
668 // same scope already caused the max tasks to be incremented.
669 if (incremented_max_tasks_since_blocked_) {
670 return;
671 }
672
673 // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
674 // same scope.
675 --outer_->num_unresolved_may_block_;
676
677 incremented_max_tasks_since_blocked_ = true;
678 outer_->IncrementMaxTasksLockRequired();
679 outer_->EnsureEnoughWorkersLockRequired(&executor);
680 }
681
BlockingEnded()682 void ThreadGroupImpl::WorkerDelegate::BlockingEnded() {
683 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
684 // Skip if this blocking scope happened outside of a RunTask.
685 if (!read_worker().current_task_priority) {
686 return;
687 }
688
689 CheckedAutoLock auto_lock(outer_->lock_);
690 DCHECK(!read_worker().blocking_start_time.is_null());
691 write_worker().blocking_start_time = TimeTicks();
692 if (!incremented_max_tasks_for_shutdown_) {
693 if (incremented_max_tasks_since_blocked_) {
694 outer_->DecrementMaxTasksLockRequired();
695 } else {
696 --outer_->num_unresolved_may_block_;
697 }
698
699 if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
700 if (incremented_max_best_effort_tasks_since_blocked_) {
701 outer_->DecrementMaxBestEffortTasksLockRequired();
702 } else {
703 --outer_->num_unresolved_best_effort_may_block_;
704 }
705 }
706 }
707
708 incremented_max_tasks_since_blocked_ = false;
709 incremented_max_best_effort_tasks_since_blocked_ = false;
710 }
711
712 // BlockingObserver:
713 // Notifies the worker of shutdown, possibly marking the running task as
714 // MAY_BLOCK.
OnShutdownStartedLockRequired(BaseScopedCommandsExecutor * executor)715 void ThreadGroupImpl::WorkerDelegate::OnShutdownStartedLockRequired(
716 BaseScopedCommandsExecutor* executor) {
717 if (!read_any().is_running_task()) {
718 return;
719 }
720 // Workers running a CONTINUE_ON_SHUTDOWN tasks are replaced by incrementing
721 // max_tasks/max_best_effort_tasks. The effect is reverted in
722 // DidProcessTask().
723 if (*read_any().current_shutdown_behavior ==
724 TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
725 incremented_max_tasks_for_shutdown_ = true;
726 IncrementMaxTasksLockRequired();
727 }
728 }
729
730 // Increments max [best effort] tasks iff this worker has been within a
731 // ScopedBlockingCall for more than |may_block_threshold|.
MaybeIncrementMaxTasksLockRequired()732 void ThreadGroupImpl::WorkerDelegate::MaybeIncrementMaxTasksLockRequired()
733 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
734 if (read_any().blocking_start_time.is_null() ||
735 subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
736 outer_->after_start().may_block_threshold) {
737 return;
738 }
739 IncrementMaxTasksLockRequired();
740 }
741
742 // Increments max [best effort] tasks.
IncrementMaxTasksLockRequired()743 void ThreadGroupImpl::WorkerDelegate::IncrementMaxTasksLockRequired()
744 EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
745 if (!incremented_max_tasks_since_blocked_) {
746 outer_->IncrementMaxTasksLockRequired();
747 // Update state for an unresolved ScopedBlockingCall.
748 if (!read_any().blocking_start_time.is_null()) {
749 incremented_max_tasks_since_blocked_ = true;
750 --outer_->num_unresolved_may_block_;
751 }
752 }
753 if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
754 !incremented_max_best_effort_tasks_since_blocked_) {
755 outer_->IncrementMaxBestEffortTasksLockRequired();
756 // Update state for an unresolved ScopedBlockingCall.
757 if (!read_any().blocking_start_time.is_null()) {
758 incremented_max_best_effort_tasks_since_blocked_ = true;
759 --outer_->num_unresolved_best_effort_may_block_;
760 }
761 }
762 }
763
JoinForTesting()764 void ThreadGroupImpl::JoinForTesting() {
765 decltype(workers_) workers_copy;
766 {
767 CheckedAutoLock auto_lock(lock_);
768 priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();
769
770 DCHECK_GT(workers_.size(), size_t(0))
771 << "Joined an unstarted thread group.";
772
773 join_for_testing_started_ = true;
774
775 // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
776 // being joined.
777 worker_cleanup_disallowed_for_testing_ = true;
778
779 // Make a copy of the WorkerThreads so that we can call
780 // WorkerThread::JoinForTesting() without holding |lock_| since
781 // WorkerThreads may need to access |workers_|.
782 workers_copy = workers_;
783 }
784 for (const auto& worker : workers_copy) {
785 static_cast<WorkerThread*>(worker.get())->JoinForTesting();
786 }
787
788 CheckedAutoLock auto_lock(lock_);
789 DCHECK(workers_ == workers_copy);
790 // Release |workers_| to clear their TrackedRef against |this|.
791 workers_.clear();
792 }
793
NumberOfIdleWorkersLockRequiredForTesting() const794 size_t ThreadGroupImpl::NumberOfIdleWorkersLockRequiredForTesting() const {
795 return idle_workers_set_.Size();
796 }
797
MaintainAtLeastOneIdleWorkerLockRequired(ScopedCommandsExecutor * executor)798 void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
799 ScopedCommandsExecutor* executor) {
800 if (workers_.size() == kMaxNumberOfWorkers) {
801 return;
802 }
803 DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
804
805 if (!idle_workers_set_.IsEmpty()) {
806 return;
807 }
808
809 if (workers_.size() >= max_tasks_) {
810 return;
811 }
812
813 scoped_refptr<WorkerThread> new_worker =
814 CreateAndRegisterWorkerLockRequired(executor);
815 DCHECK(new_worker);
816 idle_workers_set_.Insert(new_worker.get());
817 }
818
819 scoped_refptr<WorkerThread>
CreateAndRegisterWorkerLockRequired(ScopedCommandsExecutor * executor)820 ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
821 ScopedCommandsExecutor* executor) {
822 DCHECK(!join_for_testing_started_);
823 DCHECK_LT(workers_.size(), max_tasks_);
824 DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
825 DCHECK(idle_workers_set_.IsEmpty());
826
827 // WorkerThread needs |lock_| as a predecessor for its thread lock because in
828 // GetWork(), |lock_| is first acquired and then the thread lock is acquired
829 // when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
830 scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
831 thread_type_hint_,
832 std::make_unique<WorkerDelegate>(
833 tracked_ref_factory_.GetTrackedRef(),
834 /* is_excess=*/workers_.size() >= after_start().initial_max_tasks),
835 task_tracker_, worker_sequence_num_++, &lock_);
836
837 workers_.push_back(worker);
838 executor->ScheduleStart(worker);
839 DCHECK_LE(workers_.size(), max_tasks_);
840
841 return worker;
842 }
843
GetNumAwakeWorkersLockRequired() const844 size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
845 DCHECK_GE(workers_.size(), idle_workers_set_.Size());
846 size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
847 DCHECK_GE(num_awake_workers, num_running_tasks_);
848 return num_awake_workers;
849 }
850
DidUpdateCanRunPolicy()851 void ThreadGroupImpl::DidUpdateCanRunPolicy() {
852 ScopedCommandsExecutor executor(this);
853 CheckedAutoLock auto_lock(lock_);
854 EnsureEnoughWorkersLockRequired(&executor);
855 }
856
OnShutdownStarted()857 void ThreadGroupImpl::OnShutdownStarted() {
858 ScopedCommandsExecutor executor(this);
859 CheckedAutoLock auto_lock(lock_);
860
861 // Don't do anything if the thread group isn't started.
862 if (max_tasks_ == 0) {
863 return;
864 }
865 if (join_for_testing_started_) [[unlikely]] {
866 return;
867 }
868
869 // Start a MAY_BLOCK scope on each worker that is already running a task.
870 for (scoped_refptr<WorkerThread>& worker : workers_) {
871 // The delegates of workers inside a ThreadGroupImpl should be
872 // `WorkerDelegate`s.
873 WorkerDelegate* delegate = static_cast<WorkerDelegate*>(worker->delegate());
874 AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
875 delegate->OnShutdownStartedLockRequired(&executor);
876 }
877 EnsureEnoughWorkersLockRequired(&executor);
878
879 shutdown_started_ = true;
880 }
881
EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor * base_executor)882 void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
883 BaseScopedCommandsExecutor* base_executor) {
884 // Don't do anything if the thread group isn't started.
885 if (max_tasks_ == 0) {
886 return;
887 }
888 if (join_for_testing_started_) [[unlikely]] {
889 return;
890 }
891
892 ScopedCommandsExecutor* executor =
893 static_cast<ScopedCommandsExecutor*>(base_executor);
894
895 const size_t desired_num_awake_workers =
896 GetDesiredNumAwakeWorkersLockRequired();
897 const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();
898
899 size_t num_workers_to_wake_up =
900 ClampSub(desired_num_awake_workers, num_awake_workers);
901 num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));
902
903 // Wake up the appropriate number of workers.
904 for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
905 MaintainAtLeastOneIdleWorkerLockRequired(executor);
906 WorkerThread* worker_to_wakeup = idle_workers_set_.Take();
907 DCHECK(worker_to_wakeup);
908 executor->ScheduleWakeUp(worker_to_wakeup);
909 }
910
911 // In the case where the loop above didn't wake up any worker and we don't
912 // have excess workers, the idle worker should be maintained. This happens
913 // when called from the last worker awake, or a recent increase in |max_tasks|
914 // now makes it possible to keep an idle worker.
915 if (desired_num_awake_workers == num_awake_workers) {
916 MaintainAtLeastOneIdleWorkerLockRequired(executor);
917 }
918
919 // This function is called every time a task source is (re-)enqueued,
920 // hence the minimum priority needs to be updated.
921 UpdateMinAllowedPriorityLockRequired();
922
923 // Ensure that the number of workers is periodically adjusted if needed.
924 MaybeScheduleAdjustMaxTasksLockRequired(executor);
925 }
926
IsOnIdleSetLockRequired(WorkerThread * worker) const927 bool ThreadGroupImpl::IsOnIdleSetLockRequired(WorkerThread* worker) const {
928 // To avoid searching through the idle set : use GetLastUsedTime() not being
929 // null (or being directly on top of the idle set) as a proxy for being on
930 // the idle set.
931 return idle_workers_set_.Peek() == worker ||
932 !worker->GetLastUsedTime().is_null();
933 }
934
ScheduleAdjustMaxTasks()935 void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
936 // |adjust_max_tasks_posted_| can't change before the task posted below runs.
937 // Skip check on NaCl to avoid unsafe reference acquisition warning.
938 #if !BUILDFLAG(IS_NACL)
939 DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
940 #endif
941
942 after_start().service_thread_task_runner->PostDelayedTask(
943 FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
944 after_start().blocked_workers_poll_period);
945 }
946
AdjustMaxTasks()947 void ThreadGroupImpl::AdjustMaxTasks() {
948 DCHECK(
949 after_start().service_thread_task_runner->RunsTasksInCurrentSequence());
950
951 ScopedCommandsExecutor executor(this);
952 CheckedAutoLock auto_lock(lock_);
953 DCHECK(adjust_max_tasks_posted_);
954 adjust_max_tasks_posted_ = false;
955
956 // Increment max tasks for each worker that has been within a MAY_BLOCK
957 // ScopedBlockingCall for more than may_block_threshold.
958 for (scoped_refptr<WorkerThread> worker : workers_) {
959 // The delegates of workers inside a ThreadGroupImpl should be
960 // `WorkerDelegate`s.
961 WorkerDelegate* delegate = static_cast<WorkerDelegate*>(worker->delegate());
962 AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
963 delegate->MaybeIncrementMaxTasksLockRequired();
964 }
965
966 // Wake up workers according to the updated |max_tasks_|. This will also
967 // reschedule AdjustMaxTasks() if necessary.
968 EnsureEnoughWorkersLockRequired(&executor);
969 }
970
971 } // namespace internal
972 } // namespace base
973