• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/worker_thread.h"
6 
7 #include <stddef.h>
8 
9 #include <atomic>
10 #include <memory>
11 #include <utility>
12 #include <vector>
13 
14 #include "base/compiler_specific.h"
15 #include "base/functional/bind.h"
16 #include "base/functional/callback_helpers.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/raw_ptr.h"
19 #include "base/memory/ref_counted.h"
20 #include "base/message_loop/message_pump_type.h"
21 #include "base/synchronization/condition_variable.h"
22 #include "base/task/common/checked_lock.h"
23 #include "base/task/thread_pool/environment_config.h"
24 #include "base/task/thread_pool/sequence.h"
25 #include "base/task/thread_pool/task.h"
26 #include "base/task/thread_pool/task_tracker.h"
27 #include "base/task/thread_pool/test_utils.h"
28 #include "base/task/thread_pool/worker_thread_observer.h"
29 #include "base/test/bind.h"
30 #include "base/test/test_timeouts.h"
31 #include "base/test/test_waitable_event.h"
32 #include "base/threading/platform_thread.h"
33 #include "base/threading/simple_thread.h"
34 #include "base/threading/thread.h"
35 #include "base/time/time.h"
36 #include "build/build_config.h"
37 #include "partition_alloc/buildflags.h"
38 #include "partition_alloc/partition_alloc_config.h"
39 #include "partition_alloc/shim/allocator_shim.h"
40 #include "partition_alloc/shim/allocator_shim_default_dispatch_to_partition_alloc.h"
41 #include "testing/gmock/include/gmock/gmock.h"
42 #include "testing/gtest/include/gtest/gtest.h"
43 
44 #if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
45     PA_CONFIG(THREAD_CACHE_SUPPORTED)
46 #include "partition_alloc/extended_api.h"
47 #include "partition_alloc/thread_cache.h"
48 #endif  // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
49         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
50 
51 using testing::_;
52 using testing::Mock;
53 using testing::Ne;
54 using testing::StrictMock;
55 
56 namespace base::internal {
57 namespace {
58 
// Number of Sequences that GetWork() hands out in each parameterized test.
constexpr size_t kNumSequencesPerTest = 150;
60 
61 class WorkerThreadDefaultDelegate : public WorkerThread::Delegate {
62  public:
63   WorkerThreadDefaultDelegate() = default;
64   WorkerThreadDefaultDelegate(const WorkerThreadDefaultDelegate&) = delete;
65   WorkerThreadDefaultDelegate& operator=(const WorkerThreadDefaultDelegate&) =
66       delete;
67 
68   // WorkerThread::Delegate:
GetThreadLabel() const69   WorkerThread::ThreadLabel GetThreadLabel() const override {
70     return WorkerThread::ThreadLabel::DEDICATED;
71   }
OnMainEntry(WorkerThread * worker)72   void OnMainEntry(WorkerThread* worker) override {}
GetWork(WorkerThread * worker)73   RegisteredTaskSource GetWork(WorkerThread* worker) override {
74     return nullptr;
75   }
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)76   RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
77                                          WorkerThread* worker) override {
78     ADD_FAILURE() << "Unexpected call to SwapProcessedTask()";
79     return nullptr;
80   }
GetSleepTimeout()81   TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
82 };
83 // The test parameter is the number of Tasks per Sequence returned by GetWork().
84 
85 class ThreadPoolWorkerTest : public testing::Test {
86  public:
87   ThreadPoolWorkerTest(const ThreadPoolWorkerTest&) = delete;
88   ThreadPoolWorkerTest& operator=(const ThreadPoolWorkerTest&) = delete;
89 
90  protected:
ThreadPoolWorkerTest()91   ThreadPoolWorkerTest() {
92     Thread::Options service_thread_options;
93     service_thread_options.message_pump_type = MessagePumpType::IO;
94     service_thread_.StartWithOptions(std::move(service_thread_options));
95   }
96 
97   Thread service_thread_ = Thread("ServiceThread");
98 };
99 
100 // The test parameter is the number of Tasks per Sequence returned by GetWork().
101 class ThreadPoolWorkerTestParam : public testing::TestWithParam<int> {
102  public:
103   ThreadPoolWorkerTestParam(const ThreadPoolWorkerTestParam&) = delete;
104   ThreadPoolWorkerTestParam& operator=(const ThreadPoolWorkerTestParam&) =
105       delete;
106 
107  protected:
ThreadPoolWorkerTestParam()108   ThreadPoolWorkerTestParam()
109       : num_get_work_cv_(lock_.CreateConditionVariable()) {
110     Thread::Options service_thread_options;
111     service_thread_options.message_pump_type = MessagePumpType::IO;
112     service_thread_.StartWithOptions(std::move(service_thread_options));
113   }
114 
SetUp()115   void SetUp() override {
116     worker_ = MakeRefCounted<WorkerThread>(
117         ThreadType::kDefault,
118         std::make_unique<TestWorkerThreadDelegate>(this),
119         task_tracker_.GetTrackedRef(), 0);
120     ASSERT_TRUE(worker_);
121     worker_->Start(service_thread_.task_runner());
122     worker_set_.Signal();
123     main_entry_called_.Wait();
124   }
125 
TearDown()126   void TearDown() override {
127     // |worker_| needs to be released before ~TaskTracker() as it holds a
128     // TrackedRef to it.
129     worker_->JoinForTesting();
130     worker_ = nullptr;
131   }
132 
TasksPerSequence() const133   int TasksPerSequence() const { return GetParam(); }
134 
135   // Wait until GetWork() has been called |num_get_work| times.
WaitForNumGetWork(size_t num_get_work)136   void WaitForNumGetWork(size_t num_get_work) {
137     CheckedAutoLock auto_lock(lock_);
138     while (num_get_work_ < num_get_work) {
139       num_get_work_cv_.Wait();
140     }
141   }
142 
SetMaxGetWork(size_t max_get_work)143   void SetMaxGetWork(size_t max_get_work) {
144     CheckedAutoLock auto_lock(lock_);
145     max_get_work_ = max_get_work;
146   }
147 
SetNumSequencesToCreate(size_t num_sequences_to_create)148   void SetNumSequencesToCreate(size_t num_sequences_to_create) {
149     CheckedAutoLock auto_lock(lock_);
150     EXPECT_EQ(0U, num_sequences_to_create_);
151     num_sequences_to_create_ = num_sequences_to_create;
152   }
153 
NumRunTasks()154   size_t NumRunTasks() {
155     CheckedAutoLock auto_lock(lock_);
156     return num_run_tasks_;
157   }
158 
CreatedTaskSources()159   std::vector<scoped_refptr<TaskSource>> CreatedTaskSources() {
160     CheckedAutoLock auto_lock(lock_);
161     return created_sequences_;
162   }
163 
DidProcessTaskSequences()164   std::vector<scoped_refptr<TaskSource>> DidProcessTaskSequences() {
165     CheckedAutoLock auto_lock(lock_);
166     return did_run_task_sources_;
167   }
168 
169   scoped_refptr<WorkerThread> worker_;
170   Thread service_thread_ = Thread("ServiceThread");
171 
172  private:
173   class TestWorkerThreadDelegate
174       : public WorkerThreadDefaultDelegate {
175    public:
TestWorkerThreadDelegate(ThreadPoolWorkerTestParam * outer)176     explicit TestWorkerThreadDelegate(ThreadPoolWorkerTestParam* outer)
177         : outer_(outer) {}
178     TestWorkerThreadDelegate(
179         const TestWorkerThreadDelegate&) = delete;
180     TestWorkerThreadDelegate& operator=(
181         const TestWorkerThreadDelegate&) = delete;
182 
~TestWorkerThreadDelegate()183     ~TestWorkerThreadDelegate() override {
184       EXPECT_FALSE(IsCallToDidProcessTaskExpected());
185     }
186 
187     // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)188     void OnMainEntry(WorkerThread* worker) override {
189       outer_->worker_set_.Wait();
190       EXPECT_EQ(outer_->worker_.get(), worker);
191       EXPECT_FALSE(IsCallToDidProcessTaskExpected());
192 
193       // Without synchronization, OnMainEntry() could be called twice without
194       // generating an error.
195       CheckedAutoLock auto_lock(outer_->lock_);
196       EXPECT_FALSE(outer_->main_entry_called_.IsSignaled());
197       outer_->main_entry_called_.Signal();
198     }
199 
GetWork(WorkerThread * worker)200     RegisteredTaskSource GetWork(WorkerThread* worker) override {
201       EXPECT_FALSE(IsCallToDidProcessTaskExpected());
202       EXPECT_EQ(outer_->worker_.get(), worker);
203 
204       {
205         CheckedAutoLock auto_lock(outer_->lock_);
206 
207         // Increment the number of times that this method has been called.
208         ++outer_->num_get_work_;
209         outer_->num_get_work_cv_.Signal();
210 
211         // Verify that this method isn't called more times than expected.
212         EXPECT_LE(outer_->num_get_work_, outer_->max_get_work_);
213 
214         // Check if a Sequence should be returned.
215         if (outer_->num_sequences_to_create_ == 0) {
216           return nullptr;
217         }
218         --outer_->num_sequences_to_create_;
219       }
220 
221       // Create a Sequence with TasksPerSequence() Tasks.
222       scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
223           TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
224       Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
225       for (int i = 0; i < outer_->TasksPerSequence(); ++i) {
226         Task task(FROM_HERE,
227                   BindOnce(&ThreadPoolWorkerTestParam::RunTaskCallback,
228                            Unretained(outer_)),
229                   TimeTicks::Now(), TimeDelta());
230         sequence_transaction.WillPushImmediateTask();
231         EXPECT_TRUE(outer_->task_tracker_.WillPostTask(
232             &task, sequence->shutdown_behavior()));
233         sequence_transaction.PushImmediateTask(std::move(task));
234       }
235       auto registered_task_source =
236           outer_->task_tracker_.RegisterTaskSource(sequence);
237       EXPECT_TRUE(registered_task_source);
238 
239       ExpectCallToDidProcessTask();
240 
241       {
242         // Add the Sequence to the vector of created Sequences.
243         CheckedAutoLock auto_lock(outer_->lock_);
244         outer_->created_sequences_.push_back(sequence);
245       }
246       auto run_status = registered_task_source.WillRunTask();
247       EXPECT_NE(run_status, TaskSource::RunStatus::kDisallowed);
248       return registered_task_source;
249     }
250 
SwapProcessedTask(RegisteredTaskSource registered_task_source,WorkerThread * worker)251     RegisteredTaskSource SwapProcessedTask(
252         RegisteredTaskSource registered_task_source,
253         WorkerThread* worker) override {
254       {
255         CheckedAutoLock auto_lock(expect_did_run_task_lock_);
256         EXPECT_TRUE(expect_did_run_task_);
257         expect_did_run_task_ = false;
258       }
259 
260       // If TasksPerSequence() is 1, |registered_task_source| should be nullptr.
261       // Otherwise, |registered_task_source| should contain TasksPerSequence() -
262       // 1 Tasks.
263       if (outer_->TasksPerSequence() == 1) {
264         EXPECT_FALSE(registered_task_source);
265       } else {
266         EXPECT_TRUE(registered_task_source);
267         EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now()));
268 
269         // Verify the number of Tasks in |registered_task_source|.
270         for (int i = 0; i < outer_->TasksPerSequence() - 1; ++i) {
271           registered_task_source.WillRunTask();
272           IgnoreResult(registered_task_source.TakeTask());
273           if (i < outer_->TasksPerSequence() - 2) {
274             EXPECT_TRUE(registered_task_source.DidProcessTask());
275             EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now()));
276           } else {
277             EXPECT_FALSE(registered_task_source.DidProcessTask());
278           }
279         }
280         scoped_refptr<TaskSource> task_source =
281             registered_task_source.Unregister();
282         {
283           // Add |task_source| to |did_run_task_sources_|.
284           CheckedAutoLock auto_lock(outer_->lock_);
285           outer_->did_run_task_sources_.push_back(std::move(task_source));
286           EXPECT_LE(outer_->did_run_task_sources_.size(),
287                     outer_->created_sequences_.size());
288         }
289       }
290       return GetWork(worker);
291     }
292 
293    private:
294     // Expect a call to DidProcessTask() before the next call to any other
295     // method of this delegate.
ExpectCallToDidProcessTask()296     void ExpectCallToDidProcessTask() {
297       CheckedAutoLock auto_lock(expect_did_run_task_lock_);
298       expect_did_run_task_ = true;
299     }
300 
IsCallToDidProcessTaskExpected() const301     bool IsCallToDidProcessTaskExpected() const {
302       CheckedAutoLock auto_lock(expect_did_run_task_lock_);
303       return expect_did_run_task_;
304     }
305 
306     raw_ptr<ThreadPoolWorkerTestParam> outer_;
307 
308     // Synchronizes access to |expect_did_run_task_|.
309     mutable CheckedLock expect_did_run_task_lock_;
310 
311     // Whether the next method called on this delegate should be
312     // DidProcessTask().
313     bool expect_did_run_task_ = false;
314   };
315 
RunTaskCallback()316   void RunTaskCallback() {
317     CheckedAutoLock auto_lock(lock_);
318     ++num_run_tasks_;
319     EXPECT_LE(num_run_tasks_, created_sequences_.size());
320   }
321 
322   TaskTracker task_tracker_;
323 
324   // Synchronizes access to all members below.
325   mutable CheckedLock lock_;
326 
327   // Signaled once OnMainEntry() has been called.
328   TestWaitableEvent main_entry_called_;
329 
330   // Number of Sequences that should be created by GetWork(). When this
331   // is 0, GetWork() returns nullptr.
332   size_t num_sequences_to_create_ = 0;
333 
334   // Number of times that GetWork() has been called.
335   size_t num_get_work_ = 0;
336 
337   // Maximum number of times that GetWork() can be called.
338   size_t max_get_work_ = 0;
339 
340   // Condition variable signaled when |num_get_work_| is incremented.
341   ConditionVariable num_get_work_cv_;
342 
343   // Sequences created by GetWork().
344   std::vector<scoped_refptr<TaskSource>> created_sequences_;
345 
346   // Sequences passed to DidProcessTask().
347   std::vector<scoped_refptr<TaskSource>> did_run_task_sources_;
348 
349   // Number of times that RunTaskCallback() has been called.
350   size_t num_run_tasks_ = 0;
351 
352   // Signaled after |worker_| is set.
353   TestWaitableEvent worker_set_;
354 };
355 
356 }  // namespace
357 
358 // Verify that when GetWork() continuously returns Sequences, all Tasks in these
359 // Sequences run successfully. The test wakes up the WorkerThread once.
TEST_P(ThreadPoolWorkerTestParam,ContinuousWork)360 TEST_P(ThreadPoolWorkerTestParam, ContinuousWork) {
361   // Set GetWork() to return |kNumSequencesPerTest| Sequences before starting to
362   // return nullptr.
363   SetNumSequencesToCreate(kNumSequencesPerTest);
364 
365   // Expect |kNumSequencesPerTest| calls to GetWork() in which it returns a
366   // Sequence and one call in which its returns nullptr.
367   const size_t kExpectedNumGetWork = kNumSequencesPerTest + 1;
368   SetMaxGetWork(kExpectedNumGetWork);
369 
370   // Wake up |worker_| and wait until GetWork() has been invoked the
371   // expected amount of times.
372   worker_->WakeUp();
373   WaitForNumGetWork(kExpectedNumGetWork);
374 
375   // All tasks should have run.
376   EXPECT_EQ(kNumSequencesPerTest, NumRunTasks());
377 
378   // If Sequences returned by GetWork() contain more than one Task, they aren't
379   // empty after the worker pops Tasks from them and thus should be returned to
380   // DidProcessTask().
381   if (TasksPerSequence() > 1) {
382     EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences());
383   } else {
384     EXPECT_TRUE(DidProcessTaskSequences().empty());
385   }
386 }
387 
388 // Verify that when GetWork() alternates between returning a Sequence and
389 // returning nullptr, all Tasks in the returned Sequences run successfully. The
390 // test wakes up the WorkerThread once for each Sequence.
TEST_P(ThreadPoolWorkerTestParam,IntermittentWork)391 TEST_P(ThreadPoolWorkerTestParam, IntermittentWork) {
392   for (size_t i = 0; i < kNumSequencesPerTest; ++i) {
393     // Set GetWork() to return 1 Sequence before starting to return
394     // nullptr.
395     SetNumSequencesToCreate(1);
396 
397     // Expect |i + 1| calls to GetWork() in which it returns a Sequence and
398     // |i + 1| calls in which it returns nullptr.
399     const size_t expected_num_get_work = 2 * (i + 1);
400     SetMaxGetWork(expected_num_get_work);
401 
402     // Wake up |worker_| and wait until GetWork() has been invoked
403     // the expected amount of times.
404     worker_->WakeUp();
405     WaitForNumGetWork(expected_num_get_work);
406 
407     // The Task should have run
408     EXPECT_EQ(i + 1, NumRunTasks());
409 
410     // If Sequences returned by GetWork() contain more than one Task, they
411     // aren't empty after the worker pops Tasks from them and thus should be
412     // returned to DidProcessTask().
413     if (TasksPerSequence() > 1) {
414       EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences());
415     } else {
416       EXPECT_TRUE(DidProcessTaskSequences().empty());
417     }
418   }
419 }
420 
// Run every ThreadPoolWorkerTestParam test in two configurations: with one Task
// per Sequence and with two Tasks per Sequence. The value is what the fixture's
// TasksPerSequence() returns through GetParam().
421 INSTANTIATE_TEST_SUITE_P(OneTaskPerSequence,
422                          ThreadPoolWorkerTestParam,
423                          ::testing::Values(1));
424 INSTANTIATE_TEST_SUITE_P(TwoTasksPerSequence,
425                          ThreadPoolWorkerTestParam,
426                          ::testing::Values(2));
427 
428 namespace {
429 
430 class ControllableCleanupDelegate : public WorkerThreadDefaultDelegate {
431  public:
432   class Controls : public RefCountedThreadSafe<Controls> {
433    public:
434     Controls() = default;
435     Controls(const Controls&) = delete;
436     Controls& operator=(const Controls&) = delete;
437 
HaveWorkBlock()438     void HaveWorkBlock() { work_running_.Reset(); }
439 
UnblockWork()440     void UnblockWork() { work_running_.Signal(); }
441 
WaitForWorkToRun()442     void WaitForWorkToRun() { work_processed_.Wait(); }
443 
WaitForCleanupRequest()444     void WaitForCleanupRequest() { cleanup_requested_.Wait(); }
445 
WaitForDelegateDestroy()446     void WaitForDelegateDestroy() { destroyed_.Wait(); }
447 
WaitForMainExit()448     void WaitForMainExit() { exited_.Wait(); }
449 
set_expect_get_work(bool expect_get_work)450     void set_expect_get_work(bool expect_get_work) {
451       expect_get_work_ = expect_get_work;
452     }
453 
ResetState()454     void ResetState() {
455       work_running_.Signal();
456       work_processed_.Reset();
457       cleanup_requested_.Reset();
458       exited_.Reset();
459       work_requested_ = false;
460     }
461 
set_can_cleanup(bool can_cleanup)462     void set_can_cleanup(bool can_cleanup) { can_cleanup_ = can_cleanup; }
463 
464    private:
465     friend class ControllableCleanupDelegate;
466     friend class RefCountedThreadSafe<Controls>;
467     ~Controls() = default;
468 
469     TestWaitableEvent work_running_{WaitableEvent::ResetPolicy::MANUAL,
470                                     WaitableEvent::InitialState::SIGNALED};
471     TestWaitableEvent work_processed_;
472     TestWaitableEvent cleanup_requested_;
473     TestWaitableEvent destroyed_;
474     TestWaitableEvent exited_;
475 
476     bool expect_get_work_ = true;
477     bool can_cleanup_ = false;
478     bool work_requested_ = false;
479   };
480 
ControllableCleanupDelegate(TaskTracker * task_tracker)481   explicit ControllableCleanupDelegate(TaskTracker* task_tracker)
482       : task_tracker_(task_tracker), controls_(new Controls()) {}
483 
484   ControllableCleanupDelegate(const ControllableCleanupDelegate&) = delete;
485   ControllableCleanupDelegate& operator=(const ControllableCleanupDelegate&) =
486       delete;
~ControllableCleanupDelegate()487   ~ControllableCleanupDelegate() override { controls_->destroyed_.Signal(); }
488 
GetWork(WorkerThread * worker)489   RegisteredTaskSource GetWork(WorkerThread* worker) override {
490     EXPECT_TRUE(controls_->expect_get_work_);
491 
492     // Sends one item of work to signal |work_processed_|. On subsequent calls,
493     // sends nullptr to indicate there's no more work to be done.
494     if (controls_->work_requested_) {
495       if (CanCleanup(worker)) {
496         OnCleanup();
497         worker->Cleanup();
498         controls_->set_expect_get_work(false);
499       }
500       return nullptr;
501     }
502 
503     controls_->work_requested_ = true;
504     scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
505         TaskTraits(WithBaseSyncPrimitives(),
506                    TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN),
507         nullptr, TaskSourceExecutionMode::kParallel);
508     Task task(FROM_HERE,
509               BindOnce(
510                   [](TestWaitableEvent* work_processed,
511                      TestWaitableEvent* work_running) {
512                     work_processed->Signal();
513                     work_running->Wait();
514                   },
515                   Unretained(&controls_->work_processed_),
516                   Unretained(&controls_->work_running_)),
517               TimeTicks::Now(), TimeDelta());
518     auto transaction = sequence->BeginTransaction();
519     transaction.WillPushImmediateTask();
520     EXPECT_TRUE(
521         task_tracker_->WillPostTask(&task, sequence->shutdown_behavior()));
522     transaction.PushImmediateTask(std::move(task));
523     auto registered_task_source =
524         task_tracker_->RegisterTaskSource(std::move(sequence));
525     EXPECT_TRUE(registered_task_source);
526     registered_task_source.WillRunTask();
527     return registered_task_source;
528   }
529 
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)530   RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
531                                          WorkerThread* worker) override {
532     return GetWork(worker);
533   }
534 
OnMainExit(WorkerThread * worker)535   void OnMainExit(WorkerThread* worker) override {
536     controls_->exited_.Signal();
537   }
538 
CanCleanup(WorkerThread * worker)539   bool CanCleanup(WorkerThread* worker) {
540     // Saving |can_cleanup_| now so that callers waiting on |cleanup_requested_|
541     // have the thread go to sleep and then allow timing out.
542     bool can_cleanup = controls_->can_cleanup_;
543     controls_->cleanup_requested_.Signal();
544     return can_cleanup;
545   }
546 
OnCleanup()547   void OnCleanup() {
548     EXPECT_TRUE(controls_->can_cleanup_);
549     EXPECT_TRUE(controls_->cleanup_requested_.IsSignaled());
550   }
551 
552   // ControllableCleanupDelegate:
controls()553   scoped_refptr<Controls> controls() { return controls_; }
554 
555  private:
556   scoped_refptr<Sequence> work_sequence_;
557   const raw_ptr<TaskTracker> task_tracker_;
558   scoped_refptr<Controls> controls_;
559 };
560 
561 class MockedControllableCleanupDelegate : public ControllableCleanupDelegate {
562  public:
MockedControllableCleanupDelegate(TaskTracker * task_tracker)563   explicit MockedControllableCleanupDelegate(TaskTracker* task_tracker)
564       : ControllableCleanupDelegate(task_tracker) {}
565   MockedControllableCleanupDelegate(const MockedControllableCleanupDelegate&) =
566       delete;
567   MockedControllableCleanupDelegate& operator=(
568       const MockedControllableCleanupDelegate&) = delete;
569   ~MockedControllableCleanupDelegate() override = default;
570 
571   // WorkerThread::Delegate:
572   MOCK_METHOD1(OnMainEntry, void(WorkerThread* worker));
573 };
574 
575 }  // namespace
576 
577 // Verify that calling WorkerThread::Cleanup() from GetWork() causes
578 // the WorkerThread's thread to exit.
TEST_F(ThreadPoolWorkerTest,WorkerCleanupFromGetWork)579 TEST_F(ThreadPoolWorkerTest, WorkerCleanupFromGetWork) {
580   TaskTracker task_tracker;
581   // Will be owned by WorkerThread.
582   MockedControllableCleanupDelegate* delegate =
583       new StrictMock<MockedControllableCleanupDelegate>(&task_tracker);
584   scoped_refptr<ControllableCleanupDelegate::Controls> controls =
585       delegate->controls();
586   controls->set_can_cleanup(true);
587   EXPECT_CALL(*delegate, OnMainEntry(_));
588   auto worker = MakeRefCounted<WorkerThread>(
589       ThreadType::kDefault, WrapUnique(delegate), task_tracker.GetTrackedRef(),
590       0);
591   worker->Start(service_thread_.task_runner());
592   worker->WakeUp();
593   controls->WaitForWorkToRun();
594   Mock::VerifyAndClear(delegate);
595   controls->WaitForMainExit();
596   // Join the worker to avoid leaks.
597   worker->JoinForTesting();
598 }
599 
// Verify that the delegate is destroyed when Cleanup() is called, and the
// test's reference to the worker is dropped, while the worker is blocked
// running a task.
TEST_F(ThreadPoolWorkerTest, WorkerCleanupDuringWork) {
  TaskTracker task_tracker;
  // The delegate will be owned by the WorkerThread. No mock here as that's
  // reasonably covered by other tests and the delegate may destroy on a
  // different thread. Mocks aren't designed with that in mind.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  // Keep the worker busy inside its task until UnblockWork().
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread_.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  // Request cleanup and release the test's reference while the task is still
  // blocked, then let the task finish.
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
}
624 
// Verify that the delegate is destroyed when Cleanup() is called, and the
// test's reference to the worker is dropped, after the worker ran out of work
// and its delegate declined to clean up by itself.
TEST_F(ThreadPoolWorkerTest, WorkerCleanupDuringWait) {
  TaskTracker task_tracker;
  // The delegate will be owned by the WorkerThread. No mock here as that's
  // reasonably covered by other tests and the delegate may destroy on a
  // different thread. Mocks aren't designed with that in mind.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread_.task_runner());
  worker->WakeUp();

  // The delegate signals the cleanup request from GetWork() once its single
  // task has been handed out.
  controls->WaitForCleanupRequest();
  worker->Cleanup();
  worker = nullptr;
  controls->WaitForDelegateDestroy();
}
646 
// Verify that the delegate is destroyed when Cleanup() is called after the
// TaskTracker has been shut down, while the worker is blocked running a task.
TEST_F(ThreadPoolWorkerTest, WorkerCleanupDuringShutdown) {
  TaskTracker task_tracker;
  // The delegate will be owned by the WorkerThread. No mock here as that's
  // reasonably covered by other tests and the delegate may destroy on a
  // different thread. Mocks aren't designed with that in mind.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  // Keep the worker busy inside its task until UnblockWork().
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread_.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  // Shut down the tracker first, then request cleanup and release the test's
  // reference while the task is still blocked.
  test::ShutdownTaskTracker(&task_tracker);
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
}
672 
673 // Verify that Start() is a no-op after Cleanup().
TEST_F(ThreadPoolWorkerTest,CleanupBeforeStart)674 TEST_F(ThreadPoolWorkerTest, CleanupBeforeStart) {
675   TaskTracker task_tracker;
676   // Will be owned by WorkerThread.
677   // No mock here as that's reasonably covered by other tests and the delegate
678   // may destroy on a different thread. Mocks aren't designed with that in mind.
679   std::unique_ptr<ControllableCleanupDelegate> delegate =
680       std::make_unique<ControllableCleanupDelegate>(&task_tracker);
681   scoped_refptr<ControllableCleanupDelegate::Controls> controls =
682       delegate->controls();
683   controls->set_expect_get_work(false);
684 
685   auto worker = MakeRefCounted<WorkerThread>(
686       ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
687       0);
688 
689   worker->Cleanup();
690   worker->Start(service_thread_.task_runner());
691 
692   EXPECT_FALSE(worker->ThreadAliveForTesting());
693 }
694 
695 namespace {
696 
697 class CallJoinFromDifferentThread : public SimpleThread {
698  public:
CallJoinFromDifferentThread(WorkerThread * worker_to_join)699   explicit CallJoinFromDifferentThread(
700       WorkerThread* worker_to_join)
701       : SimpleThread("WorkerThreadJoinThread"),
702         worker_to_join_(worker_to_join) {}
703 
704   CallJoinFromDifferentThread(const CallJoinFromDifferentThread&) = delete;
705   CallJoinFromDifferentThread& operator=(const CallJoinFromDifferentThread&) =
706       delete;
707   ~CallJoinFromDifferentThread() override = default;
708 
Run()709   void Run() override {
710     run_started_event_.Signal();
711     worker_to_join_.ExtractAsDangling()->JoinForTesting();
712   }
713 
WaitForRunToStart()714   void WaitForRunToStart() { run_started_event_.Wait(); }
715 
716  private:
717   raw_ptr<WorkerThread> worker_to_join_;
718   TestWaitableEvent run_started_event_;
719 };
720 
721 }  // namespace
722 
// Verify that the delegate is destroyed when Cleanup() is called on a worker
// that is blocked running a task while another thread is joining it.
TEST_F(ThreadPoolWorkerTest, WorkerCleanupDuringJoin) {
  TaskTracker task_tracker;
  // The delegate will be owned by the WorkerThread. No mock here as that's
  // reasonably covered by other tests and the delegate may destroy on a
  // different thread. Mocks aren't designed with that in mind.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  // Keep the worker busy inside its task until UnblockWork().
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread_.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  CallJoinFromDifferentThread joiner(worker.get());
  joiner.Start();
  joiner.WaitForRunToStart();
  // Sleep here to give the other thread a chance to call JoinForTesting().
  // Receiving a signal that Run() was called doesn't mean JoinForTesting() was
  // necessarily called, and we can't signal after JoinForTesting() as
  // JoinForTesting() blocks until we call UnblockWork().
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
  joiner.Join();
}
757 
758 namespace {
759 
760 class ExpectThreadTypeDelegate : public WorkerThreadDefaultDelegate {
761  public:
ExpectThreadTypeDelegate()762   ExpectThreadTypeDelegate()
763       : thread_type_verified_in_get_work_event_(
764             WaitableEvent::ResetPolicy::AUTOMATIC) {}
765   ExpectThreadTypeDelegate(const ExpectThreadTypeDelegate&) = delete;
766   ExpectThreadTypeDelegate& operator=(const ExpectThreadTypeDelegate&) = delete;
767 
SetExpectedThreadType(ThreadType expected_thread_type)768   void SetExpectedThreadType(ThreadType expected_thread_type) {
769     expected_thread_type_ = expected_thread_type;
770   }
771 
WaitForThreadTypeVerifiedInGetWork()772   void WaitForThreadTypeVerifiedInGetWork() {
773     thread_type_verified_in_get_work_event_.Wait();
774   }
775 
776   // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)777   void OnMainEntry(WorkerThread* worker) override { VerifyThreadType(); }
GetWork(WorkerThread * worker)778   RegisteredTaskSource GetWork(WorkerThread* worker) override {
779     VerifyThreadType();
780     thread_type_verified_in_get_work_event_.Signal();
781     return nullptr;
782   }
783 
784  private:
VerifyThreadType()785   void VerifyThreadType() {
786     CheckedAutoLock auto_lock(expected_thread_type_lock_);
787     EXPECT_EQ(expected_thread_type_, PlatformThread::GetCurrentThreadType());
788   }
789 
790   // Signaled after GetWork() has verified the thread type of the worker thread.
791   TestWaitableEvent thread_type_verified_in_get_work_event_;
792 
793   // Synchronizes access to |expected_thread_type_|.
794   CheckedLock expected_thread_type_lock_;
795 
796   // Expected thread type for the next call to OnMainEntry() or GetWork().
797   ThreadType expected_thread_type_ = ThreadType::kDefault;
798 };
799 
800 }  // namespace
801 
// Verifies that a worker started with ThreadType::kBackground reports
// ThreadType::kDefault from GetWork() once shutdown has started.
TEST_F(ThreadPoolWorkerTest, BumpThreadTypeOfAliveThreadDuringShutdown) {
  if (!CanUseBackgroundThreadTypeForWorkerThread()) {
    return;
  }

  TaskTracker task_tracker;

  // Block shutdown to ensure that the worker doesn't exit when StartShutdown()
  // is called. |registered_task_source| is kept alive until the end of the
  // test for that purpose.
  scoped_refptr<Sequence> sequence =
      MakeRefCounted<Sequence>(TaskTraits{TaskShutdownBehavior::BLOCK_SHUTDOWN},
                               nullptr, TaskSourceExecutionMode::kParallel);
  auto registered_task_source =
      task_tracker.RegisterTaskSource(std::move(sequence));

  auto delegate = std::make_unique<ExpectThreadTypeDelegate>();
  // Ownership of the delegate moves into the worker below; keep a raw pointer
  // to drive the expectations from the test body.
  ExpectThreadTypeDelegate* delegate_raw = delegate.get();
  delegate_raw->SetExpectedThreadType(ThreadType::kBackground);
  auto worker = MakeRefCounted<WorkerThread>(
      ThreadType::kBackground, std::move(delegate),
      task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread_.task_runner());

  // Verify that the worker initially runs with ThreadType::kBackground.
  worker->WakeUp();
  delegate_raw->WaitForThreadTypeVerifiedInGetWork();

  // Verify that the thread type is bumped to ThreadType::kDefault during
  // shutdown.
  delegate_raw->SetExpectedThreadType(ThreadType::kDefault);
  task_tracker.StartShutdown();
  worker->WakeUp();
  delegate_raw->WaitForThreadTypeVerifiedInGetWork();

  worker->JoinForTesting();
}
839 
840 namespace {
841 
842 class VerifyCallsToObserverDelegate : public WorkerThreadDefaultDelegate {
843  public:
VerifyCallsToObserverDelegate(test::MockWorkerThreadObserver * observer)844   explicit VerifyCallsToObserverDelegate(
845       test::MockWorkerThreadObserver* observer)
846       : observer_(observer) {}
847   VerifyCallsToObserverDelegate(const VerifyCallsToObserverDelegate&) = delete;
848   VerifyCallsToObserverDelegate& operator=(
849       const VerifyCallsToObserverDelegate&) = delete;
850 
851   // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)852   void OnMainEntry(WorkerThread* worker) override {
853     Mock::VerifyAndClear(observer_);
854   }
855 
OnMainExit(WorkerThread * worker)856   void OnMainExit(WorkerThread* worker) override {
857     observer_->AllowCallsOnMainExit(1);
858   }
859 
860  private:
861   const raw_ptr<test::MockWorkerThreadObserver> observer_;
862 };
863 
864 }  // namespace
865 
866 // Verify that the WorkerThreadObserver is notified when the worker enters
867 // and exits its main function.
TEST_F(ThreadPoolWorkerTest, WorkerThreadObserver) {
  StrictMock<test::MockWorkerThreadObserver> observer;
  TaskTracker task_tracker;

  // The delegate is handed straight to the worker, which takes ownership.
  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault,
      std::make_unique<VerifyCallsToObserverDelegate>(&observer),
      task_tracker.GetTrackedRef(), 0);

  // The main-entry expectation must be in place before the thread starts.
  EXPECT_CALL(observer, OnWorkerThreadMainEntry());
  worker->Start(service_thread_.task_runner(), &observer);
  worker->Cleanup();

  // Joining the worker avoids leaking it.
  worker->JoinForTesting();
  Mock::VerifyAndClear(&observer);
}
882 
883 #if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
884     PA_CONFIG(THREAD_CACHE_SUPPORTED)
885 namespace {
FreeForTest(void * data)886 NOINLINE void FreeForTest(void* data) {
887   free(data);
888 }
889 
890 using ::testing::AllOf;
891 using ::testing::Ge;
892 using ::testing::Le;
893 
894 }  // namespace
895 
896 class WorkerThreadThreadCacheDelegate : public WorkerThreadDefaultDelegate {
897  public:
PrepareForTesting()898   void PrepareForTesting() {
899     TimeTicks now = TimeTicks::Now();
900     WorkerThreadDefaultDelegate::set_first_sleep_time_for_testing(now);
901     first_sleep_time_for_testing_ = now;
902   }
903 
GetSleepDurationBeforePurge(TimeTicks)904   TimeDelta GetSleepDurationBeforePurge(TimeTicks) override {
905     // Expect `GetSleepDurationBeforePurge()` to return
906     // `kFirstSleepDurationBeforePurge` when invoked within
907     // `kFirstSleepDurationBeforePurge` of the first sleep (with
908     // `kPurgeThreadCacheIdleDelay` tolerance due to alignment).
909     EXPECT_THAT(
910         WorkerThreadDefaultDelegate::GetSleepDurationBeforePurge(
911             /* now=*/first_sleep_time_for_testing_),
912         AllOf(Ge(WorkerThread::Delegate::kFirstSleepDurationBeforePurge),
913               Le(WorkerThread::Delegate::kFirstSleepDurationBeforePurge +
914                  WorkerThread::Delegate::kPurgeThreadCacheIdleDelay)));
915 
916     // Expect `GetSleepDurationBeforePurge()` to return
917     // `WorkerThread::Delegate::kPurgeThreadCacheIdleDelay` when invoked later
918     // than `kFirstSleepDurationBeforePurge` after the first sleep (with
919     // `kPurgeThreadCacheIdleDelay` tolerance due to alignment).
920     constexpr base::TimeDelta kEpsilon = base::Microseconds(1);
921     EXPECT_THAT(
922         WorkerThreadDefaultDelegate::GetSleepDurationBeforePurge(
923             /* now=*/first_sleep_time_for_testing_ +
924             WorkerThread::Delegate::kFirstSleepDurationBeforePurge + kEpsilon),
925         AllOf(Ge(WorkerThread::Delegate::kPurgeThreadCacheIdleDelay),
926               Le(WorkerThread::Delegate::kPurgeThreadCacheIdleDelay +
927                  WorkerThread::Delegate::kPurgeThreadCacheIdleDelay)));
928 
929     // The output of this function is used to drive real sleep in tests.
930     // Return a constant so that the tests can validate the wakeups without
931     // timing out.
932     return GetSleepTimeout();
933   }
934 
WaitForWork()935   void WaitForWork() override {
936     // Fill several buckets before going to sleep.
937     for (size_t size = 8;
938          size < partition_alloc::ThreadCache::kDefaultSizeThreshold; size++) {
939       void* data = malloc(size);
940       // A simple malloc() / free() pair can be discarded by the compiler (and
941       // is), making the test fail. It is sufficient to make |FreeForTest()| a
942       // NOINLINE function for the call to not be eliminated, but it is
943       // required.
944       FreeForTest(data);
945     }
946 
947     size_t cached_memory_before =
948         partition_alloc::ThreadCache::Get()->CachedMemory();
949     WorkerThreadDefaultDelegate::WaitForWork();
950     size_t cached_memory_after =
951         partition_alloc::ThreadCache::Get()->CachedMemory();
952 
953     if (!test_done_) {
954       if (purge_expected_) {
955         EXPECT_LT(cached_memory_after, cached_memory_before / 2);
956       } else {
957         EXPECT_GT(cached_memory_after, cached_memory_before / 2);
958       }
959       // Unblock the test.
960       wakeup_done_.Signal();
961       test_done_ = true;
962     }
963   }
964 
965   // Avoid using the default sleep timeout which is infinite and prevents the
966   // tests from completing.
GetSleepTimeout()967   TimeDelta GetSleepTimeout() override {
968     return WorkerThread::Delegate::kPurgeThreadCacheIdleDelay +
969            TestTimeouts::tiny_timeout();
970   }
971 
SetPurgeExpectation(bool purge_expected)972   void SetPurgeExpectation(bool purge_expected) {
973     purge_expected_ = purge_expected;
974   }
975 
976   TimeTicks first_sleep_time_for_testing_;
977   bool test_done_ = false;
978   bool purge_expected_ = false;
979   base::WaitableEvent wakeup_done_;
980 };
981 
TEST_F(ThreadPoolWorkerTest, WorkerThreadCacheNoPurgeOnSignal) {
  // The thread cache has to be enabled in the main partition for this test.
  partition_alloc::internal::ThreadCacheProcessScopeForTesting scope(
      allocator_shim::internal::PartitionAllocMalloc::Allocator());

  TaskTracker task_tracker;
  auto owned_delegate = std::make_unique<WorkerThreadThreadCacheDelegate>();
  // The worker takes ownership of the delegate; the raw pointer is kept to
  // configure it and to wait on its event.
  WorkerThreadThreadCacheDelegate* const delegate_raw = owned_delegate.get();
  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(owned_delegate),
      task_tracker.GetTrackedRef(), 0);
  delegate_raw->PrepareForTesting();

  // Waking up because of a signal must not purge the cache.
  delegate_raw->SetPurgeExpectation(false);

  // Issue the wakeup before starting the thread so that the first wakeup is
  // guaranteed to be caused by a signal.
  worker->WakeUp();
  worker->Start(service_thread_.task_runner(), nullptr);

  // Block until the delegate has completed a wakeup.
  delegate_raw->wakeup_done_.Wait();
  worker->JoinForTesting();
}
1007 
TEST_F(ThreadPoolWorkerTest, PurgeOnUninteruptedSleep) {
  // The thread cache has to be enabled in the main partition for this test.
  partition_alloc::internal::ThreadCacheProcessScopeForTesting scope(
      allocator_shim::internal::PartitionAllocMalloc::Allocator());

  TaskTracker task_tracker;
  auto owned_delegate = std::make_unique<WorkerThreadThreadCacheDelegate>();
  // The worker takes ownership of the delegate; the raw pointer is kept to
  // configure it and to wait on its event.
  WorkerThreadThreadCacheDelegate* const delegate_raw = owned_delegate.get();
  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(owned_delegate),
      task_tracker.GetTrackedRef(), 0);

  delegate_raw->PrepareForTesting();

  // No wakeup is issued here, so the delegate is told to expect a purge.
  delegate_raw->SetPurgeExpectation(true);

  worker->Start(service_thread_.task_runner(), nullptr);

  // Block until the delegate has completed a wakeup.
  delegate_raw->wakeup_done_.Wait();
  worker->JoinForTesting();
}
1031 
1032 #endif  // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
1033         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
1034 
1035 }  // namespace base::internal
1036