• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/worker_thread_waitable_event.h"
6 
7 #include <stddef.h>
8 
9 #include <atomic>
10 #include <memory>
11 #include <utility>
12 #include <vector>
13 
14 #include "base/allocator/partition_allocator/src/partition_alloc/partition_alloc_buildflags.h"
15 #include "base/allocator/partition_allocator/src/partition_alloc/partition_alloc_config.h"
16 #include "base/allocator/partition_allocator/src/partition_alloc/shim/allocator_shim.h"
17 #include "base/allocator/partition_allocator/src/partition_alloc/shim/allocator_shim_default_dispatch_to_partition_alloc.h"
18 #include "base/compiler_specific.h"
19 #include "base/functional/bind.h"
20 #include "base/functional/callback_helpers.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/ref_counted.h"
24 #include "base/message_loop/message_pump_type.h"
25 #include "base/synchronization/condition_variable.h"
26 #include "base/task/common/checked_lock.h"
27 #include "base/task/thread_pool/environment_config.h"
28 #include "base/task/thread_pool/sequence.h"
29 #include "base/task/thread_pool/task.h"
30 #include "base/task/thread_pool/task_tracker.h"
31 #include "base/task/thread_pool/test_utils.h"
32 #include "base/task/thread_pool/worker_thread_observer.h"
33 #include "base/test/test_timeouts.h"
34 #include "base/test/test_waitable_event.h"
35 #include "base/threading/platform_thread.h"
36 #include "base/threading/simple_thread.h"
37 #include "base/threading/thread.h"
38 #include "base/time/time.h"
39 #include "build/build_config.h"
40 #include "testing/gmock/include/gmock/gmock.h"
41 #include "testing/gtest/include/gtest/gtest.h"
42 
43 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
44     PA_CONFIG(THREAD_CACHE_SUPPORTED)
45 #include "base/allocator/partition_allocator/src/partition_alloc/extended_api.h"
46 #include "base/allocator/partition_allocator/src/partition_alloc/thread_cache.h"
47 #endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
48         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
49 
50 using testing::_;
51 using testing::Mock;
52 using testing::Ne;
53 using testing::StrictMock;
54 
55 namespace base {
56 namespace internal {
57 namespace {
58 
// Number of Sequences each parameterized test asks the delegate to create.
constexpr size_t kNumSequencesPerTest = 150;
60 
61 class WorkerThreadDefaultDelegate : public WorkerThreadWaitableEvent::Delegate {
62  public:
63   WorkerThreadDefaultDelegate() = default;
64   WorkerThreadDefaultDelegate(const WorkerThreadDefaultDelegate&) = delete;
65   WorkerThreadDefaultDelegate& operator=(const WorkerThreadDefaultDelegate&) =
66       delete;
67 
68   // WorkerThreadWaitableEvent::Delegate:
GetThreadLabel() const69   WorkerThread::ThreadLabel GetThreadLabel() const override {
70     return WorkerThread::ThreadLabel::DEDICATED;
71   }
OnMainEntry(WorkerThread * worker)72   void OnMainEntry(WorkerThread* worker) override {}
GetWork(WorkerThread * worker)73   RegisteredTaskSource GetWork(WorkerThread* worker) override {
74     return nullptr;
75   }
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)76   RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
77                                          WorkerThread* worker) override {
78     ADD_FAILURE() << "Unexpected call to SwapProcessedTask()";
79     return nullptr;
80   }
GetSleepTimeout()81   TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
82 };
83 
84 // The test parameter is the number of Tasks per Sequence returned by GetWork().
85 class ThreadPoolWorkerTest : public testing::TestWithParam<int> {
86  public:
87   ThreadPoolWorkerTest(const ThreadPoolWorkerTest&) = delete;
88   ThreadPoolWorkerTest& operator=(const ThreadPoolWorkerTest&) = delete;
89 
90  protected:
ThreadPoolWorkerTest()91   ThreadPoolWorkerTest() : num_get_work_cv_(lock_.CreateConditionVariable()) {
92     Thread::Options service_thread_options;
93     service_thread_options.message_pump_type = MessagePumpType::IO;
94     service_thread_.StartWithOptions(std::move(service_thread_options));
95   }
96 
SetUp()97   void SetUp() override {
98     worker_ = MakeRefCounted<WorkerThreadWaitableEvent>(
99         ThreadType::kDefault, std::make_unique<TestWorkerThreadWaitableEventDelegate>(this),
100         task_tracker_.GetTrackedRef(), 0);
101     ASSERT_TRUE(worker_);
102     worker_->Start(service_thread_.task_runner());
103     worker_set_.Signal();
104     main_entry_called_.Wait();
105   }
106 
TearDown()107   void TearDown() override {
108     // |worker_| needs to be released before ~TaskTracker() as it holds a
109     // TrackedRef to it.
110     worker_->JoinForTesting();
111     worker_ = nullptr;
112   }
113 
TasksPerSequence() const114   int TasksPerSequence() const { return GetParam(); }
115 
116   // Wait until GetWork() has been called |num_get_work| times.
WaitForNumGetWork(size_t num_get_work)117   void WaitForNumGetWork(size_t num_get_work) {
118     CheckedAutoLock auto_lock(lock_);
119     while (num_get_work_ < num_get_work)
120       num_get_work_cv_->Wait();
121   }
122 
SetMaxGetWork(size_t max_get_work)123   void SetMaxGetWork(size_t max_get_work) {
124     CheckedAutoLock auto_lock(lock_);
125     max_get_work_ = max_get_work;
126   }
127 
SetNumSequencesToCreate(size_t num_sequences_to_create)128   void SetNumSequencesToCreate(size_t num_sequences_to_create) {
129     CheckedAutoLock auto_lock(lock_);
130     EXPECT_EQ(0U, num_sequences_to_create_);
131     num_sequences_to_create_ = num_sequences_to_create;
132   }
133 
NumRunTasks()134   size_t NumRunTasks() {
135     CheckedAutoLock auto_lock(lock_);
136     return num_run_tasks_;
137   }
138 
CreatedTaskSources()139   std::vector<scoped_refptr<TaskSource>> CreatedTaskSources() {
140     CheckedAutoLock auto_lock(lock_);
141     return created_sequences_;
142   }
143 
DidProcessTaskSequences()144   std::vector<scoped_refptr<TaskSource>> DidProcessTaskSequences() {
145     CheckedAutoLock auto_lock(lock_);
146     return did_run_task_sources_;
147   }
148 
149   scoped_refptr<WorkerThreadWaitableEvent> worker_;
150   Thread service_thread_ = Thread("ServiceThread");
151 
152  private:
153   class TestWorkerThreadWaitableEventDelegate : public WorkerThreadDefaultDelegate {
154    public:
TestWorkerThreadWaitableEventDelegate(ThreadPoolWorkerTest * outer)155     explicit TestWorkerThreadWaitableEventDelegate(ThreadPoolWorkerTest* outer)
156         : outer_(outer) {}
157     TestWorkerThreadWaitableEventDelegate(const TestWorkerThreadWaitableEventDelegate&) = delete;
158     TestWorkerThreadWaitableEventDelegate& operator=(const TestWorkerThreadWaitableEventDelegate&) =
159         delete;
160 
~TestWorkerThreadWaitableEventDelegate()161     ~TestWorkerThreadWaitableEventDelegate() override {
162       EXPECT_FALSE(IsCallToDidProcessTaskExpected());
163     }
164 
165     // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)166     void OnMainEntry(WorkerThread* worker) override {
167       outer_->worker_set_.Wait();
168       EXPECT_EQ(outer_->worker_.get(), worker);
169       EXPECT_FALSE(IsCallToDidProcessTaskExpected());
170 
171       // Without synchronization, OnMainEntry() could be called twice without
172       // generating an error.
173       CheckedAutoLock auto_lock(outer_->lock_);
174       EXPECT_FALSE(outer_->main_entry_called_.IsSignaled());
175       outer_->main_entry_called_.Signal();
176     }
177 
GetWork(WorkerThread * worker)178     RegisteredTaskSource GetWork(WorkerThread* worker) override {
179       EXPECT_FALSE(IsCallToDidProcessTaskExpected());
180       EXPECT_EQ(outer_->worker_.get(), worker);
181 
182       {
183         CheckedAutoLock auto_lock(outer_->lock_);
184 
185         // Increment the number of times that this method has been called.
186         ++outer_->num_get_work_;
187         outer_->num_get_work_cv_->Signal();
188 
189         // Verify that this method isn't called more times than expected.
190         EXPECT_LE(outer_->num_get_work_, outer_->max_get_work_);
191 
192         // Check if a Sequence should be returned.
193         if (outer_->num_sequences_to_create_ == 0)
194           return nullptr;
195         --outer_->num_sequences_to_create_;
196       }
197 
198       // Create a Sequence with TasksPerSequence() Tasks.
199       scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
200           TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
201       Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
202       for (int i = 0; i < outer_->TasksPerSequence(); ++i) {
203         Task task(FROM_HERE,
204                   BindOnce(&ThreadPoolWorkerTest::RunTaskCallback,
205                            Unretained(outer_)),
206                   TimeTicks::Now(), TimeDelta());
207         sequence_transaction.WillPushImmediateTask();
208         EXPECT_TRUE(outer_->task_tracker_.WillPostTask(
209             &task, sequence->shutdown_behavior()));
210         sequence_transaction.PushImmediateTask(std::move(task));
211       }
212       auto registered_task_source =
213           outer_->task_tracker_.RegisterTaskSource(sequence);
214       EXPECT_TRUE(registered_task_source);
215 
216       ExpectCallToDidProcessTask();
217 
218       {
219         // Add the Sequence to the vector of created Sequences.
220         CheckedAutoLock auto_lock(outer_->lock_);
221         outer_->created_sequences_.push_back(sequence);
222       }
223       auto run_status = registered_task_source.WillRunTask();
224       EXPECT_NE(run_status, TaskSource::RunStatus::kDisallowed);
225       return registered_task_source;
226     }
227 
SwapProcessedTask(RegisteredTaskSource registered_task_source,WorkerThread * worker)228     RegisteredTaskSource SwapProcessedTask(
229         RegisteredTaskSource registered_task_source,
230         WorkerThread* worker) override {
231       {
232         CheckedAutoLock auto_lock(expect_did_run_task_lock_);
233         EXPECT_TRUE(expect_did_run_task_);
234         expect_did_run_task_ = false;
235       }
236 
237       // If TasksPerSequence() is 1, |registered_task_source| should be nullptr.
238       // Otherwise, |registered_task_source| should contain TasksPerSequence() -
239       // 1 Tasks.
240       if (outer_->TasksPerSequence() == 1) {
241         EXPECT_FALSE(registered_task_source);
242       } else {
243         EXPECT_TRUE(registered_task_source);
244         EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now()));
245 
246         // Verify the number of Tasks in |registered_task_source|.
247         for (int i = 0; i < outer_->TasksPerSequence() - 1; ++i) {
248           registered_task_source.WillRunTask();
249           IgnoreResult(registered_task_source.TakeTask());
250           if (i < outer_->TasksPerSequence() - 2) {
251             EXPECT_TRUE(registered_task_source.DidProcessTask());
252             EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now()));
253           } else {
254             EXPECT_FALSE(registered_task_source.DidProcessTask());
255           }
256         }
257         scoped_refptr<TaskSource> task_source =
258             registered_task_source.Unregister();
259         {
260           // Add |task_source| to |did_run_task_sources_|.
261           CheckedAutoLock auto_lock(outer_->lock_);
262           outer_->did_run_task_sources_.push_back(std::move(task_source));
263           EXPECT_LE(outer_->did_run_task_sources_.size(),
264                     outer_->created_sequences_.size());
265         }
266       }
267       return GetWork(worker);
268     }
269 
270    private:
271     // Expect a call to DidProcessTask() before the next call to any other
272     // method of this delegate.
ExpectCallToDidProcessTask()273     void ExpectCallToDidProcessTask() {
274       CheckedAutoLock auto_lock(expect_did_run_task_lock_);
275       expect_did_run_task_ = true;
276     }
277 
IsCallToDidProcessTaskExpected() const278     bool IsCallToDidProcessTaskExpected() const {
279       CheckedAutoLock auto_lock(expect_did_run_task_lock_);
280       return expect_did_run_task_;
281     }
282 
283     raw_ptr<ThreadPoolWorkerTest> outer_;
284 
285     // Synchronizes access to |expect_did_run_task_|.
286     mutable CheckedLock expect_did_run_task_lock_;
287 
288     // Whether the next method called on this delegate should be
289     // DidProcessTask().
290     bool expect_did_run_task_ = false;
291   };
292 
RunTaskCallback()293   void RunTaskCallback() {
294     CheckedAutoLock auto_lock(lock_);
295     ++num_run_tasks_;
296     EXPECT_LE(num_run_tasks_, created_sequences_.size());
297   }
298 
299   TaskTracker task_tracker_;
300 
301   // Synchronizes access to all members below.
302   mutable CheckedLock lock_;
303 
304   // Signaled once OnMainEntry() has been called.
305   TestWaitableEvent main_entry_called_;
306 
307   // Number of Sequences that should be created by GetWork(). When this
308   // is 0, GetWork() returns nullptr.
309   size_t num_sequences_to_create_ = 0;
310 
311   // Number of times that GetWork() has been called.
312   size_t num_get_work_ = 0;
313 
314   // Maximum number of times that GetWork() can be called.
315   size_t max_get_work_ = 0;
316 
317   // Condition variable signaled when |num_get_work_| is incremented.
318   std::unique_ptr<ConditionVariable> num_get_work_cv_;
319 
320   // Sequences created by GetWork().
321   std::vector<scoped_refptr<TaskSource>> created_sequences_;
322 
323   // Sequences passed to DidProcessTask().
324   std::vector<scoped_refptr<TaskSource>> did_run_task_sources_;
325 
326   // Number of times that RunTaskCallback() has been called.
327   size_t num_run_tasks_ = 0;
328 
329   // Signaled after |worker_| is set.
330   TestWaitableEvent worker_set_;
331 };
332 
333 }  // namespace
334 
335 // Verify that when GetWork() continuously returns Sequences, all Tasks in these
336 // Sequences run successfully. The test wakes up the WorkerThread once.
TEST_P(ThreadPoolWorkerTest,ContinuousWork)337 TEST_P(ThreadPoolWorkerTest, ContinuousWork) {
338   // Set GetWork() to return |kNumSequencesPerTest| Sequences before starting to
339   // return nullptr.
340   SetNumSequencesToCreate(kNumSequencesPerTest);
341 
342   // Expect |kNumSequencesPerTest| calls to GetWork() in which it returns a
343   // Sequence and one call in which its returns nullptr.
344   const size_t kExpectedNumGetWork = kNumSequencesPerTest + 1;
345   SetMaxGetWork(kExpectedNumGetWork);
346 
347   // Wake up |worker_| and wait until GetWork() has been invoked the
348   // expected amount of times.
349   worker_->WakeUp();
350   WaitForNumGetWork(kExpectedNumGetWork);
351 
352   // All tasks should have run.
353   EXPECT_EQ(kNumSequencesPerTest, NumRunTasks());
354 
355   // If Sequences returned by GetWork() contain more than one Task, they aren't
356   // empty after the worker pops Tasks from them and thus should be returned to
357   // DidProcessTask().
358   if (TasksPerSequence() > 1)
359     EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences());
360   else
361     EXPECT_TRUE(DidProcessTaskSequences().empty());
362 }
363 
364 // Verify that when GetWork() alternates between returning a Sequence and
365 // returning nullptr, all Tasks in the returned Sequences run successfully. The
366 // test wakes up the WorkerThread once for each Sequence.
TEST_P(ThreadPoolWorkerTest,IntermittentWork)367 TEST_P(ThreadPoolWorkerTest, IntermittentWork) {
368   for (size_t i = 0; i < kNumSequencesPerTest; ++i) {
369     // Set GetWork() to return 1 Sequence before starting to return
370     // nullptr.
371     SetNumSequencesToCreate(1);
372 
373     // Expect |i + 1| calls to GetWork() in which it returns a Sequence and
374     // |i + 1| calls in which it returns nullptr.
375     const size_t expected_num_get_work = 2 * (i + 1);
376     SetMaxGetWork(expected_num_get_work);
377 
378     // Wake up |worker_| and wait until GetWork() has been invoked
379     // the expected amount of times.
380     worker_->WakeUp();
381     WaitForNumGetWork(expected_num_get_work);
382 
383     // The Task should have run
384     EXPECT_EQ(i + 1, NumRunTasks());
385 
386     // If Sequences returned by GetWork() contain more than one Task, they
387     // aren't empty after the worker pops Tasks from them and thus should be
388     // returned to DidProcessTask().
389     if (TasksPerSequence() > 1)
390       EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences());
391     else
392       EXPECT_TRUE(DidProcessTaskSequences().empty());
393   }
394 }
395 
396 INSTANTIATE_TEST_SUITE_P(OneTaskPerSequence,
397                          ThreadPoolWorkerTest,
398                          ::testing::Values(1));
399 INSTANTIATE_TEST_SUITE_P(TwoTasksPerSequence,
400                          ThreadPoolWorkerTest,
401                          ::testing::Values(2));
402 
403 namespace {
404 
405 class ControllableCleanupDelegate : public WorkerThreadDefaultDelegate {
406  public:
407   class Controls : public RefCountedThreadSafe<Controls> {
408    public:
409     Controls() = default;
410     Controls(const Controls&) = delete;
411     Controls& operator=(const Controls&) = delete;
412 
HaveWorkBlock()413     void HaveWorkBlock() { work_running_.Reset(); }
414 
UnblockWork()415     void UnblockWork() { work_running_.Signal(); }
416 
WaitForWorkToRun()417     void WaitForWorkToRun() { work_processed_.Wait(); }
418 
WaitForCleanupRequest()419     void WaitForCleanupRequest() { cleanup_requested_.Wait(); }
420 
WaitForDelegateDestroy()421     void WaitForDelegateDestroy() { destroyed_.Wait(); }
422 
WaitForMainExit()423     void WaitForMainExit() { exited_.Wait(); }
424 
set_expect_get_work(bool expect_get_work)425     void set_expect_get_work(bool expect_get_work) {
426       expect_get_work_ = expect_get_work;
427     }
428 
ResetState()429     void ResetState() {
430       work_running_.Signal();
431       work_processed_.Reset();
432       cleanup_requested_.Reset();
433       exited_.Reset();
434       work_requested_ = false;
435     }
436 
set_can_cleanup(bool can_cleanup)437     void set_can_cleanup(bool can_cleanup) { can_cleanup_ = can_cleanup; }
438 
439    private:
440     friend class ControllableCleanupDelegate;
441     friend class RefCountedThreadSafe<Controls>;
442     ~Controls() = default;
443 
444     TestWaitableEvent work_running_{WaitableEvent::ResetPolicy::MANUAL,
445                                     WaitableEvent::InitialState::SIGNALED};
446     TestWaitableEvent work_processed_;
447     TestWaitableEvent cleanup_requested_;
448     TestWaitableEvent destroyed_;
449     TestWaitableEvent exited_;
450 
451     bool expect_get_work_ = true;
452     bool can_cleanup_ = false;
453     bool work_requested_ = false;
454   };
455 
ControllableCleanupDelegate(TaskTracker * task_tracker)456   explicit ControllableCleanupDelegate(TaskTracker* task_tracker)
457       : task_tracker_(task_tracker), controls_(new Controls()) {}
458 
459   ControllableCleanupDelegate(const ControllableCleanupDelegate&) = delete;
460   ControllableCleanupDelegate& operator=(const ControllableCleanupDelegate&) =
461       delete;
~ControllableCleanupDelegate()462   ~ControllableCleanupDelegate() override { controls_->destroyed_.Signal(); }
463 
GetWork(WorkerThread * worker)464   RegisteredTaskSource GetWork(WorkerThread* worker) override {
465     EXPECT_TRUE(controls_->expect_get_work_);
466 
467     // Sends one item of work to signal |work_processed_|. On subsequent calls,
468     // sends nullptr to indicate there's no more work to be done.
469     if (controls_->work_requested_) {
470       if (CanCleanup(worker)) {
471         OnCleanup();
472         worker->Cleanup();
473         controls_->set_expect_get_work(false);
474       }
475       return nullptr;
476     }
477 
478     controls_->work_requested_ = true;
479     scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
480         TaskTraits(WithBaseSyncPrimitives(),
481                    TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN),
482         nullptr, TaskSourceExecutionMode::kParallel);
483     Task task(FROM_HERE,
484               BindOnce(
485                   [](TestWaitableEvent* work_processed,
486                      TestWaitableEvent* work_running) {
487                     work_processed->Signal();
488                     work_running->Wait();
489                   },
490                   Unretained(&controls_->work_processed_),
491                   Unretained(&controls_->work_running_)),
492               TimeTicks::Now(), TimeDelta());
493     auto transaction = sequence->BeginTransaction();
494     transaction.WillPushImmediateTask();
495     EXPECT_TRUE(
496         task_tracker_->WillPostTask(&task, sequence->shutdown_behavior()));
497     transaction.PushImmediateTask(std::move(task));
498     auto registered_task_source =
499         task_tracker_->RegisterTaskSource(std::move(sequence));
500     EXPECT_TRUE(registered_task_source);
501     registered_task_source.WillRunTask();
502     return registered_task_source;
503   }
504 
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)505   RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source, WorkerThread* worker) override {
506     return GetWork(worker);
507   }
508 
OnMainExit(WorkerThread * worker)509   void OnMainExit(WorkerThread* worker) override {
510     controls_->exited_.Signal();
511   }
512 
CanCleanup(WorkerThread * worker)513   bool CanCleanup(WorkerThread* worker) {
514     // Saving |can_cleanup_| now so that callers waiting on |cleanup_requested_|
515     // have the thread go to sleep and then allow timing out.
516     bool can_cleanup = controls_->can_cleanup_;
517     controls_->cleanup_requested_.Signal();
518     return can_cleanup;
519   }
520 
OnCleanup()521   void OnCleanup() {
522     EXPECT_TRUE(controls_->can_cleanup_);
523     EXPECT_TRUE(controls_->cleanup_requested_.IsSignaled());
524   }
525 
526   // ControllableCleanupDelegate:
controls()527   scoped_refptr<Controls> controls() { return controls_; }
528 
529  private:
530   scoped_refptr<Sequence> work_sequence_;
531   const raw_ptr<TaskTracker> task_tracker_;
532   scoped_refptr<Controls> controls_;
533 };
534 
535 class MockedControllableCleanupDelegate : public ControllableCleanupDelegate {
536  public:
MockedControllableCleanupDelegate(TaskTracker * task_tracker)537   explicit MockedControllableCleanupDelegate(TaskTracker* task_tracker)
538       : ControllableCleanupDelegate(task_tracker) {}
539   MockedControllableCleanupDelegate(const MockedControllableCleanupDelegate&) =
540       delete;
541   MockedControllableCleanupDelegate& operator=(
542       const MockedControllableCleanupDelegate&) = delete;
543   ~MockedControllableCleanupDelegate() override = default;
544 
545   // WorkerThread::Delegate:
546   MOCK_METHOD1(OnMainEntry, void(WorkerThread* worker));
547 };
548 
549 }  // namespace
550 
551 // Verify that calling WorkerThread::Cleanup() from GetWork() causes
552 // the WorkerThread's thread to exit.
TEST(ThreadPoolWorkerTest,WorkerCleanupFromGetWork)553 TEST(ThreadPoolWorkerTest, WorkerCleanupFromGetWork) {
554   Thread service_thread = Thread("ServiceThread");
555   Thread::Options service_thread_options;
556   service_thread_options.message_pump_type = MessagePumpType::IO;
557   service_thread.StartWithOptions(std::move(service_thread_options));
558   TaskTracker task_tracker;
559   // Will be owned by WorkerThread.
560   MockedControllableCleanupDelegate* delegate =
561       new StrictMock<MockedControllableCleanupDelegate>(&task_tracker);
562   scoped_refptr<ControllableCleanupDelegate::Controls> controls =
563       delegate->controls();
564   controls->set_can_cleanup(true);
565   EXPECT_CALL(*delegate, OnMainEntry(_));
566   auto worker =
567       MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, WrapUnique(delegate),
568                                    task_tracker.GetTrackedRef(), 0);
569   worker->Start(service_thread.task_runner());
570   worker->WakeUp();
571   controls->WaitForWorkToRun();
572   Mock::VerifyAndClear(delegate);
573   controls->WaitForMainExit();
574   // Join the worker to avoid leaks.
575   worker->JoinForTesting();
576 }
577 
// Verify that the delegate is destroyed when Cleanup() is called, and the last
// test-held reference to the worker is dropped, while the worker's task is
// still blocked.
TEST(ThreadPoolWorkerTest, WorkerCleanupDuringWork) {
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  // Abort the test if the service thread cannot be started rather than
  // ignoring the result.
  ASSERT_TRUE(
      service_thread.StartWithOptions(std::move(service_thread_options)));
  TaskTracker task_tracker;
  // Will be owned by WorkerThread.
  // No mock here as that's reasonably covered by other tests and the delegate
  // may destroy on a different thread. Mocks aren't designed with that in mind.
  std::unique_ptr<ControllableCleanupDelegate> delegate =
      std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  scoped_refptr<ControllableCleanupDelegate::Controls> controls =
      delegate->controls();

  // Keep the task running until UnblockWork() below.
  controls->HaveWorkBlock();

  auto worker = MakeRefCounted<WorkerThreadWaitableEvent>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
}
606 
// Verify that the delegate is destroyed when Cleanup() is called, and the last
// test-held reference to the worker is dropped, after the delegate has
// reported that it has no more work.
TEST(ThreadPoolWorkerTest, WorkerCleanupDuringWait) {
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  // Abort the test if the service thread cannot be started rather than
  // ignoring the result.
  ASSERT_TRUE(
      service_thread.StartWithOptions(std::move(service_thread_options)));
  TaskTracker task_tracker;
  // Will be owned by WorkerThread.
  // No mock here as that's reasonably covered by other tests and the delegate
  // may destroy on a different thread. Mocks aren't designed with that in mind.
  std::unique_ptr<ControllableCleanupDelegate> delegate =
      std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  scoped_refptr<ControllableCleanupDelegate::Controls> controls =
      delegate->controls();

  auto worker = MakeRefCounted<WorkerThreadWaitableEvent>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  // The cleanup request is signaled from the GetWork() call that follows the
  // single task; that call returns nullptr.
  controls->WaitForCleanupRequest();
  worker->Cleanup();
  worker = nullptr;
  controls->WaitForDelegateDestroy();
}
632 
// Verify that the delegate is destroyed when Cleanup() is called after the
// TaskTracker has been shut down while the worker's task is still blocked.
TEST(ThreadPoolWorkerTest, WorkerCleanupDuringShutdown) {
  TaskTracker task_tracker;
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  // Abort the test if the service thread cannot be started rather than
  // ignoring the result.
  ASSERT_TRUE(
      service_thread.StartWithOptions(std::move(service_thread_options)));
  // Will be owned by WorkerThread.
  // No mock here as that's reasonably covered by other tests and the delegate
  // may destroy on a different thread. Mocks aren't designed with that in mind.
  std::unique_ptr<ControllableCleanupDelegate> delegate =
      std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  scoped_refptr<ControllableCleanupDelegate::Controls> controls =
      delegate->controls();

  // Keep the task running until UnblockWork() below.
  controls->HaveWorkBlock();

  auto worker = MakeRefCounted<WorkerThreadWaitableEvent>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  test::ShutdownTaskTracker(&task_tracker);
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
}
662 
663 // Verify that Start() is a no-op after Cleanup().
TEST(ThreadPoolWorkerTest,CleanupBeforeStart)664 TEST(ThreadPoolWorkerTest, CleanupBeforeStart) {
665   Thread service_thread = Thread("ServiceThread");
666   Thread::Options service_thread_options;
667   service_thread_options.message_pump_type = MessagePumpType::IO;
668   service_thread.StartWithOptions(std::move(service_thread_options));
669   TaskTracker task_tracker;
670   // Will be owned by WorkerThread.
671   // No mock here as that's reasonably covered by other tests and the delegate
672   // may destroy on a different thread. Mocks aren't designed with that in mind.
673   std::unique_ptr<ControllableCleanupDelegate> delegate =
674       std::make_unique<ControllableCleanupDelegate>(&task_tracker);
675   scoped_refptr<ControllableCleanupDelegate::Controls> controls =
676       delegate->controls();
677   controls->set_expect_get_work(false);
678 
679   auto worker =
680       MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate),
681                                    task_tracker.GetTrackedRef(), 0);
682 
683   worker->Cleanup();
684   worker->Start(service_thread.task_runner());
685 
686   EXPECT_FALSE(worker->ThreadAliveForTesting());
687 }
688 
689 namespace {
690 
691 class CallJoinFromDifferentThread : public SimpleThread {
692  public:
CallJoinFromDifferentThread(WorkerThreadWaitableEvent * worker_to_join)693   explicit CallJoinFromDifferentThread(WorkerThreadWaitableEvent* worker_to_join)
694       : SimpleThread("WorkerThreadJoinThread"),
695         worker_to_join_(worker_to_join) {}
696 
697   CallJoinFromDifferentThread(const CallJoinFromDifferentThread&) = delete;
698   CallJoinFromDifferentThread& operator=(const CallJoinFromDifferentThread&) =
699       delete;
700   ~CallJoinFromDifferentThread() override = default;
701 
Run()702   void Run() override {
703     run_started_event_.Signal();
704     worker_to_join_.ExtractAsDangling()->JoinForTesting();
705   }
706 
WaitForRunToStart()707   void WaitForRunToStart() { run_started_event_.Wait(); }
708 
709  private:
710   raw_ptr<WorkerThreadWaitableEvent> worker_to_join_;
711   TestWaitableEvent run_started_event_;
712 };
713 
714 }  // namespace
715 
// Verify that the delegate is destroyed when Cleanup() is called while another
// thread is (most likely) blocked in JoinForTesting() on the same worker.
TEST(ThreadPoolWorkerTest, WorkerCleanupDuringJoin) {
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  // Abort the test if the service thread cannot be started rather than
  // ignoring the result.
  ASSERT_TRUE(
      service_thread.StartWithOptions(std::move(service_thread_options)));
  TaskTracker task_tracker;
  // Will be owned by WorkerThread.
  // No mock here as that's reasonably covered by other tests and the
  // delegate may destroy on a different thread. Mocks aren't designed with that
  // in mind.
  std::unique_ptr<ControllableCleanupDelegate> delegate =
      std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  scoped_refptr<ControllableCleanupDelegate::Controls> controls =
      delegate->controls();

  // Keep the task running until UnblockWork() below.
  controls->HaveWorkBlock();

  auto worker = MakeRefCounted<WorkerThreadWaitableEvent>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  CallJoinFromDifferentThread join_from_different_thread(worker.get());
  join_from_different_thread.Start();
  join_from_different_thread.WaitForRunToStart();
  // Sleep here to give the other thread a chance to call JoinForTesting().
  // Receiving a signal that Run() was called doesn't mean JoinForTesting() was
  // necessarily called, and we can't signal after JoinForTesting() as
  // JoinForTesting() blocks until we call UnblockWork().
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
  join_from_different_thread.Join();
}
754 
755 namespace {
756 
757 class ExpectThreadTypeDelegate : public WorkerThreadDefaultDelegate {
758  public:
ExpectThreadTypeDelegate()759   ExpectThreadTypeDelegate()
760       : thread_type_verified_in_get_work_event_(
761             WaitableEvent::ResetPolicy::AUTOMATIC) {}
762   ExpectThreadTypeDelegate(const ExpectThreadTypeDelegate&) = delete;
763   ExpectThreadTypeDelegate& operator=(const ExpectThreadTypeDelegate&) = delete;
764 
SetExpectedThreadType(ThreadType expected_thread_type)765   void SetExpectedThreadType(ThreadType expected_thread_type) {
766     expected_thread_type_ = expected_thread_type;
767   }
768 
WaitForThreadTypeVerifiedInGetWork()769   void WaitForThreadTypeVerifiedInGetWork() {
770     thread_type_verified_in_get_work_event_.Wait();
771   }
772 
773   // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)774   void OnMainEntry(WorkerThread* worker) override { VerifyThreadType(); }
GetWork(WorkerThread * worker)775   RegisteredTaskSource GetWork(WorkerThread* worker) override {
776     VerifyThreadType();
777     thread_type_verified_in_get_work_event_.Signal();
778     return nullptr;
779   }
780 
781  private:
VerifyThreadType()782   void VerifyThreadType() {
783     CheckedAutoLock auto_lock(expected_thread_type_lock_);
784     EXPECT_EQ(expected_thread_type_, PlatformThread::GetCurrentThreadType());
785   }
786 
787   // Signaled after GetWork() has verified the thread type of the worker thread.
788   TestWaitableEvent thread_type_verified_in_get_work_event_;
789 
790   // Synchronizes access to |expected_thread_type_|.
791   CheckedLock expected_thread_type_lock_;
792 
793   // Expected thread type for the next call to OnMainEntry() or GetWork().
794   ThreadType expected_thread_type_ = ThreadType::kDefault;
795 };
796 
797 }  // namespace
798 
// Checks that a worker created with ThreadType::kBackground reports
// ThreadType::kDefault from its delegate callbacks once shutdown has started.
// The test is a no-op when CanUseBackgroundThreadTypeForWorkerThread() is
// false.
TEST(ThreadPoolWorkerTest, BumpThreadTypeOfAliveThreadDuringShutdown) {
  if (!CanUseBackgroundThreadTypeForWorkerThread())
    return;

  TaskTracker task_tracker;
  Thread service_thread = Thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));

  // Block shutdown to ensure that the worker doesn't exit when StartShutdown()
  // is called.
  scoped_refptr<Sequence> sequence =
      MakeRefCounted<Sequence>(TaskTraits{TaskShutdownBehavior::BLOCK_SHUTDOWN},
                               nullptr, TaskSourceExecutionMode::kParallel);
  auto registered_task_source =
      task_tracker.RegisterTaskSource(std::move(sequence));

  auto delegate = std::make_unique<ExpectThreadTypeDelegate>();
  // Keep a non-owning pointer: ownership of |delegate| moves into the worker
  // below, but the test still needs to drive the delegate.
  ExpectThreadTypeDelegate* delegate_raw = delegate.get();
  delegate_raw->SetExpectedThreadType(ThreadType::kBackground);
  auto worker = MakeRefCounted<WorkerThreadWaitableEvent>(
      ThreadType::kBackground, std::move(delegate),
      task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread.task_runner());

  // Verify that the initial thread type is kBackground.
  worker->WakeUp();
  delegate_raw->WaitForThreadTypeVerifiedInGetWork();

  // Verify that the thread type is bumped to kDefault during shutdown.
  delegate_raw->SetExpectedThreadType(ThreadType::kDefault);
  task_tracker.StartShutdown();
  worker->WakeUp();
  delegate_raw->WaitForThreadTypeVerifiedInGetWork();

  worker->JoinForTesting();
}
839 
840 namespace {
841 
842 class VerifyCallsToObserverDelegate : public WorkerThreadDefaultDelegate {
843  public:
VerifyCallsToObserverDelegate(test::MockWorkerThreadObserver * observer)844   explicit VerifyCallsToObserverDelegate(
845       test::MockWorkerThreadObserver* observer)
846       : observer_(observer) {}
847   VerifyCallsToObserverDelegate(const VerifyCallsToObserverDelegate&) = delete;
848   VerifyCallsToObserverDelegate& operator=(
849       const VerifyCallsToObserverDelegate&) = delete;
850 
851   // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)852   void OnMainEntry(WorkerThread* worker) override {
853     Mock::VerifyAndClear(observer_);
854   }
855 
OnMainExit(WorkerThread * worker)856   void OnMainExit(WorkerThread* worker) override {
857     observer_->AllowCallsOnMainExit(1);
858   }
859 
860  private:
861   const raw_ptr<test::MockWorkerThreadObserver> observer_;
862 };
863 
864 }  // namespace
865 
866 // Verify that the WorkerThreadObserver is notified when the worker enters
867 // and exits its main function.
TEST(ThreadPoolWorkerTest, WorkerThreadObserver) {
  // A StrictMock makes any notification that was not explicitly expected fail
  // the test.
  StrictMock<test::MockWorkerThreadObserver> observer;
  TaskTracker task_tracker;

  Thread service_thread("ServiceThread");
  Thread::Options io_options;
  io_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(io_options));

  auto observer_checking_delegate =
      std::make_unique<VerifyCallsToObserverDelegate>(&observer);
  auto worker = MakeRefCounted<WorkerThreadWaitableEvent>(
      ThreadType::kDefault, std::move(observer_checking_delegate),
      task_tracker.GetTrackedRef(), 0);

  // The expectation has to be installed before the worker is started with the
  // observer attached.
  EXPECT_CALL(observer, OnWorkerThreadMainEntry());
  worker->Start(service_thread.task_runner(), &observer);
  worker->Cleanup();

  // Joining keeps the worker from being leaked.
  worker->JoinForTesting();
  Mock::VerifyAndClear(&observer);
}
886 
887 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
888     PA_CONFIG(THREAD_CACHE_SUPPORTED)
889 namespace {
FreeForTest(void * data)890 NOINLINE void FreeForTest(void* data) {
891   free(data);
892 }
893 }  // namespace
894 
895 class WorkerThreadThreadCacheDelegate : public WorkerThreadDefaultDelegate {
896  public:
WaitForWork()897   void WaitForWork() override {
898     // Fill several buckets before going to sleep.
899     for (size_t size = 8;
900          size < partition_alloc::ThreadCache::kDefaultSizeThreshold; size++) {
901       void* data = malloc(size);
902       // A simple malloc() / free() pair can be discarded by the compiler (and
903       // is), making the test fail. It is sufficient to make |FreeForTest()| a
904       // NOINLINE function for the call to not be eliminated, but it is
905       // required.
906       FreeForTest(data);
907     }
908 
909     size_t cached_memory_before =
910         partition_alloc::ThreadCache::Get()->CachedMemory();
911     WorkerThreadDefaultDelegate::WaitForWork();
912     size_t cached_memory_after =
913         partition_alloc::ThreadCache::Get()->CachedMemory();
914 
915     if (!first_wakeup_done_) {
916       // First time we sleep is a short sleep, no cache purging.
917       //
918       // Here and below, cannot assert on exact thread cache size, since
919       // anything that allocates will make it fluctuate.
920       EXPECT_GT(cached_memory_after, cached_memory_before / 2);
921       first_wakeup_done_.store(true, std::memory_order_release);
922     } else {
923       // Second one is long, should purge.
924       EXPECT_LT(cached_memory_after, cached_memory_before / 2);
925     }
926   }
927 
928   std::atomic<bool> first_wakeup_done_{false};
929 };
930 
931 // TODO(crbug.com/1469364): Re-enable this test on Mac.
932 #if BUILDFLAG(IS_MAC)
933 #define MAYBE_Purge DISABLED_Purge
934 #else
935 #define MAYBE_Purge Purge
936 #endif
// Checks, through WorkerThreadThreadCacheDelegate, that a worker keeps its
// thread cache across a short first wait and purges it after staying idle for
// longer than WorkerThread::Delegate::kPurgeThreadCacheIdleDelay.
TEST(ThreadPoolWorkerThreadCachePurgeTest, MAYBE_Purge) {
  // Make sure the thread cache is enabled in the main partition.
  partition_alloc::internal::ThreadCacheProcessScopeForTesting scope(
      allocator_shim::internal::PartitionAllocMalloc::Allocator());

  Thread service_thread = Thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));
  TaskTracker task_tracker;
  auto delegate = std::make_unique<WorkerThreadThreadCacheDelegate>();
  // Non-owning pointer kept so the flag can still be polled after ownership of
  // |delegate| moves into the worker.
  auto* delegate_raw = delegate.get();
  auto worker = MakeRefCounted<WorkerThreadWaitableEvent>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  // Wake up before the thread is started to make sure the first sleep is short.
  worker->WakeUp();
  worker->Start(service_thread.task_runner(), nullptr);

  // Spin until the delegate reports that the first (short) wait has completed.
  // The loop must run while the flag is still false; the acquire load pairs
  // with the release store in WorkerThreadThreadCacheDelegate::WaitForWork().
  while (!delegate_raw->first_wakeup_done_.load(std::memory_order_acquire)) {
  }

  // Have to use real sleep unfortunately rather than virtual time, because
  // WaitableEvent uses the non-overridable variant of TimeTicks.
  PlatformThread::Sleep(1.1 *
                        WorkerThread::Delegate::kPurgeThreadCacheIdleDelay);
  worker->JoinForTesting();
}
965 
966 #endif  // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
967         // PA_CONFIG(THREAD_CACHE_SUPPORTED)
968 
969 }  // namespace internal
970 }  // namespace base
971