1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group_impl.h"
6
7 #include <stddef.h>
8
9 #include <algorithm>
10 #include <atomic>
11 #include <memory>
12 #include <unordered_set>
13 #include <utility>
14 #include <vector>
15
16 #include "base/atomicops.h"
17 #include "base/barrier_closure.h"
18 #include "base/functional/bind.h"
19 #include "base/functional/callback.h"
20 #include "base/functional/callback_helpers.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/ref_counted.h"
24 #include "base/metrics/statistics_recorder.h"
25 #include "base/synchronization/atomic_flag.h"
26 #include "base/synchronization/condition_variable.h"
27 #include "base/synchronization/lock.h"
28 #include "base/task/task_features.h"
29 #include "base/task/task_runner.h"
30 #include "base/task/thread_pool/delayed_task_manager.h"
31 #include "base/task/thread_pool/environment_config.h"
32 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
33 #include "base/task/thread_pool/sequence.h"
34 #include "base/task/thread_pool/task_source_sort_key.h"
35 #include "base/task/thread_pool/task_tracker.h"
36 #include "base/task/thread_pool/test_task_factory.h"
37 #include "base/task/thread_pool/test_utils.h"
38 #include "base/task/thread_pool/worker_thread_observer.h"
39 #include "base/test/bind.h"
40 #include "base/test/gtest_util.h"
41 #include "base/test/scoped_feature_list.h"
42 #include "base/test/test_simple_task_runner.h"
43 #include "base/test/test_timeouts.h"
44 #include "base/test/test_waitable_event.h"
45 #include "base/threading/platform_thread.h"
46 #include "base/threading/scoped_blocking_call.h"
47 #include "base/threading/simple_thread.h"
48 #include "base/threading/thread.h"
49 #include "base/threading/thread_checker_impl.h"
50 #include "base/time/time.h"
51 #include "base/timer/timer.h"
52 #include "build/build_config.h"
53 #include "testing/gtest/include/gtest/gtest.h"
54 #include "third_party/abseil-cpp/absl/types/optional.h"
55
56 namespace base {
57 namespace internal {
58 namespace {
59
// Default |max_tasks| the fixtures below start their thread group with.
constexpr size_t kMaxTasks = 4;
// Number of posting threads created by tests that post from several threads.
constexpr size_t kNumThreadsPostingTasks = 4;
// Number of tasks each posting thread (or a single burst of short tasks)
// posts.
constexpr size_t kNumTasksPostedPerThread = 150;
// Reclaim time handed to thread groups by the cleanup/standby fixtures.
// This can't be lower because Windows' TestWaitableEvent wakes up too early
// when a small timeout is used. This results in many spurious wake ups before a
// worker is allowed to cleanup.
constexpr TimeDelta kReclaimTimeForCleanupTests = Milliseconds(500);
// NOTE(review): not referenced in the visible part of this file; presumably
// used by tests further down -- confirm before changing.
constexpr size_t kLargeNumber = 512;
68
69 class ThreadGroupImplImplTestBase : public ThreadGroup::Delegate {
70 public:
71 ThreadGroupImplImplTestBase(const ThreadGroupImplImplTestBase&) = delete;
72 ThreadGroupImplImplTestBase& operator=(const ThreadGroupImplImplTestBase&) =
73 delete;
74
75 protected:
ThreadGroupImplImplTestBase()76 ThreadGroupImplImplTestBase()
77 : service_thread_("ThreadPoolServiceThread"),
78 tracked_ref_factory_(this) {}
79
CommonTearDown()80 void CommonTearDown() {
81 delayed_task_manager_.Shutdown();
82 service_thread_.Stop();
83 task_tracker_.FlushForTesting();
84 if (thread_group_)
85 thread_group_->JoinForTesting();
86 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
87 thread_group_.reset();
88 }
89
CreateThreadGroup(ThreadType thread_type=ThreadType::kDefault)90 void CreateThreadGroup(ThreadType thread_type = ThreadType::kDefault) {
91 ASSERT_FALSE(thread_group_);
92 service_thread_.Start();
93 delayed_task_manager_.Start(service_thread_.task_runner());
94 thread_group_ = std::make_unique<ThreadGroupImpl>(
95 "TestThreadGroup", "A", thread_type, task_tracker_.GetTrackedRef(),
96 tracked_ref_factory_.GetTrackedRef());
97 ASSERT_TRUE(thread_group_);
98
99 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
100 }
101
StartThreadGroup(TimeDelta suggested_reclaim_time,size_t max_tasks,absl::optional<int> max_best_effort_tasks=absl::nullopt,WorkerThreadObserver * worker_observer=nullptr,absl::optional<TimeDelta> may_block_threshold=absl::nullopt)102 void StartThreadGroup(
103 TimeDelta suggested_reclaim_time,
104 size_t max_tasks,
105 absl::optional<int> max_best_effort_tasks = absl::nullopt,
106 WorkerThreadObserver* worker_observer = nullptr,
107 absl::optional<TimeDelta> may_block_threshold = absl::nullopt) {
108 ASSERT_TRUE(thread_group_);
109 thread_group_->Start(
110 max_tasks,
111 max_best_effort_tasks ? max_best_effort_tasks.value() : max_tasks,
112 suggested_reclaim_time, service_thread_.task_runner(), worker_observer,
113 ThreadGroup::WorkerEnvironment::NONE,
114 /* synchronous_thread_start_for_testing=*/false, may_block_threshold);
115 }
116
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,absl::optional<int> max_best_effort_tasks=absl::nullopt,WorkerThreadObserver * worker_observer=nullptr,absl::optional<TimeDelta> may_block_threshold=absl::nullopt)117 void CreateAndStartThreadGroup(
118 TimeDelta suggested_reclaim_time = TimeDelta::Max(),
119 size_t max_tasks = kMaxTasks,
120 absl::optional<int> max_best_effort_tasks = absl::nullopt,
121 WorkerThreadObserver* worker_observer = nullptr,
122 absl::optional<TimeDelta> may_block_threshold = absl::nullopt) {
123 CreateThreadGroup();
124 StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
125 worker_observer, may_block_threshold);
126 }
127
128 Thread service_thread_;
129 TaskTracker task_tracker_;
130 std::unique_ptr<ThreadGroupImpl> thread_group_;
131 DelayedTaskManager delayed_task_manager_;
132 TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_;
133 test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
134 task_tracker_.GetTrackedRef(), &delayed_task_manager_};
135
136 private:
137 // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)138 ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
139 return thread_group_.get();
140 }
141 };
142
143 class ThreadGroupImplImplTest : public ThreadGroupImplImplTestBase,
144 public testing::Test {
145 public:
146 ThreadGroupImplImplTest(const ThreadGroupImplImplTest&) = delete;
147 ThreadGroupImplImplTest& operator=(const ThreadGroupImplImplTest&) = delete;
148
149 protected:
150 ThreadGroupImplImplTest() = default;
151
SetUp()152 void SetUp() override { CreateAndStartThreadGroup(); }
153
TearDown()154 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
155 };
156
157 class ThreadGroupImplImplTestParam
158 : public ThreadGroupImplImplTestBase,
159 public testing::TestWithParam<TaskSourceExecutionMode> {
160 public:
161 ThreadGroupImplImplTestParam(const ThreadGroupImplImplTestParam&) = delete;
162 ThreadGroupImplImplTestParam& operator=(const ThreadGroupImplImplTestParam&) =
163 delete;
164
165 protected:
166 ThreadGroupImplImplTestParam() = default;
167
SetUp()168 void SetUp() override { CreateAndStartThreadGroup(); }
169
TearDown()170 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
171 };
172
// Shorthand for test::TestTaskFactory::PostNestedTask so that call sites below
// can simply write PostNestedTask::NO.
using PostNestedTask = test::TestTaskFactory::PostNestedTask;
174
175 class ThreadPostingTasksWaitIdle : public SimpleThread {
176 public:
177 // Constructs a thread that posts tasks to |thread_group| through an
178 // |execution_mode| task runner. The thread waits until all workers in
179 // |thread_group| are idle before posting a new task.
ThreadPostingTasksWaitIdle(ThreadGroupImpl * thread_group,test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode)180 ThreadPostingTasksWaitIdle(
181 ThreadGroupImpl* thread_group,
182 test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
183 TaskSourceExecutionMode execution_mode)
184 : SimpleThread("ThreadPostingTasksWaitIdle"),
185 thread_group_(thread_group),
186 factory_(CreatePooledTaskRunnerWithExecutionMode(
187 execution_mode,
188 mock_pooled_task_runner_delegate_),
189 execution_mode) {
190 DCHECK(thread_group_);
191 }
192 ThreadPostingTasksWaitIdle(const ThreadPostingTasksWaitIdle&) = delete;
193 ThreadPostingTasksWaitIdle& operator=(const ThreadPostingTasksWaitIdle&) =
194 delete;
195
factory() const196 const test::TestTaskFactory* factory() const { return &factory_; }
197
198 private:
Run()199 void Run() override {
200 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
201 thread_group_->WaitForAllWorkersIdleForTesting();
202 EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, OnceClosure()));
203 }
204 }
205
206 const raw_ptr<ThreadGroupImpl> thread_group_;
207 const scoped_refptr<TaskRunner> task_runner_;
208 test::TestTaskFactory factory_;
209 };
210
211 } // namespace
212
TEST_P(ThreadGroupImplImplTestParam, PostTasksWaitAllWorkersIdle) {
  // Each posting thread waits for all workers to be idle before every post,
  // which exercises the "worker goes to sleep, then gets woken up by a new
  // task" path.
  std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>> posters;
  for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
    auto poster = std::make_unique<ThreadPostingTasksWaitIdle>(
        thread_group_.get(), &mock_pooled_task_runner_delegate_, GetParam());
    ThreadPostingTasksWaitIdle* const started_poster = poster.get();
    posters.push_back(std::move(poster));
    started_poster->Start();
  }

  // Block until every posting thread is done and all of its tasks have run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // The posting threads (and their TestTaskFactory) are destroyed when this
  // function returns; make sure no worker is still touching them.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
