• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include <benchmark/benchmark.h>
#include <grpc/event_engine/event_engine.h>
#include <grpcpp/impl/grpc_library.h>

#include <atomic>
#include <cmath>
#include <cstdint>
#include <limits>
#include <memory>
#include <vector>

#include "absl/debugging/leak_check.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/check.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/util/crash.h"
#include "src/core/util/notification.h"
#include "test/core/test_util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"
#include "test/cpp/util/test_config.h"
35 
36 namespace {
37 
38 using ::grpc_event_engine::experimental::AnyInvocableClosure;
39 using ::grpc_event_engine::experimental::EventEngine;
40 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
41 
// Shape of the callback tree used by the fan-out benchmarks.
//
// Every member has a default initializer so that a default-constructed
// instance never exposes indeterminate values; the type remains an aggregate.
struct FanoutParameters {
  // Number of layers of callbacks scheduled below the root callback.
  int depth = 0;
  // Number of child callbacks scheduled by each callback that fans out.
  int fanout = 0;
  // Total number of callback executions the benchmark waits for.
  int limit = 0;
};
47 
BM_EventEngine_RunSmallLambda(benchmark::State & state)48 void BM_EventEngine_RunSmallLambda(benchmark::State& state) {
49   auto engine = GetDefaultEventEngine();
50   const int cb_count = state.range(0);
51   std::atomic_int count{0};
52   for (auto _ : state) {
53     state.PauseTiming();
54     grpc_core::Notification signal;
55     auto cb = [&signal, &count, cb_count]() {
56       if (++count == cb_count) signal.Notify();
57     };
58     state.ResumeTiming();
59     for (int i = 0; i < cb_count; i++) {
60       engine->Run(cb);
61     }
62     signal.WaitForNotification();
63     count.store(0);
64   }
65   state.SetItemsProcessed(cb_count * state.iterations());
66 }
67 BENCHMARK(BM_EventEngine_RunSmallLambda)
68     ->Range(100, 4096)
69     ->MeasureProcessCPUTime()
70     ->UseRealTime();
71 
BM_EventEngine_RunLargeLambda(benchmark::State & state)72 void BM_EventEngine_RunLargeLambda(benchmark::State& state) {
73   int cb_count = state.range(0);
74   // large lambdas require an extra allocation
75   std::string extra = "12345678";
76   auto engine = GetDefaultEventEngine();
77   std::atomic_int count{0};
78   for (auto _ : state) {
79     state.PauseTiming();
80     grpc_core::Notification signal;
81     auto cb = [&signal, &count, cb_count, extra]() {
82       (void)extra;
83       if (++count == cb_count) signal.Notify();
84     };
85     state.ResumeTiming();
86     for (int i = 0; i < cb_count; i++) {
87       engine->Run(cb);
88     }
89     signal.WaitForNotification();
90     count.store(0);
91   }
92   state.SetItemsProcessed(cb_count * state.iterations());
93 }
94 BENCHMARK(BM_EventEngine_RunLargeLambda)
95     ->Range(100, 4096)
96     ->MeasureProcessCPUTime()
97     ->UseRealTime();
98 
BM_EventEngine_RunClosure(benchmark::State & state)99 void BM_EventEngine_RunClosure(benchmark::State& state) {
100   int cb_count = state.range(0);
101   grpc_core::Notification* signal = new grpc_core::Notification();
102   std::atomic_int count{0};
103   // Ignore leaks from this closure. For simplicity, this closure is not deleted
104   // because the closure may still be executing after the EventEngine is
105   // destroyed. This is because the default posix EventEngine's thread pool may
106   // get destroyed separately from the EventEngine.
107   AnyInvocableClosure* closure = absl::IgnoreLeak(
108       new AnyInvocableClosure([signal_holder = &signal, cb_count, &count]() {
109         if (++count == cb_count) {
110           (*signal_holder)->Notify();
111         }
112       }));
113   auto engine = GetDefaultEventEngine();
114   for (auto _ : state) {
115     for (int i = 0; i < cb_count; i++) {
116       engine->Run(closure);
117     }
118     signal->WaitForNotification();
119     state.PauseTiming();
120     delete signal;
121     signal = new grpc_core::Notification();
122     count.store(0);
123     state.ResumeTiming();
124   }
125   delete signal;
126   state.SetItemsProcessed(cb_count * state.iterations());
127 }
128 BENCHMARK(BM_EventEngine_RunClosure)
129     ->Range(100, 4096)
130     ->MeasureProcessCPUTime()
131     ->UseRealTime();
132 
FanoutTestArguments(benchmark::internal::Benchmark * b)133 void FanoutTestArguments(benchmark::internal::Benchmark* b) {
134   // TODO(hork): enable when the engines are fast enough to run these:
135   // ->Args({10000, 1})  // chain of callbacks scheduling callbacks
136   // ->Args({1, 10000})  // flat scheduling of callbacks
137   // ->Args({5, 6})      // depth 5, fans out to 9,330 callbacks
138   //  ->Args({2, 100})   // depth 2, fans out 10,101 callbacks
139   //  ->Args({4, 10})    // depth 4, fans out to 11,110 callbacks
140   b->Args({1000, 1})     // chain of callbacks scheduling callbacks
141       ->Args({100, 1})   // chain of callbacks scheduling callbacks
142       ->Args({1, 1000})  // flat scheduling of callbacks
143       ->Args({1, 100})   // flat scheduling of callbacks
144       ->Args({2, 70})    // depth 2, fans out 4971
145       ->Args({4, 8})     // depth 4, fans out 4681
146       ->UseRealTime()
147       ->MeasureProcessCPUTime();
148 }
149 
GetFanoutParameters(benchmark::State & state)150 FanoutParameters GetFanoutParameters(benchmark::State& state) {
151   FanoutParameters params;
152   params.depth = state.range(0);
153   params.fanout = state.range(1);
154   if (params.depth == 1 || params.fanout == 1) {
155     params.limit = std::max(params.depth, params.fanout) + 1;
156   } else {
157     // sum of geometric series
158     params.limit =
159         (1 - std::pow(params.fanout, params.depth + 1)) / (1 - params.fanout);
160   }
161   // sanity checking
162   CHECK(params.limit >= params.fanout * params.depth);
163   return params;
164 }
165 
166 // EventEngine callback for Lambda FanOut tests
167 //
168 // Note that params are copied each time for 2 reasons: 1) callbacks will
169 // inevitably continue to shut down after the end of the test, so a reference
170 // parameter will become invalid and crash some callbacks, and 2) in my RBE
171 // tests, copies are slightly faster than a shared_ptr<FanoutParams>
172 // alternative.
FanOutCallback(std::shared_ptr<EventEngine> engine,const FanoutParameters params,grpc_core::Notification & signal,std::atomic_int & count,int processing_layer)173 void FanOutCallback(std::shared_ptr<EventEngine> engine,
174                     const FanoutParameters params,
175                     grpc_core::Notification& signal, std::atomic_int& count,
176                     int processing_layer) {
177   int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1;
178   if (local_cnt == params.limit) {
179     signal.Notify();
180     return;
181   }
182   DCHECK_LT(local_cnt, params.limit);
183   if (params.depth == processing_layer) return;
184   for (int i = 0; i < params.fanout; i++) {
185     engine->Run([engine, params, processing_layer, &count, &signal]() {
186       FanOutCallback(engine, params, signal, count, processing_layer + 1);
187     });
188   }
189 }
190 
BM_EventEngine_Lambda_FanOut(benchmark::State & state)191 void BM_EventEngine_Lambda_FanOut(benchmark::State& state) {
192   auto params = GetFanoutParameters(state);
193   auto engine = GetDefaultEventEngine();
194   for (auto _ : state) {
195     std::atomic_int count{0};
196     grpc_core::Notification signal;
197     FanOutCallback(engine, params, signal, count, /*processing_layer=*/0);
198     do {
199       signal.WaitForNotification();
200     } while (count.load() != params.limit);
201   }
202   state.SetItemsProcessed(params.limit * state.iterations());
203 }
204 BENCHMARK(BM_EventEngine_Lambda_FanOut)->Apply(FanoutTestArguments);
205 
ClosureFanOutCallback(EventEngine::Closure * child_closure,std::shared_ptr<EventEngine> engine,grpc_core::Notification ** signal_holder,std::atomic_int & count,const FanoutParameters params)206 void ClosureFanOutCallback(EventEngine::Closure* child_closure,
207                            std::shared_ptr<EventEngine> engine,
208                            grpc_core::Notification** signal_holder,
209                            std::atomic_int& count,
210                            const FanoutParameters params) {
211   int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1;
212   if (local_cnt == params.limit) {
213     (*signal_holder)->Notify();
214     return;
215   }
216   if (local_cnt > params.limit) {
217     grpc_core::Crash(absl::StrFormat("Ran too many closures: %d/%d", local_cnt,
218                                      params.limit));
219   }
220   if (child_closure == nullptr) return;
221   for (int i = 0; i < params.fanout; i++) {
222     engine->Run(child_closure);
223   }
224 }
225 
BM_EventEngine_Closure_FanOut(benchmark::State & state)226 void BM_EventEngine_Closure_FanOut(benchmark::State& state) {
227   auto params = GetFanoutParameters(state);
228   auto engine = GetDefaultEventEngine();
229   std::vector<EventEngine::Closure*> closures;
230   closures.reserve(params.depth + 2);
231   closures.push_back(nullptr);
232   grpc_core::Notification* signal = new grpc_core::Notification();
233   std::atomic_int count{0};
234   // prepare a unique closure for each depth
235   for (int i = 0; i <= params.depth; i++) {
236     // call the previous closure (e.g., closures[2] calls closures[1] during
237     // fanout)
238     closures.push_back(new AnyInvocableClosure(
239         [i, engine, &closures, params, signal_holder = &signal, &count]() {
240           ClosureFanOutCallback(closures[i], engine, signal_holder, count,
241                                 params);
242         }));
243   }
244   for (auto _ : state) {
245     DCHECK_EQ(count.load(std::memory_order_relaxed), 0);
246     engine->Run(closures[params.depth + 1]);
247     do {
248       signal->WaitForNotification();
249     } while (count.load() != params.limit);
250     // cleanup
251     state.PauseTiming();
252     delete signal;
253     signal = new grpc_core::Notification();
254     count.store(0);
255     state.ResumeTiming();
256   }
257   delete signal;
258   state.SetItemsProcessed(params.limit * state.iterations());
259   for (auto i : closures) delete i;
260 }
261 BENCHMARK(BM_EventEngine_Closure_FanOut)->Apply(FanoutTestArguments);
262 
263 }  // namespace
264 
265 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
266 // and others do not. This allows us to support both modes.
267 namespace benchmark {
RunTheBenchmarksNamespaced()268 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
269 }  // namespace benchmark
270 
main(int argc,char ** argv)271 int main(int argc, char** argv) {
272   grpc::testing::TestEnvironment env(&argc, argv);
273   LibraryInitializer libInit;
274   benchmark::Initialize(&argc, argv);
275   grpc::testing::InitTest(&argc, &argv, false);
276 
277   benchmark::RunTheBenchmarksNamespaced();
278   return 0;
279 }
280