/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <benchmark/benchmark.h>
#include <string.h>
#include <atomic>

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "test/cpp/microbenchmarks/helpers.h"
#include "test/cpp/util/test_config.h"

#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/surface/completion_queue.h"

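/* The benchmark stubs out polling entirely, so a pollset reduces to the
 * mutex that the completion queue locks around pollset_work */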
struct grpc_pollset {
  gpr_mu mu;
};

static gpr_mu g_mu;
static gpr_cv g_cv;
static int g_threads_active;
static bool g_active;

namespace grpc {
namespace testing {
static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable;

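/* Pollset shutdown: there is nothing to tear down, so the shutdown closure
 * can run immediately */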
static void pollset_shutdown(grpc_pollset* ps, grpc_closure* closure) {
  GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
}

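/* Pollset init: the pollset's only state is the mutex handed back to the
 * caller */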
static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
  gpr_mu_init(&ps->mu);
  *mu = &ps->mu;
}

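/* Pollset destroy: release the mutex created in pollset_init */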
static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }

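/* Pollset kick: a no-op, since pollset_work never blocks waiting for I/O */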
static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) {
  return GRPC_ERROR_NONE;
}

/* Callback invoked when the tag is dequeued from the completion queue;
 * frees the completion record */
static void cq_done_cb(void* done_arg, grpc_cq_completion* cq_completion) {
  gpr_free(cq_completion);
}

/* Queues a completion tag if deadline is > 0.
 * Does nothing if deadline is 0 (i.e. gpr_time_0(GPR_CLOCK_MONOTONIC)) */
static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker,
                                grpc_millis deadline) {
  if (deadline == 0) {
    gpr_log(GPR_DEBUG, "no-op");
    return GRPC_ERROR_NONE;
  }

  gpr_mu_unlock(&ps->mu);

  void* tag = (void*)static_cast<intptr_t>(10);  // Some random number
  GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
  grpc_cq_end_op(
      g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
      static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
  grpc_core::ExecCtx::Get()->Flush();
  gpr_mu_lock(&ps->mu);
  return GRPC_ERROR_NONE;
}

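/* Builds the stub event engine vtable: everything is zeroed except the
 * pollset hooks above and a no-op shutdown_engine, so the engine never
 * actually polls */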
static const grpc_event_engine_vtable* init_engine_vtable(bool) {
  memset(&g_vtable, 0, sizeof(g_vtable));

  g_vtable.pollset_size = sizeof(grpc_pollset);
  g_vtable.pollset_init = pollset_init;
  g_vtable.pollset_shutdown = pollset_shutdown;
  g_vtable.pollset_destroy = pollset_destroy;
  g_vtable.pollset_work = pollset_work;
  g_vtable.pollset_kick = pollset_kick;
  g_vtable.shutdown_engine = [] {};

  return &g_vtable;
}

static void setup() {
  // This benchmark should only ever be run with the "none" or "any" polling
  // strategy: override the non-polling "none" engine and also register the
  // stub vtable as a custom polling engine named "bm_cq_multiple_threads".
  grpc_register_event_engine_factory("none", init_engine_vtable, false);
  grpc_register_event_engine_factory("bm_cq_multiple_threads",
                                     init_engine_vtable, true);

  grpc_init();
  GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
             strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
                 0);

  g_cq = grpc_completion_queue_create_for_next(nullptr);
}

static void teardown() {
  grpc_completion_queue_shutdown(g_cq);

  /* Drain any remaining events; a zero deadline takes the no-op path in
   * pollset_work, so draining queues no new tags */
  gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
  while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
         GRPC_QUEUE_SHUTDOWN) {
    /* Do nothing */
  }

  grpc_completion_queue_destroy(g_cq);
  grpc_shutdown();
}

/* A few notes about multi-threaded benchmarks:

 Setup:
  The benchmark framework ensures that none of the threads proceed beyond the
  state.KeepRunning() call unless all of the threads have called
  state.KeepRunning() at least once. So it is safe to do the initialization in
  one of the threads before state.KeepRunning() is called.

 Teardown:
  The benchmark framework also ensures that no thread is running the benchmark
  code (i.e. the code between two successive calls of state.KeepRunning()) if
  state.KeepRunning() returns false. So it is safe to do the teardown in one
  of the threads after state.KeepRunning() returns false.

 However, our use requires synchronization because we do additional work in
 each thread that requires specific ordering (TrackCounters must be constructed
 after grpc_init because it needs the number of cores, initialized by grpc,
 and its Finish call must take place before grpc_shutdown so that it can use
 grpc_stats).
*/
static void BM_Cq_Throughput(benchmark::State& state) {
  gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
  auto thd_idx = state.thread_index;

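  // Thread 0 initializes gRPC and the completion queue; every other thread
  // blocks on g_cv until setup is complete.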
  gpr_mu_lock(&g_mu);
  g_threads_active++;
  if (thd_idx == 0) {
    setup();
    g_active = true;
    gpr_cv_broadcast(&g_cv);
  } else {
    while (!g_active) {
      gpr_cv_wait(&g_cv, &g_mu, deadline);
    }
  }
  gpr_mu_unlock(&g_mu);

  // Use a TrackCounters object to monitor the gRPC performance statistics
  // (optionally including low-level counters) before and after the test
  TrackCounters track_counters;

  while (state.KeepRunning()) {
    GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
               GRPC_OP_COMPLETE);
  }

  state.SetItemsProcessed(state.iterations());
  track_counters.Finish(state);

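  // Wait until every thread has finished its iterations (and its
  // TrackCounters::Finish call) before thread 0 shuts gRPC down below.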
  gpr_mu_lock(&g_mu);
  g_threads_active--;
  if (g_threads_active == 0) {
    gpr_cv_broadcast(&g_cv);
  } else {
    while (g_threads_active > 0) {
      gpr_cv_wait(&g_cv, &g_mu, deadline);
    }
  }
  gpr_mu_unlock(&g_mu);

  if (thd_idx == 0) {
    teardown();
    g_active = false;
  }
}

BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();

}  // namespace testing
}  // namespace grpc

// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
}  // namespace benchmark

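// Entry point: initialize the synchronization primitives and both the
// benchmark and gRPC test frameworks, then run all registered benchmarks.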
int main(int argc, char** argv) {
  gpr_mu_init(&g_mu);
  gpr_cv_init(&g_cv);
  ::benchmark::Initialize(&argc, argv);
  ::grpc::testing::InitTest(&argc, &argv, false);
  benchmark::RunTheBenchmarksNamespaced();
  return 0;
}