1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/worker_thread_waitable_event.h"
6
7 #include <stddef.h>
8
9 #include <atomic>
10 #include <memory>
11 #include <utility>
12 #include <vector>
13
14 #include "base/allocator/partition_allocator/src/partition_alloc/partition_alloc_buildflags.h"
15 #include "base/allocator/partition_allocator/src/partition_alloc/partition_alloc_config.h"
16 #include "base/allocator/partition_allocator/src/partition_alloc/shim/allocator_shim.h"
17 #include "base/allocator/partition_allocator/src/partition_alloc/shim/allocator_shim_default_dispatch_to_partition_alloc.h"
18 #include "base/compiler_specific.h"
19 #include "base/functional/bind.h"
20 #include "base/functional/callback_helpers.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/ref_counted.h"
24 #include "base/message_loop/message_pump_type.h"
25 #include "base/synchronization/condition_variable.h"
26 #include "base/task/common/checked_lock.h"
27 #include "base/task/thread_pool/environment_config.h"
28 #include "base/task/thread_pool/sequence.h"
29 #include "base/task/thread_pool/task.h"
30 #include "base/task/thread_pool/task_tracker.h"
31 #include "base/task/thread_pool/test_utils.h"
32 #include "base/task/thread_pool/worker_thread_observer.h"
33 #include "base/test/test_timeouts.h"
34 #include "base/test/test_waitable_event.h"
35 #include "base/threading/platform_thread.h"
36 #include "base/threading/simple_thread.h"
37 #include "base/threading/thread.h"
38 #include "base/time/time.h"
39 #include "build/build_config.h"
40 #include "testing/gmock/include/gmock/gmock.h"
41 #include "testing/gtest/include/gtest/gtest.h"
42
43 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
44 PA_CONFIG(THREAD_CACHE_SUPPORTED)
45 #include "base/allocator/partition_allocator/src/partition_alloc/extended_api.h"
46 #include "base/allocator/partition_allocator/src/partition_alloc/thread_cache.h"
47 #endif // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
48 // PA_CONFIG(THREAD_CACHE_SUPPORTED)
49
50 using testing::_;
51 using testing::Mock;
52 using testing::Ne;
53 using testing::StrictMock;
54
55 namespace base {
56 namespace internal {
57 namespace {
58
// Number of Sequences the parameterized worker tests ask GetWork() to create.
constexpr size_t kNumSequencesPerTest = 150;
60
61 class WorkerThreadDefaultDelegate : public WorkerThreadWaitableEvent::Delegate {
62 public:
63 WorkerThreadDefaultDelegate() = default;
64 WorkerThreadDefaultDelegate(const WorkerThreadDefaultDelegate&) = delete;
65 WorkerThreadDefaultDelegate& operator=(const WorkerThreadDefaultDelegate&) =
66 delete;
67
68 // WorkerThreadWaitableEvent::Delegate:
GetThreadLabel() const69 WorkerThread::ThreadLabel GetThreadLabel() const override {
70 return WorkerThread::ThreadLabel::DEDICATED;
71 }
OnMainEntry(WorkerThread * worker)72 void OnMainEntry(WorkerThread* worker) override {}
GetWork(WorkerThread * worker)73 RegisteredTaskSource GetWork(WorkerThread* worker) override {
74 return nullptr;
75 }
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)76 RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
77 WorkerThread* worker) override {
78 ADD_FAILURE() << "Unexpected call to SwapProcessedTask()";
79 return nullptr;
80 }
GetSleepTimeout()81 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
82 };
83
84 // The test parameter is the number of Tasks per Sequence returned by GetWork().
85 class ThreadPoolWorkerTest : public testing::TestWithParam<int> {
86 public:
87 ThreadPoolWorkerTest(const ThreadPoolWorkerTest&) = delete;
88 ThreadPoolWorkerTest& operator=(const ThreadPoolWorkerTest&) = delete;
89
90 protected:
ThreadPoolWorkerTest()91 ThreadPoolWorkerTest() : num_get_work_cv_(lock_.CreateConditionVariable()) {
92 Thread::Options service_thread_options;
93 service_thread_options.message_pump_type = MessagePumpType::IO;
94 service_thread_.StartWithOptions(std::move(service_thread_options));
95 }
96
SetUp()97 void SetUp() override {
98 worker_ = MakeRefCounted<WorkerThreadWaitableEvent>(
99 ThreadType::kDefault, std::make_unique<TestWorkerThreadWaitableEventDelegate>(this),
100 task_tracker_.GetTrackedRef(), 0);
101 ASSERT_TRUE(worker_);
102 worker_->Start(service_thread_.task_runner());
103 worker_set_.Signal();
104 main_entry_called_.Wait();
105 }
106
TearDown()107 void TearDown() override {
108 // |worker_| needs to be released before ~TaskTracker() as it holds a
109 // TrackedRef to it.
110 worker_->JoinForTesting();
111 worker_ = nullptr;
112 }
113
TasksPerSequence() const114 int TasksPerSequence() const { return GetParam(); }
115
116 // Wait until GetWork() has been called |num_get_work| times.
WaitForNumGetWork(size_t num_get_work)117 void WaitForNumGetWork(size_t num_get_work) {
118 CheckedAutoLock auto_lock(lock_);
119 while (num_get_work_ < num_get_work)
120 num_get_work_cv_->Wait();
121 }
122
SetMaxGetWork(size_t max_get_work)123 void SetMaxGetWork(size_t max_get_work) {
124 CheckedAutoLock auto_lock(lock_);
125 max_get_work_ = max_get_work;
126 }
127
SetNumSequencesToCreate(size_t num_sequences_to_create)128 void SetNumSequencesToCreate(size_t num_sequences_to_create) {
129 CheckedAutoLock auto_lock(lock_);
130 EXPECT_EQ(0U, num_sequences_to_create_);
131 num_sequences_to_create_ = num_sequences_to_create;
132 }
133
NumRunTasks()134 size_t NumRunTasks() {
135 CheckedAutoLock auto_lock(lock_);
136 return num_run_tasks_;
137 }
138
CreatedTaskSources()139 std::vector<scoped_refptr<TaskSource>> CreatedTaskSources() {
140 CheckedAutoLock auto_lock(lock_);
141 return created_sequences_;
142 }
143
DidProcessTaskSequences()144 std::vector<scoped_refptr<TaskSource>> DidProcessTaskSequences() {
145 CheckedAutoLock auto_lock(lock_);
146 return did_run_task_sources_;
147 }
148
149 scoped_refptr<WorkerThreadWaitableEvent> worker_;
150 Thread service_thread_ = Thread("ServiceThread");
151
152 private:
153 class TestWorkerThreadWaitableEventDelegate : public WorkerThreadDefaultDelegate {
154 public:
TestWorkerThreadWaitableEventDelegate(ThreadPoolWorkerTest * outer)155 explicit TestWorkerThreadWaitableEventDelegate(ThreadPoolWorkerTest* outer)
156 : outer_(outer) {}
157 TestWorkerThreadWaitableEventDelegate(const TestWorkerThreadWaitableEventDelegate&) = delete;
158 TestWorkerThreadWaitableEventDelegate& operator=(const TestWorkerThreadWaitableEventDelegate&) =
159 delete;
160
~TestWorkerThreadWaitableEventDelegate()161 ~TestWorkerThreadWaitableEventDelegate() override {
162 EXPECT_FALSE(IsCallToDidProcessTaskExpected());
163 }
164
165 // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)166 void OnMainEntry(WorkerThread* worker) override {
167 outer_->worker_set_.Wait();
168 EXPECT_EQ(outer_->worker_.get(), worker);
169 EXPECT_FALSE(IsCallToDidProcessTaskExpected());
170
171 // Without synchronization, OnMainEntry() could be called twice without
172 // generating an error.
173 CheckedAutoLock auto_lock(outer_->lock_);
174 EXPECT_FALSE(outer_->main_entry_called_.IsSignaled());
175 outer_->main_entry_called_.Signal();
176 }
177
GetWork(WorkerThread * worker)178 RegisteredTaskSource GetWork(WorkerThread* worker) override {
179 EXPECT_FALSE(IsCallToDidProcessTaskExpected());
180 EXPECT_EQ(outer_->worker_.get(), worker);
181
182 {
183 CheckedAutoLock auto_lock(outer_->lock_);
184
185 // Increment the number of times that this method has been called.
186 ++outer_->num_get_work_;
187 outer_->num_get_work_cv_->Signal();
188
189 // Verify that this method isn't called more times than expected.
190 EXPECT_LE(outer_->num_get_work_, outer_->max_get_work_);
191
192 // Check if a Sequence should be returned.
193 if (outer_->num_sequences_to_create_ == 0)
194 return nullptr;
195 --outer_->num_sequences_to_create_;
196 }
197
198 // Create a Sequence with TasksPerSequence() Tasks.
199 scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
200 TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
201 Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
202 for (int i = 0; i < outer_->TasksPerSequence(); ++i) {
203 Task task(FROM_HERE,
204 BindOnce(&ThreadPoolWorkerTest::RunTaskCallback,
205 Unretained(outer_)),
206 TimeTicks::Now(), TimeDelta());
207 sequence_transaction.WillPushImmediateTask();
208 EXPECT_TRUE(outer_->task_tracker_.WillPostTask(
209 &task, sequence->shutdown_behavior()));
210 sequence_transaction.PushImmediateTask(std::move(task));
211 }
212 auto registered_task_source =
213 outer_->task_tracker_.RegisterTaskSource(sequence);
214 EXPECT_TRUE(registered_task_source);
215
216 ExpectCallToDidProcessTask();
217
218 {
219 // Add the Sequence to the vector of created Sequences.
220 CheckedAutoLock auto_lock(outer_->lock_);
221 outer_->created_sequences_.push_back(sequence);
222 }
223 auto run_status = registered_task_source.WillRunTask();
224 EXPECT_NE(run_status, TaskSource::RunStatus::kDisallowed);
225 return registered_task_source;
226 }
227
SwapProcessedTask(RegisteredTaskSource registered_task_source,WorkerThread * worker)228 RegisteredTaskSource SwapProcessedTask(
229 RegisteredTaskSource registered_task_source,
230 WorkerThread* worker) override {
231 {
232 CheckedAutoLock auto_lock(expect_did_run_task_lock_);
233 EXPECT_TRUE(expect_did_run_task_);
234 expect_did_run_task_ = false;
235 }
236
237 // If TasksPerSequence() is 1, |registered_task_source| should be nullptr.
238 // Otherwise, |registered_task_source| should contain TasksPerSequence() -
239 // 1 Tasks.
240 if (outer_->TasksPerSequence() == 1) {
241 EXPECT_FALSE(registered_task_source);
242 } else {
243 EXPECT_TRUE(registered_task_source);
244 EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now()));
245
246 // Verify the number of Tasks in |registered_task_source|.
247 for (int i = 0; i < outer_->TasksPerSequence() - 1; ++i) {
248 registered_task_source.WillRunTask();
249 IgnoreResult(registered_task_source.TakeTask());
250 if (i < outer_->TasksPerSequence() - 2) {
251 EXPECT_TRUE(registered_task_source.DidProcessTask());
252 EXPECT_TRUE(registered_task_source.WillReEnqueue(TimeTicks::Now()));
253 } else {
254 EXPECT_FALSE(registered_task_source.DidProcessTask());
255 }
256 }
257 scoped_refptr<TaskSource> task_source =
258 registered_task_source.Unregister();
259 {
260 // Add |task_source| to |did_run_task_sources_|.
261 CheckedAutoLock auto_lock(outer_->lock_);
262 outer_->did_run_task_sources_.push_back(std::move(task_source));
263 EXPECT_LE(outer_->did_run_task_sources_.size(),
264 outer_->created_sequences_.size());
265 }
266 }
267 return GetWork(worker);
268 }
269
270 private:
271 // Expect a call to DidProcessTask() before the next call to any other
272 // method of this delegate.
ExpectCallToDidProcessTask()273 void ExpectCallToDidProcessTask() {
274 CheckedAutoLock auto_lock(expect_did_run_task_lock_);
275 expect_did_run_task_ = true;
276 }
277
IsCallToDidProcessTaskExpected() const278 bool IsCallToDidProcessTaskExpected() const {
279 CheckedAutoLock auto_lock(expect_did_run_task_lock_);
280 return expect_did_run_task_;
281 }
282
283 raw_ptr<ThreadPoolWorkerTest> outer_;
284
285 // Synchronizes access to |expect_did_run_task_|.
286 mutable CheckedLock expect_did_run_task_lock_;
287
288 // Whether the next method called on this delegate should be
289 // DidProcessTask().
290 bool expect_did_run_task_ = false;
291 };
292
RunTaskCallback()293 void RunTaskCallback() {
294 CheckedAutoLock auto_lock(lock_);
295 ++num_run_tasks_;
296 EXPECT_LE(num_run_tasks_, created_sequences_.size());
297 }
298
299 TaskTracker task_tracker_;
300
301 // Synchronizes access to all members below.
302 mutable CheckedLock lock_;
303
304 // Signaled once OnMainEntry() has been called.
305 TestWaitableEvent main_entry_called_;
306
307 // Number of Sequences that should be created by GetWork(). When this
308 // is 0, GetWork() returns nullptr.
309 size_t num_sequences_to_create_ = 0;
310
311 // Number of times that GetWork() has been called.
312 size_t num_get_work_ = 0;
313
314 // Maximum number of times that GetWork() can be called.
315 size_t max_get_work_ = 0;
316
317 // Condition variable signaled when |num_get_work_| is incremented.
318 std::unique_ptr<ConditionVariable> num_get_work_cv_;
319
320 // Sequences created by GetWork().
321 std::vector<scoped_refptr<TaskSource>> created_sequences_;
322
323 // Sequences passed to DidProcessTask().
324 std::vector<scoped_refptr<TaskSource>> did_run_task_sources_;
325
326 // Number of times that RunTaskCallback() has been called.
327 size_t num_run_tasks_ = 0;
328
329 // Signaled after |worker_| is set.
330 TestWaitableEvent worker_set_;
331 };
332
333 } // namespace
334
335 // Verify that when GetWork() continuously returns Sequences, all Tasks in these
336 // Sequences run successfully. The test wakes up the WorkerThread once.
TEST_P(ThreadPoolWorkerTest, ContinuousWork) {
  // GetWork() returns |kNumSequencesPerTest| Sequences and then nullptr.
  SetNumSequencesToCreate(kNumSequencesPerTest);

  // That means |kNumSequencesPerTest| calls that return a Sequence plus one
  // final call that returns nullptr.
  constexpr size_t kExpectedNumGetWork = kNumSequencesPerTest + 1;
  SetMaxGetWork(kExpectedNumGetWork);

  // A single wake-up should be enough for the worker to make all of the
  // expected GetWork() calls.
  worker_->WakeUp();
  WaitForNumGetWork(kExpectedNumGetWork);

  // All tasks should have run.
  EXPECT_EQ(kNumSequencesPerTest, NumRunTasks());

  // Sequences with more than one Task are not empty after the worker pops a
  // Task from them, so they should have been handed back to the delegate.
  // Single-Task Sequences should never be handed back.
  if (TasksPerSequence() > 1) {
    EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences());
  } else {
    EXPECT_TRUE(DidProcessTaskSequences().empty());
  }
}
363
364 // Verify that when GetWork() alternates between returning a Sequence and
365 // returning nullptr, all Tasks in the returned Sequences run successfully. The
366 // test wakes up the WorkerThread once for each Sequence.
TEST_P(ThreadPoolWorkerTest, IntermittentWork) {
  for (size_t round = 0; round < kNumSequencesPerTest; ++round) {
    // GetWork() returns exactly one Sequence and then nullptr.
    SetNumSequencesToCreate(1);

    // After this round there have been |round + 1| calls that returned a
    // Sequence and |round + 1| calls that returned nullptr.
    const size_t num_get_work_so_far = 2 * (round + 1);
    SetMaxGetWork(num_get_work_so_far);

    // Wake up |worker_| and wait for GetWork() to reach the expected count.
    worker_->WakeUp();
    WaitForNumGetWork(num_get_work_so_far);

    // The Task of this round should have run.
    EXPECT_EQ(round + 1, NumRunTasks());

    // Sequences with more than one Task are not empty after the worker pops a
    // Task from them, so they should have been handed back to the delegate.
    // Single-Task Sequences should never be handed back.
    if (TasksPerSequence() > 1) {
      EXPECT_EQ(CreatedTaskSources(), DidProcessTaskSequences());
    } else {
      EXPECT_TRUE(DidProcessTaskSequences().empty());
    }
  }
}
395
// Run the parameterized ThreadPoolWorkerTest cases with one Task per Sequence.
INSTANTIATE_TEST_SUITE_P(OneTaskPerSequence,
                         ThreadPoolWorkerTest,
                         ::testing::Values(1));
