• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <benchmark/benchmark.h>
15 #include <grpc/event_engine/event_engine.h>
16 #include <grpc/support/port_platform.h>
17 
18 #include <deque>
19 
20 #include "absl/log/check.h"
21 #include "src/core/lib/event_engine/common_closures.h"
22 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
23 #include "src/core/util/sync.h"
24 #include "test/core/test_util/test_config.h"
25 
26 namespace {
27 
28 using ::grpc_event_engine::experimental::AnyInvocableClosure;
29 using ::grpc_event_engine::experimental::BasicWorkQueue;
30 using ::grpc_event_engine::experimental::EventEngine;
31 
// State shared by every thread of the multithreaded benchmarks in this file.
// globalMu is the lock BM_MultithreadedStdDequeLIFO holds around each access
// to globalDeque.
32 grpc_core::Mutex globalMu;
// Queue exercised by the BM_MultithreadedWorkQueue* benchmarks.
33 BasicWorkQueue globalWorkQueue;
// Baseline container for BM_MultithreadedStdDequeLIFO; accessed under globalMu.
34 std::deque<EventEngine::Closure*> globalDeque;
35 
36 // --- Multithreaded Tests ---------------------------------------------------
37 
MultithreadedTestArguments(benchmark::internal::Benchmark * b)38 void MultithreadedTestArguments(benchmark::internal::Benchmark* b) {
39   b->Range(1, 512)
40       ->UseRealTime()
41       ->MeasureProcessCPUTime()
42       ->Threads(1)
43       ->Threads(4)
44       ->ThreadPerCpu();
45 }
46 
BM_MultithreadedWorkQueuePopOldest(benchmark::State & state)47 void BM_MultithreadedWorkQueuePopOldest(benchmark::State& state) {
48   AnyInvocableClosure closure([] {});
49   int element_count = state.range(0);
50   double pop_attempts = 0;
51   for (auto _ : state) {
52     for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure);
53     int cnt = 0;
54     do {
55       if (++pop_attempts && globalWorkQueue.PopOldest() != nullptr) ++cnt;
56     } while (cnt < element_count);
57   }
58   state.counters["added"] = element_count * state.iterations();
59   state.counters["pop_rate"] = benchmark::Counter(
60       element_count * state.iterations(), benchmark::Counter::kIsRate);
61   state.counters["pop_attempts"] = pop_attempts;
62   // Rough measurement of queue contention.
63   // WorkQueue::Pop* may return nullptr when the queue is non-empty, usually
64   // when under thread contention. hit_rate is the ratio of pop attempts to
65   // closure executions.
66   state.counters["hit_rate"] =
67       benchmark::Counter(element_count * state.iterations() / pop_attempts,
68                          benchmark::Counter::kAvgThreads);
69   if (state.thread_index() == 0) {
70     CHECK(globalWorkQueue.Empty());
71   }
72 }
73 BENCHMARK(BM_MultithreadedWorkQueuePopOldest)
74     ->Apply(MultithreadedTestArguments);
75 
BM_MultithreadedWorkQueuePopMostRecent(benchmark::State & state)76 void BM_MultithreadedWorkQueuePopMostRecent(benchmark::State& state) {
77   AnyInvocableClosure closure([] {});
78   int element_count = state.range(0);
79   double pop_attempts = 0;
80   for (auto _ : state) {
81     for (int i = 0; i < element_count; i++) globalWorkQueue.Add(&closure);
82     int cnt = 0;
83     do {
84       if (++pop_attempts && globalWorkQueue.PopMostRecent() != nullptr) ++cnt;
85     } while (cnt < element_count);
86   }
87   state.counters["added"] = element_count * state.iterations();
88   state.counters["pop_rate"] = benchmark::Counter(
89       element_count * state.iterations(), benchmark::Counter::kIsRate);
90   state.counters["pop_attempts"] = pop_attempts;
91   state.counters["hit_rate"] =
92       benchmark::Counter(element_count * state.iterations() / pop_attempts,
93                          benchmark::Counter::kAvgThreads);
94   if (state.thread_index() == 0) {
95     CHECK(globalWorkQueue.Empty());
96   }
97 }
98 BENCHMARK(BM_MultithreadedWorkQueuePopMostRecent)
99     ->Apply(MultithreadedTestArguments);
100 
BM_MultithreadedStdDequeLIFO(benchmark::State & state)101 void BM_MultithreadedStdDequeLIFO(benchmark::State& state) {
102   int element_count = state.range(0);
103   AnyInvocableClosure closure([] {});
104   for (auto _ : state) {
105     for (int i = 0; i < element_count; i++) {
106       grpc_core::MutexLock lock(&globalMu);
107       globalDeque.push_back(&closure);
108     }
109     for (int i = 0; i < element_count; i++) {
110       grpc_core::MutexLock lock(&globalMu);
111       EventEngine::Closure* popped = globalDeque.back();
112       globalDeque.pop_back();
113       CHECK_NE(popped, nullptr);
114     }
115   }
116   state.counters["added"] = element_count * state.iterations();
117   state.counters["pop_attempts"] = state.counters["added"];
118   state.counters["pop_rate"] = benchmark::Counter(
119       element_count * state.iterations(), benchmark::Counter::kIsRate);
120   state.counters["hit_rate"] =
121       benchmark::Counter(1, benchmark::Counter::kAvgThreads);
122 }
123 BENCHMARK(BM_MultithreadedStdDequeLIFO)->Apply(MultithreadedTestArguments);
124 
125 // --- Basic Functionality Tests ---------------------------------------------
126 
BM_WorkQueueIntptrPopMostRecent(benchmark::State & state)127 void BM_WorkQueueIntptrPopMostRecent(benchmark::State& state) {
128   BasicWorkQueue queue;
129   grpc_event_engine::experimental::AnyInvocableClosure closure([] {});
130   int element_count = state.range(0);
131   for (auto _ : state) {
132     int cnt = 0;
133     for (int i = 0; i < element_count; i++) queue.Add(&closure);
134     do {
135       if (queue.PopMostRecent() != nullptr) ++cnt;
136     } while (cnt < element_count);
137   }
138   state.counters["Added"] = element_count * state.iterations();
139   state.counters["Popped"] = state.counters["Added"];
140   state.counters["Pop Rate"] =
141       benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
142 }
143 BENCHMARK(BM_WorkQueueIntptrPopMostRecent)
144     ->Range(1, 512)
145     ->UseRealTime()
146     ->MeasureProcessCPUTime();
147 
BM_WorkQueueClosureExecution(benchmark::State & state)148 void BM_WorkQueueClosureExecution(benchmark::State& state) {
149   BasicWorkQueue queue;
150   int element_count = state.range(0);
151   int run_count = 0;
152   grpc_event_engine::experimental::AnyInvocableClosure closure(
153       [&run_count] { ++run_count; });
154   for (auto _ : state) {
155     for (int i = 0; i < element_count; i++) queue.Add(&closure);
156     do {
157       queue.PopMostRecent()->Run();
158     } while (run_count < element_count);
159     run_count = 0;
160   }
161   state.counters["Added"] = element_count * state.iterations();
162   state.counters["Popped"] = state.counters["Added"];
163   state.counters["Pop Rate"] =
164       benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
165 }
166 BENCHMARK(BM_WorkQueueClosureExecution)
167     ->Range(8, 128)
168     ->UseRealTime()
169     ->MeasureProcessCPUTime();
170 
BM_WorkQueueAnyInvocableExecution(benchmark::State & state)171 void BM_WorkQueueAnyInvocableExecution(benchmark::State& state) {
172   BasicWorkQueue queue;
173   int element_count = state.range(0);
174   int run_count = 0;
175   for (auto _ : state) {
176     for (int i = 0; i < element_count; i++) {
177       queue.Add([&run_count] { ++run_count; });
178     }
179     do {
180       queue.PopMostRecent()->Run();
181     } while (run_count < element_count);
182     run_count = 0;
183   }
184   state.counters["Added"] = element_count * state.iterations();
185   state.counters["Popped"] = state.counters["Added"];
186   state.counters["Pop Rate"] =
187       benchmark::Counter(state.counters["Popped"], benchmark::Counter::kIsRate);
188 }
189 BENCHMARK(BM_WorkQueueAnyInvocableExecution)
190     ->Range(8, 128)
191     ->UseRealTime()
192     ->MeasureProcessCPUTime();
193 
194 }  // namespace
195 
196 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
197 // and others do not. This allows us to support both modes.
198 namespace benchmark {
RunTheBenchmarksNamespaced()199 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
200 }  // namespace benchmark
201 
main(int argc,char ** argv)202 int main(int argc, char** argv) {
203   grpc::testing::TestEnvironment env(&argc, argv);
204   ::benchmark::Initialize(&argc, argv);
205   benchmark::RunTheBenchmarksNamespaced();
206   return 0;
207 }
208