1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/worker_thread.h"
6
7 #include <stddef.h>
8
9 #include <atomic>
10 #include <memory>
11 #include <utility>
12 #include <vector>
13
14 #include "base/compiler_specific.h"
15 #include "base/functional/bind.h"
16 #include "base/functional/callback_helpers.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/raw_ptr.h"
19 #include "base/memory/ref_counted.h"
20 #include "base/message_loop/message_pump_type.h"
21 #include "base/synchronization/condition_variable.h"
22 #include "base/task/common/checked_lock.h"
23 #include "base/task/thread_pool/environment_config.h"
24 #include "base/task/thread_pool/sequence.h"
25 #include "base/task/thread_pool/task.h"
26 #include "base/task/thread_pool/task_tracker.h"
27 #include "base/task/thread_pool/test_utils.h"
28 #include "base/task/thread_pool/worker_thread_observer.h"
29 #include "base/test/bind.h"
30 #include "base/test/test_timeouts.h"
31 #include "base/test/test_waitable_event.h"
32 #include "base/threading/platform_thread.h"
33 #include "base/threading/simple_thread.h"
34 #include "base/threading/thread.h"
35 #include "base/time/time.h"
36 #include "build/build_config.h"
37 #include "partition_alloc/buildflags.h"
38 #include "partition_alloc/partition_alloc_config.h"
39 #include "partition_alloc/shim/allocator_shim.h"
40 #include "partition_alloc/shim/allocator_shim_default_dispatch_to_partition_alloc.h"
41 #include "testing/gmock/include/gmock/gmock.h"
42 #include "testing/gtest/include/gtest/gtest.h"
43
44 #if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
45 PA_CONFIG(THREAD_CACHE_SUPPORTED)
46 #include "partition_alloc/extended_api.h"
47 #include "partition_alloc/thread_cache.h"
48 #endif // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
49 // PA_CONFIG(THREAD_CACHE_SUPPORTED)
50
51 using testing::_;
52 using testing::Mock;
53 using testing::Ne;
54 using testing::StrictMock;
55
56 namespace base::internal {
57 namespace {
58
// Number of Sequences the parameterized tests ask GetWork() to produce.
constexpr size_t kNumSequencesPerTest = 150;
60
61 class WorkerThreadDefaultDelegate : public WorkerThread::Delegate {
62 public:
63 WorkerThreadDefaultDelegate() = default;
64 WorkerThreadDefaultDelegate(const WorkerThreadDefaultDelegate&) = delete;
65 WorkerThreadDefaultDelegate& operator=(const WorkerThreadDefaultDelegate&) =
66 delete;
67
68 // WorkerThread::Delegate:
GetThreadLabel() const69 WorkerThread::ThreadLabel GetThreadLabel() const override {
70 return WorkerThread::ThreadLabel::DEDICATED;
71 }
OnMainEntry(WorkerThread * worker)72 void OnMainEntry(WorkerThread* worker) override {}
GetWork(WorkerThread * worker)73 RegisteredTaskSource GetWork(WorkerThread* worker) override {
74 return nullptr;
75 }
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)76 RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
77 WorkerThread* worker) override {
78 ADD_FAILURE() << "Unexpected call to SwapProcessedTask()";
79 return nullptr;
80 }
GetSleepTimeout()81 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
82 };
83 // The test parameter is the number of Tasks per Sequence returned by GetWork().
84
85 class ThreadPoolWorkerTest : public testing::Test {
86 public:
87 ThreadPoolWorkerTest(const ThreadPoolWorkerTest&) = delete;
88 ThreadPoolWorkerTest& operator=(const ThreadPoolWorkerTest&) = delete;
89
90 protected:
ThreadPoolWorkerTest()91 ThreadPoolWorkerTest() {
92 Thread::Options service_thread_options;
93 service_thread_options.message_pump_type = MessagePumpType::IO;
94 service_thread_.StartWithOptions(std::move(service_thread_options));
95 }
96
97 Thread service_thread_ = Thread("ServiceThread");
98 };
99
100 // The test parameter is the number of Tasks per Sequence returned by GetWork().
101 class ThreadPoolWorkerTestParam : public testing::TestWithParam<int> {
102 public:
103 ThreadPoolWorkerTestParam(const ThreadPoolWorkerTestParam&) = delete;
104 ThreadPoolWorkerTestParam& operator=(const ThreadPoolWorkerTestParam&) =
105 delete;
106
107 protected:
ThreadPoolWorkerTestParam()108 ThreadPoolWorkerTestParam()
109 : num_get_work_cv_(lock_.CreateConditionVariable()) {
110 Thread::Options service_thread_options;
111 service_thread_options.message_pump_type = MessagePumpType::IO;
112 service_thread_.StartWithOptions(std::move(service_thread_options));
113 }
114
SetUp()115 void SetUp() override {
116 worker_ = MakeRefCounted<WorkerThread>(
117 ThreadType::kDefault,
118 std::make_unique<TestWorkerThreadDelegate>(this),
119 task_tracker_.GetTrackedRef(), 0);
120 ASSERT_TRUE(worker_);
121 worker_->Start(service_thread_.task_runner());
122 worker_set_.Signal();
123 main_entry_called_.Wait();
124 }
125
TearDown()126 void TearDown() override {
127 // |worker_| needs to be released before ~TaskTracker() as it holds a
128 // TrackedRef to it.
129 worker_->JoinForTesting();
130 worker_ = nullptr;
131 }
132
TasksPerSequence() const133 int TasksPerSequence() const { return GetParam(); }
134
135 // Wait until GetWork() has been called |num_get_work| times.
WaitForNumGetWork(size_t num_get_work)136 void WaitForNumGetWork(size_t num_get_work) {
137 CheckedAutoLock auto_lock(lock_);
138 while (num_get_work_ < num_get_work) {
139 num_get_work_cv_.Wait();
140 }
141 }
142
SetMaxGetWork(size_t max_get_work)143 void SetMaxGetWork(size_t max_get_work) {
144 CheckedAutoLock auto_lock(lock_);
145 max_get_work_ = max_get_work;
146 }
147
SetNumSequencesToCreate(size_t num_sequences_to_create)148 void SetNumSequencesToCreate(size_t num_sequences_to_create) {
149 CheckedAutoLock auto_lock(lock_);
150 EXPECT_EQ(0U, num_sequences_to_create_);
151 num_sequences_to_create_ = num_sequences_to_create;
152 }
153
NumRunTasks()154 size_t NumRunTasks() {
155 CheckedAutoLock auto_lock(lock_);
156 return num_run_tasks_;
157 }
158
CreatedTaskSources()159 std::vector<scoped_refptr<TaskSource>> CreatedTaskSources() {
160 CheckedAutoLock auto_lock(lock_);
161 return created_sequences_;
162 }
163
DidProcessTaskSequences()164 std::vector<scoped_refptr<TaskSource>> DidProcessTaskSequences() {
165 CheckedAutoLock auto_lock(lock_);
166 return did_run_task_sources_;
167 }
168
169 scoped_refptr<WorkerThread> worker_;
170 Thread service_thread_ = Thread("ServiceThread");
171
172 private:
173 class TestWorkerThreadDelegate
174 : public WorkerThreadDefaultDelegate {
175 public:
TestWorkerThreadDelegate(ThreadPoolWorkerTestParam * outer)176 explicit TestWorkerThreadDelegate(ThreadPoolWorkerTestParam* outer)
177 : outer_(outer) {}
178 TestWorkerThreadDelegate(
179 const TestWorkerThreadDelegate&) = delete;
180 TestWorkerThreadDelegate& operator=(
181 const TestWorkerThreadDelegate&) = delete;
182
~TestWorkerThreadDelegate()183 ~TestWorkerThreadDelegate() override {
184 EXPECT_FALSE(IsCallToDidProcessTaskExpected());
185 }
186
187 // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)188 void OnMainEntry(WorkerThread* worker) override {
189 outer_->worker_set_.Wait();
190 EXPECT_EQ(outer_->worker_.get(), worker);
191 EXPECT_FALSE(IsCallToDidProcessTaskExpected());
192
193 // Without synchronization, OnMainEntry() could be called twice without
194 // generating an error.
195 CheckedAutoLock auto_lock(outer_->lock_);
196 EXPECT_FALSE(outer_->main_entry_called_.IsSignaled());
197 outer_->main_entry_called_.Signal();
198 }
199
GetWork(WorkerThread * worker)200 RegisteredTaskSource GetWork(WorkerThread* worker) override {
201 EXPECT_FALSE(IsCallToDidProcessTaskExpected());
202 EXPECT_EQ(outer_->worker_.get(), worker);
203
204 {
205 CheckedAutoLock auto_lock(outer_->lock_);
206
207 // Increment the number of times that this method has been called.
208 ++outer_->num_get_work_;
209 outer_->num_get_work_cv_.Signal();
210
211 // Verify that this method isn't called more times than expected.
212 EXPECT_LE(outer_->num_get_work_, outer_->max_get_work_);
213
214 // Check if a Sequence should be returned.
215 if (outer_->num_sequences_to_create_ == 0) {
216 return nullptr;
217 }
218 --outer_->num_sequences_to_create_;
219 }
220
221 // Create a Sequence with TasksPerSequence() Tasks.
222 scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
223 TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
224 Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
225 for (int i = 0; i < outer_->TasksPerSequence(); ++i) {
226 Task task(FROM_HERE,
227 BindOnce(&ThreadPoolWorkerTestParam::RunTaskCallback,
228 Unretained(outer_)),
229 TimeTicks::Now(), TimeDelta());
230 sequence_transaction.WillPushImmediateTask();
231 EXPECT_TRUE(outer_->task_tracker_.WillPostTask(
232 &task, sequence->shutdown_behavior()));
233 sequence_transaction.PushImmediateTask(std::move(task));
234 }
235 auto registered_task_source =
236 outer_->task_tracker_.RegisterTaskSource(sequence);
237 EXPECT_TRUE(registered_task_source);
238
239 ExpectCallToDidProcessTask();
240
241 {
242 // Add the Sequence to the vector of created Sequences.
243 CheckedAutoLock auto_lock(outer_->lock_);
244 outer_->created_sequences_.push_back(sequence);
245 }
246 auto run_status = registered_task_source.WillRunTask();
247 EXPECT_NE(run_status, TaskSource::RunStatus::kDisallowed);
248 return registered_task_source;
249 }
250
SwapProcessedTask(RegisteredTaskSource registered_task_source,WorkerThread * worker)251 RegisteredTaskSource SwapProcessedTask(
252 RegisteredTaskSource registered_task_source,
253 WorkerThread* worker) override {
254 {
255 CheckedAutoLock auto_lock(expect_did_run_task_lock_);
256 EXPECT_TRUE(expect_did_run_task_);
257 expect_did_run_task_ = false;
258 }
259
260 // If TasksPerSequence() is 1, |registered_task_source| should be nullptr.
261 // Otherwise, |registered_task_source| should contain TasksPerSequence() -
262 // 1 Tasks.
263 if (outer_->TasksPerSequence() == 1) {
264 EXPECT_FALSE(registered_task_source);
265 } else {
266 EXPECT_TRUE(registered_task_source);
267 EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now()));
268
269 // Verify the number of Tasks in |registered_task_source|.
270 for (int i = 0; i < outer_->TasksPerSequence() - 1; ++i) {
271 registered_task_source.WillRunTask();
272 IgnoreResult(registered_task_source.TakeTask());
273 if (i < outer_->TasksPerSequence() - 2) {
274 EXPECT_TRUE(registered_task_source.DidProcessTask());
275 EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now()));
276 } else {
277 EXPECT_FALSE(registered_task_source.DidProcessTask());
278 }
279 }
280 scoped_refptr<TaskSource> task_source =
281 registered_task_source.Unregister();
282 {
283 // Add |task_source| to |did_run_task_sources_|.
284 CheckedAutoLock auto_lock(outer_->lock_);
285 outer_->did_run_task_sources_.push_back(std::move(task_source));
286 EXPECT_LE(outer_->did_run_task_sources_.size(),
287 outer_->created_sequences_.size());
288 }
289 }
290 return GetWork(worker);
291 }
292
293 private:
294 // Expect a call to DidProcessTask() before the next call to any other
295 // method of this delegate.
ExpectCallToDidProcessTask()296 void ExpectCallToDidProcessTask() {
297 CheckedAutoLock auto_lock(expect_did_run_task_lock_);
298 expect_did_run_task_ = true;
299 }
300
IsCallToDidProcessTaskExpected() const301 bool IsCallToDidProcessTaskExpected() const {
302 CheckedAutoLock auto_lock(expect_did_run_task_lock_);
303 return expect_did_run_task_;
304 }
305
306 raw_ptr<ThreadPoolWorkerTestParam> outer_;
307
308 // Synchronizes access to |expect_did_run_task_|.
309 mutable CheckedLock expect_did_run_task_lock_;
310
311 // Whether the next method called on this delegate should be
312 // DidProcessTask().
313 bool expect_did_run_task_ = false;
314 };
315
RunTaskCallback()316 void RunTaskCallback() {
317 CheckedAutoLock auto_lock(lock_);
318 ++num_run_tasks_;
319 EXPECT_LE(num_run_tasks_, created_sequences_.size());
320 }
321
322 TaskTracker task_tracker_;
323
324 // Synchronizes access to all members below.
325 mutable CheckedLock lock_;
326
327 // Signaled once OnMainEntry() has been called.
328 TestWaitableEvent main_entry_called_;
329
330 // Number of Sequences that should be created by GetWork(). When this
331 // is 0, GetWork() returns nullptr.
332 size_t num_sequences_to_create_ = 0;
333
334 // Number of times that GetWork() has been called.
335 size_t num_get_work_ = 0;
336
337 // Maximum number of times that GetWork() can be called.
338 size_t max_get_work_ = 0;
339
340 // Condition variable signaled when |num_get_work_| is incremented.
341 ConditionVariable num_get_work_cv_;
342
343 // Sequences created by GetWork().
344 std::vector<scoped_refptr<TaskSource>> created_sequences_;
345
346 // Sequences passed to DidProcessTask().
347 std::vector<scoped_refptr<TaskSource>> did_run_task_sources_;
348
349 // Number of times that RunTaskCallback() has been called.
350 size_t num_run_tasks_ = 0;
351
352 // Signaled after |worker_| is set.
353 TestWaitableEvent worker_set_;
354 };
355
356 } // namespace
357
358 // Verify that when GetWork() continuously returns Sequences, all Tasks in these
359 // Sequences run successfully. The test wakes up the WorkerThread once.
TEST_P(ThreadPoolWorkerTestParam,ContinuousWork)360 TEST_P(ThreadPoolWorkerTestParam, ContinuousWork) {
361 // Set GetWork() to return |kNumSequencesPerTest| Sequences before starting to
362 // return nullptr.
363 SetNumSequencesToCreate(kNumSequencesPerTest);
364
365 // Expect |kNumSequencesPerTest| calls to GetWork() in which it returns a
366 // Sequence and one call in which its returns nullptr.
367 const size_t kExpectedNumGetWork = kNumSequencesPerTest + 1;
368 SetMaxGetWork(kExpectedNumGetWork);
369
370 // Wake up |worker_| and wait until GetWork() has been invoked the
371 // expected amount of times.
372 worker_->WakeUp();
373 WaitForNumGetWork(kExpectedNumGetWork);
374
375 // All tasks should have run.
376 EXPECT_EQ(kNumSequencesPerTest, NumRunTasks());
377
378 // If Sequences returned by GetWork() contain more than one Task, they aren't
379 // empty after the worker pops Tasks from them and thus should be returned to
380 // DidProcessTask().
381 if (TasksPerSequence() > 1) {
382 EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences());
383 } else {
384 EXPECT_TRUE(DidProcessTaskSequences().empty());
385 }
386 }
387
388 // Verify that when GetWork() alternates between returning a Sequence and
389 // returning nullptr, all Tasks in the returned Sequences run successfully. The
390 // test wakes up the WorkerThread once for each Sequence.
TEST_P(ThreadPoolWorkerTestParam,IntermittentWork)391 TEST_P(ThreadPoolWorkerTestParam, IntermittentWork) {
392 for (size_t i = 0; i < kNumSequencesPerTest; ++i) {
393 // Set GetWork() to return 1 Sequence before starting to return
394 // nullptr.
395 SetNumSequencesToCreate(1);
396
397 // Expect |i + 1| calls to GetWork() in which it returns a Sequence and
398 // |i + 1| calls in which it returns nullptr.
399 const size_t expected_num_get_work = 2 * (i + 1);
400 SetMaxGetWork(expected_num_get_work);
401
402 // Wake up |worker_| and wait until GetWork() has been invoked
403 // the expected amount of times.
404 worker_->WakeUp();
405 WaitForNumGetWork(expected_num_get_work);
406
407 // The Task should have run
408 EXPECT_EQ(i + 1, NumRunTasks());
409
410 // If Sequences returned by GetWork() contain more than one Task, they
411 // aren't empty after the worker pops Tasks from them and thus should be
412 // returned to DidProcessTask().
413 if (TasksPerSequence() > 1) {
414 EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences());
415 } else {
416 EXPECT_TRUE(DidProcessTaskSequences().empty());
417 }
418 }
419 }
420
// Run every ThreadPoolWorkerTestParam test twice: once with one Task per
// Sequence and once with two Tasks per Sequence.
INSTANTIATE_TEST_SUITE_P(OneTaskPerSequence,
                         ThreadPoolWorkerTestParam,
                         ::testing::Values(1));
