/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/* Test out pollset latencies */

#include <benchmark/benchmark.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"

#include "test/cpp/microbenchmarks/helpers.h"
#include "test/cpp/util/test_config.h"

#include <string.h>

#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#endif

auto& force_library_initialization = Library::get();

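// Closure callback: destroys the pollset once its shutdown has completed.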
static void shutdown_ps(void* ps, grpc_error* error) {
  grpc_pollset_destroy(static_cast<grpc_pollset*>(ps));
}

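// Measures how long it takes to initialize a pollset and immediately shut it
// down again (the allocation itself happens once, outside the timed loop).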
static void BM_CreateDestroyPollset(benchmark::State& state) {
  TrackCounters track_counters;
  size_t ps_sz = grpc_pollset_size();
  grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_malloc(ps_sz));
  gpr_mu* mu;
  grpc_core::ExecCtx exec_ctx;
  grpc_closure shutdown_ps_closure;
  GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
                    grpc_schedule_on_exec_ctx);
  while (state.KeepRunning()) {
    memset(ps, 0, ps_sz);
    grpc_pollset_init(ps, &mu);
    gpr_mu_lock(mu);
    grpc_pollset_shutdown(ps, &shutdown_ps_closure);
    gpr_mu_unlock(mu);
    grpc_core::ExecCtx::Get()->Flush();
  }
  grpc_core::ExecCtx::Get()->Flush();
  gpr_free(ps);
  track_counters.Finish(state);
}
BENCHMARK(BM_CreateDestroyPollset);

#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
static void BM_PollEmptyPollset_SpeedOfLight(benchmark::State& state) {
  // equivalent to BM_PollEmptyPollset, but just use the OS primitives to gauge
  // what the speed of light would be if we abstracted perfectly
  TrackCounters track_counters;
  int epfd = epoll_create1(0);
  GPR_ASSERT(epfd != -1);
  size_t nev = state.range(0);
  size_t nfd = state.range(1);
  epoll_event* ev = new epoll_event[nev];
  std::vector<int> fds;
  for (size_t i = 0; i < nfd; i++) {
    fds.push_back(eventfd(0, 0));
    epoll_event ev;
    ev.events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fds.back(), &ev);
  }
  while (state.KeepRunning()) {
    epoll_wait(epfd, ev, nev, 0);
  }
  for (auto fd : fds) {
    close(fd);
  }
  close(epfd);
  delete[] ev;
  track_counters.Finish(state);
}
BENCHMARK(BM_PollEmptyPollset_SpeedOfLight)
    ->Args({1, 0})
    ->Args({1, 1})
    ->Args({1, 10})
    ->Args({1, 100})
    ->Args({1, 1000})
    ->Args({1, 10000})
    ->Args({1, 100000})
    ->Args({10, 1})
    ->Args({100, 1})
    ->Args({1000, 1});
#endif

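// Measures the cost of a single zero-deadline grpc_pollset_work call on a
// pollset with no fds and no pending work.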
static void BM_PollEmptyPollset(benchmark::State& state) {
  TrackCounters track_counters;
  size_t ps_sz = grpc_pollset_size();
  grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz));
  gpr_mu* mu;
  grpc_pollset_init(ps, &mu);
  grpc_core::ExecCtx exec_ctx;
  gpr_mu_lock(mu);
  while (state.KeepRunning()) {
    GRPC_ERROR_UNREF(grpc_pollset_work(ps, nullptr, 0));
  }
  grpc_closure shutdown_ps_closure;
  GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
                    grpc_schedule_on_exec_ctx);
  grpc_pollset_shutdown(ps, &shutdown_ps_closure);
  gpr_mu_unlock(mu);
  grpc_core::ExecCtx::Get()->Flush();
  gpr_free(ps);
  track_counters.Finish(state);
}
BENCHMARK(BM_PollEmptyPollset);

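// Measures the cost of adding an fd (here a wakeup fd) to a pollset, including
// the ExecCtx flush that runs any callbacks scheduled by the add.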
static void BM_PollAddFd(benchmark::State& state) {
  TrackCounters track_counters;
  size_t ps_sz = grpc_pollset_size();
  grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz));
  gpr_mu* mu;
  grpc_pollset_init(ps, &mu);
  grpc_core::ExecCtx exec_ctx;
  grpc_wakeup_fd wakeup_fd;
  GPR_ASSERT(
      GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd)));
  grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx", false);
  while (state.KeepRunning()) {
    grpc_pollset_add_fd(ps, fd);
    grpc_core::ExecCtx::Get()->Flush();
  }
  grpc_fd_orphan(fd, nullptr, nullptr, "xxx");
  grpc_closure shutdown_ps_closure;
  GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
                    grpc_schedule_on_exec_ctx);
  gpr_mu_lock(mu);
  grpc_pollset_shutdown(ps, &shutdown_ps_closure);
  gpr_mu_unlock(mu);
  grpc_core::ExecCtx::Get()->Flush();
  gpr_free(ps);
  track_counters.Finish(state);
}
BENCHMARK(BM_PollAddFd);

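// Small helper that wraps an arbitrary callable in a heap-allocated
// grpc_closure so it can be scheduled via the given closure scheduler.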
class Closure : public grpc_closure {
 public:
  virtual ~Closure() {}
};

template <class F>
Closure* MakeClosure(F f, grpc_closure_scheduler* scheduler) {
  struct C : public Closure {
    C(F f, grpc_closure_scheduler* scheduler) : f_(f) {
      GRPC_CLOSURE_INIT(this, C::cbfn, this, scheduler);
    }
    static void cbfn(void* arg, grpc_error* error) {
      C* p = static_cast<C*>(arg);
      p->f_();
    }
    F f_;
  };
  return new C(f, scheduler);
}

#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL
static void BM_SingleThreadPollOneFd_SpeedOfLight(benchmark::State& state) {
  // equivalent to BM_SingleThreadPollOneFd, but just use the OS primitives to
  // gauge what the speed of light would be if we abstracted perfectly
  TrackCounters track_counters;
  int epfd = epoll_create1(0);
  GPR_ASSERT(epfd != -1);
  epoll_event ev[100];
  int fd = eventfd(0, EFD_NONBLOCK);
  ev[0].events = EPOLLIN;
  epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev[0]);
  while (state.KeepRunning()) {
    int err;
    do {
      err = eventfd_write(fd, 1);
    } while (err < 0 && errno == EINTR);
    GPR_ASSERT(err == 0);
    do {
      err = epoll_wait(epfd, ev, GPR_ARRAY_SIZE(ev), 0);
    } while (err < 0 && errno == EINTR);
    GPR_ASSERT(err == 1);
    eventfd_t value;
    do {
      err = eventfd_read(fd, &value);
    } while (err < 0 && errno == EINTR);
    GPR_ASSERT(err == 0);
  }
  close(fd);
  close(epfd);
  track_counters.Finish(state);
}
BENCHMARK(BM_SingleThreadPollOneFd_SpeedOfLight);
#endif

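// Measures a full wakeup round trip through the pollset abstraction on a
// single thread: wake the wakeup fd, let grpc_pollset_work deliver the read
// notification, then consume the wakeup and re-arm the read closure.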
static void BM_SingleThreadPollOneFd(benchmark::State& state) {
  TrackCounters track_counters;
  size_t ps_sz = grpc_pollset_size();
  grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz));
  gpr_mu* mu;
  grpc_pollset_init(ps, &mu);
  grpc_core::ExecCtx exec_ctx;
  grpc_wakeup_fd wakeup_fd;
  GRPC_ERROR_UNREF(grpc_wakeup_fd_init(&wakeup_fd));
  grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read", false);
  grpc_pollset_add_fd(ps, wakeup);
  bool done = false;
  Closure* continue_closure = MakeClosure(
      [&]() {
        GRPC_ERROR_UNREF(grpc_wakeup_fd_consume_wakeup(&wakeup_fd));
        if (!state.KeepRunning()) {
          done = true;
          return;
        }
        GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd));
        grpc_fd_notify_on_read(wakeup, continue_closure);
      },
      grpc_schedule_on_exec_ctx);
  GRPC_ERROR_UNREF(grpc_wakeup_fd_wakeup(&wakeup_fd));
  grpc_fd_notify_on_read(wakeup, continue_closure);
  gpr_mu_lock(mu);
  while (!done) {
    GRPC_ERROR_UNREF(grpc_pollset_work(ps, nullptr, GRPC_MILLIS_INF_FUTURE));
  }
  grpc_fd_orphan(wakeup, nullptr, nullptr, "done");
  wakeup_fd.read_fd = 0;
  grpc_closure shutdown_ps_closure;
  GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
                    grpc_schedule_on_exec_ctx);
  grpc_pollset_shutdown(ps, &shutdown_ps_closure);
  gpr_mu_unlock(mu);
  grpc_core::ExecCtx::Get()->Flush();
  grpc_wakeup_fd_destroy(&wakeup_fd);
  gpr_free(ps);
  track_counters.Finish(state);
  delete continue_closure;
}
BENCHMARK(BM_SingleThreadPollOneFd);

// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
}  // namespace benchmark

int main(int argc, char** argv) {
  ::benchmark::Initialize(&argc, argv);
  ::grpc::testing::InitTest(&argc, &argv, false);
  benchmark::RunTheBenchmarksNamespaced();
  return 0;
}