• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/worker_thread.h"
6 
7 #include <stddef.h>
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <optional>
12 #include <utility>
13 
14 #include "base/check_op.h"
15 #include "base/compiler_specific.h"
16 #include "base/debug/alias.h"
17 #include "base/feature_list.h"
18 #include "base/functional/callback_helpers.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/task/task_features.h"
21 #include "base/task/thread_pool/environment_config.h"
22 #include "base/task/thread_pool/worker_thread_observer.h"
23 #include "base/threading/hang_watcher.h"
24 #include "base/time/time.h"
25 #include "base/time/time_override.h"
26 #include "base/trace_event/base_tracing.h"
27 #include "build/build_config.h"
28 #include "partition_alloc/buildflags.h"
29 
30 #if PA_BUILDFLAG(USE_PARTITION_ALLOC)
31 #include "partition_alloc/partition_alloc_config.h"  // nogncheck
32 #endif
33 
34 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
35 #include "base/files/file_descriptor_watcher_posix.h"
36 #endif
37 
38 #if BUILDFLAG(IS_APPLE)
39 #include "base/apple/scoped_nsautorelease_pool.h"
40 #endif
41 
42 #if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
43     PA_CONFIG(THREAD_CACHE_SUPPORTED)
44 #include "partition_alloc/thread_cache.h"  // nogncheck
45 #endif
46 
47 namespace base::internal {
48 
49 constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;
50 
GetThreadLabel() const51 WorkerThread::ThreadLabel WorkerThread::Delegate::GetThreadLabel() const {
52   return WorkerThread::ThreadLabel::POOLED;
53 }
54 
TimedWait(TimeDelta timeout)55 bool WorkerThread::Delegate::TimedWait(TimeDelta timeout) {
56   return wake_up_event_.TimedWait(timeout);
57 }
58 
WaitForWork()59 void WorkerThread::Delegate::WaitForWork() {
60   const TimeDelta sleep_duration_before_worker_reclaim = GetSleepTimeout();
61 
62   // When a thread goes to sleep, the memory retained by its thread cache is
63   // trapped there for as long as the thread sleeps. To prevent that, we can
64   // either purge the thread cache right before going to sleep, or after some
65   // delay.
66   //
67   // Purging the thread cache incurs a cost on the next task, since its thread
68   // cache will be empty and allocation performance initially lower. As a lot of
69   // sleeps are very short, do not purge all the time (this would also make
70   // sleep / wakeups cycles more costly).
71   //
72   // Instead, sleep for min(timeout, 1s). If the wait times out then purge at
73   // that point, and go to sleep for the remaining of the time. This ensures
74   // that we do no work for short sleeps, and that threads do not get awaken
75   // many times.
76 #if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
77     PA_CONFIG(THREAD_CACHE_SUPPORTED)
78   const TimeDelta sleep_duration_before_purge =
79       GetSleepDurationBeforePurge(base::TimeTicks::Now());
80 
81   const bool was_signaled = TimedWait(std::min(
82       sleep_duration_before_purge, sleep_duration_before_worker_reclaim));
83   // Timed out.
84   if (!was_signaled) {
85     partition_alloc::ThreadCache::PurgeCurrentThread();
86 
87     // The thread woke up to purge before its standard reclaim time. Sleep for
88     // what's remaining until then.
89     if (sleep_duration_before_worker_reclaim > sleep_duration_before_purge) {
90       TimedWait(sleep_duration_before_worker_reclaim.is_max()
91                     ? TimeDelta::Max()
92                     : sleep_duration_before_worker_reclaim -
93                           sleep_duration_before_purge);
94     }
95   }
96 #else
97   TimedWait(sleep_duration_before_worker_reclaim);
98 #endif  // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
99         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
100 }
101 
IsDelayFirstWorkerSleepEnabled()102 bool WorkerThread::Delegate::IsDelayFirstWorkerSleepEnabled() {
103   static bool state = FeatureList::IsEnabled(kDelayFirstWorkerWake);
104   return state;
105 }
106 
107 #if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
108     PA_CONFIG(THREAD_CACHE_SUPPORTED)
GetSleepDurationBeforePurge(TimeTicks now)109 TimeDelta WorkerThread::Delegate::GetSleepDurationBeforePurge(TimeTicks now) {
110   base::TimeDelta sleep_duration_before_purge = kPurgeThreadCacheIdleDelay;
111 
112   if (!IsDelayFirstWorkerSleepEnabled()) {
113     return sleep_duration_before_purge;
114   }
115 
116   // Use the first time a worker goes to sleep in this process as an
117   // approximation of the process creation time.
118   static const TimeTicks first_sleep_time = now;
119   const TimeTicks first_sleep_time_to_use =
120       !first_sleep_time_for_testing_.is_null() ? first_sleep_time_for_testing_
121                                                : first_sleep_time;
122   const base::TimeTicks first_wake_time =
123       first_sleep_time_to_use + kFirstSleepDurationBeforePurge;
124 
125   // A sleep that occurs within `kFirstSleepDurationBeforePurge` of the
126   // first sleep lasts at least `kFirstSleepDurationBeforePurge`.
127   if (now <= first_wake_time) {
128     // Avoid sleeping for less than `sleep_duration_before_purge` since that is
129     // the shortest expected duration to wait for a purge.
130     sleep_duration_before_purge =
131         std::max(kFirstSleepDurationBeforePurge, sleep_duration_before_purge);
132   }
133 
134   // Align wakeups for purges to reduce the chances of taking the CPU out of
135   // sleep multiple times for these operations. This can happen if many workers
136   // in the same process scheduled wakeups. This can create a situation where
137   // any one worker wakes every `kPurgeThreadCacheIdleDelay` / N where N is the
138   // number of workers.
139   const TimeTicks snapped_purge_time =
140       (now + sleep_duration_before_purge)
141           .SnappedToNextTick(TimeTicks(), kPurgeThreadCacheIdleDelay);
142 
143   return snapped_purge_time - now;
144 }
145 
146 #endif  // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
147         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
148 
WorkerThread(ThreadType thread_type_hint,std::unique_ptr<Delegate> delegate,TrackedRef<TaskTracker> task_tracker,size_t sequence_num,const CheckedLock * predecessor_lock,void * flow_terminator)149 WorkerThread::WorkerThread(ThreadType thread_type_hint,
150                            std::unique_ptr<Delegate> delegate,
151                            TrackedRef<TaskTracker> task_tracker,
152                            size_t sequence_num,
153                            const CheckedLock* predecessor_lock,
154                            void* flow_terminator)
155     : thread_lock_(predecessor_lock),
156       task_tracker_(std::move(task_tracker)),
157       thread_type_hint_(thread_type_hint),
158       current_thread_type_(GetDesiredThreadType()),
159       sequence_num_(sequence_num),
160       flow_terminator_(flow_terminator == nullptr
161                            ? reinterpret_cast<intptr_t>(this)
162                            : reinterpret_cast<intptr_t>(flow_terminator)),
163       delegate_(std::move(delegate)) {
164   DCHECK(task_tracker_);
165   DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
166          thread_type_hint_ != ThreadType::kBackground);
167   DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
168          thread_type_hint != ThreadType::kUtility);
169   DCHECK(delegate_);
170   delegate_->wake_up_event_.declare_only_used_while_idle();
171 }
172 
Start(scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,WorkerThreadObserver * worker_thread_observer)173 bool WorkerThread::Start(
174     scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
175     WorkerThreadObserver* worker_thread_observer) {
176   CheckedLock::AssertNoLockHeldOnCurrentThread();
177 
178   // Prime kDelayFirstWorkerWake's feature state right away on thread creation
179   // instead of looking it up for the first time later on thread as this avoids
180   // a data race in tests that may ~FeatureList while the first worker thread
181   // is still initializing (the first WorkerThread will be started on the main
182   // thread as part of ThreadPoolImpl::Start() so doing it then avoids this
183   // race), crbug.com/1344573.
184   // Note 1: the feature state is always available at this point as
185   // ThreadPoolInstance::Start() contractually happens-after FeatureList
186   // initialization.
187   // Note 2: This is done on Start instead of in the constructor as construction
188   // happens under a ThreadGroup lock which precludes calling into
189   // FeatureList (as that can also use a lock).
190   delegate()->IsDelayFirstWorkerSleepEnabled();
191 
192   CheckedAutoLock auto_lock(thread_lock_);
193   DCHECK(thread_handle_.is_null());
194 
195 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
196   DCHECK(io_thread_task_runner);
197   io_thread_task_runner_ = std::move(io_thread_task_runner);
198 #endif
199 
200   if (should_exit_.IsSet() || join_called_for_testing_.IsSet()) {
201     return true;
202   }
203 
204   DCHECK(!worker_thread_observer_);
205   worker_thread_observer_ = worker_thread_observer;
206 
207   self_ = this;
208 
209   constexpr size_t kDefaultStackSize = 0;
210   PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
211                                  current_thread_type_);
212 
213   if (thread_handle_.is_null()) {
214     self_ = nullptr;
215     return false;
216   }
217 
218   return true;
219 }
220 
Destroy()221 void WorkerThread::Destroy() {
222   CheckedAutoLock auto_lock(thread_lock_);
223 
224   // If |thread_handle_| wasn't joined, detach it.
225   if (!thread_handle_.is_null()) {
226     PlatformThread::Detach(thread_handle_);
227   }
228 }
229 
ThreadAliveForTesting() const230 bool WorkerThread::ThreadAliveForTesting() const {
231   CheckedAutoLock auto_lock(thread_lock_);
232   return !thread_handle_.is_null();
233 }
234 
JoinForTesting()235 void WorkerThread::JoinForTesting() {
236   DCHECK(!join_called_for_testing_.IsSet());
237   join_called_for_testing_.Set();
238   delegate_->wake_up_event_.Signal();
239 
240   PlatformThreadHandle thread_handle;
241 
242   {
243     CheckedAutoLock auto_lock(thread_lock_);
244 
245     if (thread_handle_.is_null()) {
246       return;
247     }
248 
249     thread_handle = thread_handle_;
250     // Reset |thread_handle_| so it isn't joined by the destructor.
251     thread_handle_ = PlatformThreadHandle();
252   }
253 
254   PlatformThread::Join(thread_handle);
255 }
256 
Cleanup()257 void WorkerThread::Cleanup() {
258   DCHECK(!should_exit_.IsSet());
259   should_exit_.Set();
260   delegate_->wake_up_event_.Signal();
261 }
262 
WakeUp()263 void WorkerThread::WakeUp() {
264   // Signalling an event can deschedule the current thread. Since being
265   // descheduled while holding a lock is undesirable (https://crbug.com/890978),
266   // assert that no lock is held by the current thread.
267   CheckedLock::AssertNoLockHeldOnCurrentThread();
268   // Calling WakeUp() after Cleanup() or Join() is wrong because the
269   // WorkerThread cannot run more tasks.
270   DCHECK(!join_called_for_testing_.IsSet());
271   DCHECK(!should_exit_.IsSet());
272   TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThread::WakeUp",
273                       perfetto::Flow::FromPointer(this));
274 
275   delegate_->wake_up_event_.Signal();
276 }
277 
delegate()278 WorkerThread::Delegate* WorkerThread::delegate() {
279   return delegate_.get();
280 }
281 
~WorkerThread()282 WorkerThread::~WorkerThread() {
283   Destroy();
284 }
285 
MaybeUpdateThreadType()286 void WorkerThread::MaybeUpdateThreadType() {
287   UpdateThreadType(GetDesiredThreadType());
288 }
289 
BeginUnusedPeriod()290 void WorkerThread::BeginUnusedPeriod() {
291   CheckedAutoLock auto_lock(thread_lock_);
292   DCHECK(last_used_time_.is_null());
293   last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
294 }
295 
EndUnusedPeriod()296 void WorkerThread::EndUnusedPeriod() {
297   CheckedAutoLock auto_lock(thread_lock_);
298   DCHECK(!last_used_time_.is_null());
299   last_used_time_ = TimeTicks();
300 }
301 
GetLastUsedTime() const302 TimeTicks WorkerThread::GetLastUsedTime() const {
303   CheckedAutoLock auto_lock(thread_lock_);
304   return last_used_time_;
305 }
306 
ShouldExit() const307 bool WorkerThread::ShouldExit() const {
308   // The ordering of the checks is important below. This WorkerThread may be
309   // released and outlive |task_tracker_| in unit tests. However, when the
310   // WorkerThread is released, |should_exit_| will be set, so check that
311   // first.
312   return should_exit_.IsSet() || join_called_for_testing_.IsSet() ||
313          task_tracker_->IsShutdownComplete();
314 }
315 
GetDesiredThreadType() const316 ThreadType WorkerThread::GetDesiredThreadType() const {
317   // To avoid shutdown hangs, disallow a type below kNormal during shutdown
318   if (task_tracker_->HasShutdownStarted())
319     return ThreadType::kDefault;
320 
321   return thread_type_hint_;
322 }
323 
UpdateThreadType(ThreadType desired_thread_type)324 void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
325   if (desired_thread_type == current_thread_type_)
326     return;
327 
328   PlatformThread::SetCurrentThreadType(desired_thread_type);
329   current_thread_type_ = desired_thread_type;
330 }
331 
ThreadMain()332 void WorkerThread::ThreadMain() {
333 #if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
334   DCHECK(io_thread_task_runner_);
335   FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
336 #endif
337 
338   if (thread_type_hint_ == ThreadType::kBackground) {
339     switch (delegate()->GetThreadLabel()) {
340       case ThreadLabel::POOLED:
341         RunBackgroundPooledWorker();
342         return;
343       case ThreadLabel::SHARED:
344         RunBackgroundSharedWorker();
345         return;
346       case ThreadLabel::DEDICATED:
347         RunBackgroundDedicatedWorker();
348         return;
349 #if BUILDFLAG(IS_WIN)
350       case ThreadLabel::SHARED_COM:
351         RunBackgroundSharedCOMWorker();
352         return;
353       case ThreadLabel::DEDICATED_COM:
354         RunBackgroundDedicatedCOMWorker();
355         return;
356 #endif  // BUILDFLAG(IS_WIN)
357     }
358   }
359 
360   switch (delegate()->GetThreadLabel()) {
361     case ThreadLabel::POOLED:
362       RunPooledWorker();
363       return;
364     case ThreadLabel::SHARED:
365       RunSharedWorker();
366       return;
367     case ThreadLabel::DEDICATED:
368       RunDedicatedWorker();
369       return;
370 #if BUILDFLAG(IS_WIN)
371     case ThreadLabel::SHARED_COM:
372       RunSharedCOMWorker();
373       return;
374     case ThreadLabel::DEDICATED_COM:
375       RunDedicatedCOMWorker();
376       return;
377 #endif  // BUILDFLAG(IS_WIN)
378   }
379 }
380 
RunPooledWorker()381 NOINLINE void WorkerThread::RunPooledWorker() {
382   RunWorker();
383   NO_CODE_FOLDING();
384 }
385 
RunBackgroundPooledWorker()386 NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
387   RunWorker();
388   NO_CODE_FOLDING();
389 }
390 
RunSharedWorker()391 NOINLINE void WorkerThread::RunSharedWorker() {
392   RunWorker();
393   NO_CODE_FOLDING();
394 }
395 
RunBackgroundSharedWorker()396 NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
397   RunWorker();
398   NO_CODE_FOLDING();
399 }
400 
RunDedicatedWorker()401 NOINLINE void WorkerThread::RunDedicatedWorker() {
402   RunWorker();
403   NO_CODE_FOLDING();
404 }
405 
RunBackgroundDedicatedWorker()406 NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
407   RunWorker();
408   NO_CODE_FOLDING();
409 }
410 
411 #if BUILDFLAG(IS_WIN)
RunSharedCOMWorker()412 NOINLINE void WorkerThread::RunSharedCOMWorker() {
413   RunWorker();
414   NO_CODE_FOLDING();
415 }
416 
RunBackgroundSharedCOMWorker()417 NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
418   RunWorker();
419   NO_CODE_FOLDING();
420 }
421 
RunDedicatedCOMWorker()422 NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
423   RunWorker();
424   NO_CODE_FOLDING();
425 }
426 
RunBackgroundDedicatedCOMWorker()427 NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
428   RunWorker();
429   NO_CODE_FOLDING();
430 }
431 #endif  // BUILDFLAG(IS_WIN)
432 
RunWorker()433 void WorkerThread::RunWorker() {
434   DCHECK_EQ(self_, this);
435   TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
436   TRACE_EVENT_BEGIN0("base", "WorkerThread active");
437 
438   if (worker_thread_observer_) {
439     worker_thread_observer_->OnWorkerThreadMainEntry();
440   }
441 
442   delegate()->OnMainEntry(this);
443 
444   // Background threads can take an arbitrary amount of time to complete, do not
445   // watch them for hangs. Ignore priority boosting for now.
446   const bool watch_for_hangs =
447       base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
448       GetDesiredThreadType() != ThreadType::kBackground;
449 
450   // If this process has a HangWatcher register this thread for watching.
451   base::ScopedClosureRunner unregister_for_hang_watching;
452   if (watch_for_hangs) {
453     unregister_for_hang_watching = base::HangWatcher::RegisterThread(
454         base::HangWatcher::ThreadType::kThreadPoolThread);
455   }
456 
457   while (!ShouldExit()) {
458 #if BUILDFLAG(IS_APPLE)
459     apple::ScopedNSAutoreleasePool autorelease_pool;
460 #endif
461     std::optional<WatchHangsInScope> hang_watch_scope;
462 
463     TRACE_EVENT_END0("base", "WorkerThread active");
464     hang_watch_scope.reset();
465     delegate()->WaitForWork();
466     TRACE_EVENT_BEGIN("base", "WorkerThread active",
467                       perfetto::TerminatingFlow::FromPointer(
468                           reinterpret_cast<void*>(flow_terminator_)));
469 
470     // Don't GetWork() in the case where we woke up for Cleanup().
471     if (ShouldExit()) {
472       break;
473     }
474 
475     if (watch_for_hangs) {
476       hang_watch_scope.emplace();
477     }
478 
479     // Thread type needs to be updated before GetWork.
480     UpdateThreadType(GetDesiredThreadType());
481 
482     // Get the task source containing the first task to execute.
483     RegisteredTaskSource task_source = delegate()->GetWork(this);
484 
485     // If acquiring work failed and the worker's still alive,
486     // record that this is an unnecessary wakeup.
487     if (!task_source && !ShouldExit()) {
488       delegate()->RecordUnnecessaryWakeup();
489     }
490 
491     while (task_source) {
492       // Alias pointer for investigation of memory corruption. crbug.com/1218384
493       TaskSource* task_source_before_run = task_source.get();
494       base::debug::Alias(&task_source_before_run);
495 
496       task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
497       // Alias pointer for investigation of memory corruption. crbug.com/1218384
498       TaskSource* task_source_before_move = task_source.get();
499       base::debug::Alias(&task_source_before_move);
500 
501       // We emplace the hang_watch_scope here so that each hang watch scope
502       // covers one GetWork (or SwapProcessedTask) as well as one
503       // RunAndPopNextTask.
504       if (watch_for_hangs) {
505         hang_watch_scope.emplace();
506       }
507 
508       RegisteredTaskSource new_task_source =
509           delegate()->SwapProcessedTask(std::move(task_source), this);
510 
511       UpdateThreadType(GetDesiredThreadType());
512 
513       // Check that task_source is always cleared, to help investigation of
514       // memory corruption where task_source is non-null after being moved.
515       // crbug.com/1218384
516       CHECK(!task_source);
517       task_source = std::move(new_task_source);
518     }
519   }
520 
521   // Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
522   // after invoking OnMainExit().
523 
524   delegate()->OnMainExit(this);
525 
526   if (worker_thread_observer_) {
527     worker_thread_observer_->OnWorkerThreadMainExit();
528   }
529 
530   // Release the self-reference to |this|. This can result in deleting |this|
531   // and as such no more member accesses should be made after this point.
532   self_ = nullptr;
533 
534   TRACE_EVENT_END0("base", "WorkerThread active");
535   TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
536 }
537 
538 }  // namespace base::internal
539