237
TEST_P(ThreadGroupImplImplTestParam, PostTasksWithOneAvailableWorker) {
  // Tie up all workers but one with tasks that wait on |unblock_workers|. A
  // separate factory is used per blocking task so that, in SEQUENCED mode, the
  // tasks land in distinct sequences and can run concurrently.
  TestWaitableEvent unblock_workers;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocking_factories;
  constexpr size_t kNumBlockedWorkers = kMaxTasks - 1;
  for (size_t i = 0; i < kNumBlockedWorkers; ++i) {
    auto blocking_factory = std::make_unique<test::TestTaskFactory>(
        CreatePooledTaskRunnerWithExecutionMode(
            GetParam(), &mock_pooled_task_runner_delegate_),
        GetParam());
    test::TestTaskFactory* const factory = blocking_factory.get();
    blocking_factories.push_back(std::move(blocking_factory));
    EXPECT_TRUE(factory->PostTask(
        PostNestedTask::NO,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&unblock_workers))));
    factory->WaitForAllTasksToRun();
  }

  // With a single free worker left in |thread_group_|, all
  // |kNumTasksPostedPerThread| short tasks must still get to run.
  test::TestTaskFactory short_task_factory(
      CreatePooledTaskRunnerWithExecutionMode(
          GetParam(), &mock_pooled_task_runner_delegate_),
      GetParam());
  for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
    EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, OnceClosure()));
  }
  short_task_factory.WaitForAllTasksToRun();

  // Let the blocked tasks finish.
  unblock_workers.Signal();

  // The factories die at the end of this scope; wait for the workers to go
  // idle so that no task can touch a destroyed TestTaskFactory.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
272
TEST_P(ThreadGroupImplImplTestParam, Saturate) {
  // Checks that |kMaxTasks| tasks/sequences can be running at the same time.
  // One factory per blocking task keeps the tasks in distinct sequences so
  // that they can run concurrently in SEQUENCED mode too.
  TestWaitableEvent unblock_workers;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocking_factories;
  for (size_t i = 0; i < kMaxTasks; ++i) {
    auto blocking_factory = std::make_unique<test::TestTaskFactory>(
        CreatePooledTaskRunnerWithExecutionMode(
            GetParam(), &mock_pooled_task_runner_delegate_),
        GetParam());
    test::TestTaskFactory* const factory = blocking_factory.get();
    blocking_factories.push_back(std::move(blocking_factory));
    EXPECT_TRUE(factory->PostTask(
        PostNestedTask::NO,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&unblock_workers))));
    // Returns only once the task just posted is running, i.e. once one more
    // worker is occupied.
    factory->WaitForAllTasksToRun();
  }

  // Let the blocked tasks finish.
  unblock_workers.Signal();

  // The factories die at the end of this scope; wait for the workers to go
  // idle so that no task can touch a destroyed TestTaskFactory.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
