// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task_scheduler/scheduler_worker_pool_impl.h"

#include <stddef.h>

#include <memory>
#include <unordered_set>
#include <vector>

#include "base/atomicops.h"
#include "base/barrier_closure.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/metrics/histogram.h"
#include "base/metrics/histogram_samples.h"
#include "base/metrics/statistics_recorder.h"
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/task_runner.h"
#include "base/task_scheduler/delayed_task_manager.h"
#include "base/task_scheduler/scheduler_worker_pool_params.h"
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/sequence_sort_key.h"
#include "base/task_scheduler/task_tracker.h"
#include "base/task_scheduler/test_task_factory.h"
#include "base/task_scheduler/test_utils.h"
#include "base/test/bind_test_util.h"
#include "base/test/gtest_util.h"
#include "base/test/test_simple_task_runner.h"
#include "base/test/test_timeouts.h"
#include "base/threading/platform_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_checker_impl.h"
#include "base/threading/thread_local_storage.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"

#if defined(OS_WIN)
#include "base/win/com_init_util.h"
#endif  // defined(OS_WIN)

namespace base {
namespace internal {
namespace {

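// Test tuning constants: the pools below are started with a maximum of
// |kMaxTasks| simultaneous tasks, and the multi-threaded posting tests use
// |kNumThreadsPostingTasks| threads that each post |kNumTasksPostedPerThread|
// tasks.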
constexpr size_t kMaxTasks = 4;
constexpr size_t kNumThreadsPostingTasks = 4;
constexpr size_t kNumTasksPostedPerThread = 150;
// This can't be lower because Windows' WaitableEvent wakes up too early when a
// small timeout is used. This results in many spurious wake ups before a
// worker is allowed to clean up.
constexpr TimeDelta kReclaimTimeForCleanupTests =
    TimeDelta::FromMilliseconds(500);

// Waits on |event| in a scope where the blocking observer is null, to avoid
// affecting the max tasks.
void WaitWithoutBlockingObserver(WaitableEvent* event) {
  internal::ScopedClearBlockingObserverForTesting clear_blocking_observer;
  ScopedAllowBaseSyncPrimitivesForTesting allow_base_sync_primitives;
  event->Wait();
}

class TaskSchedulerWorkerPoolImplTestBase {
 protected:
  TaskSchedulerWorkerPoolImplTestBase()
      : service_thread_("TaskSchedulerServiceThread") {}

  void CommonSetUp(TimeDelta suggested_reclaim_time = TimeDelta::Max()) {
    CreateAndStartWorkerPool(suggested_reclaim_time, kMaxTasks);
  }

  void CommonTearDown() {
    service_thread_.Stop();
    task_tracker_.FlushForTesting();
    if (worker_pool_)
      worker_pool_->JoinForTesting();
  }

  void CreateWorkerPool() {
    ASSERT_FALSE(worker_pool_);
    service_thread_.Start();
    delayed_task_manager_.Start(service_thread_.task_runner());
    worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
        "TestWorkerPool", "A", ThreadPriority::NORMAL,
        task_tracker_.GetTrackedRef(), &delayed_task_manager_);
    ASSERT_TRUE(worker_pool_);
  }

  virtual void StartWorkerPool(TimeDelta suggested_reclaim_time,
                               size_t max_tasks) {
    ASSERT_TRUE(worker_pool_);
    worker_pool_->Start(
        SchedulerWorkerPoolParams(max_tasks, suggested_reclaim_time), max_tasks,
        service_thread_.task_runner(), nullptr,
        SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
  }

  void CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time,
                                size_t max_tasks) {
    CreateWorkerPool();
    StartWorkerPool(suggested_reclaim_time, max_tasks);
  }

  Thread service_thread_;
  TaskTracker task_tracker_ = {"Test"};

  std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_;

 private:
  DelayedTaskManager delayed_task_manager_;

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestBase);
};

class TaskSchedulerWorkerPoolImplTest
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::Test {
 protected:
  TaskSchedulerWorkerPoolImplTest() = default;

  void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::CommonSetUp(); }

  void TearDown() override {
    TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTest);
};

class TaskSchedulerWorkerPoolImplTestParam
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::TestWithParam<test::ExecutionMode> {
 protected:
  TaskSchedulerWorkerPoolImplTestParam() = default;

  void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::CommonSetUp(); }

  void TearDown() override {
    TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestParam);
};

using PostNestedTask = test::TestTaskFactory::PostNestedTask;

class ThreadPostingTasksWaitIdle : public SimpleThread {
 public:
  // Constructs a thread that posts tasks to |worker_pool| through an
  // |execution_mode| task runner. The thread waits until all workers in
  // |worker_pool| are idle before posting a new task.
  ThreadPostingTasksWaitIdle(SchedulerWorkerPoolImpl* worker_pool,
                             test::ExecutionMode execution_mode)
      : SimpleThread("ThreadPostingTasksWaitIdle"),
        worker_pool_(worker_pool),
        factory_(CreateTaskRunnerWithExecutionMode(worker_pool, execution_mode),
                 execution_mode) {
    DCHECK(worker_pool_);
  }

  const test::TestTaskFactory* factory() const { return &factory_; }

 private:
  void Run() override {
    EXPECT_FALSE(factory_.task_runner()->RunsTasksInCurrentSequence());

    for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
      worker_pool_->WaitForAllWorkersIdleForTesting();
      EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, Closure()));
    }
  }

  SchedulerWorkerPoolImpl* const worker_pool_;
  const scoped_refptr<TaskRunner> task_runner_;
  test::TestTaskFactory factory_;

  DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasksWaitIdle);
};

}  // namespace

TEST_P(TaskSchedulerWorkerPoolImplTestParam, PostTasksWaitAllWorkersIdle) {
  // Create threads to post tasks. To verify that workers can sleep and be
  // woken up when new tasks are posted, wait for all workers to become idle
  // before posting a new task.
  std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>>
      threads_posting_tasks;
  for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
    threads_posting_tasks.push_back(
        std::make_unique<ThreadPostingTasksWaitIdle>(worker_pool_.get(),
                                                     GetParam()));
    threads_posting_tasks.back()->Start();
  }

  // Wait for all tasks to run.
  for (const auto& thread_posting_tasks : threads_posting_tasks) {
    thread_posting_tasks->Join();
    thread_posting_tasks->factory()->WaitForAllTasksToRun();
  }

  // Wait until all workers are idle to be sure that no task accesses its
  // TestTaskFactory after |thread_posting_tasks| is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}

TEST_P(TaskSchedulerWorkerPoolImplTestParam, PostTasksWithOneAvailableWorker) {
  // Post blocking tasks to keep all workers busy except one until |event| is
  // signaled. Use different factories so that tasks are added to different
  // sequences and can run simultaneously when the execution mode is SEQUENCED.
  WaitableEvent event;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
  for (size_t i = 0; i < (kMaxTasks - 1); ++i) {
    blocked_task_factories.push_back(std::make_unique<test::TestTaskFactory>(
        CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
        GetParam()));
    EXPECT_TRUE(blocked_task_factories.back()->PostTask(
        PostNestedTask::NO,
        BindOnce(&WaitWithoutBlockingObserver, Unretained(&event))));
    blocked_task_factories.back()->WaitForAllTasksToRun();
  }

  // Post |kNumTasksPostedPerThread| tasks that should all run despite the fact
  // that only one worker in |worker_pool_| isn't busy.
  test::TestTaskFactory short_task_factory(
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
      GetParam());
  for (size_t i = 0; i < kNumTasksPostedPerThread; ++i)
    EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, Closure()));
  short_task_factory.WaitForAllTasksToRun();

  // Release tasks waiting on |event|.
  event.Signal();

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after it is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}

TEST_P(TaskSchedulerWorkerPoolImplTestParam, Saturate) {
  // Verify that it is possible to have |kMaxTasks| tasks/sequences running
  // simultaneously. Use different factories so that the blocking tasks are
  // added to different sequences and can run simultaneously when the execution
  // mode is SEQUENCED.
  WaitableEvent event;
  std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
  for (size_t i = 0; i < kMaxTasks; ++i) {
    factories.push_back(std::make_unique<test::TestTaskFactory>(
        CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
        GetParam()));
    EXPECT_TRUE(factories.back()->PostTask(
        PostNestedTask::NO,
        BindOnce(&WaitWithoutBlockingObserver, Unretained(&event))));
    factories.back()->WaitForAllTasksToRun();
  }

  // Release tasks waiting on |event|.
  event.Signal();

  // Wait until all workers are idle to be sure that no task accesses
  // its TestTaskFactory after it is destroyed.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}

#if defined(OS_WIN)
TEST_P(TaskSchedulerWorkerPoolImplTestParam, NoEnvironment) {
  // Verify that COM is not initialized in a SchedulerWorkerPoolImpl
  // initialized with SchedulerWorkerPoolImpl::WorkerEnvironment::NONE.
  scoped_refptr<TaskRunner> task_runner =
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam());

  WaitableEvent task_running;
  task_runner->PostTask(
      FROM_HERE, BindOnce(
                     [](WaitableEvent* task_running) {
                       win::AssertComApartmentType(win::ComApartmentType::NONE);
                       task_running->Signal();
                     },
                     &task_running));

  task_running.Wait();

  worker_pool_->WaitForAllWorkersIdleForTesting();
}
#endif  // defined(OS_WIN)

INSTANTIATE_TEST_CASE_P(Parallel,
                        TaskSchedulerWorkerPoolImplTestParam,
                        ::testing::Values(test::ExecutionMode::PARALLEL));
INSTANTIATE_TEST_CASE_P(Sequenced,
                        TaskSchedulerWorkerPoolImplTestParam,
                        ::testing::Values(test::ExecutionMode::SEQUENCED));

#if defined(OS_WIN)

namespace {

class TaskSchedulerWorkerPoolImplTestCOMMTAParam
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::TestWithParam<test::ExecutionMode> {
 protected:
  TaskSchedulerWorkerPoolImplTestCOMMTAParam() = default;

  void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::CommonSetUp(); }

  void TearDown() override {
    TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
  }

 private:
  void StartWorkerPool(TimeDelta suggested_reclaim_time,
                       size_t max_tasks) override {
    ASSERT_TRUE(worker_pool_);
    worker_pool_->Start(
        SchedulerWorkerPoolParams(max_tasks, suggested_reclaim_time), max_tasks,
        service_thread_.task_runner(), nullptr,
        SchedulerWorkerPoolImpl::WorkerEnvironment::COM_MTA);
  }

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestCOMMTAParam);
};

}  // namespace

TEST_P(TaskSchedulerWorkerPoolImplTestCOMMTAParam, COMMTAInitialized) {
  // Verify that SchedulerWorkerPoolImpl workers have a COM MTA available.
  scoped_refptr<TaskRunner> task_runner =
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam());

  WaitableEvent task_running;
  task_runner->PostTask(
      FROM_HERE, BindOnce(
                     [](WaitableEvent* task_running) {
                       win::AssertComApartmentType(win::ComApartmentType::MTA);
                       task_running->Signal();
                     },
                     &task_running));

  task_running.Wait();

  worker_pool_->WaitForAllWorkersIdleForTesting();
}

INSTANTIATE_TEST_CASE_P(Parallel,
                        TaskSchedulerWorkerPoolImplTestCOMMTAParam,
                        ::testing::Values(test::ExecutionMode::PARALLEL));
INSTANTIATE_TEST_CASE_P(Sequenced,
                        TaskSchedulerWorkerPoolImplTestCOMMTAParam,
                        ::testing::Values(test::ExecutionMode::SEQUENCED));

#endif  // defined(OS_WIN)

namespace {

class TaskSchedulerWorkerPoolImplStartInBodyTest
    : public TaskSchedulerWorkerPoolImplTest {
 public:
  void SetUp() override {
    CreateWorkerPool();
    // Let the test start the worker pool.
  }
};

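// Records the thread the task runs on in |platform_thread_ref|, signals
// |task_running| and blocks until |barrier| is signaled, so that the worker
// cannot pick up another task in the meantime.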
void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
                           WaitableEvent* task_running,
                           WaitableEvent* barrier) {
  *platform_thread_ref = PlatformThread::CurrentRef();
  task_running->Signal();
  WaitWithoutBlockingObserver(barrier);
}

}  // namespace

// Verify that 2 tasks posted before Start() to a SchedulerWorkerPoolImpl with
// more than 2 workers run on different workers when Start() is called.
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, PostTasksBeforeStart) {
  PlatformThreadRef task_1_thread_ref;
  PlatformThreadRef task_2_thread_ref;
  WaitableEvent task_1_running;
  WaitableEvent task_2_running;

  // This event is used to prevent a task from completing before the other task
  // starts running. If that happened, both tasks could run on the same worker
  // and this test couldn't verify that the correct number of workers were
  // woken up.
  WaitableEvent barrier;

  worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()})
      ->PostTask(
          FROM_HERE,
          BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
                   Unretained(&task_1_running), Unretained(&barrier)));
  worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()})
      ->PostTask(
          FROM_HERE,
          BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
                   Unretained(&task_2_running), Unretained(&barrier)));

  // Workers should not be created and tasks should not run before the pool is
  // started.
  EXPECT_EQ(0U, worker_pool_->NumberOfWorkersForTesting());
  EXPECT_FALSE(task_1_running.IsSignaled());
  EXPECT_FALSE(task_2_running.IsSignaled());

  StartWorkerPool(TimeDelta::Max(), kMaxTasks);

  // Tasks should run shortly after the pool is started.
  task_1_running.Wait();
  task_2_running.Wait();

  // Tasks should run on different threads.
  EXPECT_NE(task_1_thread_ref, task_2_thread_ref);

  barrier.Signal();
  task_tracker_.FlushForTesting();
}

// Verify that posting many tasks before Start() will cause the number of
// workers to grow to |max_tasks_| during Start().
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, PostManyTasks) {
  scoped_refptr<TaskRunner> task_runner =
      worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
  constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
  for (size_t i = 0; i < kNumTasksPosted; ++i)
    task_runner->PostTask(FROM_HERE, DoNothing());

  EXPECT_EQ(0U, worker_pool_->NumberOfWorkersForTesting());

  StartWorkerPool(TimeDelta::Max(), kMaxTasks);
  ASSERT_GT(kNumTasksPosted, worker_pool_->GetMaxTasksForTesting());
  EXPECT_EQ(kMaxTasks, worker_pool_->GetMaxTasksForTesting());

  EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(),
            worker_pool_->GetMaxTasksForTesting());
}

namespace {

constexpr size_t kMagicTlsValue = 42;

class TaskSchedulerWorkerPoolCheckTlsReuse
    : public TaskSchedulerWorkerPoolImplTest {
 public:
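  // Marks the calling worker's thread with the magic TLS value, then blocks
  // until |waiter_| is signaled.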
  void SetTlsValueAndWait() {
    slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
    WaitWithoutBlockingObserver(&waiter_);
  }

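  // Increments |zero_tls_values_| if the calling worker's thread lacks the
  // magic TLS value (i.e. it is a fresh worker), signals |count_waiter| and
  // blocks until |waiter_| is signaled.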
  void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) {
    if (!slot_.Get())
      subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1);

    count_waiter->Signal();
    WaitWithoutBlockingObserver(&waiter_);
  }

 protected:
  TaskSchedulerWorkerPoolCheckTlsReuse() = default;

  void SetUp() override {
    CreateAndStartWorkerPool(kReclaimTimeForCleanupTests, kMaxTasks);
  }

  subtle::Atomic32 zero_tls_values_ = 0;

  WaitableEvent waiter_;

 private:
  ThreadLocalStorage::Slot slot_;

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse);
};

}  // namespace

// Checks that at least one worker has been cleaned up by checking the TLS.
TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckCleanupWorkers) {
  // Saturate the workers and mark each worker's thread with a magic TLS value.
  std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
  for (size_t i = 0; i < kMaxTasks; ++i) {
    factories.push_back(std::make_unique<test::TestTaskFactory>(
        worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()}),
        test::ExecutionMode::PARALLEL));
    ASSERT_TRUE(factories.back()->PostTask(
        PostNestedTask::NO,
        Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait,
             Unretained(this))));
    factories.back()->WaitForAllTasksToRun();
  }

  // Release tasks waiting on |waiter_|.
  waiter_.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // All workers should be done running by now, so reset for the next phase.
  waiter_.Reset();

  // Wait for the worker pool to clean up at least one worker.
  worker_pool_->WaitForWorkersCleanedUpForTesting(1U);

  // Saturate the pool again and count the worker threads that do not have the
  // magic TLS value. A missing value means the task is running on a new
  // worker.
  std::vector<std::unique_ptr<WaitableEvent>> count_waiters;
  for (auto& factory : factories) {
    count_waiters.push_back(std::make_unique<WaitableEvent>());
    ASSERT_TRUE(factory->PostTask(
        PostNestedTask::NO,
        Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait,
             Unretained(this), count_waiters.back().get())));
    factory->WaitForAllTasksToRun();
  }

  // Wait for all counters to complete.
  for (auto& count_waiter : count_waiters)
    count_waiter->Wait();

  EXPECT_GT(subtle::NoBarrier_Load(&zero_tls_values_), 0);

  // Release tasks waiting on |waiter_|.
  waiter_.Signal();
}

namespace {

class TaskSchedulerWorkerPoolHistogramTest
    : public TaskSchedulerWorkerPoolImplTest {
 public:
  TaskSchedulerWorkerPoolHistogramTest() = default;

 protected:
  // Override SetUp() to allow every test case to initialize a worker pool with
  // its own arguments.
  void SetUp() override {}

  // Floods |worker_pool_| with one task per worker; each task blocks until
  // |continue_event| is signaled. Every worker in the pool is blocked on
  // |continue_event| when this method returns. Note: this helper can easily be
  // generalized to be useful in other tests, but it's here for now because
  // it's only used in a TaskSchedulerWorkerPoolHistogramTest at the moment.
  void FloodPool(WaitableEvent* continue_event) {
    ASSERT_FALSE(continue_event->IsSignaled());

    auto task_runner =
        worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});

    const auto max_tasks = worker_pool_->GetMaxTasksForTesting();

    WaitableEvent workers_flooded;
    RepeatingClosure all_workers_running_barrier = BarrierClosure(
        max_tasks,
        BindOnce(&WaitableEvent::Signal, Unretained(&workers_flooded)));
    for (size_t i = 0; i < max_tasks; ++i) {
      task_runner->PostTask(
          FROM_HERE,
          BindOnce(
              [](OnceClosure on_running, WaitableEvent* continue_event) {
                std::move(on_running).Run();
                WaitWithoutBlockingObserver(continue_event);
              },
              all_workers_running_barrier, continue_event));
    }
    workers_flooded.Wait();
  }

 private:
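  // Use a temporary StatisticsRecorder for the lifetime of each test so that
  // the histogram counts verified below aren't polluted by samples recorded
  // elsewhere in the process.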
  std::unique_ptr<StatisticsRecorder> statistics_recorder_ =
      StatisticsRecorder::CreateTemporaryForTesting();

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolHistogramTest);
};

}  // namespace

TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) {
  WaitableEvent event;
  CreateAndStartWorkerPool(TimeDelta::Max(), kMaxTasks);
  auto task_runner = worker_pool_->CreateSequencedTaskRunnerWithTraits(
      {WithBaseSyncPrimitives()});

  // Post a task.
  task_runner->PostTask(
      FROM_HERE, BindOnce(&WaitWithoutBlockingObserver, Unretained(&event)));

  // Post 2 more tasks while the first task hasn't completed its execution. It
  // is guaranteed that these tasks will run immediately after the first task,
  // without allowing the worker to sleep.
  task_runner->PostTask(FROM_HERE, DoNothing());
  task_runner->PostTask(FROM_HERE, DoNothing());

  // Allow tasks to run and wait until the SchedulerWorker is idle.
  event.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // Wake up the SchedulerWorker that just became idle by posting a task and
  // wait until it becomes idle again. The SchedulerWorker should record the
  // TaskScheduler.NumTasksBetweenWaits.* histogram on wake up.
  task_runner->PostTask(FROM_HERE, DoNothing());
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // Verify that counts were recorded to the histogram as expected.
  const auto* histogram = worker_pool_->num_tasks_between_waits_histogram();
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(1, histogram->SnapshotSamples()->GetCount(3));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
}

// Verifies that the NumTasksBetweenWaits histogram is logged as expected
// across idle and cleanup periods.
TEST_F(TaskSchedulerWorkerPoolHistogramTest,
       NumTasksBetweenWaitsWithIdlePeriodAndCleanup) {
  WaitableEvent tasks_can_exit_event;
  CreateAndStartWorkerPool(kReclaimTimeForCleanupTests, kMaxTasks);

  WaitableEvent workers_continue;

  FloodPool(&workers_continue);

  const auto* histogram = worker_pool_->num_tasks_between_waits_histogram();

  // NumTasksBetweenWaits shouldn't be logged until idle.
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));

  // Make all workers go idle.
  workers_continue.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // All workers should have reported a single hit in the "1" bucket per the
  // histogram being reported when going idle and each worker having processed
  // precisely 1 task per the controlled flooding logic above.
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(static_cast<int>(kMaxTasks),
            histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));

  worker_pool_->WaitForWorkersCleanedUpForTesting(kMaxTasks - 1);

  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(static_cast<int>(kMaxTasks),
            histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));

  // Flooding the pool once again (without letting any workers go idle)
  // shouldn't affect the counts either.

  workers_continue.Reset();
  FloodPool(&workers_continue);

  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(static_cast<int>(kMaxTasks),
            histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));

  workers_continue.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();
}

TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeCleanup) {
  CreateWorkerPool();
  auto histogrammed_thread_task_runner =
      worker_pool_->CreateSequencedTaskRunnerWithTraits(
          {WithBaseSyncPrimitives()});

  // Post 3 tasks and hold the thread for idle thread stack ordering.
  // This test assumes |histogrammed_thread_task_runner| gets assigned the same
  // thread for each of its tasks.
  PlatformThreadRef thread_ref;
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE, BindOnce(
                     [](PlatformThreadRef* thread_ref) {
                       ASSERT_TRUE(thread_ref);
                       *thread_ref = PlatformThread::CurrentRef();
                     },
                     Unretained(&thread_ref)));
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE, BindOnce(
                     [](PlatformThreadRef* thread_ref) {
                       ASSERT_FALSE(thread_ref->is_null());
                       EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
                     },
                     Unretained(&thread_ref)));

  WaitableEvent cleanup_thread_running;
  WaitableEvent cleanup_thread_continue;
  histogrammed_thread_task_runner->PostTask(
      FROM_HERE,
      BindOnce(
          [](PlatformThreadRef* thread_ref,
             WaitableEvent* cleanup_thread_running,
             WaitableEvent* cleanup_thread_continue) {
            ASSERT_FALSE(thread_ref->is_null());
            EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
            cleanup_thread_running->Signal();
            WaitWithoutBlockingObserver(cleanup_thread_continue);
          },
          Unretained(&thread_ref), Unretained(&cleanup_thread_running),
          Unretained(&cleanup_thread_continue)));

  // Start the worker pool with 2 workers, to avoid depending on the
  // scheduler's logic to always keep one extra idle worker.
  //
  // The pool is started after the 3 initial tasks have been posted to ensure
  // that they are scheduled on the same worker. If the tasks could run as they
  // are posted, there would be a chance that:
  //   1. Worker #1: Runs a task and empties the sequence, without adding
  //                 itself to the idle stack yet.
  //   2. Posting thread: Posts another task to the now empty sequence. Wakes
  //                      up a new worker, since worker #1 isn't on the idle
  //                      stack yet.
  //   3. Worker #2: Runs the task, violating the expectation that the 3
  //                 initial tasks run on the same worker.
  constexpr size_t kTwoWorkers = 2;
  StartWorkerPool(kReclaimTimeForCleanupTests, kTwoWorkers);

  // Wait until the 3rd task is scheduled.
  cleanup_thread_running.Wait();

  // To allow the SchedulerWorker associated with
  // |histogrammed_thread_task_runner| to clean up, make sure it isn't on top
  // of the idle stack by waking up another SchedulerWorker via
  // |task_runner_for_top_idle|. |histogrammed_thread_task_runner| should
  // release and go idle first and then |task_runner_for_top_idle| should
  // release and go idle. This allows the SchedulerWorker associated with
  // |histogrammed_thread_task_runner| to clean up.
  WaitableEvent top_idle_thread_running;
  WaitableEvent top_idle_thread_continue;
  auto task_runner_for_top_idle =
      worker_pool_->CreateSequencedTaskRunnerWithTraits(
          {WithBaseSyncPrimitives()});
  task_runner_for_top_idle->PostTask(
      FROM_HERE, BindOnce(
                     [](PlatformThreadRef thread_ref,
                        WaitableEvent* top_idle_thread_running,
                        WaitableEvent* top_idle_thread_continue) {
                       ASSERT_FALSE(thread_ref.is_null());
                       EXPECT_NE(thread_ref, PlatformThread::CurrentRef())
                           << "Worker reused. Worker will not clean up and the "
                              "histogram value will be wrong.";
                       top_idle_thread_running->Signal();
                       WaitWithoutBlockingObserver(top_idle_thread_continue);
                     },
                     thread_ref, Unretained(&top_idle_thread_running),
                     Unretained(&top_idle_thread_continue)));
  top_idle_thread_running.Wait();
  EXPECT_EQ(0U, worker_pool_->NumberOfIdleWorkersForTesting());
  cleanup_thread_continue.Signal();
  // Wait for the cleanup thread to also become idle.
  worker_pool_->WaitForWorkersIdleForTesting(1U);
  top_idle_thread_continue.Signal();
  // Allow the thread processing the |histogrammed_thread_task_runner| work to
  // clean up.
  worker_pool_->WaitForWorkersCleanedUpForTesting(1U);

  // Verify that counts were recorded to the histogram as expected.
  const auto* histogram = worker_pool_->num_tasks_before_detach_histogram();
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(1));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(2));
  EXPECT_EQ(1, histogram->SnapshotSamples()->GetCount(3));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(4));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(5));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(6));
  EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
}

namespace {

class TaskSchedulerWorkerPoolStandbyPolicyTest
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::Test {
 public:
  TaskSchedulerWorkerPoolStandbyPolicyTest() = default;

  void SetUp() override {
    TaskSchedulerWorkerPoolImplTestBase::CommonSetUp(
        kReclaimTimeForCleanupTests);
  }

  void TearDown() override {
    TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolStandbyPolicyTest);
};

}  // namespace

TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) {
  EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
}

// Verify that the SchedulerWorkerPoolImpl keeps at least one idle standby
// thread, capacity permitting.
TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest, VerifyStandbyThread) {
  auto task_runner =
      worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});

  WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
  WaitableEvent threads_continue;

  RepeatingClosure thread_blocker = BindLambdaForTesting([&]() {
    thread_running.Signal();
    WaitWithoutBlockingObserver(&threads_continue);
  });

  // There should be one idle thread until we reach capacity.
  for (size_t i = 0; i < kMaxTasks; ++i) {
    EXPECT_EQ(i + 1, worker_pool_->NumberOfWorkersForTesting());
    task_runner->PostTask(FROM_HERE, thread_blocker);
    thread_running.Wait();
  }

  // There should not be an extra idle thread if it means going above capacity.
  EXPECT_EQ(kMaxTasks, worker_pool_->NumberOfWorkersForTesting());

  threads_continue.Signal();
  // Wait long enough for all but one worker to clean up.
  worker_pool_->WaitForWorkersCleanedUpForTesting(kMaxTasks - 1);
  EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
  // Give extra time for a worker to clean up: none should, as the pool is
  // expected to keep a worker ready regardless of how long it has been idle.
  PlatformThread::Sleep(kReclaimTimeForCleanupTests);
  EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
}

// Verify that being "the" idle thread counts as being active (i.e. it won't be
// reclaimed even if it is not on top of the idle stack when the reclaim
// timeout expires). Regression test for https://crbug.com/847501.
TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest,
       InAndOutStandbyThreadIsActive) {
  auto sequenced_task_runner =
      worker_pool_->CreateSequencedTaskRunnerWithTraits({});

  WaitableEvent timer_started;

  RepeatingTimer recurring_task;
  sequenced_task_runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        recurring_task.Start(FROM_HERE, kReclaimTimeForCleanupTests / 2,
                             DoNothing());
        timer_started.Signal();
      }));

  timer_started.Wait();

  // Running a task should have brought up a new standby thread.
  EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());

  // Give extra time for a worker to clean up: none should, as the two workers
  // are both considered "active" because the timer ticks faster than the
  // reclaim timeout.
  PlatformThread::Sleep(kReclaimTimeForCleanupTests * 2);
  EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());

  sequenced_task_runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() { recurring_task.Stop(); }));

  // Stopping the recurring task should let the second worker be reclaimed per
  // not being "the" standby thread for a full reclaim timeout.
  worker_pool_->WaitForWorkersCleanedUpForTesting(1);
  EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
}

// Verify that being "the" idle thread counts as being active but isn't sticky.
// Regression test for https://crbug.com/847501.
TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest, OnlyKeepActiveStandbyThreads) {
  auto sequenced_task_runner =
      worker_pool_->CreateSequencedTaskRunnerWithTraits({});

  // Start this test like
  // TaskSchedulerWorkerPoolStandbyPolicyTest.InAndOutStandbyThreadIsActive and
  // give it some time to stabilize.
  RepeatingTimer recurring_task;
  sequenced_task_runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() {
        recurring_task.Start(FROM_HERE, kReclaimTimeForCleanupTests / 2,
                             DoNothing());
      }));

  PlatformThread::Sleep(kReclaimTimeForCleanupTests * 2);
  EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());

  // Then also flood the pool (cycling the top of the idle stack).
  {
    auto task_runner =
        worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});

    WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
    WaitableEvent threads_continue;

    RepeatingClosure thread_blocker = BindLambdaForTesting([&]() {
      thread_running.Signal();
      WaitWithoutBlockingObserver(&threads_continue);
    });

    for (size_t i = 0; i < kMaxTasks; ++i) {
      task_runner->PostTask(FROM_HERE, thread_blocker);
      thread_running.Wait();
    }

    EXPECT_EQ(kMaxTasks, worker_pool_->NumberOfWorkersForTesting());
    threads_continue.Signal();

    // Flush to ensure all references to |threads_continue| are gone before it
    // goes out of scope.
    task_tracker_.FlushForTesting();
  }

  // All workers should clean up but two (since the timer is still running).
  worker_pool_->WaitForWorkersCleanedUpForTesting(kMaxTasks - 2);
  EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());

  // Extra time shouldn't change this.
  PlatformThread::Sleep(kReclaimTimeForCleanupTests * 2);
  EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());

  // Stopping the timer should let the number of active threads go down to one.
  sequenced_task_runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&]() { recurring_task.Stop(); }));
  worker_pool_->WaitForWorkersCleanedUpForTesting(1);
  EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
}

namespace {

enum class OptionalBlockingType {
  NO_BLOCK,
  MAY_BLOCK,
  WILL_BLOCK,
};

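// Describes a ScopedBlockingCall instantiated by a test task (|first|), an
// optional second ScopedBlockingCall nested inside it (|second|), and the
// blocking type the nested pair is expected to behave as (|behaves_as|).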
struct NestedBlockingType {
  NestedBlockingType(BlockingType first_in,
                     OptionalBlockingType second_in,
                     BlockingType behaves_as_in)
      : first(first_in), second(second_in), behaves_as(behaves_as_in) {}

  BlockingType first;
  OptionalBlockingType second;
  BlockingType behaves_as;
};

class NestedScopedBlockingCall {
 public:
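  // Instantiates |nested_blocking_type.first| unconditionally and, if
  // |nested_blocking_type.second| requests one, a second ScopedBlockingCall
  // nested within the scope of the first.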
  NestedScopedBlockingCall(const NestedBlockingType& nested_blocking_type)
      : first_scoped_blocking_call_(nested_blocking_type.first),
        second_scoped_blocking_call_(
            nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
                ? std::make_unique<ScopedBlockingCall>(BlockingType::WILL_BLOCK)
                : (nested_blocking_type.second ==
                           OptionalBlockingType::MAY_BLOCK
                       ? std::make_unique<ScopedBlockingCall>(
                             BlockingType::MAY_BLOCK)
                       : nullptr)) {}

 private:
  ScopedBlockingCall first_scoped_blocking_call_;
  std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;

  DISALLOW_COPY_AND_ASSIGN(NestedScopedBlockingCall);
};

}  // namespace

class TaskSchedulerWorkerPoolBlockingTest
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::TestWithParam<NestedBlockingType> {
 public:
  TaskSchedulerWorkerPoolBlockingTest() = default;

  static std::string ParamInfoToString(
      ::testing::TestParamInfo<NestedBlockingType> param_info) {
    std::string str = param_info.param.first == BlockingType::MAY_BLOCK
                          ? "MAY_BLOCK"
                          : "WILL_BLOCK";
    if (param_info.param.second == OptionalBlockingType::MAY_BLOCK)
      str += "_MAY_BLOCK";
    else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK)
      str += "_WILL_BLOCK";
    return str;
  }

  void SetUp() override {
    TaskSchedulerWorkerPoolImplTestBase::CommonSetUp();
    task_runner_ =
        worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
  }

  void TearDown() override {
    TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
  }

 protected:
  // Saturates the worker pool with a task that first blocks, waits to be
  // unblocked, then exits.
  void SaturateWithBlockingTasks(
      const NestedBlockingType& nested_blocking_type) {
    ASSERT_FALSE(blocking_threads_running_.IsSignaled());

    RepeatingClosure blocking_threads_running_closure = BarrierClosure(
        kMaxTasks, BindOnce(&WaitableEvent::Signal,
                            Unretained(&blocking_threads_running_)));

    for (size_t i = 0; i < kMaxTasks; ++i) {
      task_runner_->PostTask(
          FROM_HERE,
          BindOnce(
              [](Closure* blocking_threads_running_closure,
                 WaitableEvent* blocking_threads_continue_,
                 const NestedBlockingType& nested_blocking_type) {
                NestedScopedBlockingCall nested_scoped_blocking_call(
                    nested_blocking_type);
                blocking_threads_running_closure->Run();
                WaitWithoutBlockingObserver(blocking_threads_continue_);
              },
              Unretained(&blocking_threads_running_closure),
              Unretained(&blocking_threads_continue_), nested_blocking_type));
    }
    blocking_threads_running_.Wait();
  }

  // Returns how long we can expect to wait for a change to |max_tasks_| to
  // occur after a task has become blocked.
  TimeDelta GetMaxTasksChangeSleepTime() {
    return std::max(SchedulerWorkerPoolImpl::kBlockedWorkersPollPeriod,
                    worker_pool_->MayBlockThreshold()) +
           TestTimeouts::tiny_timeout();
  }

  // Waits indefinitely, until |worker_pool_|'s max tasks increases to
  // |expected_max_tasks|.
  void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
    size_t max_tasks = worker_pool_->GetMaxTasksForTesting();
    while (max_tasks != expected_max_tasks) {
      PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
      size_t new_max_tasks = worker_pool_->GetMaxTasksForTesting();
      ASSERT_GE(new_max_tasks, max_tasks);
      max_tasks = new_max_tasks;
    }
  }

  // Unblocks tasks posted by SaturateWithBlockingTasks().
  void UnblockTasks() { blocking_threads_continue_.Signal(); }

  scoped_refptr<TaskRunner> task_runner_;

 private:
  WaitableEvent blocking_threads_running_;
  WaitableEvent blocking_threads_continue_;

  DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolBlockingTest);
};

// Verify that BlockingScopeEntered() causes max tasks to increase and creates
// a worker if needed. Also verify that BlockingScopeExited() decreases max
// tasks after an increase.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, ThreadBlockedUnblocked) {
  ASSERT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);

  SaturateWithBlockingTasks(GetParam());
  if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
    ExpectMaxTasksIncreasesTo(2 * kMaxTasks);
  // A range of possible number of workers is accepted because of
  // crbug.com/757897.
  EXPECT_GE(worker_pool_->NumberOfWorkersForTesting(), kMaxTasks + 1);
  EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), 2 * kMaxTasks);

  UnblockTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
}

// Verify that tasks posted in a saturated pool before a ScopedBlockingCall
// will execute after ScopedBlockingCall is instantiated.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, PostBeforeBlocking) {
  WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
  WaitableEvent thread_can_block;
  WaitableEvent threads_continue;

  for (size_t i = 0; i < kMaxTasks; ++i) {
    task_runner_->PostTask(
        FROM_HERE,
        BindOnce(
            [](const NestedBlockingType& nested_blocking_type,
               WaitableEvent* thread_running, WaitableEvent* thread_can_block,
               WaitableEvent* threads_continue) {
              thread_running->Signal();
              WaitWithoutBlockingObserver(thread_can_block);

              NestedScopedBlockingCall nested_scoped_blocking_call(
                  nested_blocking_type);
              WaitWithoutBlockingObserver(threads_continue);
            },
            GetParam(), Unretained(&thread_running),
            Unretained(&thread_can_block), Unretained(&threads_continue)));
    thread_running.Wait();
  }

  // All workers should be occupied and the pool should be saturated. Workers
  // have not entered ScopedBlockingCall yet.
  EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), kMaxTasks);
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);

  WaitableEvent extra_threads_running;
  WaitableEvent extra_threads_continue;
  RepeatingClosure extra_threads_running_barrier = BarrierClosure(
      kMaxTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&extra_threads_running)));
  for (size_t i = 0; i < kMaxTasks; ++i) {
    task_runner_->PostTask(FROM_HERE,
                           BindOnce(
                               [](Closure* extra_threads_running_barrier,
                                  WaitableEvent* extra_threads_continue) {
                                 extra_threads_running_barrier->Run();
                                 WaitWithoutBlockingObserver(
                                     extra_threads_continue);
                               },
                               Unretained(&extra_threads_running_barrier),
                               Unretained(&extra_threads_continue)));
  }

  // Allow tasks to enter ScopedBlockingCall. Workers should be created for the
  // tasks we just posted.
  thread_can_block.Signal();
  if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
    ExpectMaxTasksIncreasesTo(2 * kMaxTasks);

  // Should not block forever.
  extra_threads_running.Wait();
  EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
  extra_threads_continue.Signal();

  threads_continue.Signal();
  task_tracker_.FlushForTesting();
}

// Verify that workers become idle when the pool is over-capacity and that
// those workers do no work.
TEST_P(TaskSchedulerWorkerPoolBlockingTest, WorkersIdleWhenOverCapacity) {
  ASSERT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);

  SaturateWithBlockingTasks(GetParam());
  if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
    ExpectMaxTasksIncreasesTo(2 * kMaxTasks);
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), 2 * kMaxTasks);
  // A range of possible number of workers is accepted because of
  // crbug.com/757897.
  EXPECT_GE(worker_pool_->NumberOfWorkersForTesting(), kMaxTasks + 1);
  EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), 2 * kMaxTasks);

  WaitableEvent threads_running;
  WaitableEvent threads_continue;

  RepeatingClosure threads_running_barrier = BarrierClosure(
      kMaxTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));
  // Posting these tasks should cause new workers to be created.
  for (size_t i = 0; i < kMaxTasks; ++i) {
    auto callback = BindOnce(
        [](Closure* threads_running_barrier, WaitableEvent* threads_continue) {
          threads_running_barrier->Run();
          WaitWithoutBlockingObserver(threads_continue);
        },
        Unretained(&threads_running_barrier), Unretained(&threads_continue));
    task_runner_->PostTask(FROM_HERE, std::move(callback));
  }
  threads_running.Wait();

  ASSERT_EQ(worker_pool_->NumberOfIdleWorkersForTesting(), 0U);
  EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), 2 * kMaxTasks);

  AtomicFlag is_exiting;
  // These tasks should not get executed until after other tasks become
  // unblocked.
  for (size_t i = 0; i < kMaxTasks; ++i) {
    task_runner_->PostTask(FROM_HERE, BindOnce(
                                          [](AtomicFlag* is_exiting) {
                                            EXPECT_TRUE(is_exiting->IsSet());
                                          },
                                          Unretained(&is_exiting)));
  }

  // The original |kMaxTasks| will finish their tasks after being
  // unblocked. There will be work in the work queue, but the pool should now
  // be over-capacity and workers will become idle.
  UnblockTasks();
  worker_pool_->WaitForWorkersIdleForTesting(kMaxTasks);
  EXPECT_EQ(worker_pool_->NumberOfIdleWorkersForTesting(), kMaxTasks);

  // Posting more tasks should not cause workers idle from the pool being over
  // capacity to begin doing work.
  for (size_t i = 0; i < kMaxTasks; ++i) {
    task_runner_->PostTask(FROM_HERE, BindOnce(
                                          [](AtomicFlag* is_exiting) {
                                            EXPECT_TRUE(is_exiting->IsSet());
                                          },
                                          Unretained(&is_exiting)));
  }

  // Give time for those idle workers to possibly do work (which should not
  // happen).
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());

  is_exiting.Set();
  // Unblocks the new workers.
  threads_continue.Signal();
  task_tracker_.FlushForTesting();
}

INSTANTIATE_TEST_CASE_P(
    ,
    TaskSchedulerWorkerPoolBlockingTest,
    ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
                                         OptionalBlockingType::NO_BLOCK,
                                         BlockingType::MAY_BLOCK),
                      NestedBlockingType(BlockingType::WILL_BLOCK,
                                         OptionalBlockingType::NO_BLOCK,
                                         BlockingType::WILL_BLOCK),
                      NestedBlockingType(BlockingType::MAY_BLOCK,
                                         OptionalBlockingType::WILL_BLOCK,
                                         BlockingType::WILL_BLOCK),
                      NestedBlockingType(BlockingType::WILL_BLOCK,
                                         OptionalBlockingType::MAY_BLOCK,
                                         BlockingType::WILL_BLOCK)),
    TaskSchedulerWorkerPoolBlockingTest::ParamInfoToString);

// Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
// but exits the scope before the MayBlockThreshold() is reached, the max
// tasks does not increase.
TEST_F(TaskSchedulerWorkerPoolBlockingTest, ThreadBlockUnblockPremature) {
  ASSERT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);

  TimeDelta max_tasks_change_sleep = GetMaxTasksChangeSleepTime();
  worker_pool_->MaximizeMayBlockThresholdForTesting();

  SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
                                               OptionalBlockingType::NO_BLOCK,
                                               BlockingType::MAY_BLOCK));
  PlatformThread::Sleep(max_tasks_change_sleep);
  EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), kMaxTasks);
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);

  UnblockTasks();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
}

// Verify that if max tasks is incremented because of a MAY_BLOCK
// ScopedBlockingCall, it isn't incremented again when there is a nested
// WILL_BLOCK ScopedBlockingCall.
TEST_F(TaskSchedulerWorkerPoolBlockingTest,
       MayBlockIncreaseCapacityNestedWillBlock) {
  ASSERT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
  auto task_runner =
      worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
  WaitableEvent can_return;

  // Saturate the pool so that a MAY_BLOCK ScopedBlockingCall would increment
  // the max tasks.
  for (size_t i = 0; i < kMaxTasks - 1; ++i) {
    task_runner->PostTask(FROM_HERE, BindOnce(&WaitWithoutBlockingObserver,
                                              Unretained(&can_return)));
  }

  WaitableEvent can_instantiate_will_block;
  WaitableEvent did_instantiate_will_block;

  // Post a task that instantiates a MAY_BLOCK ScopedBlockingCall.
  task_runner->PostTask(
      FROM_HERE,
      BindOnce(
          [](WaitableEvent* can_instantiate_will_block,
             WaitableEvent* did_instantiate_will_block,
             WaitableEvent* can_return) {
            ScopedBlockingCall may_block(BlockingType::MAY_BLOCK);
            WaitWithoutBlockingObserver(can_instantiate_will_block);
            ScopedBlockingCall will_block(BlockingType::WILL_BLOCK);
            did_instantiate_will_block->Signal();
            WaitWithoutBlockingObserver(can_return);
          },
          Unretained(&can_instantiate_will_block),
          Unretained(&did_instantiate_will_block), Unretained(&can_return)));

  // After a short delay, max tasks should be incremented.
  ExpectMaxTasksIncreasesTo(kMaxTasks + 1);

  // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
  can_instantiate_will_block.Signal();
  did_instantiate_will_block.Wait();

  // Max tasks shouldn't be incremented again.
  EXPECT_EQ(kMaxTasks + 1, worker_pool_->GetMaxTasksForTesting());

  // Tear down.
  can_return.Signal();
  task_tracker_.FlushForTesting();
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
}

// Verify that workers that become idle due to the pool being over capacity
// will eventually clean up.
TEST(TaskSchedulerWorkerPoolOverCapacityTest, VerifyCleanup) {
  constexpr size_t kLocalMaxTasks = 3;

  TaskTracker task_tracker("Test");
  DelayedTaskManager delayed_task_manager;
  scoped_refptr<TaskRunner> service_thread_task_runner =
      MakeRefCounted<TestSimpleTaskRunner>();
  delayed_task_manager.Start(service_thread_task_runner);
  SchedulerWorkerPoolImpl worker_pool(
      "OverCapacityTestWorkerPool", "A", ThreadPriority::NORMAL,
      task_tracker.GetTrackedRef(), &delayed_task_manager);
  worker_pool.Start(
      SchedulerWorkerPoolParams(kLocalMaxTasks, kReclaimTimeForCleanupTests),
      kLocalMaxTasks, service_thread_task_runner, nullptr,
      SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);

  scoped_refptr<TaskRunner> task_runner =
      worker_pool.CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});

  WaitableEvent threads_running;
  WaitableEvent threads_continue;
  RepeatingClosure threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));

  WaitableEvent blocked_call_continue;
  RepeatingClosure closure = BindRepeating(
      [](Closure* threads_running_barrier, WaitableEvent* threads_continue,
         WaitableEvent* blocked_call_continue) {
        threads_running_barrier->Run();
        {
          ScopedBlockingCall scoped_blocking_call(BlockingType::WILL_BLOCK);
          WaitWithoutBlockingObserver(blocked_call_continue);
        }
        WaitWithoutBlockingObserver(threads_continue);
      },
      Unretained(&threads_running_barrier), Unretained(&threads_continue),
      Unretained(&blocked_call_continue));

  for (size_t i = 0; i < kLocalMaxTasks; ++i)
    task_runner->PostTask(FROM_HERE, closure);

  threads_running.Wait();

  WaitableEvent extra_threads_running;
  WaitableEvent extra_threads_continue;

  RepeatingClosure extra_threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&extra_threads_running)));
  // These tasks should run on the new threads from increasing max tasks.
  for (size_t i = 0; i < kLocalMaxTasks; ++i) {
    task_runner->PostTask(FROM_HERE,
                          BindOnce(
                              [](Closure* extra_threads_running_barrier,
                                 WaitableEvent* extra_threads_continue) {
                                extra_threads_running_barrier->Run();
                                WaitWithoutBlockingObserver(
                                    extra_threads_continue);
                              },
                              Unretained(&extra_threads_running_barrier),
                              Unretained(&extra_threads_continue)));
  }
  extra_threads_running.Wait();

  ASSERT_EQ(kLocalMaxTasks * 2, worker_pool.NumberOfWorkersForTesting());
  EXPECT_EQ(kLocalMaxTasks * 2, worker_pool.GetMaxTasksForTesting());
  blocked_call_continue.Signal();
  extra_threads_continue.Signal();

  // Periodically post tasks to ensure that posting tasks does not prevent
  // workers that are idle due to the pool being over capacity from cleaning
  // up.
  for (int i = 0; i < 16; ++i) {
    task_runner->PostDelayedTask(FROM_HERE, DoNothing(),
                                 kReclaimTimeForCleanupTests * i * 0.5);
  }

  // Note: one worker above capacity will not get cleaned up since it's on the
  // top of the idle stack.
  worker_pool.WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
  EXPECT_EQ(kLocalMaxTasks + 1, worker_pool.NumberOfWorkersForTesting());

  threads_continue.Signal();

  worker_pool.JoinForTesting();
}

// Verify that the maximum number of workers is 256 and that hitting the max
// leaves the pool in a valid state with regard to max tasks.
TEST_F(TaskSchedulerWorkerPoolBlockingTest, MaximumWorkersTest) {
1410 constexpr size_t kMaxNumberOfWorkers = 256;
1411 constexpr size_t kNumExtraTasks = 10;

  WaitableEvent early_blocking_threads_running;
  RepeatingClosure early_threads_barrier_closure =
      BarrierClosure(kMaxNumberOfWorkers,
                     BindOnce(&WaitableEvent::Signal,
                              Unretained(&early_blocking_threads_running)));

  WaitableEvent early_threads_finished;
  RepeatingClosure early_threads_finished_barrier = BarrierClosure(
      kMaxNumberOfWorkers,
      BindOnce(&WaitableEvent::Signal, Unretained(&early_threads_finished)));

  WaitableEvent early_release_threads_continue;

  // Post ScopedBlockingCall tasks to hit the worker cap.
  for (size_t i = 0; i < kMaxNumberOfWorkers; ++i) {
    task_runner_->PostTask(FROM_HERE,
                           BindOnce(
                               [](Closure* early_threads_barrier_closure,
                                  WaitableEvent* early_release_threads_continue,
                                  Closure* early_threads_finished) {
                                 {
                                   ScopedBlockingCall scoped_blocking_call(
                                       BlockingType::WILL_BLOCK);
                                   early_threads_barrier_closure->Run();
                                   WaitWithoutBlockingObserver(
                                       early_release_threads_continue);
                                 }
                                 early_threads_finished->Run();
                               },
                               Unretained(&early_threads_barrier_closure),
                               Unretained(&early_release_threads_continue),
                               Unretained(&early_threads_finished_barrier)));
  }

  early_blocking_threads_running.Wait();
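  // Each WILL_BLOCK ScopedBlockingCall above raised the max tasks by one, on
  // top of the initial |kMaxTasks|.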
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(),
            kMaxTasks + kMaxNumberOfWorkers);

  WaitableEvent late_release_thread_continue;
  WaitableEvent late_blocking_threads_running;

  RepeatingClosure late_threads_barrier_closure = BarrierClosure(
      kNumExtraTasks, BindOnce(&WaitableEvent::Signal,
                               Unretained(&late_blocking_threads_running)));

  // Post additional tasks. Note: |kMaxNumberOfWorkers| tasks should already be
  // running. These new tasks cannot execute yet because the pool is at its max
  // worker cap.
  for (size_t i = 0; i < kNumExtraTasks; ++i) {
    task_runner_->PostTask(
        FROM_HERE,
        BindOnce(
            [](Closure* late_threads_barrier_closure,
               WaitableEvent* late_release_thread_continue) {
              ScopedBlockingCall scoped_blocking_call(BlockingType::WILL_BLOCK);
              late_threads_barrier_closure->Run();
              WaitWithoutBlockingObserver(late_release_thread_continue);
            },
            Unretained(&late_threads_barrier_closure),
            Unretained(&late_release_thread_continue)));
  }

  // Give the pool time to create extra workers, then verify that the cap was
  // not exceeded.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);

  early_release_threads_continue.Signal();
  early_threads_finished.Wait();
  late_blocking_threads_running.Wait();

  WaitableEvent final_tasks_running;
  WaitableEvent final_tasks_continue;
  RepeatingClosure final_tasks_running_barrier = BarrierClosure(
      kMaxTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&final_tasks_running)));

  // Verify that we are still able to saturate the pool.
  for (size_t i = 0; i < kMaxTasks; ++i) {
    task_runner_->PostTask(
        FROM_HERE,
        BindOnce(
            [](Closure* closure, WaitableEvent* final_tasks_continue) {
              closure->Run();
              WaitWithoutBlockingObserver(final_tasks_continue);
            },
            Unretained(&final_tasks_running_barrier),
            Unretained(&final_tasks_continue)));
  }
  final_tasks_running.Wait();
  EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);
  late_release_thread_continue.Signal();
  final_tasks_continue.Signal();
  task_tracker_.FlushForTesting();
}

// Verify that the maximum number of background tasks that can run concurrently
// is honored.
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, MaxBackgroundTasks) {
  constexpr int kMaxBackgroundTasks = kMaxTasks / 2;
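  // The pool's total capacity is |kMaxTasks|; at most |kMaxBackgroundTasks| of
  // the tasks running concurrently may be TaskPriority::BACKGROUND.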
  worker_pool_->Start(
      SchedulerWorkerPoolParams(kMaxTasks, base::TimeDelta::Max()),
      kMaxBackgroundTasks, service_thread_.task_runner(), nullptr,
      SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
  const scoped_refptr<TaskRunner> foreground_runner =
      worker_pool_->CreateTaskRunnerWithTraits({MayBlock()});
  const scoped_refptr<TaskRunner> background_runner =
      worker_pool_->CreateTaskRunnerWithTraits(
          {TaskPriority::BACKGROUND, MayBlock()});

  // It should be possible to have |kMaxBackgroundTasks|
  // TaskPriority::BACKGROUND tasks running concurrently.
  WaitableEvent background_tasks_running;
  WaitableEvent unblock_background_tasks;
  RepeatingClosure background_tasks_running_barrier = BarrierClosure(
      kMaxBackgroundTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&background_tasks_running)));

  for (int i = 0; i < kMaxBackgroundTasks; ++i) {
    background_runner->PostTask(
        FROM_HERE, base::BindLambdaForTesting([&]() {
          background_tasks_running_barrier.Run();
          WaitWithoutBlockingObserver(&unblock_background_tasks);
        }));
  }
  background_tasks_running.Wait();

  // No more TaskPriority::BACKGROUND tasks should run while the first
  // |kMaxBackgroundTasks| are still blocked.
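  // The extra task below verifies this: if the cap is honored, the task cannot
  // start until a background slot is freed, by which time the AtomicFlag has
  // been set.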
  AtomicFlag extra_background_task_can_run;
  WaitableEvent extra_background_task_running;
  background_runner->PostTask(
      FROM_HERE, base::BindLambdaForTesting([&]() {
        EXPECT_TRUE(extra_background_task_can_run.IsSet());
        extra_background_task_running.Signal();
      }));

  // An extra foreground task should be able to run.
  WaitableEvent foreground_task_running;
  foreground_runner->PostTask(
      FROM_HERE, base::BindOnce(&WaitableEvent::Signal,
                                Unretained(&foreground_task_running)));
  foreground_task_running.Wait();

  // Completion of the TaskPriority::BACKGROUND tasks should allow the extra
  // TaskPriority::BACKGROUND task to run.
  extra_background_task_can_run.Set();
  unblock_background_tasks.Signal();
  extra_background_task_running.Wait();

  // Tear down.
  task_tracker_.FlushForTesting();
}

namespace {

class TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest
    : public TaskSchedulerWorkerPoolImplTestBase,
      public testing::TestWithParam<BlockingType> {
 public:
  static constexpr int kMaxBackgroundTasks = kMaxTasks / 2;

  TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest() = default;

  void SetUp() override {
    CreateWorkerPool();
    worker_pool_->Start(
        SchedulerWorkerPoolParams(kMaxTasks, base::TimeDelta::Max()),
        kMaxBackgroundTasks, service_thread_.task_runner(), nullptr,
        SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
  }

  void TearDown() override {
    TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(
      TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest);
};

}  // namespace

TEST_P(TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest,
       BlockingCallAndMaxBackgroundTasksTest) {
  const scoped_refptr<TaskRunner> background_runner =
      worker_pool_->CreateTaskRunnerWithTraits(
          {TaskPriority::BACKGROUND, MayBlock()});

  // Post |kMaxBackgroundTasks| TaskPriority::BACKGROUND tasks that block in a
  // ScopedBlockingCall.
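  // Depending on the test parameter, the tasks block under MAY_BLOCK or
  // WILL_BLOCK. Both are expected to eventually free up background capacity:
  // WILL_BLOCK right away and MAY_BLOCK after a short grace period.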
  WaitableEvent blocking_background_tasks_running;
  WaitableEvent unblock_blocking_background_tasks;
  RepeatingClosure blocking_background_tasks_running_barrier =
      BarrierClosure(kMaxBackgroundTasks,
                     BindOnce(&WaitableEvent::Signal,
                              Unretained(&blocking_background_tasks_running)));
  for (int i = 0; i < kMaxBackgroundTasks; ++i) {
    background_runner->PostTask(
        FROM_HERE, base::BindLambdaForTesting([&]() {
          blocking_background_tasks_running_barrier.Run();
          ScopedBlockingCall scoped_blocking_call(GetParam());
          WaitWithoutBlockingObserver(&unblock_blocking_background_tasks);
        }));
  }
  blocking_background_tasks_running.Wait();

  // Post an extra |kMaxBackgroundTasks| TaskPriority::BACKGROUND tasks. They
  // should be able to run, because the existing TaskPriority::BACKGROUND tasks
  // are blocked within a ScopedBlockingCall.
  //
  // Note: We block the tasks until they have all started running to make sure
  // that it is possible to run an extra |kMaxBackgroundTasks| concurrently.
  WaitableEvent background_tasks_running;
  WaitableEvent unblock_background_tasks;
  RepeatingClosure background_tasks_running_barrier = BarrierClosure(
      kMaxBackgroundTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&background_tasks_running)));
  for (int i = 0; i < kMaxBackgroundTasks; ++i) {
    background_runner->PostTask(
        FROM_HERE, base::BindLambdaForTesting([&]() {
          background_tasks_running_barrier.Run();
          WaitWithoutBlockingObserver(&unblock_background_tasks);
        }));
  }
  background_tasks_running.Wait();

  // Unblock all tasks and tear down.
  unblock_blocking_background_tasks.Signal();
  unblock_background_tasks.Signal();
  task_tracker_.FlushForTesting();
}

INSTANTIATE_TEST_CASE_P(
    MayBlock,
    TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest,
    ::testing::Values(BlockingType::MAY_BLOCK));
INSTANTIATE_TEST_CASE_P(
    WillBlock,
    TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest,
    ::testing::Values(BlockingType::WILL_BLOCK));

// Verify that worker detachment doesn't race with worker cleanup, regression
// test for https://crbug.com/810464.
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, RacyCleanup) {
#if defined(OS_FUCHSIA)
  // Fuchsia + QEMU doesn't deal well with *many* threads being
  // created/destroyed at once: https://crbug.com/816575.
  constexpr size_t kLocalMaxTasks = 16;
#else   // defined(OS_FUCHSIA)
  constexpr size_t kLocalMaxTasks = 256;
#endif  // defined(OS_FUCHSIA)
  constexpr TimeDelta kReclaimTimeForRacyCleanupTest =
      TimeDelta::FromMilliseconds(10);
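  // A very short reclaim time makes all idle workers attempt to clean up at
  // nearly the same moment, maximizing the chance of reproducing the race
  // between worker cleanup and pool destruction.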

  worker_pool_->Start(
      SchedulerWorkerPoolParams(kLocalMaxTasks, kReclaimTimeForRacyCleanupTest),
      kLocalMaxTasks, service_thread_.task_runner(), nullptr,
      SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);

  scoped_refptr<TaskRunner> task_runner =
      worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});

  WaitableEvent threads_running;
  WaitableEvent unblock_threads;
  RepeatingClosure threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));

  for (size_t i = 0; i < kLocalMaxTasks; ++i) {
    task_runner->PostTask(
        FROM_HERE,
        BindOnce(
            [](OnceClosure on_running, WaitableEvent* unblock_threads) {
              std::move(on_running).Run();
              WaitWithoutBlockingObserver(unblock_threads);
            },
            threads_running_barrier, Unretained(&unblock_threads)));
  }

  // Wait for all workers to be ready and release them all at once.
  threads_running.Wait();
  unblock_threads.Signal();

  // Sleep so that this thread wakes up at about the time all workers try to
  // clean up because they have been idle for the reclaim time.
  PlatformThread::Sleep(kReclaimTimeForRacyCleanupTest);

  worker_pool_->JoinForTesting();

  // Unwinding this test would be racy if worker cleanup could race with
  // SchedulerWorkerPoolImpl destruction: https://crbug.com/810464.
  worker_pool_.reset();
}

}  // namespace internal
}  // namespace base