• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/thread_group.h"
6 
7 #include <memory>
8 #include <tuple>
9 #include <utility>
10 
11 #include "base/barrier_closure.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/task/task_runner.h"
17 #include "base/task/task_traits.h"
18 #include "base/task/thread_pool/can_run_policy_test.h"
19 #include "base/task/thread_pool/delayed_task_manager.h"
20 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
21 #include "base/task/thread_pool/task_tracker.h"
22 #include "base/task/thread_pool/test_task_factory.h"
23 #include "base/task/thread_pool/test_utils.h"
24 #include "base/task/thread_pool/thread_group_impl.h"
25 #include "base/test/bind.h"
26 #include "base/test/test_timeouts.h"
27 #include "base/test/test_waitable_event.h"
28 #include "base/threading/platform_thread.h"
29 #include "base/threading/scoped_blocking_call.h"
30 #include "base/threading/scoped_blocking_call_internal.h"
31 #include "base/threading/simple_thread.h"
32 #include "base/threading/thread.h"
33 #include "base/threading/thread_restrictions.h"
34 #include "build/build_config.h"
35 #include "testing/gtest/include/gtest/gtest.h"
36 
37 #if BUILDFLAG(IS_WIN)
38 #include "base/win/com_init_check_hook.h"
39 #include "base/win/com_init_util.h"
40 #endif
41 
42 namespace base {
43 namespace internal {
44 
45 namespace {
46 
47 constexpr size_t kMaxTasks = 4;
48 constexpr size_t kTooManyTasks = 1000;
49 // By default, tests allow half of the thread group to be used by best-effort
50 // tasks.
51 constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
52 constexpr size_t kNumThreadsPostingTasks = 4;
53 constexpr size_t kNumTasksPostedPerThread = 150;
54 
55 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
56 
57 class ThreadPostingTasks : public SimpleThread {
58  public:
59   // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to
60   // |thread_group| through an |execution_mode| task runner. If
61   // |post_nested_task| is YES, each task posted by this thread posts another
62   // task when it runs.
ThreadPostingTasks(test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode,PostNestedTask post_nested_task)63   ThreadPostingTasks(
64       test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
65       TaskSourceExecutionMode execution_mode,
66       PostNestedTask post_nested_task)
67       : SimpleThread("ThreadPostingTasks"),
68         post_nested_task_(post_nested_task),
69         factory_(test::CreatePooledTaskRunnerWithExecutionMode(
70                      execution_mode,
71                      mock_pooled_task_runner_delegate_),
72                  execution_mode) {}
73   ThreadPostingTasks(const ThreadPostingTasks&) = delete;
74   ThreadPostingTasks& operator=(const ThreadPostingTasks&) = delete;
75 
factory() const76   const test::TestTaskFactory* factory() const { return &factory_; }
77 
78  private:
Run()79   void Run() override {
80     for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
81       EXPECT_TRUE(factory_.PostTask(post_nested_task_, OnceClosure()));
82   }
83 
84   const scoped_refptr<TaskRunner> task_runner_;
85   const PostNestedTask post_nested_task_;
86   test::TestTaskFactory factory_;
87 };
88 
89 class ThreadGroupTestBase : public testing::Test, public ThreadGroup::Delegate {
90  public:
91   ThreadGroupTestBase(const ThreadGroupTestBase&) = delete;
92   ThreadGroupTestBase& operator=(const ThreadGroupTestBase&) = delete;
93 
94  protected:
95   ThreadGroupTestBase() = default;
96 
SetUp()97   void SetUp() override {
98     service_thread_.Start();
99     delayed_task_manager_.Start(service_thread_.task_runner());
100     CreateThreadGroup();
101   }
102 
TearDown()103   void TearDown() override {
104     delayed_task_manager_.Shutdown();
105     service_thread_.Stop();
106     DestroyThreadGroup();
107   }
108 
CreateThreadGroup()109   void CreateThreadGroup() {
110     ASSERT_FALSE(thread_group_);
111     thread_group_ = std::make_unique<ThreadGroupImpl>(
112         "TestThreadGroup", "A", ThreadType::kDefault,
113         task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
114 
115     mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
116   }
117 
StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment=ThreadGroup::WorkerEnvironment::NONE)118   void StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment =
119                             ThreadGroup::WorkerEnvironment::NONE) {
120     ASSERT_TRUE(thread_group_);
121     ThreadGroupImpl* thread_group_impl =
122         static_cast<ThreadGroupImpl*>(thread_group_.get());
123     thread_group_impl->Start(kMaxTasks, kMaxBestEffortTasks, TimeDelta::Max(),
124                              service_thread_.task_runner(), nullptr,
125                              worker_environment,
126                              /*synchronous_thread_start_for_testing=*/false,
127                              /*may_block_threshold=*/{});
128   }
129 
DestroyThreadGroup()130   void DestroyThreadGroup() {
131     if (!thread_group_) {
132       return;
133     }
134 
135     thread_group_->JoinForTesting();
136     mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
137     thread_group_.reset();
138   }
139 
140   Thread service_thread_{"ThreadPoolServiceThread"};
141   TaskTracker task_tracker_;
142   DelayedTaskManager delayed_task_manager_;
143   test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
144       task_tracker_.GetTrackedRef(), &delayed_task_manager_};
145 
146   std::unique_ptr<ThreadGroup> thread_group_;
147 
148  private:
149   // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)150   ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
151     return thread_group_.get();
152   }
153 
154   TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_{this};
155 };
156 
157 using ThreadGroupTest = ThreadGroupTestBase;
158 
159 // TODO(etiennep): Audit tests that don't need TaskSourceExecutionMode
160 // parameter.
161 class ThreadGroupTestAllExecutionModes
162     : public ThreadGroupTestBase,
163       public testing::WithParamInterface<TaskSourceExecutionMode> {
164  public:
165   ThreadGroupTestAllExecutionModes() = default;
166   ThreadGroupTestAllExecutionModes(const ThreadGroupTestAllExecutionModes&) =
167       delete;
168   ThreadGroupTestAllExecutionModes& operator=(
169       const ThreadGroupTestAllExecutionModes&) = delete;
170 
execution_mode() const171   TaskSourceExecutionMode execution_mode() const { return GetParam(); }
172 
CreatePooledTaskRunner(const TaskTraits & traits={})173   scoped_refptr<TaskRunner> CreatePooledTaskRunner(
174       const TaskTraits& traits = {}) {
175     return test::CreatePooledTaskRunnerWithExecutionMode(
176         execution_mode(), &mock_pooled_task_runner_delegate_, traits);
177   }
178 };
179 
ShouldNotRun()180 void ShouldNotRun() {
181   ADD_FAILURE() << "Ran a task that shouldn't run.";
182 }
183 
184 }  // namespace
185 
TEST_P(ThreadGroupTestAllExecutionModes,PostTasks)186 TEST_P(ThreadGroupTestAllExecutionModes, PostTasks) {
187   StartThreadGroup();
188   // Create threads to post tasks.
189   std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
190   for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
191     threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
192         &mock_pooled_task_runner_delegate_, execution_mode(),
193         PostNestedTask::NO));
194     threads_posting_tasks.back()->Start();
195   }
196 
197   // Wait for all tasks to run.
198   for (const auto& thread_posting_tasks : threads_posting_tasks) {
199     thread_posting_tasks->Join();
200     thread_posting_tasks->factory()->WaitForAllTasksToRun();
201   }
202 
203   // Flush the task tracker to be sure that no task accesses its TestTaskFactory
204   // after |thread_posting_tasks| is destroyed.
205   task_tracker_.FlushForTesting();
206 }
207 
TEST_P(ThreadGroupTestAllExecutionModes,NestedPostTasks)208 TEST_P(ThreadGroupTestAllExecutionModes, NestedPostTasks) {
209   StartThreadGroup();
210   // Create threads to post tasks. Each task posted by these threads will post
211   // another task when it runs.
212   std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
213   for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
214     threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
215         &mock_pooled_task_runner_delegate_, execution_mode(),
216         PostNestedTask::YES));
217     threads_posting_tasks.back()->Start();
218   }
219 
220   // Wait for all tasks to run.
221   for (const auto& thread_posting_tasks : threads_posting_tasks) {
222     thread_posting_tasks->Join();
223     thread_posting_tasks->factory()->WaitForAllTasksToRun();
224   }
225 
226   // Flush the task tracker to be sure that no task accesses its TestTaskFactory
227   // after |thread_posting_tasks| is destroyed.
228   task_tracker_.FlushForTesting();
229 }
230 
231 // Verify that a Task can't be posted after shutdown.
TEST_P(ThreadGroupTestAllExecutionModes,PostTaskAfterShutdown)232 TEST_P(ThreadGroupTestAllExecutionModes, PostTaskAfterShutdown) {
233   StartThreadGroup();
234   auto task_runner = CreatePooledTaskRunner();
235   test::ShutdownTaskTracker(&task_tracker_);
236   EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
237 }
238 
239 // Verify that a Task runs shortly after its delay expires.
TEST_P(ThreadGroupTestAllExecutionModes,PostDelayedTask)240 TEST_P(ThreadGroupTestAllExecutionModes, PostDelayedTask) {
241   StartThreadGroup();
242   // kJob doesn't support delays.
243   if (execution_mode() == TaskSourceExecutionMode::kJob)
244     return;
245 
246   TestWaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC);
247   auto task_runner = CreatePooledTaskRunner();
248 
249   // Wait until the task runner is up and running to make sure the test below is
250   // solely timing the delayed task, not bringing up a physical thread.
251   task_runner->PostTask(
252       FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)));
253   task_ran.Wait();
254   ASSERT_TRUE(!task_ran.IsSignaled());
255 
256   // Post a task with a short delay.
257   const TimeTicks start_time = TimeTicks::Now();
258   EXPECT_TRUE(task_runner->PostDelayedTask(
259       FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)),
260       TestTimeouts::tiny_timeout()));
261 
262   // Wait until the task runs.
263   task_ran.Wait();
264 
265   // Expect the task to run after its delay expires, but no more than a
266   // reasonable amount of time after that (overloaded bots can be slow sometimes
267   // so give it 10X flexibility).
268   const TimeDelta actual_delay = TimeTicks::Now() - start_time;
269   EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout());
270   EXPECT_LT(actual_delay, 10 * TestTimeouts::tiny_timeout());
271 }
272 
273 // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner
274 // returns false when called from a task that isn't part of the sequence. Note:
275 // Tests that use TestTaskFactory already verify that
276 // RunsTasksInCurrentSequence() returns true when appropriate so this method
277 // complements it to get full coverage of that method.
TEST_P(ThreadGroupTestAllExecutionModes,SequencedRunsTasksInCurrentSequence)278 TEST_P(ThreadGroupTestAllExecutionModes, SequencedRunsTasksInCurrentSequence) {
279   StartThreadGroup();
280   auto task_runner = CreatePooledTaskRunner();
281   auto sequenced_task_runner = test::CreatePooledSequencedTaskRunner(
282       TaskTraits(), &mock_pooled_task_runner_delegate_);
283 
284   TestWaitableEvent task_ran;
285   task_runner->PostTask(
286       FROM_HERE,
287       BindOnce(
288           [](scoped_refptr<SequencedTaskRunner> sequenced_task_runner,
289              TestWaitableEvent* task_ran) {
290             EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence());
291             task_ran->Signal();
292           },
293           sequenced_task_runner, Unretained(&task_ran)));
294   task_ran.Wait();
295 }
296 
297 // Verify that tasks posted before Start run after Start.
TEST_P(ThreadGroupTestAllExecutionModes,PostBeforeStart)298 TEST_P(ThreadGroupTestAllExecutionModes, PostBeforeStart) {
299   TestWaitableEvent task_1_running;
300   TestWaitableEvent task_2_running;
301 
302   auto task_runner = CreatePooledTaskRunner();
303   task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
304                                             Unretained(&task_1_running)));
305   task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
306                                             Unretained(&task_2_running)));
307 
308   // Workers should not be created and tasks should not run before the thread
309   // group is started. The sleep is to give time for the tasks to potentially
310   // run.
311   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
312   EXPECT_FALSE(task_1_running.IsSignaled());
313   EXPECT_FALSE(task_2_running.IsSignaled());
314 
315   StartThreadGroup();
316 
317   // Tasks should run shortly after the thread group is started.
318   task_1_running.Wait();
319   task_2_running.Wait();
320 
321   task_tracker_.FlushForTesting();
322 }
323 
324 // Verify that tasks only run when allowed by the CanRunPolicy.
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyBasic)325 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyBasic) {
326   StartThreadGroup();
327   test::TestCanRunPolicyBasic(
328       thread_group_.get(),
329       [this](TaskPriority priority) {
330         return CreatePooledTaskRunner({priority});
331       },
332       &task_tracker_);
333 }
334 
TEST_F(ThreadGroupTest,CanRunPolicyUpdatedBeforeRun)335 TEST_F(ThreadGroupTest, CanRunPolicyUpdatedBeforeRun) {
336   StartThreadGroup();
337   // This test only works with SequencedTaskRunner become it assumes
338   // ordered execution of 2 posted tasks.
339   test::TestCanRunPolicyChangedBeforeRun(
340       thread_group_.get(),
341       [this](TaskPriority priority) {
342         return test::CreatePooledSequencedTaskRunner(
343             {priority}, &mock_pooled_task_runner_delegate_);
344       },
345       &task_tracker_);
346 }
347 
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyLoad)348 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyLoad) {
349   StartThreadGroup();
350   test::TestCanRunPolicyLoad(
351       thread_group_.get(),
352       [this](TaskPriority priority) {
353         return CreatePooledTaskRunner({priority});
354       },
355       &task_tracker_);
356 }
357 
358 // Verifies that ShouldYield() returns true for a priority that is not allowed
359 // to run by the CanRunPolicy.
TEST_F(ThreadGroupTest,CanRunPolicyShouldYield)360 TEST_F(ThreadGroupTest, CanRunPolicyShouldYield) {
361   StartThreadGroup();
362 
363   task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
364   thread_group_->DidUpdateCanRunPolicy();
365   EXPECT_TRUE(
366       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
367   EXPECT_TRUE(
368       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
369 
370   task_tracker_.SetCanRunPolicy(CanRunPolicy::kForegroundOnly);
371   thread_group_->DidUpdateCanRunPolicy();
372   EXPECT_TRUE(
373       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
374   EXPECT_FALSE(
375       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
376 
377   task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
378   thread_group_->DidUpdateCanRunPolicy();
379   EXPECT_FALSE(
380       thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
381   EXPECT_FALSE(
382       thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
383 }
384 
TEST_F(ThreadGroupTest,SetMaxTasks)385 TEST_F(ThreadGroupTest, SetMaxTasks) {
386   StartThreadGroup();
387 
388   constexpr size_t kNewMaxTasks = kMaxTasks / 2;
389 
390   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
391   thread_group_->SetMaxTasks(kNewMaxTasks);
392   ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kNewMaxTasks);
393 
394   TestWaitableEvent threads_running;
395   TestWaitableEvent busy_threads_continue;
396   const scoped_refptr<TaskRunner> task_runner =
397       test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
398                                    &mock_pooled_task_runner_delegate_);
399 
400   RepeatingClosure threads_running_barrier = BarrierClosure(
401       kNewMaxTasks,
402       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
403 
404   // Posting these tasks should cause new workers to be created.
405   for (size_t i = 0; i < kNewMaxTasks; ++i) {
406     task_runner->PostTask(
407         FROM_HERE, BindLambdaForTesting(
408                        [&busy_threads_continue, &threads_running_barrier]() {
409                          threads_running_barrier.Run();
410                          busy_threads_continue.Wait();
411                        }));
412   }
413   threads_running.Wait();
414 
415   AtomicFlag is_exiting;
416   // These tasks should not get executed until after other tasks become
417   // unblocked.
418   for (size_t i = 0; i < kNewMaxTasks; ++i) {
419     task_runner->PostTask(FROM_HERE, BindOnce(
420                                          [](AtomicFlag* is_exiting) {
421                                            EXPECT_TRUE(is_exiting->IsSet());
422                                          },
423                                          Unretained(&is_exiting)));
424   }
425   // Give time for those idle workers to possibly do work (which should not
426   // happen).
427   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
428 
429   is_exiting.Set();
430   thread_group_->ResetMaxTasks();
431   busy_threads_continue.Signal();
432   task_tracker_.FlushForTesting();
433 }
434 
435 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
436 // in a thread group does not affect Sequences with a priority that was
437 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,UpdatePriorityBestEffortToUserBlocking)438 TEST_F(ThreadGroupTest, UpdatePriorityBestEffortToUserBlocking) {
439   StartThreadGroup();
440 
441   CheckedLock num_tasks_running_lock;
442 
443   ConditionVariable num_tasks_running_cv =
444       num_tasks_running_lock.CreateConditionVariable();
445   num_tasks_running_cv.declare_only_used_while_idle();
446 
447   size_t num_tasks_running = 0;
448 
449   // Post |kMaxTasks| BEST_EFFORT tasks that block until they all start running.
450   std::vector<scoped_refptr<PooledSequencedTaskRunner>> task_runners;
451 
452   for (size_t i = 0; i < kMaxTasks; ++i) {
453     task_runners.push_back(MakeRefCounted<PooledSequencedTaskRunner>(
454         TaskTraits(TaskPriority::BEST_EFFORT),
455         &mock_pooled_task_runner_delegate_));
456     task_runners.back()->PostTask(
457         FROM_HERE, BindLambdaForTesting([&] {
458           // Increment the number of tasks running.
459           {
460             CheckedAutoLock auto_lock(num_tasks_running_lock);
461             ++num_tasks_running;
462           }
463           num_tasks_running_cv.Broadcast();
464 
465           // Wait until all posted tasks are running.
466           CheckedAutoLock auto_lock(num_tasks_running_lock);
467           while (num_tasks_running < kMaxTasks)
468             num_tasks_running_cv.Wait();
469         }));
470   }
471 
472   // Wait until |kMaxBestEffort| tasks start running.
473   {
474     CheckedAutoLock auto_lock(num_tasks_running_lock);
475     while (num_tasks_running < kMaxBestEffortTasks)
476       num_tasks_running_cv.Wait();
477   }
478 
479   // Update the priority of all TaskRunners to USER_BLOCKING.
480   for (size_t i = 0; i < kMaxTasks; ++i)
481     task_runners[i]->UpdatePriority(TaskPriority::USER_BLOCKING);
482 
483   // Wait until all posted tasks start running. This should not block forever,
484   // even in a thread group that enforces a maximum number of concurrent
485   // BEST_EFFORT tasks lower than |kMaxTasks|.
486   static_assert(kMaxBestEffortTasks < kMaxTasks, "");
487   {
488     CheckedAutoLock auto_lock(num_tasks_running_lock);
489     while (num_tasks_running < kMaxTasks)
490       num_tasks_running_cv.Wait();
491   }
492 
493   task_tracker_.FlushForTesting();
494 }
495 
496 // Regression test for crbug.com/955953.
TEST_P(ThreadGroupTestAllExecutionModes,ScopedBlockingCallTwice)497 TEST_P(ThreadGroupTestAllExecutionModes, ScopedBlockingCallTwice) {
498   StartThreadGroup();
499   auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
500       execution_mode(), &mock_pooled_task_runner_delegate_, {MayBlock()});
501 
502   TestWaitableEvent task_ran;
503   task_runner->PostTask(FROM_HERE,
504                         BindOnce(
505                             [](TestWaitableEvent* task_ran) {
506                               {
507                                 ScopedBlockingCall scoped_blocking_call(
508                                     FROM_HERE, BlockingType::MAY_BLOCK);
509                               }
510                               {
511                                 ScopedBlockingCall scoped_blocking_call(
512                                     FROM_HERE, BlockingType::MAY_BLOCK);
513                               }
514                               task_ran->Signal();
515                             },
516                             Unretained(&task_ran)));
517   task_ran.Wait();
518 }
519 
520 #if BUILDFLAG(IS_WIN)
TEST_P(ThreadGroupTestAllExecutionModes,COMMTAWorkerEnvironment)521 TEST_P(ThreadGroupTestAllExecutionModes, COMMTAWorkerEnvironment) {
522   StartThreadGroup(ThreadGroup::WorkerEnvironment::COM_MTA);
523   auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
524       execution_mode(), &mock_pooled_task_runner_delegate_);
525 
526   TestWaitableEvent task_ran;
527   task_runner->PostTask(
528       FROM_HERE, BindOnce(
529                      [](TestWaitableEvent* task_ran) {
530                        win::AssertComApartmentType(win::ComApartmentType::MTA);
531                        task_ran->Signal();
532                      },
533                      Unretained(&task_ran)));
534   task_ran.Wait();
535 }
536 
TEST_P(ThreadGroupTestAllExecutionModes,NoWorkerEnvironment)537 TEST_P(ThreadGroupTestAllExecutionModes, NoWorkerEnvironment) {
538   StartThreadGroup(ThreadGroup::WorkerEnvironment::NONE);
539   auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
540       execution_mode(), &mock_pooled_task_runner_delegate_);
541 
542   TestWaitableEvent task_ran;
543   task_runner->PostTask(
544       FROM_HERE, BindOnce(
545                      [](TestWaitableEvent* task_ran) {
546                        win::AssertComApartmentType(win::ComApartmentType::NONE);
547                        task_ran->Signal();
548                      },
549                      Unretained(&task_ran)));
550   task_ran.Wait();
551 }
552 #endif
553 
554 // Verifies that ShouldYield() returns false when there is no pending task.
TEST_F(ThreadGroupTest,ShouldYieldSingleTask)555 TEST_F(ThreadGroupTest, ShouldYieldSingleTask) {
556   StartThreadGroup();
557 
558   test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
559                                &mock_pooled_task_runner_delegate_)
560       ->PostTask(FROM_HERE, BindLambdaForTesting([&] {
561                    EXPECT_FALSE(thread_group_->ShouldYield(
562                        {TaskPriority::BEST_EFFORT, TimeTicks::Now()}));
563                    EXPECT_FALSE(thread_group_->ShouldYield(
564                        {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
565                    EXPECT_FALSE(thread_group_->ShouldYield(
566                        {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
567                  }));
568 
569   task_tracker_.FlushForTesting();
570 }
571 
572 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_F(ThreadGroupTest,ScheduleJobTaskSource)573 TEST_F(ThreadGroupTest, ScheduleJobTaskSource) {
574   StartThreadGroup();
575 
576   TestWaitableEvent threads_running;
577   TestWaitableEvent threads_continue;
578 
579   RepeatingClosure threads_running_barrier = BarrierClosure(
580       kMaxTasks,
581       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
582 
583   auto job_task = base::MakeRefCounted<test::MockJobTask>(
584       BindLambdaForTesting(
585           [&threads_running_barrier, &threads_continue](JobDelegate*) {
586             threads_running_barrier.Run();
587             threads_continue.Wait();
588           }),
589       /* num_tasks_to_run */ kMaxTasks);
590   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
591       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
592 
593   auto registered_task_source =
594       task_tracker_.RegisterTaskSource(std::move(task_source));
595   EXPECT_TRUE(registered_task_source);
596   thread_group_->PushTaskSourceAndWakeUpWorkers(
597       RegisteredTaskSourceAndTransaction::FromTaskSource(
598           std::move(registered_task_source)));
599 
600   threads_running.Wait();
601   threads_continue.Signal();
602 
603   // Flush the task tracker to be sure that no local variables are accessed by
604   // tasks after the end of the scope.
605   task_tracker_.FlushForTesting();
606 }
607 
608 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_F(ThreadGroupTest,ScheduleJobTaskSourceMultipleTime)609 TEST_F(ThreadGroupTest, ScheduleJobTaskSourceMultipleTime) {
610   StartThreadGroup();
611 
612   TestWaitableEvent thread_running;
613   TestWaitableEvent thread_continue;
614   auto job_task = base::MakeRefCounted<test::MockJobTask>(
615       BindLambdaForTesting([&thread_running, &thread_continue](JobDelegate*) {
616         DCHECK(!thread_running.IsSignaled());
617         thread_running.Signal();
618         thread_continue.Wait();
619       }),
620       /* num_tasks_to_run */ 1);
621   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
622       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
623 
624   thread_group_->PushTaskSourceAndWakeUpWorkers(
625       RegisteredTaskSourceAndTransaction::FromTaskSource(
626           task_tracker_.RegisterTaskSource(task_source)));
627 
628   // Enqueuing the task source again shouldn't affect the number of time it's
629   // run.
630   thread_group_->PushTaskSourceAndWakeUpWorkers(
631       RegisteredTaskSourceAndTransaction::FromTaskSource(
632           task_tracker_.RegisterTaskSource(task_source)));
633 
634   thread_running.Wait();
635   thread_continue.Signal();
636 
637   // Once the worker task ran, enqueuing the task source has no effect.
638   thread_group_->PushTaskSourceAndWakeUpWorkers(
639       RegisteredTaskSourceAndTransaction::FromTaskSource(
640           task_tracker_.RegisterTaskSource(task_source)));
641 
642   // Flush the task tracker to be sure that no local variables are accessed by
643   // tasks after the end of the scope.
644   task_tracker_.FlushForTesting();
645 }
646 
647 // Verify that Cancel() on a job stops running the worker task and causes
648 // current workers to yield.
TEST_F(ThreadGroupTest,CancelJobTaskSource)649 TEST_F(ThreadGroupTest, CancelJobTaskSource) {
650   StartThreadGroup();
651 
652   CheckedLock tasks_running_lock;
653   ConditionVariable tasks_running_cv =
654       tasks_running_lock.CreateConditionVariable();
655   bool tasks_running = false;
656 
657   // Schedule a big number of tasks.
658   auto job_task = base::MakeRefCounted<test::MockJobTask>(
659       BindLambdaForTesting([&](JobDelegate* delegate) {
660         {
661           CheckedAutoLock auto_lock(tasks_running_lock);
662           tasks_running = true;
663         }
664         tasks_running_cv.Signal();
665 
666         while (!delegate->ShouldYield()) {
667         }
668       }),
669       /* num_tasks_to_run */ kTooManyTasks);
670   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
671       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
672 
673   mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
674   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
675 
676   // Wait for at least 1 task to start running.
677   {
678     CheckedAutoLock auto_lock(tasks_running_lock);
679     while (!tasks_running)
680       tasks_running_cv.Wait();
681   }
682 
683   // Cancels pending tasks and unblocks running ones.
684   job_handle.Cancel();
685 
686   // This should not block since the job got cancelled.
687   task_tracker_.FlushForTesting();
688 }
689 
690 // Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule
691 // tasks with the intended concurrency.
TEST_F(ThreadGroupTest,JobTaskSourceConcurrencyIncrease)692 TEST_F(ThreadGroupTest, JobTaskSourceConcurrencyIncrease) {
693   StartThreadGroup();
694 
695   TestWaitableEvent threads_running_a;
696   TestWaitableEvent threads_continue;
697 
698   // Initially schedule half the tasks.
699   RepeatingClosure threads_running_barrier = BarrierClosure(
700       kMaxTasks / 2,
701       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_a)));
702 
703   auto job_state = base::MakeRefCounted<test::MockJobTask>(
704       BindLambdaForTesting(
705           [&threads_running_barrier, &threads_continue](JobDelegate*) {
706             threads_running_barrier.Run();
707             threads_continue.Wait();
708           }),
709       /* num_tasks_to_run */ kMaxTasks / 2);
710   auto task_source = job_state->GetJobTaskSource(
711       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
712 
713   auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
714   EXPECT_TRUE(registered_task_source);
715   thread_group_->PushTaskSourceAndWakeUpWorkers(
716       RegisteredTaskSourceAndTransaction::FromTaskSource(
717           std::move(registered_task_source)));
718 
719   threads_running_a.Wait();
720   // Reset |threads_running_barrier| for the remaining tasks.
721   TestWaitableEvent threads_running_b;
722   threads_running_barrier = BarrierClosure(
723       kMaxTasks / 2,
724       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_b)));
725   job_state->SetNumTasksToRun(kMaxTasks);
726 
727   // Unblocks tasks to let them racily wait for NotifyConcurrencyIncrease() to
728   // be called.
729   threads_continue.Signal();
730   task_source->NotifyConcurrencyIncrease();
731   // Wait for the remaining tasks. This should not block forever.
732   threads_running_b.Wait();
733 
734   // Flush the task tracker to be sure that no local variables are accessed by
735   // tasks after the end of the scope.
736   task_tracker_.FlushForTesting();
737 }
738 
739 // Verify that a JobTaskSource that becomes empty while in the queue eventually
740 // gets discarded.
TEST_F(ThreadGroupTest,ScheduleEmptyJobTaskSource)741 TEST_F(ThreadGroupTest, ScheduleEmptyJobTaskSource) {
742   StartThreadGroup();
743 
744   task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
745 
746   auto job_task = base::MakeRefCounted<test::MockJobTask>(
747       BindRepeating([](JobDelegate*) { ShouldNotRun(); }),
748       /* num_tasks_to_run */ 1);
749   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
750       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
751 
752   auto registered_task_source =
753       task_tracker_.RegisterTaskSource(std::move(task_source));
754   EXPECT_TRUE(registered_task_source);
755   thread_group_->PushTaskSourceAndWakeUpWorkers(
756       RegisteredTaskSourceAndTransaction::FromTaskSource(
757           std::move(registered_task_source)));
758 
759   // The worker task will never run.
760   job_task->SetNumTasksToRun(0);
761 
762   task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
763   thread_group_->DidUpdateCanRunPolicy();
764 
765   // This should not block since there's no task to run.
766   task_tracker_.FlushForTesting();
767 }
768 
769 // Verify that Join() on a job contributes to max concurrency and waits for all
770 // workers to return.
TEST_F(ThreadGroupTest,JoinJobTaskSource)771 TEST_F(ThreadGroupTest, JoinJobTaskSource) {
772   StartThreadGroup();
773 
774   TestWaitableEvent threads_continue;
775   RepeatingClosure threads_continue_barrier = BarrierClosure(
776       kMaxTasks + 1,
777       BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_continue)));
778 
779   auto job_task = base::MakeRefCounted<test::MockJobTask>(
780       BindLambdaForTesting([&](JobDelegate*) {
781         threads_continue_barrier.Run();
782         threads_continue.Wait();
783       }),
784       /* num_tasks_to_run */ kMaxTasks + 1);
785   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
786       FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
787 
788   mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
789   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
790   job_handle.Join();
791   // All worker tasks should complete before Join() returns.
792   EXPECT_EQ(0U, job_task->GetMaxConcurrency(0));
793   thread_group_->JoinForTesting();
794   EXPECT_EQ(1U, task_source->HasOneRef());
795   // Prevent TearDown() from calling JoinForTesting() again.
796   mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
797   thread_group_ = nullptr;
798 }
799 
800 // Verify that finishing work outside of a job unblocks workers with a stale
801 // max concurrency.
TEST_F(ThreadGroupTest,JoinJobTaskSourceStaleConcurrency)802 TEST_F(ThreadGroupTest, JoinJobTaskSourceStaleConcurrency) {
803   StartThreadGroup();
804 
805   TestWaitableEvent thread_running;
806   std::atomic_size_t max_concurrency(1);
807   auto task_source = MakeRefCounted<JobTaskSource>(
808       FROM_HERE, TaskTraits{},
809       BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
810       BindLambdaForTesting(
811           [&](size_t /*worker_count*/) -> size_t { return max_concurrency; }),
812       &mock_pooled_task_runner_delegate_);
813 
814   mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
815   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
816   thread_running.Wait();
817 
818   // Racily update max concurrency to unblock the task that was waiting on
819   // NotifyMaxConcurrency().
820   max_concurrency = 0;
821   job_handle.Join();
822 
823   // This should not block since the job was joined.
824   task_tracker_.FlushForTesting();
825 }
826 
827 // Verify that cancelling a job unblocks workers with a stale max concurrency.
TEST_F(ThreadGroupTest,CancelJobTaskSourceWithStaleConcurrency)828 TEST_F(ThreadGroupTest, CancelJobTaskSourceWithStaleConcurrency) {
829   StartThreadGroup();
830 
831   TestWaitableEvent thread_running;
832   auto task_source = MakeRefCounted<JobTaskSource>(
833       FROM_HERE, TaskTraits{},
834       BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
835       BindRepeating([](size_t /*worker_count*/) -> size_t { return 1; }),
836       &mock_pooled_task_runner_delegate_);
837 
838   mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
839   JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
840   thread_running.Wait();
841   job_handle.Cancel();
842 
843   // This should not block since the job got cancelled.
844   task_tracker_.FlushForTesting();
845 }
846 
847 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
848 // in a thread group does not affect JobTaskSource with a priority that was
849 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,JobTaskSourceUpdatePriority)850 TEST_F(ThreadGroupTest, JobTaskSourceUpdatePriority) {
851   StartThreadGroup();
852 
853   CheckedLock num_tasks_running_lock;
854 
855   ConditionVariable num_tasks_running_cv =
856       num_tasks_running_lock.CreateConditionVariable();
857   num_tasks_running_cv.declare_only_used_while_idle();
858 
859   size_t num_tasks_running = 0;
860 
861   auto job_task = base::MakeRefCounted<test::MockJobTask>(
862       BindLambdaForTesting([&](JobDelegate*) {
863         // Increment the number of tasks running.
864         {
865           CheckedAutoLock auto_lock(num_tasks_running_lock);
866           ++num_tasks_running;
867         }
868         num_tasks_running_cv.Broadcast();
869 
870         // Wait until all posted tasks are running.
871         CheckedAutoLock auto_lock(num_tasks_running_lock);
872         while (num_tasks_running < kMaxTasks)
873           num_tasks_running_cv.Wait();
874       }),
875       /* num_tasks_to_run */ kMaxTasks);
876   scoped_refptr<JobTaskSource> task_source =
877       job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::BEST_EFFORT},
878                                  &mock_pooled_task_runner_delegate_);
879 
880   auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
881   EXPECT_TRUE(registered_task_source);
882   thread_group_->PushTaskSourceAndWakeUpWorkers(
883       RegisteredTaskSourceAndTransaction::FromTaskSource(
884           std::move(registered_task_source)));
885 
886   // Wait until |kMaxBestEffort| tasks start running.
887   {
888     CheckedAutoLock auto_lock(num_tasks_running_lock);
889     while (num_tasks_running < kMaxBestEffortTasks)
890       num_tasks_running_cv.Wait();
891   }
892 
893   // Update the priority to USER_BLOCKING.
894   auto transaction = task_source->BeginTransaction();
895   transaction.UpdatePriority(TaskPriority::USER_BLOCKING);
896   thread_group_->UpdateSortKey(std::move(transaction));
897 
898   // Wait until all posted tasks start running. This should not block forever,
899   // even in a thread group that enforces a maximum number of concurrent
900   // BEST_EFFORT tasks lower than |kMaxTasks|.
901   static_assert(kMaxBestEffortTasks < kMaxTasks, "");
902   {
903     CheckedAutoLock auto_lock(num_tasks_running_lock);
904     while (num_tasks_running < kMaxTasks)
905       num_tasks_running_cv.Wait();
906   }
907 
908   // Flush the task tracker to be sure that no local variables are accessed by
909   // tasks after the end of the scope.
910   task_tracker_.FlushForTesting();
911 }
912 
913 INSTANTIATE_TEST_SUITE_P(GenericParallel,
914                          ThreadGroupTestAllExecutionModes,
915                          ::testing::Values(TaskSourceExecutionMode::kParallel));
916 INSTANTIATE_TEST_SUITE_P(
917     GenericSequenced,
918     ThreadGroupTestAllExecutionModes,
919     ::testing::Values(TaskSourceExecutionMode::kSequenced));
920 INSTANTIATE_TEST_SUITE_P(GenericJob,
921                          ThreadGroupTestAllExecutionModes,
922                          ::testing::Values(TaskSourceExecutionMode::kJob));
923 
924 }  // namespace internal
925 }  // namespace base
926