• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/worker_thread.h"
6 
7 #include <stddef.h>
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <utility>
12 
13 #include "base/allocator/partition_allocator/src/partition_alloc/partition_alloc_buildflags.h"
14 #include "base/allocator/partition_allocator/src/partition_alloc/partition_alloc_config.h"
15 #include "base/check_op.h"
16 #include "base/compiler_specific.h"
17 #include "base/debug/alias.h"
18 #include "base/feature_list.h"
19 #include "base/functional/callback_helpers.h"
20 #include "base/synchronization/waitable_event.h"
21 #include "base/task/task_features.h"
22 #include "base/task/thread_pool/environment_config.h"
23 #include "base/task/thread_pool/worker_thread_observer.h"
24 #include "base/threading/hang_watcher.h"
25 #include "base/time/time.h"
26 #include "base/time/time_override.h"
27 #include "base/trace_event/base_tracing.h"
28 #include "build/build_config.h"
29 #include "third_party/abseil-cpp/absl/types/optional.h"
30 
31 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
32 #include "base/files/file_descriptor_watcher_posix.h"
33 #endif
34 
35 #if BUILDFLAG(IS_APPLE)
36 #include "base/apple/scoped_nsautorelease_pool.h"
37 #endif
38 
39 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
40     PA_CONFIG(THREAD_CACHE_SUPPORTED)
41 #include "base/allocator/partition_allocator/src/partition_alloc/thread_cache.h"
42 #endif
43 
44 namespace base::internal {
45 
46 constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;
47 
WaitForWork()48 void WorkerThread::Delegate::WaitForWork() {
49   const TimeDelta sleep_time = GetSleepTimeout();
50 
51   // When a thread goes to sleep, the memory retained by its thread cache is
52   // trapped there for as long as the thread sleeps. To prevent that, we can
53   // either purge the thread cache right before going to sleep, or after some
54   // delay.
55   //
56   // Purging the thread cache incurs a cost on the next task, since its thread
57   // cache will be empty and allocation performance initially lower. As a lot of
58   // sleeps are very short, do not purge all the time (this would also make
59   // sleep / wakeups cycles more costly).
60   //
61   // Instead, sleep for min(timeout, 1s). If the wait times out then purge at
62   // that point, and go to sleep for the remaining of the time. This ensures
63   // that we do no work for short sleeps, and that threads do not get awaken
64   // many times.
65 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
66     PA_CONFIG(THREAD_CACHE_SUPPORTED)
67   TimeDelta min_sleep_time = std::min(sleep_time, kPurgeThreadCacheIdleDelay);
68 
69   if (IsDelayFirstWorkerSleepEnabled()) {
70     min_sleep_time = GetSleepTimeBeforePurge(min_sleep_time);
71   }
72 
73   const bool was_signaled = TimedWait(min_sleep_time);
74   // Timed out.
75   if (!was_signaled) {
76     partition_alloc::ThreadCache::PurgeCurrentThread();
77 
78     // The thread woke up to purge before its standard reclaim time. Sleep for
79     // what's remaining until then.
80     if (sleep_time > min_sleep_time) {
81       TimedWait(sleep_time.is_max() ? TimeDelta::Max()
82                                     : sleep_time - min_sleep_time);
83     }
84   }
85 #else
86   TimedWait(sleep_time);
87 #endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
88         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
89 }
90 
IsDelayFirstWorkerSleepEnabled()91 bool WorkerThread::Delegate::IsDelayFirstWorkerSleepEnabled() {
92   static bool state = FeatureList::IsEnabled(kDelayFirstWorkerWake);
93   return state;
94 }
95 
96 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
97     PA_CONFIG(THREAD_CACHE_SUPPORTED)
98 // Returns the desired sleep time before the worker has to wake up to purge
99 // the cache thread or reclaim itself. |min_sleep_time| contains the minimal
100 // acceptable amount of time to sleep.
GetSleepTimeBeforePurge(TimeDelta min_sleep_time)101 TimeDelta WorkerThread::Delegate::GetSleepTimeBeforePurge(
102     TimeDelta min_sleep_time) {
103   const TimeTicks now = TimeTicks::Now();
104 
105   // Do not wake up to purge within the first minute of process lifetime. In
106   // short lived processes this will avoid waking up to try and save memory
107   // for a heap that will be going away soon. For longer lived processes this
108   // should allow for better performance at process startup since even if a
109   // worker goes to sleep for kPurgeThreadCacheIdleDelay it's very likely it
110   // will be needed soon after because of heavy startup workloads.
111   constexpr TimeDelta kFirstSleepLength = Minutes(1);
112 
113   // Use the first time a worker goes to sleep in this process as an
114   // approximation of the process creation time.
115   static const TimeTicks first_sleep_time = now;
116 
117   // A sleep that occurs within `kFirstSleepLength` of the
118   // first sleep lasts at least `kFirstSleepLength`.
119   if (now <= first_sleep_time + kFirstSleepLength) {
120     min_sleep_time = std::max(kFirstSleepLength, min_sleep_time);
121   }
122 
123   // Align wakeups for purges to reduce the chances of taking the CPU out of
124   // sleep multiple times for these operations.
125   const TimeTicks snapped_wake =
126       (now + min_sleep_time)
127           .SnappedToNextTick(TimeTicks(), kPurgeThreadCacheIdleDelay);
128 
129   return snapped_wake - now;
130 }
131 #endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
132         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
133 
WorkerThread(ThreadType thread_type_hint,TrackedRef<TaskTracker> task_tracker,size_t sequence_num,const CheckedLock * predecessor_lock)134 WorkerThread::WorkerThread(ThreadType thread_type_hint,
135                            TrackedRef<TaskTracker> task_tracker,
136                            size_t sequence_num,
137                            const CheckedLock* predecessor_lock)
138     : thread_lock_(predecessor_lock),
139       task_tracker_(std::move(task_tracker)),
140       thread_type_hint_(thread_type_hint),
141       current_thread_type_(GetDesiredThreadType()),
142       sequence_num_(sequence_num) {
143   DCHECK(task_tracker_);
144   DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
145          thread_type_hint_ != ThreadType::kBackground);
146   DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
147          thread_type_hint != ThreadType::kUtility);
148 }
149 
Start(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,WorkerThreadObserver * worker_thread_observer)150 bool WorkerThread::Start(
151     scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
152     WorkerThreadObserver* worker_thread_observer) {
153   CheckedLock::AssertNoLockHeldOnCurrentThread();
154 
155   // Prime kDelayFirstWorkerWake's feature state right away on thread creation
156   // instead of looking it up for the first time later on thread as this avoids
157   // a data race in tests that may ~FeatureList while the first worker thread
158   // is still initializing (the first WorkerThread will be started on the main
159   // thread as part of ThreadPoolImpl::Start() so doing it then avoids this
160   // race), crbug.com/1344573.
161   // Note 1: the feature state is always available at this point as
162   // ThreadPoolInstance::Start() contractually happens-after FeatureList
163   // initialization.
164   // Note 2: This is done on Start instead of in the constructor as construction
165   // happens under a ThreadGroupImpl lock which precludes calling into
166   // FeatureList (as that can also use a lock).
167   delegate()->IsDelayFirstWorkerSleepEnabled();
168 
169   CheckedAutoLock auto_lock(thread_lock_);
170   DCHECK(thread_handle_.is_null());
171 
172 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
173   DCHECK(io_thread_task_runner);
174   io_thread_task_runner_ = std::move(io_thread_task_runner);
175 #endif
176 
177   if (should_exit_.IsSet() || join_called_for_testing_.IsSet())
178     return true;
179 
180   DCHECK(!worker_thread_observer_);
181   worker_thread_observer_ = worker_thread_observer;
182 
183   self_ = this;
184 
185   constexpr size_t kDefaultStackSize = 0;
186   PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
187                                  current_thread_type_);
188 
189   if (thread_handle_.is_null()) {
190     self_ = nullptr;
191     return false;
192   }
193 
194   return true;
195 }
196 
ThreadAliveForTesting() const197 bool WorkerThread::ThreadAliveForTesting() const {
198   CheckedAutoLock auto_lock(thread_lock_);
199   return !thread_handle_.is_null();
200 }
201 
~WorkerThread()202 WorkerThread::~WorkerThread() {
203   CheckedAutoLock auto_lock(thread_lock_);
204 
205   // If |thread_handle_| wasn't joined, detach it.
206   if (!thread_handle_.is_null()) {
207     DCHECK(!join_called_for_testing_.IsSet());
208     PlatformThread::Detach(thread_handle_);
209   }
210 }
211 
MaybeUpdateThreadType()212 void WorkerThread::MaybeUpdateThreadType() {
213   UpdateThreadType(GetDesiredThreadType());
214 }
215 
BeginUnusedPeriod()216 void WorkerThread::BeginUnusedPeriod() {
217   CheckedAutoLock auto_lock(thread_lock_);
218   DCHECK(last_used_time_.is_null());
219   last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
220 }
221 
EndUnusedPeriod()222 void WorkerThread::EndUnusedPeriod() {
223   CheckedAutoLock auto_lock(thread_lock_);
224   DCHECK(!last_used_time_.is_null());
225   last_used_time_ = TimeTicks();
226 }
227 
GetLastUsedTime() const228 TimeTicks WorkerThread::GetLastUsedTime() const {
229   CheckedAutoLock auto_lock(thread_lock_);
230   return last_used_time_;
231 }
232 
ShouldExit() const233 bool WorkerThread::ShouldExit() const {
234   // The ordering of the checks is important below. This WorkerThread may be
235   // released and outlive |task_tracker_| in unit tests. However, when the
236   // WorkerThread is released, |should_exit_| will be set, so check that
237   // first.
238   return should_exit_.IsSet() || join_called_for_testing_.IsSet() ||
239          task_tracker_->IsShutdownComplete();
240 }
241 
GetDesiredThreadType() const242 ThreadType WorkerThread::GetDesiredThreadType() const {
243   // To avoid shutdown hangs, disallow a type below kNormal during shutdown
244   if (task_tracker_->HasShutdownStarted())
245     return ThreadType::kDefault;
246 
247   return thread_type_hint_;
248 }
249 
UpdateThreadType(ThreadType desired_thread_type)250 void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
251   if (desired_thread_type == current_thread_type_)
252     return;
253 
254   PlatformThread::SetCurrentThreadType(desired_thread_type);
255   current_thread_type_ = desired_thread_type;
256 }
257 
ThreadMain()258 void WorkerThread::ThreadMain() {
259 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
260   DCHECK(io_thread_task_runner_);
261   FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
262 #endif
263 
264   if (thread_type_hint_ == ThreadType::kBackground) {
265     switch (delegate()->GetThreadLabel()) {
266       case ThreadLabel::POOLED:
267         RunBackgroundPooledWorker();
268         return;
269       case ThreadLabel::SHARED:
270         RunBackgroundSharedWorker();
271         return;
272       case ThreadLabel::DEDICATED:
273         RunBackgroundDedicatedWorker();
274         return;
275 #if BUILDFLAG(IS_WIN)
276       case ThreadLabel::SHARED_COM:
277         RunBackgroundSharedCOMWorker();
278         return;
279       case ThreadLabel::DEDICATED_COM:
280         RunBackgroundDedicatedCOMWorker();
281         return;
282 #endif  // BUILDFLAG(IS_WIN)
283     }
284   }
285 
286   switch (delegate()->GetThreadLabel()) {
287     case ThreadLabel::POOLED:
288       RunPooledWorker();
289       return;
290     case ThreadLabel::SHARED:
291       RunSharedWorker();
292       return;
293     case ThreadLabel::DEDICATED:
294       RunDedicatedWorker();
295       return;
296 #if BUILDFLAG(IS_WIN)
297     case ThreadLabel::SHARED_COM:
298       RunSharedCOMWorker();
299       return;
300     case ThreadLabel::DEDICATED_COM:
301       RunDedicatedCOMWorker();
302       return;
303 #endif  // BUILDFLAG(IS_WIN)
304   }
305 }
306 
RunPooledWorker()307 NOINLINE void WorkerThread::RunPooledWorker() {
308   RunWorker();
309   NO_CODE_FOLDING();
310 }
311 
RunBackgroundPooledWorker()312 NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
313   RunWorker();
314   NO_CODE_FOLDING();
315 }
316 
RunSharedWorker()317 NOINLINE void WorkerThread::RunSharedWorker() {
318   RunWorker();
319   NO_CODE_FOLDING();
320 }
321 
RunBackgroundSharedWorker()322 NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
323   RunWorker();
324   NO_CODE_FOLDING();
325 }
326 
RunDedicatedWorker()327 NOINLINE void WorkerThread::RunDedicatedWorker() {
328   RunWorker();
329   NO_CODE_FOLDING();
330 }
331 
RunBackgroundDedicatedWorker()332 NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
333   RunWorker();
334   NO_CODE_FOLDING();
335 }
336 
337 #if BUILDFLAG(IS_WIN)
RunSharedCOMWorker()338 NOINLINE void WorkerThread::RunSharedCOMWorker() {
339   RunWorker();
340   NO_CODE_FOLDING();
341 }
342 
RunBackgroundSharedCOMWorker()343 NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
344   RunWorker();
345   NO_CODE_FOLDING();
346 }
347 
RunDedicatedCOMWorker()348 NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
349   RunWorker();
350   NO_CODE_FOLDING();
351 }
352 
RunBackgroundDedicatedCOMWorker()353 NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
354   RunWorker();
355   NO_CODE_FOLDING();
356 }
357 #endif  // BUILDFLAG(IS_WIN)
358 
RunWorker()359 void WorkerThread::RunWorker() {
360   DCHECK_EQ(self_, this);
361   TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
362   TRACE_EVENT_BEGIN0("base", "WorkerThread active");
363 
364   if (worker_thread_observer_) {
365     worker_thread_observer_->OnWorkerThreadMainEntry();
366   }
367 
368   delegate()->OnMainEntry(this);
369 
370   // Background threads can take an arbitrary amount of time to complete, do not
371   // watch them for hangs. Ignore priority boosting for now.
372   const bool watch_for_hangs =
373       base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
374       GetDesiredThreadType() != ThreadType::kBackground;
375 
376   // If this process has a HangWatcher register this thread for watching.
377   base::ScopedClosureRunner unregister_for_hang_watching;
378   if (watch_for_hangs) {
379     unregister_for_hang_watching = base::HangWatcher::RegisterThread(
380         base::HangWatcher::ThreadType::kThreadPoolThread);
381   }
382 
383   while (!ShouldExit()) {
384 #if BUILDFLAG(IS_APPLE)
385     apple::ScopedNSAutoreleasePool autorelease_pool;
386 #endif
387     absl::optional<WatchHangsInScope> hang_watch_scope;
388 
389     TRACE_EVENT_END0("base", "WorkerThread active");
390     // TODO(crbug.com/1021571): Remove this once fixed.
391     PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
392     hang_watch_scope.reset();
393     delegate()->WaitForWork();
394     TRACE_EVENT_BEGIN("base", "WorkerThread active",
395                       perfetto::TerminatingFlow::FromPointer(this));
396 
397     // Don't GetWork() in the case where we woke up for Cleanup().
398     if (ShouldExit()) {
399       break;
400     }
401 
402     if (watch_for_hangs) {
403       hang_watch_scope.emplace();
404     }
405 
406     // Thread type needs to be updated before GetWork.
407     UpdateThreadType(GetDesiredThreadType());
408 
409     // Get the task source containing the first task to execute.
410     RegisteredTaskSource task_source = delegate()->GetWork(this);
411 
412     // If acquiring work failed and the worker's still alive,
413     // record that this is an unnecessary wakeup.
414     if (!task_source && !ShouldExit()) {
415       delegate()->RecordUnnecessaryWakeup();
416     }
417 
418     while (task_source) {
419       // Alias pointer for investigation of memory corruption. crbug.com/1218384
420       TaskSource* task_source_before_run = task_source.get();
421       base::debug::Alias(&task_source_before_run);
422 
423       task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
424 
425       // Alias pointer for investigation of memory corruption. crbug.com/1218384
426       TaskSource* task_source_before_move = task_source.get();
427       base::debug::Alias(&task_source_before_move);
428 
429       // We emplace the hang_watch_scope here so that each hang watch scope
430       // covers one GetWork (or SwapProcessedTask) as well as one
431       // RunAndPopNextTask.
432       if (watch_for_hangs) {
433         hang_watch_scope.emplace();
434       }
435 
436       RegisteredTaskSource new_task_source =
437           delegate()->SwapProcessedTask(std::move(task_source), this);
438 
439       UpdateThreadType(GetDesiredThreadType());
440 
441       // Check that task_source is always cleared, to help investigation of
442       // memory corruption where task_source is non-null after being moved.
443       // crbug.com/1218384
444       CHECK(!task_source);
445       task_source = std::move(new_task_source);
446     }
447   }
448 
449   // Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
450   // after invoking OnMainExit().
451 
452   delegate()->OnMainExit(this);
453 
454   if (worker_thread_observer_) {
455     worker_thread_observer_->OnWorkerThreadMainExit();
456   }
457 
458   // Release the self-reference to |this|. This can result in deleting |this|
459   // and as such no more member accesses should be made after this point.
460   self_ = nullptr;
461 
462   TRACE_EVENT_END0("base", "WorkerThread active");
463   TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
464   // TODO(crbug.com/1021571): Remove this once fixed.
465   PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
466 }
467 
468 }  // namespace base::internal
469