1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group.h"
6
7 #include <memory>
8 #include <tuple>
9 #include <utility>
10
11 #include "base/barrier_closure.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/task/task_runner.h"
17 #include "base/task/task_traits.h"
18 #include "base/task/thread_pool/can_run_policy_test.h"
19 #include "base/task/thread_pool/delayed_task_manager.h"
20 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
21 #include "base/task/thread_pool/task_tracker.h"
22 #include "base/task/thread_pool/test_task_factory.h"
23 #include "base/task/thread_pool/test_utils.h"
24 #include "base/task/thread_pool/thread_group_impl.h"
25 #include "base/test/bind.h"
26 #include "base/test/scoped_feature_list.h"
27 #include "base/test/test_timeouts.h"
28 #include "base/test/test_waitable_event.h"
29 #include "base/threading/platform_thread.h"
30 #include "base/threading/scoped_blocking_call.h"
31 #include "base/threading/scoped_blocking_call_internal.h"
32 #include "base/threading/simple_thread.h"
33 #include "base/threading/thread.h"
34 #include "base/threading/thread_restrictions.h"
35 #include "build/build_config.h"
36 #include "testing/gtest/include/gtest/gtest.h"
37
38 #if BUILDFLAG(IS_WIN)
39 #include "base/win/com_init_check_hook.h"
40 #include "base/win/com_init_util.h"
41 #endif
42
43 namespace base {
44 namespace internal {
45
46 namespace {
47
48 constexpr size_t kMaxTasks = 4;
49 constexpr size_t kTooManyTasks = 1000;
50 // By default, tests allow half of the thread group to be used by best-effort
51 // tasks.
52 constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
53 constexpr size_t kNumThreadsPostingTasks = 4;
54 constexpr size_t kNumTasksPostedPerThread = 150;
55
56 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
57
58 class ThreadPostingTasks : public SimpleThread {
59 public:
60 // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to
61 // |thread_group| through an |execution_mode| task runner. If
62 // |post_nested_task| is YES, each task posted by this thread posts another
63 // task when it runs.
ThreadPostingTasks(test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode,PostNestedTask post_nested_task)64 ThreadPostingTasks(
65 test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
66 TaskSourceExecutionMode execution_mode,
67 PostNestedTask post_nested_task)
68 : SimpleThread("ThreadPostingTasks"),
69 post_nested_task_(post_nested_task),
70 factory_(test::CreatePooledTaskRunnerWithExecutionMode(
71 execution_mode,
72 mock_pooled_task_runner_delegate_),
73 execution_mode) {}
74 ThreadPostingTasks(const ThreadPostingTasks&) = delete;
75 ThreadPostingTasks& operator=(const ThreadPostingTasks&) = delete;
76
factory() const77 const test::TestTaskFactory* factory() const { return &factory_; }
78
79 private:
Run()80 void Run() override {
81 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
82 EXPECT_TRUE(factory_.PostTask(post_nested_task_, OnceClosure()));
83 }
84
85 const scoped_refptr<TaskRunner> task_runner_;
86 const PostNestedTask post_nested_task_;
87 test::TestTaskFactory factory_;
88 };
89
90 class ThreadGroupTestBase : public testing::Test, public ThreadGroup::Delegate {
91 public:
92 ThreadGroupTestBase(const ThreadGroupTestBase&) = delete;
93 ThreadGroupTestBase& operator=(const ThreadGroupTestBase&) = delete;
94
95 protected:
96 ThreadGroupTestBase() = default;
97
SetUp()98 void SetUp() override {
99 service_thread_.Start();
100 delayed_task_manager_.Start(service_thread_.task_runner());
101 CreateThreadGroup();
102 }
103
TearDown()104 void TearDown() override {
105 delayed_task_manager_.Shutdown();
106 service_thread_.Stop();
107 DestroyThreadGroup();
108 }
109
CreateThreadGroup()110 void CreateThreadGroup() {
111 ASSERT_FALSE(thread_group_);
112 thread_group_ = std::make_unique<ThreadGroupImpl>(
113 "TestThreadGroup", "A", ThreadType::kDefault,
114 task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
115
116 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
117 }
118
StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment=ThreadGroup::WorkerEnvironment::NONE)119 void StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment =
120 ThreadGroup::WorkerEnvironment::NONE) {
121 ASSERT_TRUE(thread_group_);
122 ThreadGroupImpl* thread_group_impl =
123 static_cast<ThreadGroupImpl*>(thread_group_.get());
124 thread_group_impl->Start(kMaxTasks, kMaxBestEffortTasks, TimeDelta::Max(),
125 service_thread_.task_runner(), nullptr,
126 worker_environment);
127 }
128
DestroyThreadGroup()129 void DestroyThreadGroup() {
130 if (!thread_group_) {
131 return;
132 }
133
134 thread_group_->JoinForTesting();
135 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
136 thread_group_.reset();
137 }
138
139 Thread service_thread_{"ThreadPoolServiceThread"};
140 TaskTracker task_tracker_;
141 DelayedTaskManager delayed_task_manager_;
142 test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
143 task_tracker_.GetTrackedRef(), &delayed_task_manager_};
144
145 std::unique_ptr<ThreadGroup> thread_group_;
146
147 private:
148 // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)149 ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
150 return thread_group_.get();
151 }
152
153 TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_{this};
154 };
155
156 using ThreadGroupTest = ThreadGroupTestBase;
157
158 class ThreadGroupTestAllJobImpls : public ThreadGroupTestBase,
159 public testing::WithParamInterface<bool> {
160 public:
ThreadGroupTestAllJobImpls()161 ThreadGroupTestAllJobImpls() {
162 if (GetParam()) {
163 scoped_feature_list_.InitAndEnableFeature(kUseNewJobImplementation);
164 } else {
165 scoped_feature_list_.InitAndDisableFeature(kUseNewJobImplementation);
166 }
167 }
168
169 private:
170 base::test::ScopedFeatureList scoped_feature_list_;
171 };
172
173 // TODO(etiennep): Audit tests that don't need TaskSourceExecutionMode
174 // parameter.
175 class ThreadGroupTestAllExecutionModes
176 : public ThreadGroupTestBase,
177 public testing::WithParamInterface<TaskSourceExecutionMode> {
178 public:
179 ThreadGroupTestAllExecutionModes() = default;
180 ThreadGroupTestAllExecutionModes(const ThreadGroupTestAllExecutionModes&) =
181 delete;
182 ThreadGroupTestAllExecutionModes& operator=(
183 const ThreadGroupTestAllExecutionModes&) = delete;
184
execution_mode() const185 TaskSourceExecutionMode execution_mode() const { return GetParam(); }
186
CreatePooledTaskRunner(const TaskTraits & traits={})187 scoped_refptr<TaskRunner> CreatePooledTaskRunner(
188 const TaskTraits& traits = {}) {
189 return test::CreatePooledTaskRunnerWithExecutionMode(
190 execution_mode(), &mock_pooled_task_runner_delegate_, traits);
191 }
192 };
193
ShouldNotRun()194 void ShouldNotRun() {
195 ADD_FAILURE() << "Ran a task that shouldn't run.";
196 }
197
198 } // namespace
199
TEST_P(ThreadGroupTestAllExecutionModes,PostTasks)200 TEST_P(ThreadGroupTestAllExecutionModes, PostTasks) {
201 StartThreadGroup();
202 // Create threads to post tasks.
203 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
204 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
205 threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
206 &mock_pooled_task_runner_delegate_, execution_mode(),
207 PostNestedTask::NO));
208 threads_posting_tasks.back()->Start();
209 }
210
211 // Wait for all tasks to run.
212 for (const auto& thread_posting_tasks : threads_posting_tasks) {
213 thread_posting_tasks->Join();
214 thread_posting_tasks->factory()->WaitForAllTasksToRun();
215 }
216
217 // Flush the task tracker to be sure that no task accesses its TestTaskFactory
218 // after |thread_posting_tasks| is destroyed.
219 task_tracker_.FlushForTesting();
220 }
221
TEST_P(ThreadGroupTestAllExecutionModes,NestedPostTasks)222 TEST_P(ThreadGroupTestAllExecutionModes, NestedPostTasks) {
223 StartThreadGroup();
224 // Create threads to post tasks. Each task posted by these threads will post
225 // another task when it runs.
226 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
227 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
228 threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
229 &mock_pooled_task_runner_delegate_, execution_mode(),
230 PostNestedTask::YES));
231 threads_posting_tasks.back()->Start();
232 }
233
234 // Wait for all tasks to run.
235 for (const auto& thread_posting_tasks : threads_posting_tasks) {
236 thread_posting_tasks->Join();
237 thread_posting_tasks->factory()->WaitForAllTasksToRun();
238 }
239
240 // Flush the task tracker to be sure that no task accesses its TestTaskFactory
241 // after |thread_posting_tasks| is destroyed.
242 task_tracker_.FlushForTesting();
243 }
244
245 // Verify that a Task can't be posted after shutdown.
TEST_P(ThreadGroupTestAllExecutionModes,PostTaskAfterShutdown)246 TEST_P(ThreadGroupTestAllExecutionModes, PostTaskAfterShutdown) {
247 StartThreadGroup();
248 auto task_runner = CreatePooledTaskRunner();
249 test::ShutdownTaskTracker(&task_tracker_);
250 EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
251 }
252
253 // Verify that a Task runs shortly after its delay expires.
TEST_P(ThreadGroupTestAllExecutionModes,PostDelayedTask)254 TEST_P(ThreadGroupTestAllExecutionModes, PostDelayedTask) {
255 StartThreadGroup();
256 // kJob doesn't support delays.
257 if (execution_mode() == TaskSourceExecutionMode::kJob)
258 return;
259
260 TestWaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC);
261 auto task_runner = CreatePooledTaskRunner();
262
263 // Wait until the task runner is up and running to make sure the test below is
264 // solely timing the delayed task, not bringing up a physical thread.
265 task_runner->PostTask(
266 FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)));
267 task_ran.Wait();
268 ASSERT_TRUE(!task_ran.IsSignaled());
269
270 // Post a task with a short delay.
271 const TimeTicks start_time = TimeTicks::Now();
272 EXPECT_TRUE(task_runner->PostDelayedTask(
273 FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)),
274 TestTimeouts::tiny_timeout()));
275
276 // Wait until the task runs.
277 task_ran.Wait();
278
279 // Expect the task to run after its delay expires, but no more than a
280 // reasonable amount of time after that (overloaded bots can be slow sometimes
281 // so give it 10X flexibility).
282 const TimeDelta actual_delay = TimeTicks::Now() - start_time;
283 EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout());
284 EXPECT_LT(actual_delay, 10 * TestTimeouts::tiny_timeout());
285 }
286
287 // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner
288 // returns false when called from a task that isn't part of the sequence. Note:
289 // Tests that use TestTaskFactory already verify that
290 // RunsTasksInCurrentSequence() returns true when appropriate so this method
291 // complements it to get full coverage of that method.
TEST_P(ThreadGroupTestAllExecutionModes,SequencedRunsTasksInCurrentSequence)292 TEST_P(ThreadGroupTestAllExecutionModes, SequencedRunsTasksInCurrentSequence) {
293 StartThreadGroup();
294 auto task_runner = CreatePooledTaskRunner();
295 auto sequenced_task_runner = test::CreatePooledSequencedTaskRunner(
296 TaskTraits(), &mock_pooled_task_runner_delegate_);
297
298 TestWaitableEvent task_ran;
299 task_runner->PostTask(
300 FROM_HERE,
301 BindOnce(
302 [](scoped_refptr<SequencedTaskRunner> sequenced_task_runner,
303 TestWaitableEvent* task_ran) {
304 EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence());
305 task_ran->Signal();
306 },
307 sequenced_task_runner, Unretained(&task_ran)));
308 task_ran.Wait();
309 }
310
311 // Verify that tasks posted before Start run after Start.
TEST_P(ThreadGroupTestAllExecutionModes,PostBeforeStart)312 TEST_P(ThreadGroupTestAllExecutionModes, PostBeforeStart) {
313 TestWaitableEvent task_1_running;
314 TestWaitableEvent task_2_running;
315
316 auto task_runner = CreatePooledTaskRunner();
317 task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
318 Unretained(&task_1_running)));
319 task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
320 Unretained(&task_2_running)));
321
322 // Workers should not be created and tasks should not run before the thread
323 // group is started. The sleep is to give time for the tasks to potentially
324 // run.
325 PlatformThread::Sleep(TestTimeouts::tiny_timeout());
326 EXPECT_FALSE(task_1_running.IsSignaled());
327 EXPECT_FALSE(task_2_running.IsSignaled());
328
329 StartThreadGroup();
330
331 // Tasks should run shortly after the thread group is started.
332 task_1_running.Wait();
333 task_2_running.Wait();
334
335 task_tracker_.FlushForTesting();
336 }
337
338 // Verify that tasks only run when allowed by the CanRunPolicy.
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyBasic)339 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyBasic) {
340 StartThreadGroup();
341 test::TestCanRunPolicyBasic(
342 thread_group_.get(),
343 [this](TaskPriority priority) {
344 return CreatePooledTaskRunner({priority});
345 },
346 &task_tracker_);
347 }
348
TEST_F(ThreadGroupTest,CanRunPolicyUpdatedBeforeRun)349 TEST_F(ThreadGroupTest, CanRunPolicyUpdatedBeforeRun) {
350 StartThreadGroup();
351 // This test only works with SequencedTaskRunner become it assumes
352 // ordered execution of 2 posted tasks.
353 test::TestCanRunPolicyChangedBeforeRun(
354 thread_group_.get(),
355 [this](TaskPriority priority) {
356 return test::CreatePooledSequencedTaskRunner(
357 {priority}, &mock_pooled_task_runner_delegate_);
358 },
359 &task_tracker_);
360 }
361
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyLoad)362 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyLoad) {
363 StartThreadGroup();
364 test::TestCanRunPolicyLoad(
365 thread_group_.get(),
366 [this](TaskPriority priority) {
367 return CreatePooledTaskRunner({priority});
368 },
369 &task_tracker_);
370 }
371
372 // Verifies that ShouldYield() returns true for a priority that is not allowed
373 // to run by the CanRunPolicy.
TEST_F(ThreadGroupTest,CanRunPolicyShouldYield)374 TEST_F(ThreadGroupTest, CanRunPolicyShouldYield) {
375 StartThreadGroup();
376
377 task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
378 thread_group_->DidUpdateCanRunPolicy();
379 EXPECT_TRUE(
380 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
381 EXPECT_TRUE(
382 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
383
384 task_tracker_.SetCanRunPolicy(CanRunPolicy::kForegroundOnly);
385 thread_group_->DidUpdateCanRunPolicy();
386 EXPECT_TRUE(
387 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
388 EXPECT_FALSE(
389 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
390
391 task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
392 thread_group_->DidUpdateCanRunPolicy();
393 EXPECT_FALSE(
394 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
395 EXPECT_FALSE(
396 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
397 }
398
399 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
400 // in a thread group does not affect Sequences with a priority that was
401 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,UpdatePriorityBestEffortToUserBlocking)402 TEST_F(ThreadGroupTest, UpdatePriorityBestEffortToUserBlocking) {
403 StartThreadGroup();
404
405 CheckedLock num_tasks_running_lock;
406
407 std::unique_ptr<ConditionVariable> num_tasks_running_cv =
408 num_tasks_running_lock.CreateConditionVariable();
409 num_tasks_running_cv->declare_only_used_while_idle();
410
411 size_t num_tasks_running = 0;
412
413 // Post |kMaxTasks| BEST_EFFORT tasks that block until they all start running.
414 std::vector<scoped_refptr<PooledSequencedTaskRunner>> task_runners;
415
416 for (size_t i = 0; i < kMaxTasks; ++i) {
417 task_runners.push_back(MakeRefCounted<PooledSequencedTaskRunner>(
418 TaskTraits(TaskPriority::BEST_EFFORT),
419 &mock_pooled_task_runner_delegate_));
420 task_runners.back()->PostTask(
421 FROM_HERE, BindLambdaForTesting([&]() {
422 // Increment the number of tasks running.
423 {
424 CheckedAutoLock auto_lock(num_tasks_running_lock);
425 ++num_tasks_running;
426 }
427 num_tasks_running_cv->Broadcast();
428
429 // Wait until all posted tasks are running.
430 CheckedAutoLock auto_lock(num_tasks_running_lock);
431 while (num_tasks_running < kMaxTasks)
432 num_tasks_running_cv->Wait();
433 }));
434 }
435
436 // Wait until |kMaxBestEffort| tasks start running.
437 {
438 CheckedAutoLock auto_lock(num_tasks_running_lock);
439 while (num_tasks_running < kMaxBestEffortTasks)
440 num_tasks_running_cv->Wait();
441 }
442
443 // Update the priority of all TaskRunners to USER_BLOCKING.
444 for (size_t i = 0; i < kMaxTasks; ++i)
445 task_runners[i]->UpdatePriority(TaskPriority::USER_BLOCKING);
446
447 // Wait until all posted tasks start running. This should not block forever,
448 // even in a thread group that enforces a maximum number of concurrent
449 // BEST_EFFORT tasks lower than |kMaxTasks|.
450 static_assert(kMaxBestEffortTasks < kMaxTasks, "");
451 {
452 CheckedAutoLock auto_lock(num_tasks_running_lock);
453 while (num_tasks_running < kMaxTasks)
454 num_tasks_running_cv->Wait();
455 }
456
457 task_tracker_.FlushForTesting();
458 }
459
460 // Regression test for crbug.com/955953.
TEST_P(ThreadGroupTestAllExecutionModes,ScopedBlockingCallTwice)461 TEST_P(ThreadGroupTestAllExecutionModes, ScopedBlockingCallTwice) {
462 StartThreadGroup();
463 auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
464 execution_mode(), &mock_pooled_task_runner_delegate_, {MayBlock()});
465
466 TestWaitableEvent task_ran;
467 task_runner->PostTask(FROM_HERE,
468 BindOnce(
469 [](TestWaitableEvent* task_ran) {
470 {
471 ScopedBlockingCall scoped_blocking_call(
472 FROM_HERE, BlockingType::MAY_BLOCK);
473 }
474 {
475 ScopedBlockingCall scoped_blocking_call(
476 FROM_HERE, BlockingType::MAY_BLOCK);
477 }
478 task_ran->Signal();
479 },
480 Unretained(&task_ran)));
481 task_ran.Wait();
482 }
483
484 #if BUILDFLAG(IS_WIN)
TEST_P(ThreadGroupTestAllExecutionModes,COMMTAWorkerEnvironment)485 TEST_P(ThreadGroupTestAllExecutionModes, COMMTAWorkerEnvironment) {
486 StartThreadGroup(ThreadGroup::WorkerEnvironment::COM_MTA);
487 auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
488 execution_mode(), &mock_pooled_task_runner_delegate_);
489
490 TestWaitableEvent task_ran;
491 task_runner->PostTask(
492 FROM_HERE, BindOnce(
493 [](TestWaitableEvent* task_ran) {
494 win::AssertComApartmentType(win::ComApartmentType::MTA);
495 task_ran->Signal();
496 },
497 Unretained(&task_ran)));
498 task_ran.Wait();
499 }
500
TEST_P(ThreadGroupTestAllExecutionModes,NoWorkerEnvironment)501 TEST_P(ThreadGroupTestAllExecutionModes, NoWorkerEnvironment) {
502 StartThreadGroup(ThreadGroup::WorkerEnvironment::NONE);
503 auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
504 execution_mode(), &mock_pooled_task_runner_delegate_);
505
506 TestWaitableEvent task_ran;
507 task_runner->PostTask(
508 FROM_HERE, BindOnce(
509 [](TestWaitableEvent* task_ran) {
510 win::AssertComApartmentType(win::ComApartmentType::NONE);
511 task_ran->Signal();
512 },
513 Unretained(&task_ran)));
514 task_ran.Wait();
515 }
516 #endif
517
518 // Verifies that ShouldYield() returns false when there is no pending task.
TEST_F(ThreadGroupTest,ShouldYieldSingleTask)519 TEST_F(ThreadGroupTest, ShouldYieldSingleTask) {
520 StartThreadGroup();
521
522 test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
523 &mock_pooled_task_runner_delegate_)
524 ->PostTask(FROM_HERE, BindLambdaForTesting([&]() {
525 EXPECT_FALSE(thread_group_->ShouldYield(
526 {TaskPriority::BEST_EFFORT, TimeTicks::Now()}));
527 EXPECT_FALSE(thread_group_->ShouldYield(
528 {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
529 EXPECT_FALSE(thread_group_->ShouldYield(
530 {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
531 }));
532
533 task_tracker_.FlushForTesting();
534 }
535
536 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_P(ThreadGroupTestAllJobImpls,ScheduleJobTaskSource)537 TEST_P(ThreadGroupTestAllJobImpls, ScheduleJobTaskSource) {
538 StartThreadGroup();
539
540 TestWaitableEvent threads_running;
541 TestWaitableEvent threads_continue;
542
543 RepeatingClosure threads_running_barrier = BarrierClosure(
544 kMaxTasks,
545 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
546
547 auto job_task = base::MakeRefCounted<test::MockJobTask>(
548 BindLambdaForTesting(
549 [&threads_running_barrier, &threads_continue](JobDelegate*) {
550 threads_running_barrier.Run();
551 threads_continue.Wait();
552 }),
553 /* num_tasks_to_run */ kMaxTasks);
554 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
555 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
556 task_source->NotifyConcurrencyIncrease();
557
558 threads_running.Wait();
559 threads_continue.Signal();
560
561 // Flush the task tracker to be sure that no local variables are accessed by
562 // tasks after the end of the scope.
563 task_tracker_.FlushForTesting();
564 }
565
566 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_P(ThreadGroupTestAllJobImpls,ScheduleJobTaskSourceMultipleTime)567 TEST_P(ThreadGroupTestAllJobImpls, ScheduleJobTaskSourceMultipleTime) {
568 StartThreadGroup();
569
570 TestWaitableEvent thread_running;
571 TestWaitableEvent thread_continue;
572 auto job_task = base::MakeRefCounted<test::MockJobTask>(
573 BindLambdaForTesting([&thread_running, &thread_continue](JobDelegate*) {
574 DCHECK(!thread_running.IsSignaled());
575 thread_running.Signal();
576 thread_continue.Wait();
577 }),
578 /* num_tasks_to_run */ 1);
579 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
580 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
581
582 // Multiple calls to NotifyConcurrencyIncrease() should have the same effect
583 // as a single call.
584 task_source->NotifyConcurrencyIncrease();
585 task_source->NotifyConcurrencyIncrease();
586
587 thread_running.Wait();
588 thread_continue.Signal();
589
590 // Once the worker task ran, enqueuing the task source has no effect.
591 task_source->NotifyConcurrencyIncrease();
592
593 // Flush the task tracker to be sure that no local variables are accessed by
594 // tasks after the end of the scope.
595 task_tracker_.FlushForTesting();
596 }
597
598 // Verify that Cancel() on a job stops running the worker task and causes
599 // current workers to yield.
TEST_P(ThreadGroupTestAllJobImpls,CancelJobTaskSource)600 TEST_P(ThreadGroupTestAllJobImpls, CancelJobTaskSource) {
601 StartThreadGroup();
602
603 CheckedLock tasks_running_lock;
604 std::unique_ptr<ConditionVariable> tasks_running_cv =
605 tasks_running_lock.CreateConditionVariable();
606 bool tasks_running = false;
607
608 // Schedule a big number of tasks.
609 auto job_task = base::MakeRefCounted<test::MockJobTask>(
610 BindLambdaForTesting([&](JobDelegate* delegate) {
611 {
612 CheckedAutoLock auto_lock(tasks_running_lock);
613 tasks_running = true;
614 }
615 tasks_running_cv->Signal();
616
617 while (!delegate->ShouldYield()) {
618 }
619 }),
620 /* num_tasks_to_run */ kTooManyTasks);
621 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
622 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
623 task_source->NotifyConcurrencyIncrease();
624
625 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
626
627 // Wait for at least 1 task to start running.
628 {
629 CheckedAutoLock auto_lock(tasks_running_lock);
630 while (!tasks_running)
631 tasks_running_cv->Wait();
632 }
633
634 // Cancels pending tasks and unblocks running ones.
635 job_handle.Cancel();
636
637 // This should not block since the job got cancelled.
638 task_tracker_.FlushForTesting();
639 }
640
641 // Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule
642 // tasks with the intended concurrency.
TEST_P(ThreadGroupTestAllJobImpls,JobTaskSourceConcurrencyIncrease)643 TEST_P(ThreadGroupTestAllJobImpls, JobTaskSourceConcurrencyIncrease) {
644 StartThreadGroup();
645
646 TestWaitableEvent threads_running_a;
647 TestWaitableEvent threads_continue;
648
649 // Initially schedule half the tasks.
650 RepeatingClosure threads_running_barrier = BarrierClosure(
651 kMaxTasks / 2,
652 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_a)));
653
654 auto job_state = base::MakeRefCounted<test::MockJobTask>(
655 BindLambdaForTesting(
656 [&threads_running_barrier, &threads_continue](JobDelegate*) {
657 threads_running_barrier.Run();
658 threads_continue.Wait();
659 }),
660 /* num_tasks_to_run */ kMaxTasks / 2);
661 auto task_source = job_state->GetJobTaskSource(
662 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
663 task_source->NotifyConcurrencyIncrease();
664
665 threads_running_a.Wait();
666 // Reset |threads_running_barrier| for the remaining tasks.
667 TestWaitableEvent threads_running_b;
668 threads_running_barrier = BarrierClosure(
669 kMaxTasks / 2,
670 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_b)));
671 job_state->SetNumTasksToRun(kMaxTasks);
672
673 // Unblocks tasks to let them racily wait for NotifyConcurrencyIncrease() to
674 // be called.
675 threads_continue.Signal();
676 task_source->NotifyConcurrencyIncrease();
677 // Wait for the remaining tasks. This should not block forever.
678 threads_running_b.Wait();
679
680 // Flush the task tracker to be sure that no local variables are accessed by
681 // tasks after the end of the scope.
682 task_tracker_.FlushForTesting();
683 }
684
685 // Verify that a JobTaskSource that becomes empty while in the queue eventually
686 // gets discarded.
TEST_P(ThreadGroupTestAllJobImpls,ScheduleEmptyJobTaskSource)687 TEST_P(ThreadGroupTestAllJobImpls, ScheduleEmptyJobTaskSource) {
688 StartThreadGroup();
689
690 task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
691
692 auto job_task = base::MakeRefCounted<test::MockJobTask>(
693 BindRepeating([](JobDelegate*) { ShouldNotRun(); }),
694 /* num_tasks_to_run */ 1);
695 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
696 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
697 task_source->NotifyConcurrencyIncrease();
698
699 // The worker task will never run.
700 job_task->SetNumTasksToRun(0);
701
702 task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
703 thread_group_->DidUpdateCanRunPolicy();
704
705 // This should not block since there's no task to run.
706 task_tracker_.FlushForTesting();
707 }
708
709 // Verify that Join() on a job contributes to max concurrency and waits for all
710 // workers to return.
TEST_P(ThreadGroupTestAllJobImpls,JoinJobTaskSource)711 TEST_P(ThreadGroupTestAllJobImpls, JoinJobTaskSource) {
712 StartThreadGroup();
713
714 TestWaitableEvent threads_continue;
715 RepeatingClosure threads_continue_barrier = BarrierClosure(
716 kMaxTasks + 1,
717 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_continue)));
718
719 auto job_task = base::MakeRefCounted<test::MockJobTask>(
720 BindLambdaForTesting([&](JobDelegate*) {
721 threads_continue_barrier.Run();
722 threads_continue.Wait();
723 }),
724 /* num_tasks_to_run */ kMaxTasks + 1);
725 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
726 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
727 task_source->NotifyConcurrencyIncrease();
728
729 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
730 job_handle.Join();
731 // All worker tasks should complete before Join() returns.
732 EXPECT_EQ(0U, job_task->GetMaxConcurrency(0));
733 thread_group_->JoinForTesting();
734 EXPECT_EQ(1U, task_source->HasOneRef());
735 // Prevent TearDown() from calling JoinForTesting() again.
736 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
737 thread_group_ = nullptr;
738 }
739
740 // Verify that finishing work outside of a job unblocks workers with a stale
741 // max concurrency.
TEST_P(ThreadGroupTestAllJobImpls,JoinJobTaskSourceStaleConcurrency)742 TEST_P(ThreadGroupTestAllJobImpls, JoinJobTaskSourceStaleConcurrency) {
743 StartThreadGroup();
744
745 TestWaitableEvent thread_running;
746 std::atomic_size_t max_concurrency(1);
747 auto task_source = CreateJobTaskSource(
748 FROM_HERE, TaskTraits{},
749 BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
750 BindLambdaForTesting(
751 [&](size_t /*worker_count*/) -> size_t { return max_concurrency; }),
752 &mock_pooled_task_runner_delegate_);
753 task_source->NotifyConcurrencyIncrease();
754
755 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
756 thread_running.Wait();
757
758 // Racily update max concurrency to unblock the task that was waiting on
759 // NotifyMaxConcurrency().
760 max_concurrency = 0;
761 job_handle.Join();
762
763 // This should not block since the job was joined.
764 task_tracker_.FlushForTesting();
765 }
766
767 // Verify that cancelling a job unblocks workers with a stale max concurrency.
TEST_P(ThreadGroupTestAllJobImpls,CancelJobTaskSourceWithStaleConcurrency)768 TEST_P(ThreadGroupTestAllJobImpls, CancelJobTaskSourceWithStaleConcurrency) {
769 StartThreadGroup();
770
771 TestWaitableEvent thread_running;
772 auto task_source = CreateJobTaskSource(
773 FROM_HERE, TaskTraits{},
774 BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
775 BindRepeating([](size_t /*worker_count*/) -> size_t { return 1; }),
776 &mock_pooled_task_runner_delegate_);
777 task_source->NotifyConcurrencyIncrease();
778
779 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
780 thread_running.Wait();
781 job_handle.Cancel();
782
783 // This should not block since the job got cancelled.
784 task_tracker_.FlushForTesting();
785 }
786
787 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
788 // in a thread group does not affect JobTaskSource with a priority that was
789 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_P(ThreadGroupTestAllJobImpls,JobTaskSourceUpdatePriority)790 TEST_P(ThreadGroupTestAllJobImpls, JobTaskSourceUpdatePriority) {
791 StartThreadGroup();
792
793 CheckedLock num_tasks_running_lock;
794
795 std::unique_ptr<ConditionVariable> num_tasks_running_cv =
796 num_tasks_running_lock.CreateConditionVariable();
797 num_tasks_running_cv->declare_only_used_while_idle();
798
799 size_t num_tasks_running = 0;
800
801 auto job_task = base::MakeRefCounted<test::MockJobTask>(
802 BindLambdaForTesting([&](JobDelegate*) {
803 // Increment the number of tasks running.
804 {
805 CheckedAutoLock auto_lock(num_tasks_running_lock);
806 ++num_tasks_running;
807 }
808 num_tasks_running_cv->Broadcast();
809
810 // Wait until all posted tasks are running.
811 CheckedAutoLock auto_lock(num_tasks_running_lock);
812 while (num_tasks_running < kMaxTasks)
813 num_tasks_running_cv->Wait();
814 }),
815 /* num_tasks_to_run */ kMaxTasks);
816 scoped_refptr<JobTaskSource> task_source =
817 job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::BEST_EFFORT},
818 &mock_pooled_task_runner_delegate_);
819 task_source->NotifyConcurrencyIncrease();
820
821 // Wait until |kMaxBestEffort| tasks start running.
822 {
823 CheckedAutoLock auto_lock(num_tasks_running_lock);
824 while (num_tasks_running < kMaxBestEffortTasks)
825 num_tasks_running_cv->Wait();
826 }
827
828 // Update the priority to USER_BLOCKING.
829 auto transaction = task_source->BeginTransaction();
830 transaction.UpdatePriority(TaskPriority::USER_BLOCKING);
831 thread_group_->UpdateSortKey(std::move(transaction));
832
833 // Wait until all posted tasks start running. This should not block forever,
834 // even in a thread group that enforces a maximum number of concurrent
835 // BEST_EFFORT tasks lower than |kMaxTasks|.
836 static_assert(kMaxBestEffortTasks < kMaxTasks, "");
837 {
838 CheckedAutoLock auto_lock(num_tasks_running_lock);
839 while (num_tasks_running < kMaxTasks)
840 num_tasks_running_cv->Wait();
841 }
842
843 // Flush the task tracker to be sure that no local variables are accessed by
844 // tasks after the end of the scope.
845 task_tracker_.FlushForTesting();
846 }
847
848 INSTANTIATE_TEST_SUITE_P(GenericParallel,
849 ThreadGroupTestAllExecutionModes,
850 ::testing::Values(TaskSourceExecutionMode::kParallel));
851 INSTANTIATE_TEST_SUITE_P(
852 GenericSequenced,
853 ThreadGroupTestAllExecutionModes,
854 ::testing::Values(TaskSourceExecutionMode::kSequenced));
855 INSTANTIATE_TEST_SUITE_P(GenericJob,
856 ThreadGroupTestAllExecutionModes,
857 ::testing::Values(TaskSourceExecutionMode::kJob));
858
859 INSTANTIATE_TEST_SUITE_P(,
860 ThreadGroupTestAllJobImpls,
861 testing::Bool(),
__anon832ccca21602(const testing::TestParamInfo<bool>& info) 862 [](const testing::TestParamInfo<bool>& info) {
863 if (info.param) {
864 return "NewJob";
865 }
866 return "OldJob";
867 });
868
869 } // namespace internal
870 } // namespace base
871