• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <benchmark/benchmark.h>
20 #include <grpc/grpc.h>
21 #include <grpc/support/alloc.h>
22 #include <string.h>
23 
24 #include <atomic>
25 
26 #include "absl/log/check.h"
27 #include "absl/log/log.h"
28 #include "src/core/lib/iomgr/ev_posix.h"
29 #include "src/core/lib/iomgr/port.h"
30 #include "src/core/lib/surface/completion_queue.h"
31 #include "src/core/util/crash.h"
32 #include "src/core/util/time.h"
33 #include "test/core/test_util/test_config.h"
34 #include "test/cpp/microbenchmarks/helpers.h"
35 #include "test/cpp/util/test_config.h"
36 
37 struct grpc_pollset {
38   gpr_mu mu;
39 };
40 
41 static gpr_mu g_mu;
42 static gpr_cv g_cv;
43 static int g_threads_active;
44 static bool g_active;
45 
46 namespace grpc {
47 namespace testing {
48 static grpc_completion_queue* g_cq;
49 
pollset_shutdown(grpc_pollset *,grpc_closure * closure)50 static void pollset_shutdown(grpc_pollset* /*ps*/, grpc_closure* closure) {
51   grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
52 }
53 
pollset_init(grpc_pollset * ps,gpr_mu ** mu)54 static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
55   gpr_mu_init(&ps->mu);
56   *mu = &ps->mu;
57 }
58 
pollset_destroy(grpc_pollset * ps)59 static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
60 
pollset_kick(grpc_pollset *,grpc_pollset_worker *)61 static grpc_error_handle pollset_kick(grpc_pollset* /*p*/,
62                                       grpc_pollset_worker* /*worker*/) {
63   return absl::OkStatus();
64 }
65 
66 // Callback when the tag is dequeued from the completion queue. Does nothing
cq_done_cb(void *,grpc_cq_completion * cq_completion)67 static void cq_done_cb(void* /*done_arg*/, grpc_cq_completion* cq_completion) {
68   gpr_free(cq_completion);
69 }
70 
71 // Queues a completion tag if deadline is > 0.
72 // Does nothing if deadline is 0 (i.e gpr_time_0(GPR_CLOCK_MONOTONIC))
pollset_work(grpc_pollset * ps,grpc_pollset_worker **,grpc_core::Timestamp deadline)73 static grpc_error_handle pollset_work(grpc_pollset* ps,
74                                       grpc_pollset_worker** /*worker*/,
75                                       grpc_core::Timestamp deadline) {
76   if (deadline == grpc_core::Timestamp::ProcessEpoch()) {
77     VLOG(2) << "no-op";
78     return absl::OkStatus();
79   }
80 
81   gpr_mu_unlock(&ps->mu);
82 
83   void* tag = reinterpret_cast<void*>(10);  // Some random number
84   CHECK(grpc_cq_begin_op(g_cq, tag));
85   grpc_cq_end_op(
86       g_cq, tag, absl::OkStatus(), cq_done_cb, nullptr,
87       static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
88   grpc_core::ExecCtx::Get()->Flush();
89   gpr_mu_lock(&ps->mu);
90   return absl::OkStatus();
91 }
92 
make_engine_vtable(const char * name)93 static grpc_event_engine_vtable make_engine_vtable(const char* name) {
94   grpc_event_engine_vtable vtable;
95   memset(&vtable, 0, sizeof(vtable));
96 
97   vtable.pollset_size = sizeof(grpc_pollset);
98   vtable.pollset_init = pollset_init;
99   vtable.pollset_shutdown = pollset_shutdown;
100   vtable.pollset_destroy = pollset_destroy;
101   vtable.pollset_work = pollset_work;
102   vtable.pollset_kick = pollset_kick;
103   vtable.is_any_background_poller_thread = [] { return false; };
104   vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
105                                                grpc_error_handle /*error*/) {
106     return false;
107   };
108   vtable.shutdown_background_closure = [] {};
109   vtable.shutdown_engine = [] {};
110   vtable.check_engine_available = [](bool) { return true; };
111   vtable.init_engine = [] {};
112   vtable.name = name;
113 
114   return vtable;
115 }
116 
setup()117 static void setup() {
118   grpc_init();
119   CHECK(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
120         strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") == 0);
121 
122   g_cq = grpc_completion_queue_create_for_next(nullptr);
123 }
124 
teardown()125 static void teardown() {
126   grpc_completion_queue_shutdown(g_cq);
127 
128   // Drain any events
129   gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
130   while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
131          GRPC_QUEUE_SHUTDOWN) {
132     // Do nothing
133   }
134 
135   grpc_completion_queue_destroy(g_cq);
136   grpc_shutdown();
137 }
138 
139 // A few notes about Multi-threaded benchmarks:
140 
141 // Setup:
142 // The benchmark framework ensures that none of the threads proceed beyond the
143 // state.KeepRunning() call unless all the threads have called state.keepRunning
144 // at least once.  So it is safe to do the initialization in one of the threads
145 // before state.KeepRunning() is called.
146 
147 // Teardown:
148 // The benchmark framework also ensures that no thread is running the benchmark
149 // code (i.e the code between two successive calls of state.KeepRunning()) if
150 // state.KeepRunning() returns false. So it is safe to do the teardown in one
151 // of the threads after state.keepRunning() returns false.
152 
153 // However, our use requires synchronization because we do additional work at
154 // each thread that requires specific ordering (TrackCounters must be
155 // constructed after grpc_init because it needs the number of cores, initialized
156 // by grpc, and its Finish call must take place before grpc_shutdown so that it
157 // can use grpc_stats).
158 //
BM_Cq_Throughput(benchmark::State & state)159 static void BM_Cq_Throughput(benchmark::State& state) {
160   gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
161   auto thd_idx = state.thread_index();
162 
163   gpr_mu_lock(&g_mu);
164   g_threads_active++;
165   if (thd_idx == 0) {
166     setup();
167     g_active = true;
168     gpr_cv_broadcast(&g_cv);
169   } else {
170     while (!g_active) {
171       gpr_cv_wait(&g_cv, &g_mu, deadline);
172     }
173   }
174   gpr_mu_unlock(&g_mu);
175 
176   for (auto _ : state) {
177     CHECK(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
178           GRPC_OP_COMPLETE);
179   }
180 
181   state.SetItemsProcessed(state.iterations());
182 
183   gpr_mu_lock(&g_mu);
184   g_threads_active--;
185   if (g_threads_active == 0) {
186     gpr_cv_broadcast(&g_cv);
187   } else {
188     while (g_threads_active > 0) {
189       gpr_cv_wait(&g_cv, &g_mu, deadline);
190     }
191   }
192   gpr_mu_unlock(&g_mu);
193 
194   if (thd_idx == 0) {
195     teardown();
196     g_active = false;
197   }
198 }
199 
200 BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
201 
202 namespace {
203 const grpc_event_engine_vtable g_none_vtable =
204     grpc::testing::make_engine_vtable("none");
205 const grpc_event_engine_vtable g_bm_vtable =
206     grpc::testing::make_engine_vtable("bm_cq_multiple_threads");
207 }  // namespace
208 
209 }  // namespace testing
210 }  // namespace grpc
211 
212 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
213 // and others do not. This allows us to support both modes.
214 namespace benchmark {
RunTheBenchmarksNamespaced()215 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
216 }  // namespace benchmark
217 
main(int argc,char ** argv)218 int main(int argc, char** argv) {
219   // This test should only ever be run with a non or any polling engine
220   // Override the polling engine for the non-polling engine
221   // and add a custom polling engine
222   grpc_register_event_engine_factory(&grpc::testing::g_none_vtable, false);
223   grpc_register_event_engine_factory(&grpc::testing::g_bm_vtable, true);
224   grpc::testing::TestEnvironment env(&argc, argv);
225   gpr_mu_init(&g_mu);
226   gpr_cv_init(&g_cv);
227   ::benchmark::Initialize(&argc, argv);
228   grpc::testing::InitTest(&argc, &argv, false);
229   benchmark::RunTheBenchmarksNamespaced();
230   return 0;
231 }
232