• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group_impl.h"
6 
7 #include <stddef.h>
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <memory>
12 #include <unordered_set>
13 #include <utility>
14 #include <vector>
15 
16 #include "base/atomicops.h"
17 #include "base/barrier_closure.h"
18 #include "base/functional/bind.h"
19 #include "base/functional/callback.h"
20 #include "base/functional/callback_helpers.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/ref_counted.h"
24 #include "base/metrics/statistics_recorder.h"
25 #include "base/synchronization/atomic_flag.h"
26 #include "base/synchronization/condition_variable.h"
27 #include "base/synchronization/lock.h"
28 #include "base/task/task_features.h"
29 #include "base/task/task_runner.h"
30 #include "base/task/thread_pool/delayed_task_manager.h"
31 #include "base/task/thread_pool/environment_config.h"
32 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
33 #include "base/task/thread_pool/sequence.h"
34 #include "base/task/thread_pool/task_source_sort_key.h"
35 #include "base/task/thread_pool/task_tracker.h"
36 #include "base/task/thread_pool/test_task_factory.h"
37 #include "base/task/thread_pool/test_utils.h"
38 #include "base/task/thread_pool/worker_thread_observer.h"
39 #include "base/test/bind.h"
40 #include "base/test/gtest_util.h"
41 #include "base/test/scoped_feature_list.h"
42 #include "base/test/test_simple_task_runner.h"
43 #include "base/test/test_timeouts.h"
44 #include "base/test/test_waitable_event.h"
45 #include "base/threading/platform_thread.h"
46 #include "base/threading/scoped_blocking_call.h"
47 #include "base/threading/simple_thread.h"
48 #include "base/threading/thread.h"
49 #include "base/threading/thread_checker_impl.h"
50 #include "base/time/time.h"
51 #include "base/timer/timer.h"
52 #include "build/build_config.h"
53 #include "testing/gtest/include/gtest/gtest.h"
54 #include "third_party/abseil-cpp/absl/types/optional.h"
55 
56 namespace base {
57 namespace internal {
58 namespace {
59 
60 constexpr size_t kMaxTasks = 4;
61 constexpr size_t kNumThreadsPostingTasks = 4;
62 constexpr size_t kNumTasksPostedPerThread = 150;
63 // This can't be lower because Windows' TestWaitableEvent wakes up too early
64 // when a small timeout is used. This results in many spurious wake ups before a
65 // worker is allowed to cleanup.
66 constexpr TimeDelta kReclaimTimeForCleanupTests = Milliseconds(500);
67 constexpr size_t kLargeNumber = 512;
68 
69 class ThreadGroupImplImplTestBase : public ThreadGroup::Delegate {
70  public:
71   ThreadGroupImplImplTestBase(const ThreadGroupImplImplTestBase&) = delete;
72   ThreadGroupImplImplTestBase& operator=(const ThreadGroupImplImplTestBase&) =
73       delete;
74 
75  protected:
ThreadGroupImplImplTestBase()76   ThreadGroupImplImplTestBase()
77       : service_thread_("ThreadPoolServiceThread"),
78         tracked_ref_factory_(this) {}
79 
CommonTearDown()80   void CommonTearDown() {
81     delayed_task_manager_.Shutdown();
82     service_thread_.Stop();
83     task_tracker_.FlushForTesting();
84     if (thread_group_)
85       thread_group_->JoinForTesting();
86     mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
87     thread_group_.reset();
88   }
89 
CreateThreadGroup(ThreadType thread_type=ThreadType::kDefault)90   void CreateThreadGroup(ThreadType thread_type = ThreadType::kDefault) {
91     ASSERT_FALSE(thread_group_);
92     service_thread_.Start();
93     delayed_task_manager_.Start(service_thread_.task_runner());
94     thread_group_ = std::make_unique<ThreadGroupImpl>(
95         "TestThreadGroup", "A", thread_type, task_tracker_.GetTrackedRef(),
96         tracked_ref_factory_.GetTrackedRef());
97     ASSERT_TRUE(thread_group_);
98 
99     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
100   }
101 
StartThreadGroup(TimeDelta suggested_reclaim_time,size_t max_tasks,absl::optional<int> max_best_effort_tasks=absl::nullopt,WorkerThreadObserver * worker_observer=nullptr,absl::optional<TimeDelta> may_block_threshold=absl::nullopt)102   void StartThreadGroup(
103       TimeDelta suggested_reclaim_time,
104       size_t max_tasks,
105       absl::optional<int> max_best_effort_tasks = absl::nullopt,
106       WorkerThreadObserver* worker_observer = nullptr,
107       absl::optional<TimeDelta> may_block_threshold = absl::nullopt) {
108     ASSERT_TRUE(thread_group_);
109     thread_group_->Start(
110         max_tasks,
111         max_best_effort_tasks ? max_best_effort_tasks.value() : max_tasks,
112         suggested_reclaim_time, service_thread_.task_runner(), worker_observer,
113         ThreadGroup::WorkerEnvironment::NONE,
114         /* synchronous_thread_start_for_testing=*/false, may_block_threshold);
115   }
116 
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,absl::optional<int> max_best_effort_tasks=absl::nullopt,WorkerThreadObserver * worker_observer=nullptr,absl::optional<TimeDelta> may_block_threshold=absl::nullopt)117   void CreateAndStartThreadGroup(
118       TimeDelta suggested_reclaim_time = TimeDelta::Max(),
119       size_t max_tasks = kMaxTasks,
120       absl::optional<int> max_best_effort_tasks = absl::nullopt,
121       WorkerThreadObserver* worker_observer = nullptr,
122       absl::optional<TimeDelta> may_block_threshold = absl::nullopt) {
123     CreateThreadGroup();
124     StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
125                      worker_observer, may_block_threshold);
126   }
127 
128   Thread service_thread_;
129   TaskTracker task_tracker_;
130   std::unique_ptr<ThreadGroupImpl> thread_group_;
131   DelayedTaskManager delayed_task_manager_;
132   TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_;
133   test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
134       task_tracker_.GetTrackedRef(), &delayed_task_manager_};
135 
136  private:
137   // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)138   ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
139     return thread_group_.get();
140   }
141 };
142 
143 class ThreadGroupImplImplTest : public ThreadGroupImplImplTestBase,
144                                 public testing::Test {
145  public:
146   ThreadGroupImplImplTest(const ThreadGroupImplImplTest&) = delete;
147   ThreadGroupImplImplTest& operator=(const ThreadGroupImplImplTest&) = delete;
148 
149  protected:
150   ThreadGroupImplImplTest() = default;
151 
SetUp()152   void SetUp() override { CreateAndStartThreadGroup(); }
153 
TearDown()154   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
155 };
156 
157 class ThreadGroupImplImplTestParam
158     : public ThreadGroupImplImplTestBase,
159       public testing::TestWithParam<TaskSourceExecutionMode> {
160  public:
161   ThreadGroupImplImplTestParam(const ThreadGroupImplImplTestParam&) = delete;
162   ThreadGroupImplImplTestParam& operator=(const ThreadGroupImplImplTestParam&) =
163       delete;
164 
165  protected:
166   ThreadGroupImplImplTestParam() = default;
167 
SetUp()168   void SetUp() override { CreateAndStartThreadGroup(); }
169 
TearDown()170   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
171 };
172 
173 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
174 
175 class ThreadPostingTasksWaitIdle : public SimpleThread {
176  public:
177   // Constructs a thread that posts tasks to |thread_group| through an
178   // |execution_mode| task runner. The thread waits until all workers in
179   // |thread_group| are idle before posting a new task.
ThreadPostingTasksWaitIdle(ThreadGroupImpl * thread_group,test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode)180   ThreadPostingTasksWaitIdle(
181       ThreadGroupImpl* thread_group,
182       test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
183       TaskSourceExecutionMode execution_mode)
184       : SimpleThread("ThreadPostingTasksWaitIdle"),
185         thread_group_(thread_group),
186         factory_(CreatePooledTaskRunnerWithExecutionMode(
187                      execution_mode,
188                      mock_pooled_task_runner_delegate_),
189                  execution_mode) {
190     DCHECK(thread_group_);
191   }
192   ThreadPostingTasksWaitIdle(const ThreadPostingTasksWaitIdle&) = delete;
193   ThreadPostingTasksWaitIdle& operator=(const ThreadPostingTasksWaitIdle&) =
194       delete;
195 
factory() const196   const test::TestTaskFactory* factory() const { return &factory_; }
197 
198  private:
Run()199   void Run() override {
200     for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
201       thread_group_->WaitForAllWorkersIdleForTesting();
202       EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, OnceClosure()));
203     }
204   }
205 
206   const raw_ptr<ThreadGroupImpl> thread_group_;
207   const scoped_refptr<TaskRunner> task_runner_;
208   test::TestTaskFactory factory_;
209 };
210 
211 }  // namespace
212 
TEST_P(ThreadGroupImplImplTestParam,PostTasksWaitAllWorkersIdle)213 TEST_P(ThreadGroupImplImplTestParam, PostTasksWaitAllWorkersIdle) {
214   // Create threads to post tasks. To verify that workers can sleep and be woken
215   // up when new tasks are posted, wait for all workers to become idle before
216   // posting a new task.
217   std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>>
218       threads_posting_tasks;
219   for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
220     threads_posting_tasks.push_back(
221         std::make_unique<ThreadPostingTasksWaitIdle>(
222             thread_group_.get(), &mock_pooled_task_runner_delegate_,
223             GetParam()));
224     threads_posting_tasks.back()->Start();
225   }
226 
227   // Wait for all tasks to run.
228   for (const auto& thread_posting_tasks : threads_posting_tasks) {
229     thread_posting_tasks->Join();
230     thread_posting_tasks->factory()->WaitForAllTasksToRun();
231   }
232 
233   // Wait until all workers are idle to be sure that no task accesses its
234   // TestTaskFactory after |thread_posting_tasks| is destroyed.
235   thread_group_->WaitForAllWorkersIdleForTesting();
236 }
237 
TEST_P(ThreadGroupImplImplTestParam,PostTasksWithOneAvailableWorker)238 TEST_P(ThreadGroupImplImplTestParam, PostTasksWithOneAvailableWorker) {
239   // Post blocking tasks to keep all workers busy except one until |event| is
240   // signaled. Use different factories so that tasks are added to different
241   // sequences and can run simultaneously when the execution mode is SEQUENCED.
242   TestWaitableEvent event;
243   std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
244   for (size_t i = 0; i < (kMaxTasks - 1); ++i) {
245     blocked_task_factories.push_back(std::make_unique<test::TestTaskFactory>(
246         CreatePooledTaskRunnerWithExecutionMode(
247             GetParam(), &mock_pooled_task_runner_delegate_),
248         GetParam()));
249     EXPECT_TRUE(blocked_task_factories.back()->PostTask(
250         PostNestedTask::NO,
251         BindOnce(&TestWaitableEvent::Wait, Unretained(&event))));
252     blocked_task_factories.back()->WaitForAllTasksToRun();
253   }
254 
255   // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
256   // that only one worker in |thread_group_| isn't busy.
257   test::TestTaskFactory short_task_factory(
258       CreatePooledTaskRunnerWithExecutionMode(
259           GetParam(), &mock_pooled_task_runner_delegate_),
260       GetParam());
261   for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
262     EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, OnceClosure()));
263   short_task_factory.WaitForAllTasksToRun();
264 
265   // Release tasks waiting on |event|.
266   event.Signal();
267 
268   // Wait until all workers are idle to be sure that no task accesses
269   // its TestTaskFactory after it is destroyed.
270   thread_group_->WaitForAllWorkersIdleForTesting();
271 }
272 
TEST_P(ThreadGroupImplImplTestParam,Saturate)273 TEST_P(ThreadGroupImplImplTestParam, Saturate) {
274   // Verify that it is possible to have |kMaxTasks| tasks/sequences running
275   // simultaneously. Use different factories so that the blocking tasks are
276   // added to different sequences and can run simultaneously when the execution
277   // mode is SEQUENCED.
278   TestWaitableEvent event;
279   std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
280   for (size_t i = 0; i < kMaxTasks; ++i) {
281     factories.push_back(std::make_unique<test::TestTaskFactory>(
282         CreatePooledTaskRunnerWithExecutionMode(
283             GetParam(), &mock_pooled_task_runner_delegate_),
284         GetParam()));
285     EXPECT_TRUE(factories.back()->PostTask(
286         PostNestedTask::NO,
287         BindOnce(&TestWaitableEvent::Wait, Unretained(&event))));
288     factories.back()->WaitForAllTasksToRun();
289   }
290 
291   // Release tasks waiting on |event|.
292   event.Signal();
293 
294   // Wait until all workers are idle to be sure that no task accesses
295   // its TestTaskFactory after it is destroyed.
296   thread_group_->WaitForAllWorkersIdleForTesting();
297 }
298 
299 // Verifies that ShouldYield() returns true for priorities lower than the
300 // highest priority pending while the thread group is flooded with USER_VISIBLE
301 // tasks.
TEST_F(ThreadGroupImplImplTest,ShouldYieldFloodedUserVisible)302 TEST_F(ThreadGroupImplImplTest, ShouldYieldFloodedUserVisible) {
303   TestWaitableEvent threads_running;
304   TestWaitableEvent threads_continue;
305 
306   // Saturate workers with USER_VISIBLE tasks to ensure ShouldYield() returns
307   // true when a tasks of higher priority is posted.
308   RepeatingClosure threads_running_barrier = BarrierClosure(
309       kMaxTasks,
310       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
311 
312   auto job_task = base::MakeRefCounted<test::MockJobTask>(
313       BindLambdaForTesting(
314           [&threads_running_barrier, &threads_continue](JobDelegate* delegate) {
315             threads_running_barrier.Run();
316             threads_continue.Wait();
317           }),
318       /* num_tasks_to_run */ kMaxTasks);
319   scoped_refptr<JobTaskSource> task_source =
320       job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::USER_VISIBLE},
321                                  &mock_pooled_task_runner_delegate_);
322   task_source->NotifyConcurrencyIncrease();
323 
324   threads_running.Wait();
325 
326   // Posting a BEST_EFFORT task should not cause any other tasks to yield.
327   // Once this task gets to run, no other task needs to yield.
328   // Note: This is only true because this test is using a single ThreadGroup.
329   //       Under the ThreadPool this wouldn't be racy because BEST_EFFORT tasks
330   //       run in an independent ThreadGroup.
331   test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT},
332                                &mock_pooled_task_runner_delegate_)
333       ->PostTask(
334           FROM_HERE, BindLambdaForTesting([&]() {
335             EXPECT_FALSE(thread_group_->ShouldYield(
336                 {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/1}));
337           }));
338   // A BEST_EFFORT task with more workers shouldn't have to yield.
339   EXPECT_FALSE(thread_group_->ShouldYield(
340       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/2}));
341   EXPECT_FALSE(thread_group_->ShouldYield(
342       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
343   EXPECT_FALSE(thread_group_->ShouldYield(
344       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
345   EXPECT_FALSE(thread_group_->ShouldYield(
346       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
347 
348   // Posting a USER_VISIBLE task should cause BEST_EFFORT and USER_VISIBLE with
349   // higher worker_count tasks to yield.
350   auto post_user_visible = [&]() {
351     test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
352                                  &mock_pooled_task_runner_delegate_)
353         ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
354                      EXPECT_FALSE(thread_group_->ShouldYield(
355                          {TaskPriority::USER_VISIBLE, TimeTicks(),
356                           /* worker_count=*/1}));
357                    }));
358   };
359   // A USER_VISIBLE task with too many workers should yield.
360   post_user_visible();
361   EXPECT_TRUE(thread_group_->ShouldYield(
362       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/2}));
363   post_user_visible();
364   EXPECT_TRUE(thread_group_->ShouldYield(
365       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
366   post_user_visible();
367   EXPECT_FALSE(thread_group_->ShouldYield(
368       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/1}));
369   EXPECT_FALSE(thread_group_->ShouldYield(
370       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/0}));
371 
372   // Posting a USER_BLOCKING task should cause BEST_EFFORT, USER_VISIBLE and
373   // USER_BLOCKING with higher worker_count tasks to yield.
374   auto post_user_blocking = [&]() {
375     test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
376                                  &mock_pooled_task_runner_delegate_)
377         ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
378                      // Once this task got to start, no other task needs to
379                      // yield.
380                      EXPECT_FALSE(thread_group_->ShouldYield(
381                          {TaskPriority::USER_BLOCKING, TimeTicks(),
382                           /* worker_count=*/1}));
383                    }));
384   };
385   // A USER_BLOCKING task with too many workers should have to yield.
386   post_user_blocking();
387   EXPECT_TRUE(thread_group_->ShouldYield(
388       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/2}));
389   post_user_blocking();
390   EXPECT_TRUE(thread_group_->ShouldYield(
391       {TaskPriority::BEST_EFFORT, TimeTicks(), /* worker_count=*/0}));
392   post_user_blocking();
393   EXPECT_TRUE(thread_group_->ShouldYield(
394       {TaskPriority::USER_VISIBLE, TimeTicks(), /* worker_count=*/0}));
395   post_user_blocking();
396   EXPECT_FALSE(thread_group_->ShouldYield(
397       {TaskPriority::USER_BLOCKING, TimeTicks(), /* worker_count=*/1}));
398 
399   threads_continue.Signal();
400   task_tracker_.FlushForTesting();
401 }
402 
403 INSTANTIATE_TEST_SUITE_P(Parallel,
404                          ThreadGroupImplImplTestParam,
405                          ::testing::Values(TaskSourceExecutionMode::kParallel));
406 INSTANTIATE_TEST_SUITE_P(
407     Sequenced,
408     ThreadGroupImplImplTestParam,
409     ::testing::Values(TaskSourceExecutionMode::kSequenced));
410 
411 INSTANTIATE_TEST_SUITE_P(Job,
412                          ThreadGroupImplImplTestParam,
413                          ::testing::Values(TaskSourceExecutionMode::kJob));
414 
415 namespace {
416 
417 class ThreadGroupImplImplStartInBodyTest : public ThreadGroupImplImplTest {
418  public:
SetUp()419   void SetUp() override {
420     CreateThreadGroup();
421     // Let the test start the thread group.
422   }
423 };
424 
TaskPostedBeforeStart(PlatformThreadRef * platform_thread_ref,TestWaitableEvent * task_running,TestWaitableEvent * barrier)425 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
426                            TestWaitableEvent* task_running,
427                            TestWaitableEvent* barrier) {
428   *platform_thread_ref = PlatformThread::CurrentRef();
429   task_running->Signal();
430   barrier->Wait();
431 }
432 
433 }  // namespace
434 
435 // Verify that 2 tasks posted before Start() to a ThreadGroupImpl with
436 // more than 2 workers run on different workers when Start() is called.
TEST_F(ThreadGroupImplImplStartInBodyTest,PostTasksBeforeStart)437 TEST_F(ThreadGroupImplImplStartInBodyTest, PostTasksBeforeStart) {
438   PlatformThreadRef task_1_thread_ref;
439   PlatformThreadRef task_2_thread_ref;
440   TestWaitableEvent task_1_running;
441   TestWaitableEvent task_2_running;
442 
443   // This event is used to prevent a task from completing before the other task
444   // starts running. If that happened, both tasks could run on the same worker
445   // and this test couldn't verify that the correct number of workers were woken
446   // up.
447   TestWaitableEvent barrier;
448 
449   test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
450                                &mock_pooled_task_runner_delegate_)
451       ->PostTask(
452           FROM_HERE,
453           BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
454                    Unretained(&task_1_running), Unretained(&barrier)));
455   test::CreatePooledTaskRunner({WithBaseSyncPrimitives()},
456                                &mock_pooled_task_runner_delegate_)
457       ->PostTask(
458           FROM_HERE,
459           BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
460                    Unretained(&task_2_running), Unretained(&barrier)));
461 
462   // Workers should not be created and tasks should not run before the thread
463   // group is started.
464   EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
465   EXPECT_FALSE(task_1_running.IsSignaled());
466   EXPECT_FALSE(task_2_running.IsSignaled());
467 
468   StartThreadGroup(TimeDelta::Max(), kMaxTasks);
469 
470   // Tasks should run shortly after the thread group is started.
471   task_1_running.Wait();
472   task_2_running.Wait();
473 
474   // Tasks should run on different threads.
475   EXPECT_NE(task_1_thread_ref, task_2_thread_ref);
476 
477   barrier.Signal();
478   task_tracker_.FlushForTesting();
479 }
480 
481 // Verify that posting many tasks before Start will cause the number of workers
482 // to grow to |max_tasks_| after Start.
TEST_F(ThreadGroupImplImplStartInBodyTest,PostManyTasks)483 TEST_F(ThreadGroupImplImplStartInBodyTest, PostManyTasks) {
484   scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
485       {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
486   constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
487 
488   TestWaitableEvent threads_running;
489   TestWaitableEvent threads_continue;
490 
491   RepeatingClosure threads_running_barrier = BarrierClosure(
492       kMaxTasks,
493       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
494   // Posting these tasks should cause new workers to be created.
495   for (size_t i = 0; i < kMaxTasks; ++i) {
496     task_runner->PostTask(
497         FROM_HERE, BindLambdaForTesting([&]() {
498           threads_running_barrier.Run();
499           threads_continue.Wait();
500         }));
501   }
502   // Post the remaining |kNumTasksPosted - kMaxTasks| tasks, don't wait for them
503   // as they'll be blocked behind the above kMaxtasks.
504   for (size_t i = kMaxTasks; i < kNumTasksPosted; ++i)
505     task_runner->PostTask(FROM_HERE, DoNothing());
506 
507   EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
508 
509   StartThreadGroup(TimeDelta::Max(), kMaxTasks);
510   EXPECT_GT(thread_group_->NumberOfWorkersForTesting(), 0U);
511   EXPECT_EQ(kMaxTasks, thread_group_->GetMaxTasksForTesting());
512 
513   threads_running.Wait();
514   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(),
515             thread_group_->GetMaxTasksForTesting());
516   threads_continue.Signal();
517   task_tracker_.FlushForTesting();
518 }
519 
520 namespace {
521 
522 class BackgroundThreadGroupImplTest : public ThreadGroupImplImplTest {
523  public:
CreateAndStartThreadGroup(TimeDelta suggested_reclaim_time=TimeDelta::Max (),size_t max_tasks=kMaxTasks,absl::optional<int> max_best_effort_tasks=absl::nullopt,WorkerThreadObserver * worker_observer=nullptr,absl::optional<TimeDelta> may_block_threshold=absl::nullopt)524   void CreateAndStartThreadGroup(
525       TimeDelta suggested_reclaim_time = TimeDelta::Max(),
526       size_t max_tasks = kMaxTasks,
527       absl::optional<int> max_best_effort_tasks = absl::nullopt,
528       WorkerThreadObserver* worker_observer = nullptr,
529       absl::optional<TimeDelta> may_block_threshold = absl::nullopt) {
530     if (!CanUseBackgroundThreadTypeForWorkerThread())
531       return;
532     CreateThreadGroup(ThreadType::kBackground);
533     StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
534                      worker_observer, may_block_threshold);
535   }
536 
SetUp()537   void SetUp() override { CreateAndStartThreadGroup(); }
538 };
539 
540 }  // namespace
541 
542 // Verify that ScopedBlockingCall updates thread type when necessary per
543 // shutdown state.
TEST_F(BackgroundThreadGroupImplTest,UpdatePriorityBlockingStarted)544 TEST_F(BackgroundThreadGroupImplTest, UpdatePriorityBlockingStarted) {
545   if (!CanUseBackgroundThreadTypeForWorkerThread())
546     return;
547 
548   const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
549       {MayBlock(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT},
550       &mock_pooled_task_runner_delegate_);
551 
552   TestWaitableEvent threads_running;
553   RepeatingClosure threads_running_barrier = BarrierClosure(
554       kMaxTasks,
555       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
556 
557   TestWaitableEvent blocking_threads_continue;
558 
559   for (size_t i = 0; i < kMaxTasks; ++i) {
560     task_runner->PostTask(
561         FROM_HERE, BindLambdaForTesting([&]() {
562           EXPECT_EQ(ThreadType::kBackground,
563                     PlatformThread::GetCurrentThreadType());
564           {
565             // ScopedBlockingCall before shutdown doesn't affect priority.
566             ScopedBlockingCall scoped_blocking_call(FROM_HERE,
567                                                     BlockingType::MAY_BLOCK);
568             EXPECT_EQ(ThreadType::kBackground,
569                       PlatformThread::GetCurrentThreadType());
570           }
571           threads_running_barrier.Run();
572           blocking_threads_continue.Wait();
573           // This is reached after StartShutdown(), at which point we expect
574           // ScopedBlockingCall to update thread priority.
575           ScopedBlockingCall scoped_blocking_call(FROM_HERE,
576                                                   BlockingType::MAY_BLOCK);
577           EXPECT_EQ(ThreadType::kDefault,
578                     PlatformThread::GetCurrentThreadType());
579         }));
580   }
581   threads_running.Wait();
582 
583   task_tracker_.StartShutdown();
584   blocking_threads_continue.Signal();
585   task_tracker_.FlushForTesting();
586 }
587 
588 namespace {
589 
590 class ThreadGroupImplStandbyPolicyTest : public ThreadGroupImplImplTestBase,
591                                          public testing::Test {
592  public:
593   ThreadGroupImplStandbyPolicyTest() = default;
594   ThreadGroupImplStandbyPolicyTest(const ThreadGroupImplStandbyPolicyTest&) =
595       delete;
596   ThreadGroupImplStandbyPolicyTest& operator=(
597       const ThreadGroupImplStandbyPolicyTest&) = delete;
598 
SetUp()599   void SetUp() override {
600     CreateAndStartThreadGroup(kReclaimTimeForCleanupTests);
601   }
602 
TearDown()603   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
604 };
605 
606 }  // namespace
607 
TEST_F(ThreadGroupImplStandbyPolicyTest,InitOne)608 TEST_F(ThreadGroupImplStandbyPolicyTest, InitOne) {
609   EXPECT_EQ(1U, thread_group_->NumberOfWorkersForTesting());
610 }
611 
612 namespace {
613 
614 enum class OptionalBlockingType {
615   NO_BLOCK,
616   MAY_BLOCK,
617   WILL_BLOCK,
618 };
619 
620 struct NestedBlockingType {
NestedBlockingTypebase::internal::__anona2e50db30d11::NestedBlockingType621   NestedBlockingType(BlockingType first_in,
622                      OptionalBlockingType second_in,
623                      BlockingType behaves_as_in)
624       : first(first_in), second(second_in), behaves_as(behaves_as_in) {}
625 
626   BlockingType first;
627   OptionalBlockingType second;
628   BlockingType behaves_as;
629 };
630 
631 class NestedScopedBlockingCall {
632  public:
NestedScopedBlockingCall(const NestedBlockingType & nested_blocking_type)633   explicit NestedScopedBlockingCall(
634       const NestedBlockingType& nested_blocking_type)
635       : first_scoped_blocking_call_(FROM_HERE, nested_blocking_type.first),
636         second_scoped_blocking_call_(
637             nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
638                 ? std::make_unique<ScopedBlockingCall>(FROM_HERE,
639                                                        BlockingType::WILL_BLOCK)
640                 : (nested_blocking_type.second ==
641                            OptionalBlockingType::MAY_BLOCK
642                        ? std::make_unique<ScopedBlockingCall>(
643                              FROM_HERE,
644                              BlockingType::MAY_BLOCK)
645                        : nullptr)) {}
646   NestedScopedBlockingCall(const NestedScopedBlockingCall&) = delete;
647   NestedScopedBlockingCall& operator=(const NestedScopedBlockingCall&) = delete;
648 
649  private:
650   ScopedBlockingCall first_scoped_blocking_call_;
651   std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;
652 };
653 
654 }  // namespace
655 
656 class ThreadGroupImplBlockingTest
657     : public ThreadGroupImplImplTestBase,
658       public testing::TestWithParam<NestedBlockingType> {
659  public:
660   ThreadGroupImplBlockingTest() = default;
661   ThreadGroupImplBlockingTest(const ThreadGroupImplBlockingTest&) = delete;
662   ThreadGroupImplBlockingTest& operator=(const ThreadGroupImplBlockingTest&) =
663       delete;
664 
ParamInfoToString(::testing::TestParamInfo<NestedBlockingType> param_info)665   static std::string ParamInfoToString(
666       ::testing::TestParamInfo<NestedBlockingType> param_info) {
667     std::string str = param_info.param.first == BlockingType::MAY_BLOCK
668                           ? "MAY_BLOCK"
669                           : "WILL_BLOCK";
670     if (param_info.param.second == OptionalBlockingType::MAY_BLOCK)
671       str += "_MAY_BLOCK";
672     else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK)
673       str += "_WILL_BLOCK";
674     return str;
675   }
676 
TearDown()677   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
678 
679  protected:
680   // Saturates the thread group with a task that first blocks, waits to be
681   // unblocked, then exits.
SaturateWithBlockingTasks(const NestedBlockingType & nested_blocking_type,TaskPriority priority=TaskPriority::USER_BLOCKING)682   void SaturateWithBlockingTasks(
683       const NestedBlockingType& nested_blocking_type,
684       TaskPriority priority = TaskPriority::USER_BLOCKING) {
685     TestWaitableEvent threads_running;
686 
687     const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
688         {MayBlock(), WithBaseSyncPrimitives(), priority},
689         &mock_pooled_task_runner_delegate_);
690 
691     RepeatingClosure threads_running_barrier = BarrierClosure(
692         kMaxTasks,
693         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
694 
695     for (size_t i = 0; i < kMaxTasks; ++i) {
696       task_runner->PostTask(
697           FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier,
698                                            nested_blocking_type]() {
699             NestedScopedBlockingCall nested_scoped_blocking_call(
700                 nested_blocking_type);
701             threads_running_barrier.Run();
702             blocking_threads_continue_.Wait();
703           }));
704     }
705     threads_running.Wait();
706   }
707 
708   // Saturates the thread group with a task that waits for other tasks without
709   // entering a ScopedBlockingCall, then exits.
SaturateWithBusyTasks(TaskPriority priority=TaskPriority::USER_BLOCKING,TaskShutdownBehavior shutdown_behavior=TaskShutdownBehavior::SKIP_ON_SHUTDOWN)710   void SaturateWithBusyTasks(
711       TaskPriority priority = TaskPriority::USER_BLOCKING,
712       TaskShutdownBehavior shutdown_behavior =
713           TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
714     TestWaitableEvent threads_running;
715 
716     const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
717         {MayBlock(), WithBaseSyncPrimitives(), priority, shutdown_behavior},
718         &mock_pooled_task_runner_delegate_);
719 
720     RepeatingClosure threads_running_barrier = BarrierClosure(
721         kMaxTasks,
722         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
723     // Posting these tasks should cause new workers to be created.
724     for (size_t i = 0; i < kMaxTasks; ++i) {
725       task_runner->PostTask(
726           FROM_HERE, BindLambdaForTesting([this, &threads_running_barrier]() {
727             threads_running_barrier.Run();
728             busy_threads_continue_.Wait();
729           }));
730     }
731     threads_running.Wait();
732   }
733 
734   // Returns how long we can expect a change to |max_tasks_| to occur
735   // after a task has become blocked.
GetMaxTasksChangeSleepTime()736   TimeDelta GetMaxTasksChangeSleepTime() {
737     return std::max(thread_group_->blocked_workers_poll_period_for_testing(),
738                     thread_group_->may_block_threshold_for_testing()) +
739            TestTimeouts::tiny_timeout();
740   }
741 
742   // Waits indefinitely, until |thread_group_|'s max tasks increases to
743   // |expected_max_tasks|.
ExpectMaxTasksIncreasesTo(size_t expected_max_tasks)744   void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
745     size_t max_tasks = thread_group_->GetMaxTasksForTesting();
746     while (max_tasks != expected_max_tasks) {
747       PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
748       size_t new_max_tasks = thread_group_->GetMaxTasksForTesting();
749       ASSERT_GE(new_max_tasks, max_tasks);
750       max_tasks = new_max_tasks;
751     }
752   }
753 
754   // Unblocks tasks posted by SaturateWithBlockingTasks().
UnblockBlockingTasks()755   void UnblockBlockingTasks() { blocking_threads_continue_.Signal(); }
756 
757   // Unblocks tasks posted by SaturateWithBusyTasks().
UnblockBusyTasks()758   void UnblockBusyTasks() { busy_threads_continue_.Signal(); }
759 
760   const scoped_refptr<TaskRunner> task_runner_ =
761       test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
762                                    &mock_pooled_task_runner_delegate_);
763 
764  private:
765   TestWaitableEvent blocking_threads_continue_;
766   TestWaitableEvent busy_threads_continue_;
767 };
768 
769 // Verify that SaturateWithBlockingTasks() causes max tasks to increase and
770 // creates a worker if needed. Also verify that UnblockBlockingTasks() decreases
771 // max tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest,ThreadBlockedUnblocked)772 TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblocked) {
773   CreateAndStartThreadGroup();
774 
775   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
776 
777   SaturateWithBlockingTasks(GetParam());
778 
779   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
780   // should not block forever.
781   SaturateWithBusyTasks();
782 
783   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
784 
785   UnblockBusyTasks();
786   UnblockBlockingTasks();
787   task_tracker_.FlushForTesting();
788   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
789 }
790 
791 // Verify that SaturateWithBlockingTasks() of BEST_EFFORT tasks causes max best
792 // effort tasks to increase and creates a worker if needed. Also verify that
793 // UnblockBlockingTasks() decreases max best effort tasks after an increase.
TEST_P(ThreadGroupImplBlockingTest,ThreadBlockedUnblockedBestEffort)794 TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedBestEffort) {
795   CreateAndStartThreadGroup();
796 
797   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
798   ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
799 
800   SaturateWithBlockingTasks(GetParam(), TaskPriority::BEST_EFFORT);
801 
802   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
803   // should not block forever.
804   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT);
805 
806   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
807 
808   UnblockBusyTasks();
809   UnblockBlockingTasks();
810   task_tracker_.FlushForTesting();
811   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
812   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
813 }
814 
815 // Verify that flooding the thread group with more BEST_EFFORT tasks than
816 // kMaxBestEffortTasks doesn't prevent USER_VISIBLE tasks from running.
TEST_P(ThreadGroupImplBlockingTest,TooManyBestEffortTasks)817 TEST_P(ThreadGroupImplBlockingTest, TooManyBestEffortTasks) {
818   constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
819 
820   CreateAndStartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);
821 
822   TestWaitableEvent threads_continue;
823   {
824     TestWaitableEvent entered_blocking_scope;
825     RepeatingClosure entered_blocking_scope_barrier = BarrierClosure(
826         kMaxBestEffortTasks + 1, BindOnce(&TestWaitableEvent::Signal,
827                                           Unretained(&entered_blocking_scope)));
828     TestWaitableEvent exit_blocking_scope;
829 
830     TestWaitableEvent threads_running;
831     RepeatingClosure threads_running_barrier = BarrierClosure(
832         kMaxBestEffortTasks + 1,
833         BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
834 
835     const auto best_effort_task_runner =
836         test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
837                                      &mock_pooled_task_runner_delegate_);
838     for (size_t i = 0; i < kMaxBestEffortTasks + 1; ++i) {
839       best_effort_task_runner->PostTask(
840           FROM_HERE, BindLambdaForTesting([&]() {
841             {
842               NestedScopedBlockingCall scoped_blocking_call(GetParam());
843               entered_blocking_scope_barrier.Run();
844               exit_blocking_scope.Wait();
845             }
846             threads_running_barrier.Run();
847             threads_continue.Wait();
848           }));
849     }
850     entered_blocking_scope.Wait();
851     exit_blocking_scope.Signal();
852     threads_running.Wait();
853   }
854 
855   // At this point, kMaxBestEffortTasks + 1 threads are running (plus
856   // potentially the idle thread), but max_task and max_best_effort_task are
857   // back to normal.
858   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
859             kMaxBestEffortTasks + 1);
860   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
861             kMaxBestEffortTasks + 2);
862   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
863 
864   TestWaitableEvent threads_running;
865   task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
866                            threads_running.Signal();
867                            threads_continue.Wait();
868                          }));
869 
870   // This should not block forever.
871   threads_running.Wait();
872 
873   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
874             kMaxBestEffortTasks + 2);
875   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
876             kMaxBestEffortTasks + 3);
877   threads_continue.Signal();
878 
879   task_tracker_.FlushForTesting();
880 }
881 
882 // Verify that tasks posted in a saturated thread group before a
883 // ScopedBlockingCall will execute after ScopedBlockingCall is instantiated.
TEST_P(ThreadGroupImplBlockingTest,PostBeforeBlocking)884 TEST_P(ThreadGroupImplBlockingTest, PostBeforeBlocking) {
885   CreateAndStartThreadGroup();
886 
887   TestWaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
888   TestWaitableEvent thread_can_block;
889   TestWaitableEvent threads_continue;
890 
891   for (size_t i = 0; i < kMaxTasks; ++i) {
892     task_runner_->PostTask(
893         FROM_HERE,
894         BindOnce(
895             [](const NestedBlockingType& nested_blocking_type,
896                TestWaitableEvent* thread_running,
897                TestWaitableEvent* thread_can_block,
898                TestWaitableEvent* threads_continue) {
899               thread_running->Signal();
900               thread_can_block->Wait();
901 
902               NestedScopedBlockingCall nested_scoped_blocking_call(
903                   nested_blocking_type);
904               threads_continue->Wait();
905             },
906             GetParam(), Unretained(&thread_running),
907             Unretained(&thread_can_block), Unretained(&threads_continue)));
908     thread_running.Wait();
909   }
910 
911   // All workers should be occupied and the thread group should be saturated.
912   // Workers have not entered ScopedBlockingCall yet.
913   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
914   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
915 
916   TestWaitableEvent extra_threads_running;
917   TestWaitableEvent extra_threads_continue;
918   RepeatingClosure extra_threads_running_barrier = BarrierClosure(
919       kMaxTasks,
920       BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
921   for (size_t i = 0; i < kMaxTasks; ++i) {
922     task_runner_->PostTask(
923         FROM_HERE, BindOnce(
924                        [](RepeatingClosure* extra_threads_running_barrier,
925                           TestWaitableEvent* extra_threads_continue) {
926                          extra_threads_running_barrier->Run();
927                          extra_threads_continue->Wait();
928                        },
929                        Unretained(&extra_threads_running_barrier),
930                        Unretained(&extra_threads_continue)));
931   }
932 
933   // Allow tasks to enter ScopedBlockingCall. Workers should be created for the
934   // tasks we just posted.
935   thread_can_block.Signal();
936 
937   // Should not block forever.
938   extra_threads_running.Wait();
939   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
940   extra_threads_continue.Signal();
941 
942   threads_continue.Signal();
943   task_tracker_.FlushForTesting();
944 }
945 
946 // Verify that workers become idle when the thread group is over-capacity and
947 // that those workers do no work.
TEST_P(ThreadGroupImplBlockingTest,WorkersIdleWhenOverCapacity)948 TEST_P(ThreadGroupImplBlockingTest, WorkersIdleWhenOverCapacity) {
949   CreateAndStartThreadGroup();
950 
951   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
952 
953   SaturateWithBlockingTasks(GetParam());
954 
955   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks.
956   SaturateWithBusyTasks();
957 
958   ASSERT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), 0U);
959   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
960 
961   AtomicFlag is_exiting;
962   // These tasks should not get executed until after other tasks become
963   // unblocked.
964   for (size_t i = 0; i < kMaxTasks; ++i) {
965     task_runner_->PostTask(FROM_HERE, BindOnce(
966                                           [](AtomicFlag* is_exiting) {
967                                             EXPECT_TRUE(is_exiting->IsSet());
968                                           },
969                                           Unretained(&is_exiting)));
970   }
971 
972   // The original |kMaxTasks| will finish their tasks after being unblocked.
973   // There will be work in the work queue, but the thread group should now be
974   // over-capacity and workers will become idle.
975   UnblockBlockingTasks();
976   thread_group_->WaitForWorkersIdleForTesting(kMaxTasks);
977   EXPECT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), kMaxTasks);
978 
979   // Posting more tasks should not cause workers idle from the thread group
980   // being over capacity to begin doing work.
981   for (size_t i = 0; i < kMaxTasks; ++i) {
982     task_runner_->PostTask(FROM_HERE, BindOnce(
983                                           [](AtomicFlag* is_exiting) {
984                                             EXPECT_TRUE(is_exiting->IsSet());
985                                           },
986                                           Unretained(&is_exiting)));
987   }
988 
989   // Give time for those idle workers to possibly do work (which should not
990   // happen).
991   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
992 
993   is_exiting.Set();
994   // Unblocks the new workers.
995   UnblockBusyTasks();
996   task_tracker_.FlushForTesting();
997 }
998 
999 // Verify that an increase of max tasks with SaturateWithBlockingTasks()
1000 // increases the number of tasks that can run before ShouldYield returns true.
TEST_P(ThreadGroupImplBlockingTest,ThreadBlockedUnblockedShouldYield)1001 TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedShouldYield) {
1002   CreateAndStartThreadGroup();
1003 
1004   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1005 
1006   EXPECT_FALSE(
1007       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1008   SaturateWithBlockingTasks(GetParam());
1009   EXPECT_FALSE(
1010       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1011 
1012   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
1013   // should not block forever.
1014   SaturateWithBusyTasks();
1015 
1016   // All tasks can run, hence ShouldYield returns false.
1017   EXPECT_FALSE(
1018       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1019 
1020   // Post a USER_VISIBLE task that can't run since workers are saturated. This
1021   // should cause BEST_EFFORT tasks to yield.
1022   test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
1023                                &mock_pooled_task_runner_delegate_)
1024       ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1025                    EXPECT_FALSE(thread_group_->ShouldYield(
1026                        {TaskPriority::BEST_EFFORT, TimeTicks()}));
1027                  }));
1028   EXPECT_TRUE(
1029       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
1030 
1031   // Post a USER_BLOCKING task that can't run since workers are saturated. This
1032   // should cause USER_VISIBLE tasks to yield.
1033   test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
1034                                &mock_pooled_task_runner_delegate_)
1035       ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1036                    EXPECT_FALSE(thread_group_->ShouldYield(
1037                        {TaskPriority::USER_VISIBLE, TimeTicks()}));
1038                  }));
1039   EXPECT_TRUE(
1040       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
1041 
1042   UnblockBusyTasks();
1043   UnblockBlockingTasks();
1044   task_tracker_.FlushForTesting();
1045   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1046 }
1047 
1048 INSTANTIATE_TEST_SUITE_P(
1049     All,
1050     ThreadGroupImplBlockingTest,
1051     ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
1052                                          OptionalBlockingType::NO_BLOCK,
1053                                          BlockingType::MAY_BLOCK),
1054                       NestedBlockingType(BlockingType::WILL_BLOCK,
1055                                          OptionalBlockingType::NO_BLOCK,
1056                                          BlockingType::WILL_BLOCK),
1057                       NestedBlockingType(BlockingType::MAY_BLOCK,
1058                                          OptionalBlockingType::WILL_BLOCK,
1059                                          BlockingType::WILL_BLOCK),
1060                       NestedBlockingType(BlockingType::WILL_BLOCK,
1061                                          OptionalBlockingType::MAY_BLOCK,
1062                                          BlockingType::WILL_BLOCK)),
1063     ThreadGroupImplBlockingTest::ParamInfoToString);
1064 
1065 // Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
1066 // but exits the scope before the MayBlock threshold is reached, that the max
1067 // tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest,ThreadBlockUnblockPremature)1068 TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPremature) {
1069   // Create a thread group with an infinite MayBlock threshold so that a
1070   // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
1071   CreateAndStartThreadGroup(TimeDelta::Max(),   // |suggested_reclaim_time|
1072                             kMaxTasks,          // |max_tasks|
1073                             absl::nullopt,      // |max_best_effort_tasks|
1074                             nullptr,            // |worker_observer|
1075                             TimeDelta::Max());  // |may_block_threshold|
1076 
1077   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1078 
1079   SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
1080                                                OptionalBlockingType::NO_BLOCK,
1081                                                BlockingType::MAY_BLOCK));
1082   PlatformThread::Sleep(
1083       2 * thread_group_->blocked_workers_poll_period_for_testing());
1084   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
1085   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1086 
1087   UnblockBlockingTasks();
1088   task_tracker_.FlushForTesting();
1089   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1090 }
1091 
1092 // Verify that if a BEST_EFFORT task enters the scope of a WILL_BLOCK
1093 // ScopedBlockingCall, but exits the scope before the MayBlock threshold is
1094 // reached, that the max best effort tasks does not increase.
TEST_F(ThreadGroupImplBlockingTest,ThreadBlockUnblockPrematureBestEffort)1095 TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPrematureBestEffort) {
1096   // Create a thread group with an infinite MayBlock threshold so that a
1097   // MAY_BLOCK ScopedBlockingCall never increases the max tasks.
1098   CreateAndStartThreadGroup(TimeDelta::Max(),   // |suggested_reclaim_time|
1099                             kMaxTasks,          // |max_tasks|
1100                             kMaxTasks,          // |max_best_effort_tasks|
1101                             nullptr,            // |worker_observer|
1102                             TimeDelta::Max());  // |may_block_threshold|
1103 
1104   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1105   ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1106 
1107   SaturateWithBlockingTasks(NestedBlockingType(BlockingType::WILL_BLOCK,
1108                                                OptionalBlockingType::NO_BLOCK,
1109                                                BlockingType::WILL_BLOCK),
1110                             TaskPriority::BEST_EFFORT);
1111   PlatformThread::Sleep(
1112       2 * thread_group_->blocked_workers_poll_period_for_testing());
1113   EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
1114   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), 2 * kMaxTasks);
1115   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1116 
1117   UnblockBlockingTasks();
1118   task_tracker_.FlushForTesting();
1119   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1120   EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
1121 }
1122 
1123 // Verify that if max tasks is incremented because of a MAY_BLOCK
1124 // ScopedBlockingCall, it isn't incremented again when there is a nested
1125 // WILL_BLOCK ScopedBlockingCall.
TEST_F(ThreadGroupImplBlockingTest,MayBlockIncreaseCapacityNestedWillBlock)1126 TEST_F(ThreadGroupImplBlockingTest, MayBlockIncreaseCapacityNestedWillBlock) {
1127   CreateAndStartThreadGroup();
1128 
1129   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1130   auto task_runner =
1131       test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1132                                    &mock_pooled_task_runner_delegate_);
1133   TestWaitableEvent can_return;
1134 
1135   // Saturate the thread group so that a MAY_BLOCK ScopedBlockingCall would
1136   // increment the max tasks.
1137   for (size_t i = 0; i < kMaxTasks - 1; ++i) {
1138     task_runner->PostTask(
1139         FROM_HERE, BindOnce(&TestWaitableEvent::Wait, Unretained(&can_return)));
1140   }
1141 
1142   TestWaitableEvent can_instantiate_will_block;
1143   TestWaitableEvent did_instantiate_will_block;
1144 
1145   // Post a task that instantiates a MAY_BLOCK ScopedBlockingCall.
1146   task_runner->PostTask(
1147       FROM_HERE,
1148       BindOnce(
1149           [](TestWaitableEvent* can_instantiate_will_block,
1150              TestWaitableEvent* did_instantiate_will_block,
1151              TestWaitableEvent* can_return) {
1152             ScopedBlockingCall may_block(FROM_HERE, BlockingType::MAY_BLOCK);
1153             can_instantiate_will_block->Wait();
1154             ScopedBlockingCall will_block(FROM_HERE, BlockingType::WILL_BLOCK);
1155             did_instantiate_will_block->Signal();
1156             can_return->Wait();
1157           },
1158           Unretained(&can_instantiate_will_block),
1159           Unretained(&did_instantiate_will_block), Unretained(&can_return)));
1160 
1161   // After a short delay, max tasks should be incremented.
1162   ExpectMaxTasksIncreasesTo(kMaxTasks + 1);
1163 
1164   // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
1165   can_instantiate_will_block.Signal();
1166   did_instantiate_will_block.Wait();
1167 
1168   // Max tasks shouldn't be incremented again.
1169   EXPECT_EQ(kMaxTasks + 1, thread_group_->GetMaxTasksForTesting());
1170 
1171   // Tear down.
1172   can_return.Signal();
1173   task_tracker_.FlushForTesting();
1174   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1175 }
1176 
1177 // Verify that OnShutdownStarted() causes max tasks to increase and creates a
1178 // worker if needed. Also verify that UnblockBusyTasks() decreases max tasks
1179 // after an increase.
TEST_F(ThreadGroupImplBlockingTest,ThreadBusyShutdown)1180 TEST_F(ThreadGroupImplBlockingTest, ThreadBusyShutdown) {
1181   CreateAndStartThreadGroup();
1182   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1183 
1184   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
1185                         TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
1186   thread_group_->OnShutdownStarted();
1187 
1188   // Forces |kMaxTasks| extra workers to be instantiated by posting tasks. This
1189   // should not block forever.
1190   SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
1191                         TaskShutdownBehavior::BLOCK_SHUTDOWN);
1192 
1193   EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
1194 
1195   UnblockBusyTasks();
1196   task_tracker_.FlushForTesting();
1197   thread_group_->JoinForTesting();
1198   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
1199   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
1200   thread_group_.reset();
1201 }
1202 
1203 enum class ReclaimType { DELAYED_RECLAIM, NO_RECLAIM };
1204 
1205 class ThreadGroupImplOverCapacityTest
1206     : public ThreadGroupImplImplTestBase,
1207       public testing::TestWithParam<ReclaimType> {
1208  public:
1209   ThreadGroupImplOverCapacityTest() = default;
1210   ThreadGroupImplOverCapacityTest(const ThreadGroupImplOverCapacityTest&) =
1211       delete;
1212   ThreadGroupImplOverCapacityTest& operator=(
1213       const ThreadGroupImplOverCapacityTest&) = delete;
1214 
SetUp()1215   void SetUp() override {
1216     if (GetParam() == ReclaimType::NO_RECLAIM) {
1217       feature_list.InitAndEnableFeature(kNoWorkerThreadReclaim);
1218     }
1219     CreateThreadGroup();
1220     task_runner_ =
1221         test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
1222                                      &mock_pooled_task_runner_delegate_);
1223   }
1224 
TearDown()1225   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
1226 
1227  protected:
1228   base::test::ScopedFeatureList feature_list;
1229   scoped_refptr<TaskRunner> task_runner_;
1230   static constexpr size_t kLocalMaxTasks = 3;
1231 
CreateThreadGroup()1232   void CreateThreadGroup() {
1233     ASSERT_FALSE(thread_group_);
1234     service_thread_.Start();
1235     delayed_task_manager_.Start(service_thread_.task_runner());
1236     thread_group_ = std::make_unique<ThreadGroupImpl>(
1237         "OverCapacityTestThreadGroup", "A", ThreadType::kDefault,
1238         task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
1239     ASSERT_TRUE(thread_group_);
1240 
1241     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
1242   }
1243 };
1244 
1245 // Verify that workers that become idle due to the thread group being over
1246 // capacity will eventually cleanup.
TEST_P(ThreadGroupImplOverCapacityTest,VerifyCleanup)1247 TEST_P(ThreadGroupImplOverCapacityTest, VerifyCleanup) {
1248   StartThreadGroup(kReclaimTimeForCleanupTests, kLocalMaxTasks);
1249   TestWaitableEvent threads_running;
1250   TestWaitableEvent threads_continue;
1251   RepeatingClosure threads_running_barrier = BarrierClosure(
1252       kLocalMaxTasks,
1253       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
1254 
1255   TestWaitableEvent blocked_call_continue;
1256   RepeatingClosure closure = BindRepeating(
1257       [](RepeatingClosure* threads_running_barrier,
1258          TestWaitableEvent* threads_continue,
1259          TestWaitableEvent* blocked_call_continue) {
1260         threads_running_barrier->Run();
1261         {
1262           ScopedBlockingCall scoped_blocking_call(FROM_HERE,
1263                                                   BlockingType::WILL_BLOCK);
1264           blocked_call_continue->Wait();
1265         }
1266         threads_continue->Wait();
1267       },
1268       Unretained(&threads_running_barrier), Unretained(&threads_continue),
1269       Unretained(&blocked_call_continue));
1270 
1271   for (size_t i = 0; i < kLocalMaxTasks; ++i)
1272     task_runner_->PostTask(FROM_HERE, closure);
1273 
1274   threads_running.Wait();
1275 
1276   TestWaitableEvent extra_threads_running;
1277   TestWaitableEvent extra_threads_continue;
1278 
1279   RepeatingClosure extra_threads_running_barrier = BarrierClosure(
1280       kLocalMaxTasks,
1281       BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
1282   // These tasks should run on the new threads from increasing max tasks.
1283   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1284     task_runner_->PostTask(
1285         FROM_HERE, BindOnce(
1286                        [](RepeatingClosure* extra_threads_running_barrier,
1287                           TestWaitableEvent* extra_threads_continue) {
1288                          extra_threads_running_barrier->Run();
1289                          extra_threads_continue->Wait();
1290                        },
1291                        Unretained(&extra_threads_running_barrier),
1292                        Unretained(&extra_threads_continue)));
1293   }
1294   extra_threads_running.Wait();
1295 
1296   ASSERT_EQ(kLocalMaxTasks * 2, thread_group_->NumberOfWorkersForTesting());
1297   EXPECT_EQ(kLocalMaxTasks * 2, thread_group_->GetMaxTasksForTesting());
1298   blocked_call_continue.Signal();
1299   extra_threads_continue.Signal();
1300 
1301   // Periodically post tasks to ensure that posting tasks does not prevent
1302   // workers that are idle due to the thread group being over capacity from
1303   // cleaning up.
1304   for (int i = 0; i < 16; ++i) {
1305     task_runner_->PostDelayedTask(FROM_HERE, DoNothing(),
1306                                   kReclaimTimeForCleanupTests * i * 0.5);
1307   }
1308 
1309   if (GetParam() == ReclaimType::DELAYED_RECLAIM) {
1310     // Note: one worker above capacity will not get cleaned up since it's on the
1311     // front of the idle set.
1312     thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
1313     EXPECT_EQ(kLocalMaxTasks + 1, thread_group_->NumberOfWorkersForTesting());
1314     threads_continue.Signal();
1315   } else {
1316     // When workers are't automatically reclaimed after a delay, blocking tasks
1317     // need to return for extra workers to be cleaned up.
1318     threads_continue.Signal();
1319     thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks);
1320     EXPECT_EQ(kLocalMaxTasks, thread_group_->NumberOfWorkersForTesting());
1321   }
1322 
1323   threads_continue.Signal();
1324   task_tracker_.FlushForTesting();
1325 }
1326 
1327 INSTANTIATE_TEST_SUITE_P(ReclaimType,
1328                          ThreadGroupImplOverCapacityTest,
1329                          ::testing::Values(ReclaimType::DELAYED_RECLAIM,
1330                                            ReclaimType::NO_RECLAIM));
1331 
1332 // Verify that the maximum number of workers is 256 and that hitting the max
1333 // leaves the thread group in a valid state with regards to max tasks.
TEST_F(ThreadGroupImplBlockingTest,MaximumWorkersTest)1334 TEST_F(ThreadGroupImplBlockingTest, MaximumWorkersTest) {
1335   CreateAndStartThreadGroup();
1336 
1337   constexpr size_t kMaxNumberOfWorkers = 256;
1338   constexpr size_t kNumExtraTasks = 10;
1339 
1340   TestWaitableEvent early_blocking_threads_running;
1341   RepeatingClosure early_threads_barrier_closure =
1342       BarrierClosure(kMaxNumberOfWorkers,
1343                      BindOnce(&TestWaitableEvent::Signal,
1344                               Unretained(&early_blocking_threads_running)));
1345 
1346   TestWaitableEvent early_threads_finished;
1347   RepeatingClosure early_threads_finished_barrier = BarrierClosure(
1348       kMaxNumberOfWorkers, BindOnce(&TestWaitableEvent::Signal,
1349                                     Unretained(&early_threads_finished)));
1350 
1351   TestWaitableEvent early_release_threads_continue;
1352 
1353   // Post ScopedBlockingCall tasks to hit the worker cap.
1354   for (size_t i = 0; i < kMaxNumberOfWorkers; ++i) {
1355     task_runner_->PostTask(
1356         FROM_HERE, BindOnce(
1357                        [](RepeatingClosure* early_threads_barrier_closure,
1358                           TestWaitableEvent* early_release_threads_continue,
1359                           RepeatingClosure* early_threads_finished) {
1360                          {
1361                            ScopedBlockingCall scoped_blocking_call(
1362                                FROM_HERE, BlockingType::WILL_BLOCK);
1363                            early_threads_barrier_closure->Run();
1364                            early_release_threads_continue->Wait();
1365                          }
1366                          early_threads_finished->Run();
1367                        },
1368                        Unretained(&early_threads_barrier_closure),
1369                        Unretained(&early_release_threads_continue),
1370                        Unretained(&early_threads_finished_barrier)));
1371   }
1372 
1373   early_blocking_threads_running.Wait();
1374   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(),
1375             kMaxTasks + kMaxNumberOfWorkers);
1376 
1377   TestWaitableEvent late_release_thread_contine;
1378   TestWaitableEvent late_blocking_threads_running;
1379 
1380   RepeatingClosure late_threads_barrier_closure = BarrierClosure(
1381       kNumExtraTasks, BindOnce(&TestWaitableEvent::Signal,
1382                                Unretained(&late_blocking_threads_running)));
1383 
1384   // Posts additional tasks. Note: we should already have |kMaxNumberOfWorkers|
1385   // tasks running. These tasks should not be able to get executed yet as the
1386   // thread group is already at its max worker cap.
1387   for (size_t i = 0; i < kNumExtraTasks; ++i) {
1388     task_runner_->PostTask(
1389         FROM_HERE, BindOnce(
1390                        [](RepeatingClosure* late_threads_barrier_closure,
1391                           TestWaitableEvent* late_release_thread_contine) {
1392                          ScopedBlockingCall scoped_blocking_call(
1393                              FROM_HERE, BlockingType::WILL_BLOCK);
1394                          late_threads_barrier_closure->Run();
1395                          late_release_thread_contine->Wait();
1396                        },
1397                        Unretained(&late_threads_barrier_closure),
1398                        Unretained(&late_release_thread_contine)));
1399   }
1400 
1401   // Give time to see if we exceed the max number of workers.
1402   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1403   EXPECT_LE(thread_group_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);
1404 
1405   early_release_threads_continue.Signal();
1406   early_threads_finished.Wait();
1407   late_blocking_threads_running.Wait();
1408 
1409   TestWaitableEvent final_tasks_running;
1410   TestWaitableEvent final_tasks_continue;
1411   RepeatingClosure final_tasks_running_barrier = BarrierClosure(
1412       kMaxTasks,
1413       BindOnce(&TestWaitableEvent::Signal, Unretained(&final_tasks_running)));
1414 
1415   // Verify that we are still able to saturate the thread group.
1416   for (size_t i = 0; i < kMaxTasks; ++i) {
1417     task_runner_->PostTask(FROM_HERE,
1418                            BindOnce(
1419                                [](RepeatingClosure* closure,
1420                                   TestWaitableEvent* final_tasks_continue) {
1421                                  closure->Run();
1422                                  final_tasks_continue->Wait();
1423                                },
1424                                Unretained(&final_tasks_running_barrier),
1425                                Unretained(&final_tasks_continue)));
1426   }
1427   final_tasks_running.Wait();
1428   EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);
1429   late_release_thread_contine.Signal();
1430   final_tasks_continue.Signal();
1431   task_tracker_.FlushForTesting();
1432 }
1433 
1434 // Verify that the maximum number of best-effort tasks that can run concurrently
1435 // is honored.
TEST_F(ThreadGroupImplImplStartInBodyTest,MaxBestEffortTasks)1436 TEST_F(ThreadGroupImplImplStartInBodyTest, MaxBestEffortTasks) {
1437   constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1438   StartThreadGroup(TimeDelta::Max(),      // |suggested_reclaim_time|
1439                    kMaxTasks,             // |max_tasks|
1440                    kMaxBestEffortTasks);  // |max_best_effort_tasks|
1441   const scoped_refptr<TaskRunner> foreground_runner =
1442       test::CreatePooledTaskRunner({MayBlock()},
1443                                    &mock_pooled_task_runner_delegate_);
1444   const scoped_refptr<TaskRunner> background_runner =
1445       test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1446                                    &mock_pooled_task_runner_delegate_);
1447 
1448   // It should be possible to have |kMaxBestEffortTasks|
1449   // TaskPriority::BEST_EFFORT tasks running concurrently.
1450   TestWaitableEvent best_effort_tasks_running;
1451   TestWaitableEvent unblock_best_effort_tasks;
1452   RepeatingClosure best_effort_tasks_running_barrier = BarrierClosure(
1453       kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
1454                                     Unretained(&best_effort_tasks_running)));
1455 
1456   for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1457     background_runner->PostTask(
1458         FROM_HERE, base::BindLambdaForTesting([&]() {
1459           best_effort_tasks_running_barrier.Run();
1460           unblock_best_effort_tasks.Wait();
1461         }));
1462   }
1463   best_effort_tasks_running.Wait();
1464 
1465   // No more TaskPriority::BEST_EFFORT task should run.
1466   AtomicFlag extra_best_effort_task_can_run;
1467   TestWaitableEvent extra_best_effort_task_running;
1468   background_runner->PostTask(
1469       FROM_HERE, base::BindLambdaForTesting([&]() {
1470         EXPECT_TRUE(extra_best_effort_task_can_run.IsSet());
1471         extra_best_effort_task_running.Signal();
1472       }));
1473 
1474   // An extra foreground task should be able to run.
1475   TestWaitableEvent foreground_task_running;
1476   foreground_runner->PostTask(
1477       FROM_HERE, base::BindOnce(&TestWaitableEvent::Signal,
1478                                 Unretained(&foreground_task_running)));
1479   foreground_task_running.Wait();
1480 
1481   // Completion of the TaskPriority::BEST_EFFORT tasks should allow the extra
1482   // TaskPriority::BEST_EFFORT task to run.
1483   extra_best_effort_task_can_run.Set();
1484   unblock_best_effort_tasks.Signal();
1485   extra_best_effort_task_running.Wait();
1486 
1487   // Wait for all tasks to complete before exiting to avoid invalid accesses.
1488   task_tracker_.FlushForTesting();
1489 }
1490 
1491 // Verify that flooding the thread group with BEST_EFFORT tasks doesn't cause
1492 // the creation of more than |max_best_effort_tasks| + 1 workers.
TEST_F(ThreadGroupImplImplStartInBodyTest,FloodBestEffortTasksDoesNotCreateTooManyWorkers)1493 TEST_F(ThreadGroupImplImplStartInBodyTest,
1494        FloodBestEffortTasksDoesNotCreateTooManyWorkers) {
1495   constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
1496   StartThreadGroup(TimeDelta::Max(),      // |suggested_reclaim_time|
1497                    kMaxTasks,             // |max_tasks|
1498                    kMaxBestEffortTasks);  // |max_best_effort_tasks|
1499 
1500   const scoped_refptr<TaskRunner> runner =
1501       test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1502                                    &mock_pooled_task_runner_delegate_);
1503 
1504   for (size_t i = 0; i < kLargeNumber; ++i) {
1505     runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1506                        EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
1507                                  kMaxBestEffortTasks + 1);
1508                      }));
1509   }
1510 
1511   // Wait for all tasks to complete before exiting to avoid invalid accesses.
1512   task_tracker_.FlushForTesting();
1513 }
1514 
1515 // Previously, a WILL_BLOCK ScopedBlockingCall unconditionally woke up a worker
1516 // if the priority queue was non-empty. Sometimes, that caused multiple workers
1517 // to be woken up for the same sequence. This test verifies that it is no longer
1518 // the case:
1519 // 1. Post and run task A.
1520 // 2. Post task B from task A.
1521 // 3. Task A enters a WILL_BLOCK ScopedBlockingCall. Once the idle thread is
1522 //    created, this should no-op because there are already enough workers
1523 //    (previously, a worker would be woken up because the priority queue isn't
1524 //    empty).
1525 // 5. Wait for all tasks to complete.
TEST_F(ThreadGroupImplImplStartInBodyTest,RepeatedWillBlockDoesNotCreateTooManyWorkers)1526 TEST_F(ThreadGroupImplImplStartInBodyTest,
1527        RepeatedWillBlockDoesNotCreateTooManyWorkers) {
1528   constexpr size_t kNumWorkers = 2U;
1529   StartThreadGroup(TimeDelta::Max(),  // |suggested_reclaim_time|
1530                    kNumWorkers,       // |max_tasks|
1531                    absl::nullopt);    // |max_best_effort_tasks|
1532   const scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
1533       {MayBlock()}, &mock_pooled_task_runner_delegate_);
1534 
1535   for (size_t i = 0; i < kLargeNumber; ++i) {
1536     runner->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
1537                        runner->PostTask(
1538                            FROM_HERE, BindLambdaForTesting([&]() {
1539                              EXPECT_LE(
1540                                  thread_group_->NumberOfWorkersForTesting(),
1541                                  kNumWorkers + 1);
1542                            }));
1543                        // Number of workers should not increase when there is
1544                        // enough capacity to accommodate queued and running
1545                        // sequences.
1546                        ScopedBlockingCall scoped_blocking_call(
1547                            FROM_HERE, BlockingType::WILL_BLOCK);
1548                        EXPECT_EQ(kNumWorkers + 1,
1549                                  thread_group_->NumberOfWorkersForTesting());
1550                      }));
1551     // Wait for all tasks to complete.
1552     task_tracker_.FlushForTesting();
1553   }
1554 }
1555 
1556 namespace {
1557 
1558 class ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest
1559     : public ThreadGroupImplImplTestBase,
1560       public testing::TestWithParam<BlockingType> {
1561  public:
1562   static constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
1563 
1564   ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest() = default;
1565   ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest(
1566       const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
1567   ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest& operator=(
1568       const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
1569 
SetUp()1570   void SetUp() override {
1571     CreateThreadGroup();
1572     thread_group_->Start(kMaxTasks, kMaxBestEffortTasks, base::TimeDelta::Max(),
1573                          service_thread_.task_runner(), nullptr,
1574                          ThreadGroup::WorkerEnvironment::NONE);
1575   }
1576 
TearDown()1577   void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
1578 
1579  private:
1580 };
1581 
1582 }  // namespace
1583 
TEST_P(ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,BlockingCallAndMaxBestEffortTasksTest)1584 TEST_P(ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
1585        BlockingCallAndMaxBestEffortTasksTest) {
1586   const scoped_refptr<TaskRunner> background_runner =
1587       test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
1588                                    &mock_pooled_task_runner_delegate_);
1589 
1590   // Post |kMaxBestEffortTasks| TaskPriority::BEST_EFFORT tasks that block in a
1591   // ScopedBlockingCall.
1592   TestWaitableEvent blocking_best_effort_tasks_running;
1593   TestWaitableEvent unblock_blocking_best_effort_tasks;
1594   RepeatingClosure blocking_best_effort_tasks_running_barrier =
1595       BarrierClosure(kMaxBestEffortTasks,
1596                      BindOnce(&TestWaitableEvent::Signal,
1597                               Unretained(&blocking_best_effort_tasks_running)));
1598   for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1599     background_runner->PostTask(
1600         FROM_HERE, base::BindLambdaForTesting([&]() {
1601           blocking_best_effort_tasks_running_barrier.Run();
1602           ScopedBlockingCall scoped_blocking_call(FROM_HERE, GetParam());
1603           unblock_blocking_best_effort_tasks.Wait();
1604         }));
1605   }
1606   blocking_best_effort_tasks_running.Wait();
1607 
1608   // Post an extra |kMaxBestEffortTasks| TaskPriority::BEST_EFFORT tasks. They
1609   // should be able to run, because the existing TaskPriority::BEST_EFFORT tasks
1610   // are blocked within a ScopedBlockingCall.
1611   //
1612   // Note: We block the tasks until they have all started running to make sure
1613   // that it is possible to run an extra |kMaxBestEffortTasks| concurrently.
1614   TestWaitableEvent best_effort_tasks_running;
1615   TestWaitableEvent unblock_best_effort_tasks;
1616   RepeatingClosure best_effort_tasks_running_barrier = BarrierClosure(
1617       kMaxBestEffortTasks, BindOnce(&TestWaitableEvent::Signal,
1618                                     Unretained(&best_effort_tasks_running)));
1619   for (int i = 0; i < kMaxBestEffortTasks; ++i) {
1620     background_runner->PostTask(
1621         FROM_HERE, base::BindLambdaForTesting([&]() {
1622           best_effort_tasks_running_barrier.Run();
1623           unblock_best_effort_tasks.Wait();
1624         }));
1625   }
1626   best_effort_tasks_running.Wait();
1627 
1628   // Unblock all tasks and tear down.
1629   unblock_blocking_best_effort_tasks.Signal();
1630   unblock_best_effort_tasks.Signal();
1631   task_tracker_.FlushForTesting();
1632 }
1633 
1634 INSTANTIATE_TEST_SUITE_P(MayBlock,
1635                          ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
1636                          ::testing::Values(BlockingType::MAY_BLOCK));
1637 INSTANTIATE_TEST_SUITE_P(WillBlock,
1638                          ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
1639                          ::testing::Values(BlockingType::WILL_BLOCK));
1640 
1641 // Verify that worker detachment doesn't race with worker cleanup, regression
1642 // test for https://crbug.com/810464.
TEST_F(ThreadGroupImplImplStartInBodyTest,RacyCleanup)1643 TEST_F(ThreadGroupImplImplStartInBodyTest, RacyCleanup) {
1644   constexpr size_t kLocalMaxTasks = 256;
1645   constexpr TimeDelta kReclaimTimeForRacyCleanupTest = Milliseconds(10);
1646 
1647   thread_group_->Start(kLocalMaxTasks, kLocalMaxTasks,
1648                        kReclaimTimeForRacyCleanupTest,
1649                        service_thread_.task_runner(), nullptr,
1650                        ThreadGroup::WorkerEnvironment::NONE);
1651 
1652   scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
1653       {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
1654 
1655   TestWaitableEvent threads_running;
1656   TestWaitableEvent unblock_threads;
1657   RepeatingClosure threads_running_barrier = BarrierClosure(
1658       kLocalMaxTasks,
1659       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
1660 
1661   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1662     task_runner->PostTask(
1663         FROM_HERE,
1664         BindOnce(
1665             [](OnceClosure on_running, TestWaitableEvent* unblock_threads) {
1666               std::move(on_running).Run();
1667               unblock_threads->Wait();
1668             },
1669             threads_running_barrier, Unretained(&unblock_threads)));
1670   }
1671 
1672   // Wait for all workers to be ready and release them all at once.
1673   threads_running.Wait();
1674   unblock_threads.Signal();
1675 
1676   // Sleep to wakeup precisely when all workers are going to try to cleanup per
1677   // being idle.
1678   PlatformThread::Sleep(kReclaimTimeForRacyCleanupTest);
1679 
1680   thread_group_->JoinForTesting();
1681 
1682   // Unwinding this test will be racy if worker cleanup can race with
1683   // ThreadGroupImpl destruction : https://crbug.com/810464.
1684   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
1685   thread_group_.reset();
1686 }
1687 
1688 }  // namespace internal
1689 }  // namespace base
1690