// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/worker_thread.h"

#include <stddef.h>

#include <algorithm>
#include <atomic>
#include <optional>
#include <utility>

#include "base/check_op.h"
#include "base/compiler_specific.h"
#include "base/debug/alias.h"
#include "base/feature_list.h"
#include "base/functional/callback_helpers.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/environment_config.h"
#include "base/task/thread_pool/worker_thread_observer.h"
#include "base/threading/hang_watcher.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
#include "build/build_config.h"
#include "partition_alloc/buildflags.h"

#if PA_BUILDFLAG(USE_PARTITION_ALLOC)
#include "partition_alloc/partition_alloc_config.h"  // nogncheck
#endif

#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
#include "base/files/file_descriptor_watcher_posix.h"
#endif

#if BUILDFLAG(IS_APPLE)
#include "base/apple/scoped_nsautorelease_pool.h"
#endif

#if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
#include "partition_alloc/thread_cache.h"  // nogncheck
#endif

namespace base::internal {

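// Out-of-line definition of the static constexpr member. Required when the
// constant is ODR-used under pre-C++17 rules; redundant but harmless since
// C++17, where static constexpr data members are implicitly inline.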
constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;

WorkerThread::ThreadLabel WorkerThread::Delegate::GetThreadLabel() const {
  return WorkerThread::ThreadLabel::POOLED;
}

bool WorkerThread::Delegate::TimedWait(TimeDelta timeout) {
  return wake_up_event_.TimedWait(timeout);
}

void WorkerThread::Delegate::WaitForWork() {
  const TimeDelta sleep_duration_before_worker_reclaim = GetSleepTimeout();

  // When a thread goes to sleep, the memory retained by its thread cache is
  // trapped there for as long as the thread sleeps. To prevent that, we can
  // either purge the thread cache right before going to sleep, or after some
  // delay.
  //
  // Purging the thread cache incurs a cost on the next task, since its thread
  // cache will be empty and allocation performance will initially be lower.
  // As many sleeps are very short, do not purge every time (this would also
  // make sleep / wakeup cycles more costly).
  //
  // Instead, sleep for min(timeout, 1s). If the wait times out, purge at that
  // point, and go back to sleep for the remainder of the time. This ensures
  // that we do no work for short sleeps, and that threads are not awakened
  // many times.
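  //
  // Illustrative timeline (hypothetical values): with a 1 s purge delay and a
  // 30 s reclaim timeout, the worker first waits up to 1 s. If no work
  // arrives by then, it purges its thread cache and waits up to the remaining
  // 29 s before being reclaimed.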
#if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
  const TimeDelta sleep_duration_before_purge =
      GetSleepDurationBeforePurge(base::TimeTicks::Now());

  const bool was_signaled = TimedWait(std::min(
      sleep_duration_before_purge, sleep_duration_before_worker_reclaim));
  // Timed out.
  if (!was_signaled) {
    partition_alloc::ThreadCache::PurgeCurrentThread();

    // The thread woke up to purge before its standard reclaim time. Sleep for
    // what's remaining until then.
    if (sleep_duration_before_worker_reclaim > sleep_duration_before_purge) {
      TimedWait(sleep_duration_before_worker_reclaim.is_max()
                    ? TimeDelta::Max()
                    : sleep_duration_before_worker_reclaim -
                          sleep_duration_before_purge);
    }
  }
#else
  TimedWait(sleep_duration_before_worker_reclaim);
#endif  // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
        // PA_CONFIG(THREAD_CACHE_SUPPORTED)
}

bool WorkerThread::Delegate::IsDelayFirstWorkerSleepEnabled() {
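  // Cache the feature state in a function-local static so FeatureList (which
  // can take a lock) is queried at most once per process rather than on every
  // wait.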
  static bool state = FeatureList::IsEnabled(kDelayFirstWorkerWake);
  return state;
}

#if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
TimeDelta WorkerThread::Delegate::GetSleepDurationBeforePurge(TimeTicks now) {
  base::TimeDelta sleep_duration_before_purge = kPurgeThreadCacheIdleDelay;

  if (!IsDelayFirstWorkerSleepEnabled()) {
    return sleep_duration_before_purge;
  }

  // Use the first time a worker goes to sleep in this process as an
  // approximation of the process creation time.
  static const TimeTicks first_sleep_time = now;
  const TimeTicks first_sleep_time_to_use =
      !first_sleep_time_for_testing_.is_null() ? first_sleep_time_for_testing_
                                               : first_sleep_time;
  const base::TimeTicks first_wake_time =
      first_sleep_time_to_use + kFirstSleepDurationBeforePurge;

  // A sleep that occurs within `kFirstSleepDurationBeforePurge` of the
  // first sleep lasts at least `kFirstSleepDurationBeforePurge`.
  if (now <= first_wake_time) {
    // Avoid sleeping for less than `sleep_duration_before_purge` since that is
    // the shortest expected duration to wait for a purge.
    sleep_duration_before_purge =
        std::max(kFirstSleepDurationBeforePurge, sleep_duration_before_purge);
  }

  // Align wakeups for purges to reduce the chances of taking the CPU out of
  // sleep multiple times for these operations. Without alignment, many
  // workers in the same process scheduling wakeups can leave some worker
  // waking every `kPurgeThreadCacheIdleDelay` / N, where N is the number of
  // workers.
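  //
  // Worked example (hypothetical values): with kPurgeThreadCacheIdleDelay =
  // 1 s, now = 2.4 s and sleep_duration_before_purge = 1 s, the unsnapped
  // purge time of 3.4 s snaps forward to the 4 s tick, so this returns 1.6 s;
  // every worker whose unsnapped purge time falls in (3 s, 4 s] wakes at the
  // same 4 s tick.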
  const TimeTicks snapped_purge_time =
      (now + sleep_duration_before_purge)
          .SnappedToNextTick(TimeTicks(), kPurgeThreadCacheIdleDelay);

  return snapped_purge_time - now;
}

#endif  // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
        // PA_CONFIG(THREAD_CACHE_SUPPORTED)

WorkerThread::WorkerThread(ThreadType thread_type_hint,
                           std::unique_ptr<Delegate> delegate,
                           TrackedRef<TaskTracker> task_tracker,
                           size_t sequence_num,
                           const CheckedLock* predecessor_lock,
                           void* flow_terminator)
    : thread_lock_(predecessor_lock),
      task_tracker_(std::move(task_tracker)),
      thread_type_hint_(thread_type_hint),
      current_thread_type_(GetDesiredThreadType()),
      sequence_num_(sequence_num),
      flow_terminator_(flow_terminator == nullptr
                           ? reinterpret_cast<intptr_t>(this)
                           : reinterpret_cast<intptr_t>(flow_terminator)),
      delegate_(std::move(delegate)) {
  DCHECK(task_tracker_);
  DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
         thread_type_hint_ != ThreadType::kBackground);
  DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
         thread_type_hint != ThreadType::kUtility);
  DCHECK(delegate_);
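  // Waiting on this event means the worker is idle, not blocked on
  // multi-threaded work; declare that so the wait is not reported as a
  // blocking call (see WaitableEvent::declare_only_used_while_idle()).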
  delegate_->wake_up_event_.declare_only_used_while_idle();
}

bool WorkerThread::Start(
    scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer) {
  CheckedLock::AssertNoLockHeldOnCurrentThread();

  // Prime kDelayFirstWorkerWake's feature state right away on thread creation,
  // instead of looking it up for the first time later on the worker thread.
  // This avoids a data race in tests that may ~FeatureList while the first
  // worker thread is still initializing (the first WorkerThread is started on
  // the main thread as part of ThreadPoolImpl::Start(), so doing the lookup
  // here avoids the race). See crbug.com/1344573.
  // Note 1: The feature state is always available at this point, as
  // ThreadPoolInstance::Start() contractually happens-after FeatureList
  // initialization.
  // Note 2: This is done in Start() instead of in the constructor, as
  // construction happens under a ThreadGroup lock which precludes calling
  // into FeatureList (as that can also use a lock).
  delegate()->IsDelayFirstWorkerSleepEnabled();

  CheckedAutoLock auto_lock(thread_lock_);
  DCHECK(thread_handle_.is_null());

#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
  DCHECK(io_thread_task_runner);
  io_thread_task_runner_ = std::move(io_thread_task_runner);
#endif

  if (should_exit_.IsSet() || join_called_for_testing_.IsSet()) {
    return true;
  }

  DCHECK(!worker_thread_observer_);
  worker_thread_observer_ = worker_thread_observer;

  self_ = this;

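  // A stack size of 0 requests the platform's default stack size.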
  constexpr size_t kDefaultStackSize = 0;
  PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
                                 current_thread_type_);

  if (thread_handle_.is_null()) {
    self_ = nullptr;
    return false;
  }

  return true;
}

void WorkerThread::Destroy() {
  CheckedAutoLock auto_lock(thread_lock_);

  // If |thread_handle_| wasn't joined, detach it.
  if (!thread_handle_.is_null()) {
    PlatformThread::Detach(thread_handle_);
  }
}

bool WorkerThread::ThreadAliveForTesting() const {
  CheckedAutoLock auto_lock(thread_lock_);
  return !thread_handle_.is_null();
}

void WorkerThread::JoinForTesting() {
  DCHECK(!join_called_for_testing_.IsSet());
  join_called_for_testing_.Set();
  delegate_->wake_up_event_.Signal();

  PlatformThreadHandle thread_handle;

  {
    CheckedAutoLock auto_lock(thread_lock_);

    if (thread_handle_.is_null()) {
      return;
    }

    thread_handle = thread_handle_;
    // Reset |thread_handle_| so it isn't joined by the destructor.
    thread_handle_ = PlatformThreadHandle();
  }

  PlatformThread::Join(thread_handle);
}

void WorkerThread::Cleanup() {
  DCHECK(!should_exit_.IsSet());
  should_exit_.Set();
  delegate_->wake_up_event_.Signal();
}

void WorkerThread::WakeUp() {
  // Signalling an event can deschedule the current thread. Since being
  // descheduled while holding a lock is undesirable (https://crbug.com/890978),
  // assert that no lock is held by the current thread.
  CheckedLock::AssertNoLockHeldOnCurrentThread();
  // Calling WakeUp() after Cleanup() or Join() is wrong because the
  // WorkerThread cannot run more tasks.
  DCHECK(!join_called_for_testing_.IsSet());
  DCHECK(!should_exit_.IsSet());
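  // Emit a flow event so traces connect this wake-up to the worker's next
  // "WorkerThread active" span, which terminates the flow in RunWorker().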
  TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThread::WakeUp",
                      perfetto::Flow::FromPointer(this));

  delegate_->wake_up_event_.Signal();
}

WorkerThread::Delegate* WorkerThread::delegate() {
  return delegate_.get();
}

WorkerThread::~WorkerThread() {
  Destroy();
}

void WorkerThread::MaybeUpdateThreadType() {
  UpdateThreadType(GetDesiredThreadType());
}

void WorkerThread::BeginUnusedPeriod() {
  CheckedAutoLock auto_lock(thread_lock_);
  DCHECK(last_used_time_.is_null());
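  // Uses the non-overridable clock: mock time in tests does not affect
  // last-used bookkeeping (see subtle::TimeTicksNowIgnoringOverride()).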
  last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
}

void WorkerThread::EndUnusedPeriod() {
  CheckedAutoLock auto_lock(thread_lock_);
  DCHECK(!last_used_time_.is_null());
  last_used_time_ = TimeTicks();
}

TimeTicks WorkerThread::GetLastUsedTime() const {
  CheckedAutoLock auto_lock(thread_lock_);
  return last_used_time_;
}

bool WorkerThread::ShouldExit() const {
  // The ordering of the checks is important below. This WorkerThread may be
  // released and outlive |task_tracker_| in unit tests. However, when the
  // WorkerThread is released, |should_exit_| will be set, so check that
  // first.
  return should_exit_.IsSet() || join_called_for_testing_.IsSet() ||
         task_tracker_->IsShutdownComplete();
}

ThreadType WorkerThread::GetDesiredThreadType() const {
  // To avoid shutdown hangs, disallow a thread type below kDefault during
  // shutdown.
  if (task_tracker_->HasShutdownStarted()) {
    return ThreadType::kDefault;
  }

  return thread_type_hint_;
}

void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
  if (desired_thread_type == current_thread_type_) {
    return;
  }

  PlatformThread::SetCurrentThreadType(desired_thread_type);
  current_thread_type_ = desired_thread_type;
}

void WorkerThread::ThreadMain() {
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
  DCHECK(io_thread_task_runner_);
  FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
#endif

  if (thread_type_hint_ == ThreadType::kBackground) {
    switch (delegate()->GetThreadLabel()) {
      case ThreadLabel::POOLED:
        RunBackgroundPooledWorker();
        return;
      case ThreadLabel::SHARED:
        RunBackgroundSharedWorker();
        return;
      case ThreadLabel::DEDICATED:
        RunBackgroundDedicatedWorker();
        return;
#if BUILDFLAG(IS_WIN)
      case ThreadLabel::SHARED_COM:
        RunBackgroundSharedCOMWorker();
        return;
      case ThreadLabel::DEDICATED_COM:
        RunBackgroundDedicatedCOMWorker();
        return;
#endif  // BUILDFLAG(IS_WIN)
    }
  }

  switch (delegate()->GetThreadLabel()) {
    case ThreadLabel::POOLED:
      RunPooledWorker();
      return;
    case ThreadLabel::SHARED:
      RunSharedWorker();
      return;
    case ThreadLabel::DEDICATED:
      RunDedicatedWorker();
      return;
#if BUILDFLAG(IS_WIN)
    case ThreadLabel::SHARED_COM:
      RunSharedCOMWorker();
      return;
    case ThreadLabel::DEDICATED_COM:
      RunDedicatedCOMWorker();
      return;
#endif  // BUILDFLAG(IS_WIN)
  }
}

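// Each Run<Label>Worker() wrapper below is NOINLINE and ends with
// NO_CODE_FOLDING() so the linker cannot merge their identical bodies
// (identical code folding); each thread label therefore keeps a distinct
// frame in stack traces and crash reports.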
NOINLINE void WorkerThread::RunPooledWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunSharedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunDedicatedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

#if BUILDFLAG(IS_WIN)
NOINLINE void WorkerThread::RunSharedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
#endif  // BUILDFLAG(IS_WIN)

void WorkerThread::RunWorker() {
  DCHECK_EQ(self_, this);
  TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
  TRACE_EVENT_BEGIN0("base", "WorkerThread active");

  if (worker_thread_observer_) {
    worker_thread_observer_->OnWorkerThreadMainEntry();
  }

  delegate()->OnMainEntry(this);

  // Background threads can take an arbitrary amount of time to complete; do
  // not watch them for hangs. Ignore priority boosting for now.
  const bool watch_for_hangs =
      base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
      GetDesiredThreadType() != ThreadType::kBackground;

  // If this process has a HangWatcher, register this thread for watching.
  base::ScopedClosureRunner unregister_for_hang_watching;
  if (watch_for_hangs) {
    unregister_for_hang_watching = base::HangWatcher::RegisterThread(
        base::HangWatcher::ThreadType::kThreadPoolThread);
  }

  while (!ShouldExit()) {
#if BUILDFLAG(IS_APPLE)
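    // Drain the autorelease pool once per iteration so objects autoreleased
    // by tasks do not accumulate for the lifetime of the thread.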
    apple::ScopedNSAutoreleasePool autorelease_pool;
#endif
    std::optional<WatchHangsInScope> hang_watch_scope;

    TRACE_EVENT_END0("base", "WorkerThread active");
    hang_watch_scope.reset();
    delegate()->WaitForWork();
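    // Terminate the flow emitted by WakeUp(). |flow_terminator_| is `this`
    // unless a dedicated terminator was supplied at construction.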
    TRACE_EVENT_BEGIN("base", "WorkerThread active",
                      perfetto::TerminatingFlow::FromPointer(
                          reinterpret_cast<void*>(flow_terminator_)));

    // Don't GetWork() in the case where we woke up for Cleanup().
    if (ShouldExit()) {
      break;
    }

    if (watch_for_hangs) {
      hang_watch_scope.emplace();
    }

    // Thread type needs to be updated before GetWork.
    UpdateThreadType(GetDesiredThreadType());

    // Get the task source containing the first task to execute.
    RegisteredTaskSource task_source = delegate()->GetWork(this);

    // If no work was acquired and the worker is still alive, record this as
    // an unnecessary wakeup.
    if (!task_source && !ShouldExit()) {
      delegate()->RecordUnnecessaryWakeup();
    }

    while (task_source) {
      // Alias pointer for investigation of memory corruption. crbug.com/1218384
      TaskSource* task_source_before_run = task_source.get();
      base::debug::Alias(&task_source_before_run);

      task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
      // Alias pointer for investigation of memory corruption. crbug.com/1218384
      TaskSource* task_source_before_move = task_source.get();
      base::debug::Alias(&task_source_before_move);

      // We emplace the hang_watch_scope here so that each hang watch scope
      // covers one GetWork (or SwapProcessedTask) as well as one
      // RunAndPopNextTask.
      if (watch_for_hangs) {
        hang_watch_scope.emplace();
      }

      RegisteredTaskSource new_task_source =
          delegate()->SwapProcessedTask(std::move(task_source), this);

      UpdateThreadType(GetDesiredThreadType());

      // Check that |task_source| is always cleared, to help investigation of
      // memory corruption where |task_source| is non-null after being moved.
      // crbug.com/1218384
      CHECK(!task_source);
      task_source = std::move(new_task_source);
    }
  }

  // Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
  // after invoking OnMainExit().

  delegate()->OnMainExit(this);

  if (worker_thread_observer_) {
    worker_thread_observer_->OnWorkerThreadMainExit();
  }

  // Release the self-reference to |this|. This can result in deleting |this|
  // and as such no more member accesses should be made after this point.
  self_ = nullptr;

  TRACE_EVENT_END0("base", "WorkerThread active");
  TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
}

}  // namespace base::internal