1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/thread_group.h"
6
7 #include <memory>
8 #include <tuple>
9 #include <utility>
10
11 #include "base/barrier_closure.h"
12 #include "base/functional/bind.h"
13 #include "base/functional/callback_helpers.h"
14 #include "base/location.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/task/task_runner.h"
17 #include "base/task/task_traits.h"
18 #include "base/task/thread_pool/can_run_policy_test.h"
19 #include "base/task/thread_pool/delayed_task_manager.h"
20 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
21 #include "base/task/thread_pool/task_tracker.h"
22 #include "base/task/thread_pool/test_task_factory.h"
23 #include "base/task/thread_pool/test_utils.h"
24 #include "base/task/thread_pool/thread_group_impl.h"
25 #include "base/test/bind.h"
26 #include "base/test/test_timeouts.h"
27 #include "base/test/test_waitable_event.h"
28 #include "base/threading/platform_thread.h"
29 #include "base/threading/scoped_blocking_call.h"
30 #include "base/threading/scoped_blocking_call_internal.h"
31 #include "base/threading/simple_thread.h"
32 #include "base/threading/thread.h"
33 #include "base/threading/thread_restrictions.h"
34 #include "build/build_config.h"
35 #include "testing/gtest/include/gtest/gtest.h"
36
37 #if BUILDFLAG(IS_WIN)
38 #include "base/win/com_init_check_hook.h"
39 #include "base/win/com_init_util.h"
40 #endif
41
42 namespace base {
43 namespace internal {
44
45 namespace {
46
47 constexpr size_t kMaxTasks = 4;
48 constexpr size_t kTooManyTasks = 1000;
49 // By default, tests allow half of the thread group to be used by best-effort
50 // tasks.
51 constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
52 constexpr size_t kNumThreadsPostingTasks = 4;
53 constexpr size_t kNumTasksPostedPerThread = 150;
54
55 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
56
57 class ThreadPostingTasks : public SimpleThread {
58 public:
59 // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to
60 // |thread_group| through an |execution_mode| task runner. If
61 // |post_nested_task| is YES, each task posted by this thread posts another
62 // task when it runs.
ThreadPostingTasks(test::MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate_,TaskSourceExecutionMode execution_mode,PostNestedTask post_nested_task)63 ThreadPostingTasks(
64 test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate_,
65 TaskSourceExecutionMode execution_mode,
66 PostNestedTask post_nested_task)
67 : SimpleThread("ThreadPostingTasks"),
68 post_nested_task_(post_nested_task),
69 factory_(test::CreatePooledTaskRunnerWithExecutionMode(
70 execution_mode,
71 mock_pooled_task_runner_delegate_),
72 execution_mode) {}
73 ThreadPostingTasks(const ThreadPostingTasks&) = delete;
74 ThreadPostingTasks& operator=(const ThreadPostingTasks&) = delete;
75
factory() const76 const test::TestTaskFactory* factory() const { return &factory_; }
77
78 private:
Run()79 void Run() override {
80 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
81 EXPECT_TRUE(factory_.PostTask(post_nested_task_, OnceClosure()));
82 }
83
84 const scoped_refptr<TaskRunner> task_runner_;
85 const PostNestedTask post_nested_task_;
86 test::TestTaskFactory factory_;
87 };
88
89 class ThreadGroupTestBase : public testing::Test, public ThreadGroup::Delegate {
90 public:
91 ThreadGroupTestBase(const ThreadGroupTestBase&) = delete;
92 ThreadGroupTestBase& operator=(const ThreadGroupTestBase&) = delete;
93
94 protected:
95 ThreadGroupTestBase() = default;
96
SetUp()97 void SetUp() override {
98 service_thread_.Start();
99 delayed_task_manager_.Start(service_thread_.task_runner());
100 CreateThreadGroup();
101 }
102
TearDown()103 void TearDown() override {
104 delayed_task_manager_.Shutdown();
105 service_thread_.Stop();
106 DestroyThreadGroup();
107 }
108
CreateThreadGroup()109 void CreateThreadGroup() {
110 ASSERT_FALSE(thread_group_);
111 thread_group_ = std::make_unique<ThreadGroupImpl>(
112 "TestThreadGroup", "A", ThreadType::kDefault,
113 task_tracker_.GetTrackedRef(), tracked_ref_factory_.GetTrackedRef());
114
115 mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
116 }
117
StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment=ThreadGroup::WorkerEnvironment::NONE)118 void StartThreadGroup(ThreadGroup::WorkerEnvironment worker_environment =
119 ThreadGroup::WorkerEnvironment::NONE) {
120 ASSERT_TRUE(thread_group_);
121 ThreadGroupImpl* thread_group_impl =
122 static_cast<ThreadGroupImpl*>(thread_group_.get());
123 thread_group_impl->Start(kMaxTasks, kMaxBestEffortTasks, TimeDelta::Max(),
124 service_thread_.task_runner(), nullptr,
125 worker_environment,
126 /*synchronous_thread_start_for_testing=*/false,
127 /*may_block_threshold=*/{});
128 }
129
DestroyThreadGroup()130 void DestroyThreadGroup() {
131 if (!thread_group_) {
132 return;
133 }
134
135 thread_group_->JoinForTesting();
136 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
137 thread_group_.reset();
138 }
139
140 Thread service_thread_{"ThreadPoolServiceThread"};
141 TaskTracker task_tracker_;
142 DelayedTaskManager delayed_task_manager_;
143 test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
144 task_tracker_.GetTrackedRef(), &delayed_task_manager_};
145
146 std::unique_ptr<ThreadGroup> thread_group_;
147
148 private:
149 // ThreadGroup::Delegate:
GetThreadGroupForTraits(const TaskTraits & traits)150 ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
151 return thread_group_.get();
152 }
153
154 TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_{this};
155 };
156
157 using ThreadGroupTest = ThreadGroupTestBase;
158
159 // TODO(etiennep): Audit tests that don't need TaskSourceExecutionMode
160 // parameter.
161 class ThreadGroupTestAllExecutionModes
162 : public ThreadGroupTestBase,
163 public testing::WithParamInterface<TaskSourceExecutionMode> {
164 public:
165 ThreadGroupTestAllExecutionModes() = default;
166 ThreadGroupTestAllExecutionModes(const ThreadGroupTestAllExecutionModes&) =
167 delete;
168 ThreadGroupTestAllExecutionModes& operator=(
169 const ThreadGroupTestAllExecutionModes&) = delete;
170
execution_mode() const171 TaskSourceExecutionMode execution_mode() const { return GetParam(); }
172
CreatePooledTaskRunner(const TaskTraits & traits={})173 scoped_refptr<TaskRunner> CreatePooledTaskRunner(
174 const TaskTraits& traits = {}) {
175 return test::CreatePooledTaskRunnerWithExecutionMode(
176 execution_mode(), &mock_pooled_task_runner_delegate_, traits);
177 }
178 };
179
ShouldNotRun()180 void ShouldNotRun() {
181 ADD_FAILURE() << "Ran a task that shouldn't run.";
182 }
183
184 } // namespace
185
TEST_P(ThreadGroupTestAllExecutionModes,PostTasks)186 TEST_P(ThreadGroupTestAllExecutionModes, PostTasks) {
187 StartThreadGroup();
188 // Create threads to post tasks.
189 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
190 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
191 threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
192 &mock_pooled_task_runner_delegate_, execution_mode(),
193 PostNestedTask::NO));
194 threads_posting_tasks.back()->Start();
195 }
196
197 // Wait for all tasks to run.
198 for (const auto& thread_posting_tasks : threads_posting_tasks) {
199 thread_posting_tasks->Join();
200 thread_posting_tasks->factory()->WaitForAllTasksToRun();
201 }
202
203 // Flush the task tracker to be sure that no task accesses its TestTaskFactory
204 // after |thread_posting_tasks| is destroyed.
205 task_tracker_.FlushForTesting();
206 }
207
TEST_P(ThreadGroupTestAllExecutionModes,NestedPostTasks)208 TEST_P(ThreadGroupTestAllExecutionModes, NestedPostTasks) {
209 StartThreadGroup();
210 // Create threads to post tasks. Each task posted by these threads will post
211 // another task when it runs.
212 std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks;
213 for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
214 threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>(
215 &mock_pooled_task_runner_delegate_, execution_mode(),
216 PostNestedTask::YES));
217 threads_posting_tasks.back()->Start();
218 }
219
220 // Wait for all tasks to run.
221 for (const auto& thread_posting_tasks : threads_posting_tasks) {
222 thread_posting_tasks->Join();
223 thread_posting_tasks->factory()->WaitForAllTasksToRun();
224 }
225
226 // Flush the task tracker to be sure that no task accesses its TestTaskFactory
227 // after |thread_posting_tasks| is destroyed.
228 task_tracker_.FlushForTesting();
229 }
230
231 // Verify that a Task can't be posted after shutdown.
TEST_P(ThreadGroupTestAllExecutionModes,PostTaskAfterShutdown)232 TEST_P(ThreadGroupTestAllExecutionModes, PostTaskAfterShutdown) {
233 StartThreadGroup();
234 auto task_runner = CreatePooledTaskRunner();
235 test::ShutdownTaskTracker(&task_tracker_);
236 EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
237 }
238
239 // Verify that a Task runs shortly after its delay expires.
TEST_P(ThreadGroupTestAllExecutionModes,PostDelayedTask)240 TEST_P(ThreadGroupTestAllExecutionModes, PostDelayedTask) {
241 StartThreadGroup();
242 // kJob doesn't support delays.
243 if (execution_mode() == TaskSourceExecutionMode::kJob)
244 return;
245
246 TestWaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC);
247 auto task_runner = CreatePooledTaskRunner();
248
249 // Wait until the task runner is up and running to make sure the test below is
250 // solely timing the delayed task, not bringing up a physical thread.
251 task_runner->PostTask(
252 FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)));
253 task_ran.Wait();
254 ASSERT_TRUE(!task_ran.IsSignaled());
255
256 // Post a task with a short delay.
257 const TimeTicks start_time = TimeTicks::Now();
258 EXPECT_TRUE(task_runner->PostDelayedTask(
259 FROM_HERE, BindOnce(&TestWaitableEvent::Signal, Unretained(&task_ran)),
260 TestTimeouts::tiny_timeout()));
261
262 // Wait until the task runs.
263 task_ran.Wait();
264
265 // Expect the task to run after its delay expires, but no more than a
266 // reasonable amount of time after that (overloaded bots can be slow sometimes
267 // so give it 10X flexibility).
268 const TimeDelta actual_delay = TimeTicks::Now() - start_time;
269 EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout());
270 EXPECT_LT(actual_delay, 10 * TestTimeouts::tiny_timeout());
271 }
272
273 // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner
274 // returns false when called from a task that isn't part of the sequence. Note:
275 // Tests that use TestTaskFactory already verify that
276 // RunsTasksInCurrentSequence() returns true when appropriate so this method
277 // complements it to get full coverage of that method.
TEST_P(ThreadGroupTestAllExecutionModes,SequencedRunsTasksInCurrentSequence)278 TEST_P(ThreadGroupTestAllExecutionModes, SequencedRunsTasksInCurrentSequence) {
279 StartThreadGroup();
280 auto task_runner = CreatePooledTaskRunner();
281 auto sequenced_task_runner = test::CreatePooledSequencedTaskRunner(
282 TaskTraits(), &mock_pooled_task_runner_delegate_);
283
284 TestWaitableEvent task_ran;
285 task_runner->PostTask(
286 FROM_HERE,
287 BindOnce(
288 [](scoped_refptr<SequencedTaskRunner> sequenced_task_runner,
289 TestWaitableEvent* task_ran) {
290 EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence());
291 task_ran->Signal();
292 },
293 sequenced_task_runner, Unretained(&task_ran)));
294 task_ran.Wait();
295 }
296
297 // Verify that tasks posted before Start run after Start.
TEST_P(ThreadGroupTestAllExecutionModes,PostBeforeStart)298 TEST_P(ThreadGroupTestAllExecutionModes, PostBeforeStart) {
299 TestWaitableEvent task_1_running;
300 TestWaitableEvent task_2_running;
301
302 auto task_runner = CreatePooledTaskRunner();
303 task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
304 Unretained(&task_1_running)));
305 task_runner->PostTask(FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
306 Unretained(&task_2_running)));
307
308 // Workers should not be created and tasks should not run before the thread
309 // group is started. The sleep is to give time for the tasks to potentially
310 // run.
311 PlatformThread::Sleep(TestTimeouts::tiny_timeout());
312 EXPECT_FALSE(task_1_running.IsSignaled());
313 EXPECT_FALSE(task_2_running.IsSignaled());
314
315 StartThreadGroup();
316
317 // Tasks should run shortly after the thread group is started.
318 task_1_running.Wait();
319 task_2_running.Wait();
320
321 task_tracker_.FlushForTesting();
322 }
323
324 // Verify that tasks only run when allowed by the CanRunPolicy.
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyBasic)325 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyBasic) {
326 StartThreadGroup();
327 test::TestCanRunPolicyBasic(
328 thread_group_.get(),
329 [this](TaskPriority priority) {
330 return CreatePooledTaskRunner({priority});
331 },
332 &task_tracker_);
333 }
334
TEST_F(ThreadGroupTest,CanRunPolicyUpdatedBeforeRun)335 TEST_F(ThreadGroupTest, CanRunPolicyUpdatedBeforeRun) {
336 StartThreadGroup();
337 // This test only works with SequencedTaskRunner become it assumes
338 // ordered execution of 2 posted tasks.
339 test::TestCanRunPolicyChangedBeforeRun(
340 thread_group_.get(),
341 [this](TaskPriority priority) {
342 return test::CreatePooledSequencedTaskRunner(
343 {priority}, &mock_pooled_task_runner_delegate_);
344 },
345 &task_tracker_);
346 }
347
TEST_P(ThreadGroupTestAllExecutionModes,CanRunPolicyLoad)348 TEST_P(ThreadGroupTestAllExecutionModes, CanRunPolicyLoad) {
349 StartThreadGroup();
350 test::TestCanRunPolicyLoad(
351 thread_group_.get(),
352 [this](TaskPriority priority) {
353 return CreatePooledTaskRunner({priority});
354 },
355 &task_tracker_);
356 }
357
358 // Verifies that ShouldYield() returns true for a priority that is not allowed
359 // to run by the CanRunPolicy.
TEST_F(ThreadGroupTest,CanRunPolicyShouldYield)360 TEST_F(ThreadGroupTest, CanRunPolicyShouldYield) {
361 StartThreadGroup();
362
363 task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
364 thread_group_->DidUpdateCanRunPolicy();
365 EXPECT_TRUE(
366 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
367 EXPECT_TRUE(
368 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
369
370 task_tracker_.SetCanRunPolicy(CanRunPolicy::kForegroundOnly);
371 thread_group_->DidUpdateCanRunPolicy();
372 EXPECT_TRUE(
373 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
374 EXPECT_FALSE(
375 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
376
377 task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
378 thread_group_->DidUpdateCanRunPolicy();
379 EXPECT_FALSE(
380 thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
381 EXPECT_FALSE(
382 thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
383 }
384
TEST_F(ThreadGroupTest,SetMaxTasks)385 TEST_F(ThreadGroupTest, SetMaxTasks) {
386 StartThreadGroup();
387
388 constexpr size_t kNewMaxTasks = kMaxTasks / 2;
389
390 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
391 thread_group_->SetMaxTasks(kNewMaxTasks);
392 ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kNewMaxTasks);
393
394 TestWaitableEvent threads_running;
395 TestWaitableEvent busy_threads_continue;
396 const scoped_refptr<TaskRunner> task_runner =
397 test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
398 &mock_pooled_task_runner_delegate_);
399
400 RepeatingClosure threads_running_barrier = BarrierClosure(
401 kNewMaxTasks,
402 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
403
404 // Posting these tasks should cause new workers to be created.
405 for (size_t i = 0; i < kNewMaxTasks; ++i) {
406 task_runner->PostTask(
407 FROM_HERE, BindLambdaForTesting(
408 [&busy_threads_continue, &threads_running_barrier]() {
409 threads_running_barrier.Run();
410 busy_threads_continue.Wait();
411 }));
412 }
413 threads_running.Wait();
414
415 AtomicFlag is_exiting;
416 // These tasks should not get executed until after other tasks become
417 // unblocked.
418 for (size_t i = 0; i < kNewMaxTasks; ++i) {
419 task_runner->PostTask(FROM_HERE, BindOnce(
420 [](AtomicFlag* is_exiting) {
421 EXPECT_TRUE(is_exiting->IsSet());
422 },
423 Unretained(&is_exiting)));
424 }
425 // Give time for those idle workers to possibly do work (which should not
426 // happen).
427 PlatformThread::Sleep(TestTimeouts::tiny_timeout());
428
429 is_exiting.Set();
430 thread_group_->ResetMaxTasks();
431 busy_threads_continue.Signal();
432 task_tracker_.FlushForTesting();
433 }
434
435 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
436 // in a thread group does not affect Sequences with a priority that was
437 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,UpdatePriorityBestEffortToUserBlocking)438 TEST_F(ThreadGroupTest, UpdatePriorityBestEffortToUserBlocking) {
439 StartThreadGroup();
440
441 CheckedLock num_tasks_running_lock;
442
443 ConditionVariable num_tasks_running_cv =
444 num_tasks_running_lock.CreateConditionVariable();
445 num_tasks_running_cv.declare_only_used_while_idle();
446
447 size_t num_tasks_running = 0;
448
449 // Post |kMaxTasks| BEST_EFFORT tasks that block until they all start running.
450 std::vector<scoped_refptr<PooledSequencedTaskRunner>> task_runners;
451
452 for (size_t i = 0; i < kMaxTasks; ++i) {
453 task_runners.push_back(MakeRefCounted<PooledSequencedTaskRunner>(
454 TaskTraits(TaskPriority::BEST_EFFORT),
455 &mock_pooled_task_runner_delegate_));
456 task_runners.back()->PostTask(
457 FROM_HERE, BindLambdaForTesting([&] {
458 // Increment the number of tasks running.
459 {
460 CheckedAutoLock auto_lock(num_tasks_running_lock);
461 ++num_tasks_running;
462 }
463 num_tasks_running_cv.Broadcast();
464
465 // Wait until all posted tasks are running.
466 CheckedAutoLock auto_lock(num_tasks_running_lock);
467 while (num_tasks_running < kMaxTasks)
468 num_tasks_running_cv.Wait();
469 }));
470 }
471
472 // Wait until |kMaxBestEffort| tasks start running.
473 {
474 CheckedAutoLock auto_lock(num_tasks_running_lock);
475 while (num_tasks_running < kMaxBestEffortTasks)
476 num_tasks_running_cv.Wait();
477 }
478
479 // Update the priority of all TaskRunners to USER_BLOCKING.
480 for (size_t i = 0; i < kMaxTasks; ++i)
481 task_runners[i]->UpdatePriority(TaskPriority::USER_BLOCKING);
482
483 // Wait until all posted tasks start running. This should not block forever,
484 // even in a thread group that enforces a maximum number of concurrent
485 // BEST_EFFORT tasks lower than |kMaxTasks|.
486 static_assert(kMaxBestEffortTasks < kMaxTasks, "");
487 {
488 CheckedAutoLock auto_lock(num_tasks_running_lock);
489 while (num_tasks_running < kMaxTasks)
490 num_tasks_running_cv.Wait();
491 }
492
493 task_tracker_.FlushForTesting();
494 }
495
496 // Regression test for crbug.com/955953.
TEST_P(ThreadGroupTestAllExecutionModes,ScopedBlockingCallTwice)497 TEST_P(ThreadGroupTestAllExecutionModes, ScopedBlockingCallTwice) {
498 StartThreadGroup();
499 auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
500 execution_mode(), &mock_pooled_task_runner_delegate_, {MayBlock()});
501
502 TestWaitableEvent task_ran;
503 task_runner->PostTask(FROM_HERE,
504 BindOnce(
505 [](TestWaitableEvent* task_ran) {
506 {
507 ScopedBlockingCall scoped_blocking_call(
508 FROM_HERE, BlockingType::MAY_BLOCK);
509 }
510 {
511 ScopedBlockingCall scoped_blocking_call(
512 FROM_HERE, BlockingType::MAY_BLOCK);
513 }
514 task_ran->Signal();
515 },
516 Unretained(&task_ran)));
517 task_ran.Wait();
518 }
519
520 #if BUILDFLAG(IS_WIN)
TEST_P(ThreadGroupTestAllExecutionModes,COMMTAWorkerEnvironment)521 TEST_P(ThreadGroupTestAllExecutionModes, COMMTAWorkerEnvironment) {
522 StartThreadGroup(ThreadGroup::WorkerEnvironment::COM_MTA);
523 auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
524 execution_mode(), &mock_pooled_task_runner_delegate_);
525
526 TestWaitableEvent task_ran;
527 task_runner->PostTask(
528 FROM_HERE, BindOnce(
529 [](TestWaitableEvent* task_ran) {
530 win::AssertComApartmentType(win::ComApartmentType::MTA);
531 task_ran->Signal();
532 },
533 Unretained(&task_ran)));
534 task_ran.Wait();
535 }
536
TEST_P(ThreadGroupTestAllExecutionModes,NoWorkerEnvironment)537 TEST_P(ThreadGroupTestAllExecutionModes, NoWorkerEnvironment) {
538 StartThreadGroup(ThreadGroup::WorkerEnvironment::NONE);
539 auto task_runner = test::CreatePooledTaskRunnerWithExecutionMode(
540 execution_mode(), &mock_pooled_task_runner_delegate_);
541
542 TestWaitableEvent task_ran;
543 task_runner->PostTask(
544 FROM_HERE, BindOnce(
545 [](TestWaitableEvent* task_ran) {
546 win::AssertComApartmentType(win::ComApartmentType::NONE);
547 task_ran->Signal();
548 },
549 Unretained(&task_ran)));
550 task_ran.Wait();
551 }
552 #endif
553
554 // Verifies that ShouldYield() returns false when there is no pending task.
TEST_F(ThreadGroupTest,ShouldYieldSingleTask)555 TEST_F(ThreadGroupTest, ShouldYieldSingleTask) {
556 StartThreadGroup();
557
558 test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
559 &mock_pooled_task_runner_delegate_)
560 ->PostTask(FROM_HERE, BindLambdaForTesting([&] {
561 EXPECT_FALSE(thread_group_->ShouldYield(
562 {TaskPriority::BEST_EFFORT, TimeTicks::Now()}));
563 EXPECT_FALSE(thread_group_->ShouldYield(
564 {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
565 EXPECT_FALSE(thread_group_->ShouldYield(
566 {TaskPriority::USER_VISIBLE, TimeTicks::Now()}));
567 }));
568
569 task_tracker_.FlushForTesting();
570 }
571
572 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_F(ThreadGroupTest,ScheduleJobTaskSource)573 TEST_F(ThreadGroupTest, ScheduleJobTaskSource) {
574 StartThreadGroup();
575
576 TestWaitableEvent threads_running;
577 TestWaitableEvent threads_continue;
578
579 RepeatingClosure threads_running_barrier = BarrierClosure(
580 kMaxTasks,
581 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
582
583 auto job_task = base::MakeRefCounted<test::MockJobTask>(
584 BindLambdaForTesting(
585 [&threads_running_barrier, &threads_continue](JobDelegate*) {
586 threads_running_barrier.Run();
587 threads_continue.Wait();
588 }),
589 /* num_tasks_to_run */ kMaxTasks);
590 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
591 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
592
593 auto registered_task_source =
594 task_tracker_.RegisterTaskSource(std::move(task_source));
595 EXPECT_TRUE(registered_task_source);
596 thread_group_->PushTaskSourceAndWakeUpWorkers(
597 RegisteredTaskSourceAndTransaction::FromTaskSource(
598 std::move(registered_task_source)));
599
600 threads_running.Wait();
601 threads_continue.Signal();
602
603 // Flush the task tracker to be sure that no local variables are accessed by
604 // tasks after the end of the scope.
605 task_tracker_.FlushForTesting();
606 }
607
608 // Verify that tasks from a JobTaskSource run at the intended concurrency.
TEST_F(ThreadGroupTest,ScheduleJobTaskSourceMultipleTime)609 TEST_F(ThreadGroupTest, ScheduleJobTaskSourceMultipleTime) {
610 StartThreadGroup();
611
612 TestWaitableEvent thread_running;
613 TestWaitableEvent thread_continue;
614 auto job_task = base::MakeRefCounted<test::MockJobTask>(
615 BindLambdaForTesting([&thread_running, &thread_continue](JobDelegate*) {
616 DCHECK(!thread_running.IsSignaled());
617 thread_running.Signal();
618 thread_continue.Wait();
619 }),
620 /* num_tasks_to_run */ 1);
621 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
622 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
623
624 thread_group_->PushTaskSourceAndWakeUpWorkers(
625 RegisteredTaskSourceAndTransaction::FromTaskSource(
626 task_tracker_.RegisterTaskSource(task_source)));
627
628 // Enqueuing the task source again shouldn't affect the number of time it's
629 // run.
630 thread_group_->PushTaskSourceAndWakeUpWorkers(
631 RegisteredTaskSourceAndTransaction::FromTaskSource(
632 task_tracker_.RegisterTaskSource(task_source)));
633
634 thread_running.Wait();
635 thread_continue.Signal();
636
637 // Once the worker task ran, enqueuing the task source has no effect.
638 thread_group_->PushTaskSourceAndWakeUpWorkers(
639 RegisteredTaskSourceAndTransaction::FromTaskSource(
640 task_tracker_.RegisterTaskSource(task_source)));
641
642 // Flush the task tracker to be sure that no local variables are accessed by
643 // tasks after the end of the scope.
644 task_tracker_.FlushForTesting();
645 }
646
647 // Verify that Cancel() on a job stops running the worker task and causes
648 // current workers to yield.
TEST_F(ThreadGroupTest,CancelJobTaskSource)649 TEST_F(ThreadGroupTest, CancelJobTaskSource) {
650 StartThreadGroup();
651
652 CheckedLock tasks_running_lock;
653 ConditionVariable tasks_running_cv =
654 tasks_running_lock.CreateConditionVariable();
655 bool tasks_running = false;
656
657 // Schedule a big number of tasks.
658 auto job_task = base::MakeRefCounted<test::MockJobTask>(
659 BindLambdaForTesting([&](JobDelegate* delegate) {
660 {
661 CheckedAutoLock auto_lock(tasks_running_lock);
662 tasks_running = true;
663 }
664 tasks_running_cv.Signal();
665
666 while (!delegate->ShouldYield()) {
667 }
668 }),
669 /* num_tasks_to_run */ kTooManyTasks);
670 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
671 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
672
673 mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
674 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
675
676 // Wait for at least 1 task to start running.
677 {
678 CheckedAutoLock auto_lock(tasks_running_lock);
679 while (!tasks_running)
680 tasks_running_cv.Wait();
681 }
682
683 // Cancels pending tasks and unblocks running ones.
684 job_handle.Cancel();
685
686 // This should not block since the job got cancelled.
687 task_tracker_.FlushForTesting();
688 }
689
690 // Verify that calling JobTaskSource::NotifyConcurrencyIncrease() (re-)schedule
691 // tasks with the intended concurrency.
TEST_F(ThreadGroupTest,JobTaskSourceConcurrencyIncrease)692 TEST_F(ThreadGroupTest, JobTaskSourceConcurrencyIncrease) {
693 StartThreadGroup();
694
695 TestWaitableEvent threads_running_a;
696 TestWaitableEvent threads_continue;
697
698 // Initially schedule half the tasks.
699 RepeatingClosure threads_running_barrier = BarrierClosure(
700 kMaxTasks / 2,
701 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_a)));
702
703 auto job_state = base::MakeRefCounted<test::MockJobTask>(
704 BindLambdaForTesting(
705 [&threads_running_barrier, &threads_continue](JobDelegate*) {
706 threads_running_barrier.Run();
707 threads_continue.Wait();
708 }),
709 /* num_tasks_to_run */ kMaxTasks / 2);
710 auto task_source = job_state->GetJobTaskSource(
711 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
712
713 auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
714 EXPECT_TRUE(registered_task_source);
715 thread_group_->PushTaskSourceAndWakeUpWorkers(
716 RegisteredTaskSourceAndTransaction::FromTaskSource(
717 std::move(registered_task_source)));
718
719 threads_running_a.Wait();
720 // Reset |threads_running_barrier| for the remaining tasks.
721 TestWaitableEvent threads_running_b;
722 threads_running_barrier = BarrierClosure(
723 kMaxTasks / 2,
724 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running_b)));
725 job_state->SetNumTasksToRun(kMaxTasks);
726
727 // Unblocks tasks to let them racily wait for NotifyConcurrencyIncrease() to
728 // be called.
729 threads_continue.Signal();
730 task_source->NotifyConcurrencyIncrease();
731 // Wait for the remaining tasks. This should not block forever.
732 threads_running_b.Wait();
733
734 // Flush the task tracker to be sure that no local variables are accessed by
735 // tasks after the end of the scope.
736 task_tracker_.FlushForTesting();
737 }
738
739 // Verify that a JobTaskSource that becomes empty while in the queue eventually
740 // gets discarded.
TEST_F(ThreadGroupTest,ScheduleEmptyJobTaskSource)741 TEST_F(ThreadGroupTest, ScheduleEmptyJobTaskSource) {
742 StartThreadGroup();
743
744 task_tracker_.SetCanRunPolicy(CanRunPolicy::kNone);
745
746 auto job_task = base::MakeRefCounted<test::MockJobTask>(
747 BindRepeating([](JobDelegate*) { ShouldNotRun(); }),
748 /* num_tasks_to_run */ 1);
749 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
750 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
751
752 auto registered_task_source =
753 task_tracker_.RegisterTaskSource(std::move(task_source));
754 EXPECT_TRUE(registered_task_source);
755 thread_group_->PushTaskSourceAndWakeUpWorkers(
756 RegisteredTaskSourceAndTransaction::FromTaskSource(
757 std::move(registered_task_source)));
758
759 // The worker task will never run.
760 job_task->SetNumTasksToRun(0);
761
762 task_tracker_.SetCanRunPolicy(CanRunPolicy::kAll);
763 thread_group_->DidUpdateCanRunPolicy();
764
765 // This should not block since there's no task to run.
766 task_tracker_.FlushForTesting();
767 }
768
769 // Verify that Join() on a job contributes to max concurrency and waits for all
770 // workers to return.
TEST_F(ThreadGroupTest,JoinJobTaskSource)771 TEST_F(ThreadGroupTest, JoinJobTaskSource) {
772 StartThreadGroup();
773
774 TestWaitableEvent threads_continue;
775 RepeatingClosure threads_continue_barrier = BarrierClosure(
776 kMaxTasks + 1,
777 BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_continue)));
778
779 auto job_task = base::MakeRefCounted<test::MockJobTask>(
780 BindLambdaForTesting([&](JobDelegate*) {
781 threads_continue_barrier.Run();
782 threads_continue.Wait();
783 }),
784 /* num_tasks_to_run */ kMaxTasks + 1);
785 scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
786 FROM_HERE, {}, &mock_pooled_task_runner_delegate_);
787
788 mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
789 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
790 job_handle.Join();
791 // All worker tasks should complete before Join() returns.
792 EXPECT_EQ(0U, job_task->GetMaxConcurrency(0));
793 thread_group_->JoinForTesting();
794 EXPECT_EQ(1U, task_source->HasOneRef());
795 // Prevent TearDown() from calling JoinForTesting() again.
796 mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
797 thread_group_ = nullptr;
798 }
799
800 // Verify that finishing work outside of a job unblocks workers with a stale
801 // max concurrency.
TEST_F(ThreadGroupTest,JoinJobTaskSourceStaleConcurrency)802 TEST_F(ThreadGroupTest, JoinJobTaskSourceStaleConcurrency) {
803 StartThreadGroup();
804
805 TestWaitableEvent thread_running;
806 std::atomic_size_t max_concurrency(1);
807 auto task_source = MakeRefCounted<JobTaskSource>(
808 FROM_HERE, TaskTraits{},
809 BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
810 BindLambdaForTesting(
811 [&](size_t /*worker_count*/) -> size_t { return max_concurrency; }),
812 &mock_pooled_task_runner_delegate_);
813
814 mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
815 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
816 thread_running.Wait();
817
818 // Racily update max concurrency to unblock the task that was waiting on
819 // NotifyMaxConcurrency().
820 max_concurrency = 0;
821 job_handle.Join();
822
823 // This should not block since the job was joined.
824 task_tracker_.FlushForTesting();
825 }
826
827 // Verify that cancelling a job unblocks workers with a stale max concurrency.
TEST_F(ThreadGroupTest,CancelJobTaskSourceWithStaleConcurrency)828 TEST_F(ThreadGroupTest, CancelJobTaskSourceWithStaleConcurrency) {
829 StartThreadGroup();
830
831 TestWaitableEvent thread_running;
832 auto task_source = MakeRefCounted<JobTaskSource>(
833 FROM_HERE, TaskTraits{},
834 BindLambdaForTesting([&](JobDelegate*) { thread_running.Signal(); }),
835 BindRepeating([](size_t /*worker_count*/) -> size_t { return 1; }),
836 &mock_pooled_task_runner_delegate_);
837
838 mock_pooled_task_runner_delegate_.EnqueueJobTaskSource(task_source);
839 JobHandle job_handle = internal::JobTaskSource::CreateJobHandle(task_source);
840 thread_running.Wait();
841 job_handle.Cancel();
842
843 // This should not block since the job got cancelled.
844 task_tracker_.FlushForTesting();
845 }
846
847 // Verify that the maximum number of BEST_EFFORT tasks that can run concurrently
848 // in a thread group does not affect JobTaskSource with a priority that was
849 // increased from BEST_EFFORT to USER_BLOCKING.
TEST_F(ThreadGroupTest,JobTaskSourceUpdatePriority)850 TEST_F(ThreadGroupTest, JobTaskSourceUpdatePriority) {
851 StartThreadGroup();
852
853 CheckedLock num_tasks_running_lock;
854
855 ConditionVariable num_tasks_running_cv =
856 num_tasks_running_lock.CreateConditionVariable();
857 num_tasks_running_cv.declare_only_used_while_idle();
858
859 size_t num_tasks_running = 0;
860
861 auto job_task = base::MakeRefCounted<test::MockJobTask>(
862 BindLambdaForTesting([&](JobDelegate*) {
863 // Increment the number of tasks running.
864 {
865 CheckedAutoLock auto_lock(num_tasks_running_lock);
866 ++num_tasks_running;
867 }
868 num_tasks_running_cv.Broadcast();
869
870 // Wait until all posted tasks are running.
871 CheckedAutoLock auto_lock(num_tasks_running_lock);
872 while (num_tasks_running < kMaxTasks)
873 num_tasks_running_cv.Wait();
874 }),
875 /* num_tasks_to_run */ kMaxTasks);
876 scoped_refptr<JobTaskSource> task_source =
877 job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::BEST_EFFORT},
878 &mock_pooled_task_runner_delegate_);
879
880 auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
881 EXPECT_TRUE(registered_task_source);
882 thread_group_->PushTaskSourceAndWakeUpWorkers(
883 RegisteredTaskSourceAndTransaction::FromTaskSource(
884 std::move(registered_task_source)));
885
886 // Wait until |kMaxBestEffort| tasks start running.
887 {
888 CheckedAutoLock auto_lock(num_tasks_running_lock);
889 while (num_tasks_running < kMaxBestEffortTasks)
890 num_tasks_running_cv.Wait();
891 }
892
893 // Update the priority to USER_BLOCKING.
894 auto transaction = task_source->BeginTransaction();
895 transaction.UpdatePriority(TaskPriority::USER_BLOCKING);
896 thread_group_->UpdateSortKey(std::move(transaction));
897
898 // Wait until all posted tasks start running. This should not block forever,
899 // even in a thread group that enforces a maximum number of concurrent
900 // BEST_EFFORT tasks lower than |kMaxTasks|.
901 static_assert(kMaxBestEffortTasks < kMaxTasks, "");
902 {
903 CheckedAutoLock auto_lock(num_tasks_running_lock);
904 while (num_tasks_running < kMaxTasks)
905 num_tasks_running_cv.Wait();
906 }
907
908 // Flush the task tracker to be sure that no local variables are accessed by
909 // tasks after the end of the scope.
910 task_tracker_.FlushForTesting();
911 }
912
913 INSTANTIATE_TEST_SUITE_P(GenericParallel,
914 ThreadGroupTestAllExecutionModes,
915 ::testing::Values(TaskSourceExecutionMode::kParallel));
916 INSTANTIATE_TEST_SUITE_P(
917 GenericSequenced,
918 ThreadGroupTestAllExecutionModes,
919 ::testing::Values(TaskSourceExecutionMode::kSequenced));
920 INSTANTIATE_TEST_SUITE_P(GenericJob,
921 ThreadGroupTestAllExecutionModes,
922 ::testing::Values(TaskSourceExecutionMode::kJob));
923
924 } // namespace internal
925 } // namespace base
926