1 // Copyright 2019 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/thread_pool/job_task_source_interface.h"
6
7 #include <utility>
8
9 #include "base/functional/callback_helpers.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/task/post_job.h"
12 #include "base/task/task_features.h"
13 #include "base/task/thread_pool/pooled_task_runner_delegate.h"
14 #include "base/task/thread_pool/test_utils.h"
15 #include "base/test/bind.h"
16 #include "base/test/gtest_util.h"
17 #include "base/test/scoped_feature_list.h"
18 #include "base/test/test_timeouts.h"
19 #include "build/build_config.h"
20 #include "testing/gmock/include/gmock/gmock.h"
21 #include "testing/gtest/include/gtest/gtest.h"
22
23 using ::testing::_;
24 using ::testing::Return;
25
26 namespace base {
27 namespace internal {
28
29 class MockPooledTaskRunnerDelegate : public PooledTaskRunnerDelegate {
30 public:
31 MOCK_METHOD2(PostTaskWithSequence,
32 bool(Task task, scoped_refptr<Sequence> sequence));
33 MOCK_METHOD1(ShouldYield, bool(const TaskSource* task_source));
34 MOCK_METHOD1(EnqueueJobTaskSource,
35 bool(scoped_refptr<JobTaskSource> task_source));
36 MOCK_METHOD1(RemoveJobTaskSource,
37 void(scoped_refptr<JobTaskSource> task_source));
38 MOCK_CONST_METHOD1(IsRunningPoolWithTraits, bool(const TaskTraits& traits));
39 MOCK_METHOD2(UpdatePriority,
40 void(scoped_refptr<TaskSource> task_source,
41 TaskPriority priority));
42 MOCK_METHOD2(UpdateJobPriority,
43 void(scoped_refptr<TaskSource> task_source,
44 TaskPriority priority));
45 };
46
47 class ThreadPoolJobTaskSourceTest : public testing::Test,
48 public testing::WithParamInterface<bool> {
49 protected:
ThreadPoolJobTaskSourceTest()50 ThreadPoolJobTaskSourceTest() {
51 if (GetParam()) {
52 scoped_feature_list_.InitAndEnableFeature(kUseNewJobImplementation);
53 } else {
54 scoped_feature_list_.InitAndDisableFeature(kUseNewJobImplementation);
55 }
56 }
57
58 // Creates and starts a job which which runs `callback` with
59 // `initial_max_concurrency`.
60 std::pair<scoped_refptr<test::MockJobTask>, scoped_refptr<JobTaskSource>>
StartJob(size_t initial_max_concurrency,base::RepeatingCallback<void (JobDelegate *)> callback=DoNothing ())61 StartJob(size_t initial_max_concurrency,
62 base::RepeatingCallback<void(JobDelegate*)> callback = DoNothing()) {
63 auto job_task = base::MakeRefCounted<test::MockJobTask>(
64 std::move(callback), /* num_tasks_to_run */ initial_max_concurrency);
65 auto task_source = job_task->GetJobTaskSource(
66 FROM_HERE, {}, &pooled_task_runner_delegate_);
67 if (initial_max_concurrency > 0) {
68 EXPECT_CALL(pooled_task_runner_delegate_, EnqueueJobTaskSource(_));
69 }
70 task_source->NotifyConcurrencyIncrease();
71 return {job_task, task_source};
72 }
73
74 testing::StrictMock<MockPooledTaskRunnerDelegate>
75 pooled_task_runner_delegate_;
76
77 private:
78 base::test::ScopedFeatureList scoped_feature_list_;
79 };
80
81 // Verifies the normal flow of running 2 tasks one after the other.
TEST_P(ThreadPoolJobTaskSourceTest,RunTasks)82 TEST_P(ThreadPoolJobTaskSourceTest, RunTasks) {
83 auto [job_task, task_source] = StartJob(/* initial_max_concurrency=*/2);
84 auto registered_task_source =
85 RegisteredTaskSource::CreateForTesting(task_source);
86
87 EXPECT_EQ(2U, task_source->GetRemainingConcurrency());
88 {
89 EXPECT_EQ(registered_task_source.WillRunTask(),
90 TaskSource::RunStatus::kAllowedNotSaturated);
91 EXPECT_EQ(1U, task_source->GetWorkerCount());
92
93 auto task = registered_task_source.TakeTask();
94 std::move(task.task).Run();
95 EXPECT_TRUE(registered_task_source.DidProcessTask());
96 EXPECT_EQ(0U, task_source->GetWorkerCount());
97 }
98 {
99 EXPECT_EQ(registered_task_source.WillRunTask(),
100 TaskSource::RunStatus::kAllowedSaturated);
101 EXPECT_EQ(1U, task_source->GetWorkerCount());
102
103 // An attempt to run an additional task is not allowed.
104 EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
105 TaskSource::RunStatus::kDisallowed);
106 EXPECT_EQ(0U, task_source->GetRemainingConcurrency());
107 auto task = registered_task_source.TakeTask();
108 EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
109 TaskSource::RunStatus::kDisallowed);
110
111 std::move(task.task).Run();
112 EXPECT_EQ(0U, task_source->GetRemainingConcurrency());
113 EXPECT_TRUE(task_source->IsActive());
114 // Returns false because the task source is out of tasks.
115 EXPECT_FALSE(registered_task_source.DidProcessTask());
116 EXPECT_EQ(0U, task_source->GetWorkerCount());
117 EXPECT_FALSE(task_source->IsActive());
118 }
119 }
120
121 // Verifies that a job task source doesn't allow any new RunStatus after Clear()
122 // is called.
TEST_P(ThreadPoolJobTaskSourceTest,Clear)123 TEST_P(ThreadPoolJobTaskSourceTest, Clear) {
124 auto [job_task, task_source] = StartJob(/* initial_max_concurrency=*/5);
125
126 EXPECT_EQ(5U, task_source->GetRemainingConcurrency());
127 auto registered_task_source_a =
128 RegisteredTaskSource::CreateForTesting(task_source);
129 EXPECT_EQ(registered_task_source_a.WillRunTask(),
130 TaskSource::RunStatus::kAllowedNotSaturated);
131 auto task_a = registered_task_source_a.TakeTask();
132
133 auto registered_task_source_b =
134 RegisteredTaskSource::CreateForTesting(task_source);
135 EXPECT_EQ(registered_task_source_b.WillRunTask(),
136 TaskSource::RunStatus::kAllowedNotSaturated);
137
138 auto registered_task_source_c =
139 RegisteredTaskSource::CreateForTesting(task_source);
140 EXPECT_EQ(registered_task_source_c.WillRunTask(),
141 TaskSource::RunStatus::kAllowedNotSaturated);
142
143 auto registered_task_source_d =
144 RegisteredTaskSource::CreateForTesting(task_source);
145 EXPECT_EQ(registered_task_source_d.WillRunTask(),
146 TaskSource::RunStatus::kAllowedNotSaturated);
147
148 EXPECT_FALSE(task_source->ShouldYield());
149
150 {
151 EXPECT_EQ(1U, task_source->GetRemainingConcurrency());
152 auto task = registered_task_source_c.Clear();
153 EXPECT_FALSE(task);
154 registered_task_source_c.DidProcessTask();
155 EXPECT_EQ(0U, task_source->GetRemainingConcurrency());
156 }
157 // The task source shouldn't allow any further tasks after Clear.
158 EXPECT_TRUE(task_source->ShouldYield());
159 EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
160 TaskSource::RunStatus::kDisallowed);
161
162 // Another outstanding RunStatus can still call Clear.
163 {
164 auto task = registered_task_source_d.Clear();
165 EXPECT_FALSE(task);
166 registered_task_source_d.DidProcessTask();
167 EXPECT_EQ(0U, task_source->GetRemainingConcurrency());
168 }
169
170 // A task that was already acquired can still run.
171 std::move(task_a.task).Run();
172 registered_task_source_a.DidProcessTask();
173
174 // A valid outstanding RunStatus can also take and run a task.
175 {
176 auto task = registered_task_source_b.TakeTask();
177 std::move(task.task).Run();
178 registered_task_source_b.DidProcessTask();
179 }
180 }
181
182 // Verifies that a job task source doesn't return an "allowed" RunStatus after
183 // Cancel() is called.
TEST_P(ThreadPoolJobTaskSourceTest,Cancel)184 TEST_P(ThreadPoolJobTaskSourceTest, Cancel) {
185 auto [job_task, task_source] = StartJob(/* initial_max_concurrency=*/3);
186
187 auto registered_task_source_a =
188 RegisteredTaskSource::CreateForTesting(task_source);
189 EXPECT_EQ(registered_task_source_a.WillRunTask(),
190 TaskSource::RunStatus::kAllowedNotSaturated);
191 auto task_a = registered_task_source_a.TakeTask();
192
193 auto registered_task_source_b =
194 RegisteredTaskSource::CreateForTesting(task_source);
195 EXPECT_EQ(registered_task_source_b.WillRunTask(),
196 TaskSource::RunStatus::kAllowedNotSaturated);
197
198 EXPECT_FALSE(task_source->ShouldYield());
199
200 task_source->Cancel();
201 EXPECT_TRUE(task_source->ShouldYield());
202
203 // The task source shouldn't allow any further tasks after Cancel.
204 EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
205 TaskSource::RunStatus::kDisallowed);
206
207 // A task that was already acquired can still run.
208 std::move(task_a.task).Run();
209 registered_task_source_a.DidProcessTask();
210
211 // A RegisteredTaskSource that's ready can also take and run a task.
212 {
213 auto task = registered_task_source_b.TakeTask();
214 std::move(task.task).Run();
215 registered_task_source_b.DidProcessTask();
216 }
217 }
218
219 // Verifies that multiple tasks can run in parallel up to |max_concurrency|.
TEST_P(ThreadPoolJobTaskSourceTest,RunTasksInParallel)220 TEST_P(ThreadPoolJobTaskSourceTest, RunTasksInParallel) {
221 auto [job_task, task_source] = StartJob(/* initial_max_concurrency=*/2);
222
223 auto registered_task_source_a =
224 RegisteredTaskSource::CreateForTesting(task_source);
225 EXPECT_EQ(registered_task_source_a.WillRunTask(),
226 TaskSource::RunStatus::kAllowedNotSaturated);
227 EXPECT_EQ(1U, task_source->GetWorkerCount());
228 EXPECT_EQ(1U, task_source->GetSortKey().worker_count());
229 auto task_a = registered_task_source_a.TakeTask();
230
231 auto registered_task_source_b =
232 RegisteredTaskSource::CreateForTesting(task_source);
233 EXPECT_EQ(registered_task_source_b.WillRunTask(),
234 TaskSource::RunStatus::kAllowedSaturated);
235 EXPECT_EQ(2U, task_source->GetWorkerCount());
236 EXPECT_EQ(2U, task_source->GetSortKey().worker_count());
237 auto task_b = registered_task_source_b.TakeTask();
238
239 // WillRunTask() should return a null RunStatus once the max concurrency is
240 // reached.
241 EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
242 TaskSource::RunStatus::kDisallowed);
243
244 std::move(task_a.task).Run();
245 EXPECT_FALSE(registered_task_source_a.DidProcessTask());
246 EXPECT_EQ(1U, task_source->GetSortKey().worker_count());
247
248 // Increasing max concurrency above the number of workers should cause the
249 // task source to re-enqueue.
250 job_task->SetNumTasksToRun(2);
251 EXPECT_CALL(pooled_task_runner_delegate_, EnqueueJobTaskSource(_));
252 task_source->NotifyConcurrencyIncrease();
253
254 std::move(task_b.task).Run();
255 EXPECT_TRUE(registered_task_source_b.DidProcessTask());
256 EXPECT_EQ(0U, task_source->GetSortKey().worker_count());
257
258 EXPECT_EQ(0U, task_source->GetWorkerCount());
259
260 auto registered_task_source_c =
261 RegisteredTaskSource::CreateForTesting(task_source);
262 EXPECT_EQ(registered_task_source_c.WillRunTask(),
263 TaskSource::RunStatus::kAllowedSaturated);
264 auto task_c = registered_task_source_c.TakeTask();
265
266 std::move(task_c.task).Run();
267 EXPECT_FALSE(registered_task_source_c.DidProcessTask());
268 }
269
270 // Verifies the normal flow of running the join task until completion.
TEST_P(ThreadPoolJobTaskSourceTest,RunJoinTask)271 TEST_P(ThreadPoolJobTaskSourceTest, RunJoinTask) {
272 auto job_task = base::MakeRefCounted<test::MockJobTask>(
273 DoNothing(), /* num_tasks_to_run */ 2);
274 scoped_refptr<JobTaskSource> task_source =
275 job_task->GetJobTaskSource(FROM_HERE, {}, &pooled_task_runner_delegate_);
276
277 EXPECT_TRUE(task_source->WillJoin());
278 // Intentionally run |worker_task| twice to make sure RunJoinTask() calls
279 // it again. This can happen in production if the joining thread spuriously
280 // return and needs to run again.
281 EXPECT_TRUE(task_source->RunJoinTask());
282 EXPECT_FALSE(task_source->RunJoinTask());
283 }
284
285 // Verify that |worker_count| excludes the (inactive) returning thread calling
286 // max_concurrency_callback.
TEST_P(ThreadPoolJobTaskSourceTest,RunTaskWorkerCount)287 TEST_P(ThreadPoolJobTaskSourceTest, RunTaskWorkerCount) {
288 size_t max_concurrency = 1;
289 scoped_refptr<JobTaskSource> task_source = internal::CreateJobTaskSource(
290 FROM_HERE, TaskTraits(),
291 BindLambdaForTesting([&](JobDelegate* delegate) { --max_concurrency; }),
292 BindLambdaForTesting([&](size_t worker_count) -> size_t {
293 return max_concurrency + worker_count;
294 }),
295 &pooled_task_runner_delegate_);
296 EXPECT_CALL(pooled_task_runner_delegate_, EnqueueJobTaskSource(_));
297 task_source->NotifyConcurrencyIncrease();
298
299 auto registered_task_source =
300 RegisteredTaskSource::CreateForTesting(task_source);
301
302 EXPECT_EQ(registered_task_source.WillRunTask(),
303 TaskSource::RunStatus::kAllowedSaturated);
304 auto task = registered_task_source.TakeTask();
305 std::move(task.task).Run();
306 // Once the worker_task runs, |worker_count| should drop to 0 and the job
307 // should finish.
308 EXPECT_FALSE(registered_task_source.DidProcessTask());
309 EXPECT_EQ(0U, max_concurrency);
310 }
311
312 // Verify that |worker_count| excludes the (inactive) joining thread calling
313 // max_concurrency_callback.
TEST_P(ThreadPoolJobTaskSourceTest,RunJoinTaskWorkerCount)314 TEST_P(ThreadPoolJobTaskSourceTest, RunJoinTaskWorkerCount) {
315 size_t max_concurrency = 1;
316 scoped_refptr<JobTaskSource> task_source = internal::CreateJobTaskSource(
317 FROM_HERE, TaskTraits(),
318 BindLambdaForTesting([&](JobDelegate* delegate) { --max_concurrency; }),
319 BindLambdaForTesting([&](size_t worker_count) -> size_t {
320 return max_concurrency + worker_count;
321 }),
322 &pooled_task_runner_delegate_);
323
324 EXPECT_TRUE(task_source->WillJoin());
325 // Once the worker_task runs, |worker_count| should drop to 0 and the job
326 // should finish.
327 EXPECT_FALSE(task_source->RunJoinTask());
328 EXPECT_EQ(0U, max_concurrency);
329 }
330
331 // Verifies that WillJoin() doesn't allow a joining thread to contribute
332 // after Cancel() is called.
TEST_P(ThreadPoolJobTaskSourceTest,CancelJoinTask)333 TEST_P(ThreadPoolJobTaskSourceTest, CancelJoinTask) {
334 auto job_task = base::MakeRefCounted<test::MockJobTask>(
335 DoNothing(), /* num_tasks_to_run */ 2);
336 scoped_refptr<JobTaskSource> task_source =
337 job_task->GetJobTaskSource(FROM_HERE, {}, &pooled_task_runner_delegate_);
338
339 task_source->Cancel();
340 EXPECT_FALSE(task_source->WillJoin());
341 }
342
343 // Verifies that RunJoinTask() doesn't allow a joining thread to contribute
344 // after Cancel() is called.
TEST_P(ThreadPoolJobTaskSourceTest,JoinCancelTask)345 TEST_P(ThreadPoolJobTaskSourceTest, JoinCancelTask) {
346 auto job_task = base::MakeRefCounted<test::MockJobTask>(
347 DoNothing(), /* num_tasks_to_run */ 2);
348 scoped_refptr<JobTaskSource> task_source =
349 job_task->GetJobTaskSource(FROM_HERE, {}, &pooled_task_runner_delegate_);
350
351 EXPECT_TRUE(task_source->WillJoin());
352 task_source->Cancel();
353 EXPECT_FALSE(task_source->RunJoinTask());
354 }
355
356 // Verifies that the join task can run in parallel with worker tasks up to
357 // |max_concurrency|.
TEST_P(ThreadPoolJobTaskSourceTest,RunJoinTaskInParallel)358 TEST_P(ThreadPoolJobTaskSourceTest, RunJoinTaskInParallel) {
359 auto [job_task, task_source] = StartJob(/* initial_max_concurrency=*/2);
360
361 auto registered_task_source =
362 RegisteredTaskSource::CreateForTesting(task_source);
363 EXPECT_EQ(registered_task_source.WillRunTask(),
364 TaskSource::RunStatus::kAllowedNotSaturated);
365 auto worker_task = registered_task_source.TakeTask();
366
367 EXPECT_TRUE(task_source->WillJoin());
368 EXPECT_TRUE(task_source->IsActive());
369
370 std::move(worker_task.task).Run();
371 EXPECT_FALSE(registered_task_source.DidProcessTask());
372
373 EXPECT_FALSE(task_source->RunJoinTask());
374 EXPECT_FALSE(task_source->IsActive());
375 }
376
377 // Verifies that a call to NotifyConcurrencyIncrease() calls the delegate
378 // and allows to run additional tasks.
TEST_P(ThreadPoolJobTaskSourceTest,NotifyConcurrencyIncrease)379 TEST_P(ThreadPoolJobTaskSourceTest, NotifyConcurrencyIncrease) {
380 auto [job_task, task_source] = StartJob(/* initial_max_concurrency=*/1);
381
382 auto registered_task_source_a =
383 RegisteredTaskSource::CreateForTesting(task_source);
384 EXPECT_EQ(registered_task_source_a.WillRunTask(),
385 TaskSource::RunStatus::kAllowedSaturated);
386 auto task_a = registered_task_source_a.TakeTask();
387 EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
388 TaskSource::RunStatus::kDisallowed);
389
390 job_task->SetNumTasksToRun(2);
391 EXPECT_CALL(pooled_task_runner_delegate_, EnqueueJobTaskSource(_)).Times(1);
392 task_source->NotifyConcurrencyIncrease();
393
394 auto registered_task_source_b =
395 RegisteredTaskSource::CreateForTesting(task_source);
396 // WillRunTask() should return a valid RunStatus because max concurrency was
397 // increased to 2.
398 EXPECT_EQ(registered_task_source_b.WillRunTask(),
399 TaskSource::RunStatus::kAllowedSaturated);
400 auto task_b = registered_task_source_b.TakeTask();
401 EXPECT_EQ(RegisteredTaskSource::CreateForTesting(task_source).WillRunTask(),
402 TaskSource::RunStatus::kDisallowed);
403
404 std::move(task_a.task).Run();
405 EXPECT_FALSE(registered_task_source_a.DidProcessTask());
406
407 std::move(task_b.task).Run();
408 EXPECT_FALSE(registered_task_source_b.DidProcessTask());
409 }
410
411 // Verifies that ShouldYield() calls the delegate.
TEST_P(ThreadPoolJobTaskSourceTest,ShouldYield)412 TEST_P(ThreadPoolJobTaskSourceTest, ShouldYield) {
413 auto [job_task, task_source] = StartJob(
414 /*initial_max_concurrency=*/1,
415 BindLambdaForTesting([](JobDelegate* delegate) {
416 // As set up below, the mock will return false once and true the second
417 // time.
418 EXPECT_FALSE(delegate->ShouldYield());
419 EXPECT_TRUE(delegate->ShouldYield());
420 }));
421
422 auto registered_task_source =
423 RegisteredTaskSource::CreateForTesting(task_source);
424 ASSERT_EQ(registered_task_source.WillRunTask(),
425 TaskSource::RunStatus::kAllowedSaturated);
426
427 auto task = registered_task_source.TakeTask();
428
429 EXPECT_CALL(pooled_task_runner_delegate_, ShouldYield(_))
430 .Times(2)
431 .WillOnce(Return(false))
432 .WillOnce(Return(true));
433
434 std::move(task.task).Run();
435 EXPECT_FALSE(registered_task_source.DidProcessTask());
436 }
437
438 // Verifies that max concurrency is allowed to stagnate when ShouldYield returns
439 // true.
TEST_P(ThreadPoolJobTaskSourceTest,MaxConcurrencyStagnateIfShouldYield)440 TEST_P(ThreadPoolJobTaskSourceTest, MaxConcurrencyStagnateIfShouldYield) {
441 scoped_refptr<JobTaskSource> task_source = internal::CreateJobTaskSource(
442 FROM_HERE, TaskTraits(), BindRepeating([](JobDelegate* delegate) {
443 // As set up below, the mock will return true once.
444 ASSERT_TRUE(delegate->ShouldYield());
445 }),
446 BindRepeating([](size_t /*worker_count*/) -> size_t {
447 return 1; // max concurrency is always 1.
448 }),
449 &pooled_task_runner_delegate_);
450 EXPECT_CALL(pooled_task_runner_delegate_, EnqueueJobTaskSource(_));
451 task_source->NotifyConcurrencyIncrease();
452
453 EXPECT_CALL(pooled_task_runner_delegate_, ShouldYield(_))
454 .WillOnce(Return(true));
455
456 auto registered_task_source =
457 RegisteredTaskSource::CreateForTesting(task_source);
458 ASSERT_EQ(registered_task_source.WillRunTask(),
459 TaskSource::RunStatus::kAllowedSaturated);
460 auto task = registered_task_source.TakeTask();
461
462 // Running the task should not fail even though max concurrency remained at 1,
463 // since ShouldYield() returned true.
464 std::move(task.task).Run();
465 registered_task_source.DidProcessTask();
466 }
467
// Checks that TakeTask() DCHECKs when it is called through a
// RegisteredTaskSource whose WillRunTask() returned kDisallowed.
TEST_P(ThreadPoolJobTaskSourceTest, InvalidTakeTask) {
  auto [job, source] = StartJob(/* initial_max_concurrency=*/1);

  auto allowed_source = RegisteredTaskSource::CreateForTesting(source);
  EXPECT_EQ(allowed_source.WillRunTask(),
            TaskSource::RunStatus::kAllowedSaturated);

  auto refused_source = RegisteredTaskSource::CreateForTesting(source);
  EXPECT_EQ(refused_source.WillRunTask(), TaskSource::RunStatus::kDisallowed);

  // TakeTask() must not be called with an invalid RunStatus.
  EXPECT_DCHECK_DEATH({ auto task = refused_source.TakeTask(); });

  auto valid_task = allowed_source.TakeTask();
  allowed_source.DidProcessTask();
}
487
// Checks that DidProcessTask() DCHECKs when no WillRunTask() call preceded it.
TEST_P(ThreadPoolJobTaskSourceTest, InvalidDidProcessTask) {
  auto job = base::MakeRefCounted<test::MockJobTask>(DoNothing(),
                                                     /* num_tasks_to_run */ 1);
  scoped_refptr<JobTaskSource> source =
      job->GetJobTaskSource(FROM_HERE, {}, &pooled_task_runner_delegate_);

  auto registered_source = RegisteredTaskSource::CreateForTesting(source);

  // DidProcessTask() must not be called ahead of WillRunTask().
  EXPECT_DCHECK_DEATH(registered_source.DidProcessTask());
}
501
// Checks the ids returned by AcquireTaskId(): they start at 0 and count up,
// and ids given back with ReleaseTaskId() are returned again before a new id.
TEST_P(ThreadPoolJobTaskSourceTest, AcquireTaskId) {
  auto job = base::MakeRefCounted<test::MockJobTask>(DoNothing(),
                                                     /* num_tasks_to_run */ 4);
  scoped_refptr<JobTaskSource> source =
      job->GetJobTaskSource(FROM_HERE, {}, &pooled_task_runner_delegate_);

  // The first five acquisitions yield 0, 1, 2, 3 and 4 in that order.
  for (unsigned expected_id = 0U; expected_id < 5U; ++expected_id) {
    EXPECT_EQ(expected_id, source->AcquireTaskId());
  }

  // Release two ids in the middle of the range.
  source->ReleaseTaskId(1);
  source->ReleaseTaskId(3);

  // The released ids come back first, then the next unused id.
  EXPECT_EQ(1U, source->AcquireTaskId());
  EXPECT_EQ(3U, source->AcquireTaskId());
  EXPECT_EQ(5U, source->AcquireTaskId());
}
520
521 // Verifies that task id is released after worker_task returns.
TEST_P(ThreadPoolJobTaskSourceTest,GetTaskId)522 TEST_P(ThreadPoolJobTaskSourceTest, GetTaskId) {
523 auto [job_task, task_source] = StartJob(
524 /* initial_max_concurrency=*/2, BindRepeating([](JobDelegate* delegate) {
525 // Confirm that task id 0 is reused on the second run.
526 EXPECT_EQ(0U, delegate->GetTaskId());
527 }));
528
529 auto registered_task_source =
530 RegisteredTaskSource::CreateForTesting(task_source);
531
532 // Run the worker_task twice.
533 ASSERT_EQ(registered_task_source.WillRunTask(),
534 TaskSource::RunStatus::kAllowedNotSaturated);
535 auto task1 = registered_task_source.TakeTask();
536 std::move(task1.task).Run();
537 registered_task_source.DidProcessTask();
538
539 ASSERT_EQ(registered_task_source.WillRunTask(),
540 TaskSource::RunStatus::kAllowedSaturated);
541 auto task2 = registered_task_source.TakeTask();
542 std::move(task2.task).Run();
543 registered_task_source.DidProcessTask();
544 }
545
546 INSTANTIATE_TEST_SUITE_P(,
547 ThreadPoolJobTaskSourceTest,
548 testing::Bool(),
__anon7a7c267b0902(const testing::TestParamInfo<bool>& info) 549 [](const testing::TestParamInfo<bool>& info) {
550 if (info.param) {
551 return "NewJob";
552 }
553 return "OldJob";
554 });
555
556 } // namespace internal
557 } // namespace base
558