• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task_scheduler/scheduler_worker_pool.h"
6 
7 #include <memory>
8 
9 #include "base/bind.h"
10 #include "base/bind_helpers.h"
11 #include "base/location.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/task_runner.h"
14 #include "base/task_scheduler/delayed_task_manager.h"
15 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
16 #include "base/task_scheduler/scheduler_worker_pool_params.h"
17 #include "base/task_scheduler/task_tracker.h"
18 #include "base/task_scheduler/task_traits.h"
19 #include "base/task_scheduler/test_task_factory.h"
20 #include "base/task_scheduler/test_utils.h"
21 #include "base/test/test_timeouts.h"
22 #include "base/threading/platform_thread.h"
23 #include "base/threading/simple_thread.h"
24 #include "base/threading/thread.h"
25 #include "build/build_config.h"
26 #include "testing/gtest/include/gtest/gtest.h"
27 
28 #if defined(OS_WIN)
29 #include "base/task_scheduler/platform_native_worker_pool_win.h"
30 #endif
31 
32 namespace base {
33 namespace internal {
34 
35 namespace {
36 
// Sizing of the pool under test. |kMaxTasks| is forwarded to
// SchedulerWorkerPoolParams when the generic pool is started.
constexpr size_t kMaxTasks = 4;
// By default, tests let background tasks use half of the pool.
constexpr size_t kMaxBackgroundTasks = kMaxTasks / 2;

// Load generated by the multi-threaded posting tests: number of posting
// threads, and number of tasks each of those threads posts.
constexpr size_t kNumThreadsPostingTasks = 4;
constexpr size_t kNumTasksPostedPerThread = 150;
42 
// Worker pool implementations the fixture knows how to create and start.
enum class PoolType {
  // Backed by SchedulerWorkerPoolImpl.
  GENERIC,
#if defined(OS_WIN)
  // Backed by PlatformNativeWorkerPoolWin; only compiled on Windows.
  WINDOWS,
#endif
};
49 
50 struct PoolExecutionType {
51   PoolType pool_type;
52   test::ExecutionMode execution_mode;
53 };
54 
55 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
56 
57 class ThreadPostingTasks : public SimpleThread {
58  public:
59   // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to
60   // |worker_pool| through an |execution_mode| task runner. If
61   // |post_nested_task| is YES, each task posted by this thread posts another
62   // task when it runs.
ThreadPostingTasks(SchedulerWorkerPool * worker_pool,test::ExecutionMode execution_mode,PostNestedTask post_nested_task)63   ThreadPostingTasks(SchedulerWorkerPool* worker_pool,
64                      test::ExecutionMode execution_mode,
65                      PostNestedTask post_nested_task)
66       : SimpleThread("ThreadPostingTasks"),
67         worker_pool_(worker_pool),
68         post_nested_task_(post_nested_task),
69         factory_(test::CreateTaskRunnerWithExecutionMode(worker_pool,
70                                                          execution_mode),
71                  execution_mode) {
72     DCHECK(worker_pool_);
73   }
74 
factory() const75   const test::TestTaskFactory* factory() const { return &factory_; }
76 
77  private:
Run()78   void Run() override {
79     EXPECT_FALSE(factory_.task_runner()->RunsTasksInCurrentSequence());
80 
81     for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
82       EXPECT_TRUE(factory_.PostTask(post_nested_task_, Closure()));
83   }
84 
85   SchedulerWorkerPool* const worker_pool_;
86   const scoped_refptr<TaskRunner> task_runner_;
87   const PostNestedTask post_nested_task_;
88   test::TestTaskFactory factory_;
89 
90   DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
91 };
92 
93 class TaskSchedulerWorkerPoolTest
94     : public testing::TestWithParam<PoolExecutionType> {
95  protected:
TaskSchedulerWorkerPoolTest()96   TaskSchedulerWorkerPoolTest()
97       : service_thread_("TaskSchedulerServiceThread") {}
98 
SetUp()99   void SetUp() override {
100     service_thread_.Start();
101     delayed_task_manager_.Start(service_thread_.task_runner());
102     CreateWorkerPool();
103   }
104 
TearDown()105   void TearDown() override {
106     service_thread_.Stop();
107     if (worker_pool_)
108       worker_pool_->JoinForTesting();
109   }
110 
CreateWorkerPool()111   void CreateWorkerPool() {
112     ASSERT_FALSE(worker_pool_);
113     switch (GetParam().pool_type) {
114       case PoolType::GENERIC:
115         worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
116             "TestWorkerPool", "A", ThreadPriority::NORMAL,
117             task_tracker_.GetTrackedRef(), &delayed_task_manager_);
118         break;
119 #if defined(OS_WIN)
120       case PoolType::WINDOWS:
121         worker_pool_ = std::make_unique<PlatformNativeWorkerPoolWin>(
122             task_tracker_.GetTrackedRef(), &delayed_task_manager_);
123         break;
124 #endif
125     }
126     ASSERT_TRUE(worker_pool_);
127   }
128 
StartWorkerPool()129   void StartWorkerPool() {
130     ASSERT_TRUE(worker_pool_);
131     switch (GetParam().pool_type) {
132       case PoolType::GENERIC: {
133         SchedulerWorkerPoolImpl* scheduler_worker_pool_impl =
134             static_cast<SchedulerWorkerPoolImpl*>(worker_pool_.get());
135         scheduler_worker_pool_impl->Start(
136             SchedulerWorkerPoolParams(kMaxTasks, TimeDelta::Max()),
137             kMaxBackgroundTasks, service_thread_.task_runner(), nullptr,
138             SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
139         break;
140       }
141 #if defined(OS_WIN)
142       case PoolType::WINDOWS: {
143         PlatformNativeWorkerPoolWin* scheduler_worker_pool_windows_impl =
144             static_cast<PlatformNativeWorkerPoolWin*>(worker_pool_.get());
145         scheduler_worker_pool_windows_impl->Start();
146         break;
147       }
148 #endif
149     }
150   }
151 
152   Thread service_thread_;
153   TaskTracker task_tracker_ = {"Test"};
154   DelayedTaskManager delayed_task_manager_;
155 
156   std::unique_ptr<SchedulerWorkerPool> worker_pool_;
157 
158  private:
159   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolTest);
160 };
161 
ShouldNotRun()162 void ShouldNotRun() {
163   ADD_FAILURE() << "Ran a task that shouldn't run.";
164 }
165 
166 }  // namespace
167 
// Verify that tasks posted concurrently from several threads all run.
TEST_P(TaskSchedulerWorkerPoolTest, PostTasks) {
  StartWorkerPool();

  // Launch the posting threads one by one; each begins posting as soon as it
  // is started.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posting_threads;
  while (posting_threads.size() < kNumThreadsPostingTasks) {
    posting_threads.push_back(std::make_unique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam().execution_mode, PostNestedTask::NO));
    posting_threads.back()->Start();
  }

  // For each thread: wait until it is done posting, then until everything it
  // posted has run.
  for (const auto& posting_thread : posting_threads) {
    posting_thread->Join();
    posting_thread->factory()->WaitForAllTasksToRun();
  }

  // Flush the task tracker so that no task can still touch its
  // TestTaskFactory once |posting_threads| goes out of scope.
  task_tracker_.FlushForTesting();
}
188 
// Verify that tasks posted concurrently from several threads all run when
// each of them posts a further task while running.
TEST_P(TaskSchedulerWorkerPoolTest, NestedPostTasks) {
  StartWorkerPool();

  // Launch the posting threads. PostNestedTask::YES makes every task posted by
  // these threads post another task when it runs.
  std::vector<std::unique_ptr<ThreadPostingTasks>> posting_threads;
  while (posting_threads.size() < kNumThreadsPostingTasks) {
    posting_threads.push_back(std::make_unique<ThreadPostingTasks>(
        worker_pool_.get(), GetParam().execution_mode, PostNestedTask::YES));
    posting_threads.back()->Start();
  }

  // For each thread: wait until it is done posting, then until everything
  // tracked by its factory has run.
  for (const auto& posting_thread : posting_threads) {
    posting_thread->Join();
    posting_thread->factory()->WaitForAllTasksToRun();
  }

  // Flush the task tracker so that no task can still touch its
  // TestTaskFactory once |posting_threads| goes out of scope.
  task_tracker_.FlushForTesting();
}
210 
211 // Verify that a Task can't be posted after shutdown.
TEST_P(TaskSchedulerWorkerPoolTest,PostTaskAfterShutdown)212 TEST_P(TaskSchedulerWorkerPoolTest, PostTaskAfterShutdown) {
213   StartWorkerPool();
214   auto task_runner = test::CreateTaskRunnerWithExecutionMode(
215       worker_pool_.get(), GetParam().execution_mode);
216   task_tracker_.Shutdown();
217   EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
218 }
219 
220 // Verify that posting tasks after the pool was destroyed fails but doesn't
221 // crash.
TEST_P(TaskSchedulerWorkerPoolTest,PostAfterDestroy)222 TEST_P(TaskSchedulerWorkerPoolTest, PostAfterDestroy) {
223   StartWorkerPool();
224   auto task_runner = test::CreateTaskRunnerWithExecutionMode(
225       worker_pool_.get(), GetParam().execution_mode);
226   EXPECT_TRUE(task_runner->PostTask(FROM_HERE, DoNothing()));
227   task_tracker_.Shutdown();
228   worker_pool_->JoinForTesting();
229   worker_pool_.reset();
230   EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun)));
231 }
232 
233 // Verify that a Task runs shortly after its delay expires.
TEST_P(TaskSchedulerWorkerPoolTest,PostDelayedTask)234 TEST_P(TaskSchedulerWorkerPoolTest, PostDelayedTask) {
235   StartWorkerPool();
236 
237   WaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC,
238                          WaitableEvent::InitialState::NOT_SIGNALED);
239 
240   auto task_runner = test::CreateTaskRunnerWithExecutionMode(
241       worker_pool_.get(), GetParam().execution_mode);
242 
243   // Wait until the task runner is up and running to make sure the test below is
244   // solely timing the delayed task, not bringing up a physical thread.
245   task_runner->PostTask(
246       FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_ran)));
247   task_ran.Wait();
248   ASSERT_TRUE(!task_ran.IsSignaled());
249 
250   // Post a task with a short delay.
251   TimeTicks start_time = TimeTicks::Now();
252   EXPECT_TRUE(task_runner->PostDelayedTask(
253       FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_ran)),
254       TestTimeouts::tiny_timeout()));
255 
256   // Wait until the task runs.
257   task_ran.Wait();
258 
259   // Expect the task to run after its delay expires, but no more than 250
260   // ms after that.
261   const TimeDelta actual_delay = TimeTicks::Now() - start_time;
262   EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout());
263   EXPECT_LT(actual_delay,
264             TimeDelta::FromMilliseconds(250) + TestTimeouts::tiny_timeout());
265 }
266 
267 // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner
268 // returns false when called from a task that isn't part of the sequence. Note:
269 // Tests that use TestTaskFactory already verify that
270 // RunsTasksInCurrentSequence() returns true when appropriate so this method
271 // complements it to get full coverage of that method.
TEST_P(TaskSchedulerWorkerPoolTest,SequencedRunsTasksInCurrentSequence)272 TEST_P(TaskSchedulerWorkerPoolTest, SequencedRunsTasksInCurrentSequence) {
273   StartWorkerPool();
274   auto task_runner = test::CreateTaskRunnerWithExecutionMode(
275       worker_pool_.get(), GetParam().execution_mode);
276   auto sequenced_task_runner =
277       worker_pool_->CreateSequencedTaskRunnerWithTraits(TaskTraits());
278 
279   WaitableEvent task_ran;
280   task_runner->PostTask(
281       FROM_HERE,
282       BindOnce(
283           [](scoped_refptr<TaskRunner> sequenced_task_runner,
284              WaitableEvent* task_ran) {
285             EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence());
286             task_ran->Signal();
287           },
288           sequenced_task_runner, Unretained(&task_ran)));
289   task_ran.Wait();
290 }
291 
292 // Verify that tasks posted before Start run after Start.
TEST_P(TaskSchedulerWorkerPoolTest,PostBeforeStart)293 TEST_P(TaskSchedulerWorkerPoolTest, PostBeforeStart) {
294   WaitableEvent task_1_running;
295   WaitableEvent task_2_running;
296 
297   scoped_refptr<TaskRunner> task_runner =
298       worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
299 
300   task_runner->PostTask(
301       FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_1_running)));
302   task_runner->PostTask(
303       FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_2_running)));
304 
305   // Workers should not be created and tasks should not run before the pool is
306   // started. The sleep is to give time for the tasks to potentially run.
307   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
308   EXPECT_FALSE(task_1_running.IsSignaled());
309   EXPECT_FALSE(task_2_running.IsSignaled());
310 
311   StartWorkerPool();
312 
313   // Tasks should run shortly after the pool is started.
314   task_1_running.Wait();
315   task_2_running.Wait();
316 
317   task_tracker_.FlushForTesting();
318 }
319 
320 INSTANTIATE_TEST_CASE_P(GenericParallel,
321                         TaskSchedulerWorkerPoolTest,
322                         ::testing::Values(PoolExecutionType{
323                             PoolType::GENERIC, test::ExecutionMode::PARALLEL}));
324 INSTANTIATE_TEST_CASE_P(GenericSequenced,
325                         TaskSchedulerWorkerPoolTest,
326                         ::testing::Values(PoolExecutionType{
327                             PoolType::GENERIC,
328                             test::ExecutionMode::SEQUENCED}));
329 
#if defined(OS_WIN)
// Windows only: runs every test above against the native Windows pool, once
// with PARALLEL and once with SEQUENCED task runners.
INSTANTIATE_TEST_CASE_P(
    WinParallel,
    TaskSchedulerWorkerPoolTest,
    ::testing::Values(
        PoolExecutionType{PoolType::WINDOWS, test::ExecutionMode::PARALLEL}));
INSTANTIATE_TEST_CASE_P(
    WinSequenced,
    TaskSchedulerWorkerPoolTest,
    ::testing::Values(
        PoolExecutionType{PoolType::WINDOWS, test::ExecutionMode::SEQUENCED}));
#endif
341 
342 }  // namespace internal
343 }  // namespace base
344