• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/thread_pool/test_utils.h"
6 
7 #include <utility>
8 
9 #include "base/debug/leak_annotations.h"
10 #include "base/functional/bind.h"
11 #include "base/memory/raw_ptr.h"
12 #include "base/synchronization/condition_variable.h"
13 #include "base/task/thread_pool/pooled_parallel_task_runner.h"
14 #include "base/task/thread_pool/pooled_sequenced_task_runner.h"
15 #include "base/test/bind.h"
16 #include "base/threading/scoped_blocking_call_internal.h"
17 #include "base/threading/thread_restrictions.h"
18 #include "testing/gtest/include/gtest/gtest.h"
19 
20 namespace base {
21 namespace internal {
22 namespace test {
23 
24 namespace {
25 
26 // A task runner that posts each task as a MockJobTaskSource that runs a single
27 // task. This is used to run ThreadGroupTests which require a TaskRunner with
28 // kJob execution mode. Delayed tasks are not supported.
29 class MockJobTaskRunner : public TaskRunner {
30  public:
MockJobTaskRunner(const TaskTraits & traits,PooledTaskRunnerDelegate * pooled_task_runner_delegate)31   MockJobTaskRunner(const TaskTraits& traits,
32                     PooledTaskRunnerDelegate* pooled_task_runner_delegate)
33       : traits_(traits),
34         pooled_task_runner_delegate_(pooled_task_runner_delegate) {}
35 
36   MockJobTaskRunner(const MockJobTaskRunner&) = delete;
37   MockJobTaskRunner& operator=(const MockJobTaskRunner&) = delete;
38 
39   // TaskRunner:
40   bool PostDelayedTask(const Location& from_here,
41                        OnceClosure closure,
42                        TimeDelta delay) override;
43 
44  private:
45   ~MockJobTaskRunner() override = default;
46 
47   const TaskTraits traits_;
48   const raw_ptr<PooledTaskRunnerDelegate> pooled_task_runner_delegate_;
49 };
50 
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)51 bool MockJobTaskRunner::PostDelayedTask(const Location& from_here,
52                                         OnceClosure closure,
53                                         TimeDelta delay) {
54   DCHECK_EQ(delay, TimeDelta());  // Jobs doesn't support delayed tasks.
55 
56   if (!PooledTaskRunnerDelegate::MatchesCurrentDelegate(
57           pooled_task_runner_delegate_)) {
58     return false;
59   }
60 
61   auto job_task = base::MakeRefCounted<MockJobTask>(std::move(closure));
62   scoped_refptr<JobTaskSource> task_source = job_task->GetJobTaskSource(
63       from_here, traits_, pooled_task_runner_delegate_);
64   return pooled_task_runner_delegate_->EnqueueJobTaskSource(
65       std::move(task_source));
66 }
67 
CreateJobTaskRunner(const TaskTraits & traits,MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate)68 scoped_refptr<TaskRunner> CreateJobTaskRunner(
69     const TaskTraits& traits,
70     MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate) {
71   return MakeRefCounted<MockJobTaskRunner>(traits,
72                                            mock_pooled_task_runner_delegate);
73 }
74 
75 }  // namespace
76 
MockWorkerThreadObserver()77 MockWorkerThreadObserver::MockWorkerThreadObserver()
78     : on_main_exit_cv_(lock_.CreateConditionVariable()) {}
79 
~MockWorkerThreadObserver()80 MockWorkerThreadObserver::~MockWorkerThreadObserver() {
81   WaitCallsOnMainExit();
82 }
83 
AllowCallsOnMainExit(int num_calls)84 void MockWorkerThreadObserver::AllowCallsOnMainExit(int num_calls) {
85   CheckedAutoLock auto_lock(lock_);
86   EXPECT_EQ(0, allowed_calls_on_main_exit_);
87   allowed_calls_on_main_exit_ = num_calls;
88 }
89 
WaitCallsOnMainExit()90 void MockWorkerThreadObserver::WaitCallsOnMainExit() {
91   CheckedAutoLock auto_lock(lock_);
92   while (allowed_calls_on_main_exit_ != 0)
93     on_main_exit_cv_->Wait();
94 }
95 
OnWorkerThreadMainExit()96 void MockWorkerThreadObserver::OnWorkerThreadMainExit() {
97   CheckedAutoLock auto_lock(lock_);
98   EXPECT_GE(allowed_calls_on_main_exit_, 0);
99   --allowed_calls_on_main_exit_;
100   if (allowed_calls_on_main_exit_ == 0)
101     on_main_exit_cv_->Signal();
102 }
103 
CreateSequenceWithTask(Task task,const TaskTraits & traits,scoped_refptr<TaskRunner> task_runner,TaskSourceExecutionMode execution_mode)104 scoped_refptr<Sequence> CreateSequenceWithTask(
105     Task task,
106     const TaskTraits& traits,
107     scoped_refptr<TaskRunner> task_runner,
108     TaskSourceExecutionMode execution_mode) {
109   scoped_refptr<Sequence> sequence =
110       MakeRefCounted<Sequence>(traits, task_runner.get(), execution_mode);
111   auto transaction = sequence->BeginTransaction();
112   transaction.WillPushImmediateTask();
113   transaction.PushImmediateTask(std::move(task));
114   return sequence;
115 }
116 
CreatePooledTaskRunnerWithExecutionMode(TaskSourceExecutionMode execution_mode,MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate,const TaskTraits & traits)117 scoped_refptr<TaskRunner> CreatePooledTaskRunnerWithExecutionMode(
118     TaskSourceExecutionMode execution_mode,
119     MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate,
120     const TaskTraits& traits) {
121   switch (execution_mode) {
122     case TaskSourceExecutionMode::kParallel:
123       return CreatePooledTaskRunner(traits, mock_pooled_task_runner_delegate);
124     case TaskSourceExecutionMode::kSequenced:
125       return CreatePooledSequencedTaskRunner(traits,
126                                              mock_pooled_task_runner_delegate);
127     case TaskSourceExecutionMode::kJob:
128       return CreateJobTaskRunner(traits, mock_pooled_task_runner_delegate);
129     default:
130       // Fall through.
131       break;
132   }
133   ADD_FAILURE() << "Unexpected ExecutionMode";
134   return nullptr;
135 }
136 
CreatePooledTaskRunner(const TaskTraits & traits,MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate)137 scoped_refptr<TaskRunner> CreatePooledTaskRunner(
138     const TaskTraits& traits,
139     MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate) {
140   return MakeRefCounted<PooledParallelTaskRunner>(
141       traits, mock_pooled_task_runner_delegate);
142 }
143 
CreatePooledSequencedTaskRunner(const TaskTraits & traits,MockPooledTaskRunnerDelegate * mock_pooled_task_runner_delegate)144 scoped_refptr<SequencedTaskRunner> CreatePooledSequencedTaskRunner(
145     const TaskTraits& traits,
146     MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate) {
147   return MakeRefCounted<PooledSequencedTaskRunner>(
148       traits, mock_pooled_task_runner_delegate);
149 }
150 
MockPooledTaskRunnerDelegate(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)151 MockPooledTaskRunnerDelegate::MockPooledTaskRunnerDelegate(
152     TrackedRef<TaskTracker> task_tracker,
153     DelayedTaskManager* delayed_task_manager)
154     : task_tracker_(task_tracker),
155       delayed_task_manager_(delayed_task_manager) {}
156 
157 MockPooledTaskRunnerDelegate::~MockPooledTaskRunnerDelegate() = default;
158 
PostTaskWithSequence(Task task,scoped_refptr<Sequence> sequence)159 bool MockPooledTaskRunnerDelegate::PostTaskWithSequence(
160     Task task,
161     scoped_refptr<Sequence> sequence) {
162   // |thread_group_| must be initialized with SetThreadGroup() before
163   // proceeding.
164   DCHECK(thread_group_);
165   DCHECK(task.task);
166   DCHECK(sequence);
167 
168   if (!task_tracker_->WillPostTask(&task, sequence->shutdown_behavior())) {
169     // `task`'s destructor may run sequence-affine code, so it must be leaked
170     // when `WillPostTask` returns false.
171     auto leak = std::make_unique<Task>(std::move(task));
172     ANNOTATE_LEAKING_OBJECT_PTR(leak.get());
173     leak.release();
174     return false;
175   }
176 
177   if (task.delayed_run_time.is_null()) {
178     PostTaskWithSequenceNow(std::move(task), std::move(sequence));
179   } else {
180     // It's safe to take a ref on this pointer since the caller must have a ref
181     // to the TaskRunner in order to post.
182     scoped_refptr<TaskRunner> task_runner = sequence->task_runner();
183     delayed_task_manager_->AddDelayedTask(
184         std::move(task),
185         BindOnce(
186             [](scoped_refptr<Sequence> sequence,
187                MockPooledTaskRunnerDelegate* self, Task task) {
188               self->PostTaskWithSequenceNow(std::move(task),
189                                             std::move(sequence));
190             },
191             std::move(sequence), Unretained(this)),
192         std::move(task_runner));
193   }
194 
195   return true;
196 }
197 
PostTaskWithSequenceNow(Task task,scoped_refptr<Sequence> sequence)198 void MockPooledTaskRunnerDelegate::PostTaskWithSequenceNow(
199     Task task,
200     scoped_refptr<Sequence> sequence) {
201   auto transaction = sequence->BeginTransaction();
202   const bool sequence_should_be_queued = transaction.WillPushImmediateTask();
203   RegisteredTaskSource task_source;
204   if (sequence_should_be_queued) {
205     task_source = task_tracker_->RegisterTaskSource(std::move(sequence));
206     // We shouldn't push |task| if we're not allowed to queue |task_source|.
207     if (!task_source)
208       return;
209   }
210   transaction.PushImmediateTask(std::move(task));
211   if (task_source) {
212     thread_group_->PushTaskSourceAndWakeUpWorkers(
213         {std::move(task_source), std::move(transaction)});
214   }
215 }
216 
ShouldYield(const TaskSource * task_source)217 bool MockPooledTaskRunnerDelegate::ShouldYield(const TaskSource* task_source) {
218   return thread_group_->ShouldYield(task_source->GetSortKey());
219 }
220 
EnqueueJobTaskSource(scoped_refptr<JobTaskSource> task_source)221 bool MockPooledTaskRunnerDelegate::EnqueueJobTaskSource(
222     scoped_refptr<JobTaskSource> task_source) {
223   // |thread_group_| must be initialized with SetThreadGroup() before
224   // proceeding.
225   DCHECK(thread_group_);
226   DCHECK(task_source);
227 
228   auto registered_task_source =
229       task_tracker_->RegisterTaskSource(std::move(task_source));
230   if (!registered_task_source)
231     return false;
232   auto transaction = registered_task_source->BeginTransaction();
233   thread_group_->PushTaskSourceAndWakeUpWorkers(
234       {std::move(registered_task_source), std::move(transaction)});
235   return true;
236 }
237 
RemoveJobTaskSource(scoped_refptr<JobTaskSource> task_source)238 void MockPooledTaskRunnerDelegate::RemoveJobTaskSource(
239     scoped_refptr<JobTaskSource> task_source) {
240   thread_group_->RemoveTaskSource(*task_source);
241 }
242 
UpdatePriority(scoped_refptr<TaskSource> task_source,TaskPriority priority)243 void MockPooledTaskRunnerDelegate::UpdatePriority(
244     scoped_refptr<TaskSource> task_source,
245     TaskPriority priority) {
246   auto transaction = task_source->BeginTransaction();
247   transaction.UpdatePriority(priority);
248   thread_group_->UpdateSortKey(std::move(transaction));
249 }
250 
UpdateJobPriority(scoped_refptr<TaskSource> task_source,TaskPriority priority)251 void MockPooledTaskRunnerDelegate::UpdateJobPriority(
252     scoped_refptr<TaskSource> task_source,
253     TaskPriority priority) {
254   UpdatePriority(std::move(task_source), priority);
255 }
256 
SetThreadGroup(ThreadGroup * thread_group)257 void MockPooledTaskRunnerDelegate::SetThreadGroup(ThreadGroup* thread_group) {
258   thread_group_ = thread_group;
259 }
260 
261 MockJobTask::~MockJobTask() = default;
262 
MockJobTask(base::RepeatingCallback<void (JobDelegate *)> worker_task,size_t num_tasks_to_run)263 MockJobTask::MockJobTask(
264     base::RepeatingCallback<void(JobDelegate*)> worker_task,
265     size_t num_tasks_to_run)
266     : worker_task_(std::move(worker_task)),
267       remaining_num_tasks_to_run_(num_tasks_to_run) {}
268 
MockJobTask(base::OnceClosure worker_task)269 MockJobTask::MockJobTask(base::OnceClosure worker_task)
270     : worker_task_(base::BindRepeating(
271           [](base::OnceClosure&& worker_task, JobDelegate*) mutable {
272             std::move(worker_task).Run();
273           },
274           base::Passed(std::move(worker_task)))),
275       remaining_num_tasks_to_run_(1) {}
276 
GetMaxConcurrency(size_t) const277 size_t MockJobTask::GetMaxConcurrency(size_t /* worker_count */) const {
278   return remaining_num_tasks_to_run_.load();
279 }
280 
Run(JobDelegate * delegate)281 void MockJobTask::Run(JobDelegate* delegate) {
282   worker_task_.Run(delegate);
283   size_t before = remaining_num_tasks_to_run_.fetch_sub(1);
284   DCHECK_GT(before, 0U);
285 }
286 
GetJobTaskSource(const Location & from_here,const TaskTraits & traits,PooledTaskRunnerDelegate * delegate)287 scoped_refptr<JobTaskSource> MockJobTask::GetJobTaskSource(
288     const Location& from_here,
289     const TaskTraits& traits,
290     PooledTaskRunnerDelegate* delegate) {
291   return MakeRefCounted<JobTaskSource>(
292       from_here, traits, base::BindRepeating(&test::MockJobTask::Run, this),
293       base::BindRepeating(&test::MockJobTask::GetMaxConcurrency, this),
294       delegate);
295 }
296 
QueueAndRunTaskSource(TaskTracker * task_tracker,scoped_refptr<TaskSource> task_source)297 RegisteredTaskSource QueueAndRunTaskSource(
298     TaskTracker* task_tracker,
299     scoped_refptr<TaskSource> task_source) {
300   auto registered_task_source =
301       task_tracker->RegisterTaskSource(std::move(task_source));
302   EXPECT_TRUE(registered_task_source);
303   EXPECT_NE(registered_task_source.WillRunTask(),
304             TaskSource::RunStatus::kDisallowed);
305   return task_tracker->RunAndPopNextTask(std::move(registered_task_source));
306 }
307 
ShutdownTaskTracker(TaskTracker * task_tracker)308 void ShutdownTaskTracker(TaskTracker* task_tracker) {
309   task_tracker->StartShutdown();
310   task_tracker->CompleteShutdown();
311 }
312 
313 }  // namespace test
314 }  // namespace internal
315 }  // namespace base
316