1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <benchmark/benchmark.h>
20 #include <string.h>
21 #include <atomic>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include "test/cpp/microbenchmarks/helpers.h"
27 #include "test/cpp/util/test_config.h"
28
29 #include "src/core/lib/iomgr/ev_posix.h"
30 #include "src/core/lib/iomgr/port.h"
31 #include "src/core/lib/surface/completion_queue.h"
32
33 struct grpc_pollset {
34 gpr_mu mu;
35 };
36
37 static gpr_mu g_mu;
38 static gpr_cv g_cv;
39 static int g_threads_active;
40 static bool g_active;
41
42 namespace grpc {
43 namespace testing {
44 static grpc_completion_queue* g_cq;
45 static grpc_event_engine_vtable g_vtable;
46
pollset_shutdown(grpc_pollset * ps,grpc_closure * closure)47 static void pollset_shutdown(grpc_pollset* ps, grpc_closure* closure) {
48 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
49 }
50
pollset_init(grpc_pollset * ps,gpr_mu ** mu)51 static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
52 gpr_mu_init(&ps->mu);
53 *mu = &ps->mu;
54 }
55
pollset_destroy(grpc_pollset * ps)56 static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
57
pollset_kick(grpc_pollset * p,grpc_pollset_worker * worker)58 static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) {
59 return GRPC_ERROR_NONE;
60 }
61
62 /* Callback when the tag is dequeued from the completion queue. Does nothing */
cq_done_cb(void * done_arg,grpc_cq_completion * cq_completion)63 static void cq_done_cb(void* done_arg, grpc_cq_completion* cq_completion) {
64 gpr_free(cq_completion);
65 }
66
67 /* Queues a completion tag if deadline is > 0.
68 * Does nothing if deadline is 0 (i.e gpr_time_0(GPR_CLOCK_MONOTONIC)) */
pollset_work(grpc_pollset * ps,grpc_pollset_worker ** worker,grpc_millis deadline)69 static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker,
70 grpc_millis deadline) {
71 if (deadline == 0) {
72 gpr_log(GPR_DEBUG, "no-op");
73 return GRPC_ERROR_NONE;
74 }
75
76 gpr_mu_unlock(&ps->mu);
77
78 void* tag = (void*)static_cast<intptr_t>(10); // Some random number
79 GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
80 grpc_cq_end_op(
81 g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
82 static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
83 grpc_core::ExecCtx::Get()->Flush();
84 gpr_mu_lock(&ps->mu);
85 return GRPC_ERROR_NONE;
86 }
87
init_engine_vtable(bool)88 static const grpc_event_engine_vtable* init_engine_vtable(bool) {
89 memset(&g_vtable, 0, sizeof(g_vtable));
90
91 g_vtable.pollset_size = sizeof(grpc_pollset);
92 g_vtable.pollset_init = pollset_init;
93 g_vtable.pollset_shutdown = pollset_shutdown;
94 g_vtable.pollset_destroy = pollset_destroy;
95 g_vtable.pollset_work = pollset_work;
96 g_vtable.pollset_kick = pollset_kick;
97 g_vtable.shutdown_engine = [] {};
98
99 return &g_vtable;
100 }
101
setup()102 static void setup() {
103 // This test should only ever be run with a non or any polling engine
104 // Override the polling engine for the non-polling engine
105 // and add a custom polling engine
106 grpc_register_event_engine_factory("none", init_engine_vtable, false);
107 grpc_register_event_engine_factory("bm_cq_multiple_threads",
108 init_engine_vtable, true);
109
110 grpc_init();
111 GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
112 strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
113 0);
114
115 g_cq = grpc_completion_queue_create_for_next(nullptr);
116 }
117
teardown()118 static void teardown() {
119 grpc_completion_queue_shutdown(g_cq);
120
121 /* Drain any events */
122 gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
123 while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
124 GRPC_QUEUE_SHUTDOWN) {
125 /* Do nothing */
126 }
127
128 grpc_completion_queue_destroy(g_cq);
129 grpc_shutdown();
130 }
131
132 /* A few notes about Multi-threaded benchmarks:
133
134 Setup:
135 The benchmark framework ensures that none of the threads proceed beyond the
136 state.KeepRunning() call unless all the threads have called state.keepRunning
137 atleast once. So it is safe to do the initialization in one of the threads
138 before state.KeepRunning() is called.
139
140 Teardown:
141 The benchmark framework also ensures that no thread is running the benchmark
142 code (i.e the code between two successive calls of state.KeepRunning()) if
143 state.KeepRunning() returns false. So it is safe to do the teardown in one
144 of the threads after state.keepRunning() returns false.
145
146 However, our use requires synchronization because we do additional work at
147 each thread that requires specific ordering (TrackCounters must be constructed
148 after grpc_init because it needs the number of cores, initialized by grpc,
149 and its Finish call must take place before grpc_shutdown so that it can use
150 grpc_stats).
151 */
BM_Cq_Throughput(benchmark::State & state)152 static void BM_Cq_Throughput(benchmark::State& state) {
153 gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
154 auto thd_idx = state.thread_index;
155
156 gpr_mu_lock(&g_mu);
157 g_threads_active++;
158 if (thd_idx == 0) {
159 setup();
160 g_active = true;
161 gpr_cv_broadcast(&g_cv);
162 } else {
163 while (!g_active) {
164 gpr_cv_wait(&g_cv, &g_mu, deadline);
165 }
166 }
167 gpr_mu_unlock(&g_mu);
168
169 // Use a TrackCounters object to monitor the gRPC performance statistics
170 // (optionally including low-level counters) before and after the test
171 TrackCounters track_counters;
172
173 while (state.KeepRunning()) {
174 GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
175 GRPC_OP_COMPLETE);
176 }
177
178 state.SetItemsProcessed(state.iterations());
179 track_counters.Finish(state);
180
181 gpr_mu_lock(&g_mu);
182 g_threads_active--;
183 if (g_threads_active == 0) {
184 gpr_cv_broadcast(&g_cv);
185 } else {
186 while (g_threads_active > 0) {
187 gpr_cv_wait(&g_cv, &g_mu, deadline);
188 }
189 }
190 gpr_mu_unlock(&g_mu);
191
192 if (thd_idx == 0) {
193 teardown();
194 g_active = false;
195 }
196 }
197
198 BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
199
200 } // namespace testing
201 } // namespace grpc
202
203 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
204 // and others do not. This allows us to support both modes.
205 namespace benchmark {
RunTheBenchmarksNamespaced()206 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
207 } // namespace benchmark
208
main(int argc,char ** argv)209 int main(int argc, char** argv) {
210 gpr_mu_init(&g_mu);
211 gpr_cv_init(&g_cv);
212 ::benchmark::Initialize(&argc, argv);
213 ::grpc::testing::InitTest(&argc, &argv, false);
214 benchmark::RunTheBenchmarksNamespaced();
215 return 0;
216 }
217