INSTANTIATE_TEST_SUITE_P(TwoTasksPerSequence,
                         ThreadPoolWorkerTestParam,
                         ::testing::Values(2));
427
428 namespace {
429
430 class ControllableCleanupDelegate : public WorkerThreadDefaultDelegate {
431 public:
432 class Controls : public RefCountedThreadSafe<Controls> {
433 public:
434 Controls() = default;
435 Controls(const Controls&) = delete;
436 Controls& operator=(const Controls&) = delete;
437
HaveWorkBlock()438 void HaveWorkBlock() { work_running_.Reset(); }
439
UnblockWork()440 void UnblockWork() { work_running_.Signal(); }
441
WaitForWorkToRun()442 void WaitForWorkToRun() { work_processed_.Wait(); }
443
WaitForCleanupRequest()444 void WaitForCleanupRequest() { cleanup_requested_.Wait(); }
445
WaitForDelegateDestroy()446 void WaitForDelegateDestroy() { destroyed_.Wait(); }
447
WaitForMainExit()448 void WaitForMainExit() { exited_.Wait(); }
449
set_expect_get_work(bool expect_get_work)450 void set_expect_get_work(bool expect_get_work) {
451 expect_get_work_ = expect_get_work;
452 }
453
ResetState()454 void ResetState() {
455 work_running_.Signal();
456 work_processed_.Reset();
457 cleanup_requested_.Reset();
458 exited_.Reset();
459 work_requested_ = false;
460 }
461
set_can_cleanup(bool can_cleanup)462 void set_can_cleanup(bool can_cleanup) { can_cleanup_ = can_cleanup; }
463
464 private:
465 friend class ControllableCleanupDelegate;
466 friend class RefCountedThreadSafe<Controls>;
467 ~Controls() = default;
468
469 TestWaitableEvent work_running_{WaitableEvent::ResetPolicy::MANUAL,
470 WaitableEvent::InitialState::SIGNALED};
471 TestWaitableEvent work_processed_;
472 TestWaitableEvent cleanup_requested_;
473 TestWaitableEvent destroyed_;
474 TestWaitableEvent exited_;
475
476 bool expect_get_work_ = true;
477 bool can_cleanup_ = false;
478 bool work_requested_ = false;
479 };
480
ControllableCleanupDelegate(TaskTracker * task_tracker)481 explicit ControllableCleanupDelegate(TaskTracker* task_tracker)
482 : task_tracker_(task_tracker), controls_(new Controls()) {}
483
484 ControllableCleanupDelegate(const ControllableCleanupDelegate&) = delete;
485 ControllableCleanupDelegate& operator=(const ControllableCleanupDelegate&) =
486 delete;
~ControllableCleanupDelegate()487 ~ControllableCleanupDelegate() override { controls_->destroyed_.Signal(); }
488
GetWork(WorkerThread * worker)489 RegisteredTaskSource GetWork(WorkerThread* worker) override {
490 EXPECT_TRUE(controls_->expect_get_work_);
491
492 // Sends one item of work to signal |work_processed_|. On subsequent calls,
493 // sends nullptr to indicate there's no more work to be done.
494 if (controls_->work_requested_) {
495 if (CanCleanup(worker)) {
496 OnCleanup();
497 worker->Cleanup();
498 controls_->set_expect_get_work(false);
499 }
500 return nullptr;
501 }
502
503 controls_->work_requested_ = true;
504 scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
505 TaskTraits(WithBaseSyncPrimitives(),
506 TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN),
507 nullptr, TaskSourceExecutionMode::kParallel);
508 Task task(FROM_HERE,
509 BindOnce(
510 [](TestWaitableEvent* work_processed,
511 TestWaitableEvent* work_running) {
512 work_processed->Signal();
513 work_running->Wait();
514 },
515 Unretained(&controls_->work_processed_),
516 Unretained(&controls_->work_running_)),
517 TimeTicks::Now(), TimeDelta());
518 auto transaction = sequence->BeginTransaction();
519 transaction.WillPushImmediateTask();
520 EXPECT_TRUE(
521 task_tracker_->WillPostTask(&task, sequence->shutdown_behavior()));
522 transaction.PushImmediateTask(std::move(task));
523 auto registered_task_source =
524 task_tracker_->RegisterTaskSource(std::move(sequence));
525 EXPECT_TRUE(registered_task_source);
526 registered_task_source.WillRunTask();
527 return registered_task_source;
528 }
529
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)530 RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
531 WorkerThread* worker) override {
532 return GetWork(worker);
533 }
534
OnMainExit(WorkerThread * worker)535 void OnMainExit(WorkerThread* worker) override {
536 controls_->exited_.Signal();
537 }
538
CanCleanup(WorkerThread * worker)539 bool CanCleanup(WorkerThread* worker) {
540 // Saving |can_cleanup_| now so that callers waiting on |cleanup_requested_|
541 // have the thread go to sleep and then allow timing out.
542 bool can_cleanup = controls_->can_cleanup_;
543 controls_->cleanup_requested_.Signal();
544 return can_cleanup;
545 }
546
OnCleanup()547 void OnCleanup() {
548 EXPECT_TRUE(controls_->can_cleanup_);
549 EXPECT_TRUE(controls_->cleanup_requested_.IsSignaled());
550 }
551
552 // ControllableCleanupDelegate:
controls()553 scoped_refptr<Controls> controls() { return controls_; }
554
555 private:
556 scoped_refptr<Sequence> work_sequence_;
557 const raw_ptr<TaskTracker> task_tracker_;
558 scoped_refptr<Controls> controls_;
559 };
560
561 class MockedControllableCleanupDelegate : public ControllableCleanupDelegate {
562 public:
MockedControllableCleanupDelegate(TaskTracker * task_tracker)563 explicit MockedControllableCleanupDelegate(TaskTracker* task_tracker)
564 : ControllableCleanupDelegate(task_tracker) {}
565 MockedControllableCleanupDelegate(const MockedControllableCleanupDelegate&) =
566 delete;
567 MockedControllableCleanupDelegate& operator=(
568 const MockedControllableCleanupDelegate&) = delete;
569 ~MockedControllableCleanupDelegate() override = default;
570
571 // WorkerThread::Delegate:
572 MOCK_METHOD1(OnMainEntry, void(WorkerThread* worker));
573 };
574
575 } // namespace
576
577 // Verify that calling WorkerThread::Cleanup() from GetWork() causes
578 // the WorkerThread's thread to exit.
TEST_F(ThreadPoolWorkerTest,WorkerCleanupFromGetWork)579 TEST_F(ThreadPoolWorkerTest, WorkerCleanupFromGetWork) {
580 TaskTracker task_tracker;
581 // Will be owned by WorkerThread.
582 MockedControllableCleanupDelegate* delegate =
583 new StrictMock<MockedControllableCleanupDelegate>(&task_tracker);
584 scoped_refptr<ControllableCleanupDelegate::Controls> controls =
585 delegate->controls();
586 controls->set_can_cleanup(true);
587 EXPECT_CALL(*delegate, OnMainEntry(_));
588 auto worker = MakeRefCounted<WorkerThread>(
589 ThreadType::kDefault, WrapUnique(delegate), task_tracker.GetTrackedRef(),
590 0);
591 worker->Start(service_thread_.task_runner());
592 worker->WakeUp();
593 controls->WaitForWorkToRun();
594 Mock::VerifyAndClear(delegate);
595 controls->WaitForMainExit();
596 // Join the worker to avoid leaks.
597 worker->JoinForTesting();
598 }
599
// Calls Cleanup() and drops the last test-side reference to the worker while
// its task is still blocked, then checks that the delegate gets destroyed once
// the task is unblocked.
TEST_F(ThreadPoolWorkerTest, WorkerCleanupDuringWork) {
  TaskTracker task_tracker;

  // The delegate is handed over to the WorkerThread. It is not mocked: that is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  scoped_refptr<ControllableCleanupDelegate::Controls> controls =
      delegate->controls();

  // Keep the task blocked until UnblockWork() below.
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread_.task_runner());
  worker->WakeUp();
  controls->WaitForWorkToRun();

  worker->Cleanup();
  worker = nullptr;

  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
}
624
// Calls Cleanup() after the delegate has declined to clean up by itself, then
// checks that the delegate gets destroyed.
TEST_F(ThreadPoolWorkerTest, WorkerCleanupDuringWait) {
  TaskTracker task_tracker;

  // The delegate is handed over to the WorkerThread. It is not mocked: that is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  scoped_refptr<ControllableCleanupDelegate::Controls> controls =
      delegate->controls();

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread_.task_runner());
  worker->WakeUp();

  // Signaled from CanCleanup(), i.e. once GetWork() has no more work.
  controls->WaitForCleanupRequest();

  worker->Cleanup();
  worker = nullptr;
  controls->WaitForDelegateDestroy();
}
646
// Shuts down the TaskTracker while the worker's task is blocked, then calls
// Cleanup() and checks that the delegate gets destroyed once the task is
// unblocked.
TEST_F(ThreadPoolWorkerTest, WorkerCleanupDuringShutdown) {
  TaskTracker task_tracker;

  // The delegate is handed over to the WorkerThread. It is not mocked: that is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  scoped_refptr<ControllableCleanupDelegate::Controls> controls =
      delegate->controls();

  // Keep the task blocked until UnblockWork() below.
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread_.task_runner());
  worker->WakeUp();
  controls->WaitForWorkToRun();

  test::ShutdownTaskTracker(&task_tracker);
  worker->Cleanup();
  worker = nullptr;

  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
}
672
673 // Verify that Start() is a no-op after Cleanup().
TEST_F(ThreadPoolWorkerTest,CleanupBeforeStart)674 TEST_F(ThreadPoolWorkerTest, CleanupBeforeStart) {
675 TaskTracker task_tracker;
676 // Will be owned by WorkerThread.
677 // No mock here as that's reasonably covered by other tests and the delegate
678 // may destroy on a different thread. Mocks aren't designed with that in mind.
679 std::unique_ptr<ControllableCleanupDelegate> delegate =
680 std::make_unique<ControllableCleanupDelegate>(&task_tracker);
681 scoped_refptr<ControllableCleanupDelegate::Controls> controls =
682 delegate->controls();
683 controls->set_expect_get_work(false);
684
685 auto worker = MakeRefCounted<WorkerThread>(
686 ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
687 0);
688
689 worker->Cleanup();
690 worker->Start(service_thread_.task_runner());
691
692 EXPECT_FALSE(worker->ThreadAliveForTesting());
693 }
694
695 namespace {
696
697 class CallJoinFromDifferentThread : public SimpleThread {
698 public:
CallJoinFromDifferentThread(WorkerThread * worker_to_join)699 explicit CallJoinFromDifferentThread(
700 WorkerThread* worker_to_join)
701 : SimpleThread("WorkerThreadJoinThread"),
702 worker_to_join_(worker_to_join) {}
703
704 CallJoinFromDifferentThread(const CallJoinFromDifferentThread&) = delete;
705 CallJoinFromDifferentThread& operator=(const CallJoinFromDifferentThread&) =
706 delete;
707 ~CallJoinFromDifferentThread() override = default;
708
Run()709 void Run() override {
710 run_started_event_.Signal();
711 worker_to_join_.ExtractAsDangling()->JoinForTesting();
712 }
713
WaitForRunToStart()714 void WaitForRunToStart() { run_started_event_.Wait(); }
715
716 private:
717 raw_ptr<WorkerThread> worker_to_join_;
718 TestWaitableEvent run_started_event_;
719 };
720
721 } // namespace
722
// Calls Cleanup() while another thread is (most likely) blocked in
// JoinForTesting() on the same worker, then checks that the delegate gets
// destroyed and the joining thread finishes.
TEST_F(ThreadPoolWorkerTest, WorkerCleanupDuringJoin) {
  TaskTracker task_tracker;

  // The delegate is handed over to the WorkerThread. It is not mocked: that is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  scoped_refptr<ControllableCleanupDelegate::Controls> controls =
      delegate->controls();

  // Keep the task blocked until UnblockWork() below.
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);
  worker->Start(service_thread_.task_runner());
  worker->WakeUp();
  controls->WaitForWorkToRun();

  CallJoinFromDifferentThread join_from_different_thread(worker.get());
  join_from_different_thread.Start();
  join_from_different_thread.WaitForRunToStart();

  // Knowing that Run() started doesn't guarantee that JoinForTesting() has
  // been entered, and no signal can be sent after JoinForTesting() because it
  // blocks until UnblockWork() is called. Sleep to give the other thread a
  // chance to get there.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());

  worker->Cleanup();
  worker = nullptr;

  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
  join_from_different_thread.Join();
}
757
758 namespace {
759
760 class ExpectThreadTypeDelegate : public WorkerThreadDefaultDelegate {
761 public:
ExpectThreadTypeDelegate()762 ExpectThreadTypeDelegate()
763 : thread_type_verified_in_get_work_event_(
764 WaitableEvent::ResetPolicy::AUTOMATIC) {}
765 ExpectThreadTypeDelegate(const ExpectThreadTypeDelegate&) = delete;
766 ExpectThreadTypeDelegate& operator=(const ExpectThreadTypeDelegate&) = delete;
767
SetExpectedThreadType(ThreadType expected_thread_type)768 void SetExpectedThreadType(ThreadType expected_thread_type) {
769 expected_thread_type_ = expected_thread_type;
770 }
771
WaitForThreadTypeVerifiedInGetWork()772 void WaitForThreadTypeVerifiedInGetWork() {
773 thread_type_verified_in_get_work_event_.Wait();
774 }
775
776 // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)777 void OnMainEntry(WorkerThread* worker) override { VerifyThreadType(); }
GetWork(WorkerThread * worker)778 RegisteredTaskSource GetWork(WorkerThread* worker) override {
779 VerifyThreadType();
780 thread_type_verified_in_get_work_event_.Signal();
781 return nullptr;
782 }
783
784 private:
VerifyThreadType()785 void VerifyThreadType() {
786 CheckedAutoLock auto_lock(expected_thread_type_lock_);
787 EXPECT_EQ(expected_thread_type_, PlatformThread::GetCurrentThreadType());
788 }
789
790 // Signaled after GetWork() has verified the thread type of the worker thread.
791 TestWaitableEvent thread_type_verified_in_get_work_event_;
792
793 // Synchronizes access to |expected_thread_type_|.
794 CheckedLock expected_thread_type_lock_;
795
796 // Expected thread type for the next call to OnMainEntry() or GetWork().
797 ThreadType expected_thread_type_ = ThreadType::kDefault;
798 };
799
800 } // namespace
801
// Checks that a worker created with ThreadType::kBackground is observed
// running as ThreadType::kDefault once shutdown has started.
TEST_F(ThreadPoolWorkerTest, BumpThreadTypeOfAliveThreadDuringShutdown) {
  // Nothing to verify when workers can't use the background thread type.
  if (!CanUseBackgroundThreadTypeForWorkerThread()) {
    return;
  }

  TaskTracker task_tracker;

  // Keep a BLOCK_SHUTDOWN task source registered so that the worker doesn't
  // exit when StartShutdown() is called.
  scoped_refptr<Sequence> blocking_sequence =
      MakeRefCounted<Sequence>(TaskTraits{TaskShutdownBehavior::BLOCK_SHUTDOWN},
                               nullptr, TaskSourceExecutionMode::kParallel);
  auto registered_task_source =
      task_tracker.RegisterTaskSource(std::move(blocking_sequence));

  auto delegate = std::make_unique<ExpectThreadTypeDelegate>();
  ExpectThreadTypeDelegate* const delegate_raw = delegate.get();
  delegate_raw->SetExpectedThreadType(ThreadType::kBackground);

  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      ThreadType::kBackground, std::move(delegate),
      task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread_.task_runner());

  // Before shutdown, the worker should run with the kBackground thread type.
  worker->WakeUp();
  delegate_raw->WaitForThreadTypeVerifiedInGetWork();

  // Once shutdown has started, the worker should run with kDefault instead.
  delegate_raw->SetExpectedThreadType(ThreadType::kDefault);
  task_tracker.StartShutdown();
  worker->WakeUp();
  delegate_raw->WaitForThreadTypeVerifiedInGetWork();

  worker->JoinForTesting();
}
839
840 namespace {
841
842 class VerifyCallsToObserverDelegate : public WorkerThreadDefaultDelegate {
843 public:
VerifyCallsToObserverDelegate(test::MockWorkerThreadObserver * observer)844 explicit VerifyCallsToObserverDelegate(
845 test::MockWorkerThreadObserver* observer)
846 : observer_(observer) {}
847 VerifyCallsToObserverDelegate(const VerifyCallsToObserverDelegate&) = delete;
848 VerifyCallsToObserverDelegate& operator=(
849 const VerifyCallsToObserverDelegate&) = delete;
850
851 // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)852 void OnMainEntry(WorkerThread* worker) override {
853 Mock::VerifyAndClear(observer_);
854 }
855
OnMainExit(WorkerThread * worker)856 void OnMainExit(WorkerThread* worker) override {
857 observer_->AllowCallsOnMainExit(1);
858 }
859
860 private:
861 const raw_ptr<test::MockWorkerThreadObserver> observer_;
862 };
863
864 } // namespace
865
866 // Verify that the WorkerThreadObserver is notified when the worker enters
867 // and exits its main function.
TEST_F(ThreadPoolWorkerTest,WorkerThreadObserver)868 TEST_F(ThreadPoolWorkerTest, WorkerThreadObserver) {
869 StrictMock<test::MockWorkerThreadObserver> observer;
870 TaskTracker task_tracker;
871 auto delegate = std::make_unique<VerifyCallsToObserverDelegate>(&observer);
872 auto worker = MakeRefCounted<WorkerThread>(
873 ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
874 0);
875 EXPECT_CALL(observer, OnWorkerThreadMainEntry());
876 worker->Start(service_thread_.task_runner(), &observer);
877 worker->Cleanup();
878 // Join the worker to avoid leaks.
879 worker->JoinForTesting();
880 Mock::VerifyAndClear(&observer);
881 }
882
883 #if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
884 PA_CONFIG(THREAD_CACHE_SUPPORTED)
885 namespace {
FreeForTest(void * data)886 NOINLINE void FreeForTest(void* data) {
887 free(data);
888 }
889
890 using ::testing::AllOf;
891 using ::testing::Ge;
892 using ::testing::Le;
893
894 } // namespace
895
896 class WorkerThreadThreadCacheDelegate : public WorkerThreadDefaultDelegate {
897 public:
PrepareForTesting()898 void PrepareForTesting() {
899 TimeTicks now = TimeTicks::Now();
900 WorkerThreadDefaultDelegate::set_first_sleep_time_for_testing(now);
901 first_sleep_time_for_testing_ = now;
902 }
903
GetSleepDurationBeforePurge(TimeTicks)904 TimeDelta GetSleepDurationBeforePurge(TimeTicks) override {
905 // Expect `GetSleepDurationBeforePurge()` to return
906 // `kFirstSleepDurationBeforePurge` when invoked within
907 // `kFirstSleepDurationBeforePurge` of the first sleep (with
908 // `kPurgeThreadCacheIdleDelay` tolerance due to alignment).
909 EXPECT_THAT(
910 WorkerThreadDefaultDelegate::GetSleepDurationBeforePurge(
911 /* now=*/first_sleep_time_for_testing_),
912 AllOf(Ge(WorkerThread::Delegate::kFirstSleepDurationBeforePurge),
913 Le(WorkerThread::Delegate::kFirstSleepDurationBeforePurge +
914 WorkerThread::Delegate::kPurgeThreadCacheIdleDelay)));
915
916 // Expect `GetSleepDurationBeforePurge()` to return
917 // `WorkerThread::Delegate::kPurgeThreadCacheIdleDelay` when invoked later
918 // than `kFirstSleepDurationBeforePurge` after the first sleep (with
919 // `kPurgeThreadCacheIdleDelay` tolerance due to alignment).
920 constexpr base::TimeDelta kEpsilon = base::Microseconds(1);
921 EXPECT_THAT(
922 WorkerThreadDefaultDelegate::GetSleepDurationBeforePurge(
923 /* now=*/first_sleep_time_for_testing_ +
924 WorkerThread::Delegate::kFirstSleepDurationBeforePurge + kEpsilon),
925 AllOf(Ge(WorkerThread::Delegate::kPurgeThreadCacheIdleDelay),
926 Le(WorkerThread::Delegate::kPurgeThreadCacheIdleDelay +
927 WorkerThread::Delegate::kPurgeThreadCacheIdleDelay)));
928
929 // The output of this function is used to drive real sleep in tests.
930 // Return a constant so that the tests can validate the wakeups without
931 // timing out.
932 return GetSleepTimeout();
933 }
934
WaitForWork()935 void WaitForWork() override {
936 // Fill several buckets before going to sleep.
937 for (size_t size = 8;
938 size < partition_alloc::ThreadCache::kDefaultSizeThreshold; size++) {
939 void* data = malloc(size);
940 // A simple malloc() / free() pair can be discarded by the compiler (and
941 // is), making the test fail. It is sufficient to make |FreeForTest()| a
942 // NOINLINE function for the call to not be eliminated, but it is
943 // required.
944 FreeForTest(data);
945 }
946
947 size_t cached_memory_before =
948 partition_alloc::ThreadCache::Get()->CachedMemory();
949 WorkerThreadDefaultDelegate::WaitForWork();
950 size_t cached_memory_after =
951 partition_alloc::ThreadCache::Get()->CachedMemory();
952
953 if (!test_done_) {
954 if (purge_expected_) {
955 EXPECT_LT(cached_memory_after, cached_memory_before / 2);
956 } else {
957 EXPECT_GT(cached_memory_after, cached_memory_before / 2);
958 }
959 // Unblock the test.
960 wakeup_done_.Signal();
961 test_done_ = true;
962 }
963 }
964
965 // Avoid using the default sleep timeout which is infinite and prevents the
966 // tests from completing.
GetSleepTimeout()967 TimeDelta GetSleepTimeout() override {
968 return WorkerThread::Delegate::kPurgeThreadCacheIdleDelay +
969 TestTimeouts::tiny_timeout();
970 }
971
SetPurgeExpectation(bool purge_expected)972 void SetPurgeExpectation(bool purge_expected) {
973 purge_expected_ = purge_expected;
974 }
975
976 TimeTicks first_sleep_time_for_testing_;
977 bool test_done_ = false;
978 bool purge_expected_ = false;
979 base::WaitableEvent wakeup_done_;
980 };
981
// Checks that the thread cache is not purged when the worker's first wakeup
// is triggered by WakeUp() rather than by its sleep running out.
TEST_F(ThreadPoolWorkerTest, WorkerThreadCacheNoPurgeOnSignal) {
  // The thread cache has to be enabled in the main partition for this test.
  partition_alloc::internal::ThreadCacheProcessScopeForTesting scope(
      allocator_shim::internal::PartitionAllocMalloc::Allocator());

  TaskTracker task_tracker;

  // Ownership of the delegate moves into the worker; keep a raw pointer so the
  // test can still configure it and wait on it.
  auto owned_delegate = std::make_unique<WorkerThreadThreadCacheDelegate>();
  WorkerThreadThreadCacheDelegate* const cache_delegate = owned_delegate.get();
  auto worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(owned_delegate),
      task_tracker.GetTrackedRef(), 0);
  cache_delegate->PrepareForTesting();

  // Waking up because of a signal must not purge the cache.
  cache_delegate->SetPurgeExpectation(false);

  // Signal the worker before it is started so that its first wakeup is
  // guaranteed to come from the signal. No observer is needed.
  worker->WakeUp();
  worker->Start(service_thread_.task_runner(), nullptr);

  // Block until the delegate has checked one wakeup.
  cache_delegate->wakeup_done_.Wait();
  worker->JoinForTesting();
}
1007
// Checks that the thread cache is purged when the worker's first sleep is not
// interrupted by a WakeUp().
TEST_F(ThreadPoolWorkerTest, PurgeOnUninteruptedSleep) {
  // The thread cache has to be enabled in the main partition for this test.
  partition_alloc::internal::ThreadCacheProcessScopeForTesting scope(
      allocator_shim::internal::PartitionAllocMalloc::Allocator());

  TaskTracker task_tracker;

  // Ownership of the delegate moves into the worker; keep a raw pointer so the
  // test can still configure it and wait on it.
  auto owned_delegate = std::make_unique<WorkerThreadThreadCacheDelegate>();
  WorkerThreadThreadCacheDelegate* const cache_delegate = owned_delegate.get();
  auto worker = MakeRefCounted<WorkerThread>(
      ThreadType::kDefault, std::move(owned_delegate),
      task_tracker.GetTrackedRef(), 0);

  cache_delegate->PrepareForTesting();

  // The worker is never signaled, so a purge is expected on its first wakeup.
  cache_delegate->SetPurgeExpectation(true);

  // No observer is needed.
  worker->Start(service_thread_.task_runner(), nullptr);

  // Block until the delegate has checked one wakeup.
  cache_delegate->wakeup_done_.Wait();
  worker->JoinForTesting();
}
1031
1032 #endif // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
1033 // PA_CONFIG(THREAD_CACHE_SUPPORTED)
1034
1035 } // namespace base::internal
1036