• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group.h"
6 
7 #include <memory>
8 #include <tuple>
9 #include <utility>
10 
11 #include "base/barrier_closure.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/task/task_runner.h"
17 #include "base/task/task_traits.h"
18 #include "base/task/thread_pool/can_run_policy_test.h"
19 #include "base/task/thread_pool/delayed_task_manager.h"
20 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
21 #include "base/task/thread_pool/task_tracker.h"
22 #include "base/task/thread_pool/test_task_factory.h"
23 #include "base/task/thread_pool/test_utils.h"
24 #include "base/task/thread_pool/thread_group_impl.h"
25 #include "base/test/bind.h"
26 #include "base/test/scoped_feature_list.h"
27 #include "base/test/test_timeouts.h"
28 #include "base/test/test_waitable_event.h"
29 #include "base/threading/platform_thread.h"
30 #include "base/threading/scoped_blocking_call.h"
31 #include "base/threading/scoped_blocking_call_internal.h"
32 #include "base/threading/simple_thread.h"
33 #include "base/threading/thread.h"
34 #include "base/threading/thread_restrictions.h"
35 #include "build/build_config.h"
36 #include "testing/gtest/include/gtest/gtest.h"
37 
38 #if BUILDFLAG(IS_WIN)
39 #include "base/win/com_init_check_hook.h"
40 #include "base/win/com_init_util.h"
41 #endif
42 
43 namespace base {
44 namespace internal {
45 
46 namespace {
47 
48 constexpr size_t kMaxTasks = 4;
49 constexpr size_t kTooManyTasks = 1000;
50 // By default, tests allow half of the thread group to be used by best-effort
51 // tasks.
52 constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
53 constexpr size_t kNumThreadsPostingTasks = 4;
54 constexpr size_t kNumTasksPostedPerThread = 150;
55 
56 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
57 
58 class ThreadPostingTasks : public SimpleThread {
59  public:
60   // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to
61   // |thread_group| through an |execution_mode| task runner. If
62   // |post_nested_task| is YES, each task posted by this thread posts another
63   // task when it runs.
ThreadPostingTasks(test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode,PostNestedTask post_nested_task)64   ThreadPostingTasks(
65       test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
66       TaskSourceExecutionMode execution_mode,
67       PostNestedTask post_nested_task)
68       : SimpleThread("ThreadPostingTasks"),
69         post_nested_task_(post_nested_task),
70         factory_(test::CreatePooledTaskRunnerWithExecutionMode(
71                      execution_mode,
72                      mock_pooled_task_runner_delegate_),
73                  execution_mode) {}
74   ThreadPostingTasks(const ThreadPostingTasks&) = delete;
75   ThreadPostingTasks& operator=(const ThreadPostingTasks&) = delete;
76 
factory() const77   const test::TestTaskFactory* factory() const { return &factory_; }
78 
79  private:
Run()80   void Run() override {
81     for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
82       EXPECT_TRUE(factory_.PostTask(post_nested_task_, OnceClosure()));
83   }
84 
85   const scoped_refptr<TaskRunner> task_runner_;
86   const PostNestedTask post_nested_task_;
87   test::TestTaskFactory factory_;
88 };
89 
90 class ThreadGroupTestBase : public testing::Test, public ThreadGroup::Delegate {
91  public:
92   ThreadGroupTestBase(const ThreadGroupTestBase&) = delete;
93   ThreadGroupTestBase& operator=(const ThreadGroupTestBase&) = delete;
94 
95  protected:
96   ThreadGroupTestBase() = default;
97 
SetUp()98   void SetUp() override {
99     service_thread_.Start();
100     delayed_task_manager_.Start(service_thread_.task_runner());
101     CreateThreadGroup();
102   }
103 
TearDown()104   void TearDown() override {
105     delayed_task_manager_.Shutdown();
106     service_thread_.Stop();
107     DestroyThreadGroup();
108   }
109 
CreateThreadGroup()110   void CreateThreadGroup() {
111     ASSERT_FALSE(thread_group_);
112     thread_group_ = std::make_unique<ThreadGroupImpl>(
113         "TestThreadGroup", "A", ThreadType::kDefault,
114         task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
115 
116     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
117   }
118 
StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment=ThreadGroup::WorkerEnvironment::NONE)119   void StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment =
120                             ThreadGroup::WorkerEnvironment::NONE) {
121     ASSERT_TRUE(thread_group_);
122     ThreadGroupImpl* thread_group_impl =
123         static_cast<ThreadGroupImpl*>(thread_group_.get());
124     thread_group_impl->Start(kMaxTasks, kMaxBestEffortTasks, TimeDelta::Max(),
125                              service_thread_.task_runner(), nullptr,
126                              worker_environment);
127   }
128 
DestroyThreadGroup()129   void DestroyThreadGroup() {
130     if (!thread_group_) {
131       return;
132     }
133 
134     thread_group_->JoinForTesting();
135     mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
136     thread_group_.reset();
137   }
138 
139   Thread service_thread_{"ThreadPoolServiceThread"};
140   TaskTracker task_tracker_;
141   DelayedTaskManager delayed_task_manager_;
142   test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
143       task_tracker_.GetTrackedRef(), &delayed_task_manager_};
144 
145   std::unique_ptr<ThreadGroup> thread_group_;
146 
147  private:
148   // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)149   ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
150     return thread_group_.get();
151   }
152 
153   TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_{this};
154 };
155 
156 using ThreadGroupTest = ThreadGroupTestBase;
157 
158 class ThreadGroupTestAllJobImpls : public ThreadGroupTestBase,
159                                    public testing::WithParamInterface<bool> {
160  public:
ThreadGroupTestAllJobImpls()161   ThreadGroupTestAllJobImpls() {
162     if (GetParam()) {
163       scoped_feature_list_.InitAndEnableFeature(kUseNewJobImplementation);
164     } else {
165       scoped_feature_list_.InitAndDisableFeature(kUseNewJobImplementation);
166     }
167   }
168 
169  private:
170   base::test::ScopedFeatureList scoped_feature_list_;
171 };
172 
173 // TODO(etiennep): Audit tests that don't need TaskSourceExecutionMode
174 // parameter.
175 class ThreadGroupTestAllExecutionModes
176     : public ThreadGroupTestBase,
177       public testing::WithParamInterface<TaskSourceExecutionMode> {
178  public:
179   ThreadGroupTestAllExecutionModes() = default;
180   ThreadGroupTestAllExecutionModes(const ThreadGroupTestAllExecutionModes&) =
181       delete;
182   ThreadGroupTestAllExecutionModes& operator=(
183       const ThreadGroupTestAllExecutionModes&) = delete;
184 
execution_mode() const185   TaskSourceExecutionMode execution_mode() const { return GetParam(); }
186 
CreatePooledTaskRunner(const TaskTraits & traits={})187   scoped_refptr<TaskRunner> CreatePooledTaskRunner(
188       const TaskTraits& traits = {}) {
189     return test::CreatePooledTaskRunnerWithExecutionMode(
190         execution_mode(), &mock_pooled_task_runner_delegate_, traits);
191   }
192 };
193 
ShouldNotRun()194 void ShouldNotRun() {
195   ADD_FAILURE() << "Ran a task that shouldn't run.";
196 }
197 
198 }  // namespace
199 
TEST_P(ThreadGroupTestAllExecutionModes,PostTasks)200 TEST_P(ThreadGroupTestAllExecutionModes, PostTasks) {
201   StartThreadGroup();
202   // Create threads to post tasks.
203   std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
204   for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
205     threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
206         &mock_pooled_task_runner_delegate_, execution_mode(),
207         PostNestedTask::NO));
208     threads_posting_tasks.back()->Start();
209   }
210 
211   // Wait for all tasks to run.
212   for (const auto& thread_posting_tasks : threads_posting_tasks) {
213     thread_posting_tasks->Join();
214     thread_posting_tasks->factory()->WaitForAllTasksToRun();
215   }
216 
217   // Flush the task tracker to be sure that no task accesses its TestTaskFactory
218   // after |thread_posting_tasks| is destroyed.
219   task_tracker_.FlushForTesting();
220 }
221 
TEST_P(ThreadGroupTestAllExecutionModes,NestedPostTasks)222 TEST_P(ThreadGroupTestAllExecutionModes, NestedPostTasks) {
223   StartThreadGroup();
224   // Create threads to post tasks. Each task posted by these threads will post
225   // another task when it runs.
226   std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
227   for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
228     threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
229         &mock_pooled_task_runner_delegate_, execution_mode(),
230         PostNestedTask::YES));
231     threads_posting_tasks.back()->Start();
232   }
233 
234   // Wait for all tasks to run.
235   for (const auto& thread_posting_tasks : threads_posting_tasks) {
236     thread_posting_tasks->Join();
237     thread_posting_tasks->factory()->WaitForAllTasksToRun();
238   }
239 
240   // Flush the task tracker to be sure that no task accesses its TestTaskFactory
241   // after |thread_posting_tasks| is destroyed.
242   task_tracker_.FlushForTesting();
243 }
244 
245 // Verify that a Task can't be posted after shutdown.
TEST_P(ThreadGroupTestAllExecutionModes,PostTaskAfterShutdown)246 TEST_P(ThreadGroupTestAllExecutionModes, PostTaskAfterShutdown) {
247   StartThreadGroup();
248   auto task_runner = CreatePooledTaskRunner();
249   test::ShutdownTaskTracker(&task_tracker_);
250   EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
251 }
252 
253 // Verify that a Task runs shortly after its delay expires.
TEST_P(ThreadGroupTestAllExecutionModes,PostDelayedTask)254 TEST_P(ThreadGroupTestAllExecutionModes, PostDelayedTask) {
255   StartThreadGroup();
256   // kJob doesn't support delays.
257   if (execution_mode() == TaskSourceExecutionMode::kJob)
258     return;
259 
260   TestWaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC);
261   auto task_runner = CreatePooledTaskRunner();
262 
263   // Wait until the task runner is up and running to make sure the test below is
264   // solely timing the delayed task, not bringing up a physical thread.
265   task_runner->PostTask(
266       FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)));
267   task_ran.Wait();
268   ASSERT_TRUE(!task_ran.IsSignaled());
269 
270   // Post a task with a short delay.
271   const TimeTicks start_time = TimeTicks::Now();
272   EXPECT_TRUE(task_runner->PostDelayedTask(
273       FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)),
274       TestTimeouts::tiny_timeout()));
275 
276   // Wait until the task runs.
277   task_ran.Wait();
278 
279   // Expect the task to run after its delay expires, but no more than a
280   // reasonable amount of time after that (overloaded bots can be slow sometimes
281   // so give it 10X flexibility).
282   const TimeDelta actual_delay = TimeTicks::Now() - start_time;
283   EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout());
284   EXPECT_LT(actual_delay, 10 * TestTimeouts::tiny_timeout());
285 }
286 
287 // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner
288 // returns false when called from a task that isn't part of the sequence. Note:
289 // Tests that use TestTaskFactory already verify that
290 // RunsTasksInCurrentSequence() returns true when appropriate so this method
291 // complements it to get full coverage of that method.
TEST_P(ThreadGroupTestAllExecutionModes,SequencedRunsTasksInCurrentSequence)292 TEST_P(ThreadGroupTestAllExecutionModes, SequencedRunsTasksInCurrentSequence) {
293   StartThreadGroup();
294   auto task_runner = CreatePooledTaskRunner();
295   auto sequenced_task_runner = test::CreatePooledSequencedTaskRunner(
296       TaskTraits(), &mock_pooled_task_runner_delegate_);
297 
298   TestWaitableEvent task_ran;
299   task_runner->PostTask(
300       FROM_HERE,
301       BindOnce(
302           [](scoped_refptr<SequencedTaskRunner> sequenced_task_runner,
303              TestWaitableEvent* task_ran) {
304             EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence());
305             task_ran->Signal();
306           },
307           sequenced_task_runner, Unretained(&task_ran)));
308   task_ran.Wait();
309 }
310 
311 // Verify that tasks posted before Start run after Start.
TEST_P(ThreadGroupTestAllExecutionModes,PostBeforeStart)312 TEST_P(ThreadGroupTestAllExecutionModes, PostBeforeStart) {
313   TestWaitableEvent task_1_running;
314   TestWaitableEvent task_2_running;
315 
316   auto task_runner = CreatePooledTaskRunner();
317   task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
318                                             Unretained(&task_1_running)));
319   task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
320                                             Unretained(&task_2_running)));
321 
322   // Workers should not be created and tasks should not run before the thread
323   // group is started. The sleep is to give time for the tasks to potentially
324   // run.
325   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
326   EXPECT_FALSE(task_1_running.IsSignaled());
327   EXPECT_FALSE(task_2_running.IsSignaled());
328 
329   StartThreadGroup();
330 
331   // Tasks should run shortly after the thread group is started.
332   task_1_running.Wait();
333   task_2_running.Wait();
334 
335   task_tracker_.FlushForTesting();
336 }
337 
338 // Verify that tasks only run when allowed by the CanRunPolicy.
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyBasic)339 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyBasic) {
340   StartThreadGroup();
341   test::TestCanRunPolicyBasic(
342       thread_group_.get(),
343       [this](TaskPriority priority) {
344         return CreatePooledTaskRunner({priority});
345       },
346       &task_tracker_);
347 }
348 
TEST_F(ThreadGroupTest,CanRunPolicyUpdatedBeforeRun)349 TEST_F(ThreadGroupTest, CanRunPolicyUpdatedBeforeRun) {
350   StartThreadGroup();
351   // This test only works with SequencedTaskRunner become it assumes
352   // ordered execution of 2 posted tasks.
353   test::TestCanRunPolicyChangedBeforeRun(
354       thread_group_.get(),
355       [this](TaskPriority priority) {
356         return test::CreatePooledSequencedTaskRunner(
357             {priority}, &mock_pooled_task_runner_delegate_);
358       },
359       &task_tracker_);
360 }
361 
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyLoad)362 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyLoad) {
363   StartThreadGroup();
364   test::TestCanRunPolicyLoad(
365       thread_group_.get(),
366       [this](TaskPriority priority) {
367         return CreatePooledTaskRunner({priority});
368       },
369       &task_tracker_);
370 }
371 
372 // Verifies that ShouldYield() returns true for a priority that is not allowed
373 // to run by the CanRunPolicy.
TEST_F(ThreadGroupTest,CanRunPolicyShouldYield)374 TEST_F(ThreadGroupTest, CanRunPolicyShouldYield) {
375   StartThreadGroup();
376 
377   task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
378   thread_group_->DidUpdateCanRunPolicy();
379   EXPECT_TRUE(
380       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
381   EXPECT_TRUE(
382       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
383 
384   task_tracker_.SetCanRunPolicy(CanRunPolicy::kForegroundOnly);
385   thread_group_->DidUpdateCanRunPolicy();
386   EXPECT_TRUE(
387       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
388   EXPECT_FALSE(
389       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
390 
391   task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
392   thread_group_->DidUpdateCanRunPolicy();
393   EXPECT_FALSE(
394       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
395   EXPECT_FALSE(
396       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
397 }
398 
399 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
400 // in a thread group does not affect Sequences with a priority that was
401 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,UpdatePriorityBestEffortToUserBlocking)402 TEST_F(ThreadGroupTest, UpdatePriorityBestEffortToUserBlocking) {
403   StartThreadGroup();
404 
405   CheckedLock num_tasks_running_lock;
406 
407   std::unique_ptr<ConditionVariable> num_tasks_running_cv =
408       num_tasks_running_lock.CreateConditionVariable();
409   num_tasks_running_cv->declare_only_used_while_idle();
410 
411   size_t num_tasks_running = 0;
412 
413   // Post |kMaxTasks| BEST_EFFORT tasks that block until they all start running.
414   std::vector<scoped_refptr<PooledSequencedTaskRunner>> task_runners;
415 
416   for (size_t i = 0; i < kMaxTasks; ++i) {
417     task_runners.push_back(MakeRefCounted<PooledSequencedTaskRunner>(
418         TaskTraits(TaskPriority::BEST_EFFORT),
419         &mock_pooled_task_runner_delegate_));
420     task_runners.back()->PostTask(
421         FROM_HERE, BindLambdaForTesting([&]() {
422           // Increment the number of tasks running.
423           {
424             CheckedAutoLock auto_lock(num_tasks_running_lock);
425             ++num_tasks_running;
426           }
427           num_tasks_running_cv->Broadcast();
428 
429           // Wait until all posted tasks are running.
430           CheckedAutoLock auto_lock(num_tasks_running_lock);
431           while (num_tasks_running < kMaxTasks)
432             num_tasks_running_cv->Wait();
433         }));
434   }
435 
436   // Wait until |kMaxBestEffort| tasks start running.
437   {
438     CheckedAutoLock auto_lock(num_tasks_running_lock);
439     while (num_tasks_running < kMaxBestEffortTasks)
440       num_tasks_running_cv->Wait();
441   }
442 
443   // Update the priority of all TaskRunners to USER_BLOCKING.
444   for (size_t i = 0; i < kMaxTasks; ++i)
445     task_runners[i]->UpdatePriority(TaskPriority::USER_BLOCKING);
446 
447   // Wait until all posted tasks start running. This should not block forever,
448   // even in a thread group that enforces a maximum number of concurrent
449   // BEST_EFFORT tasks lower than |kMaxTasks|.
450   static_assert(kMaxBestEffortTasks < kMaxTasks, "");
451   {
452     CheckedAutoLock auto_lock(num_tasks_running_lock);
453     while (num_tasks_running < kMaxTasks)
454       num_tasks_running_cv->Wait();
455   }
456 
457   task_tracker_.FlushForTesting();
458 }
459 
460 // Regression test for crbug.com/955953.
TEST_P(ThreadGroupTestAllExecutionModes,ScopedBlockingCallTwice)461 TEST_P(ThreadGroupTestAllExecutionModes, ScopedBlockingCallTwice) {
462   StartThreadGroup();
463   auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
464       execution_mode(), &mock_pooled_task_runner_delegate_, {MayBlock()});
465 
466   TestWaitableEvent task_ran;
467   task_runner->PostTask(FROM_HERE,
468                         BindOnce(
469                             [](TestWaitableEvent* task_ran) {
470                               {
471                                 ScopedBlockingCall scoped_blocking_call(
472                                     FROM_HERE, BlockingType::MAY_BLOCK);
473                               }
474                               {
475                                 ScopedBlockingCall scoped_blocking_call(
476                                     FROM_HERE, BlockingType::MAY_BLOCK);
477                               }
478                               task_ran->Signal();
479                             },
480                             Unretained(&task_ran)));
481   task_ran.Wait();
482 }
483 
484 #if BUILDFLAG(IS_WIN)
TEST_P(ThreadGroupTestAllExecutionModes,COMMTAWorkerEnvironment)485 TEST_P(ThreadGroupTestAllExecutionModes, COMMTAWorkerEnvironment) {
486   StartThreadGroup(ThreadGroup::WorkerEnvironment::COM_MTA);
487   auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
488       execution_mode(), &mock_pooled_task_runner_delegate_);
489 
490   TestWaitableEvent task_ran;
491   task_runner->PostTask(
492       FROM_HERE, BindOnce(
493                      [](TestWaitableEvent* task_ran) {
494                        win::AssertComApartmentType(win::ComApartmentType::MTA);
495                        task_ran->Signal();
496                      },
497                      Unretained(&task_ran)));
498   task_ran.Wait();
499 }
500 
TEST_P(ThreadGroupTestAllExecutionModes,NoWorkerEnvironment)501 TEST_P(ThreadGroupTestAllExecutionModes, NoWorkerEnvironment) {
502   StartThreadGroup(ThreadGroup::WorkerEnvironment::NONE);
503   auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
504       execution_mode(), &mock_pooled_task_runner_delegate_);
505 
506   TestWaitableEvent task_ran;
507   task_runner->PostTask(
508       FROM_HERE, BindOnce(
509                      [](TestWaitableEvent* task_ran) {
510                        win::AssertComApartmentType(win::ComApartmentType::NONE);
511                        task_ran->Signal();
512                      },
513                      Unretained(&task_ran)));
514   task_ran.Wait();
515 }
516 #endif
517 
518 // Verifies that ShouldYield() returns false when there is no pending task.
TEST_F(ThreadGroupTest,ShouldYieldSingleTask)519 TEST_F(ThreadGroupTest, ShouldYieldSingleTask) {
520   StartThreadGroup();
521 
522   test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
523                                &mock_pooled_task_runner_delegate_)
524       ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
525                    EXPECT_FALSE(thread_group_->ShouldYield(
526                        {TaskPriority::BEST_EFFORT, TimeTicks::Now()}));
527                    EXPECT_FALSE(thread_group_->ShouldYield(
528                        {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
529                    EXPECT_FALSE(thread_group_->ShouldYield(
530                        {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
531                  }));
532 
533   task_tracker_.FlushForTesting();
534 }
535 
536 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_P(ThreadGroupTestAllJobImpls,ScheduleJobTaskSource)537 TEST_P(ThreadGroupTestAllJobImpls, ScheduleJobTaskSource) {
538   StartThreadGroup();
539 
540   TestWaitableEvent threads_running;
541   TestWaitableEvent threads_continue;
542 
543   RepeatingClosure threads_running_barrier = BarrierClosure(
544       kMaxTasks,
545       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
546 
547   auto job_task = base::MakeRefCounted<test::MockJobTask>(
548       BindLambdaForTesting(
549           [&threads_running_barrier, &threads_continue](JobDelegate*) {
550             threads_running_barrier.Run();
551             threads_continue.Wait();
552           }),
553       /* num_tasks_to_run */ kMaxTasks);
554   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
555       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
556   task_source->NotifyConcurrencyIncrease();
557 
558   threads_running.Wait();
559   threads_continue.Signal();
560 
561   // Flush the task tracker to be sure that no local variables are accessed by
562   // tasks after the end of the scope.
563   task_tracker_.FlushForTesting();
564 }
565 
566 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_P(ThreadGroupTestAllJobImpls,ScheduleJobTaskSourceMultipleTime)567 TEST_P(ThreadGroupTestAllJobImpls, ScheduleJobTaskSourceMultipleTime) {
568   StartThreadGroup();
569 
570   TestWaitableEvent thread_running;
571   TestWaitableEvent thread_continue;
572   auto job_task = base::MakeRefCounted<test::MockJobTask>(
573       BindLambdaForTesting([&thread_running, &thread_continue](JobDelegate*) {
574         DCHECK(!thread_running.IsSignaled());
575         thread_running.Signal();
576         thread_continue.Wait();
577       }),
578       /* num_tasks_to_run */ 1);
579   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
580       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
581 
582   // Multiple calls to NotifyConcurrencyIncrease() should have the same effect
583   // as a single call.
584   task_source->NotifyConcurrencyIncrease();
585   task_source->NotifyConcurrencyIncrease();
586 
587   thread_running.Wait();
588   thread_continue.Signal();
589 
590   // Once the worker task ran, enqueuing the task source has no effect.
591   task_source->NotifyConcurrencyIncrease();
592 
593   // Flush the task tracker to be sure that no local variables are accessed by
594   // tasks after the end of the scope.
595   task_tracker_.FlushForTesting();
596 }
597 
598 // Verify that Cancel() on a job stops running the worker task and causes
599 // current workers to yield.
TEST_P(ThreadGroupTestAllJobImpls,CancelJobTaskSource)600 TEST_P(ThreadGroupTestAllJobImpls, CancelJobTaskSource) {
601   StartThreadGroup();
602 
603   CheckedLock tasks_running_lock;
604   std::unique_ptr<ConditionVariable> tasks_running_cv =
605       tasks_running_lock.CreateConditionVariable();
606   bool tasks_running = false;
607 
608   // Schedule a big number of tasks.
609   auto job_task = base::MakeRefCounted<test::MockJobTask>(
610       BindLambdaForTesting([&](JobDelegate* delegate) {
611         {
612           CheckedAutoLock auto_lock(tasks_running_lock);
613           tasks_running = true;
614         }
615         tasks_running_cv->Signal();
616 
617         while (!delegate->ShouldYield()) {
618         }
619       }),
620       /* num_tasks_to_run */ kTooManyTasks);
621   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
622       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
623   task_source->NotifyConcurrencyIncrease();
624 
625   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
626 
627   // Wait for at least 1 task to start running.
628   {
629     CheckedAutoLock auto_lock(tasks_running_lock);
630     while (!tasks_running)
631       tasks_running_cv->Wait();
632   }
633 
634   // Cancels pending tasks and unblocks running ones.
635   job_handle.Cancel();
636 
637   // This should not block since the job got cancelled.
638   task_tracker_.FlushForTesting();
639 }
640 
641 // Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule
642 // tasks with the intended concurrency.
TEST_P(ThreadGroupTestAllJobImpls,JobTaskSourceConcurrencyIncrease)643 TEST_P(ThreadGroupTestAllJobImpls, JobTaskSourceConcurrencyIncrease) {
644   StartThreadGroup();
645 
646   TestWaitableEvent threads_running_a;
647   TestWaitableEvent threads_continue;
648 
649   // Initially schedule half the tasks.
650   RepeatingClosure threads_running_barrier = BarrierClosure(
651       kMaxTasks / 2,
652       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_a)));
653 
654   auto job_state = base::MakeRefCounted<test::MockJobTask>(
655       BindLambdaForTesting(
656           [&threads_running_barrier, &threads_continue](JobDelegate*) {
657             threads_running_barrier.Run();
658             threads_continue.Wait();
659           }),
660       /* num_tasks_to_run */ kMaxTasks / 2);
661   auto task_source = job_state->GetJobTaskSource(
662       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
663   task_source->NotifyConcurrencyIncrease();
664 
665   threads_running_a.Wait();
666   // Reset |threads_running_barrier| for the remaining tasks.
667   TestWaitableEvent threads_running_b;
668   threads_running_barrier = BarrierClosure(
669       kMaxTasks / 2,
670       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_b)));
671   job_state->SetNumTasksToRun(kMaxTasks);
672 
673   // Unblocks tasks to let them racily wait for NotifyConcurrencyIncrease() to
674   // be called.
675   threads_continue.Signal();
676   task_source->NotifyConcurrencyIncrease();
677   // Wait for the remaining tasks. This should not block forever.
678   threads_running_b.Wait();
679 
680   // Flush the task tracker to be sure that no local variables are accessed by
681   // tasks after the end of the scope.
682   task_tracker_.FlushForTesting();
683 }
684 
685 // Verify that a JobTaskSource that becomes empty while in the queue eventually
686 // gets discarded.
TEST_P(ThreadGroupTestAllJobImpls,ScheduleEmptyJobTaskSource)687 TEST_P(ThreadGroupTestAllJobImpls, ScheduleEmptyJobTaskSource) {
688   StartThreadGroup();
689 
690   task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
691 
692   auto job_task = base::MakeRefCounted<test::MockJobTask>(
693       BindRepeating([](JobDelegate*) { ShouldNotRun(); }),
694       /* num_tasks_to_run */ 1);
695   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
696       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
697   task_source->NotifyConcurrencyIncrease();
698 
699   // The worker task will never run.
700   job_task->SetNumTasksToRun(0);
701 
702   task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
703   thread_group_->DidUpdateCanRunPolicy();
704 
705   // This should not block since there's no task to run.
706   task_tracker_.FlushForTesting();
707 }
708 
709 // Verify that Join() on a job contributes to max concurrency and waits for all
710 // workers to return.
TEST_P(ThreadGroupTestAllJobImpls,JoinJobTaskSource)711 TEST_P(ThreadGroupTestAllJobImpls, JoinJobTaskSource) {
712   StartThreadGroup();
713 
714   TestWaitableEvent threads_continue;
715   RepeatingClosure threads_continue_barrier = BarrierClosure(
716       kMaxTasks + 1,
717       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_continue)));
718 
719   auto job_task = base::MakeRefCounted<test::MockJobTask>(
720       BindLambdaForTesting([&](JobDelegate*) {
721         threads_continue_barrier.Run();
722         threads_continue.Wait();
723       }),
724       /* num_tasks_to_run */ kMaxTasks + 1);
725   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
726       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
727   task_source->NotifyConcurrencyIncrease();
728 
729   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
730   job_handle.Join();
731   // All worker tasks should complete before Join() returns.
732   EXPECT_EQ(0U, job_task->GetMaxConcurrency(0));
733   thread_group_->JoinForTesting();
734   EXPECT_EQ(1U, task_source->HasOneRef());
735   // Prevent TearDown() from calling JoinForTesting() again.
736   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
737   thread_group_ = nullptr;
738 }
739 
740 // Verify that finishing work outside of a job unblocks workers with a stale
741 // max concurrency.
TEST_P(ThreadGroupTestAllJobImpls,JoinJobTaskSourceStaleConcurrency)742 TEST_P(ThreadGroupTestAllJobImpls, JoinJobTaskSourceStaleConcurrency) {
743   StartThreadGroup();
744 
745   TestWaitableEvent thread_running;
746   std::atomic_size_t max_concurrency(1);
747   auto task_source = CreateJobTaskSource(
748       FROM_HERE, TaskTraits{},
749       BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
750       BindLambdaForTesting(
751           [&](size_t /*worker_count*/) -> size_t { return max_concurrency; }),
752       &mock_pooled_task_runner_delegate_);
753   task_source->NotifyConcurrencyIncrease();
754 
755   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
756   thread_running.Wait();
757 
758   // Racily update max concurrency to unblock the task that was waiting on
759   // NotifyMaxConcurrency().
760   max_concurrency = 0;
761   job_handle.Join();
762 
763   // This should not block since the job was joined.
764   task_tracker_.FlushForTesting();
765 }
766 
767 // Verify that cancelling a job unblocks workers with a stale max concurrency.
TEST_P(ThreadGroupTestAllJobImpls,CancelJobTaskSourceWithStaleConcurrency)768 TEST_P(ThreadGroupTestAllJobImpls, CancelJobTaskSourceWithStaleConcurrency) {
769   StartThreadGroup();
770 
771   TestWaitableEvent thread_running;
772   auto task_source = CreateJobTaskSource(
773       FROM_HERE, TaskTraits{},
774       BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
775       BindRepeating([](size_t /*worker_count*/) -> size_t { return 1; }),
776       &mock_pooled_task_runner_delegate_);
777   task_source->NotifyConcurrencyIncrease();
778 
779   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
780   thread_running.Wait();
781   job_handle.Cancel();
782 
783   // This should not block since the job got cancelled.
784   task_tracker_.FlushForTesting();
785 }
786 
787 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
788 // in a thread group does not affect JobTaskSource with a priority that was
789 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_P(ThreadGroupTestAllJobImpls,JobTaskSourceUpdatePriority)790 TEST_P(ThreadGroupTestAllJobImpls, JobTaskSourceUpdatePriority) {
791   StartThreadGroup();
792 
793   CheckedLock num_tasks_running_lock;
794 
795   std::unique_ptr<ConditionVariable> num_tasks_running_cv =
796       num_tasks_running_lock.CreateConditionVariable();
797   num_tasks_running_cv->declare_only_used_while_idle();
798 
799   size_t num_tasks_running = 0;
800 
801   auto job_task = base::MakeRefCounted<test::MockJobTask>(
802       BindLambdaForTesting([&](JobDelegate*) {
803         // Increment the number of tasks running.
804         {
805           CheckedAutoLock auto_lock(num_tasks_running_lock);
806           ++num_tasks_running;
807         }
808         num_tasks_running_cv->Broadcast();
809 
810         // Wait until all posted tasks are running.
811         CheckedAutoLock auto_lock(num_tasks_running_lock);
812         while (num_tasks_running < kMaxTasks)
813           num_tasks_running_cv->Wait();
814       }),
815       /* num_tasks_to_run */ kMaxTasks);
816   scoped_refptr<JobTaskSource> task_source =
817       job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::BEST_EFFORT},
818                                  &mock_pooled_task_runner_delegate_);
819   task_source->NotifyConcurrencyIncrease();
820 
821   // Wait until |kMaxBestEffort| tasks start running.
822   {
823     CheckedAutoLock auto_lock(num_tasks_running_lock);
824     while (num_tasks_running < kMaxBestEffortTasks)
825       num_tasks_running_cv->Wait();
826   }
827 
828   // Update the priority to USER_BLOCKING.
829   auto transaction = task_source->BeginTransaction();
830   transaction.UpdatePriority(TaskPriority::USER_BLOCKING);
831   thread_group_->UpdateSortKey(std::move(transaction));
832 
833   // Wait until all posted tasks start running. This should not block forever,
834   // even in a thread group that enforces a maximum number of concurrent
835   // BEST_EFFORT tasks lower than |kMaxTasks|.
836   static_assert(kMaxBestEffortTasks < kMaxTasks, "");
837   {
838     CheckedAutoLock auto_lock(num_tasks_running_lock);
839     while (num_tasks_running < kMaxTasks)
840       num_tasks_running_cv->Wait();
841   }
842 
843   // Flush the task tracker to be sure that no local variables are accessed by
844   // tasks after the end of the scope.
845   task_tracker_.FlushForTesting();
846 }
847 
848 INSTANTIATE_TEST_SUITE_P(GenericParallel,
849                          ThreadGroupTestAllExecutionModes,
850                          ::testing::Values(TaskSourceExecutionMode::kParallel));
851 INSTANTIATE_TEST_SUITE_P(
852     GenericSequenced,
853     ThreadGroupTestAllExecutionModes,
854     ::testing::Values(TaskSourceExecutionMode::kSequenced));
855 INSTANTIATE_TEST_SUITE_P(GenericJob,
856                          ThreadGroupTestAllExecutionModes,
857                          ::testing::Values(TaskSourceExecutionMode::kJob));
858 
859 INSTANTIATE_TEST_SUITE_P(,
860                          ThreadGroupTestAllJobImpls,
861                          testing::Bool(),
__anon832ccca21602(const testing::TestParamInfo<bool>& info) 862                          [](const testing::TestParamInfo<bool>& info) {
863                            if (info.param) {
864                              return "NewJob";
865                            }
866                            return "OldJob";
867                          });
868 
869 }  // namespace internal
870 }  // namespace base
871