298
299 // Verifies that ShouldYield() returns true for priorities lower than the
300 // highest priority pending while the thread group is flooded with USER_VISIBLE
301 // tasks.
TEST_F(ThreadGroupImplImplTest,ShouldYieldFloodedUserVisible)302 TEST_F(ThreadGroupImplImplTest, ShouldYieldFloodedUserVisible) {
303 TestWaitableEvent threads_running;
304 TestWaitableEvent threads_continue;
305
306 // Saturate workers with USER_VISIBLE tasks to ensure ShouldYield() returns
307 // true when a tasks of higher priority is posted.
308 RepeatingClosure threads_running_barrier = BarrierClosure(
309 kMaxTasks,
310 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
311
312 auto job_task = base::MakeRefCounted<test::MockJobTask>(
313 BindLambdaForTesting(
314 [&threads_running_barrier, &threads_continue](JobDelegate* delegate) {
315 threads_running_barrier.Run();
316 threads_continue.Wait();
317 }),
318 /* num_tasks_to_run */ kMaxTasks);
319 scoped_refptr<JobTaskSource> task_source =
320 job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::USER_VISIBLE},
321 &mock_pooled_task_runner_delegate_);
322 task_source->NotifyConcurrencyIncrease();
323
324 threads_running.Wait();
325
326 // Posting a BEST_EFFORT task should not cause any other tasks to yield.
327 // Once this task gets to run, no other task needs to yield.
328 // Note: This is only true because this test is using a single ThreadGroup.
329 // Under the ThreadPool this wouldn't be racy because BEST_EFFORT tasks
330 // run in an independent ThreadGroup.
331 test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT},
332 &mock_pooled_task_runner_delegate_)
333 ->PostTask(
334 FROM_HERE, BindLambdaForTesting([&]() {
335 EXPECT_FALSE(thread_group_->ShouldYield(
336 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/1}));
337 }));
338 // A BEST_EFFORT task with more workers shouldn't have to yield.
339 EXPECT_FALSE(thread_group_->ShouldYield(
340 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/2}));
341 EXPECT_FALSE(thread_group_->ShouldYield(
342 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
343 EXPECT_FALSE(thread_group_->ShouldYield(
344 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
345 EXPECT_FALSE(thread_group_->ShouldYield(
346 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
347
348 // Posting a USER_VISIBLE task should cause BEST_EFFORT and USER_VISIBLE with
349 // higher worker_count tasks to yield.
350 auto post_user_visible = [&]() {
351 test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
352 &mock_pooled_task_runner_delegate_)
353 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
354 EXPECT_FALSE(thread_group_->ShouldYield(
355 {TaskPriority::USER_VISIBLE, TimeTicks(),
356 /* worker_count=*/1}));
357 }));
358 };
359 // A USER_VISIBLE task with too many workers should yield.
360 post_user_visible();
361 EXPECT_TRUE(thread_group_->ShouldYield(
362 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/2}));
363 post_user_visible();
364 EXPECT_TRUE(thread_group_->ShouldYield(
365 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
366 post_user_visible();
367 EXPECT_FALSE(thread_group_->ShouldYield(
368 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/1}));
369 EXPECT_FALSE(thread_group_->ShouldYield(
370 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
371
372 // Posting a USER_BLOCKING task should cause BEST_EFFORT, USER_VISIBLE and
373 // USER_BLOCKING with higher worker_count tasks to yield.
374 auto post_user_blocking = [&]() {
375 test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
376 &mock_pooled_task_runner_delegate_)
377 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
378 // Once this task got to start, no other task needs to
379 // yield.
380 EXPECT_FALSE(thread_group_->ShouldYield(
381 {TaskPriority::USER_BLOCKING, TimeTicks(),
382 /* worker_count=*/1}));
383 }));
384 };
385 // A USER_BLOCKING task with too many workers should have to yield.
386 post_user_blocking();
387 EXPECT_TRUE(thread_group_->ShouldYield(
388 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/2}));
389 post_user_blocking();
390 EXPECT_TRUE(thread_group_->ShouldYield(
391 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
392 post_user_blocking();
393 EXPECT_TRUE(thread_group_->ShouldYield(
394 {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
395 post_user_blocking();
396 EXPECT_FALSE(thread_group_->ShouldYield(
397 {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/1}));
398
399 threads_continue.Signal();
400 task_tracker_.FlushForTesting();
401 }
402
// Run every ThreadGroupImplImplTestParam test once per execution mode:
// kParallel, kSequenced and kJob.
INSTANTIATE_TEST_SUITE_P(Parallel,
                         ThreadGroupImplImplTestParam,
                         ::testing::Values(TaskSourceExecutionMode::kParallel));
INSTANTIATE_TEST_SUITE_P(
    Sequenced,
    ThreadGroupImplImplTestParam,
    ::testing::Values(TaskSourceExecutionMode::kSequenced));

INSTANTIATE_TEST_SUITE_P(Job,
                         ThreadGroupImplImplTestParam,
                         ::testing::Values(TaskSourceExecutionMode::kJob));
414
415 namespace {
416
417 class ThreadGroupImplImplStartInBodyTest : public ThreadGroupImplImplTest {
418 public:
SetUp()419 void SetUp() override {
420 CreateThreadGroup();
421 // Let the test start the thread group.
422 }
423 };
424
TaskPostedBeforeStart(PlatformThreadRef * platform_thread_ref,TestWaitableEvent * task_running,TestWaitableEvent * barrier)425 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
426 TestWaitableEvent* task_running,
427 TestWaitableEvent* barrier) {
428 *platform_thread_ref = PlatformThread::CurrentRef();
429 task_running->Signal();
430 barrier->Wait();
431 }
432
433 } // namespace
434
435 // Verify that 2 tasks posted before Start() to a ThreadGroupImpl with
436 // more than 2 workers run on different workers when Start() is called.
TEST_F(ThreadGroupImplImplStartInBodyTest,PostTasksBeforeStart)437 TEST_F(ThreadGroupImplImplStartInBodyTest, PostTasksBeforeStart) {
438 PlatformThreadRef task_1_thread_ref;
439 PlatformThreadRef task_2_thread_ref;
440 TestWaitableEvent task_1_running;
441 TestWaitableEvent task_2_running;
442
443 // This event is used to prevent a task from completing before the other task
444 // starts running. If that happened, both tasks could run on the same worker
445 // and this test couldn't verify that the correct number of workers were woken
446 // up.
447 TestWaitableEvent barrier;
448
449 test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
450 &mock_pooled_task_runner_delegate_)
451 ->PostTask(
452 FROM_HERE,
453 BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
454 Unretained(&task_1_running), Unretained(&barrier)));
455 test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
456 &mock_pooled_task_runner_delegate_)
457 ->PostTask(
458 FROM_HERE,
459 BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
460 Unretained(&task_2_running), Unretained(&barrier)));
461
462 // Workers should not be created and tasks should not run before the thread
463 // group is started.
464 EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
465 EXPECT_FALSE(task_1_running.IsSignaled());
466 EXPECT_FALSE(task_2_running.IsSignaled());
467
468 StartThreadGroup(TimeDelta::Max(), kMaxTasks);
469
470 // Tasks should run shortly after the thread group is started.
471 task_1_running.Wait();
472 task_2_running.Wait();
473
474 // Tasks should run on different threads.
475 EXPECT_NE(task_1_thread_ref, task_2_thread_ref);
476
477 barrier.Signal();
478 task_tracker_.FlushForTesting();
479 }
480
481 // Verify that posting many tasks before Start will cause the number of workers
482 // to grow to |max_tasks_| after Start.
TEST_F(ThreadGroupImplImplStartInBodyTest,PostManyTasks)483 TEST_F(ThreadGroupImplImplStartInBodyTest, PostManyTasks) {
484 scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
485 {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
486 constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
487
488 TestWaitableEvent threads_running;
489 TestWaitableEvent threads_continue;
490
491 RepeatingClosure threads_running_barrier = BarrierClosure(
492 kMaxTasks,
493 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
494 // Posting these tasks should cause new workers to be created.
495 for (size_t i = 0; i < kMaxTasks; ++i) {
496 task_runner->PostTask(
497 FROM_HERE, BindLambdaForTesting([&]() {
498 threads_running_barrier.Run();
499 threads_continue.Wait();
500 }));
501 }
502 // Post the remaining |kNumTasksPosted - kMaxTasks| tasks, don't wait for them
503 // as they'll be blocked behind the above kMaxtasks.
504 for (size_t i = kMaxTasks; i < kNumTasksPosted; ++i)
505 task_runner->PostTask(FROM_HERE, DoNothing());
506
507 EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
508
509 StartThreadGroup(TimeDelta::Max(), kMaxTasks);
510 EXPECT_GT(thread_group_->NumberOfWorkersForTesting(), 0U);
511 EXPECT_EQ(kMaxTasks, thread_group_->GetMaxTasksForTesting());
512
513 threads_running.Wait();
514 EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(),
515 thread_group_->GetMaxTasksForTesting());
516 threads_continue.Signal();
517 task_tracker_.FlushForTesting();
518 }
519
520 namespace {
521
522 class BackgroundThreadGroupImplTest : public ThreadGroupImplImplTest {
523 public:
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,absl::optional<int> max_best_effort_tasks=absl::nullopt,WorkerThreadObserver * worker_observer=nullptr,absl::optional<TimeDelta> may_block_threshold=absl::nullopt)524 void CreateAndStartThreadGroup(
525 TimeDelta suggested_reclaim_time = TimeDelta::Max(),
526 size_t max_tasks = kMaxTasks,
527 absl::optional<int> max_best_effort_tasks = absl::nullopt,
528 WorkerThreadObserver* worker_observer = nullptr,
529 absl::optional<TimeDelta> may_block_threshold = absl::nullopt) {
530 if (!CanUseBackgroundThreadTypeForWorkerThread())
531 return;
532 CreateThreadGroup(ThreadType::kBackground);
533 StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
534 worker_observer, may_block_threshold);
535 }
536
SetUp()537 void SetUp() override { CreateAndStartThreadGroup(); }
538 };
539
540 } // namespace
541
542 // Verify that ScopedBlockingCall updates thread type when necessary per
543 // shutdown state.
TEST_F(BackgroundThreadGroupImplTest,UpdatePriorityBlockingStarted)544 TEST_F(BackgroundThreadGroupImplTest, UpdatePriorityBlockingStarted) {
545 if (!CanUseBackgroundThreadTypeForWorkerThread())
546 return;
547
548 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
549 {MayBlock(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT},
550 &mock_pooled_task_runner_delegate_);
551
552 TestWaitableEvent threads_running;
553 RepeatingClosure threads_running_barrier = BarrierClosure(
554 kMaxTasks,
555 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
556
557 TestWaitableEvent blocking_threads_continue;
558
559 for (size_t i = 0; i < kMaxTasks; ++i) {
560 task_runner->PostTask(
561 FROM_HERE, BindLambdaForTesting([&]() {
562 EXPECT_EQ(ThreadType::kBackground,
563 PlatformThread::GetCurrentThreadType());
564 {
565 // ScopedBlockingCall before shutdown doesn't affect priority.
566 ScopedBlockingCall scoped_blocking_call(FROM_HERE,
567 BlockingType::MAY_BLOCK);
568 EXPECT_EQ(ThreadType::kBackground,
569 PlatformThread::GetCurrentThreadType());
570 }
571 threads_running_barrier.Run();
572 blocking_threads_continue.Wait();
573 // This is reached after StartShutdown(), at which point we expect
574 // ScopedBlockingCall to update thread priority.
575 ScopedBlockingCall scoped_blocking_call(FROM_HERE,
576 BlockingType::MAY_BLOCK);
577 EXPECT_EQ(ThreadType::kDefault,
578 PlatformThread::GetCurrentThreadType());
579 }));
580 }
581 threads_running.Wait();
582
583 task_tracker_.StartShutdown();
584 blocking_threads_continue.Signal();
585 task_tracker_.FlushForTesting();
586 }
587
588 namespace {
589
590 class ThreadGroupImplStandbyPolicyTest : public ThreadGroupImplImplTestBase,
591 public testing::Test {
592 public:
593 ThreadGroupImplStandbyPolicyTest() = default;
594 ThreadGroupImplStandbyPolicyTest(const ThreadGroupImplStandbyPolicyTest&) =
595 delete;
596 ThreadGroupImplStandbyPolicyTest& operator=(
597 const ThreadGroupImplStandbyPolicyTest&) = delete;
598
SetUp()599 void SetUp() override {
600 CreateAndStartThreadGroup(kReclaimTimeForCleanupTests);
601 }
602
TearDown()603 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
604 };
605
606 } // namespace
607
// Right after the fixture has started the thread group, and before any task
// is posted, the group is expected to report exactly one worker.
TEST_F(ThreadGroupImplStandbyPolicyTest, InitOne) {
  EXPECT_EQ(1U, thread_group_->NumberOfWorkersForTesting());
}
611
612 namespace {
613
// Describes the optional second (nested) ScopedBlockingCall that
// NestedScopedBlockingCall creates.
enum class OptionalBlockingType {
  // No nested ScopedBlockingCall is created.
  NO_BLOCK,
  // The nested ScopedBlockingCall uses BlockingType::MAY_BLOCK.
  MAY_BLOCK,
  // The nested ScopedBlockingCall uses BlockingType::WILL_BLOCK.
  WILL_BLOCK,
};
619
// Test parameter describing a pair of nested ScopedBlockingCalls.
struct NestedBlockingType {
  NestedBlockingType(BlockingType first_in,
                     OptionalBlockingType second_in,
                     BlockingType behaves_as_in)
      : first(first_in), second(second_in), behaves_as(behaves_as_in) {}

  // Blocking type of the outer ScopedBlockingCall.
  BlockingType first;
  // Blocking type of the optional inner ScopedBlockingCall.
  OptionalBlockingType second;
  // NOTE(review): presumably the BlockingType the combination is expected to
  // behave as; it is not read in the visible part of this file -- confirm
  // against the tests that use it.
  BlockingType behaves_as;
};
630
631 class NestedScopedBlockingCall {
632 public:
NestedScopedBlockingCall(const NestedBlockingType & nested_blocking_type)633 explicit NestedScopedBlockingCall(
634 const NestedBlockingType& nested_blocking_type)
635 : first_scoped_blocking_call_(FROM_HERE, nested_blocking_type.first),
636 second_scoped_blocking_call_(
637 nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
638 ? std::make_unique<ScopedBlockingCall>(FROM_HERE,
639 BlockingType::WILL_BLOCK)
640 : (nested_blocking_type.second ==
641 OptionalBlockingType::MAY_BLOCK
642 ? std::make_unique<ScopedBlockingCall>(
643 FROM_HERE,
644 BlockingType::MAY_BLOCK)
645 : nullptr)) {}
646 NestedScopedBlockingCall(const NestedScopedBlockingCall&) = delete;
647 NestedScopedBlockingCall& operator=(const NestedScopedBlockingCall&) = delete;
648
649 private:
650 ScopedBlockingCall first_scoped_blocking_call_;
651 std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;
652 };
653
654 } // namespace
655
656 class ThreadGroupImplBlockingTest
657 : public ThreadGroupImplImplTestBase,
658 public testing::TestWithParam<NestedBlockingType> {
659 public:
660 ThreadGroupImplBlockingTest() = default;
661 ThreadGroupImplBlockingTest(const ThreadGroupImplBlockingTest&) = delete;
662 ThreadGroupImplBlockingTest& operator=(const ThreadGroupImplBlockingTest&) =
663 delete;
664
ParamInfoToString(::testing::TestParamInfo<NestedBlockingType> param_info)665 static std::string ParamInfoToString(
666 ::testing::TestParamInfo<NestedBlockingType> param_info) {
667 std::string str = param_info.param.first == BlockingType::MAY_BLOCK
668 ? "MAY_BLOCK"
669 : "WILL_BLOCK";
670 if (param_info.param.second == OptionalBlockingType::MAY_BLOCK)
671 str += "_MAY_BLOCK";
672 else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK)
673 str += "_WILL_BLOCK";
674 return str;
675 }
676
TearDown()677 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
678
679 protected:
680 // Saturates the thread group with a task that first blocks, waits to be
681 // unblocked, then exits.
SaturateWithBlockingTasks(const NestedBlockingType & nested_blocking_type,TaskPriority priority=TaskPriority::USER_BLOCKING)682 void SaturateWithBlockingTasks(
683 const NestedBlockingType& nested_blocking_type,
684 TaskPriority priority = TaskPriority::USER_BLOCKING) {
685 TestWaitableEvent threads_running;
686
687 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
688 {MayBlock(), WithBaseSyncPrimitives(), priority},
689 &mock_pooled_task_runner_delegate_);
690
691 RepeatingClosure threads_running_barrier = BarrierClosure(
692 kMaxTasks,
693 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
694
695 for (size_t i = 0; i < kMaxTasks; ++i) {
696 task_runner->PostTask(
697 FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier,
698 nested_blocking_type]() {
699 NestedScopedBlockingCall nested_scoped_blocking_call(
700 nested_blocking_type);
701 threads_running_barrier.Run();
702 blocking_threads_continue_.Wait();
703 }));
704 }
705 threads_running.Wait();
706 }
707
708 // Saturates the thread group with a task that waits for other tasks without
709 // entering a ScopedBlockingCall, then exits.
SaturateWithBusyTasks(TaskPriority priority=TaskPriority::USER_BLOCKING,TaskShutdownBehavior shutdown_behavior=TaskShutdownBehavior::SKIP_ON_SHUTDOWN)710 void SaturateWithBusyTasks(
711 TaskPriority priority = TaskPriority::USER_BLOCKING,
712 TaskShutdownBehavior shutdown_behavior =
713 TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
714 TestWaitableEvent threads_running;
715
716 const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
717 {MayBlock(), WithBaseSyncPrimitives(), priority, shutdown_behavior},
718 &mock_pooled_task_runner_delegate_);
719
720 RepeatingClosure threads_running_barrier = BarrierClosure(
721 kMaxTasks,
722 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
723 // Posting these tasks should cause new workers to be created.
724 for (size_t i = 0; i < kMaxTasks; ++i) {
725 task_runner->PostTask(
726 FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier]() {
727 threads_running_barrier.Run();
728 busy_threads_continue_.Wait();
729 }));
730 }
731 threads_running.Wait();
732 }
733
734 // Returns how long we can expect a change to |max_tasks_| to occur
735 // after a task has become blocked.
GetMaxTasksChangeSleepTime()736 TimeDelta GetMaxTasksChangeSleepTime() {
737 return std::max(thread_group_->blocked_workers_poll_period_for_testing(),
738 thread_group_->may_block_threshold_for_testing()) +
739 TestTimeouts::tiny_timeout();
740 }
741
742 // Waits indefinitely, until |thread_group_|'s max tasks increases to
743 // |expected_max_tasks|.
ExpectMaxTasksIncreasesTo(size_t expected_max_tasks)744 void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
745 size_t max_tasks = thread_group_->GetMaxTasksForTesting();
746 while (max_tasks != expected_max_tasks) {
747 PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
748 size_t new_max_tasks = thread_group_->GetMaxTasksForTesting();
749 ASSERT_GE(new_max_tasks, max_tasks);
750 max_tasks = new_max_tasks;
751 }
752 }
753
754 // Unblocks tasks posted by SaturateWithBlockingTasks().
UnblockBlockingTasks()755 void UnblockBlockingTasks() { blocking_threads_continue_.Signal(); }
756
757 // Unblocks tasks posted by SaturateWithBusyTasks().
UnblockBusyTasks()758 void UnblockBusyTasks() { busy_threads_continue_.Signal(); }
759
760 const scoped_refptr<TaskRunner> task_runner_ =
761 test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
762 &mock_pooled_task_runner_delegate_);
763
764 private:
765 TestWaitableEvent blocking_threads_continue_;
766 TestWaitableEvent busy_threads_continue_;
767 };
768
769 // Verify that SaturateWithBlockingTasks() causes max tasks to increase and
770 // creates a worker if needed. Also verify that UnblockBlockingTasks() decreases
771 // max tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblocked) {
  CreateAndStartThreadGroup();

  // Precondition: the group starts out with a capacity of |kMaxTasks|.
  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Fill the group with tasks that block using the parameterized blocking
  // type.
  SaturateWithBlockingTasks(GetParam());

  // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
  // should not block forever.
  SaturateWithBusyTasks();

  // One set of workers for the blocked tasks plus one set for the busy tasks.
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);

  // Release both sets of tasks and wait for them to finish. Max tasks is
  // expected to be back at its initial value afterwards.
  UnblockBusyTasks();
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
790
791 // Verify that SaturateWithBlockingTasks() of BEST_EFFORT tasks causes max best
792 // effort tasks to increase and creates a worker if needed. Also verify that
793 // UnblockBlockingTasks() decreases max best effort tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedBestEffort) {
  CreateAndStartThreadGroup();

  // Precondition: both the overall and the BEST_EFFORT capacity start out at
  // |kMaxTasks|.
  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);

  // Same scenario as ThreadBlockedUnblocked, but with BEST_EFFORT blocking
  // tasks.
  SaturateWithBlockingTasks(GetParam(), TaskPriority::BEST_EFFORT);

  // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
  // should not block forever.
  SaturateWithBusyTasks(TaskPriority::BEST_EFFORT);

  // One set of workers for the blocked tasks plus one set for the busy tasks.
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);

  // Once every task has been released and flushed, both limits are expected to
  // be back at their initial values.
  UnblockBusyTasks();
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
}
814
815 // Verify that flooding the thread group with more BEST_EFFORT tasks than
816 // kMaxBestEffortTasks doesn't prevent USER_VISIBLE tasks from running.
TEST_P(ThreadGroupImplBlockingTest, TooManyBestEffortTasks) {
  constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
  // One more BEST_EFFORT task than |kMaxBestEffortTasks| is posted below.
  constexpr size_t kNumBestEffortTasks = kMaxBestEffortTasks + 1;

  CreateAndStartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);

  // Every task posted by this test ends by waiting on this event, so all of
  // them stay alive until the final checks are done.
  TestWaitableEvent release_all_tasks;
  {
    // Signaled once every BEST_EFFORT task is inside its blocking scope.
    TestWaitableEvent all_tasks_blocking;
    RepeatingClosure all_tasks_blocking_barrier = BarrierClosure(
        kNumBestEffortTasks,
        BindOnce(&TestWaitableEvent::Signal, Unretained(&all_tasks_blocking)));
    // Lets the BEST_EFFORT tasks leave their blocking scope.
    TestWaitableEvent leave_blocking_scope;

    // Signaled once every BEST_EFFORT task has left its blocking scope.
    TestWaitableEvent all_tasks_unblocked;
    RepeatingClosure all_tasks_unblocked_barrier = BarrierClosure(
        kNumBestEffortTasks,
        BindOnce(&TestWaitableEvent::Signal, Unretained(&all_tasks_unblocked)));

    const auto best_effort_runner =
        test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                     &mock_pooled_task_runner_delegate_);
    for (size_t posted = 0; posted < kNumBestEffortTasks; ++posted) {
      best_effort_runner->PostTask(
          FROM_HERE, BindLambdaForTesting([&]() {
            {
              NestedScopedBlockingCall scoped_blocking_call(GetParam());
              all_tasks_blocking_barrier.Run();
              leave_blocking_scope.Wait();
            }
            all_tasks_unblocked_barrier.Run();
            release_all_tasks.Wait();
          }));
    }

    // Wait for all tasks to block, let them out of the blocking scope, then
    // wait until all of them are past it.
    all_tasks_blocking.Wait();
    leave_blocking_scope.Signal();
    all_tasks_unblocked.Wait();
  }

  // At this point, kMaxBestEffortTasks + 1 threads are running (plus
  // potentially the idle thread), but max tasks and max best-effort tasks are
  // back to normal.
  EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
            kMaxBestEffortTasks + 1);
  EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
            kMaxBestEffortTasks + 2);
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Post one more task through |task_runner_|, whose traits do not include
  // BEST_EFFORT.
  TestWaitableEvent extra_task_running;
  task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
                           extra_task_running.Signal();
                           release_all_tasks.Wait();
                         }));

  // This should not block forever.
  extra_task_running.Wait();

  EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
            kMaxBestEffortTasks + 2);
  EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
            kMaxBestEffortTasks + 3);
  release_all_tasks.Signal();

  task_tracker_.FlushForTesting();
}
881
882 // Verify that tasks posted in a saturated thread group before a
883 // ScopedBlockingCall will execute after ScopedBlockingCall is instantiated.
TEST_P(ThreadGroupImplBlockingTest, PostBeforeBlocking) {
  CreateAndStartThreadGroup();

  // Auto-reset so that the loop below can wait for each task individually.
  TestWaitableEvent task_started(WaitableEvent::ResetPolicy::AUTOMATIC);
  TestWaitableEvent may_enter_blocking_call;
  TestWaitableEvent release_blocking_tasks;

  // Occupy every worker with a task that has started running but has not yet
  // entered its ScopedBlockingCall.
  for (size_t i = 0; i < kMaxTasks; ++i) {
    task_runner_->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          task_started.Signal();
          may_enter_blocking_call.Wait();

          NestedScopedBlockingCall nested_scoped_blocking_call(GetParam());
          release_blocking_tasks.Wait();
        }));
    task_started.Wait();
  }

  // All workers should be occupied and the thread group should be saturated.
  // Workers have not entered ScopedBlockingCall yet.
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Queue a second batch of tasks while the group is saturated. The barrier
  // fires once all of them are running.
  TestWaitableEvent all_extra_tasks_running;
  TestWaitableEvent release_extra_tasks;
  RepeatingClosure extra_task_running_barrier =
      BarrierClosure(kMaxTasks, BindOnce(&TestWaitableEvent::Signal,
                                         Unretained(&all_extra_tasks_running)));
  for (size_t i = 0; i < kMaxTasks; ++i) {
    task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
                             extra_task_running_barrier.Run();
                             release_extra_tasks.Wait();
                           }));
  }

  // Allow tasks to enter ScopedBlockingCall. Workers should be created for the
  // tasks we just posted.
  may_enter_blocking_call.Signal();

  // Should not block forever.
  all_extra_tasks_running.Wait();
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
  release_extra_tasks.Signal();

  release_blocking_tasks.Signal();
  task_tracker_.FlushForTesting();
}
945
946 // Verify that workers become idle when the thread group is over-capacity and
947 // that those workers do no work.
TEST_P(ThreadGroupImplBlockingTest, WorkersIdleWhenOverCapacity) {
  CreateAndStartThreadGroup();

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  SaturateWithBlockingTasks(GetParam());

  // Forces |kMaxTasks| extra workers to be instantiated by posting tasks.
  SaturateWithBusyTasks();

  ASSERT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), 0U);
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);

  AtomicFlag is_exiting;
  // Posts |kMaxTasks| tasks, each of which checks that it only runs once
  // |is_exiting| has been set at the end of the test.
  const auto post_tasks_expecting_exit = [&]() {
    for (size_t i = 0; i < kMaxTasks; ++i) {
      task_runner_->PostTask(FROM_HERE, BindOnce(
                                            [](AtomicFlag* is_exiting) {
                                              EXPECT_TRUE(is_exiting->IsSet());
                                            },
                                            Unretained(&is_exiting)));
    }
  };

  // These tasks should not get executed until after other tasks become
  // unblocked.
  post_tasks_expecting_exit();

  // The original |kMaxTasks| will finish their tasks after being unblocked.
  // There will be work in the work queue, but the thread group should now be
  // over-capacity and workers will become idle.
  UnblockBlockingTasks();
  thread_group_->WaitForWorkersIdleForTesting(kMaxTasks);
  EXPECT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), kMaxTasks);

  // Posting more tasks should not cause workers idle from the thread group
  // being over capacity to begin doing work.
  post_tasks_expecting_exit();

  // Give time for those idle workers to possibly do work (which should not
  // happen).
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());

  is_exiting.Set();
  // Unblocks the new workers.
  UnblockBusyTasks();
  task_tracker_.FlushForTesting();
}
998
999 // Verify that an increase of max tasks with SaturateWithBlockingTasks()
1000 // increases the number of tasks that can run before ShouldYield returns true.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedShouldYield) {
  CreateAndStartThreadGroup();

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // ShouldYield() is queried with a BEST_EFFORT sort key before and after the
  // group is filled with blocking tasks; neither query is expected to ask for
  // a yield.
  EXPECT_FALSE(
      thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
  SaturateWithBlockingTasks(GetParam());
  EXPECT_FALSE(
      thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));

  // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
  // should not block forever.
  SaturateWithBusyTasks();

  // All tasks can run, hence ShouldYield returns false.
  EXPECT_FALSE(
      thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));

  // Post a USER_VISIBLE task that can't run since workers are saturated. This
  // should cause BEST_EFFORT tasks to yield. When the posted task itself
  // eventually runs, it expects that BEST_EFFORT no longer needs to yield.
  test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
                               &mock_pooled_task_runner_delegate_)
      ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
                   EXPECT_FALSE(thread_group_->ShouldYield(
                       {TaskPriority::BEST_EFFORT, TimeTicks()}));
                 }));
  EXPECT_TRUE(
      thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));

  // Post a USER_BLOCKING task that can't run since workers are saturated. This
  // should cause USER_VISIBLE tasks to yield. When the posted task itself
  // eventually runs, it expects that USER_VISIBLE no longer needs to yield.
  test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
                               &mock_pooled_task_runner_delegate_)
      ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
                   EXPECT_FALSE(thread_group_->ShouldYield(
                       {TaskPriority::USER_VISIBLE, TimeTicks()}));
                 }));
  EXPECT_TRUE(
      thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));

  // Release everything and wait for all tasks, including the two posted
  // above, to complete. Max tasks is expected to return to |kMaxTasks|.
  UnblockBusyTasks();
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
1047
// Runs every TEST_P of ThreadGroupImplBlockingTest with four
// NestedBlockingType values; test names are derived with ParamInfoToString.
// NOTE(review): the three constructor arguments look like (outer blocking
// type, optional nested blocking type, resulting effective type) — confirm
// against the NestedBlockingType definition.
INSTANTIATE_TEST_SUITE_P(
    All,
    ThreadGroupImplBlockingTest,
    ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
                                         OptionalBlockingType::NO_BLOCK,
                                         BlockingType::MAY_BLOCK),
                      NestedBlockingType(BlockingType::WILL_BLOCK,
                                         OptionalBlockingType::NO_BLOCK,
                                         BlockingType::WILL_BLOCK),
                      NestedBlockingType(BlockingType::MAY_BLOCK,
                                         OptionalBlockingType::WILL_BLOCK,
                                         BlockingType::WILL_BLOCK),
                      NestedBlockingType(BlockingType::WILL_BLOCK,
                                         OptionalBlockingType::MAY_BLOCK,
                                         BlockingType::WILL_BLOCK)),
    ThreadGroupImplBlockingTest::ParamInfoToString);
