• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <stddef.h>
6 #include <atomic>
7 #include <utility>
8 #include <vector>
9 
10 #include "base/containers/queue.h"
11 #include "base/containers/stack.h"
12 #include "base/functional/callback_helpers.h"
13 #include "base/synchronization/lock.h"
14 #include "base/task/post_job.h"
15 #include "base/task/thread_pool.h"
16 #include "base/test/bind.h"
17 #include "base/test/task_environment.h"
18 #include "testing/gtest/include/gtest/gtest.h"
19 #include "testing/perf/perf_result_reporter.h"
20 #include "third_party/abseil-cpp/absl/types/optional.h"
21 
22 namespace base {
23 
24 namespace {
25 
26 // The perftest implements the following assignment strategy:
27 // - Naive: See RunJobWithNaiveAssignment().
28 // - Dynamic: See RunJobWithDynamicAssignment().
29 // - Loop around: See RunJobWithLoopAround().
30 // The following test setups exists for different strategies, although
31 // not every combination is performed:
32 // - No-op: Work items are no-op tasks.
33 // - No-op + disrupted: 10 disruptive tasks are posted every 1ms.
34 // - Busy wait: Work items are busy wait for 5us.
35 // - Busy wait + disrupted
36 
// Metric prefix/name reported through perf_test::PerfResultReporter.
constexpr char kMetricPrefixJob[] = "Job.";
constexpr char kMetricWorkThroughput[] = "work_throughput";
// Story names: one per (work kind x assignment strategy x disruption)
// combination exercised by the TEST_Fs below.
constexpr char kStoryNoOpNaive[] = "noop_naive";
constexpr char kStoryBusyWaitNaive[] = "busy_wait_naive";
constexpr char kStoryNoOpAtomic[] = "noop_atomic";
constexpr char kStoryNoOpAtomicDisrupted[] = "noop_atomic_disrupted";
constexpr char kStoryBusyWaitAtomic[] = "busy_wait_atomic";
constexpr char kStoryBusyWaitAtomicDisrupted[] = "busy_wait_atomic_disrupted";
constexpr char kStoryNoOpDynamic[] = "noop_dynamic";
constexpr char kStoryNoOpDynamicDisrupted[] = "noop_dynamic_disrupted";
constexpr char kStoryBusyWaitDynamic[] = "busy_wait_dynamic";
constexpr char kStoryBusyWaitDynamicDisrupted[] = "busy_wait_dynamic_disrupted";
constexpr char kStoryNoOpLoopAround[] = "noop_loop_around";
constexpr char kStoryNoOpLoopAroundDisrupted[] = "noop_loop_around_disrupted";
constexpr char kStoryBusyWaitLoopAround[] = "busy_wait_loop_around";
constexpr char kStoryBusyWaitLoopAroundDisrupted[] =
    "busy_wait_loop_around_disrupted";
54 
SetUpReporter(const std::string & story_name)55 perf_test::PerfResultReporter SetUpReporter(const std::string& story_name) {
56   perf_test::PerfResultReporter reporter(kMetricPrefixJob, story_name);
57   reporter.RegisterImportantMetric(kMetricWorkThroughput, "tasks/ms");
58   return reporter;
59 }
60 
61 // A thread-safe data structure that generates heuristic starting points in a
62 // range to process items in parallel.
63 // Note: we could expose this atomic-binary-search-index-generator in
64 // //base/util if it's useful for real-world use cases.
65 class IndexGenerator {
66  public:
IndexGenerator(size_t size)67   explicit IndexGenerator(size_t size) : size_(size) {
68     AutoLock auto_lock(lock_);
69     pending_indices_.push(0);
70     ranges_to_split_.push({0, size_});
71   }
72 
73   IndexGenerator(const IndexGenerator&) = delete;
74   IndexGenerator& operator=(const IndexGenerator&) = delete;
75 
GetNext()76   absl::optional<size_t> GetNext() {
77     AutoLock auto_lock(lock_);
78     if (!pending_indices_.empty()) {
79       // Return any pending index first.
80       auto index = pending_indices_.top();
81       pending_indices_.pop();
82       return index;
83     }
84     if (ranges_to_split_.empty())
85       return absl::nullopt;
86 
87     // Split the oldest running range in 2 and return the middle index as
88     // starting point.
89     auto range = ranges_to_split_.front();
90     ranges_to_split_.pop();
91     size_t size = range.second - range.first;
92     size_t mid = range.first + size / 2;
93     // Both sides of the range are added to |ranges_to_split_| so they may be
94     // further split if possible.
95     if (mid - range.first > 1)
96       ranges_to_split_.push({range.first, mid});
97     if (range.second - mid > 1)
98       ranges_to_split_.push({mid, range.second});
99     return mid;
100   }
101 
GiveBack(size_t index)102   void GiveBack(size_t index) {
103     AutoLock auto_lock(lock_);
104     // Add |index| to pending indices so GetNext() may return it before anything
105     // else.
106     pending_indices_.push(index);
107   }
108 
109  private:
110   base::Lock lock_;
111   // Pending indices that are ready to be handed out, prioritized over
112   // |pending_ranges_| when non-empty.
113   base::stack<size_t> pending_indices_ GUARDED_BY(lock_);
114   // Pending [start, end] (exclusive) ranges to split and hand out indices from.
115   base::queue<std::pair<size_t, size_t>> ranges_to_split_ GUARDED_BY(lock_);
116   const size_t size_;
117 };
118 
// One unit of work. Workers race on |acquire| to claim exclusive ownership.
struct WorkItem {
  std::atomic_bool acquire{false};

  // Returns true for exactly one caller per item; later callers lose the race.
  // memory_order_relaxed is sufficient as the WorkItem's state itself hasn't
  // been modified since the beginning of its associated job. This is only
  // atomically acquiring the right to work on it.
  bool TryAcquire() { return !acquire.exchange(true, std::memory_order_relaxed); }
};
129 
130 class WorkList {
131  public:
WorkList(size_t num_work_items,RepeatingCallback<void (size_t)> process_item)132   WorkList(size_t num_work_items, RepeatingCallback<void(size_t)> process_item)
133       : num_incomplete_items_(num_work_items),
134         items_(num_work_items),
135         process_item_(std::move(process_item)) {}
136 
137   WorkList(const WorkList&) = delete;
138   WorkList& operator=(const WorkList&) = delete;
139 
140   // Acquires work item at |index|. Returns true if successful, or false if the
141   // item was already acquired.
TryAcquire(size_t index)142   bool TryAcquire(size_t index) { return items_[index].TryAcquire(); }
143 
144   // Processes work item at |index|. Returns true if there are more work items
145   // to process, or false if all items were processed.
ProcessWorkItem(size_t index)146   bool ProcessWorkItem(size_t index) {
147     process_item_.Run(index);
148     return num_incomplete_items_.fetch_sub(1, std::memory_order_relaxed) > 1;
149   }
150 
NumIncompleteWorkItems(size_t) const151   size_t NumIncompleteWorkItems(size_t /*worker_count*/) const {
152     // memory_order_relaxed is sufficient since this is not synchronized with
153     // other state.
154     return num_incomplete_items_.load(std::memory_order_relaxed);
155   }
156 
NumWorkItems() const157   size_t NumWorkItems() const { return items_.size(); }
158 
159  private:
160   std::atomic_size_t num_incomplete_items_;
161   std::vector<WorkItem> items_;
162   RepeatingCallback<void(size_t)> process_item_;
163 };
164 
BusyWaitCallback(TimeDelta delta)165 RepeatingCallback<void(size_t)> BusyWaitCallback(TimeDelta delta) {
166   return base::BindRepeating(
167       [](base::TimeDelta duration, size_t index) {
168         const base::TimeTicks end_time = base::TimeTicks::Now() + duration;
169         while (base::TimeTicks::Now() < end_time)
170           ;
171       },
172       delta);
173 }
174 
175 // Posts |task_count| no-op tasks every |delay|.
DisruptivePostTasks(size_t task_count,TimeDelta delay)176 void DisruptivePostTasks(size_t task_count, TimeDelta delay) {
177   for (size_t i = 0; i < task_count; ++i) {
178     ThreadPool::PostTask(FROM_HERE, {TaskPriority::USER_BLOCKING}, DoNothing());
179   }
180   ThreadPool::PostDelayedTask(FROM_HERE, {TaskPriority::USER_BLOCKING},
181                               BindOnce(&DisruptivePostTasks, task_count, delay),
182                               delay);
183 }
184 
185 class JobPerfTest : public testing::Test {
186  public:
187   JobPerfTest() = default;
188 
189   JobPerfTest(const JobPerfTest&) = delete;
190   JobPerfTest& operator=(const JobPerfTest&) = delete;
191 
192   // Process |num_work_items| items with |process_item| in parallel. Work is
193   // assigned by having each worker sequentially traversing all items and
194   // acquiring unvisited ones.
RunJobWithNaiveAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item)195   void RunJobWithNaiveAssignment(const std::string& story_name,
196                                  size_t num_work_items,
197                                  RepeatingCallback<void(size_t)> process_item) {
198     WorkList work_list(num_work_items, std::move(process_item));
199 
200     const TimeTicks job_run_start = TimeTicks::Now();
201 
202     WaitableEvent complete;
203     auto handle = PostJob(
204         FROM_HERE, {TaskPriority::USER_VISIBLE},
205         BindRepeating(
206             [](WorkList* work_list, WaitableEvent* complete,
207                JobDelegate* delegate) {
208               for (size_t i = 0; i < work_list->NumWorkItems() &&
209                                  work_list->NumIncompleteWorkItems(0) != 0 &&
210                                  !delegate->ShouldYield();
211                    ++i) {
212                 if (!work_list->TryAcquire(i))
213                   continue;
214                 if (!work_list->ProcessWorkItem(i)) {
215                   complete->Signal();
216                   return;
217                 }
218               }
219             },
220             Unretained(&work_list), Unretained(&complete)),
221         BindRepeating(&WorkList::NumIncompleteWorkItems,
222                       Unretained(&work_list)));
223 
224     complete.Wait();
225     handle.Join();
226     const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
227     EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
228 
229     auto reporter = SetUpReporter(story_name);
230     reporter.AddResult(kMetricWorkThroughput,
231                        size_t(num_work_items / job_duration.InMilliseconds()));
232   }
233 
234   // Process |num_work_items| items with |process_item| in parallel. Work is
235   // assigned by having each worker sequentially traversing all items
236   // synchronized with an atomic variable.
RunJobWithAtomicAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)237   void RunJobWithAtomicAssignment(const std::string& story_name,
238                                   size_t num_work_items,
239                                   RepeatingCallback<void(size_t)> process_item,
240                                   bool disruptive_post_tasks = false) {
241     WorkList work_list(num_work_items, std::move(process_item));
242     std::atomic_size_t index{0};
243 
244     // Post extra tasks to disrupt Job execution and cause workers to yield.
245     if (disruptive_post_tasks)
246       DisruptivePostTasks(10, Milliseconds(1));
247 
248     const TimeTicks job_run_start = TimeTicks::Now();
249 
250     WaitableEvent complete;
251     auto handle = PostJob(
252         FROM_HERE, {TaskPriority::USER_VISIBLE},
253         BindRepeating(
254             [](WorkList* work_list, WaitableEvent* complete,
255                std::atomic_size_t* index, JobDelegate* delegate) {
256               while (!delegate->ShouldYield()) {
257                 const size_t i = index->fetch_add(1, std::memory_order_relaxed);
258                 if (i >= work_list->NumWorkItems() ||
259                     !work_list->ProcessWorkItem(i)) {
260                   complete->Signal();
261                   return;
262                 }
263               }
264             },
265             Unretained(&work_list), Unretained(&complete), Unretained(&index)),
266         BindRepeating(&WorkList::NumIncompleteWorkItems,
267                       Unretained(&work_list)));
268 
269     complete.Wait();
270     handle.Join();
271     const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
272     EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
273 
274     auto reporter = SetUpReporter(story_name);
275     reporter.AddResult(kMetricWorkThroughput,
276                        size_t(num_work_items / job_duration.InMilliseconds()));
277   }
278 
279   // Process |num_work_items| items with |process_item| in parallel. Work is
280   // assigned dynamically having each new worker given a different point far
281   // from other workers until all work is done. This is achieved by recursively
282   // splitting each range that was previously given in half.
RunJobWithDynamicAssignment(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)283   void RunJobWithDynamicAssignment(const std::string& story_name,
284                                    size_t num_work_items,
285                                    RepeatingCallback<void(size_t)> process_item,
286                                    bool disruptive_post_tasks = false) {
287     WorkList work_list(num_work_items, std::move(process_item));
288     IndexGenerator generator(num_work_items);
289 
290     // Post extra tasks to disrupt Job execution and cause workers to yield.
291     if (disruptive_post_tasks)
292       DisruptivePostTasks(10, Milliseconds(1));
293 
294     const TimeTicks job_run_start = TimeTicks::Now();
295 
296     WaitableEvent complete;
297     auto handle = PostJob(
298         FROM_HERE, {TaskPriority::USER_VISIBLE},
299         BindRepeating(
300             [](IndexGenerator* generator, WorkList* work_list,
301                WaitableEvent* complete, JobDelegate* delegate) {
302               while (work_list->NumIncompleteWorkItems(0) != 0 &&
303                      !delegate->ShouldYield()) {
304                 absl::optional<size_t> index = generator->GetNext();
305                 if (!index)
306                   return;
307                 for (size_t i = *index; i < work_list->NumWorkItems(); ++i) {
308                   if (delegate->ShouldYield()) {
309                     generator->GiveBack(i);
310                     return;
311                   }
312                   if (!work_list->TryAcquire(i)) {
313                     // If this was touched already, get a new starting point.
314                     break;
315                   }
316                   if (!work_list->ProcessWorkItem(i)) {
317                     complete->Signal();
318                     return;
319                   }
320                 }
321               }
322             },
323             Unretained(&generator), Unretained(&work_list),
324             Unretained(&complete)),
325         BindRepeating(&WorkList::NumIncompleteWorkItems,
326                       Unretained(&work_list)));
327 
328     complete.Wait();
329     handle.Join();
330     const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
331     EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
332 
333     auto reporter = SetUpReporter(story_name);
334     reporter.AddResult(kMetricWorkThroughput,
335                        size_t(num_work_items / job_duration.InMilliseconds()));
336   }
337 
338   // Process |num_work_items| items with |process_item| in parallel. Work is
339   // assigned having each new worker given a different starting point far from
340   // other workers and loop over all work items from there. This is achieved by
341   // recursively splitting each range that was previously given in half.
RunJobWithLoopAround(const std::string & story_name,size_t num_work_items,RepeatingCallback<void (size_t)> process_item,bool disruptive_post_tasks=false)342   void RunJobWithLoopAround(const std::string& story_name,
343                             size_t num_work_items,
344                             RepeatingCallback<void(size_t)> process_item,
345                             bool disruptive_post_tasks = false) {
346     WorkList work_list(num_work_items, std::move(process_item));
347     IndexGenerator generator(num_work_items);
348 
349     // Post extra tasks to disrupt Job execution and cause workers to yield.
350     if (disruptive_post_tasks)
351       DisruptivePostTasks(10, Milliseconds(1));
352 
353     const TimeTicks job_run_start = TimeTicks::Now();
354 
355     WaitableEvent complete;
356     auto handle =
357         PostJob(FROM_HERE, {TaskPriority::USER_VISIBLE},
358                 BindRepeating(
359                     [](IndexGenerator* generator, WorkList* work_list,
360                        WaitableEvent* complete, JobDelegate* delegate) {
361                       absl::optional<size_t> index = generator->GetNext();
362                       if (!index)
363                         return;
364                       size_t i = *index;
365                       while (true) {
366                         if (delegate->ShouldYield()) {
367                           generator->GiveBack(i);
368                           return;
369                         }
370                         if (!work_list->TryAcquire(i)) {
371                           // If this was touched already, skip.
372                           continue;
373                         }
374                         if (!work_list->ProcessWorkItem(i)) {
375                           // This will cause the loop to exit if there's no work
376                           // left.
377                           complete->Signal();
378                           return;
379                         }
380                         ++i;
381                         if (i == work_list->NumWorkItems())
382                           i = 0;
383                       }
384                     },
385                     Unretained(&generator), Unretained(&work_list),
386                     Unretained(&complete)),
387                 BindRepeating(&WorkList::NumIncompleteWorkItems,
388                               Unretained(&work_list)));
389 
390     complete.Wait();
391     handle.Join();
392     const TimeDelta job_duration = TimeTicks::Now() - job_run_start;
393     EXPECT_EQ(0U, work_list.NumIncompleteWorkItems(0));
394 
395     auto reporter = SetUpReporter(story_name);
396     reporter.AddResult(kMetricWorkThroughput,
397                        size_t(num_work_items / job_duration.InMilliseconds()));
398   }
399 
400  private:
401   test::TaskEnvironment task_environment;
402 };
403 
404 }  // namespace
405 
// Naive assignment: every worker linearly scans the list for unacquired items.
TEST_F(JobPerfTest, NoOpWorkNaiveAssignment) {
  RunJobWithNaiveAssignment(kStoryNoOpNaive, 10000000, DoNothing());
}