// Run them again with two Tasks per Sequence.
INSTANTIATE_TEST_SUITE_P(TwoTasksPerSequence,
                         ThreadPoolWorkerTest,
                         ::testing::Values(2));
402
403 namespace {
404
405 class ControllableCleanupDelegate : public WorkerThreadDefaultDelegate {
406 public:
407 class Controls : public RefCountedThreadSafe<Controls> {
408 public:
409 Controls() = default;
410 Controls(const Controls&) = delete;
411 Controls& operator=(const Controls&) = delete;
412
HaveWorkBlock()413 void HaveWorkBlock() { work_running_.Reset(); }
414
UnblockWork()415 void UnblockWork() { work_running_.Signal(); }
416
WaitForWorkToRun()417 void WaitForWorkToRun() { work_processed_.Wait(); }
418
WaitForCleanupRequest()419 void WaitForCleanupRequest() { cleanup_requested_.Wait(); }
420
WaitForDelegateDestroy()421 void WaitForDelegateDestroy() { destroyed_.Wait(); }
422
WaitForMainExit()423 void WaitForMainExit() { exited_.Wait(); }
424
set_expect_get_work(bool expect_get_work)425 void set_expect_get_work(bool expect_get_work) {
426 expect_get_work_ = expect_get_work;
427 }
428
ResetState()429 void ResetState() {
430 work_running_.Signal();
431 work_processed_.Reset();
432 cleanup_requested_.Reset();
433 exited_.Reset();
434 work_requested_ = false;
435 }
436
set_can_cleanup(bool can_cleanup)437 void set_can_cleanup(bool can_cleanup) { can_cleanup_ = can_cleanup; }
438
439 private:
440 friend class ControllableCleanupDelegate;
441 friend class RefCountedThreadSafe<Controls>;
442 ~Controls() = default;
443
444 TestWaitableEvent work_running_{WaitableEvent::ResetPolicy::MANUAL,
445 WaitableEvent::InitialState::SIGNALED};
446 TestWaitableEvent work_processed_;
447 TestWaitableEvent cleanup_requested_;
448 TestWaitableEvent destroyed_;
449 TestWaitableEvent exited_;
450
451 bool expect_get_work_ = true;
452 bool can_cleanup_ = false;
453 bool work_requested_ = false;
454 };
455
ControllableCleanupDelegate(TaskTracker * task_tracker)456 explicit ControllableCleanupDelegate(TaskTracker* task_tracker)
457 : task_tracker_(task_tracker), controls_(new Controls()) {}
458
459 ControllableCleanupDelegate(const ControllableCleanupDelegate&) = delete;
460 ControllableCleanupDelegate& operator=(const ControllableCleanupDelegate&) =
461 delete;
~ControllableCleanupDelegate()462 ~ControllableCleanupDelegate() override { controls_->destroyed_.Signal(); }
463
GetWork(WorkerThread * worker)464 RegisteredTaskSource GetWork(WorkerThread* worker) override {
465 EXPECT_TRUE(controls_->expect_get_work_);
466
467 // Sends one item of work to signal |work_processed_|. On subsequent calls,
468 // sends nullptr to indicate there's no more work to be done.
469 if (controls_->work_requested_) {
470 if (CanCleanup(worker)) {
471 OnCleanup();
472 worker->Cleanup();
473 controls_->set_expect_get_work(false);
474 }
475 return nullptr;
476 }
477
478 controls_->work_requested_ = true;
479 scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
480 TaskTraits(WithBaseSyncPrimitives(),
481 TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN),
482 nullptr, TaskSourceExecutionMode::kParallel);
483 Task task(FROM_HERE,
484 BindOnce(
485 [](TestWaitableEvent* work_processed,
486 TestWaitableEvent* work_running) {
487 work_processed->Signal();
488 work_running->Wait();
489 },
490 Unretained(&controls_->work_processed_),
491 Unretained(&controls_->work_running_)),
492 TimeTicks::Now(), TimeDelta());
493 auto transaction = sequence->BeginTransaction();
494 transaction.WillPushImmediateTask();
495 EXPECT_TRUE(
496 task_tracker_->WillPostTask(&task, sequence->shutdown_behavior()));
497 transaction.PushImmediateTask(std::move(task));
498 auto registered_task_source =
499 task_tracker_->RegisterTaskSource(std::move(sequence));
500 EXPECT_TRUE(registered_task_source);
501 registered_task_source.WillRunTask();
502 return registered_task_source;
503 }
504
SwapProcessedTask(RegisteredTaskSource task_source,WorkerThread * worker)505 RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source, WorkerThread* worker) override {
506 return GetWork(worker);
507 }
508
OnMainExit(WorkerThread * worker)509 void OnMainExit(WorkerThread* worker) override {
510 controls_->exited_.Signal();
511 }
512
CanCleanup(WorkerThread * worker)513 bool CanCleanup(WorkerThread* worker) {
514 // Saving |can_cleanup_| now so that callers waiting on |cleanup_requested_|
515 // have the thread go to sleep and then allow timing out.
516 bool can_cleanup = controls_->can_cleanup_;
517 controls_->cleanup_requested_.Signal();
518 return can_cleanup;
519 }
520
OnCleanup()521 void OnCleanup() {
522 EXPECT_TRUE(controls_->can_cleanup_);
523 EXPECT_TRUE(controls_->cleanup_requested_.IsSignaled());
524 }
525
526 // ControllableCleanupDelegate:
controls()527 scoped_refptr<Controls> controls() { return controls_; }
528
529 private:
530 scoped_refptr<Sequence> work_sequence_;
531 const raw_ptr<TaskTracker> task_tracker_;
532 scoped_refptr<Controls> controls_;
533 };
534
535 class MockedControllableCleanupDelegate : public ControllableCleanupDelegate {
536 public:
MockedControllableCleanupDelegate(TaskTracker * task_tracker)537 explicit MockedControllableCleanupDelegate(TaskTracker* task_tracker)
538 : ControllableCleanupDelegate(task_tracker) {}
539 MockedControllableCleanupDelegate(const MockedControllableCleanupDelegate&) =
540 delete;
541 MockedControllableCleanupDelegate& operator=(
542 const MockedControllableCleanupDelegate&) = delete;
543 ~MockedControllableCleanupDelegate() override = default;
544
545 // WorkerThread::Delegate:
546 MOCK_METHOD1(OnMainEntry, void(WorkerThread* worker));
547 };
548
549 } // namespace
550
551 // Verify that calling WorkerThread::Cleanup() from GetWork() causes
552 // the WorkerThread's thread to exit.
TEST(ThreadPoolWorkerTest, WorkerCleanupFromGetWork) {
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));
  TaskTracker task_tracker;

  // Ownership of the delegate moves to the worker below; |delegate| is kept
  // only to set and verify mock expectations.
  auto owned_delegate =
      std::make_unique<StrictMock<MockedControllableCleanupDelegate>>(
          &task_tracker);
  MockedControllableCleanupDelegate* delegate = owned_delegate.get();
  auto controls = delegate->controls();
  controls->set_can_cleanup(true);
  EXPECT_CALL(*delegate, OnMainEntry(_));

  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          ThreadType::kDefault, std::move(owned_delegate),
          task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  // Once the task runs, the expected OnMainEntry() call must have happened.
  controls->WaitForWorkToRun();
  Mock::VerifyAndClear(delegate);

  // Cleanup requested from GetWork() should make the worker's main exit.
  controls->WaitForMainExit();

  // Join the worker to avoid leaks.
  worker->JoinForTesting();
}
577
// Calls Cleanup() while the worker is blocked inside its task and checks that
// the delegate is destroyed once the task is unblocked.
TEST(ThreadPoolWorkerTest, WorkerCleanupDuringWork) {
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));
  TaskTracker task_tracker;

  // The delegate will be owned by the worker. No mock is used: mocking is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  // Keep the task running until UnblockWork() below.
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          ThreadType::kDefault, std::move(delegate),
          task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
}
606
// Calls Cleanup() after the delegate has declined to clean up from GetWork()
// and checks that the delegate still ends up destroyed.
TEST(ThreadPoolWorkerTest, WorkerCleanupDuringWait) {
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));
  TaskTracker task_tracker;

  // The delegate will be owned by the worker. No mock is used: mocking is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          ThreadType::kDefault, std::move(delegate),
          task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  controls->WaitForCleanupRequest();
  worker->Cleanup();
  worker = nullptr;
  controls->WaitForDelegateDestroy();
}
632
// Shuts down the TaskTracker while the worker is blocked inside its task, then
// calls Cleanup() and checks that the delegate is destroyed once the task is
// unblocked.
TEST(ThreadPoolWorkerTest, WorkerCleanupDuringShutdown) {
  TaskTracker task_tracker;
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));

  // The delegate will be owned by the worker. No mock is used: mocking is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  // Keep the task running until UnblockWork() below.
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          ThreadType::kDefault, std::move(delegate),
          task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  test::ShutdownTaskTracker(&task_tracker);
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
}
662
663 // Verify that Start() is a no-op after Cleanup().
TEST(ThreadPoolWorkerTest, CleanupBeforeStart) {
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));
  TaskTracker task_tracker;

  // The delegate will be owned by the worker. No mock is used: mocking is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  // The worker should never get far enough to ask for work.
  controls->set_expect_get_work(false);

  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          ThreadType::kDefault, std::move(delegate),
          task_tracker.GetTrackedRef(), 0);

  worker->Cleanup();
  worker->Start(service_thread.task_runner());

  EXPECT_FALSE(worker->ThreadAliveForTesting());
}
688
689 namespace {
690
// Helper thread whose only job is to call JoinForTesting() on a worker, so
// that a test can keep acting on the same worker from its own thread while
// the join is in progress.
class CallJoinFromDifferentThread : public SimpleThread {
 public:
  // |worker_to_join| is not owned.
  explicit CallJoinFromDifferentThread(WorkerThreadWaitableEvent* worker_to_join)
      : SimpleThread("WorkerThreadJoinThread"),
        worker_to_join_(worker_to_join) {}

  CallJoinFromDifferentThread(const CallJoinFromDifferentThread&) = delete;
  CallJoinFromDifferentThread& operator=(const CallJoinFromDifferentThread&) =
      delete;
  ~CallJoinFromDifferentThread() override = default;

  // SimpleThread:
  // Announces that the thread is running, then joins the worker.
  void Run() override {
    run_started_event_.Signal();
    // The pointer is extracted as dangling before the call.
    // NOTE(review): presumably because the test may drop its last reference
    // to the worker while JoinForTesting() is still running - confirm.
    worker_to_join_.ExtractAsDangling()->JoinForTesting();
  }

  // Blocks until Run() has started. This does not guarantee that
  // JoinForTesting() has been entered yet.
  void WaitForRunToStart() { run_started_event_.Wait(); }

 private:
  raw_ptr<WorkerThreadWaitableEvent> worker_to_join_;
  // Signaled at the very beginning of Run().
  TestWaitableEvent run_started_event_;
};
713
714 } // namespace
715
// Calls Cleanup() on a worker while another thread is (most likely) blocked in
// JoinForTesting() on it, and checks that the delegate is still destroyed.
TEST(ThreadPoolWorkerTest, WorkerCleanupDuringJoin) {
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));
  TaskTracker task_tracker;

  // The delegate will be owned by the worker. No mock is used: mocking is
  // reasonably covered by other tests, and the delegate may be destroyed on a
  // different thread, which mocks aren't designed for.
  auto delegate = std::make_unique<ControllableCleanupDelegate>(&task_tracker);
  auto controls = delegate->controls();

  // Keep the task running until UnblockWork() below.
  controls->HaveWorkBlock();

  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          ThreadType::kDefault, std::move(delegate),
          task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread.task_runner());
  worker->WakeUp();

  controls->WaitForWorkToRun();
  CallJoinFromDifferentThread joiner(worker.get());
  joiner.Start();
  joiner.WaitForRunToStart();
  // Knowing that Run() started does not prove that JoinForTesting() has been
  // entered, and no signal can be sent after JoinForTesting() because it
  // blocks until UnblockWork() is called. Sleep briefly to give the other
  // thread a chance to get there.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  worker->Cleanup();
  worker = nullptr;
  controls->UnblockWork();
  controls->WaitForDelegateDestroy();
  joiner.Join();
}
754
755 namespace {
756
757 class ExpectThreadTypeDelegate : public WorkerThreadDefaultDelegate {
758 public:
ExpectThreadTypeDelegate()759 ExpectThreadTypeDelegate()
760 : thread_type_verified_in_get_work_event_(
761 WaitableEvent::ResetPolicy::AUTOMATIC) {}
762 ExpectThreadTypeDelegate(const ExpectThreadTypeDelegate&) = delete;
763 ExpectThreadTypeDelegate& operator=(const ExpectThreadTypeDelegate&) = delete;
764
SetExpectedThreadType(ThreadType expected_thread_type)765 void SetExpectedThreadType(ThreadType expected_thread_type) {
766 expected_thread_type_ = expected_thread_type;
767 }
768
WaitForThreadTypeVerifiedInGetWork()769 void WaitForThreadTypeVerifiedInGetWork() {
770 thread_type_verified_in_get_work_event_.Wait();
771 }
772
773 // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)774 void OnMainEntry(WorkerThread* worker) override { VerifyThreadType(); }
GetWork(WorkerThread * worker)775 RegisteredTaskSource GetWork(WorkerThread* worker) override {
776 VerifyThreadType();
777 thread_type_verified_in_get_work_event_.Signal();
778 return nullptr;
779 }
780
781 private:
VerifyThreadType()782 void VerifyThreadType() {
783 CheckedAutoLock auto_lock(expected_thread_type_lock_);
784 EXPECT_EQ(expected_thread_type_, PlatformThread::GetCurrentThreadType());
785 }
786
787 // Signaled after GetWork() has verified the thread type of the worker thread.
788 TestWaitableEvent thread_type_verified_in_get_work_event_;
789
790 // Synchronizes access to |expected_thread_type_|.
791 CheckedLock expected_thread_type_lock_;
792
793 // Expected thread type for the next call to OnMainEntry() or GetWork().
794 ThreadType expected_thread_type_ = ThreadType::kDefault;
795 };
796
797 } // namespace
798
// Verifies that a worker started with ThreadType::kBackground is observed
// running as ThreadType::kDefault once shutdown has started.
TEST(ThreadPoolWorkerTest, BumpThreadTypeOfAliveThreadDuringShutdown) {
  // Nothing to verify where background worker threads aren't usable.
  if (!CanUseBackgroundThreadTypeForWorkerThread()) {
    return;
  }

  TaskTracker task_tracker;
  Thread service_thread("ServiceThread");
  Thread::Options service_thread_options;
  service_thread_options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(service_thread_options));

  // Register a BLOCK_SHUTDOWN task source and keep it alive for the whole test
  // so that the worker doesn't exit when StartShutdown() is called.
  scoped_refptr<Sequence> blocking_sequence =
      MakeRefCounted<Sequence>(TaskTraits{TaskShutdownBehavior::BLOCK_SHUTDOWN},
                               nullptr, TaskSourceExecutionMode::kParallel);
  auto registered_task_source =
      task_tracker.RegisterTaskSource(std::move(blocking_sequence));

  // The worker takes ownership of the delegate; |delegate_raw| stays usable
  // for as long as the worker is alive.
  auto delegate = std::make_unique<ExpectThreadTypeDelegate>();
  ExpectThreadTypeDelegate* delegate_raw = delegate.get();
  delegate_raw->SetExpectedThreadType(ThreadType::kBackground);
  scoped_refptr<WorkerThreadWaitableEvent> worker =
      MakeRefCounted<WorkerThreadWaitableEvent>(
          ThreadType::kBackground, std::move(delegate),
          task_tracker.GetTrackedRef(), 0);
  worker->Start(service_thread.task_runner());

  // Before shutdown, the worker should run with the kBackground thread type.
  worker->WakeUp();
  delegate_raw->WaitForThreadTypeVerifiedInGetWork();

  // Once shutdown has started, the worker should run with kDefault instead.
  delegate_raw->SetExpectedThreadType(ThreadType::kDefault);
  task_tracker.StartShutdown();
  worker->WakeUp();
  delegate_raw->WaitForThreadTypeVerifiedInGetWork();

  worker->JoinForTesting();
}
839
840 namespace {
841
842 class VerifyCallsToObserverDelegate : public WorkerThreadDefaultDelegate {
843 public:
VerifyCallsToObserverDelegate(test::MockWorkerThreadObserver * observer)844 explicit VerifyCallsToObserverDelegate(
845 test::MockWorkerThreadObserver* observer)
846 : observer_(observer) {}
847 VerifyCallsToObserverDelegate(const VerifyCallsToObserverDelegate&) = delete;
848 VerifyCallsToObserverDelegate& operator=(
849 const VerifyCallsToObserverDelegate&) = delete;
850
851 // WorkerThread::Delegate:
OnMainEntry(WorkerThread * worker)852 void OnMainEntry(WorkerThread* worker) override {
853 Mock::VerifyAndClear(observer_);
854 }
855
OnMainExit(WorkerThread * worker)856 void OnMainExit(WorkerThread* worker) override {
857 observer_->AllowCallsOnMainExit(1);
858 }
859
860 private:
861 const raw_ptr<test::MockWorkerThreadObserver> observer_;
862 };
863
864 } // namespace
865
866 // Verify that the WorkerThreadObserver is notified when the worker enters
867 // and exits its main function.
TEST(ThreadPoolWorkerTest, WorkerThreadObserver) {
  StrictMock<test::MockWorkerThreadObserver> observer;
  TaskTracker task_tracker;

  Thread service_thread("ServiceThread");
  Thread::Options options;
  options.message_pump_type = MessagePumpType::IO;
  service_thread.StartWithOptions(std::move(options));

  auto delegate = std::make_unique<VerifyCallsToObserverDelegate>(&observer);
  auto worker = MakeRefCounted<WorkerThreadWaitableEvent>(
      ThreadType::kDefault, std::move(delegate), task_tracker.GetTrackedRef(),
      0);

  // The main-entry notification is expected up front; the delegate verifies it
  // in OnMainEntry() and allows the main-exit call in OnMainExit().
  EXPECT_CALL(observer, OnWorkerThreadMainEntry());
  worker->Start(service_thread.task_runner(), &observer);
  worker->Cleanup();

  // Joining the worker avoids leaks.
  worker->JoinForTesting();
  Mock::VerifyAndClear(&observer);
}
886
887 #if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
888 PA_CONFIG(THREAD_CACHE_SUPPORTED)
889 namespace {
FreeForTest(void * data)890 NOINLINE void FreeForTest(void* data) {
891 free(data);
892 }
893 } // namespace
894
895 class WorkerThreadThreadCacheDelegate : public WorkerThreadDefaultDelegate {
896 public:
WaitForWork()897 void WaitForWork() override {
898 // Fill several buckets before going to sleep.
899 for (size_t size = 8;
900 size < partition_alloc::ThreadCache::kDefaultSizeThreshold; size++) {
901 void* data = malloc(size);
902 // A simple malloc() / free() pair can be discarded by the compiler (and
903 // is), making the test fail. It is sufficient to make |FreeForTest()| a
904 // NOINLINE function for the call to not be eliminated, but it is
905 // required.
906 FreeForTest(data);
907 }
908
909 size_t cached_memory_before =
910 partition_alloc::ThreadCache::Get()->CachedMemory();
911 WorkerThreadDefaultDelegate::WaitForWork();
912 size_t cached_memory_after =
913 partition_alloc::ThreadCache::Get()->CachedMemory();
914
915 if (!first_wakeup_done_) {
916 // First time we sleep is a short sleep, no cache purging.
917 //
918 // Here and below, cannot assert on exact thread cache size, since
919 // anything that allocates will make it fluctuate.
920 EXPECT_GT(cached_memory_after, cached_memory_before / 2);
921 first_wakeup_done_.store(true, std::memory_order_release);
922 } else {
923 // Second one is long, should purge.
924 EXPECT_LT(cached_memory_after, cached_memory_before / 2);
925 }
926 }
927
928 std::atomic<bool> first_wakeup_done_{false};
929 };
930
931 // TODO(crbug.com/1469364): Re-enable this test on Mac.
932 #if BUILDFLAG(IS_MAC)
933 #define MAYBE_Purge DISABLED_Purge
934 #else
935 #define MAYBE_Purge Purge
936 #endif
TEST(ThreadPoolWorkerThreadCachePurgeTest,MAYBE_Purge)937 TEST(ThreadPoolWorkerThreadCachePurgeTest, MAYBE_Purge) {
938 // Make sure the thread cache is enabled in the main partition.
939 partition_alloc::internal::ThreadCacheProcessScopeForTesting scope(
940 allocator_shim::internal::PartitionAllocMalloc::Allocator());
941
942 Thread service_thread = Thread("ServiceThread");
943 Thread::Options service_thread_options;
944 service_thread_options.message_pump_type = MessagePumpType::IO;
945 service_thread.StartWithOptions(std::move(service_thread_options));
946 TaskTracker task_tracker;
947 auto delegate = std::make_unique<WorkerThreadThreadCacheDelegate>();
948 auto* delegate_raw = delegate.get();
949 auto worker =
950 MakeRefCounted<WorkerThreadWaitableEvent>(ThreadType::kDefault, std::move(delegate),
951 task_tracker.GetTrackedRef(), 0);
952 // Wake up before the thread is started to make sure the first sleep is short.
953 worker->WakeUp();
954 worker->Start(service_thread.task_runner(), nullptr);
955
956 while (delegate_raw->first_wakeup_done_.load(std::memory_order_acquire)) {
957 }
958
959 // Have to use real sleep unfortunately rather than virtual time, because
960 // WaitableEvent uses the non-overridable variant of TimeTicks.
961 PlatformThread::Sleep(1.1 *
962 WorkerThread::Delegate::kPurgeThreadCacheIdleDelay);
963 worker->JoinForTesting();
964 }
965
966 #endif // BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
967 // PA_CONFIG(THREAD_CACHE_SUPPORTED)
968
969 } // namespace internal
970 } // namespace base
971