1064
1065 // Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
1066 // but exits the scope before the MayBlock threshold is reached, that the max
1067 // tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPremature) {
  // Create a thread group with an infinite MayBlock threshold so that a
  // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
  CreateAndStartThreadGroup(TimeDelta::Max(),   // |suggested_reclaim_time|
                            kMaxTasks,          // |max_tasks|
                            absl::nullopt,      // |max_best_effort_tasks|
                            nullptr,            // |worker_observer|
                            TimeDelta::Max());  // |may_block_threshold|

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Saturate the group with plain (non-nested) MAY_BLOCK tasks.
  SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
                                               OptionalBlockingType::NO_BLOCK,
                                               BlockingType::MAY_BLOCK));
  // Sleep for two blocked-workers poll periods, then check that neither the
  // worker count nor max tasks has grown.
  PlatformThread::Sleep(
      2 * thread_group_->blocked_workers_poll_period_for_testing());
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Max tasks is still expected to be |kMaxTasks| after the tasks are
  // released and flushed.
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
1091
1092 // Verify that if a BEST_EFFORT task enters the scope of a WILL_BLOCK
1093 // ScopedBlockingCall, but exits the scope before the MayBlock threshold is
1094 // reached, that the max best effort tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPrematureBestEffort) {
  // Create a thread group with an infinite MayBlock threshold. This test
  // saturates the group with WILL_BLOCK (not MAY_BLOCK) tasks; the
  // expectations below are that max tasks grows while max best-effort tasks
  // does not.
  CreateAndStartThreadGroup(TimeDelta::Max(),   // |suggested_reclaim_time|
                            kMaxTasks,          // |max_tasks|
                            kMaxTasks,          // |max_best_effort_tasks|
                            nullptr,            // |worker_observer|
                            TimeDelta::Max());  // |may_block_threshold|

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);

  // Saturate the group with BEST_EFFORT tasks in a plain (non-nested)
  // WILL_BLOCK ScopedBlockingCall.
  SaturateWithBlockingTasks(NestedBlockingType(BlockingType::WILL_BLOCK,
                                               OptionalBlockingType::NO_BLOCK,
                                               BlockingType::WILL_BLOCK),
                            TaskPriority::BEST_EFFORT);
  // Sleep for two blocked-workers poll periods before checking the limits.
  PlatformThread::Sleep(
      2 * thread_group_->blocked_workers_poll_period_for_testing());
  EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
  // Max tasks is expected to have doubled, while the best-effort limit is
  // expected to be unchanged.
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), 2 * kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);

  // Both limits are expected to be at |kMaxTasks| once the tasks are released
  // and flushed.
  UnblockBlockingTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
}
1122
1123 // Verify that if max tasks is incremented because of a MAY_BLOCK
1124 // ScopedBlockingCall, it isn't incremented again when there is a nested
1125 // WILL_BLOCK ScopedBlockingCall.
TEST_F(ThreadGroupImplBlockingTest, MayBlockIncreaseCapacityNestedWillBlock) {
  CreateAndStartThreadGroup();

  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  auto task_runner =
      test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
                                   &mock_pooled_task_runner_delegate_);
  // Every task posted by this test ends by waiting on this event.
  TestWaitableEvent release_tasks;

  // Saturate the thread group so that a MAY_BLOCK ScopedBlockingCall would
  // increment the max tasks. Only |kMaxTasks - 1| tasks are posted here; the
  // task posted further down takes the last slot.
  for (size_t i = 0; i < kMaxTasks - 1; ++i) {
    task_runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() { release_tasks.Wait(); }));
  }

  TestWaitableEvent may_enter_will_block;
  TestWaitableEvent entered_will_block;

  // Post a task that instantiates a MAY_BLOCK ScopedBlockingCall and, once
  // allowed to proceed, a nested WILL_BLOCK ScopedBlockingCall.
  task_runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        ScopedBlockingCall may_block(FROM_HERE, BlockingType::MAY_BLOCK);
        may_enter_will_block.Wait();
        ScopedBlockingCall will_block(FROM_HERE, BlockingType::WILL_BLOCK);
        entered_will_block.Signal();
        release_tasks.Wait();
      }));

  // After a short delay, max tasks should be incremented.
  ExpectMaxTasksIncreasesTo(kMaxTasks + 1);

  // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
  may_enter_will_block.Signal();
  entered_will_block.Wait();

  // Max tasks shouldn't be incremented again.
  EXPECT_EQ(kMaxTasks + 1, thread_group_->GetMaxTasksForTesting());

  // Tear down.
  release_tasks.Signal();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
