1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/worker_thread.h"
6
7 #include <stddef.h>
8
9 #include <algorithm>
10 #include <atomic>
11 #include <utility>
12
13 #include "base/allocator/partition_allocator/partition_alloc_buildflags.h"
14 #include "base/allocator/partition_allocator/partition_alloc_config.h"
15 #include "base/check_op.h"
16 #include "base/compiler_specific.h"
17 #include "base/debug/alias.h"
18 #include "base/feature_list.h"
19 #include "base/functional/callback_helpers.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/task/task_features.h"
22 #include "base/task/thread_pool/environment_config.h"
23 #include "base/task/thread_pool/task_tracker.h"
24 #include "base/task/thread_pool/worker_thread_observer.h"
25 #include "base/threading/hang_watcher.h"
26 #include "base/time/time.h"
27 #include "base/time/time_override.h"
28 #include "base/trace_event/base_tracing.h"
29 #include "build/build_config.h"
30 #include "third_party/abseil-cpp/absl/types/optional.h"
31
32 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
33 #include "base/files/file_descriptor_watcher_posix.h"
34 #endif
35
36 #if BUILDFLAG(IS_APPLE)
37 #include "base/mac/scoped_nsautorelease_pool.h"
38 #endif
39
40 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
41 PA_CONFIG(THREAD_CACHE_SUPPORTED)
42 #include "base/allocator/partition_allocator/thread_cache.h"
43 #endif
44
45 namespace base::internal {
46
47 namespace {
48
49 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
50 PA_CONFIG(THREAD_CACHE_SUPPORTED)
51 // Returns the desired sleep time before the worker has to wake up to purge
52 // the cache thread or reclaim itself. |min_sleep_time| contains the minimal
53 // acceptable amount of time to sleep.
GetSleepTimeBeforePurge(TimeDelta min_sleep_time)54 TimeDelta GetSleepTimeBeforePurge(TimeDelta min_sleep_time) {
55 const TimeTicks now = TimeTicks::Now();
56
57 // Do not wake up to purge within the first minute of process lifetime. In
58 // short lived processes this will avoid waking up to try and save memory
59 // for a heap that will be going away soon. For longer lived processes this
60 // should allow for better performance at process startup since even if a
61 // worker goes to sleep for kPurgeThreadCacheIdleDelay it's very likely it
62 // will be needed soon after because of heavy startup workloads.
63 constexpr TimeDelta kFirstSleepLength = Minutes(1);
64
65 // Use the first time a worker goes to sleep in this process as an
66 // approximation of the process creation time.
67 static const TimeTicks first_scheduled_wake = now + kFirstSleepLength;
68
69 // Align wakeups for purges to reduce the chances of taking the CPU out of
70 // sleep multiple times for these operations.
71 constexpr TimeDelta kPurgeThreadCacheIdleDelay = Seconds(1);
72 const TimeTicks snapped_wake =
73 (now + min_sleep_time)
74 .SnappedToNextTick(TimeTicks(), kPurgeThreadCacheIdleDelay);
75
76 // Avoid scheduling at |first_scheduled_wake| if it would result in a sleep
77 // that's too short.
78 return std::max(snapped_wake - now, first_scheduled_wake - now);
79 }
80 #endif // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
81 // PA_CONFIG(THREAD_CACHE_SUPPORTED)
82
IsDelayFirstWorkerSleepEnabled()83 bool IsDelayFirstWorkerSleepEnabled() {
84 static bool state = FeatureList::IsEnabled(kDelayFirstWorkerWake);
85 return state;
86 }
87
88 } // namespace
89
90 constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;
91
WaitForWork(WaitableEvent * wake_up_event)92 void WorkerThread::Delegate::WaitForWork(WaitableEvent* wake_up_event) {
93 DCHECK(wake_up_event);
94 const TimeDelta sleep_time = GetSleepTimeout();
95
96 // When a thread goes to sleep, the memory retained by its thread cache is
97 // trapped there for as long as the thread sleeps. To prevent that, we can
98 // either purge the thread cache right before going to sleep, or after some
99 // delay.
100 //
101 // Purging the thread cache incurs a cost on the next task, since its thread
102 // cache will be empty and allocation performance initially lower. As a lot of
103 // sleeps are very short, do not purge all the time (this would also make
104 // sleep / wakeups cycles more costly).
105 //
106 // Instead, sleep for min(timeout, 1s). If the wait times out then purge at
107 // that point, and go to sleep for the remaining of the time. This ensures
108 // that we do no work for short sleeps, and that threads do not get awaken
109 // many times.
110 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
111 PA_CONFIG(THREAD_CACHE_SUPPORTED)
112 TimeDelta min_sleep_time = std::min(sleep_time, kPurgeThreadCacheIdleDelay);
113
114 if (IsDelayFirstWorkerSleepEnabled())
115 min_sleep_time = GetSleepTimeBeforePurge(min_sleep_time);
116
117 const bool was_signaled = wake_up_event->TimedWait(min_sleep_time);
118
119 // Timed out.
120 if (!was_signaled) {
121 partition_alloc::ThreadCache::PurgeCurrentThread();
122
123 // The thread woke up to purge before its standard reclaim time. Sleep for
124 // what's remaining until then.
125 if (sleep_time > min_sleep_time) {
126 wake_up_event->TimedWait(sleep_time.is_max()
127 ? base::TimeDelta::Max()
128 : sleep_time - min_sleep_time);
129 }
130 }
131 #else
132 wake_up_event->TimedWait(sleep_time);
133 #endif // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
134 // PA_CONFIG(THREAD_CACHE_SUPPORTED)
135 }
136
WorkerThread(ThreadType thread_type_hint,std::unique_ptr<Delegate> delegate,TrackedRef<TaskTracker> task_tracker,size_t sequence_num,const CheckedLock * predecessor_lock)137 WorkerThread::WorkerThread(ThreadType thread_type_hint,
138 std::unique_ptr<Delegate> delegate,
139 TrackedRef<TaskTracker> task_tracker,
140 size_t sequence_num,
141 const CheckedLock* predecessor_lock)
142 : thread_lock_(predecessor_lock),
143 delegate_(std::move(delegate)),
144 task_tracker_(std::move(task_tracker)),
145 thread_type_hint_(thread_type_hint),
146 current_thread_type_(GetDesiredThreadType()),
147 sequence_num_(sequence_num) {
148 DCHECK(delegate_);
149 DCHECK(task_tracker_);
150 DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
151 thread_type_hint_ != ThreadType::kBackground);
152 DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
153 thread_type_hint != ThreadType::kUtility);
154 wake_up_event_.declare_only_used_while_idle();
155 }
156
Start(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,WorkerThreadObserver * worker_thread_observer)157 bool WorkerThread::Start(
158 scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
159 WorkerThreadObserver* worker_thread_observer) {
160 CheckedLock::AssertNoLockHeldOnCurrentThread();
161
162 // Prime kDelayFirstWorkerWake's feature state right away on thread creation
163 // instead of looking it up for the first time later on thread as this avoids
164 // a data race in tests that may ~FeatureList while the first worker thread
165 // is still initializing (the first WorkerThread will be started on the main
166 // thread as part of ThreadPoolImpl::Start() so doing it then avoids this
167 // race), crbug.com/1344573.
168 // Note 1: the feature state is always available at this point as
169 // ThreadPoolInstance::Start() contractually happens-after FeatureList
170 // initialization.
171 // Note 2: This is done on Start instead of in the constructor as construction
172 // happens under a ThreadGroupImpl lock which precludes calling into
173 // FeatureList (as that can also use a lock).
174 IsDelayFirstWorkerSleepEnabled();
175
176 CheckedAutoLock auto_lock(thread_lock_);
177 DCHECK(thread_handle_.is_null());
178
179 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
180 DCHECK(io_thread_task_runner);
181 io_thread_task_runner_ = std::move(io_thread_task_runner);
182 #endif
183
184 if (should_exit_.IsSet() || join_called_for_testing_.IsSet())
185 return true;
186
187 DCHECK(!worker_thread_observer_);
188 worker_thread_observer_ = worker_thread_observer;
189
190 self_ = this;
191
192 constexpr size_t kDefaultStackSize = 0;
193 PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
194 current_thread_type_);
195
196 if (thread_handle_.is_null()) {
197 self_ = nullptr;
198 return false;
199 }
200
201 return true;
202 }
203
WakeUp()204 void WorkerThread::WakeUp() {
205 // Signalling an event can deschedule the current thread. Since being
206 // descheduled while holding a lock is undesirable (https://crbug.com/890978),
207 // assert that no lock is held by the current thread.
208 CheckedLock::AssertNoLockHeldOnCurrentThread();
209 // Calling WakeUp() after Cleanup() or Join() is wrong because the
210 // WorkerThread cannot run more tasks.
211 DCHECK(!join_called_for_testing_.IsSet());
212 DCHECK(!should_exit_.IsSet());
213 TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThread::WakeUp",
214 perfetto::Flow::FromPointer(this));
215 wake_up_event_.Signal();
216 }
217
JoinForTesting()218 void WorkerThread::JoinForTesting() {
219 DCHECK(!join_called_for_testing_.IsSet());
220 join_called_for_testing_.Set();
221 wake_up_event_.Signal();
222
223 PlatformThreadHandle thread_handle;
224
225 {
226 CheckedAutoLock auto_lock(thread_lock_);
227
228 if (thread_handle_.is_null())
229 return;
230
231 thread_handle = thread_handle_;
232 // Reset |thread_handle_| so it isn't joined by the destructor.
233 thread_handle_ = PlatformThreadHandle();
234 }
235
236 PlatformThread::Join(thread_handle);
237 }
238
ThreadAliveForTesting() const239 bool WorkerThread::ThreadAliveForTesting() const {
240 CheckedAutoLock auto_lock(thread_lock_);
241 return !thread_handle_.is_null();
242 }
243
~WorkerThread()244 WorkerThread::~WorkerThread() {
245 CheckedAutoLock auto_lock(thread_lock_);
246
247 // If |thread_handle_| wasn't joined, detach it.
248 if (!thread_handle_.is_null()) {
249 DCHECK(!join_called_for_testing_.IsSet());
250 PlatformThread::Detach(thread_handle_);
251 }
252 }
253
Cleanup()254 void WorkerThread::Cleanup() {
255 DCHECK(!should_exit_.IsSet());
256 should_exit_.Set();
257 wake_up_event_.Signal();
258 }
259
MaybeUpdateThreadType()260 void WorkerThread::MaybeUpdateThreadType() {
261 UpdateThreadType(GetDesiredThreadType());
262 }
263
BeginUnusedPeriod()264 void WorkerThread::BeginUnusedPeriod() {
265 CheckedAutoLock auto_lock(thread_lock_);
266 DCHECK(last_used_time_.is_null());
267 last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
268 }
269
EndUnusedPeriod()270 void WorkerThread::EndUnusedPeriod() {
271 CheckedAutoLock auto_lock(thread_lock_);
272 DCHECK(!last_used_time_.is_null());
273 last_used_time_ = TimeTicks();
274 }
275
GetLastUsedTime() const276 TimeTicks WorkerThread::GetLastUsedTime() const {
277 CheckedAutoLock auto_lock(thread_lock_);
278 return last_used_time_;
279 }
280
ShouldExit() const281 bool WorkerThread::ShouldExit() const {
282 // The ordering of the checks is important below. This WorkerThread may be
283 // released and outlive |task_tracker_| in unit tests. However, when the
284 // WorkerThread is released, |should_exit_| will be set, so check that
285 // first.
286 return should_exit_.IsSet() || join_called_for_testing_.IsSet() ||
287 task_tracker_->IsShutdownComplete();
288 }
289
GetDesiredThreadType() const290 ThreadType WorkerThread::GetDesiredThreadType() const {
291 // To avoid shutdown hangs, disallow a type below kNormal during shutdown
292 if (task_tracker_->HasShutdownStarted())
293 return ThreadType::kDefault;
294
295 return thread_type_hint_;
296 }
297
UpdateThreadType(ThreadType desired_thread_type)298 void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
299 if (desired_thread_type == current_thread_type_)
300 return;
301
302 PlatformThread::SetCurrentThreadType(desired_thread_type);
303 current_thread_type_ = desired_thread_type;
304 }
305
ThreadMain()306 void WorkerThread::ThreadMain() {
307 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
308 DCHECK(io_thread_task_runner_);
309 FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
310 #endif
311
312 if (thread_type_hint_ == ThreadType::kBackground) {
313 switch (delegate_->GetThreadLabel()) {
314 case ThreadLabel::POOLED:
315 RunBackgroundPooledWorker();
316 return;
317 case ThreadLabel::SHARED:
318 RunBackgroundSharedWorker();
319 return;
320 case ThreadLabel::DEDICATED:
321 RunBackgroundDedicatedWorker();
322 return;
323 #if BUILDFLAG(IS_WIN)
324 case ThreadLabel::SHARED_COM:
325 RunBackgroundSharedCOMWorker();
326 return;
327 case ThreadLabel::DEDICATED_COM:
328 RunBackgroundDedicatedCOMWorker();
329 return;
330 #endif // BUILDFLAG(IS_WIN)
331 }
332 }
333
334 switch (delegate_->GetThreadLabel()) {
335 case ThreadLabel::POOLED:
336 RunPooledWorker();
337 return;
338 case ThreadLabel::SHARED:
339 RunSharedWorker();
340 return;
341 case ThreadLabel::DEDICATED:
342 RunDedicatedWorker();
343 return;
344 #if BUILDFLAG(IS_WIN)
345 case ThreadLabel::SHARED_COM:
346 RunSharedCOMWorker();
347 return;
348 case ThreadLabel::DEDICATED_COM:
349 RunDedicatedCOMWorker();
350 return;
351 #endif // BUILDFLAG(IS_WIN)
352 }
353 }
354
RunPooledWorker()355 NOINLINE void WorkerThread::RunPooledWorker() {
356 RunWorker();
357 NO_CODE_FOLDING();
358 }
359
RunBackgroundPooledWorker()360 NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
361 RunWorker();
362 NO_CODE_FOLDING();
363 }
364
RunSharedWorker()365 NOINLINE void WorkerThread::RunSharedWorker() {
366 RunWorker();
367 NO_CODE_FOLDING();
368 }
369
RunBackgroundSharedWorker()370 NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
371 RunWorker();
372 NO_CODE_FOLDING();
373 }
374
RunDedicatedWorker()375 NOINLINE void WorkerThread::RunDedicatedWorker() {
376 RunWorker();
377 NO_CODE_FOLDING();
378 }
379
RunBackgroundDedicatedWorker()380 NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
381 RunWorker();
382 NO_CODE_FOLDING();
383 }
384
385 #if BUILDFLAG(IS_WIN)
RunSharedCOMWorker()386 NOINLINE void WorkerThread::RunSharedCOMWorker() {
387 RunWorker();
388 NO_CODE_FOLDING();
389 }
390
RunBackgroundSharedCOMWorker()391 NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
392 RunWorker();
393 NO_CODE_FOLDING();
394 }
395
RunDedicatedCOMWorker()396 NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
397 RunWorker();
398 NO_CODE_FOLDING();
399 }
400
RunBackgroundDedicatedCOMWorker()401 NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
402 RunWorker();
403 NO_CODE_FOLDING();
404 }
405 #endif // BUILDFLAG(IS_WIN)
406
RunWorker()407 void WorkerThread::RunWorker() {
408 DCHECK_EQ(self_, this);
409 TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
410 TRACE_EVENT_BEGIN0("base", "WorkerThread active");
411
412 if (worker_thread_observer_)
413 worker_thread_observer_->OnWorkerThreadMainEntry();
414
415 delegate_->OnMainEntry(this);
416
417 // Background threads can take an arbitrary amount of time to complete, do not
418 // watch them for hangs. Ignore priority boosting for now.
419 const bool watch_for_hangs =
420 base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
421 GetDesiredThreadType() != ThreadType::kBackground;
422
423 // If this process has a HangWatcher register this thread for watching.
424 base::ScopedClosureRunner unregister_for_hang_watching;
425 if (watch_for_hangs) {
426 unregister_for_hang_watching = base::HangWatcher::RegisterThread(
427 base::HangWatcher::ThreadType::kThreadPoolThread);
428 }
429
430 // A WorkerThread starts out waiting for work.
431 {
432 TRACE_EVENT_END0("base", "WorkerThread active");
433 // TODO(crbug.com/1021571): Remove this once fixed.
434 PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
435 delegate_->WaitForWork(&wake_up_event_);
436 TRACE_EVENT_BEGIN("base", "WorkerThread active",
437 perfetto::TerminatingFlow::FromPointer(this));
438 }
439 bool got_work_this_wakeup = false;
440 while (!ShouldExit()) {
441 #if BUILDFLAG(IS_APPLE)
442 mac::ScopedNSAutoreleasePool autorelease_pool;
443 #endif
444 absl::optional<WatchHangsInScope> hang_watch_scope;
445 if (watch_for_hangs)
446 hang_watch_scope.emplace();
447
448 UpdateThreadType(GetDesiredThreadType());
449
450 // Get the task source containing the next task to execute.
451 RegisteredTaskSource task_source = delegate_->GetWork(this);
452 if (!task_source) {
453 // Exit immediately if GetWork() resulted in detaching this worker.
454 if (ShouldExit())
455 break;
456
457 // If this is the first time we called GetWork and the worker's still
458 // alive, record that this is an unnecessary wakeup.
459 if (!got_work_this_wakeup)
460 delegate_->RecordUnnecessaryWakeup();
461
462 TRACE_EVENT_END0("base", "WorkerThread active");
463 // TODO(crbug.com/1021571): Remove this once fixed.
464 PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
465 hang_watch_scope.reset();
466 delegate_->WaitForWork(&wake_up_event_);
467 got_work_this_wakeup = false;
468
469 TRACE_EVENT_BEGIN("base", "WorkerThread active",
470 perfetto::TerminatingFlow::FromPointer(this));
471 continue;
472 }
473
474 got_work_this_wakeup = true;
475
476 // Alias pointer for investigation of memory corruption. crbug.com/1218384
477 TaskSource* task_source_before_run = task_source.get();
478 base::debug::Alias(&task_source_before_run);
479
480 task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
481
482 // Alias pointer for investigation of memory corruption. crbug.com/1218384
483 TaskSource* task_source_before_move = task_source.get();
484 base::debug::Alias(&task_source_before_move);
485
486 delegate_->DidProcessTask(std::move(task_source));
487
488 // Check that task_source is always cleared, to help investigation of memory
489 // corruption where task_source is non-null after being moved.
490 // crbug.com/1218384
491 CHECK(!task_source);
492
493 // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
494 // TaskSources returned by the GetWork() method of |delegate_| until it
495 // returns nullptr. Resetting |wake_up_event_| here doesn't break this
496 // invariant and avoids a useless loop iteration before going to sleep if
497 // WakeUp() is called while this WorkerThread is awake.
498 wake_up_event_.Reset();
499 }
500
501 // Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
502 // after invoking OnMainExit().
503
504 delegate_->OnMainExit(this);
505
506 if (worker_thread_observer_)
507 worker_thread_observer_->OnWorkerThreadMainExit();
508
509 // Release the self-reference to |this|. This can result in deleting |this|
510 // and as such no more member accesses should be made after this point.
511 self_ = nullptr;
512
513 TRACE_EVENT_END0("base", "WorkerThread active");
514 TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
515 // TODO(crbug.com/1021571): Remove this once fixed.
516 PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
517 }
518
519 } // namespace base::internal
520