// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/worker_thread.h"

#include <stddef.h>

#include <algorithm>
#include <atomic>
#include <utility>

#include "base/allocator/partition_allocator/partition_alloc_buildflags.h"
#include "base/allocator/partition_allocator/partition_alloc_config.h"
#include "base/check_op.h"
#include "base/compiler_specific.h"
#include "base/debug/alias.h"
#include "base/feature_list.h"
#include "base/functional/callback_helpers.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/environment_config.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/task/thread_pool/worker_thread_observer.h"
#include "base/threading/hang_watcher.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
#include "base/files/file_descriptor_watcher_posix.h"
#endif

#if BUILDFLAG(IS_APPLE)
#include "base/mac/scoped_nsautorelease_pool.h"
#endif

#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
#include "base/allocator/partition_allocator/thread_cache.h"
#endif

namespace base::internal {

namespace {

#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
// Returns the desired sleep time before the worker has to wake up to purge
// the thread cache or reclaim itself. |min_sleep_time| contains the minimum
// acceptable amount of time to sleep.
TimeDelta GetSleepTimeBeforePurge(TimeDelta min_sleep_time) {
  const TimeTicks now = TimeTicks::Now();

  // Do not wake up to purge within the first minute of process lifetime. In
  // short lived processes this will avoid waking up to try and save memory
  // for a heap that will be going away soon. For longer lived processes this
  // should allow for better performance at process startup since even if a
  // worker goes to sleep for kPurgeThreadCacheIdleDelay it's very likely it
  // will be needed soon after because of heavy startup workloads.
  constexpr TimeDelta kFirstSleepLength = Minutes(1);

  // Use the first time a worker goes to sleep in this process as an
  // approximation of the process creation time.
  static const TimeTicks first_scheduled_wake = now + kFirstSleepLength;
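  // For example (illustrative): if the first worker in this process goes to
  // sleep 5 seconds after startup, no purge wakeup is scheduled before roughly
  // the 65-second mark.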

  // Align wakeups for purges to reduce the chances of taking the CPU out of
  // sleep multiple times for these operations.
  constexpr TimeDelta kPurgeThreadCacheIdleDelay = Seconds(1);
  const TimeTicks snapped_wake =
      (now + min_sleep_time)
          .SnappedToNextTick(TimeTicks(), kPurgeThreadCacheIdleDelay);
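  // For example (illustrative): a wakeup that would land 3.4s past the
  // TimeTicks() origin is snapped to 4.0s, so idle workers wake on shared
  // one-second boundaries rather than staggering their wakeups.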

  // Avoid scheduling at |first_scheduled_wake| if it would result in a sleep
  // that's too short.
  return std::max(snapped_wake - now, first_scheduled_wake - now);
}
#endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
        // PA_CONFIG(THREAD_CACHE_SUPPORTED)

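// Caches the kDelayFirstWorkerWake feature state in a function-local static.
// The state is primed early from WorkerThread::Start(); see the comment there
// for why.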
bool IsDelayFirstWorkerSleepEnabled() {
  static bool state = FeatureList::IsEnabled(kDelayFirstWorkerWake);
  return state;
}

}  // namespace

constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;

void WorkerThread::Delegate::WaitForWork(WaitableEvent* wake_up_event) {
  DCHECK(wake_up_event);
  const TimeDelta sleep_time = GetSleepTimeout();

  // When a thread goes to sleep, the memory retained by its thread cache is
  // trapped there for as long as the thread sleeps. To prevent that, we can
  // either purge the thread cache right before going to sleep, or after some
  // delay.
  //
  // Purging the thread cache incurs a cost on the next task, since its thread
  // cache will be empty and allocation performance initially lower. As a lot
  // of sleeps are very short, do not purge all the time (this would also make
  // sleep / wakeup cycles more costly).
  //
  // Instead, sleep for min(timeout, 1s). If the wait times out, purge at that
  // point and go to sleep for the remainder of the time. This ensures that we
  // do no work for short sleeps, and that threads do not get woken up many
  // times.
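  // Illustrative timeline (ignoring the kDelayFirstWorkerWake adjustment) for
  // GetSleepTimeout() == 30s: wait up to 1s; if not signaled, purge the thread
  // cache, then wait up to the remaining 29s or until signaled.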
#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
  TimeDelta min_sleep_time = std::min(sleep_time, kPurgeThreadCacheIdleDelay);

  if (IsDelayFirstWorkerSleepEnabled())
    min_sleep_time = GetSleepTimeBeforePurge(min_sleep_time);

  const bool was_signaled = wake_up_event->TimedWait(min_sleep_time);

  // Timed out.
  if (!was_signaled) {
    partition_alloc::ThreadCache::PurgeCurrentThread();

    // The thread woke up to purge before its standard reclaim time. Sleep for
    // what's remaining until then.
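    // A |sleep_time| of TimeDelta::Max() means "wait indefinitely", so keep
    // waiting with TimeDelta::Max() rather than subtracting |min_sleep_time|
    // from it.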
    if (sleep_time > min_sleep_time) {
      wake_up_event->TimedWait(sleep_time.is_max()
                                   ? base::TimeDelta::Max()
                                   : sleep_time - min_sleep_time);
    }
  }
#else
  wake_up_event->TimedWait(sleep_time);
#endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
        // PA_CONFIG(THREAD_CACHE_SUPPORTED)
}

WorkerThread::WorkerThread(ThreadType thread_type_hint,
                           std::unique_ptr<Delegate> delegate,
                           TrackedRef<TaskTracker> task_tracker,
                           size_t sequence_num,
                           const CheckedLock* predecessor_lock)
    : thread_lock_(predecessor_lock),
      delegate_(std::move(delegate)),
      task_tracker_(std::move(task_tracker)),
      thread_type_hint_(thread_type_hint),
      current_thread_type_(GetDesiredThreadType()),
      sequence_num_(sequence_num) {
  DCHECK(delegate_);
  DCHECK(task_tracker_);
  DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
         thread_type_hint_ != ThreadType::kBackground);
  DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
         thread_type_hint != ThreadType::kUtility);
  wake_up_event_.declare_only_used_while_idle();
}

bool WorkerThread::Start(
    scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer) {
  CheckedLock::AssertNoLockHeldOnCurrentThread();

  // Prime kDelayFirstWorkerWake's feature state right away on thread creation
  // instead of looking it up for the first time later on the thread, as this
  // avoids a data race in tests that may ~FeatureList while the first worker
  // thread is still initializing (the first WorkerThread will be started on
  // the main thread as part of ThreadPoolImpl::Start(), so doing it then
  // avoids this race). See crbug.com/1344573.
  // Note 1: the feature state is always available at this point as
  // ThreadPoolInstance::Start() contractually happens-after FeatureList
  // initialization.
  // Note 2: This is done in Start() instead of in the constructor as
  // construction happens under a ThreadGroupImpl lock which precludes calling
  // into FeatureList (as that can also use a lock).
  IsDelayFirstWorkerSleepEnabled();

  CheckedAutoLock auto_lock(thread_lock_);
  DCHECK(thread_handle_.is_null());

#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
  DCHECK(io_thread_task_runner);
  io_thread_task_runner_ = std::move(io_thread_task_runner);
#endif

  if (should_exit_.IsSet() || join_called_for_testing_.IsSet())
    return true;

  DCHECK(!worker_thread_observer_);
  worker_thread_observer_ = worker_thread_observer;

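  // Keep a self-reference for as long as the thread runs; RunWorker() releases
  // it on exit, which may delete |this|.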
  self_ = this;

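  // A requested stack size of 0 tells PlatformThread to use the platform's
  // default stack size.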
  constexpr size_t kDefaultStackSize = 0;
  PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
                                 current_thread_type_);

  if (thread_handle_.is_null()) {
    self_ = nullptr;
    return false;
  }

  return true;
}

void WorkerThread::WakeUp() {
  // Signalling an event can deschedule the current thread. Since being
  // descheduled while holding a lock is undesirable (https://crbug.com/890978),
  // assert that no lock is held by the current thread.
  CheckedLock::AssertNoLockHeldOnCurrentThread();
  // Calling WakeUp() after Cleanup() or Join() is wrong because the
  // WorkerThread cannot run more tasks.
  DCHECK(!join_called_for_testing_.IsSet());
  DCHECK(!should_exit_.IsSet());
  TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThread::WakeUp",
                      perfetto::Flow::FromPointer(this));
  wake_up_event_.Signal();
}

void WorkerThread::JoinForTesting() {
  DCHECK(!join_called_for_testing_.IsSet());
  join_called_for_testing_.Set();
  wake_up_event_.Signal();

  PlatformThreadHandle thread_handle;

  {
    CheckedAutoLock auto_lock(thread_lock_);

    if (thread_handle_.is_null())
      return;

    thread_handle = thread_handle_;
    // Reset |thread_handle_| so the destructor doesn't detach a handle that
    // has already been joined.
    thread_handle_ = PlatformThreadHandle();
  }

  PlatformThread::Join(thread_handle);
}

bool WorkerThread::ThreadAliveForTesting() const {
  CheckedAutoLock auto_lock(thread_lock_);
  return !thread_handle_.is_null();
}

WorkerThread::~WorkerThread() {
  CheckedAutoLock auto_lock(thread_lock_);

  // If |thread_handle_| wasn't joined, detach it.
  if (!thread_handle_.is_null()) {
    DCHECK(!join_called_for_testing_.IsSet());
    PlatformThread::Detach(thread_handle_);
  }
}

void WorkerThread::Cleanup() {
  DCHECK(!should_exit_.IsSet());
  should_exit_.Set();
  wake_up_event_.Signal();
}

void WorkerThread::MaybeUpdateThreadType() {
  UpdateThreadType(GetDesiredThreadType());
}

void WorkerThread::BeginUnusedPeriod() {
  CheckedAutoLock auto_lock(thread_lock_);
  DCHECK(last_used_time_.is_null());
  last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
}

void WorkerThread::EndUnusedPeriod() {
  CheckedAutoLock auto_lock(thread_lock_);
  DCHECK(!last_used_time_.is_null());
  last_used_time_ = TimeTicks();
}

TimeTicks WorkerThread::GetLastUsedTime() const {
  CheckedAutoLock auto_lock(thread_lock_);
  return last_used_time_;
}

bool WorkerThread::ShouldExit() const {
  // The ordering of the checks is important below. This WorkerThread may be
  // released and outlive |task_tracker_| in unit tests. However, when the
  // WorkerThread is released, |should_exit_| will be set, so check that
  // first.
  return should_exit_.IsSet() || join_called_for_testing_.IsSet() ||
         task_tracker_->IsShutdownComplete();
}

ThreadType WorkerThread::GetDesiredThreadType() const {
  // To avoid shutdown hangs, disallow a thread type below kDefault during
  // shutdown.
  if (task_tracker_->HasShutdownStarted())
    return ThreadType::kDefault;

  return thread_type_hint_;
}

void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
  if (desired_thread_type == current_thread_type_)
    return;

  PlatformThread::SetCurrentThreadType(desired_thread_type);
  current_thread_type_ = desired_thread_type;
}

void WorkerThread::ThreadMain() {
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
  DCHECK(io_thread_task_runner_);
  FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
#endif

  if (thread_type_hint_ == ThreadType::kBackground) {
    switch (delegate_->GetThreadLabel()) {
      case ThreadLabel::POOLED:
        RunBackgroundPooledWorker();
        return;
      case ThreadLabel::SHARED:
        RunBackgroundSharedWorker();
        return;
      case ThreadLabel::DEDICATED:
        RunBackgroundDedicatedWorker();
        return;
#if BUILDFLAG(IS_WIN)
      case ThreadLabel::SHARED_COM:
        RunBackgroundSharedCOMWorker();
        return;
      case ThreadLabel::DEDICATED_COM:
        RunBackgroundDedicatedCOMWorker();
        return;
#endif  // BUILDFLAG(IS_WIN)
    }
  }

  switch (delegate_->GetThreadLabel()) {
    case ThreadLabel::POOLED:
      RunPooledWorker();
      return;
    case ThreadLabel::SHARED:
      RunSharedWorker();
      return;
    case ThreadLabel::DEDICATED:
      RunDedicatedWorker();
      return;
#if BUILDFLAG(IS_WIN)
    case ThreadLabel::SHARED_COM:
      RunSharedCOMWorker();
      return;
    case ThreadLabel::DEDICATED_COM:
      RunDedicatedCOMWorker();
      return;
#endif  // BUILDFLAG(IS_WIN)
  }
}

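// The Run*Worker() wrappers below each add a distinctly named frame to the
// stack so that the worker's flavor is identifiable in stack traces and crash
// reports; NO_CODE_FOLDING() prevents the linker from folding these otherwise
// identical functions together.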
NOINLINE void WorkerThread::RunPooledWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunSharedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunDedicatedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

#if BUILDFLAG(IS_WIN)
NOINLINE void WorkerThread::RunSharedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}

NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
  RunWorker();
  NO_CODE_FOLDING();
}
#endif  // BUILDFLAG(IS_WIN)

void WorkerThread::RunWorker() {
  DCHECK_EQ(self_, this);
  TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
  TRACE_EVENT_BEGIN0("base", "WorkerThread active");

  if (worker_thread_observer_)
    worker_thread_observer_->OnWorkerThreadMainEntry();

  delegate_->OnMainEntry(this);

  // Background threads can take an arbitrary amount of time to complete; do
  // not watch them for hangs. Ignore priority boosting for now.
  const bool watch_for_hangs =
      base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
      GetDesiredThreadType() != ThreadType::kBackground;

  // If this process has a HangWatcher, register this thread for watching.
  base::ScopedClosureRunner unregister_for_hang_watching;
  if (watch_for_hangs) {
    unregister_for_hang_watching = base::HangWatcher::RegisterThread(
        base::HangWatcher::ThreadType::kThreadPoolThread);
  }

  // A WorkerThread starts out waiting for work.
  {
    TRACE_EVENT_END0("base", "WorkerThread active");
    // TODO(crbug.com/1021571): Remove this once fixed.
    PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
    delegate_->WaitForWork(&wake_up_event_);
    TRACE_EVENT_BEGIN("base", "WorkerThread active",
                      perfetto::TerminatingFlow::FromPointer(this));
  }
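  // Tracks whether GetWork() returned a TaskSource since the last call to
  // WaitForWork().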
  bool got_work_this_wakeup = false;
  while (!ShouldExit()) {
#if BUILDFLAG(IS_APPLE)
    mac::ScopedNSAutoreleasePool autorelease_pool;
#endif
    absl::optional<WatchHangsInScope> hang_watch_scope;
    if (watch_for_hangs)
      hang_watch_scope.emplace();

    UpdateThreadType(GetDesiredThreadType());

    // Get the task source containing the next task to execute.
    RegisteredTaskSource task_source = delegate_->GetWork(this);
    if (!task_source) {
      // Exit immediately if GetWork() resulted in detaching this worker.
      if (ShouldExit())
        break;

      // If no work was obtained since the last wakeup, record that this
      // wakeup was unnecessary.
      if (!got_work_this_wakeup)
        delegate_->RecordUnnecessaryWakeup();

      TRACE_EVENT_END0("base", "WorkerThread active");
      // TODO(crbug.com/1021571): Remove this once fixed.
      PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
      hang_watch_scope.reset();
      delegate_->WaitForWork(&wake_up_event_);
      got_work_this_wakeup = false;

      TRACE_EVENT_BEGIN("base", "WorkerThread active",
                        perfetto::TerminatingFlow::FromPointer(this));
      continue;
    }

    got_work_this_wakeup = true;

    // Alias pointer for investigation of memory corruption. crbug.com/1218384
    TaskSource* task_source_before_run = task_source.get();
    base::debug::Alias(&task_source_before_run);

    task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));

    // Alias pointer for investigation of memory corruption. crbug.com/1218384
    TaskSource* task_source_before_move = task_source.get();
    base::debug::Alias(&task_source_before_move);

    delegate_->DidProcessTask(std::move(task_source));

    // Check that task_source is always cleared, to help investigation of memory
    // corruption where task_source is non-null after being moved.
    // crbug.com/1218384
    CHECK(!task_source);

    // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
    // TaskSources returned by the GetWork() method of |delegate_| until it
    // returns nullptr. Resetting |wake_up_event_| here doesn't break this
    // invariant and avoids a useless loop iteration before going to sleep if
    // WakeUp() is called while this WorkerThread is awake.
    wake_up_event_.Reset();
  }

  // Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
  // after invoking OnMainExit().

  delegate_->OnMainExit(this);

  if (worker_thread_observer_)
    worker_thread_observer_->OnWorkerThreadMainExit();

  // Release the self-reference to |this|. This can result in deleting |this|
  // and as such no more member accesses should be made after this point.
  self_ = nullptr;

  TRACE_EVENT_END0("base", "WorkerThread active");
  TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
  // TODO(crbug.com/1021571): Remove this once fixed.
  PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
}

}  // namespace base::internal