1176
1177 // Verify that OnShutdownStarted() causes max tasks to increase and creates a
1178 // worker if needed. Also verify that UnblockBusyTasks() decreases max tasks
1179 // after an increase.
TEST_F(ThreadGroupImplBlockingTest, ThreadBusyShutdown) {
  CreateAndStartThreadGroup();
  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  // Fill the group with BEST_EFFORT, CONTINUE_ON_SHUTDOWN busy tasks, then
  // tell the thread group that shutdown has started.
  SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
                        TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
  thread_group_->OnShutdownStarted();

  // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
  // should not block forever.
  SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
                        TaskShutdownBehavior::BLOCK_SHUTDOWN);

  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);

  // Release the busy tasks, wait for them to complete and join the group
  // before checking that max tasks is back at |kMaxTasks|.
  UnblockBusyTasks();
  task_tracker_.FlushForTesting();
  thread_group_->JoinForTesting();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
  // This test has already joined the group, so it detaches it from the
  // delegate and destroys it here.
  mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
  thread_group_.reset();
}
1202
// Test parameter of ThreadGroupImplOverCapacityTest.
enum class ReclaimType {
  // The fixture leaves the kNoWorkerThreadReclaim feature untouched.
  DELAYED_RECLAIM,
  // The fixture enables the kNoWorkerThreadReclaim feature.
  NO_RECLAIM,
};
1204
1205 class ThreadGroupImplOverCapacityTest
1206 : public ThreadGroupImplImplTestBase,
1207 public testing::TestWithParam<ReclaimType> {
1208 public:
1209 ThreadGroupImplOverCapacityTest() = default;
1210 ThreadGroupImplOverCapacityTest(const ThreadGroupImplOverCapacityTest&) =
1211 delete;
1212 ThreadGroupImplOverCapacityTest& operator=(
1213 const ThreadGroupImplOverCapacityTest&) = delete;
1214
SetUp()1215 void SetUp() override {
1216 if (GetParam() == ReclaimType::NO_RECLAIM) {
1217 feature_list.InitAndEnableFeature(kNoWorkerThreadReclaim);
1218 }
1219 CreateThreadGroup();
1220 task_runner_ =
1221 test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1222 &mock_pooled_task_runner_delegate_);
1223 }
1224
TearDown()1225 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
1226
1227 protected:
1228 base::test::ScopedFeatureList feature_list;
1229 scoped_refptr<TaskRunner> task_runner_;
1230 static constexpr size_t kLocalMaxTasks = 3;
1231
CreateThreadGroup()1232 void CreateThreadGroup() {
1233 ASSERT_FALSE(thread_group_);
1234 service_thread_.Start();
1235 delayed_task_manager_.Start(service_thread_.task_runner());
1236 thread_group_ = std::make_unique<ThreadGroupImpl>(
1237 "OverCapacityTestThreadGroup", "A", ThreadType::kDefault,
1238 task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
1239 ASSERT_TRUE(thread_group_);
1240
1241 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
1242 }
1243 };
1244
1245 // Verify that workers that become idle due to the thread group being over
1246 // capacity will eventually cleanup.
TEST_P(ThreadGroupImplOverCapacityTest, VerifyCleanup) {
  StartThreadGroup(kReclaimTimeForCleanupTests, kLocalMaxTasks);
  TestWaitableEvent threads_running;
  TestWaitableEvent threads_continue;
  RepeatingClosure threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));

  // Each of the first |kLocalMaxTasks| tasks reports that it runs, blocks in a
  // WILL_BLOCK ScopedBlockingCall until |blocked_call_continue| is signaled,
  // and then keeps its worker busy until |threads_continue| is signaled.
  TestWaitableEvent blocked_call_continue;
  RepeatingClosure closure = BindRepeating(
      [](RepeatingClosure* threads_running_barrier,
         TestWaitableEvent* threads_continue,
         TestWaitableEvent* blocked_call_continue) {
        threads_running_barrier->Run();
        {
          ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                  BlockingType::WILL_BLOCK);
          blocked_call_continue->Wait();
        }
        threads_continue->Wait();
      },
      Unretained(&threads_running_barrier), Unretained(&threads_continue),
      Unretained(&blocked_call_continue));

  for (size_t i = 0; i < kLocalMaxTasks; ++i) {
    task_runner_->PostTask(FROM_HERE, closure);
  }

  threads_running.Wait();

  TestWaitableEvent extra_threads_running;
  TestWaitableEvent extra_threads_continue;

  RepeatingClosure extra_threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
  // These tasks should run on the new threads from increasing max tasks.
  for (size_t i = 0; i < kLocalMaxTasks; ++i) {
    task_runner_->PostTask(
        FROM_HERE, BindOnce(
                       [](RepeatingClosure* extra_threads_running_barrier,
                          TestWaitableEvent* extra_threads_continue) {
                         extra_threads_running_barrier->Run();
                         extra_threads_continue->Wait();
                       },
                       Unretained(&extra_threads_running_barrier),
                       Unretained(&extra_threads_continue)));
  }
  extra_threads_running.Wait();

  ASSERT_EQ(kLocalMaxTasks * 2, thread_group_->NumberOfWorkersForTesting());
  EXPECT_EQ(kLocalMaxTasks * 2, thread_group_->GetMaxTasksForTesting());
  // Let the first batch leave its blocking scope and the second batch finish.
  blocked_call_continue.Signal();
  extra_threads_continue.Signal();

  // Periodically post tasks to ensure that posting tasks does not prevent
  // workers that are idle due to the thread group being over capacity from
  // cleaning up.
  for (int i = 0; i < 16; ++i) {
    task_runner_->PostDelayedTask(FROM_HERE, DoNothing(),
                                  kReclaimTimeForCleanupTests * i * 0.5);
  }

  // Both branches signal |threads_continue|, which is a manual-reset event
  // and therefore stays signaled; no further Signal() is needed afterwards.
  if (GetParam() == ReclaimType::DELAYED_RECLAIM) {
    // Note: one worker above capacity will not get cleaned up since it's on the
    // front of the idle set.
    thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
    EXPECT_EQ(kLocalMaxTasks + 1, thread_group_->NumberOfWorkersForTesting());
    threads_continue.Signal();
  } else {
    // When workers aren't automatically reclaimed after a delay, blocking tasks
    // need to return for extra workers to be cleaned up.
    threads_continue.Signal();
    thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks);
    EXPECT_EQ(kLocalMaxTasks, thread_group_->NumberOfWorkersForTesting());
  }

  task_tracker_.FlushForTesting();
}
1326
// Runs every TEST_P of ThreadGroupImplOverCapacityTest once per ReclaimType
// value.
INSTANTIATE_TEST_SUITE_P(ReclaimType,
                         ThreadGroupImplOverCapacityTest,
                         ::testing::Values(ReclaimType::DELAYED_RECLAIM,
                                           ReclaimType::NO_RECLAIM));
