• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 
7 #include <stddef.h>
8 
9 #include <memory>
10 #include <unordered_set>
11 #include <vector>
12 
13 #include "base/atomicops.h"
14 #include "base/barrier_closure.h"
15 #include "base/bind.h"
16 #include "base/bind_helpers.h"
17 #include "base/callback.h"
18 #include "base/macros.h"
19 #include "base/memory/ptr_util.h"
20 #include "base/memory/ref_counted.h"
21 #include "base/metrics/histogram.h"
22 #include "base/metrics/histogram_samples.h"
23 #include "base/metrics/statistics_recorder.h"
24 #include "base/synchronization/atomic_flag.h"
25 #include "base/synchronization/condition_variable.h"
26 #include "base/synchronization/lock.h"
27 #include "base/synchronization/waitable_event.h"
28 #include "base/task_runner.h"
29 #include "base/task_scheduler/delayed_task_manager.h"
30 #include "base/task_scheduler/scheduler_worker_pool_params.h"
31 #include "base/task_scheduler/sequence.h"
32 #include "base/task_scheduler/sequence_sort_key.h"
33 #include "base/task_scheduler/task_tracker.h"
34 #include "base/task_scheduler/test_task_factory.h"
35 #include "base/task_scheduler/test_utils.h"
36 #include "base/test/bind_test_util.h"
37 #include "base/test/gtest_util.h"
38 #include "base/test/test_simple_task_runner.h"
39 #include "base/test/test_timeouts.h"
40 #include "base/threading/platform_thread.h"
41 #include "base/threading/scoped_blocking_call.h"
42 #include "base/threading/simple_thread.h"
43 #include "base/threading/thread.h"
44 #include "base/threading/thread_checker_impl.h"
45 #include "base/threading/thread_local_storage.h"
46 #include "base/threading/thread_restrictions.h"
47 #include "base/time/time.h"
48 #include "base/timer/timer.h"
49 #include "build/build_config.h"
50 #include "testing/gtest/include/gtest/gtest.h"
51 
52 #if defined(OS_WIN)
53 #include "base/win/com_init_util.h"
54 #endif  // defined(OS_WIN)
55 
56 namespace base {
57 namespace internal {
58 namespace {
59 
60 constexpr size_t kMaxTasks = 4;
61 constexpr size_t kNumThreadsPostingTasks = 4;
62 constexpr size_t kNumTasksPostedPerThread = 150;
63 // This can't be lower because Windows' WaitableEvent wakes up too early when a
64 // small timeout is used. This results in many spurious wake ups before a worker
65 // is allowed to cleanup.
66 constexpr TimeDelta kReclaimTimeForCleanupTests =
67     TimeDelta::FromMilliseconds(500);
68 
69 // Waits on |event| in a scope where the blocking observer is null, to avoid
70 // affecting the max tasks.
WaitWithoutBlockingObserver(WaitableEvent * event)71 void WaitWithoutBlockingObserver(WaitableEvent* event) {
72   internal::ScopedClearBlockingObserverForTesting clear_blocking_observer;
73   ScopedAllowBaseSyncPrimitivesForTesting allow_base_sync_primitives;
74   event->Wait();
75 }
76 
77 class TaskSchedulerWorkerPoolImplTestBase {
78  protected:
TaskSchedulerWorkerPoolImplTestBase()79   TaskSchedulerWorkerPoolImplTestBase()
80       : service_thread_("TaskSchedulerServiceThread"){};
81 
CommonSetUp(TimeDelta suggested_reclaim_time=TimeDelta::Max ())82   void CommonSetUp(TimeDelta suggested_reclaim_time = TimeDelta::Max()) {
83     CreateAndStartWorkerPool(suggested_reclaim_time, kMaxTasks);
84   }
85 
CommonTearDown()86   void CommonTearDown() {
87     service_thread_.Stop();
88     task_tracker_.FlushForTesting();
89     if (worker_pool_)
90       worker_pool_->JoinForTesting();
91   }
92 
CreateWorkerPool()93   void CreateWorkerPool() {
94     ASSERT_FALSE(worker_pool_);
95     service_thread_.Start();
96     delayed_task_manager_.Start(service_thread_.task_runner());
97     worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>(
98         "TestWorkerPool", "A", ThreadPriority::NORMAL,
99         task_tracker_.GetTrackedRef(), &delayed_task_manager_);
100     ASSERT_TRUE(worker_pool_);
101   }
102 
StartWorkerPool(TimeDelta suggested_reclaim_time,size_t max_tasks)103   virtual void StartWorkerPool(TimeDelta suggested_reclaim_time,
104                                size_t max_tasks) {
105     ASSERT_TRUE(worker_pool_);
106     worker_pool_->Start(
107         SchedulerWorkerPoolParams(max_tasks, suggested_reclaim_time), max_tasks,
108         service_thread_.task_runner(), nullptr,
109         SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
110   }
111 
CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time,size_t max_tasks)112   void CreateAndStartWorkerPool(TimeDelta suggested_reclaim_time,
113                                 size_t max_tasks) {
114     CreateWorkerPool();
115     StartWorkerPool(suggested_reclaim_time, max_tasks);
116   }
117 
118   Thread service_thread_;
119   TaskTracker task_tracker_ = {"Test"};
120 
121   std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool_;
122 
123  private:
124   DelayedTaskManager delayed_task_manager_;
125 
126   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestBase);
127 };
128 
129 class TaskSchedulerWorkerPoolImplTest
130     : public TaskSchedulerWorkerPoolImplTestBase,
131       public testing::Test {
132  protected:
133   TaskSchedulerWorkerPoolImplTest() = default;
134 
SetUp()135   void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::CommonSetUp(); }
136 
TearDown()137   void TearDown() override {
138     TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
139   }
140 
141  private:
142   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTest);
143 };
144 
145 class TaskSchedulerWorkerPoolImplTestParam
146     : public TaskSchedulerWorkerPoolImplTestBase,
147       public testing::TestWithParam<test::ExecutionMode> {
148  protected:
149   TaskSchedulerWorkerPoolImplTestParam() = default;
150 
SetUp()151   void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::CommonSetUp(); }
152 
TearDown()153   void TearDown() override {
154     TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
155   }
156 
157  private:
158   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestParam);
159 };
160 
161 using PostNestedTask = test::TestTaskFactory::PostNestedTask;
162 
163 class ThreadPostingTasksWaitIdle : public SimpleThread {
164  public:
165   // Constructs a thread that posts tasks to |worker_pool| through an
166   // |execution_mode| task runner. The thread waits until all workers in
167   // |worker_pool| are idle before posting a new task.
ThreadPostingTasksWaitIdle(SchedulerWorkerPoolImpl * worker_pool,test::ExecutionMode execution_mode)168   ThreadPostingTasksWaitIdle(SchedulerWorkerPoolImpl* worker_pool,
169                              test::ExecutionMode execution_mode)
170       : SimpleThread("ThreadPostingTasksWaitIdle"),
171         worker_pool_(worker_pool),
172         factory_(CreateTaskRunnerWithExecutionMode(worker_pool, execution_mode),
173                  execution_mode) {
174     DCHECK(worker_pool_);
175   }
176 
factory() const177   const test::TestTaskFactory* factory() const { return &factory_; }
178 
179  private:
Run()180   void Run() override {
181     EXPECT_FALSE(factory_.task_runner()->RunsTasksInCurrentSequence());
182 
183     for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
184       worker_pool_->WaitForAllWorkersIdleForTesting();
185       EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, Closure()));
186     }
187   }
188 
189   SchedulerWorkerPoolImpl* const worker_pool_;
190   const scoped_refptr<TaskRunner> task_runner_;
191   test::TestTaskFactory factory_;
192 
193   DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasksWaitIdle);
194 };
195 
196 }  // namespace
197 
TEST_P(TaskSchedulerWorkerPoolImplTestParam, PostTasksWaitAllWorkersIdle) {
  // Start the posting threads. Each one waits for all workers to become idle
  // before posting a new task, which verifies that workers can sleep and be
  // woken up when new tasks are posted.
  std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>> posters;
  for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) {
    auto poster = std::make_unique<ThreadPostingTasksWaitIdle>(
        worker_pool_.get(), GetParam());
    poster->Start();
    posters.push_back(std::move(poster));
  }

  // Join each posting thread, then wait for the tasks it posted to run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Make sure no task can still access its TestTaskFactory once |posters| is
  // destroyed by waiting for all workers to be idle.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}
221 
TEST_P(TaskSchedulerWorkerPoolImplTestParam, PostTasksWithOneAvailableWorker) {
  // Keep all workers but one busy with tasks that block until |unblock| is
  // signaled. Each blocking task gets its own factory so that the tasks belong
  // to different sequences and can run simultaneously when the execution mode
  // is SEQUENCED.
  constexpr size_t kNumBlockingTasks = kMaxTasks - 1;
  WaitableEvent unblock;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_task_factories;
  for (size_t i = 0; i < kNumBlockingTasks; ++i) {
    auto blocked_factory = std::make_unique<test::TestTaskFactory>(
        CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
        GetParam());
    EXPECT_TRUE(blocked_factory->PostTask(
        PostNestedTask::NO,
        BindOnce(&WaitWithoutBlockingObserver, Unretained(&unblock))));
    blocked_factory->WaitForAllTasksToRun();
    blocked_task_factories.push_back(std::move(blocked_factory));
  }

  // With a single worker left that isn't busy, |kNumTasksPostedPerThread|
  // short tasks should still all run.
  test::TestTaskFactory short_task_factory(
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
      GetParam());
  for (size_t posted = 0; posted < kNumTasksPostedPerThread; ++posted)
    EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, Closure()));
  short_task_factory.WaitForAllTasksToRun();

  // Let the blocking tasks finish.
  unblock.Signal();

  // Make sure no task can access its TestTaskFactory after it is destroyed by
  // waiting for all workers to be idle.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}
