1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <benchmark/benchmark.h>
20 #include <grpc/grpc.h>
21 #include <grpc/support/alloc.h>
22 #include <string.h>
23
24 #include <atomic>
25
26 #include "absl/log/check.h"
27 #include "absl/log/log.h"
28 #include "src/core/lib/iomgr/ev_posix.h"
29 #include "src/core/lib/iomgr/port.h"
30 #include "src/core/lib/surface/completion_queue.h"
31 #include "src/core/util/crash.h"
32 #include "src/core/util/time.h"
33 #include "test/core/test_util/test_config.h"
34 #include "test/cpp/microbenchmarks/helpers.h"
35 #include "test/cpp/util/test_config.h"
36
// Minimal pollset definition for this benchmark: the custom event engine
// below never actually polls, so the only state a pollset needs is the mutex
// that pollset_init hands back to the completion-queue machinery.
struct grpc_pollset {
  gpr_mu mu;
};
40
// Synchronization state for the per-thread setup/teardown barrier in
// BM_Cq_Throughput. g_mu/g_cv protect and signal changes to
// g_threads_active (number of threads currently inside the benchmark
// function) and g_active (set once thread 0 has finished setup()).
static gpr_mu g_mu;
static gpr_cv g_cv;
static int g_threads_active;
static bool g_active;
45
46 namespace grpc {
47 namespace testing {
48 static grpc_completion_queue* g_cq;
49
// Pollset shutdown completes immediately: there is no poller to stop, so the
// caller's closure is scheduled right away with a success status.
static void pollset_shutdown(grpc_pollset* /*ps*/, grpc_closure* closure) {
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
}
53
pollset_init(grpc_pollset * ps,gpr_mu ** mu)54 static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
55 gpr_mu_init(&ps->mu);
56 *mu = &ps->mu;
57 }
58
pollset_destroy(grpc_pollset * ps)59 static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
60
// Kick is a no-op: pollset_work never blocks in this benchmark, so there is
// never a sleeping poller to wake up.
static grpc_error_handle pollset_kick(grpc_pollset* /*p*/,
                                      grpc_pollset_worker* /*worker*/) {
  return absl::OkStatus();
}
65
// Callback when the tag is dequeued from the completion queue. Frees the
// grpc_cq_completion that pollset_work heap-allocated for grpc_cq_end_op;
// otherwise does nothing.
static void cq_done_cb(void* /*done_arg*/, grpc_cq_completion* cq_completion) {
  gpr_free(cq_completion);
}
70
// Queues a completion tag if deadline is > 0.
// Does nothing if deadline is 0 (i.e gpr_time_0(GPR_CLOCK_MONOTONIC))
static grpc_error_handle pollset_work(grpc_pollset* ps,
                                      grpc_pollset_worker** /*worker*/,
                                      grpc_core::Timestamp deadline) {
  if (deadline == grpc_core::Timestamp::ProcessEpoch()) {
    VLOG(2) << "no-op";
    return absl::OkStatus();
  }

  // The caller holds ps->mu (handed out by pollset_init); drop it while
  // enqueueing so the completion path does not run under the pollset lock.
  gpr_mu_unlock(&ps->mu);

  void* tag = reinterpret_cast<void*>(10);  // Some random number
  // begin_op/end_op immediately publish one completion for `tag`; the
  // cq_done_cb callback frees the allocation when the tag is dequeued.
  CHECK(grpc_cq_begin_op(g_cq, tag));
  grpc_cq_end_op(
      g_cq, tag, absl::OkStatus(), cq_done_cb, nullptr,
      static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
  // Run any closures scheduled by grpc_cq_end_op before reacquiring the lock.
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(&ps->mu);
  return absl::OkStatus();
}
92
make_engine_vtable(const char * name)93 static grpc_event_engine_vtable make_engine_vtable(const char* name) {
94 grpc_event_engine_vtable vtable;
95 memset(&vtable, 0, sizeof(vtable));
96
97 vtable.pollset_size = sizeof(grpc_pollset);
98 vtable.pollset_init = pollset_init;
99 vtable.pollset_shutdown = pollset_shutdown;
100 vtable.pollset_destroy = pollset_destroy;
101 vtable.pollset_work = pollset_work;
102 vtable.pollset_kick = pollset_kick;
103 vtable.is_any_background_poller_thread = [] { return false; };
104 vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
105 grpc_error_handle /*error*/) {
106 return false;
107 };
108 vtable.shutdown_background_closure = [] {};
109 vtable.shutdown_engine = [] {};
110 vtable.check_engine_available = [](bool) { return true; };
111 vtable.init_engine = [] {};
112 vtable.name = name;
113
114 return vtable;
115 }
116
setup()117 static void setup() {
118 grpc_init();
119 CHECK(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
120 strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") == 0);
121
122 g_cq = grpc_completion_queue_create_for_next(nullptr);
123 }
124
teardown()125 static void teardown() {
126 grpc_completion_queue_shutdown(g_cq);
127
128 // Drain any events
129 gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
130 while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
131 GRPC_QUEUE_SHUTDOWN) {
132 // Do nothing
133 }
134
135 grpc_completion_queue_destroy(g_cq);
136 grpc_shutdown();
137 }
138
139 // A few notes about Multi-threaded benchmarks:
140
141 // Setup:
142 // The benchmark framework ensures that none of the threads proceed beyond the
143 // state.KeepRunning() call unless all the threads have called state.keepRunning
144 // at least once. So it is safe to do the initialization in one of the threads
145 // before state.KeepRunning() is called.
146
147 // Teardown:
148 // The benchmark framework also ensures that no thread is running the benchmark
149 // code (i.e the code between two successive calls of state.KeepRunning()) if
150 // state.KeepRunning() returns false. So it is safe to do the teardown in one
151 // of the threads after state.keepRunning() returns false.
152
153 // However, our use requires synchronization because we do additional work at
154 // each thread that requires specific ordering (TrackCounters must be
155 // constructed after grpc_init because it needs the number of cores, initialized
156 // by grpc, and its Finish call must take place before grpc_shutdown so that it
157 // can use grpc_stats).
158 //
// Measures completion-queue next() throughput across 1..16 threads. Thread 0
// performs setup/teardown; condvar barriers on g_mu/g_cv order that work
// relative to the other threads (see the notes above).
static void BM_Cq_Throughput(benchmark::State& state) {
  gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
  auto thd_idx = state.thread_index();

  // Entry barrier: thread 0 runs setup() and flips g_active; every other
  // thread waits on g_cv until that happens.
  gpr_mu_lock(&g_mu);
  g_threads_active++;
  if (thd_idx == 0) {
    setup();
    g_active = true;
    gpr_cv_broadcast(&g_cv);
  } else {
    while (!g_active) {
      gpr_cv_wait(&g_cv, &g_mu, deadline);
    }
  }
  gpr_mu_unlock(&g_mu);

  // Benchmark body: each next() call drives pollset_work, which enqueues the
  // very completion this call then dequeues.
  for (auto _ : state) {
    CHECK(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
          GRPC_OP_COMPLETE);
  }

  state.SetItemsProcessed(state.iterations());

  // Exit barrier: the last thread to finish wakes the rest; nobody proceeds
  // until g_threads_active hits 0, so teardown below cannot race a still
  // in-flight grpc_completion_queue_next.
  gpr_mu_lock(&g_mu);
  g_threads_active--;
  if (g_threads_active == 0) {
    gpr_cv_broadcast(&g_cv);
  } else {
    while (g_threads_active > 0) {
      gpr_cv_wait(&g_cv, &g_mu, deadline);
    }
  }
  gpr_mu_unlock(&g_mu);

  // All threads are past the barrier; only thread 0 tears down and resets
  // g_active for the next benchmark run.
  if (thd_idx == 0) {
    teardown();
    g_active = false;
  }
}
199
// Run the throughput benchmark with 1 to 16 threads, reporting wall-clock
// time (the work is cross-thread, so CPU time per thread is misleading).
BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();

namespace {
// Engine vtables registered in main(): "bm_cq_multiple_threads" is added as a
// custom engine, while "none" overrides the default non-polling engine.
const grpc_event_engine_vtable g_none_vtable =
    grpc::testing::make_engine_vtable("none");
const grpc_event_engine_vtable g_bm_vtable =
    grpc::testing::make_engine_vtable("bm_cq_multiple_threads");
}  // namespace
208
209 } // namespace testing
210 } // namespace grpc
211
212 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
213 // and others do not. This allows us to support both modes.
namespace benchmark {
// Indirection so the call resolves whether RunSpecifiedBenchmarks lives in
// the benchmark namespace or at global scope (varies across distros).
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
}  // namespace benchmark
217
// Entry point: registers the custom event engines before any gRPC
// initialization, sets up the test environment and benchmark framework, and
// runs all registered benchmarks.
int main(int argc, char** argv) {
  // This test should only ever be run with a non or any polling engine
  // Override the polling engine for the non-polling engine
  // and add a custom polling engine
  grpc_register_event_engine_factory(&grpc::testing::g_none_vtable, false);
  grpc_register_event_engine_factory(&grpc::testing::g_bm_vtable, true);
  grpc::testing::TestEnvironment env(&argc, argv);
  // Initialize the barrier primitives used by BM_Cq_Throughput before any
  // benchmark thread can start.
  gpr_mu_init(&g_mu);
  gpr_cv_init(&g_cv);
  ::benchmark::Initialize(&argc, argv);
  grpc::testing::InitTest(&argc, &argv, false);
  benchmark::RunTheBenchmarksNamespaced();
  return 0;
}
232