1331
1332 // Verify that the maximum number of workers is 256 and that hitting the max
1333 // leaves the thread group in a valid state with regards to max tasks.
TEST_F(ThreadGroupImplBlockingTest, MaximumWorkersTest) {
  CreateAndStartThreadGroup();

  constexpr size_t kMaxNumberOfWorkers = 256;
  constexpr size_t kNumExtraTasks = 10;

  // Signaled once all |kMaxNumberOfWorkers| early tasks are inside their
  // blocking scope.
  TestWaitableEvent early_tasks_blocking;
  RepeatingClosure early_tasks_blocking_barrier =
      BarrierClosure(kMaxNumberOfWorkers,
                     BindOnce(&TestWaitableEvent::Signal,
                              Unretained(&early_tasks_blocking)));

  // Signaled once all early tasks have left their blocking scope.
  TestWaitableEvent early_tasks_done;
  RepeatingClosure early_tasks_done_barrier = BarrierClosure(
      kMaxNumberOfWorkers,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&early_tasks_done)));

  TestWaitableEvent release_early_tasks;

  // Post ScopedBlockingCall tasks to hit the worker cap.
  for (size_t i = 0; i < kMaxNumberOfWorkers; ++i) {
    task_runner_->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          {
            ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                    BlockingType::WILL_BLOCK);
            early_tasks_blocking_barrier.Run();
            release_early_tasks.Wait();
          }
          early_tasks_done_barrier.Run();
        }));
  }

  early_tasks_blocking.Wait();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(),
            kMaxTasks + kMaxNumberOfWorkers);

  TestWaitableEvent release_late_tasks;
  // Signaled once all |kNumExtraTasks| late tasks are inside their blocking
  // scope.
  TestWaitableEvent late_tasks_blocking;

  RepeatingClosure late_tasks_blocking_barrier = BarrierClosure(
      kNumExtraTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&late_tasks_blocking)));

  // Posts additional tasks. Note: we should already have |kMaxNumberOfWorkers|
  // tasks running. These tasks should not be able to get executed yet as the
  // thread group is already at its max worker cap.
  for (size_t i = 0; i < kNumExtraTasks; ++i) {
    task_runner_->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                  BlockingType::WILL_BLOCK);
          late_tasks_blocking_barrier.Run();
          release_late_tasks.Wait();
        }));
  }

  // Give time to see if we exceed the max number of workers.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  EXPECT_LE(thread_group_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);

  // Release the early tasks, wait for all of them to leave their blocking
  // scope, then wait for the late tasks to enter theirs.
  release_early_tasks.Signal();
  early_tasks_done.Wait();
  late_tasks_blocking.Wait();

  TestWaitableEvent all_final_tasks_running;
  TestWaitableEvent release_final_tasks;
  RepeatingClosure final_task_running_barrier =
      BarrierClosure(kMaxTasks, BindOnce(&TestWaitableEvent::Signal,
                                         Unretained(&all_final_tasks_running)));

  // Verify that we are still able to saturate the thread group.
  for (size_t i = 0; i < kMaxTasks; ++i) {
    task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
                             final_task_running_barrier.Run();
                             release_final_tasks.Wait();
                           }));
  }
  all_final_tasks_running.Wait();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);
  release_late_tasks.Signal();
  release_final_tasks.Signal();
  task_tracker_.FlushForTesting();
}
1433
1434 // Verify that the maximum number of best-effort tasks that can run concurrently
1435 // is honored.
TEST_F(ThreadGroupImplImplStartInBodyTest, MaxBestEffortTasks) {
  constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
  StartThreadGroup(TimeDelta::Max(),      // |suggested_reclaim_time|
                   kMaxTasks,             // |max_tasks|
                   kMaxBestEffortTasks);  // |max_best_effort_tasks|

  // One runner without an explicit priority and one BEST_EFFORT runner.
  const scoped_refptr<TaskRunner> default_priority_runner =
      test::CreatePooledTaskRunner({MayBlock()},
                                   &mock_pooled_task_runner_delegate_);
  const scoped_refptr<TaskRunner> best_effort_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // It should be possible to have |kMaxBestEffortTasks|
  // TaskPriority::BEST_EFFORT tasks running concurrently.
  TestWaitableEvent all_best_effort_tasks_running;
  TestWaitableEvent release_best_effort_tasks;
  RepeatingClosure best_effort_task_running_barrier = BarrierClosure(
      kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
                                    Unretained(&all_best_effort_tasks_running)));

  for (int i = 0; i < kMaxBestEffortTasks; ++i) {
    best_effort_runner->PostTask(FROM_HERE, base::BindLambdaForTesting([&]() {
                                   best_effort_task_running_barrier.Run();
                                   release_best_effort_tasks.Wait();
                                 }));
  }
  all_best_effort_tasks_running.Wait();

  // No more TaskPriority::BEST_EFFORT task should run. The extra task checks
  // that it only runs after the flag below has been set.
  AtomicFlag extra_best_effort_task_can_run;
  TestWaitableEvent extra_best_effort_task_ran;
  best_effort_runner->PostTask(
      FROM_HERE, base::BindLambdaForTesting([&]() {
        EXPECT_TRUE(extra_best_effort_task_can_run.IsSet());
        extra_best_effort_task_ran.Signal();
      }));

  // An extra foreground task should be able to run.
  TestWaitableEvent foreground_task_ran;
  default_priority_runner->PostTask(
      FROM_HERE,
      base::BindLambdaForTesting([&]() { foreground_task_ran.Signal(); }));
  foreground_task_ran.Wait();

  // Completion of the TaskPriority::BEST_EFFORT tasks should allow the extra
  // TaskPriority::BEST_EFFORT task to run.
  extra_best_effort_task_can_run.Set();
  release_best_effort_tasks.Signal();
  extra_best_effort_task_ran.Wait();

  // Wait for all tasks to complete before exiting to avoid invalid accesses.
  task_tracker_.FlushForTesting();
}
1490
1491 // Verify that flooding the thread group with BEST_EFFORT tasks doesn't cause
1492 // the creation of more than |max_best_effort_tasks| + 1 workers.
TEST_F(ThreadGroupImplImplStartInBodyTest,
       FloodBestEffortTasksDoesNotCreateTooManyWorkers) {
  constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
  StartThreadGroup(TimeDelta::Max(),      // |suggested_reclaim_time|
                   kMaxTasks,             // |max_tasks|
                   kMaxBestEffortTasks);  // |max_best_effort_tasks|

  const scoped_refptr<TaskRunner> best_effort_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // Post |kLargeNumber| BEST_EFFORT tasks; each one checks the worker count
  // at the time it runs.
  for (size_t posted = 0; posted < kLargeNumber; ++posted) {
    best_effort_runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
                    kMaxBestEffortTasks + 1);
        }));
  }

  // Wait for all tasks to complete before exiting to avoid invalid accesses.
  task_tracker_.FlushForTesting();
}
1514
1515 // Previously, a WILL_BLOCK ScopedBlockingCall unconditionally woke up a worker
1516 // if the priority queue was non-empty. Sometimes, that caused multiple workers
1517 // to be woken up for the same sequence. This test verifies that it is no longer
1518 // the case:
1519 // 1. Post and run task A.
1520 // 2. Post task B from task A.
1521 // 3. Task A enters a WILL_BLOCK ScopedBlockingCall. Once the idle thread is
1522 // created, this should no-op because there are already enough workers
1523 // (previously, a worker would be woken up because the priority queue isn't
1524 // empty).
// 4. Wait for all tasks to complete.
TEST_F(ThreadGroupImplImplStartInBodyTest,
       RepeatedWillBlockDoesNotCreateTooManyWorkers) {
  constexpr size_t kNumWorkers = 2U;
  StartThreadGroup(TimeDelta::Max(),  // |suggested_reclaim_time|
                   kNumWorkers,       // |max_tasks|
                   absl::nullopt);    // |max_best_effort_tasks|
  const scoped_refptr<TaskRunner> may_block_runner =
      test::CreatePooledTaskRunner({MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // Each iteration runs the scenario once and flushes before starting the
  // next one.
  for (size_t iteration = 0; iteration < kLargeNumber; ++iteration) {
    may_block_runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          // Task A: queue task B, which only checks the worker count.
          may_block_runner->PostTask(
              FROM_HERE, BindLambdaForTesting([&]() {
                EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
                          kNumWorkers + 1);
              }));
          // Number of workers should not increase when there is enough
          // capacity to accommodate queued and running sequences.
          ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                  BlockingType::WILL_BLOCK);
          EXPECT_EQ(kNumWorkers + 1,
                    thread_group_->NumberOfWorkersForTesting());
        }));
    // Wait for all tasks to complete.
    task_tracker_.FlushForTesting();
  }
}
1555
1556 namespace {
1557
1558 class ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest
1559 : public ThreadGroupImplImplTestBase,
1560 public testing::TestWithParam<BlockingType> {
1561 public:
1562 static constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1563
1564 ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest() = default;
1565 ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest(
1566 const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
1567 ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest& operator=(
1568 const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
1569
SetUp()1570 void SetUp() override {
1571 CreateThreadGroup();
1572 thread_group_->Start(kMaxTasks, kMaxBestEffortTasks, base::TimeDelta::Max(),
1573 service_thread_.task_runner(), nullptr,
1574 ThreadGroup::WorkerEnvironment::NONE);
1575 }
1576
TearDown()1577 void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
1578
1579 private:
1580 };
1581
1582 } // namespace
1583
TEST_P(ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
       BlockingCallAndMaxBestEffortTasksTest) {
  const scoped_refptr<TaskRunner> best_effort_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // First batch: |kMaxBestEffortTasks| BEST_EFFORT tasks that each report in
  // through the barrier and then block inside a ScopedBlockingCall of the
  // parameterized BlockingType.
  TestWaitableEvent blocked_tasks_started;
  TestWaitableEvent release_blocked_tasks;
  RepeatingClosure blocked_tasks_started_barrier = BarrierClosure(
      kMaxBestEffortTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&blocked_tasks_started)));
  for (int task_index = 0; task_index < kMaxBestEffortTasks; ++task_index) {
    best_effort_runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&]() {
          blocked_tasks_started_barrier.Run();
          ScopedBlockingCall blocking_scope(FROM_HERE, GetParam());
          release_blocked_tasks.Wait();
        }));
  }
  blocked_tasks_started.Wait();

  // Second batch: another |kMaxBestEffortTasks| BEST_EFFORT tasks. These are
  // expected to run because every task of the first batch is blocked within
  // a ScopedBlockingCall.
  //
  // The tasks are held until all of them have started, which demonstrates
  // that an extra |kMaxBestEffortTasks| can run concurrently.
  TestWaitableEvent extra_tasks_started;
  TestWaitableEvent release_extra_tasks;
  RepeatingClosure extra_tasks_started_barrier = BarrierClosure(
      kMaxBestEffortTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_tasks_started)));
  for (int task_index = 0; task_index < kMaxBestEffortTasks; ++task_index) {
    best_effort_runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
                                   extra_tasks_started_barrier.Run();
                                   release_extra_tasks.Wait();
                                 }));
  }
  extra_tasks_started.Wait();

  // Release both batches and wait for everything to finish before the
  // locals captured by reference go out of scope.
  release_blocked_tasks.Signal();
  release_extra_tasks.Signal();
  task_tracker_.FlushForTesting();
}
1633
// Instantiate the parameterized suite once per BlockingType. The parameter is
// the value the test body passes to ScopedBlockingCall via GetParam(), so the
// same scenario is exercised with both MAY_BLOCK and WILL_BLOCK.
INSTANTIATE_TEST_SUITE_P(MayBlock,
                         ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
                         ::testing::Values(BlockingType::MAY_BLOCK));