TEST_F(JobPerfTest, BusyWaitNaiveAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithNaiveAssignment(kStoryBusyWaitNaive, 500000, std::move(callback));
}
414 
// Atomic assignment: workers claim distinct indices via a shared atomic
// counter; "Disrupted" variants add competing tasks that force yields.
TEST_F(JobPerfTest, NoOpWorkAtomicAssignment) {
  RunJobWithAtomicAssignment(kStoryNoOpAtomic, 10000000, DoNothing());
}

TEST_F(JobPerfTest, NoOpDisruptedWorkAtomicAssignment) {
  RunJobWithAtomicAssignment(kStoryNoOpAtomicDisrupted, 10000000, DoNothing(),
                             true);
}

TEST_F(JobPerfTest, BusyWaitAtomicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithAtomicAssignment(kStoryBusyWaitAtomic, 500000, std::move(callback));
}

TEST_F(JobPerfTest, BusyWaitDisruptedWorkAtomicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithAtomicAssignment(kStoryBusyWaitAtomicDisrupted, 500000,
                             std::move(callback), true);
}
434 
// Dynamic assignment: IndexGenerator hands each worker a starting point far
// from the others by recursively splitting ranges in half.
TEST_F(JobPerfTest, NoOpWorkDynamicAssignment) {
  RunJobWithDynamicAssignment(kStoryNoOpDynamic, 10000000, DoNothing());
}

