1 // Copyright 2020 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stddef.h>
6 #include <atomic>
7 #include <utility>
8 #include <vector>
9
10 #include "base/containers/queue.h"
11 #include "base/containers/stack.h"
12 #include "base/functional/callback_helpers.h"
13 #include "base/synchronization/lock.h"
14 #include "base/task/post_job.h"
15 #include "base/task/thread_pool.h"
16 #include "base/test/bind.h"
17 #include "base/test/task_environment.h"
18 #include "testing/gtest/include/gtest/gtest.h"
19 #include "testing/perf/perf_result_reporter.h"
20 #include "third_party/abseil-cpp/absl/types/optional.h"
21
22 namespace base {
23
24 namespace {
25
26 // The perftest implements the following assignment strategy:
27 // - Naive: See RunJobWithNaiveAssignment().
28 // - Dynamic: See RunJobWithDynamicAssignment().
29 // - Loop around: See RunJobWithLoopAround().
30 // The following test setups exists for different strategies, although
31 // not every combination is performed:
32 // - No-op: Work items are no-op tasks.
33 // - No-op + disrupted: 10 disruptive tasks are posted every 1ms.
34 // - Busy wait: Work items are busy wait for 5us.
35 // - Busy wait + disrupted
36
// Metric/story identifiers handed to perf_test::PerfResultReporter. Each
// story name is one assignment strategy (naive, atomic, dynamic, loop_around)
// crossed with one workload (noop or busy_wait), optionally "disrupted" by
// competing tasks.
constexpr char kMetricPrefixJob[] = "Job.";
constexpr char kMetricWorkThroughput[] = "work_throughput";
constexpr char kStoryNoOpNaive[] = "noop_naive";
constexpr char kStoryBusyWaitNaive[] = "busy_wait_naive";
constexpr char kStoryNoOpAtomic[] = "noop_atomic";
constexpr char kStoryNoOpAtomicDisrupted[] = "noop_atomic_disrupted";
constexpr char kStoryBusyWaitAtomic[] = "busy_wait_atomic";
constexpr char kStoryBusyWaitAtomicDisrupted[] = "busy_wait_atomic_disrupted";
constexpr char kStoryNoOpDynamic[] = "noop_dynamic";
constexpr char kStoryNoOpDynamicDisrupted[] = "noop_dynamic_disrupted";
constexpr char kStoryBusyWaitDynamic[] = "busy_wait_dynamic";
constexpr char kStoryBusyWaitDynamicDisrupted[] = "busy_wait_dynamic_disrupted";
constexpr char kStoryNoOpLoopAround[] = "noop_loop_around";
constexpr char kStoryNoOpLoopAroundDisrupted[] = "noop_loop_around_disrupted";
constexpr char kStoryBusyWaitLoopAround[] = "busy_wait_loop_around";
constexpr char kStoryBusyWaitLoopAroundDisrupted[] =
    "busy_wait_loop_around_disrupted";
54
SetUpReporter(const std::string & story_name)55 perf_test::PerfResultReporter SetUpReporter(const std::string& story_name) {
56 perf_test::PerfResultReporter reporter(kMetricPrefixJob, story_name);
57 reporter.RegisterImportantMetric(kMetricWorkThroughput, "tasks/ms");
58 return reporter;
59 }
60
61 // A thread-safe data structure that generates heuristic starting points in a
62 // range to process items in parallel.
63 // Note: we could expose this atomic-binary-search-index-generator in
64 // //base/util if it's useful for real-world use cases.
65 class IndexGenerator {
66 public:
IndexGenerator(size_t size)67 explicit IndexGenerator(size_t size) : size_(size) {
68 AutoLock auto_lock(lock_);
69 pending_indices_.push(0);
70 ranges_to_split_.push({0, size_});
71 }
72
73 IndexGenerator(const IndexGenerator&) = delete;
74 IndexGenerator& operator=(const IndexGenerator&) = delete;
75
GetNext()76 absl::optional<size_t> GetNext() {
77 AutoLock auto_lock(lock_);
78 if (!pending_indices_.empty()) {
79 // Return any pending index first.
80 auto index = pending_indices_.top();
81 pending_indices_.pop();
82 return index;
83 }
84 if (ranges_to_split_.empty())
85 return absl::nullopt;
86
87 // Split the oldest running range in 2 and return the middle index as
88 // starting point.
89 auto range = ranges_to_split_.front();
90 ranges_to_split_.pop();
91 size_t size = range.second - range.first;
92 size_t mid = range.first + size / 2;
93 // Both sides of the range are added to |ranges_to_split_| so they may be
94 // further split if possible.
95 if (mid - range.first > 1)
96 ranges_to_split_.push({range.first, mid});
97 if (range.second - mid > 1)
98 ranges_to_split_.push({mid, range.second});
99 return mid;
100 }
101
GiveBack(size_t index)102 void GiveBack(size_t index) {
103 AutoLock auto_lock(lock_);
104 // Add |index| to pending indices so GetNext() may return it before anything
105 // else.
106 pending_indices_.push(index);
107 }
108
109 private:
110 base::Lock lock_;
111 // Pending indices that are ready to be handed out, prioritized over
112 // |pending_ranges_| when non-empty.
113 base::stack<size_t> pending_indices_ GUARDED_BY(lock_);
114 // Pending [start, end] (exclusive) ranges to split and hand out indices from.
115 base::queue<std::pair<size_t, size_t>> ranges_to_split_ GUARDED_BY(lock_);
116 const size_t size_;
117 };
118
// One unit of work. Workers race to flip |acquire| from false to true; the
// winner gets to process the item.
struct WorkItem {
  std::atomic_bool acquire{false};

  // Returns true iff the caller won the race to acquire this item.
  bool TryAcquire() {
    // memory_order_relaxed is sufficient as the WorkItem's state itself hasn't
    // been modified since the beginning of its associated job. This is only
    // atomically acquiring the right to work on it.
    return !acquire.exchange(true, std::memory_order_relaxed);
  }
};
129
// Holds the job's work items plus an atomic count of incomplete items.
// Thread-safe: items are claimed via WorkItem's atomic flag and completion is
// tracked with an atomic counter, so workers need no external locking.
class WorkList {
 public:
  // |process_item| is run once for each acquired index.
  WorkList(size_t num_work_items, RepeatingCallback<void(size_t)> process_item)
      : num_incomplete_items_(num_work_items),
        items_(num_work_items),
        process_item_(std::move(process_item)) {}

  WorkList(const WorkList&) = delete;
  WorkList& operator=(const WorkList&) = delete;

  // Acquires work item at |index|. Returns true if successful, or false if the
  // item was already acquired.
  bool TryAcquire(size_t index) { return items_[index].TryAcquire(); }

  // Processes work item at |index|. Returns true if there are more work items
  // to process, or false if all items were processed.
  bool ProcessWorkItem(size_t index) {
    process_item_.Run(index);
    // fetch_sub() returns the previous value, so 1 means this call completed
    // the final item.
    return num_incomplete_items_.fetch_sub(1, std::memory_order_relaxed) > 1;
  }

  // Number of items not yet processed. The ignored parameter matches the
  // signature PostJob expects for its max-concurrency callback.
  size_t NumIncompleteWorkItems(size_t /*worker_count*/) const {
    // memory_order_relaxed is sufficient since this is not synchronized with
    // other state.
    return num_incomplete_items_.load(std::memory_order_relaxed);
  }

  // Total number of work items (fixed at construction).
  size_t NumWorkItems() const { return items_.size(); }

 private:
  std::atomic_size_t num_incomplete_items_;
  std::vector<WorkItem> items_;
  RepeatingCallback<void(size_t)> process_item_;
};
164
BusyWaitCallback(TimeDelta delta)165 RepeatingCallback<void(size_t)> BusyWaitCallback(TimeDelta delta) {
166 return base::BindRepeating(
167 [](base::TimeDelta duration, size_t index) {
168 const base::TimeTicks end_time = base::TimeTicks::Now() + duration;
169 while (base::TimeTicks::Now() < end_time)
170 ;
171 },
172 delta);
173 }
174
175 // Posts |task_count| no-op tasks every |delay|.
DisruptivePostTasks(size_t task_count,TimeDelta delay)176 void DisruptivePostTasks(size_t task_count, TimeDelta delay) {
177 for (size_t i = 0; i < task_count; ++i) {
178 ThreadPool::PostTask(FROM_HERE, {TaskPriority::USER_BLOCKING}, DoNothing());
179 }
180 ThreadPool::PostDelayedTask(FROM_HERE, {TaskPriority::USER_BLOCKING},
181 BindOnce(&DisruptivePostTasks, task_count, delay),
182 delay);
183 }
184
185 class JobPerfTest : public testing::Test {
186 public:
187 JobPerfTest() = default;
188
189 JobPerfTest(const JobPerfTest&) = delete;
190 JobPerfTest& operator=(const JobPerfTest&) = delete;
191
192 // Process |num_work_items| items with |process_item| in parallel. Work is
193 // assigned by having each worker sequentially traversing all items and
194 // acquiring unvisited ones.
RunJobWithNaiveAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item)195 void RunJobWithNaiveAssignment(const std::string& story_name,
196 size_t num_work_items,
197 RepeatingCallback<void(size_t)> process_item) {
198 WorkList work_list(num_work_items, std::move(process_item));
199
200 const TimeTicks job_run_start = TimeTicks::Now();
201
202 WaitableEvent complete;
203 auto handle = PostJob(
204 FROM_HERE, {TaskPriority::USER_VISIBLE},
205 BindRepeating(
206 [](WorkList* work_list, WaitableEvent* complete,
207 JobDelegate* delegate) {
208 for (size_t i = 0; i < work_list->NumWorkItems() &&
209 work_list->NumIncompleteWorkItems(0) != 0 &&
210 !delegate->ShouldYield();
211 ++i) {
212 if (!work_list->TryAcquire(i))
213 continue;
214 if (!work_list->ProcessWorkItem(i)) {
215 complete->Signal();
216 return;
217 }
218 }
219 },
220 Unretained(&work_list), Unretained(&complete)),
221 BindRepeating(&WorkList::NumIncompleteWorkItems,
222 Unretained(&work_list)));
223
224 complete.Wait();
225 handle.Join();
226 const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
227 EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
228
229 auto reporter = SetUpReporter(story_name);
230 reporter.AddResult(kMetricWorkThroughput,
231 size_t(num_work_items / job_duration.InMilliseconds()));
232 }
233
234 // Process |num_work_items| items with |process_item| in parallel. Work is
235 // assigned by having each worker sequentially traversing all items
236 // synchronized with an atomic variable.
RunJobWithAtomicAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)237 void RunJobWithAtomicAssignment(const std::string& story_name,
238 size_t num_work_items,
239 RepeatingCallback<void(size_t)> process_item,
240 bool disruptive_post_tasks = false) {
241 WorkList work_list(num_work_items, std::move(process_item));
242 std::atomic_size_t index{0};
243
244 // Post extra tasks to disrupt Job execution and cause workers to yield.
245 if (disruptive_post_tasks)
246 DisruptivePostTasks(10, Milliseconds(1));
247
248 const TimeTicks job_run_start = TimeTicks::Now();
249
250 WaitableEvent complete;
251 auto handle = PostJob(
252 FROM_HERE, {TaskPriority::USER_VISIBLE},
253 BindRepeating(
254 [](WorkList* work_list, WaitableEvent* complete,
255 std::atomic_size_t* index, JobDelegate* delegate) {
256 while (!delegate->ShouldYield()) {
257 const size_t i = index->fetch_add(1, std::memory_order_relaxed);
258 if (i >= work_list->NumWorkItems() ||
259 !work_list->ProcessWorkItem(i)) {
260 complete->Signal();
261 return;
262 }
263 }
264 },
265 Unretained(&work_list), Unretained(&complete), Unretained(&index)),
266 BindRepeating(&WorkList::NumIncompleteWorkItems,
267 Unretained(&work_list)));
268
269 complete.Wait();
270 handle.Join();
271 const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
272 EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
273
274 auto reporter = SetUpReporter(story_name);
275 reporter.AddResult(kMetricWorkThroughput,
276 size_t(num_work_items / job_duration.InMilliseconds()));
277 }
278
279 // Process |num_work_items| items with |process_item| in parallel. Work is
280 // assigned dynamically having each new worker given a different point far
281 // from other workers until all work is done. This is achieved by recursively
282 // splitting each range that was previously given in half.
RunJobWithDynamicAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)283 void RunJobWithDynamicAssignment(const std::string& story_name,
284 size_t num_work_items,
285 RepeatingCallback<void(size_t)> process_item,
286 bool disruptive_post_tasks = false) {
287 WorkList work_list(num_work_items, std::move(process_item));
288 IndexGenerator generator(num_work_items);
289
290 // Post extra tasks to disrupt Job execution and cause workers to yield.
291 if (disruptive_post_tasks)
292 DisruptivePostTasks(10, Milliseconds(1));
293
294 const TimeTicks job_run_start = TimeTicks::Now();
295
296 WaitableEvent complete;
297 auto handle = PostJob(
298 FROM_HERE, {TaskPriority::USER_VISIBLE},
299 BindRepeating(
300 [](IndexGenerator* generator, WorkList* work_list,
301 WaitableEvent* complete, JobDelegate* delegate) {
302 while (work_list->NumIncompleteWorkItems(0) != 0 &&
303 !delegate->ShouldYield()) {
304 absl::optional<size_t> index = generator->GetNext();
305 if (!index)
306 return;
307 for (size_t i = *index; i < work_list->NumWorkItems(); ++i) {
308 if (delegate->ShouldYield()) {
309 generator->GiveBack(i);
310 return;
311 }
312 if (!work_list->TryAcquire(i)) {
313 // If this was touched already, get a new starting point.
314 break;
315 }
316 if (!work_list->ProcessWorkItem(i)) {
317 complete->Signal();
318 return;
319 }
320 }
321 }
322 },
323 Unretained(&generator), Unretained(&work_list),
324 Unretained(&complete)),
325 BindRepeating(&WorkList::NumIncompleteWorkItems,
326 Unretained(&work_list)));
327
328 complete.Wait();
329 handle.Join();
330 const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
331 EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
332
333 auto reporter = SetUpReporter(story_name);
334 reporter.AddResult(kMetricWorkThroughput,
335 size_t(num_work_items / job_duration.InMilliseconds()));
336 }
337
338 // Process |num_work_items| items with |process_item| in parallel. Work is
339 // assigned having each new worker given a different starting point far from
340 // other workers and loop over all work items from there. This is achieved by
341 // recursively splitting each range that was previously given in half.
RunJobWithLoopAround(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)342 void RunJobWithLoopAround(const std::string& story_name,
343 size_t num_work_items,
344 RepeatingCallback<void(size_t)> process_item,
345 bool disruptive_post_tasks = false) {
346 WorkList work_list(num_work_items, std::move(process_item));
347 IndexGenerator generator(num_work_items);
348
349 // Post extra tasks to disrupt Job execution and cause workers to yield.
350 if (disruptive_post_tasks)
351 DisruptivePostTasks(10, Milliseconds(1));
352
353 const TimeTicks job_run_start = TimeTicks::Now();
354
355 WaitableEvent complete;
356 auto handle =
357 PostJob(FROM_HERE, {TaskPriority::USER_VISIBLE},
358 BindRepeating(
359 [](IndexGenerator* generator, WorkList* work_list,
360 WaitableEvent* complete, JobDelegate* delegate) {
361 absl::optional<size_t> index = generator->GetNext();
362 if (!index)
363 return;
364 size_t i = *index;
365 while (true) {
366 if (delegate->ShouldYield()) {
367 generator->GiveBack(i);
368 return;
369 }
370 if (!work_list->TryAcquire(i)) {
371 // If this was touched already, skip.
372 continue;
373 }
374 if (!work_list->ProcessWorkItem(i)) {
375 // This will cause the loop to exit if there's no work
376 // left.
377 complete->Signal();
378 return;
379 }
380 ++i;
381 if (i == work_list->NumWorkItems())
382 i = 0;
383 }
384 },
385 Unretained(&generator), Unretained(&work_list),
386 Unretained(&complete)),
387 BindRepeating(&WorkList::NumIncompleteWorkItems,
388 Unretained(&work_list)));
389
390 complete.Wait();
391 handle.Join();
392 const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
393 EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
394
395 auto reporter = SetUpReporter(story_name);
396 reporter.AddResult(kMetricWorkThroughput,
397 size_t(num_work_items / job_duration.InMilliseconds()));
398 }
399
400 private:
401 test::TaskEnvironment task_environment;
402 };
403
404 } // namespace
405
// No-op work items, naive linear-scan assignment.
TEST_F(JobPerfTest, NoOpWorkNaiveAssignment) {
  RunJobWithNaiveAssignment(kStoryNoOpNaive, 10000000, DoNothing());
}
409
// 5us busy-wait work items, naive linear-scan assignment.
TEST_F(JobPerfTest, BusyWaitNaiveAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithNaiveAssignment(kStoryBusyWaitNaive, 500000, std::move(callback));
}
414
// No-op work items, atomic-counter assignment.
TEST_F(JobPerfTest, NoOpWorkAtomicAssignment) {
  RunJobWithAtomicAssignment(kStoryNoOpAtomic, 10000000, DoNothing());
}
418
// No-op work items, atomic-counter assignment, with disruptive tasks posted.
TEST_F(JobPerfTest, NoOpDisruptedWorkAtomicAssignment) {
  RunJobWithAtomicAssignment(kStoryNoOpAtomicDisrupted, 10000000, DoNothing(),
                             true);
}
423
// 5us busy-wait work items, atomic-counter assignment.
TEST_F(JobPerfTest, BusyWaitAtomicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithAtomicAssignment(kStoryBusyWaitAtomic, 500000, std::move(callback));
}
428
// 5us busy-wait work items, atomic-counter assignment, with disruptive tasks.
TEST_F(JobPerfTest, BusyWaitDisruptedWorkAtomicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithAtomicAssignment(kStoryBusyWaitAtomicDisrupted, 500000,
                             std::move(callback), true);
}
434
// No-op work items, dynamic range-splitting assignment.
TEST_F(JobPerfTest, NoOpWorkDynamicAssignment) {
  RunJobWithDynamicAssignment(kStoryNoOpDynamic, 10000000, DoNothing());
}
438
// No-op work items, dynamic range-splitting assignment, with disruptive tasks.
TEST_F(JobPerfTest, NoOpDisruptedWorkDynamicAssignment) {
  RunJobWithDynamicAssignment(kStoryNoOpDynamicDisrupted, 10000000, DoNothing(),
                              true);
}
443
// 5us busy-wait work items, dynamic range-splitting assignment.
TEST_F(JobPerfTest, BusyWaitWorkDynamicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithDynamicAssignment(kStoryBusyWaitDynamic, 500000,
                              std::move(callback));
}
449
// 5us busy-wait work items, dynamic range-splitting assignment, with
// disruptive tasks.
TEST_F(JobPerfTest, BusyWaitDisruptedWorkDynamicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithDynamicAssignment(kStoryBusyWaitDynamicDisrupted, 500000,
                              std::move(callback), true);
}
455
// No-op work items, loop-around assignment.
TEST_F(JobPerfTest, NoOpWorkLoopAround) {
  RunJobWithLoopAround(kStoryNoOpLoopAround, 10000000, DoNothing());
}
459
// No-op work items, loop-around assignment, with disruptive tasks posted.
TEST_F(JobPerfTest, NoOpDisruptedWorkLoopAround) {
  RunJobWithLoopAround(kStoryNoOpLoopAroundDisrupted, 10000000, DoNothing(),
                       true);
}
464
// 5us busy-wait work items, loop-around assignment.
TEST_F(JobPerfTest, BusyWaitWorkLoopAround) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithLoopAround(kStoryBusyWaitLoopAround, 500000, std::move(callback));
}
469
// 5us busy-wait work items, loop-around assignment, with disruptive tasks.
TEST_F(JobPerfTest, BusyWaitDisruptedWorkLoopAround) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithLoopAround(kStoryBusyWaitLoopAroundDisrupted, 500000,
                       std::move(callback), true);
}
475
476 } // namespace base
477