INSTANTIATE_TEST_SUITE_P(WillBlock,
                         ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
                         ::testing::Values(BlockingType::WILL_BLOCK));
1640
1641 // Verify that worker detachment doesn't race with worker cleanup, regression
1642 // test for https://crbug.com/810464.
TEST_F(ThreadGroupImplImplStartInBodyTest, RacyCleanup) {
  constexpr size_t kTaskCount = 256;
  constexpr TimeDelta kShortReclaimTime = Milliseconds(10);

  // |kTaskCount| is used for both of the leading Start() limits, and the
  // reclaim time is deliberately tiny so that idle workers attempt cleanup
  // almost immediately.
  thread_group_->Start(kTaskCount, kTaskCount, kShortReclaimTime,
                       service_thread_.task_runner(), nullptr,
                       ThreadGroup::WorkerEnvironment::NONE);

  scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
      {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);

  TestWaitableEvent all_tasks_running;
  TestWaitableEvent release_tasks;
  RepeatingClosure all_tasks_running_barrier = BarrierClosure(
      kTaskCount,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&all_tasks_running)));

  // Every task reports in through the barrier and then parks on
  // |release_tasks|.
  for (size_t task_index = 0; task_index < kTaskCount; ++task_index) {
    runner->PostTask(
        FROM_HERE,
        BindOnce(
            [](OnceClosure report_running, TestWaitableEvent* release) {
              std::move(report_running).Run();
              release->Wait();
            },
            all_tasks_running_barrier, Unretained(&release_tasks)));
  }

  // Block until every task has reported in, then let them all go at once.
  all_tasks_running.Wait();
  release_tasks.Signal();

  // Sleep for the reclaim time so that this thread wakes up right when the
  // now-idle workers are going to try to clean themselves up.
  PlatformThread::Sleep(kShortReclaimTime);

  thread_group_->JoinForTesting();

  // Tearing down here would be racy if worker cleanup could race with
  // ThreadGroupImpl destruction : https://crbug.com/810464.
  mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
  thread_group_.reset();
}
1687
1688 } // namespace internal
1689 } // namespace base
1690