1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_worker_pool.h"
6
7 #include <memory>
8
9 #include "base/bind.h"
10 #include "base/bind_helpers.h"
11 #include "base/location.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/task_runner.h"
14 #include "base/task_scheduler/delayed_task_manager.h"
15 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
16 #include "base/task_scheduler/scheduler_worker_pool_params.h"
17 #include "base/task_scheduler/task_tracker.h"
18 #include "base/task_scheduler/task_traits.h"
19 #include "base/task_scheduler/test_task_factory.h"
20 #include "base/task_scheduler/test_utils.h"
21 #include "base/test/test_timeouts.h"
22 #include "base/threading/platform_thread.h"
23 #include "base/threading/simple_thread.h"
24 #include "base/threading/thread.h"
25 #include "build/build_config.h"
26 #include "testing/gtest/include/gtest/gtest.h"
27
28 #if defined(OS_WIN)
29 #include "base/task_scheduler/platform_native_worker_pool_win.h"
30 #endif
31
32 namespace base {
33 namespace internal {
34
35 namespace {
36
// Passed to SchedulerWorkerPoolParams when the generic pool is started.
constexpr size_t kMaxTasks{4};
// By default, tests allow half of the pool to be used by background tasks.
constexpr size_t kMaxBackgroundTasks{kMaxTasks / 2};
// Number of ThreadPostingTasks threads created by the posting tests.
constexpr size_t kNumThreadsPostingTasks{4};
// Number of tasks each ThreadPostingTasks thread posts from Run().
constexpr size_t kNumTasksPostedPerThread{150};
42
// Selects which SchedulerWorkerPool implementation the fixture instantiates.
enum class PoolType {
  // The fixture creates a SchedulerWorkerPoolImpl.
  GENERIC,
#if defined(OS_WIN)
  // The fixture creates a PlatformNativeWorkerPoolWin (Windows builds only).
  WINDOWS,
#endif
};
49
50 struct PoolExecutionType {
51 PoolType pool_type;
52 test::ExecutionMode execution_mode;
53 };
54
55 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
56
57 class ThreadPostingTasks : public SimpleThread {
58 public:
59 // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to
60 // |worker_pool| through an |execution_mode| task runner. If
61 // |post_nested_task| is YES, each task posted by this thread posts another
62 // task when it runs.
ThreadPostingTasks(SchedulerWorkerPool * worker_pool,test::ExecutionMode execution_mode,PostNestedTask post_nested_task)63 ThreadPostingTasks(SchedulerWorkerPool* worker_pool,
64 test::ExecutionMode execution_mode,
65 PostNestedTask post_nested_task)
66 : SimpleThread("ThreadPostingTasks"),
67 worker_pool_(worker_pool),
68 post_nested_task_(post_nested_task),
69 factory_(test::CreateTaskRunnerWithExecutionMode(worker_pool,
70 execution_mode),
71 execution_mode) {
72 DCHECK(worker_pool_);
73 }
74
factory() const75 const test::TestTaskFactory* factory() const { return &factory_; }
76
77 private:
Run()78 void Run() override {
79 EXPECT_FALSE(factory_.task_runner()->RunsTasksInCurrentSequence());
80
81 for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
82 EXPECT_TRUE(factory_.PostTask(post_nested_task_, Closure()));
83 }
84
85 SchedulerWorkerPool* const worker_pool_;
86 const scoped_refptr<TaskRunner> task_runner_;
87 const PostNestedTask post_nested_task_;
88 test::TestTaskFactory factory_;
89
90 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
91 };
92
93 class TaskSchedulerWorkerPoolTest
94 : public testing::TestWithParam<PoolExecutionType> {
95 protected:
TaskSchedulerWorkerPoolTest()96 TaskSchedulerWorkerPoolTest()
97 : service_thread_("TaskSchedulerServiceThread") {}
98
SetUp()99 void SetUp() override {
100 service_thread_.Start();
101 delayed_task_manager_.Start(service_thread_.task_runner());
102 CreateWorkerPool();
103 }
104
TearDown()105 void TearDown() override {
106 service_thread_.Stop();
107 if (worker_pool_)
108 worker_pool_->JoinForTesting();
109 }
110
CreateWorkerPool()111 void CreateWorkerPool() {
112 ASSERT_FALSE(worker_pool_);
113 switch (GetParam().pool_type) {
114 case PoolType::GENERIC:
115 worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
116 "TestWorkerPool", "A", ThreadPriority::NORMAL,
117 task_tracker_.GetTrackedRef(), &delayed_task_manager_);
118 break;
119 #if defined(OS_WIN)
120 case PoolType::WINDOWS:
121 worker_pool_ = std::make_unique<PlatformNativeWorkerPoolWin>(
122 task_tracker_.GetTrackedRef(), &delayed_task_manager_);
123 break;
124 #endif
125 }
126 ASSERT_TRUE(worker_pool_);
127 }
128
StartWorkerPool()129 void StartWorkerPool() {
130 ASSERT_TRUE(worker_pool_);
131 switch (GetParam().pool_type) {
132 case PoolType::GENERIC: {
133 SchedulerWorkerPoolImpl* scheduler_worker_pool_impl =
134 static_cast<SchedulerWorkerPoolImpl*>(worker_pool_.get());
135 scheduler_worker_pool_impl->Start(
136 SchedulerWorkerPoolParams(kMaxTasks, TimeDelta::Max()),
137 kMaxBackgroundTasks, service_thread_.task_runner(), nullptr,
138 SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
139 break;
140 }
141 #if defined(OS_WIN)
142 case PoolType::WINDOWS: {
143 PlatformNativeWorkerPoolWin* scheduler_worker_pool_windows_impl =
144 static_cast<PlatformNativeWorkerPoolWin*>(worker_pool_.get());
145 scheduler_worker_pool_windows_impl->Start();
146 break;
147 }
148 #endif
149 }
150 }
151
152 Thread service_thread_;
153 TaskTracker task_tracker_ = {"Test"};
154 DelayedTaskManager delayed_task_manager_;
155
156 std::unique_ptr<SchedulerWorkerPool> worker_pool_;
157
158 private:
159 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolTest);
160 };
161
ShouldNotRun()162 void ShouldNotRun() {
163 ADD_FAILURE() << "Ran a task that shouldn't run.";
164 }
165
166 } // namespace
167
// Verify that tasks posted to the pool from several threads all run.
TEST_P(TaskSchedulerWorkerPoolTest, PostTasks) {
  StartWorkerPool();

  // Create and start the posting threads one at a time.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posting_threads;
  while (posting_threads.size() < kNumThreadsPostingTasks) {
    posting_threads.push_back(std::make_unique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam().execution_mode, PostNestedTask::NO));
    posting_threads.back()->Start();
  }

  // For each thread, wait until it has finished posting, then until every
  // task it posted has run.
  for (const std::unique_ptr<ThreadPostingTasks>& posting_thread :
       posting_threads) {
    posting_thread->Join();
    posting_thread->factory()->WaitForAllTasksToRun();
  }

  // Flush the task tracker to be sure that no task accesses its TestTaskFactory
  // after |posting_threads| is destroyed.
  task_tracker_.FlushForTesting();
}
188
// Verify that tasks posted from several threads all run when each of those
// tasks posts another task while running.
TEST_P(TaskSchedulerWorkerPoolTest, NestedPostTasks) {
  StartWorkerPool();

  // Create and start the posting threads. Each task posted by these threads
  // will post another task when it runs.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posting_threads;
  while (posting_threads.size() < kNumThreadsPostingTasks) {
    posting_threads.push_back(std::make_unique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam().execution_mode, PostNestedTask::YES));
    posting_threads.back()->Start();
  }

  // For each thread, wait until it has finished posting, then until every
  // task tracked by its factory has run.
  for (const std::unique_ptr<ThreadPostingTasks>& posting_thread :
       posting_threads) {
    posting_thread->Join();
    posting_thread->factory()->WaitForAllTasksToRun();
  }

  // Flush the task tracker to be sure that no task accesses its TestTaskFactory
  // after |posting_threads| is destroyed.
  task_tracker_.FlushForTesting();
}
210
211 // Verify that a Task can't be posted after shutdown.
TEST_P(TaskSchedulerWorkerPoolTest,PostTaskAfterShutdown)212 TEST_P(TaskSchedulerWorkerPoolTest, PostTaskAfterShutdown) {
213 StartWorkerPool();
214 auto task_runner = test::CreateTaskRunnerWithExecutionMode(
215 worker_pool_.get(), GetParam().execution_mode);
216 task_tracker_.Shutdown();
217 EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
218 }
219
220 // Verify that posting tasks after the pool was destroyed fails but doesn't
221 // crash.
TEST_P(TaskSchedulerWorkerPoolTest,PostAfterDestroy)222 TEST_P(TaskSchedulerWorkerPoolTest, PostAfterDestroy) {
223 StartWorkerPool();
224 auto task_runner = test::CreateTaskRunnerWithExecutionMode(
225 worker_pool_.get(), GetParam().execution_mode);
226 EXPECT_TRUE(task_runner->PostTask(FROM_HERE, DoNothing()));
227 task_tracker_.Shutdown();
228 worker_pool_->JoinForTesting();
229 worker_pool_.reset();
230 EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
231 }
232
233 // Verify that a Task runs shortly after its delay expires.
TEST_P(TaskSchedulerWorkerPoolTest,PostDelayedTask)234 TEST_P(TaskSchedulerWorkerPoolTest, PostDelayedTask) {
235 StartWorkerPool();
236
237 WaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC,
238 WaitableEvent::InitialState::NOT_SIGNALED);
239
240 auto task_runner = test::CreateTaskRunnerWithExecutionMode(
241 worker_pool_.get(), GetParam().execution_mode);
242
243 // Wait until the task runner is up and running to make sure the test below is
244 // solely timing the delayed task, not bringing up a physical thread.
245 task_runner->PostTask(
246 FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_ran)));
247 task_ran.Wait();
248 ASSERT_TRUE(!task_ran.IsSignaled());
249
250 // Post a task with a short delay.
251 TimeTicks start_time = TimeTicks::Now();
252 EXPECT_TRUE(task_runner->PostDelayedTask(
253 FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_ran)),
254 TestTimeouts::tiny_timeout()));
255
256 // Wait until the task runs.
257 task_ran.Wait();
258
259 // Expect the task to run after its delay expires, but no more than 250
260 // ms after that.
261 const TimeDelta actual_delay = TimeTicks::Now() - start_time;
262 EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout());
263 EXPECT_LT(actual_delay,
264 TimeDelta::FromMilliseconds(250) + TestTimeouts::tiny_timeout());
265 }
266
267 // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner
268 // returns false when called from a task that isn't part of the sequence. Note:
269 // Tests that use TestTaskFactory already verify that
270 // RunsTasksInCurrentSequence() returns true when appropriate so this method
271 // complements it to get full coverage of that method.
TEST_P(TaskSchedulerWorkerPoolTest,SequencedRunsTasksInCurrentSequence)272 TEST_P(TaskSchedulerWorkerPoolTest, SequencedRunsTasksInCurrentSequence) {
273 StartWorkerPool();
274 auto task_runner = test::CreateTaskRunnerWithExecutionMode(
275 worker_pool_.get(), GetParam().execution_mode);
276 auto sequenced_task_runner =
277 worker_pool_->CreateSequencedTaskRunnerWithTraits(TaskTraits());
278
279 WaitableEvent task_ran;
280 task_runner->PostTask(
281 FROM_HERE,
282 BindOnce(
283 [](scoped_refptr<TaskRunner> sequenced_task_runner,
284 WaitableEvent* task_ran) {
285 EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence());
286 task_ran->Signal();
287 },
288 sequenced_task_runner, Unretained(&task_ran)));
289 task_ran.Wait();
290 }
291
292 // Verify that tasks posted before Start run after Start.
TEST_P(TaskSchedulerWorkerPoolTest,PostBeforeStart)293 TEST_P(TaskSchedulerWorkerPoolTest, PostBeforeStart) {
294 WaitableEvent task_1_running;
295 WaitableEvent task_2_running;
296
297 scoped_refptr<TaskRunner> task_runner =
298 worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
299
300 task_runner->PostTask(
301 FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_1_running)));
302 task_runner->PostTask(
303 FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_2_running)));
304
305 // Workers should not be created and tasks should not run before the pool is
306 // started. The sleep is to give time for the tasks to potentially run.
307 PlatformThread::Sleep(TestTimeouts::tiny_timeout());
308 EXPECT_FALSE(task_1_running.IsSignaled());
309 EXPECT_FALSE(task_2_running.IsSignaled());
310
311 StartWorkerPool();
312
313 // Tasks should run shortly after the pool is started.
314 task_1_running.Wait();
315 task_2_running.Wait();
316
317 task_tracker_.FlushForTesting();
318 }
319
320 INSTANTIATE_TEST_CASE_P(GenericParallel,
321 TaskSchedulerWorkerPoolTest,
322 ::testing::Values(PoolExecutionType{
323 PoolType::GENERIC, test::ExecutionMode::PARALLEL}));
324 INSTANTIATE_TEST_CASE_P(GenericSequenced,
325 TaskSchedulerWorkerPoolTest,
326 ::testing::Values(PoolExecutionType{
327 PoolType::GENERIC,
328 test::ExecutionMode::SEQUENCED}));
329
330 #if defined(OS_WIN)
331 INSTANTIATE_TEST_CASE_P(WinParallel,
332 TaskSchedulerWorkerPoolTest,
333 ::testing::Values(PoolExecutionType{
334 PoolType::WINDOWS, test::ExecutionMode::PARALLEL}));
335 INSTANTIATE_TEST_CASE_P(WinSequenced,
336 TaskSchedulerWorkerPoolTest,
337 ::testing::Values(PoolExecutionType{
338 PoolType::WINDOWS,
339 test::ExecutionMode::SEQUENCED}));
340 #endif
341
342 } // namespace internal
343 } // namespace base
344