254 
TEST_P(TaskSchedulerWorkerPoolImplTestParam, Saturate) {
  // Check that |kMaxTasks| tasks/sequences can run simultaneously. Each
  // blocking task is posted through its own factory so that the tasks belong
  // to different sequences and can run simultaneously when the execution mode
  // is SEQUENCED.
  WaitableEvent unblock;
  std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
  for (size_t i = 0; i < kMaxTasks; ++i) {
    auto factory = std::make_unique<test::TestTaskFactory>(
        CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam()),
        GetParam());
    EXPECT_TRUE(factory->PostTask(
        PostNestedTask::NO,
        BindOnce(&WaitWithoutBlockingObserver, Unretained(&unblock))));
    factory->WaitForAllTasksToRun();
    factories.push_back(std::move(factory));
  }

  // Let the blocking tasks finish.
  unblock.Signal();

  // Make sure no task can access its TestTaskFactory after it is destroyed by
  // waiting for all workers to be idle.
  worker_pool_->WaitForAllWorkersIdleForTesting();
}
279 
280 #if defined(OS_WIN)
TEST_P(TaskSchedulerWorkerPoolImplTestParam, NoEnvironment) {
  // A SchedulerWorkerPoolImpl started with
  // SchedulerWorkerPoolImpl::WorkerEnvironment::NONE must not initialize COM
  // on its workers.
  WaitableEvent task_ran;
  scoped_refptr<TaskRunner> runner =
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam());

  runner->PostTask(
      FROM_HERE,
      BindOnce(
          [](WaitableEvent* done) {
            win::AssertComApartmentType(win::ComApartmentType::NONE);
            done->Signal();
          },
          &task_ran));

  // Block until the task has performed its check.
  task_ran.Wait();

  worker_pool_->WaitForAllWorkersIdleForTesting();
}
300 #endif  // defined(OS_WIN)
301 
302 INSTANTIATE_TEST_CASE_P(Parallel,
303                         TaskSchedulerWorkerPoolImplTestParam,
304                         ::testing::Values(test::ExecutionMode::PARALLEL));
305 INSTANTIATE_TEST_CASE_P(Sequenced,
306                         TaskSchedulerWorkerPoolImplTestParam,
307                         ::testing::Values(test::ExecutionMode::SEQUENCED));
308 
309 #if defined(OS_WIN)
310 
311 namespace {
312 
313 class TaskSchedulerWorkerPoolImplTestCOMMTAParam
314     : public TaskSchedulerWorkerPoolImplTestBase,
315       public testing::TestWithParam<test::ExecutionMode> {
316  protected:
317   TaskSchedulerWorkerPoolImplTestCOMMTAParam() = default;
318 
SetUp()319   void SetUp() override { TaskSchedulerWorkerPoolImplTestBase::CommonSetUp(); }
320 
TearDown()321   void TearDown() override {
322     TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
323   }
324 
325  private:
StartWorkerPool(TimeDelta suggested_reclaim_time,size_t max_tasks)326   void StartWorkerPool(TimeDelta suggested_reclaim_time,
327                        size_t max_tasks) override {
328     ASSERT_TRUE(worker_pool_);
329     worker_pool_->Start(
330         SchedulerWorkerPoolParams(max_tasks, suggested_reclaim_time), max_tasks,
331         service_thread_.task_runner(), nullptr,
332         SchedulerWorkerPoolImpl::WorkerEnvironment::COM_MTA);
333   }
334 
335   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolImplTestCOMMTAParam);
336 };
337 
338 }  // namespace
339 
TEST_P(TaskSchedulerWorkerPoolImplTestCOMMTAParam, COMMTAInitialized) {
  // Workers of a SchedulerWorkerPoolImpl started by this fixture must have a
  // COM MTA available.
  WaitableEvent task_ran;
  scoped_refptr<TaskRunner> runner =
      CreateTaskRunnerWithExecutionMode(worker_pool_.get(), GetParam());

  runner->PostTask(
      FROM_HERE,
      BindOnce(
          [](WaitableEvent* done) {
            win::AssertComApartmentType(win::ComApartmentType::MTA);
            done->Signal();
          },
          &task_ran));

  // Block until the task has performed its check.
  task_ran.Wait();

  worker_pool_->WaitForAllWorkersIdleForTesting();
}
358 
359 INSTANTIATE_TEST_CASE_P(Parallel,
360                         TaskSchedulerWorkerPoolImplTestCOMMTAParam,
361                         ::testing::Values(test::ExecutionMode::PARALLEL));
362 INSTANTIATE_TEST_CASE_P(Sequenced,
363                         TaskSchedulerWorkerPoolImplTestCOMMTAParam,
364                         ::testing::Values(test::ExecutionMode::SEQUENCED));
365 
366 #endif  // defined(OS_WIN)
367 
368 namespace {
369 
370 class TaskSchedulerWorkerPoolImplStartInBodyTest
371     : public TaskSchedulerWorkerPoolImplTest {
372  public:
SetUp()373   void SetUp() override {
374     CreateWorkerPool();
375     // Let the test start the worker pool.
376   }
377 };
378 
TaskPostedBeforeStart(PlatformThreadRef * platform_thread_ref,WaitableEvent * task_running,WaitableEvent * barrier)379 void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
380                            WaitableEvent* task_running,
381                            WaitableEvent* barrier) {
382   *platform_thread_ref = PlatformThread::CurrentRef();
383   task_running->Signal();
384   WaitWithoutBlockingObserver(barrier);
385 }
386 
387 }  // namespace
388 
389 // Verify that 2 tasks posted before Start() to a SchedulerWorkerPoolImpl with
390 // more than 2 workers run on different workers when Start() is called.
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest,PostTasksBeforeStart)391 TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, PostTasksBeforeStart) {
392   PlatformThreadRef task_1_thread_ref;
393   PlatformThreadRef task_2_thread_ref;
394   WaitableEvent task_1_running;
395   WaitableEvent task_2_running;
396 
397   // This event is used to prevent a task from completing before the other task
398   // starts running. If that happened, both tasks could run on the same worker
399   // and this test couldn't verify that the correct number of workers were woken
400   // up.
401   WaitableEvent barrier;
402 
403   worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()})
404       ->PostTask(
405           FROM_HERE,
406           BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
407                    Unretained(&task_1_running), Unretained(&barrier)));
408   worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()})
409       ->PostTask(
410           FROM_HERE,
411           BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
412                    Unretained(&task_2_running), Unretained(&barrier)));
413 
414   // Workers should not be created and tasks should not run before the pool is
415   // started.
416   EXPECT_EQ(0U, worker_pool_->NumberOfWorkersForTesting());
417   EXPECT_FALSE(task_1_running.IsSignaled());
418   EXPECT_FALSE(task_2_running.IsSignaled());
419 
420   StartWorkerPool(TimeDelta::Max(), kMaxTasks);
421 
422   // Tasks should run shortly after the pool is started.
423   task_1_running.Wait();
424   task_2_running.Wait();
425 
426   // Tasks should run on different threads.
427   EXPECT_NE(task_1_thread_ref, task_2_thread_ref);
428 
429   barrier.Signal();
430   task_tracker_.FlushForTesting();
431 }
432 
433 // Verify that posting many tasks before Start will cause the number of workers
434 // to grow to |max_tasks_| during Start.
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest,PostManyTasks)435 TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, PostManyTasks) {
436   scoped_refptr<TaskRunner> task_runner =
437       worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
438   constexpr size_t kNumTasksPosted = 2 * kMaxTasks;
439   for (size_t i = 0; i < kNumTasksPosted; ++i)
440     task_runner->PostTask(FROM_HERE, DoNothing());
441 
442   EXPECT_EQ(0U, worker_pool_->NumberOfWorkersForTesting());
443 
444   StartWorkerPool(TimeDelta::Max(), kMaxTasks);
445   ASSERT_GT(kNumTasksPosted, worker_pool_->GetMaxTasksForTesting());
446   EXPECT_EQ(kMaxTasks, worker_pool_->GetMaxTasksForTesting());
447 
448   EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(),
449             worker_pool_->GetMaxTasksForTesting());
450 }
451 
452 namespace {
453 
454 constexpr size_t kMagicTlsValue = 42;
455 
456 class TaskSchedulerWorkerPoolCheckTlsReuse
457     : public TaskSchedulerWorkerPoolImplTest {
458  public:
SetTlsValueAndWait()459   void SetTlsValueAndWait() {
460     slot_.Set(reinterpret_cast<void*>(kMagicTlsValue));
461     WaitWithoutBlockingObserver(&waiter_);
462   }
463 
CountZeroTlsValuesAndWait(WaitableEvent * count_waiter)464   void CountZeroTlsValuesAndWait(WaitableEvent* count_waiter) {
465     if (!slot_.Get())
466       subtle::NoBarrier_AtomicIncrement(&zero_tls_values_, 1);
467 
468     count_waiter->Signal();
469     WaitWithoutBlockingObserver(&waiter_);
470   }
471 
472  protected:
473   TaskSchedulerWorkerPoolCheckTlsReuse() = default;
474 
SetUp()475   void SetUp() override {
476     CreateAndStartWorkerPool(kReclaimTimeForCleanupTests, kMaxTasks);
477   }
478 
479   subtle::Atomic32 zero_tls_values_ = 0;
480 
481   WaitableEvent waiter_;
482 
483  private:
484   ThreadLocalStorage::Slot slot_;
485 
486   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolCheckTlsReuse);
487 };
488 
489 }  // namespace
490 
491 // Checks that at least one worker has been cleaned up by checking the TLS.
TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse,CheckCleanupWorkers)492 TEST_F(TaskSchedulerWorkerPoolCheckTlsReuse, CheckCleanupWorkers) {
493   // Saturate the workers and mark each worker's thread with a magic TLS value.
494   std::vector<std::unique_ptr<test::TestTaskFactory>> factories;
495   for (size_t i = 0; i < kMaxTasks; ++i) {
496     factories.push_back(std::make_unique<test::TestTaskFactory>(
497         worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()}),
498         test::ExecutionMode::PARALLEL));
499     ASSERT_TRUE(factories.back()->PostTask(
500         PostNestedTask::NO,
501         Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::SetTlsValueAndWait,
502              Unretained(this))));
503     factories.back()->WaitForAllTasksToRun();
504   }
505 
506   // Release tasks waiting on |waiter_|.
507   waiter_.Signal();
508   worker_pool_->WaitForAllWorkersIdleForTesting();
509 
510   // All workers should be done running by now, so reset for the next phase.
511   waiter_.Reset();
512 
513   // Wait for the worker pool to clean up at least one worker.
514   worker_pool_->WaitForWorkersCleanedUpForTesting(1U);
515 
516   // Saturate and count the worker threads that do not have the magic TLS value.
517   // If the value is not there, that means we're at a new worker.
518   std::vector<std::unique_ptr<WaitableEvent>> count_waiters;
519   for (auto& factory : factories) {
520     count_waiters.push_back(std::make_unique<WaitableEvent>());
521     ASSERT_TRUE(factory->PostTask(
522           PostNestedTask::NO,
523           Bind(&TaskSchedulerWorkerPoolCheckTlsReuse::CountZeroTlsValuesAndWait,
524                Unretained(this),
525                count_waiters.back().get())));
526     factory->WaitForAllTasksToRun();
527   }
528 
529   // Wait for all counters to complete.
530   for (auto& count_waiter : count_waiters)
531     count_waiter->Wait();
532 
533   EXPECT_GT(subtle::NoBarrier_Load(&zero_tls_values_), 0);
534 
535   // Release tasks waiting on |waiter_|.
536   waiter_.Signal();
537 }
538 
539 namespace {
540 
541 class TaskSchedulerWorkerPoolHistogramTest
542     : public TaskSchedulerWorkerPoolImplTest {
543  public:
544   TaskSchedulerWorkerPoolHistogramTest() = default;
545 
546  protected:
547   // Override SetUp() to allow every test case to initialize a worker pool with
548   // its own arguments.
SetUp()549   void SetUp() override {}
550 
551   // Floods |worker_pool_| with a single task each that blocks until
552   // |continue_event| is signaled. Every worker in the pool is blocked on
553   // |continue_event| when this method returns. Note: this helper can easily be
554   // generalized to be useful in other tests, but it's here for now because it's
555   // only used in a TaskSchedulerWorkerPoolHistogramTest at the moment.
FloodPool(WaitableEvent * continue_event)556   void FloodPool(WaitableEvent* continue_event) {
557     ASSERT_FALSE(continue_event->IsSignaled());
558 
559     auto task_runner =
560         worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
561 
562     const auto max_tasks = worker_pool_->GetMaxTasksForTesting();
563 
564     WaitableEvent workers_flooded;
565     RepeatingClosure all_workers_running_barrier = BarrierClosure(
566         max_tasks,
567         BindOnce(&WaitableEvent::Signal, Unretained(&workers_flooded)));
568     for (size_t i = 0; i < max_tasks; ++i) {
569       task_runner->PostTask(
570           FROM_HERE,
571           BindOnce(
572               [](OnceClosure on_running, WaitableEvent* continue_event) {
573                 std::move(on_running).Run();
574                 WaitWithoutBlockingObserver(continue_event);
575               },
576               all_workers_running_barrier, continue_event));
577     }
578     workers_flooded.Wait();
579   }
580 
581  private:
582   std::unique_ptr<StatisticsRecorder> statistics_recorder_ =
583       StatisticsRecorder::CreateTemporaryForTesting();
584 
585   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolHistogramTest);
586 };
587 
588 }  // namespace
589 
TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBetweenWaits) {
  WaitableEvent unblock_first_task;
  CreateAndStartWorkerPool(TimeDelta::Max(), kMaxTasks);
  auto runner = worker_pool_->CreateSequencedTaskRunnerWithTraits(
      {WithBaseSyncPrimitives()});

  // First task: blocks until |unblock_first_task| is signaled.
  runner->PostTask(FROM_HERE, BindOnce(&WaitWithoutBlockingObserver,
                                       Unretained(&unblock_first_task)));

  // Two more tasks posted while the first task hasn't completed its
  // execution. They are guaranteed to run immediately after the first task,
  // without allowing the worker to sleep.
  for (int extra_task = 0; extra_task < 2; ++extra_task)
    runner->PostTask(FROM_HERE, DoNothing());

  // Let the tasks run and wait until the SchedulerWorker is idle.
  unblock_first_task.Signal();
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // Posting a task wakes up the SchedulerWorker that just became idle; wait
  // until it becomes idle again. The SchedulerWorker should record the
  // TaskScheduler.NumTasksBetweenWaits.* histogram on wake up.
  runner->PostTask(FROM_HERE, DoNothing());
  worker_pool_->WaitForAllWorkersIdleForTesting();

  // Check that the expected counts were recorded to the histogram. A fresh
  // snapshot is taken for every bucket that is inspected.
  const auto* histogram = worker_pool_->num_tasks_between_waits_histogram();
  const auto count_for = [histogram](int sample) {
    return histogram->SnapshotSamples()->GetCount(sample);
  };
  EXPECT_EQ(0, count_for(0));
  EXPECT_EQ(1, count_for(3));
  EXPECT_EQ(0, count_for(10));
}
622 
623 // Verifies that NumTasksBetweenWaits histogram is logged as expected across
624 // idle and cleanup periods.
TEST_F(TaskSchedulerWorkerPoolHistogramTest,NumTasksBetweenWaitsWithIdlePeriodAndCleanup)625 TEST_F(TaskSchedulerWorkerPoolHistogramTest,
626        NumTasksBetweenWaitsWithIdlePeriodAndCleanup) {
627   WaitableEvent tasks_can_exit_event;
628   CreateAndStartWorkerPool(kReclaimTimeForCleanupTests, kMaxTasks);
629 
630   WaitableEvent workers_continue;
631 
632   FloodPool(&workers_continue);
633 
634   const auto* histogram = worker_pool_->num_tasks_between_waits_histogram();
635 
636   // NumTasksBetweenWaits shouldn't be logged until idle.
637   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
638   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(1));
639   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
640 
641   // Make all workers go idle.
642   workers_continue.Signal();
643   worker_pool_->WaitForAllWorkersIdleForTesting();
644 
645   // All workers should have reported a single hit in the "1" bucket per the the
646   // histogram being reported when going idle and each worker having processed
647   // precisely 1 task per the controlled flooding logic above.
648   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
649   EXPECT_EQ(static_cast<int>(kMaxTasks),
650             histogram->SnapshotSamples()->GetCount(1));
651   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
652 
653   worker_pool_->WaitForWorkersCleanedUpForTesting(kMaxTasks - 1);
654 
655   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
656   EXPECT_EQ(static_cast<int>(kMaxTasks),
657             histogram->SnapshotSamples()->GetCount(1));
658   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
659 
660   // Flooding the pool once again (without letting any workers go idle)
661   // shouldn't affect the counts either.
662 
663   workers_continue.Reset();
664   FloodPool(&workers_continue);
665 
666   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
667   EXPECT_EQ(static_cast<int>(kMaxTasks),
668             histogram->SnapshotSamples()->GetCount(1));
669   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
670 
671   workers_continue.Signal();
672   worker_pool_->WaitForAllWorkersIdleForTesting();
673 }
674 
TEST_F(TaskSchedulerWorkerPoolHistogramTest,NumTasksBeforeCleanup)675 TEST_F(TaskSchedulerWorkerPoolHistogramTest, NumTasksBeforeCleanup) {
676   CreateWorkerPool();
677   auto histogrammed_thread_task_runner =
678       worker_pool_->CreateSequencedTaskRunnerWithTraits(
679           {WithBaseSyncPrimitives()});
680 
681   // Post 3 tasks and hold the thread for idle thread stack ordering.
682   // This test assumes |histogrammed_thread_task_runner| gets assigned the same
683   // thread for each of its tasks.
684   PlatformThreadRef thread_ref;
685   histogrammed_thread_task_runner->PostTask(
686       FROM_HERE, BindOnce(
687                      [](PlatformThreadRef* thread_ref) {
688                        ASSERT_TRUE(thread_ref);
689                        *thread_ref = PlatformThread::CurrentRef();
690                      },
691                      Unretained(&thread_ref)));
692   histogrammed_thread_task_runner->PostTask(
693       FROM_HERE, BindOnce(
694                      [](PlatformThreadRef* thread_ref) {
695                        ASSERT_FALSE(thread_ref->is_null());
696                        EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
697                      },
698                      Unretained(&thread_ref)));
699 
700   WaitableEvent cleanup_thread_running;
701   WaitableEvent cleanup_thread_continue;
702   histogrammed_thread_task_runner->PostTask(
703       FROM_HERE,
704       BindOnce(
705           [](PlatformThreadRef* thread_ref,
706              WaitableEvent* cleanup_thread_running,
707              WaitableEvent* cleanup_thread_continue) {
708             ASSERT_FALSE(thread_ref->is_null());
709             EXPECT_EQ(*thread_ref, PlatformThread::CurrentRef());
710             cleanup_thread_running->Signal();
711             WaitWithoutBlockingObserver(cleanup_thread_continue);
712           },
713           Unretained(&thread_ref), Unretained(&cleanup_thread_running),
714           Unretained(&cleanup_thread_continue)));
715 
716   // Start the worker pool with 2 workers, to avoid depending on the scheduler's
717   // logic to always keep one extra idle worker.
718   //
719   // The pool is started after the 3 initial tasks have been posted to ensure
720   // that they are scheduled on the same worker. If the tasks could run as they
721   // are posted, there would be a chance that:
722   // 1. Worker #1:        Runs a tasks and empties the sequence, without adding
723   //                      itself to the idle stack yet.
724   // 2. Posting thread:   Posts another task to the now empty sequence. Wakes
725   //                      up a new worker, since worker #1 isn't on the idle
726   //                      stack yet.
727   // 3: Worker #2:        Runs the tasks, violating the expectation that the 3
728   //                      initial tasks run on the same worker.
729   constexpr size_t kTwoWorkers = 2;
730   StartWorkerPool(kReclaimTimeForCleanupTests, kTwoWorkers);
731 
732   // Wait until the 3rd task is scheduled.
733   cleanup_thread_running.Wait();
734 
735   // To allow the SchedulerWorker associated with
736   // |histogrammed_thread_task_runner| to cleanup, make sure it isn't on top of
737   // the idle stack by waking up another SchedulerWorker via
738   // |task_runner_for_top_idle|. |histogrammed_thread_task_runner| should
739   // release and go idle first and then |task_runner_for_top_idle| should
740   // release and go idle. This allows the SchedulerWorker associated with
741   // |histogrammed_thread_task_runner| to cleanup.
742   WaitableEvent top_idle_thread_running;
743   WaitableEvent top_idle_thread_continue;
744   auto task_runner_for_top_idle =
745       worker_pool_->CreateSequencedTaskRunnerWithTraits(
746           {WithBaseSyncPrimitives()});
747   task_runner_for_top_idle->PostTask(
748       FROM_HERE, BindOnce(
749                      [](PlatformThreadRef thread_ref,
750                         WaitableEvent* top_idle_thread_running,
751                         WaitableEvent* top_idle_thread_continue) {
752                        ASSERT_FALSE(thread_ref.is_null());
753                        EXPECT_NE(thread_ref, PlatformThread::CurrentRef())
754                            << "Worker reused. Worker will not cleanup and the "
755                               "histogram value will be wrong.";
756                        top_idle_thread_running->Signal();
757                        WaitWithoutBlockingObserver(top_idle_thread_continue);
758                      },
759                      thread_ref, Unretained(&top_idle_thread_running),
760                      Unretained(&top_idle_thread_continue)));
761   top_idle_thread_running.Wait();
762   EXPECT_EQ(0U, worker_pool_->NumberOfIdleWorkersForTesting());
763   cleanup_thread_continue.Signal();
764   // Wait for the cleanup thread to also become idle.
765   worker_pool_->WaitForWorkersIdleForTesting(1U);
766   top_idle_thread_continue.Signal();
767   // Allow the thread processing the |histogrammed_thread_task_runner| work to
768   // cleanup.
769   worker_pool_->WaitForWorkersCleanedUpForTesting(1U);
770 
771   // Verify that counts were recorded to the histogram as expected.
772   const auto* histogram = worker_pool_->num_tasks_before_detach_histogram();
773   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(0));
774   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(1));
775   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(2));
776   EXPECT_EQ(1, histogram->SnapshotSamples()->GetCount(3));
777   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(4));
778   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(5));
779   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(6));
780   EXPECT_EQ(0, histogram->SnapshotSamples()->GetCount(10));
781 }
782 
783 namespace {
784 
785 class TaskSchedulerWorkerPoolStandbyPolicyTest
786     : public TaskSchedulerWorkerPoolImplTestBase,
787       public testing::Test {
788  public:
789   TaskSchedulerWorkerPoolStandbyPolicyTest() = default;
790 
SetUp()791   void SetUp() override {
792     TaskSchedulerWorkerPoolImplTestBase::CommonSetUp(
793         kReclaimTimeForCleanupTests);
794   }
795 
TearDown()796   void TearDown() override {
797     TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
798   }
799 
800  private:
801   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolStandbyPolicyTest);
802 };
803 
804 }  // namespace
805 
// Verify that the pool holds exactly one worker when the test body starts,
// i.e. right after the fixture's SetUp() and before any task is posted.
TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest, InitOne) {
  const auto num_workers = worker_pool_->NumberOfWorkersForTesting();
  EXPECT_EQ(1U, num_workers);
}
809 
810 // Verify that the SchedulerWorkerPoolImpl keeps at least one idle standby
811 // thread, capacity permitting.
TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest,VerifyStandbyThread)812 TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest, VerifyStandbyThread) {
813   auto task_runner =
814       worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
815 
816   WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
817   WaitableEvent threads_continue;
818 
819   RepeatingClosure thread_blocker = BindLambdaForTesting([&]() {
820     thread_running.Signal();
821     WaitWithoutBlockingObserver(&threads_continue);
822   });
823 
824   // There should be one idle thread until we reach capacity
825   for (size_t i = 0; i < kMaxTasks; ++i) {
826     EXPECT_EQ(i + 1, worker_pool_->NumberOfWorkersForTesting());
827     task_runner->PostTask(FROM_HERE, thread_blocker);
828     thread_running.Wait();
829   }
830 
831   // There should not be an extra idle thread if it means going above capacity
832   EXPECT_EQ(kMaxTasks, worker_pool_->NumberOfWorkersForTesting());
833 
834   threads_continue.Signal();
835   // Wait long enough for all but one worker to clean up.
836   worker_pool_->WaitForWorkersCleanedUpForTesting(kMaxTasks - 1);
837   EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
838   // Give extra time for a worker to cleanup : none should as the pool is
839   // expected to keep a worker ready regardless of how long it was idle for.
840   PlatformThread::Sleep(kReclaimTimeForCleanupTests);
841   EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
842 }
843 
844 // Verify that being "the" idle thread counts as being active (i.e. won't be
845 // reclaimed even if not on top of the idle stack when reclaim timeout expires).
846 // Regression test for https://crbug.com/847501.
TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest,InAndOutStandbyThreadIsActive)847 TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest,
848        InAndOutStandbyThreadIsActive) {
849   auto sequenced_task_runner =
850       worker_pool_->CreateSequencedTaskRunnerWithTraits({});
851 
852   WaitableEvent timer_started;
853 
854   RepeatingTimer recurring_task;
855   sequenced_task_runner->PostTask(
856       FROM_HERE, BindLambdaForTesting([&]() {
857         recurring_task.Start(FROM_HERE, kReclaimTimeForCleanupTests / 2,
858                              DoNothing());
859         timer_started.Signal();
860       }));
861 
862   timer_started.Wait();
863 
864   // Running a task should have brought up a new standby thread.
865   EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());
866 
867   // Give extra time for a worker to cleanup : none should as the two workers
868   // are both considered "active" per the timer ticking faster than the reclaim
869   // timeout.
870   PlatformThread::Sleep(kReclaimTimeForCleanupTests * 2);
871   EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());
872 
873   sequenced_task_runner->PostTask(
874       FROM_HERE, BindLambdaForTesting([&]() { recurring_task.Stop(); }));
875 
876   // Stopping the recurring task should let the second worker be reclaimed per
877   // not being "the" standby thread for a full reclaim timeout.
878   worker_pool_->WaitForWorkersCleanedUpForTesting(1);
879   EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
880 }
881 
882 // Verify that being "the" idle thread counts as being active but isn't sticky.
883 // Regression test for https://crbug.com/847501.
TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest,OnlyKeepActiveStandbyThreads)884 TEST_F(TaskSchedulerWorkerPoolStandbyPolicyTest, OnlyKeepActiveStandbyThreads) {
885   auto sequenced_task_runner =
886       worker_pool_->CreateSequencedTaskRunnerWithTraits({});
887 
888   // Start this test like
889   // TaskSchedulerWorkerPoolStandbyPolicyTest.InAndOutStandbyThreadIsActive and
890   // give it some time to stabilize.
891   RepeatingTimer recurring_task;
892   sequenced_task_runner->PostTask(
893       FROM_HERE, BindLambdaForTesting([&]() {
894         recurring_task.Start(FROM_HERE, kReclaimTimeForCleanupTests / 2,
895                              DoNothing());
896       }));
897 
898   PlatformThread::Sleep(kReclaimTimeForCleanupTests * 2);
899   EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());
900 
901   // Then also flood the pool (cycling the top of the idle stack).
902   {
903     auto task_runner =
904         worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
905 
906     WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
907     WaitableEvent threads_continue;
908 
909     RepeatingClosure thread_blocker = BindLambdaForTesting([&]() {
910       thread_running.Signal();
911       WaitWithoutBlockingObserver(&threads_continue);
912     });
913 
914     for (size_t i = 0; i < kMaxTasks; ++i) {
915       task_runner->PostTask(FROM_HERE, thread_blocker);
916       thread_running.Wait();
917     }
918 
919     EXPECT_EQ(kMaxTasks, worker_pool_->NumberOfWorkersForTesting());
920     threads_continue.Signal();
921 
922     // Flush to ensure all references to |threads_continue| are gone before it
923     // goes out of scope.
924     task_tracker_.FlushForTesting();
925   }
926 
927   // All workers should clean up but two (since the timer is still running).
928   worker_pool_->WaitForWorkersCleanedUpForTesting(kMaxTasks - 2);
929   EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());
930 
931   // Extra time shouldn't change this.
932   PlatformThread::Sleep(kReclaimTimeForCleanupTests * 2);
933   EXPECT_EQ(2U, worker_pool_->NumberOfWorkersForTesting());
934 
935   // Stopping the timer should let the number of active threads go down to one.
936   sequenced_task_runner->PostTask(
937       FROM_HERE, BindLambdaForTesting([&]() { recurring_task.Stop(); }));
938   worker_pool_->WaitForWorkersCleanedUpForTesting(1);
939   EXPECT_EQ(1U, worker_pool_->NumberOfWorkersForTesting());
940 }
941 
942 namespace {
943 
944 enum class OptionalBlockingType {
945   NO_BLOCK,
946   MAY_BLOCK,
947   WILL_BLOCK,
948 };
949 
950 struct NestedBlockingType {
NestedBlockingTypebase::internal::__anon70d02d881411::NestedBlockingType951   NestedBlockingType(BlockingType first_in,
952                      OptionalBlockingType second_in,
953                      BlockingType behaves_as_in)
954       : first(first_in), second(second_in), behaves_as(behaves_as_in) {}
955 
956   BlockingType first;
957   OptionalBlockingType second;
958   BlockingType behaves_as;
959 };
960 
961 class NestedScopedBlockingCall {
962  public:
NestedScopedBlockingCall(const NestedBlockingType & nested_blocking_type)963   NestedScopedBlockingCall(const NestedBlockingType& nested_blocking_type)
964       : first_scoped_blocking_call_(nested_blocking_type.first),
965         second_scoped_blocking_call_(
966             nested_blocking_type.second == OptionalBlockingType::WILL_BLOCK
967                 ? std::make_unique<ScopedBlockingCall>(BlockingType::WILL_BLOCK)
968                 : (nested_blocking_type.second ==
969                            OptionalBlockingType::MAY_BLOCK
970                        ? std::make_unique<ScopedBlockingCall>(
971                              BlockingType::MAY_BLOCK)
972                        : nullptr)) {}
973 
974  private:
975   ScopedBlockingCall first_scoped_blocking_call_;
976   std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;
977 
978   DISALLOW_COPY_AND_ASSIGN(NestedScopedBlockingCall);
979 };
980 
981 }  // namespace
982 
983 class TaskSchedulerWorkerPoolBlockingTest
984     : public TaskSchedulerWorkerPoolImplTestBase,
985       public testing::TestWithParam<NestedBlockingType> {
986  public:
987   TaskSchedulerWorkerPoolBlockingTest() = default;
988 
ParamInfoToString(::testing::TestParamInfo<NestedBlockingType> param_info)989   static std::string ParamInfoToString(
990       ::testing::TestParamInfo<NestedBlockingType> param_info) {
991     std::string str = param_info.param.first == BlockingType::MAY_BLOCK
992                           ? "MAY_BLOCK"
993                           : "WILL_BLOCK";
994     if (param_info.param.second == OptionalBlockingType::MAY_BLOCK)
995       str += "_MAY_BLOCK";
996     else if (param_info.param.second == OptionalBlockingType::WILL_BLOCK)
997       str += "_WILL_BLOCK";
998     return str;
999   }
1000 
SetUp()1001   void SetUp() override {
1002     TaskSchedulerWorkerPoolImplTestBase::CommonSetUp();
1003     task_runner_ =
1004         worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
1005   }
1006 
TearDown()1007   void TearDown() override {
1008     TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
1009   }
1010 
1011  protected:
1012   // Saturates the worker pool with a task that first blocks, waits to be
1013   // unblocked, then exits.
SaturateWithBlockingTasks(const NestedBlockingType & nested_blocking_type)1014   void SaturateWithBlockingTasks(
1015       const NestedBlockingType& nested_blocking_type) {
1016     ASSERT_FALSE(blocking_threads_running_.IsSignaled());
1017 
1018     RepeatingClosure blocking_threads_running_closure = BarrierClosure(
1019         kMaxTasks, BindOnce(&WaitableEvent::Signal,
1020                             Unretained(&blocking_threads_running_)));
1021 
1022     for (size_t i = 0; i < kMaxTasks; ++i) {
1023       task_runner_->PostTask(
1024           FROM_HERE,
1025           BindOnce(
1026               [](Closure* blocking_threads_running_closure,
1027                  WaitableEvent* blocking_threads_continue_,
1028                  const NestedBlockingType& nested_blocking_type) {
1029                 NestedScopedBlockingCall nested_scoped_blocking_call(
1030                     nested_blocking_type);
1031                 blocking_threads_running_closure->Run();
1032                 WaitWithoutBlockingObserver(blocking_threads_continue_);
1033               },
1034               Unretained(&blocking_threads_running_closure),
1035               Unretained(&blocking_threads_continue_), nested_blocking_type));
1036     }
1037     blocking_threads_running_.Wait();
1038   }
1039 
1040   // Returns how long we can expect a change to |max_tasks_| to occur
1041   // after a task has become blocked.
GetMaxTasksChangeSleepTime()1042   TimeDelta GetMaxTasksChangeSleepTime() {
1043     return std::max(SchedulerWorkerPoolImpl::kBlockedWorkersPollPeriod,
1044                     worker_pool_->MayBlockThreshold()) +
1045            TestTimeouts::tiny_timeout();
1046   }
1047 
1048   // Waits indefinitely, until |worker_pool_|'s max tasks increases to
1049   // |expected_max_tasks|.
ExpectMaxTasksIncreasesTo(size_t expected_max_tasks)1050   void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
1051     size_t max_tasks = worker_pool_->GetMaxTasksForTesting();
1052     while (max_tasks != expected_max_tasks) {
1053       PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
1054       size_t new_max_tasks = worker_pool_->GetMaxTasksForTesting();
1055       ASSERT_GE(new_max_tasks, max_tasks);
1056       max_tasks = new_max_tasks;
1057     }
1058   }
1059 
1060   // Unblocks tasks posted by SaturateWithBlockingTasks().
UnblockTasks()1061   void UnblockTasks() { blocking_threads_continue_.Signal(); }
1062 
1063   scoped_refptr<TaskRunner> task_runner_;
1064 
1065  private:
1066   WaitableEvent blocking_threads_running_;
1067   WaitableEvent blocking_threads_continue_;
1068 
1069   DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolBlockingTest);
1070 };
1071 
1072 // Verify that BlockingScopeEntered() causes max tasks to increase and creates a
1073 // worker if needed. Also verify that BlockingScopeExited() decreases max tasks
1074 // after an increase.
TEST_P(TaskSchedulerWorkerPoolBlockingTest,ThreadBlockedUnblocked)1075 TEST_P(TaskSchedulerWorkerPoolBlockingTest, ThreadBlockedUnblocked) {
1076   ASSERT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1077 
1078   SaturateWithBlockingTasks(GetParam());
1079   if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
1080     ExpectMaxTasksIncreasesTo(2 * kMaxTasks);
1081   // A range of possible number of workers is accepted because of
1082   // crbug.com/757897.
1083   EXPECT_GE(worker_pool_->NumberOfWorkersForTesting(), kMaxTasks + 1);
1084   EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
1085   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), 2 * kMaxTasks);
1086 
1087   UnblockTasks();
1088   task_tracker_.FlushForTesting();
1089   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1090 }
1091 
1092 // Verify that tasks posted in a saturated pool before a ScopedBlockingCall will
1093 // execute after ScopedBlockingCall is instantiated.
TEST_P(TaskSchedulerWorkerPoolBlockingTest,PostBeforeBlocking)1094 TEST_P(TaskSchedulerWorkerPoolBlockingTest, PostBeforeBlocking) {
1095   WaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
1096   WaitableEvent thread_can_block;
1097   WaitableEvent threads_continue;
1098 
1099   for (size_t i = 0; i < kMaxTasks; ++i) {
1100     task_runner_->PostTask(
1101         FROM_HERE,
1102         BindOnce(
1103             [](const NestedBlockingType& nested_blocking_type,
1104                WaitableEvent* thread_running, WaitableEvent* thread_can_block,
1105                WaitableEvent* threads_continue) {
1106               thread_running->Signal();
1107               WaitWithoutBlockingObserver(thread_can_block);
1108 
1109               NestedScopedBlockingCall nested_scoped_blocking_call(
1110                   nested_blocking_type);
1111               WaitWithoutBlockingObserver(threads_continue);
1112             },
1113             GetParam(), Unretained(&thread_running),
1114             Unretained(&thread_can_block), Unretained(&threads_continue)));
1115     thread_running.Wait();
1116   }
1117 
1118   // All workers should be occupied and the pool should be saturated. Workers
1119   // have not entered ScopedBlockingCall yet.
1120   EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), kMaxTasks);
1121   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1122 
1123   WaitableEvent extra_threads_running;
1124   WaitableEvent extra_threads_continue;
1125   RepeatingClosure extra_threads_running_barrier = BarrierClosure(
1126       kMaxTasks,
1127       BindOnce(&WaitableEvent::Signal, Unretained(&extra_threads_running)));
1128   for (size_t i = 0; i < kMaxTasks; ++i) {
1129     task_runner_->PostTask(FROM_HERE,
1130                            BindOnce(
1131                                [](Closure* extra_threads_running_barrier,
1132                                   WaitableEvent* extra_threads_continue) {
1133                                  extra_threads_running_barrier->Run();
1134                                  WaitWithoutBlockingObserver(
1135                                      extra_threads_continue);
1136                                },
1137                                Unretained(&extra_threads_running_barrier),
1138                                Unretained(&extra_threads_continue)));
1139   }
1140 
1141   // Allow tasks to enter ScopedBlockingCall. Workers should be created for the
1142   // tasks we just posted.
1143   thread_can_block.Signal();
1144   if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
1145     ExpectMaxTasksIncreasesTo(2 * kMaxTasks);
1146 
1147   // Should not block forever.
1148   extra_threads_running.Wait();
1149   EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
1150   extra_threads_continue.Signal();
1151 
1152   threads_continue.Signal();
1153   task_tracker_.FlushForTesting();
1154 }
1155 // Verify that workers become idle when the pool is over-capacity and that
1156 // those workers do no work.
TEST_P(TaskSchedulerWorkerPoolBlockingTest,WorkersIdleWhenOverCapacity)1157 TEST_P(TaskSchedulerWorkerPoolBlockingTest, WorkersIdleWhenOverCapacity) {
1158   ASSERT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1159 
1160   SaturateWithBlockingTasks(GetParam());
1161   if (GetParam().behaves_as == BlockingType::MAY_BLOCK)
1162     ExpectMaxTasksIncreasesTo(2 * kMaxTasks);
1163   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), 2 * kMaxTasks);
1164   // A range of possible number of workers is accepted because of
1165   // crbug.com/757897.
1166   EXPECT_GE(worker_pool_->NumberOfWorkersForTesting(), kMaxTasks + 1);
1167   EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
1168 
1169   WaitableEvent threads_running;
1170   WaitableEvent threads_continue;
1171 
1172   RepeatingClosure threads_running_barrier = BarrierClosure(
1173       kMaxTasks,
1174       BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));
1175   // Posting these tasks should cause new workers to be created.
1176   for (size_t i = 0; i < kMaxTasks; ++i) {
1177     auto callback = BindOnce(
1178         [](Closure* threads_running_barrier, WaitableEvent* threads_continue) {
1179           threads_running_barrier->Run();
1180           WaitWithoutBlockingObserver(threads_continue);
1181         },
1182         Unretained(&threads_running_barrier), Unretained(&threads_continue));
1183     task_runner_->PostTask(FROM_HERE, std::move(callback));
1184   }
1185   threads_running.Wait();
1186 
1187   ASSERT_EQ(worker_pool_->NumberOfIdleWorkersForTesting(), 0U);
1188   EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
1189 
1190   AtomicFlag is_exiting;
1191   // These tasks should not get executed until after other tasks become
1192   // unblocked.
1193   for (size_t i = 0; i < kMaxTasks; ++i) {
1194     task_runner_->PostTask(FROM_HERE, BindOnce(
1195                                           [](AtomicFlag* is_exiting) {
1196                                             EXPECT_TRUE(is_exiting->IsSet());
1197                                           },
1198                                           Unretained(&is_exiting)));
1199   }
1200 
1201   // The original |kMaxTasks| will finish their tasks after being
1202   // unblocked. There will be work in the work queue, but the pool should now
1203   // be over-capacity and workers will become idle.
1204   UnblockTasks();
1205   worker_pool_->WaitForWorkersIdleForTesting(kMaxTasks);
1206   EXPECT_EQ(worker_pool_->NumberOfIdleWorkersForTesting(), kMaxTasks);
1207 
1208   // Posting more tasks should not cause workers idle from the pool being over
1209   // capacity to begin doing work.
1210   for (size_t i = 0; i < kMaxTasks; ++i) {
1211     task_runner_->PostTask(FROM_HERE, BindOnce(
1212                                           [](AtomicFlag* is_exiting) {
1213                                             EXPECT_TRUE(is_exiting->IsSet());
1214                                           },
1215                                           Unretained(&is_exiting)));
1216   }
1217 
1218   // Give time for those idle workers to possibly do work (which should not
1219   // happen).
1220   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1221 
1222   is_exiting.Set();
1223   // Unblocks the new workers.
1224   threads_continue.Signal();
1225   task_tracker_.FlushForTesting();
1226 }
1227 
// Instantiates the TEST_P tests of TaskSchedulerWorkerPoolBlockingTest, with
// an empty instantiation prefix, once per NestedBlockingType listed below.
// Each entry reads (first, second, behaves_as). Per these entries, a lone
// MAY_BLOCK behaves as MAY_BLOCK while every combination that involves a
// WILL_BLOCK is expected to behave as WILL_BLOCK. The generated test names
// come from ParamInfoToString().
1228 INSTANTIATE_TEST_CASE_P(
1229     ,
1230     TaskSchedulerWorkerPoolBlockingTest,
1231     ::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
1232                                          OptionalBlockingType::NO_BLOCK,
1233                                          BlockingType::MAY_BLOCK),
1234                       NestedBlockingType(BlockingType::WILL_BLOCK,
1235                                          OptionalBlockingType::NO_BLOCK,
1236                                          BlockingType::WILL_BLOCK),
1237                       NestedBlockingType(BlockingType::MAY_BLOCK,
1238                                          OptionalBlockingType::WILL_BLOCK,
1239                                          BlockingType::WILL_BLOCK),
1240                       NestedBlockingType(BlockingType::WILL_BLOCK,
1241                                          OptionalBlockingType::MAY_BLOCK,
1242                                          BlockingType::WILL_BLOCK)),
1243     TaskSchedulerWorkerPoolBlockingTest::ParamInfoToString);
1244 
1245 // Verify that if a thread enters the scope of a MAY_BLOCK ScopedBlockingCall,
1246 // but exits the scope before the MayBlockThreshold() is reached, that the max
1247 // tasks does not increase.
TEST_F(TaskSchedulerWorkerPoolBlockingTest,ThreadBlockUnblockPremature)1248 TEST_F(TaskSchedulerWorkerPoolBlockingTest, ThreadBlockUnblockPremature) {
1249   ASSERT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1250 
1251   TimeDelta max_tasks_change_sleep = GetMaxTasksChangeSleepTime();
1252   worker_pool_->MaximizeMayBlockThresholdForTesting();
1253 
1254   SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
1255                                                OptionalBlockingType::NO_BLOCK,
1256                                                BlockingType::MAY_BLOCK));
1257   PlatformThread::Sleep(max_tasks_change_sleep);
1258   EXPECT_EQ(worker_pool_->NumberOfWorkersForTesting(), kMaxTasks);
1259   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1260 
1261   UnblockTasks();
1262   task_tracker_.FlushForTesting();
1263   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1264 }
1265 
1266 // Verify that if max tasks is incremented because of a MAY_BLOCK
1267 // ScopedBlockingCall, it isn't incremented again when there is a nested
1268 // WILL_BLOCK ScopedBlockingCall.
TEST_F(TaskSchedulerWorkerPoolBlockingTest,MayBlockIncreaseCapacityNestedWillBlock)1269 TEST_F(TaskSchedulerWorkerPoolBlockingTest,
1270        MayBlockIncreaseCapacityNestedWillBlock) {
1271   ASSERT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1272   auto task_runner =
1273       worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
1274   WaitableEvent can_return;
1275 
1276   // Saturate the pool so that a MAY_BLOCK ScopedBlockingCall would increment
1277   // the max tasks.
1278   for (size_t i = 0; i < kMaxTasks - 1; ++i) {
1279     task_runner->PostTask(FROM_HERE, BindOnce(&WaitWithoutBlockingObserver,
1280                                               Unretained(&can_return)));
1281   }
1282 
1283   WaitableEvent can_instantiate_will_block;
1284   WaitableEvent did_instantiate_will_block;
1285 
1286   // Post a task that instantiates a MAY_BLOCK ScopedBlockingCall.
1287   task_runner->PostTask(
1288       FROM_HERE,
1289       BindOnce(
1290           [](WaitableEvent* can_instantiate_will_block,
1291              WaitableEvent* did_instantiate_will_block,
1292              WaitableEvent* can_return) {
1293             ScopedBlockingCall may_block(BlockingType::MAY_BLOCK);
1294             WaitWithoutBlockingObserver(can_instantiate_will_block);
1295             ScopedBlockingCall will_block(BlockingType::WILL_BLOCK);
1296             did_instantiate_will_block->Signal();
1297             WaitWithoutBlockingObserver(can_return);
1298           },
1299           Unretained(&can_instantiate_will_block),
1300           Unretained(&did_instantiate_will_block), Unretained(&can_return)));
1301 
1302   // After a short delay, max tasks should be incremented.
1303   ExpectMaxTasksIncreasesTo(kMaxTasks + 1);
1304 
1305   // Wait until the task instantiates a WILL_BLOCK ScopedBlockingCall.
1306   can_instantiate_will_block.Signal();
1307   did_instantiate_will_block.Wait();
1308 
1309   // Max tasks shouldn't be incremented again.
1310   EXPECT_EQ(kMaxTasks + 1, worker_pool_->GetMaxTasksForTesting());
1311 
1312   // Tear down.
1313   can_return.Signal();
1314   task_tracker_.FlushForTesting();
1315   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks);
1316 }
1317 
1318 // Verify that workers that become idle due to the pool being over capacity will
1319 // eventually cleanup.
TEST(TaskSchedulerWorkerPoolOverCapacityTest,VerifyCleanup)1320 TEST(TaskSchedulerWorkerPoolOverCapacityTest, VerifyCleanup) {
1321   constexpr size_t kLocalMaxTasks = 3;
1322 
1323   TaskTracker task_tracker("Test");
1324   DelayedTaskManager delayed_task_manager;
1325   scoped_refptr<TaskRunner> service_thread_task_runner =
1326       MakeRefCounted<TestSimpleTaskRunner>();
1327   delayed_task_manager.Start(service_thread_task_runner);
1328   SchedulerWorkerPoolImpl worker_pool(
1329       "OverCapacityTestWorkerPool", "A", ThreadPriority::NORMAL,
1330       task_tracker.GetTrackedRef(), &delayed_task_manager);
1331   worker_pool.Start(
1332       SchedulerWorkerPoolParams(kLocalMaxTasks, kReclaimTimeForCleanupTests),
1333       kLocalMaxTasks, service_thread_task_runner, nullptr,
1334       SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
1335 
1336   scoped_refptr<TaskRunner> task_runner =
1337       worker_pool.CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
1338 
1339   WaitableEvent threads_running;
1340   WaitableEvent threads_continue;
1341   RepeatingClosure threads_running_barrier = BarrierClosure(
1342       kLocalMaxTasks,
1343       BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));
1344 
1345   WaitableEvent blocked_call_continue;
1346   RepeatingClosure closure = BindRepeating(
1347       [](Closure* threads_running_barrier, WaitableEvent* threads_continue,
1348          WaitableEvent* blocked_call_continue) {
1349         threads_running_barrier->Run();
1350         {
1351           ScopedBlockingCall scoped_blocking_call(BlockingType::WILL_BLOCK);
1352           WaitWithoutBlockingObserver(blocked_call_continue);
1353         }
1354         WaitWithoutBlockingObserver(threads_continue);
1355       },
1356       Unretained(&threads_running_barrier), Unretained(&threads_continue),
1357       Unretained(&blocked_call_continue));
1358 
1359   for (size_t i = 0; i < kLocalMaxTasks; ++i)
1360     task_runner->PostTask(FROM_HERE, closure);
1361 
1362   threads_running.Wait();
1363 
1364   WaitableEvent extra_threads_running;
1365   WaitableEvent extra_threads_continue;
1366 
1367   RepeatingClosure extra_threads_running_barrier = BarrierClosure(
1368       kLocalMaxTasks,
1369       BindOnce(&WaitableEvent::Signal, Unretained(&extra_threads_running)));
1370   // These tasks should run on the new threads from increasing max tasks.
1371   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1372     task_runner->PostTask(FROM_HERE,
1373                           BindOnce(
1374                               [](Closure* extra_threads_running_barrier,
1375                                  WaitableEvent* extra_threads_continue) {
1376                                 extra_threads_running_barrier->Run();
1377                                 WaitWithoutBlockingObserver(
1378                                     extra_threads_continue);
1379                               },
1380                               Unretained(&extra_threads_running_barrier),
1381                               Unretained(&extra_threads_continue)));
1382   }
1383   extra_threads_running.Wait();
1384 
1385   ASSERT_EQ(kLocalMaxTasks * 2, worker_pool.NumberOfWorkersForTesting());
1386   EXPECT_EQ(kLocalMaxTasks * 2, worker_pool.GetMaxTasksForTesting());
1387   blocked_call_continue.Signal();
1388   extra_threads_continue.Signal();
1389 
1390   // Periodically post tasks to ensure that posting tasks does not prevent
1391   // workers that are idle due to the pool being over capacity from cleaning up.
1392   for (int i = 0; i < 16; ++i) {
1393     task_runner->PostDelayedTask(FROM_HERE, DoNothing(),
1394                                  kReclaimTimeForCleanupTests * i * 0.5);
1395   }
1396 
1397   // Note: one worker above capacity will not get cleaned up since it's on the
1398   // top of the idle stack.
1399   worker_pool.WaitForWorkersCleanedUpForTesting(kLocalMaxTasks - 1);
1400   EXPECT_EQ(kLocalMaxTasks + 1, worker_pool.NumberOfWorkersForTesting());
1401 
1402   threads_continue.Signal();
1403 
1404   worker_pool.JoinForTesting();
1405 }
1406 
1407 // Verify that the maximum number of workers is 256 and that hitting the max
1408 // leaves the pool in a valid state with regards to max tasks.
TEST_F(TaskSchedulerWorkerPoolBlockingTest,MaximumWorkersTest)1409 TEST_F(TaskSchedulerWorkerPoolBlockingTest, MaximumWorkersTest) {
1410   constexpr size_t kMaxNumberOfWorkers = 256;
1411   constexpr size_t kNumExtraTasks = 10;
1412 
1413   WaitableEvent early_blocking_threads_running;
1414   RepeatingClosure early_threads_barrier_closure =
1415       BarrierClosure(kMaxNumberOfWorkers,
1416                      BindOnce(&WaitableEvent::Signal,
1417                               Unretained(&early_blocking_threads_running)));
1418 
1419   WaitableEvent early_threads_finished;
1420   RepeatingClosure early_threads_finished_barrier = BarrierClosure(
1421       kMaxNumberOfWorkers,
1422       BindOnce(&WaitableEvent::Signal, Unretained(&early_threads_finished)));
1423 
1424   WaitableEvent early_release_threads_continue;
1425 
1426   // Post ScopedBlockingCall tasks to hit the worker cap.
1427   for (size_t i = 0; i < kMaxNumberOfWorkers; ++i) {
1428     task_runner_->PostTask(FROM_HERE,
1429                            BindOnce(
1430                                [](Closure* early_threads_barrier_closure,
1431                                   WaitableEvent* early_release_threads_continue,
1432                                   Closure* early_threads_finished) {
1433                                  {
1434                                    ScopedBlockingCall scoped_blocking_call(
1435                                        BlockingType::WILL_BLOCK);
1436                                    early_threads_barrier_closure->Run();
1437                                    WaitWithoutBlockingObserver(
1438                                        early_release_threads_continue);
1439                                  }
1440                                  early_threads_finished->Run();
1441                                },
1442                                Unretained(&early_threads_barrier_closure),
1443                                Unretained(&early_release_threads_continue),
1444                                Unretained(&early_threads_finished_barrier)));
1445   }
1446 
1447   early_blocking_threads_running.Wait();
1448   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(),
1449             kMaxTasks + kMaxNumberOfWorkers);
1450 
1451   WaitableEvent late_release_thread_contine;
1452   WaitableEvent late_blocking_threads_running;
1453 
1454   RepeatingClosure late_threads_barrier_closure = BarrierClosure(
1455       kNumExtraTasks, BindOnce(&WaitableEvent::Signal,
1456                                Unretained(&late_blocking_threads_running)));
1457 
1458   // Posts additional tasks. Note: we should already have |kMaxNumberOfWorkers|
1459   // tasks running. These tasks should not be able to get executed yet as
1460   // the pool is already at its max worker cap.
1461   for (size_t i = 0; i < kNumExtraTasks; ++i) {
1462     task_runner_->PostTask(
1463         FROM_HERE,
1464         BindOnce(
1465             [](Closure* late_threads_barrier_closure,
1466                WaitableEvent* late_release_thread_contine) {
1467               ScopedBlockingCall scoped_blocking_call(BlockingType::WILL_BLOCK);
1468               late_threads_barrier_closure->Run();
1469               WaitWithoutBlockingObserver(late_release_thread_contine);
1470             },
1471             Unretained(&late_threads_barrier_closure),
1472             Unretained(&late_release_thread_contine)));
1473   }
1474 
1475   // Give time to see if we exceed the max number of workers.
1476   PlatformThread::Sleep(TestTimeouts::tiny_timeout());
1477   EXPECT_LE(worker_pool_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);
1478 
1479   early_release_threads_continue.Signal();
1480   early_threads_finished.Wait();
1481   late_blocking_threads_running.Wait();
1482 
1483   WaitableEvent final_tasks_running;
1484   WaitableEvent final_tasks_continue;
1485   RepeatingClosure final_tasks_running_barrier = BarrierClosure(
1486       kMaxTasks,
1487       BindOnce(&WaitableEvent::Signal, Unretained(&final_tasks_running)));
1488 
1489   // Verify that we are still able to saturate the pool.
1490   for (size_t i = 0; i < kMaxTasks; ++i) {
1491     task_runner_->PostTask(
1492         FROM_HERE,
1493         BindOnce(
1494             [](Closure* closure, WaitableEvent* final_tasks_continue) {
1495               closure->Run();
1496               WaitWithoutBlockingObserver(final_tasks_continue);
1497             },
1498             Unretained(&final_tasks_running_barrier),
1499             Unretained(&final_tasks_continue)));
1500   }
1501   final_tasks_running.Wait();
1502   EXPECT_EQ(worker_pool_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);
1503   late_release_thread_contine.Signal();
1504   final_tasks_continue.Signal();
1505   task_tracker_.FlushForTesting();
1506 }
1507 
1508 // Verify that the maximum number of background tasks that can run concurrently
1509 // is honored.
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest,MaxBackgroundTasks)1510 TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, MaxBackgroundTasks) {
1511   constexpr int kMaxBackgroundTasks = kMaxTasks / 2;
1512   worker_pool_->Start(
1513       SchedulerWorkerPoolParams(kMaxTasks, base::TimeDelta::Max()),
1514       kMaxBackgroundTasks, service_thread_.task_runner(), nullptr,
1515       SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
1516   const scoped_refptr<TaskRunner> foreground_runner =
1517       worker_pool_->CreateTaskRunnerWithTraits({MayBlock()});
1518   const scoped_refptr<TaskRunner> background_runner =
1519       worker_pool_->CreateTaskRunnerWithTraits(
1520           {TaskPriority::BACKGROUND, MayBlock()});
1521 
1522   // It should be possible to have |kMaxBackgroundTasks|
1523   // TaskPriority::BACKGROUND tasks running concurrently.
1524   WaitableEvent background_tasks_running;
1525   WaitableEvent unblock_background_tasks;
1526   RepeatingClosure background_tasks_running_barrier = BarrierClosure(
1527       kMaxBackgroundTasks,
1528       BindOnce(&WaitableEvent::Signal, Unretained(&background_tasks_running)));
1529 
1530   for (int i = 0; i < kMaxBackgroundTasks; ++i) {
1531     background_runner->PostTask(
1532         FROM_HERE, base::BindLambdaForTesting([&]() {
1533           background_tasks_running_barrier.Run();
1534           WaitWithoutBlockingObserver(&unblock_background_tasks);
1535         }));
1536   }
1537   background_tasks_running.Wait();
1538 
1539   // No more TaskPriority::BACKGROUND task should run.
1540   AtomicFlag extra_background_task_can_run;
1541   WaitableEvent extra_background_task_running;
1542   background_runner->PostTask(
1543       FROM_HERE, base::BindLambdaForTesting([&]() {
1544         EXPECT_TRUE(extra_background_task_can_run.IsSet());
1545         extra_background_task_running.Signal();
1546       }));
1547 
1548   // An extra foreground task should be able to run.
1549   WaitableEvent foreground_task_running;
1550   foreground_runner->PostTask(
1551       FROM_HERE, base::BindOnce(&WaitableEvent::Signal,
1552                                 Unretained(&foreground_task_running)));
1553   foreground_task_running.Wait();
1554 
1555   // Completion of the TaskPriority::BACKGROUND tasks should allow the extra
1556   // TaskPriority::BACKGROUND task to run.
1557   extra_background_task_can_run.Set();
1558   unblock_background_tasks.Signal();
1559   extra_background_task_running.Wait();
1560 
1561   // Tear down.
1562   task_tracker_.FlushForTesting();
1563 }
1564 
1565 namespace {
1566 
1567 class TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest
1568     : public TaskSchedulerWorkerPoolImplTestBase,
1569       public testing::TestWithParam<BlockingType> {
1570  public:
1571   static constexpr int kMaxBackgroundTasks = kMaxTasks / 2;
1572 
1573   TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest() = default;
1574 
SetUp()1575   void SetUp() override {
1576     CreateWorkerPool();
1577     worker_pool_->Start(
1578         SchedulerWorkerPoolParams(kMaxTasks, base::TimeDelta::Max()),
1579         kMaxBackgroundTasks, service_thread_.task_runner(), nullptr,
1580         SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
1581   }
1582 
TearDown()1583   void TearDown() override {
1584     TaskSchedulerWorkerPoolImplTestBase::CommonTearDown();
1585   }
1586 
1587  private:
1588   DISALLOW_COPY_AND_ASSIGN(
1589       TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest);
1590 };
1591 
1592 }  // namespace
1593 
// Checks that BACKGROUND tasks blocked inside a ScopedBlockingCall (of the
// parameterized BlockingType) do not prevent another |kMaxBackgroundTasks|
// BACKGROUND tasks from running concurrently.
TEST_P(TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest,
       BlockingCallAndMaxBackgroundTasksTest) {
  const scoped_refptr<TaskRunner> background_runner =
      worker_pool_->CreateTaskRunnerWithTraits(
          {TaskPriority::BACKGROUND, MayBlock()});

  // First batch: |kMaxBackgroundTasks| BACKGROUND tasks that report they are
  // running and then block within a ScopedBlockingCall.
  WaitableEvent blocked_batch_running;
  WaitableEvent release_blocked_batch;
  RepeatingClosure blocked_batch_barrier = BarrierClosure(
      kMaxBackgroundTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&blocked_batch_running)));
  for (int posted = 0; posted < kMaxBackgroundTasks; ++posted) {
    background_runner->PostTask(
        FROM_HERE, base::BindLambdaForTesting([&]() {
          blocked_batch_barrier.Run();
          ScopedBlockingCall scoped_blocking_call(GetParam());
          WaitWithoutBlockingObserver(&release_blocked_batch);
        }));
  }
  blocked_batch_running.Wait();

  // Second batch: |kMaxBackgroundTasks| more BACKGROUND tasks. These should be
  // able to run since the first batch is blocked within a ScopedBlockingCall.
  //
  // Note: each task stays alive until released, so the barrier below only
  // fires if all |kMaxBackgroundTasks| of them are running at the same time.
  WaitableEvent extra_batch_running;
  WaitableEvent release_extra_batch;
  RepeatingClosure extra_batch_barrier = BarrierClosure(
      kMaxBackgroundTasks,
      BindOnce(&WaitableEvent::Signal, Unretained(&extra_batch_running)));
  for (int posted = 0; posted < kMaxBackgroundTasks; ++posted) {
    background_runner->PostTask(
        FROM_HERE, base::BindLambdaForTesting([&]() {
          extra_batch_barrier.Run();
          WaitWithoutBlockingObserver(&release_extra_batch);
        }));
  }
  extra_batch_running.Wait();

  // Release both batches and drain the pool.
  release_blocked_batch.Signal();
  release_extra_batch.Signal();
  task_tracker_.FlushForTesting();
}
1643 
1644 INSTANTIATE_TEST_CASE_P(
1645     MayBlock,
1646     TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest,
1647     ::testing::Values(BlockingType::MAY_BLOCK));
1648 INSTANTIATE_TEST_CASE_P(
1649     WillBlock,
1650     TaskSchedulerWorkerPoolBlockingCallAndMaxBackgroundTasksTest,
1651     ::testing::Values(BlockingType::WILL_BLOCK));
1652 
1653 // Verify that worker detachement doesn't race with worker cleanup, regression
1654 // test for https://crbug.com/810464.
TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest,RacyCleanup)1655 TEST_F(TaskSchedulerWorkerPoolImplStartInBodyTest, RacyCleanup) {
1656 #if defined(OS_FUCHSIA)
1657   // Fuchsia + QEMU doesn't deal well with *many* threads being
1658   // created/destroyed at once: https://crbug.com/816575.
1659   constexpr size_t kLocalMaxTasks = 16;
1660 #else   // defined(OS_FUCHSIA)
1661   constexpr size_t kLocalMaxTasks = 256;
1662 #endif  // defined(OS_FUCHSIA)
1663   constexpr TimeDelta kReclaimTimeForRacyCleanupTest =
1664       TimeDelta::FromMilliseconds(10);
1665 
1666   worker_pool_->Start(
1667       SchedulerWorkerPoolParams(kLocalMaxTasks, kReclaimTimeForRacyCleanupTest),
1668       kLocalMaxTasks, service_thread_.task_runner(), nullptr,
1669       SchedulerWorkerPoolImpl::WorkerEnvironment::NONE);
1670 
1671   scoped_refptr<TaskRunner> task_runner =
1672       worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()});
1673 
1674   WaitableEvent threads_running;
1675   WaitableEvent unblock_threads;
1676   RepeatingClosure threads_running_barrier = BarrierClosure(
1677       kLocalMaxTasks,
1678       BindOnce(&WaitableEvent::Signal, Unretained(&threads_running)));
1679 
1680   for (size_t i = 0; i < kLocalMaxTasks; ++i) {
1681     task_runner->PostTask(
1682         FROM_HERE,
1683         BindOnce(
1684             [](OnceClosure on_running, WaitableEvent* unblock_threads) {
1685               std::move(on_running).Run();
1686               WaitWithoutBlockingObserver(unblock_threads);
1687             },
1688             threads_running_barrier, Unretained(&unblock_threads)));
1689   }
1690 
1691   // Wait for all workers to be ready and release them all at once.
1692   threads_running.Wait();
1693   unblock_threads.Signal();
1694 
1695   // Sleep to wakeup precisely when all workers are going to try to cleanup per
1696   // being idle.
1697   PlatformThread::Sleep(kReclaimTimeForRacyCleanupTest);
1698 
1699   worker_pool_->JoinForTesting();
1700 
1701   // Unwinding this test will be racy if worker cleanup can race with
1702   // SchedulerWorkerPoolImpl destruction : https://crbug.com/810464.
1703   worker_pool_.reset();
1704 }
1705 
1706 }  // namespace internal
1707 }  // namespace base
1708