TEST_F(JobPerfTest, NoOpDisruptedWorkDynamicAssignment) {
  RunJobWithDynamicAssignment(kStoryNoOpDynamicDisrupted, 10000000, DoNothing(),
                              true);
}

TEST_F(JobPerfTest, BusyWaitWorkDynamicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithDynamicAssignment(kStoryBusyWaitDynamic, 500000,
                              std::move(callback));
}

TEST_F(JobPerfTest, BusyWaitDisruptedWorkDynamicAssignment) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithDynamicAssignment(kStoryBusyWaitDynamicDisrupted, 500000,
                              std::move(callback), true);
}
455 
// Loop-around assignment: workers start at spread-out points and iterate over
// all items from there, wrapping at the end of the list.
TEST_F(JobPerfTest, NoOpWorkLoopAround) {
  RunJobWithLoopAround(kStoryNoOpLoopAround, 10000000, DoNothing());
}

TEST_F(JobPerfTest, NoOpDisruptedWorkLoopAround) {
  RunJobWithLoopAround(kStoryNoOpLoopAroundDisrupted, 10000000, DoNothing(),
                       true);
}

TEST_F(JobPerfTest, BusyWaitWorkLoopAround) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithLoopAround(kStoryBusyWaitLoopAround, 500000, std::move(callback));
}

TEST_F(JobPerfTest, BusyWaitDisruptedWorkLoopAround) {
  RepeatingCallback<void(size_t)> callback = BusyWaitCallback(Microseconds(5));
  RunJobWithLoopAround(kStoryBusyWaitLoopAroundDisrupted, 500000,
                       std::move(callback), true);
}
475 
476 }  // namespace base
477