1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <benchmark/benchmark.h>
15 #include <grpc/event_engine/event_engine.h>
16 #include <grpc/support/port_platform.h>
17
18 #include <deque>
19
20 #include "absl/log/check.h"
21 #include "src/core/lib/event_engine/common_closures.h"
22 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
23 #include "src/core/util/sync.h"
24 #include "test/core/test_util/test_config.h"
25
26 namespace {
27
28 using ::grpc_event_engine::experimental::AnyInvocableClosure;
29 using ::grpc_event_engine::experimental::BasicWorkQueue;
30 using ::grpc_event_engine::experimental::EventEngine;
31
// State shared by every thread of the multithreaded benchmarks below.
// `globalMu` is the lock the std::deque baseline takes around each access to
// `globalDeque`; the WorkQueue benchmarks call into `globalWorkQueue` directly
// without taking `globalMu`.
grpc_core::Mutex globalMu;
BasicWorkQueue globalWorkQueue;
std::deque<EventEngine::Closure*> globalDeque;
35
36 // --- Multithreaded Tests ---------------------------------------------------
37
MultithreadedTestArguments(benchmark::internal::Benchmark * b)38 void MultithreadedTestArguments(benchmark::internal::Benchmark* b) {
39 b->Range(1, 512)
40 ->UseRealTime()
41 ->MeasureProcessCPUTime()
42 ->Threads(1)
43 ->Threads(4)
44 ->ThreadPerCpu();
45 }
46
BM_MultithreadedWorkQueuePopOldest(benchmark::State & state)47 void BM_MultithreadedWorkQueuePopOldest(benchmark::State& state) {
48 AnyInvocableClosure closure([] {});
49 int element_count = state.range(0);
50 double pop_attempts = 0;
51 for (auto _ : state) {
52 for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure);
53 int cnt = 0;
54 do {
55 if (++pop_attempts && globalWorkQueue.PopOldest() != nullptr) ++cnt;
56 } while (cnt < element_count);
57 }
58 state.counters["added"] = element_count * state.iterations();
59 state.counters["pop_rate"] = benchmark::Counter(
60 element_count * state.iterations(), benchmark::Counter::kIsRate);
61 state.counters["pop_attempts"] = pop_attempts;
62 // Rough measurement of queue contention.
63 // WorkQueue::Pop* may return nullptr when the queue is non-empty, usually
64 // when under thread contention. hit_rate is the ratio of pop attempts to
65 // closure executions.
66 state.counters["hit_rate"] =
67 benchmark::Counter(element_count * state.iterations() / pop_attempts,
68 benchmark::Counter::kAvgThreads);
69 if (state.thread_index() == 0) {
70 CHECK(globalWorkQueue.Empty());
71 }
72 }
73 BENCHMARK(BM_MultithreadedWorkQueuePopOldest)
74 ->Apply(MultithreadedTestArguments);
75
BM_MultithreadedWorkQueuePopMostRecent(benchmark::State & state)76 void BM_MultithreadedWorkQueuePopMostRecent(benchmark::State& state) {
77 AnyInvocableClosure closure([] {});
78 int element_count = state.range(0);
79 double pop_attempts = 0;
80 for (auto _ : state) {
81 for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure);
82 int cnt = 0;
83 do {
84 if (++pop_attempts && globalWorkQueue.PopMostRecent() != nullptr) ++cnt;
85 } while (cnt < element_count);
86 }
87 state.counters["added"] = element_count * state.iterations();
88 state.counters["pop_rate"] = benchmark::Counter(
89 element_count * state.iterations(), benchmark::Counter::kIsRate);
90 state.counters["pop_attempts"] = pop_attempts;
91 state.counters["hit_rate"] =
92 benchmark::Counter(element_count * state.iterations() / pop_attempts,
93 benchmark::Counter::kAvgThreads);
94 if (state.thread_index() == 0) {
95 CHECK(globalWorkQueue.Empty());
96 }
97 }
98 BENCHMARK(BM_MultithreadedWorkQueuePopMostRecent)
99 ->Apply(MultithreadedTestArguments);
100
BM_MultithreadedStdDequeLIFO(benchmark::State & state)101 void BM_MultithreadedStdDequeLIFO(benchmark::State& state) {
102 int element_count = state.range(0);
103 AnyInvocableClosure closure([] {});
104 for (auto _ : state) {
105 for (int i = 0; i < element_count; i++) {
106 grpc_core::MutexLock lock(&globalMu);
107 globalDeque.push_back(&closure);
108 }
109 for (int i = 0; i < element_count; i++) {
110 grpc_core::MutexLock lock(&globalMu);
111 EventEngine::Closure* popped = globalDeque.back();
112 globalDeque.pop_back();
113 CHECK_NE(popped, nullptr);
114 }
115 }
116 state.counters["added"] = element_count * state.iterations();
117 state.counters["pop_attempts"] = state.counters["added"];
118 state.counters["pop_rate"] = benchmark::Counter(
119 element_count * state.iterations(), benchmark::Counter::kIsRate);
120 state.counters["hit_rate"] =
121 benchmark::Counter(1, benchmark::Counter::kAvgThreads);
122 }
123 BENCHMARK(BM_MultithreadedStdDequeLIFO)->Apply(MultithreadedTestArguments);
124
125 // --- Basic Functionality Tests ---------------------------------------------
126
BM_WorkQueueIntptrPopMostRecent(benchmark::State & state)127 void BM_WorkQueueIntptrPopMostRecent(benchmark::State& state) {
128 BasicWorkQueue queue;
129 grpc_event_engine::experimental::AnyInvocableClosure closure([] {});
130 int element_count = state.range(0);
131 for (auto _ : state) {
132 int cnt = 0;
133 for (int i = 0; i < element_count; i++) queue.Add(&closure);
134 do {
135 if (queue.PopMostRecent() != nullptr) ++cnt;
136 } while (cnt < element_count);
137 }
138 state.counters["Added"] = element_count * state.iterations();
139 state.counters["Popped"] = state.counters["Added"];
140 state.counters["Pop Rate"] =
141 benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
142 }
143 BENCHMARK(BM_WorkQueueIntptrPopMostRecent)
144 ->Range(1, 512)
145 ->UseRealTime()
146 ->MeasureProcessCPUTime();
147
BM_WorkQueueClosureExecution(benchmark::State & state)148 void BM_WorkQueueClosureExecution(benchmark::State& state) {
149 BasicWorkQueue queue;
150 int element_count = state.range(0);
151 int run_count = 0;
152 grpc_event_engine::experimental::AnyInvocableClosure closure(
153 [&run_count] { ++run_count; });
154 for (auto _ : state) {
155 for (int i = 0; i < element_count; i++) queue.Add(&closure);
156 do {
157 queue.PopMostRecent()->Run();
158 } while (run_count < element_count);
159 run_count = 0;
160 }
161 state.counters["Added"] = element_count * state.iterations();
162 state.counters["Popped"] = state.counters["Added"];
163 state.counters["Pop Rate"] =
164 benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
165 }
166 BENCHMARK(BM_WorkQueueClosureExecution)
167 ->Range(8, 128)
168 ->UseRealTime()
169 ->MeasureProcessCPUTime();
170
BM_WorkQueueAnyInvocableExecution(benchmark::State & state)171 void BM_WorkQueueAnyInvocableExecution(benchmark::State& state) {
172 BasicWorkQueue queue;
173 int element_count = state.range(0);
174 int run_count = 0;
175 for (auto _ : state) {
176 for (int i = 0; i < element_count; i++) {
177 queue.Add([&run_count] { ++run_count; });
178 }
179 do {
180 queue.PopMostRecent()->Run();
181 } while (run_count < element_count);
182 run_count = 0;
183 }
184 state.counters["Added"] = element_count * state.iterations();
185 state.counters["Popped"] = state.counters["Added"];
186 state.counters["Pop Rate"] =
187 benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
188 }
189 BENCHMARK(BM_WorkQueueAnyInvocableExecution)
190 ->Range(8, 128)
191 ->UseRealTime()
192 ->MeasureProcessCPUTime();
193
194 } // namespace
195
196 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
197 // and others do not. This allows us to support both modes.
198 namespace benchmark {
RunTheBenchmarksNamespaced()199 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
200 } // namespace benchmark
201
main(int argc,char ** argv)202 int main(int argc, char** argv) {
203 grpc::testing::TestEnvironment env(&argc, argv);
204 ::benchmark::Initialize(&argc, argv);
205 benchmark::RunTheBenchmarksNamespaced();
206 return 0;
207 }
208