• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include <benchmark/benchmark.h>
#include <grpc/support/cpu.h>
#include <grpcpp/impl/grpc_library.h>

#include <atomic>
#include <cmath>
#include <cstdint>
#include <limits>
#include <memory>
#include <vector>

#include "absl/log/check.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/util/crash.h"
#include "src/core/util/notification.h"
#include "src/core/util/useful.h"
#include "test/core/test_util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"
#include "test/cpp/util/test_config.h"
34 
35 namespace {
36 
37 using ::grpc_event_engine::experimental::AnyInvocableClosure;
38 using ::grpc_event_engine::experimental::EventEngine;
39 using ::grpc_event_engine::experimental::ThreadPool;
40 
// Shape of one fan-out benchmark run: a root callback that, for `depth`
// layers, schedules `fanout` children per callback.
struct FanoutParameters {
  // Number of callback layers below the root.
  int depth = 0;
  // Number of children scheduled by each non-leaf callback.
  int fanout = 0;
  // Total number of callbacks (root included) that must run before an
  // iteration is considered complete.
  int limit = 0;
};
46 
BM_ThreadPool_RunSmallLambda(benchmark::State & state)47 void BM_ThreadPool_RunSmallLambda(benchmark::State& state) {
48   auto pool = grpc_event_engine::experimental::MakeThreadPool(
49       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
50   const int cb_count = state.range(0);
51   std::atomic_int runcount{0};
52   for (auto _ : state) {
53     state.PauseTiming();
54     runcount.store(0);
55     grpc_core::Notification signal;
56     auto cb = [&signal, &runcount, cb_count]() {
57       if (runcount.fetch_add(1, std::memory_order_relaxed) + 1 == cb_count) {
58         signal.Notify();
59       }
60     };
61     state.ResumeTiming();
62     for (int i = 0; i < cb_count; i++) {
63       pool->Run(cb);
64     }
65     signal.WaitForNotification();
66   }
67   state.SetItemsProcessed(cb_count * state.iterations());
68   pool->Quiesce();
69 }
70 BENCHMARK(BM_ThreadPool_RunSmallLambda)
71     ->Range(100, 4096)
72     ->MeasureProcessCPUTime()
73     ->UseRealTime();
74 
BM_ThreadPool_RunClosure(benchmark::State & state)75 void BM_ThreadPool_RunClosure(benchmark::State& state) {
76   int cb_count = state.range(0);
77   grpc_core::Notification* signal = new grpc_core::Notification();
78   std::atomic_int count{0};
79   AnyInvocableClosure* closure =
80       new AnyInvocableClosure([signal_holder = &signal, cb_count, &count]() {
81         if (++count == cb_count) {
82           (*signal_holder)->Notify();
83         }
84       });
85   auto pool = grpc_event_engine::experimental::MakeThreadPool(
86       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
87   for (auto _ : state) {
88     for (int i = 0; i < cb_count; i++) {
89       pool->Run(closure);
90     }
91     signal->WaitForNotification();
92     state.PauseTiming();
93     delete signal;
94     signal = new grpc_core::Notification();
95     count.store(0);
96     state.ResumeTiming();
97   }
98   delete signal;
99   state.SetItemsProcessed(cb_count * state.iterations());
100   pool->Quiesce();
101   delete closure;
102 }
103 BENCHMARK(BM_ThreadPool_RunClosure)
104     ->Range(100, 4096)
105     ->MeasureProcessCPUTime()
106     ->UseRealTime();
107 
FanoutTestArguments(benchmark::internal::Benchmark * b)108 void FanoutTestArguments(benchmark::internal::Benchmark* b) {
109   // TODO(hork): enable when the engines are fast enough to run these:
110   // ->Args({10000, 1})  // chain of callbacks scheduling callbacks
111   // ->Args({1, 10000})  // flat scheduling of callbacks
112   // ->Args({5, 6})      // depth 5, fans out to 9,330 callbacks
113   //  ->Args({2, 100})   // depth 2, fans out 10,101 callbacks
114   //  ->Args({4, 10})    // depth 4, fans out to 11,110 callbacks
115   b->Args({1000, 1})     // chain of callbacks scheduling callbacks
116       ->Args({100, 1})   // chain of callbacks scheduling callbacks
117       ->Args({1, 1000})  // flat scheduling of callbacks
118       ->Args({1, 100})   // flat scheduling of callbacks
119       ->Args({2, 70})    // depth 2, fans out 4971
120       ->Args({4, 8})     // depth 4, fans out 4681
121       ->UseRealTime()
122       ->MeasureProcessCPUTime();
123 }
124 
GetFanoutParameters(benchmark::State & state)125 FanoutParameters GetFanoutParameters(benchmark::State& state) {
126   FanoutParameters params;
127   params.depth = state.range(0);
128   params.fanout = state.range(1);
129   if (params.depth == 1 || params.fanout == 1) {
130     params.limit = std::max(params.depth, params.fanout) + 1;
131   } else {
132     // sum of geometric series
133     params.limit =
134         (1 - std::pow(params.fanout, params.depth + 1)) / (1 - params.fanout);
135   }
136   // sanity checking
137   CHECK(params.limit >= params.fanout * params.depth);
138   return params;
139 }
140 
141 // Callback for Lambda FanOut tests
142 //
143 // Note that params are copied each time for 2 reasons: 1) callbacks will
144 // inevitably continue to shut down after the end of the test, so a reference
145 // parameter will become invalid and crash some callbacks, and 2) in my RBE
146 // tests, copies are slightly faster than a shared_ptr<FanoutParams>
147 // alternative.
FanOutCallback(std::shared_ptr<ThreadPool> pool,const FanoutParameters params,grpc_core::Notification & signal,std::atomic_int & count,int processing_layer)148 void FanOutCallback(std::shared_ptr<ThreadPool> pool,
149                     const FanoutParameters params,
150                     grpc_core::Notification& signal, std::atomic_int& count,
151                     int processing_layer) {
152   int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1;
153   if (local_cnt == params.limit) {
154     signal.Notify();
155     return;
156   }
157   DCHECK_LT(local_cnt, params.limit);
158   if (params.depth == processing_layer) return;
159   for (int i = 0; i < params.fanout; i++) {
160     pool->Run([pool, params, processing_layer, &count, &signal]() {
161       FanOutCallback(pool, params, signal, count, processing_layer + 1);
162     });
163   }
164 }
165 
BM_ThreadPool_Lambda_FanOut(benchmark::State & state)166 void BM_ThreadPool_Lambda_FanOut(benchmark::State& state) {
167   auto params = GetFanoutParameters(state);
168   auto pool = grpc_event_engine::experimental::MakeThreadPool(
169       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
170   for (auto _ : state) {
171     std::atomic_int count{0};
172     grpc_core::Notification signal;
173     FanOutCallback(pool, params, signal, count, /*processing_layer=*/0);
174     do {
175       signal.WaitForNotification();
176     } while (count.load() != params.limit);
177   }
178   state.SetItemsProcessed(params.limit * state.iterations());
179   pool->Quiesce();
180 }
181 BENCHMARK(BM_ThreadPool_Lambda_FanOut)->Apply(FanoutTestArguments);
182 
ClosureFanOutCallback(EventEngine::Closure * child_closure,std::shared_ptr<ThreadPool> pool,grpc_core::Notification ** signal_holder,std::atomic_int & count,const FanoutParameters params)183 void ClosureFanOutCallback(EventEngine::Closure* child_closure,
184                            std::shared_ptr<ThreadPool> pool,
185                            grpc_core::Notification** signal_holder,
186                            std::atomic_int& count,
187                            const FanoutParameters params) {
188   int local_cnt = count.fetch_add(1, std::memory_order_acq_rel) + 1;
189   if (local_cnt == params.limit) {
190     (*signal_holder)->Notify();
191     return;
192   }
193   if (local_cnt > params.limit) {
194     grpc_core::Crash(absl::StrFormat("Ran too many closures: %d/%d", local_cnt,
195                                      params.limit));
196   }
197   if (child_closure == nullptr) return;
198   for (int i = 0; i < params.fanout; i++) {
199     pool->Run(child_closure);
200   }
201 }
202 
BM_ThreadPool_Closure_FanOut(benchmark::State & state)203 void BM_ThreadPool_Closure_FanOut(benchmark::State& state) {
204   auto params = GetFanoutParameters(state);
205   auto pool = grpc_event_engine::experimental::MakeThreadPool(
206       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u));
207   std::vector<EventEngine::Closure*> closures;
208   closures.reserve(params.depth + 2);
209   closures.push_back(nullptr);
210   grpc_core::Notification* signal = new grpc_core::Notification();
211   std::atomic_int count{0};
212   // prepare a unique closure for each depth
213   for (int i = 0; i <= params.depth; i++) {
214     // call the previous closure (e.g., closures[2] calls closures[1] during
215     // fanout)
216     closures.push_back(new AnyInvocableClosure(
217         [i, pool, &closures, params, signal_holder = &signal, &count]() {
218           ClosureFanOutCallback(closures[i], pool, signal_holder, count,
219                                 params);
220         }));
221   }
222   for (auto _ : state) {
223     DCHECK_EQ(count.load(std::memory_order_relaxed), 0);
224     pool->Run(closures[params.depth + 1]);
225     do {
226       signal->WaitForNotification();
227     } while (count.load() != params.limit);
228     // cleanup
229     state.PauseTiming();
230     delete signal;
231     signal = new grpc_core::Notification();
232     count.store(0);
233     state.ResumeTiming();
234   }
235   delete signal;
236   state.SetItemsProcessed(params.limit * state.iterations());
237   for (auto i : closures) delete i;
238   pool->Quiesce();
239 }
240 BENCHMARK(BM_ThreadPool_Closure_FanOut)->Apply(FanoutTestArguments);
241 
242 }  // namespace
243 
244 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
245 // and others do not. This allows us to support both modes.
246 namespace benchmark {
RunTheBenchmarksNamespaced()247 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
248 }  // namespace benchmark
249 
main(int argc,char ** argv)250 int main(int argc, char** argv) {
251   grpc::testing::TestEnvironment env(&argc, argv);
252   LibraryInitializer libInit;
253   benchmark::Initialize(&argc, argv);
254   grpc::testing::InitTest(&argc, &argv, false);
255 
256   benchmark::RunTheBenchmarksNamespaced();
257   return 0;
258 }
259