1 // Copyright 2020 The Marl Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // This file contains a number of benchmarks that do not use marl.
16 // They exist to compare marl's performance against other simple scheduler
17 // approaches.
18 
#include "marl_bench.h"

#include "benchmark/benchmark.h"

#include <algorithm>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
26 
27 namespace {
28 
// Event is a one-shot wait-and-signal synchronization primitive. Once signal()
// has been called the flag is never cleared, so every current and future call
// to wait() returns.
class Event {
 public:
  // wait blocks the calling thread until signal() has been called. If the
  // event has already fired, it returns without blocking.
  void wait() {
    std::unique_lock<std::mutex> guard(mutex_);
    // Loop to tolerate spurious wake-ups of the condition variable.
    while (!signalled_) {
      cv_.wait(guard);
    }
  }

  // signal fires the event and wakes every thread blocked in wait().
  void signal() {
    std::unique_lock<std::mutex> guard(mutex_);
    signalled_ = true;
    cv_.notify_all();
  }

 private:
  std::condition_variable cv_;
  std::mutex mutex_;
  bool signalled_ = false;  // Guarded by mutex_.
};
50 
51 }  // anonymous namespace
52 
53 // A simple multi-thread, single-queue task executor that shares a single mutex
54 // across N threads. This implementation suffers from lock contention.
SingleQueueTaskExecutor(benchmark::State & state)55 static void SingleQueueTaskExecutor(benchmark::State& state) {
56   using Task = std::function<uint32_t(uint32_t)>;
57 
58   auto const numTasks = Schedule::numTasks(state);
59   auto const numThreads = Schedule::numThreads(state);
60 
61   for (auto _ : state) {
62     state.PauseTiming();
63 
64     std::mutex mutex;
65     // Set everything up with the mutex locked to prevent the threads from
66     // performing work while the timing is paused.
67     mutex.lock();
68 
69     // Set up the tasks.
70     std::queue<Task> tasks;
71     for (int i = 0; i < numTasks; i++) {
72       tasks.push(Schedule::doSomeWork);
73     }
74 
75     auto taskRunner = [&] {
76       while (true) {
77         Task task;
78 
79         // Take the next task.
80         // Note that this lock is likely to block while waiting for other
81         // threads.
82         mutex.lock();
83         if (tasks.size() > 0) {
84           task = tasks.front();
85           tasks.pop();
86         }
87         mutex.unlock();
88 
89         if (task) {
90           task(123);
91         } else {
92           return;  // done.
93         }
94       }
95     };
96 
97     // Set up the threads.
98     std::vector<std::thread> threads;
99     for (int i = 0; i < numThreads; i++) {
100       threads.emplace_back(std::thread(taskRunner));
101     }
102 
103     state.ResumeTiming();
104     mutex.unlock();  // Go threads, go!
105 
106     if (numThreads > 0) {
107       // Wait for all threads to finish.
108       for (auto& thread : threads) {
109         thread.join();
110       }
111     } else {
112       // Single-threaded test - just run the worker.
113       taskRunner();
114     }
115   }
116 }
117 BENCHMARK(SingleQueueTaskExecutor)->Apply(Schedule::args);
118 
119 // A simple multi-thread, multi-queue task executor that avoids lock contention.
120 // Tasks queues are evenly balanced, and each should take an equal amount of
121 // time to execute.
MultiQueueTaskExecutor(benchmark::State & state)122 static void MultiQueueTaskExecutor(benchmark::State& state) {
123   using Task = std::function<uint32_t(uint32_t)>;
124   using TaskQueue = std::vector<Task>;
125 
126   auto const numTasks = Schedule::numTasks(state);
127   auto const numThreads = Schedule::numThreads(state);
128   auto const numQueues = std::max(numThreads, 1);
129 
130   // Set up the tasks queues.
131   std::vector<TaskQueue> taskQueues(numQueues);
132   for (int i = 0; i < numTasks; i++) {
133     taskQueues[i % numQueues].emplace_back(Schedule::doSomeWork);
134   }
135 
136   for (auto _ : state) {
137     if (numThreads > 0) {
138       state.PauseTiming();
139       Event start;
140 
141       // Set up the threads.
142       std::vector<std::thread> threads;
143       for (int i = 0; i < numThreads; i++) {
144         threads.emplace_back(std::thread([&, i] {
145           start.wait();
146           for (auto& task : taskQueues[i]) {
147             task(123);
148           }
149         }));
150       }
151 
152       state.ResumeTiming();
153       start.signal();
154 
155       // Wait for all threads to finish.
156       for (auto& thread : threads) {
157         thread.join();
158       }
159     } else {
160       // Single-threaded test - just run the tasks.
161       for (auto& task : taskQueues[0]) {
162         task(123);
163       }
164     }
165   }
166 }
167 BENCHMARK(MultiQueueTaskExecutor)->Apply(Schedule::args);