1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* Benchmark gRPC end2end in various configurations */
20
21 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
22 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
23
24 #include <benchmark/benchmark.h>
25 #include <sstream>
26 #include "src/core/lib/profiling/timers.h"
27 #include "src/proto/grpc/testing/echo.grpc.pb.h"
28 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
29 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
30
31 namespace grpc {
32 namespace testing {
33
34 /*******************************************************************************
35 * BENCHMARKING KERNELS
36 */
37
// Converts an integer id into the opaque pointer form that the benchmarks in
// this file hand to async operations and later read back from the completion
// queue.
static void* tag(intptr_t x) {
  void* const opaque = reinterpret_cast<void*>(x);
  return opaque;
}
39
40 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
41 // messages in each call) in a loop on a single channel
42 //
43 // First parmeter (i.e state.range(0)): Message size (in bytes) to use
44 // Second parameter (i.e state.range(1)): Number of ping pong messages.
45 // Note: One ping-pong means two messages (one from client to server and
46 // the other from server to client):
47 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPong(benchmark::State & state)48 static void BM_StreamingPingPong(benchmark::State& state) {
49 const int msg_size = state.range(0);
50 const int max_ping_pongs = state.range(1);
51
52 EchoTestService::AsyncService service;
53 std::unique_ptr<Fixture> fixture(new Fixture(&service));
54 {
55 EchoResponse send_response;
56 EchoResponse recv_response;
57 EchoRequest send_request;
58 EchoRequest recv_request;
59
60 if (msg_size > 0) {
61 send_request.set_message(std::string(msg_size, 'a'));
62 send_response.set_message(std::string(msg_size, 'b'));
63 }
64
65 std::unique_ptr<EchoTestService::Stub> stub(
66 EchoTestService::NewStub(fixture->channel()));
67
68 while (state.KeepRunning()) {
69 ServerContext svr_ctx;
70 ServerContextMutator svr_ctx_mut(&svr_ctx);
71 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
72 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
73 fixture->cq(), tag(0));
74
75 ClientContext cli_ctx;
76 ClientContextMutator cli_ctx_mut(&cli_ctx);
77 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
78
79 // Establish async stream between client side and server side
80 void* t;
81 bool ok;
82 int need_tags = (1 << 0) | (1 << 1);
83 while (need_tags) {
84 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
85 GPR_ASSERT(ok);
86 int i = static_cast<int>((intptr_t)t);
87 GPR_ASSERT(need_tags & (1 << i));
88 need_tags &= ~(1 << i);
89 }
90
91 // Send 'max_ping_pongs' number of ping pong messages
92 int ping_pong_cnt = 0;
93 while (ping_pong_cnt < max_ping_pongs) {
94 request_rw->Write(send_request, tag(0)); // Start client send
95 response_rw.Read(&recv_request, tag(1)); // Start server recv
96 request_rw->Read(&recv_response, tag(2)); // Start client recv
97
98 need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
99 while (need_tags) {
100 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
101 GPR_ASSERT(ok);
102 int i = static_cast<int>((intptr_t)t);
103
104 // If server recv is complete, start the server send operation
105 if (i == 1) {
106 response_rw.Write(send_response, tag(3));
107 }
108
109 GPR_ASSERT(need_tags & (1 << i));
110 need_tags &= ~(1 << i);
111 }
112
113 ping_pong_cnt++;
114 }
115
116 request_rw->WritesDone(tag(0));
117 response_rw.Finish(Status::OK, tag(1));
118
119 Status recv_status;
120 request_rw->Finish(&recv_status, tag(2));
121
122 need_tags = (1 << 0) | (1 << 1) | (1 << 2);
123 while (need_tags) {
124 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
125 int i = static_cast<int>((intptr_t)t);
126 GPR_ASSERT(need_tags & (1 << i));
127 need_tags &= ~(1 << i);
128 }
129
130 GPR_ASSERT(recv_status.ok());
131 }
132 }
133
134 fixture->Finish(state);
135 fixture.reset();
136 state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
137 }
138
139 // Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
140 // First parmeter (i.e state.range(0)): Message size (in bytes) to use
141 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPongMsgs(benchmark::State & state)142 static void BM_StreamingPingPongMsgs(benchmark::State& state) {
143 const int msg_size = state.range(0);
144
145 EchoTestService::AsyncService service;
146 std::unique_ptr<Fixture> fixture(new Fixture(&service));
147 {
148 EchoResponse send_response;
149 EchoResponse recv_response;
150 EchoRequest send_request;
151 EchoRequest recv_request;
152
153 if (msg_size > 0) {
154 send_request.set_message(std::string(msg_size, 'a'));
155 send_response.set_message(std::string(msg_size, 'b'));
156 }
157
158 std::unique_ptr<EchoTestService::Stub> stub(
159 EchoTestService::NewStub(fixture->channel()));
160
161 ServerContext svr_ctx;
162 ServerContextMutator svr_ctx_mut(&svr_ctx);
163 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
164 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
165 fixture->cq(), tag(0));
166
167 ClientContext cli_ctx;
168 ClientContextMutator cli_ctx_mut(&cli_ctx);
169 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
170
171 // Establish async stream between client side and server side
172 void* t;
173 bool ok;
174 int need_tags = (1 << 0) | (1 << 1);
175 while (need_tags) {
176 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
177 GPR_ASSERT(ok);
178 int i = static_cast<int>((intptr_t)t);
179 GPR_ASSERT(need_tags & (1 << i));
180 need_tags &= ~(1 << i);
181 }
182
183 while (state.KeepRunning()) {
184 GPR_TIMER_SCOPE("BenchmarkCycle", 0);
185 request_rw->Write(send_request, tag(0)); // Start client send
186 response_rw.Read(&recv_request, tag(1)); // Start server recv
187 request_rw->Read(&recv_response, tag(2)); // Start client recv
188
189 need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
190 while (need_tags) {
191 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
192 GPR_ASSERT(ok);
193 int i = static_cast<int>((intptr_t)t);
194
195 // If server recv is complete, start the server send operation
196 if (i == 1) {
197 response_rw.Write(send_response, tag(3));
198 }
199
200 GPR_ASSERT(need_tags & (1 << i));
201 need_tags &= ~(1 << i);
202 }
203 }
204
205 request_rw->WritesDone(tag(0));
206 response_rw.Finish(Status::OK, tag(1));
207 Status recv_status;
208 request_rw->Finish(&recv_status, tag(2));
209
210 need_tags = (1 << 0) | (1 << 1) | (1 << 2);
211 while (need_tags) {
212 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
213 int i = static_cast<int>((intptr_t)t);
214 GPR_ASSERT(need_tags & (1 << i));
215 need_tags &= ~(1 << i);
216 }
217
218 GPR_ASSERT(recv_status.ok());
219 }
220
221 fixture->Finish(state);
222 fixture.reset();
223 state.SetBytesProcessed(msg_size * state.iterations() * 2);
224 }
225
226 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
227 // messages in each call) in a loop on a single channel. Different from
228 // BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
229 // WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
230 // sendmsg syscalls for streaming by coalescing 1. initial metadata with first
231 // message; 2. final streaming message with trailing metadata.
232 //
233 // First parmeter (i.e state.range(0)): Message size (in bytes) to use
234 // Second parameter (i.e state.range(1)): Number of ping pong messages.
235 // Note: One ping-pong means two messages (one from client to server and
236 // the other from server to client):
237 // Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
238 // API and WriteLast API for server.
239 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPongWithCoalescingApi(benchmark::State & state)240 static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
241 const int msg_size = state.range(0);
242 const int max_ping_pongs = state.range(1);
243 // This options is used to test out server API: WriteLast and WriteAndFinish
244 // respectively, since we can not use both of them on server side at the same
245 // time. Value 1 means we are testing out the WriteAndFinish API, and
246 // otherwise we are testing out the WriteLast API.
247 const int write_and_finish = state.range(2);
248
249 EchoTestService::AsyncService service;
250 std::unique_ptr<Fixture> fixture(new Fixture(&service));
251 {
252 EchoResponse send_response;
253 EchoResponse recv_response;
254 EchoRequest send_request;
255 EchoRequest recv_request;
256
257 if (msg_size > 0) {
258 send_request.set_message(std::string(msg_size, 'a'));
259 send_response.set_message(std::string(msg_size, 'b'));
260 }
261
262 std::unique_ptr<EchoTestService::Stub> stub(
263 EchoTestService::NewStub(fixture->channel()));
264
265 while (state.KeepRunning()) {
266 ServerContext svr_ctx;
267 ServerContextMutator svr_ctx_mut(&svr_ctx);
268 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
269 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
270 fixture->cq(), tag(0));
271
272 ClientContext cli_ctx;
273 ClientContextMutator cli_ctx_mut(&cli_ctx);
274 cli_ctx.set_initial_metadata_corked(true);
275 // tag:1 here will never comes up, since we are not performing any op due
276 // to initial metadata coalescing.
277 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
278
279 void* t;
280 bool ok;
281 int expect_tags = 0;
282
283 // Send 'max_ping_pongs' number of ping pong messages
284 int ping_pong_cnt = 0;
285 while (ping_pong_cnt < max_ping_pongs) {
286 if (ping_pong_cnt == max_ping_pongs - 1) {
287 request_rw->WriteLast(send_request, WriteOptions(), tag(2));
288 } else {
289 request_rw->Write(send_request, tag(2)); // Start client send
290 }
291
292 int await_tags = (1 << 2);
293
294 if (ping_pong_cnt == 0) {
295 // wait for the server call structure (call_hook, etc.) to be
296 // initialized (async stream between client side and server side
297 // established). It is necessary when client init metadata is
298 // coalesced
299 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
300 while (static_cast<int>((intptr_t)t) != 0) {
301 // In some cases tag:2 comes before tag:0 (write tag comes out
302 // first), this while loop is to make sure get tag:0.
303 int i = static_cast<int>((intptr_t)t);
304 GPR_ASSERT(await_tags & (1 << i));
305 await_tags &= ~(1 << i);
306 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
307 }
308 }
309
310 response_rw.Read(&recv_request, tag(3)); // Start server recv
311 request_rw->Read(&recv_response, tag(4)); // Start client recv
312
313 await_tags |= (1 << 3) | (1 << 4);
314 expect_tags = await_tags;
315 await_tags |= (1 << 5);
316
317 while (await_tags != 0) {
318 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
319 GPR_ASSERT(ok);
320 int i = static_cast<int>((intptr_t)t);
321
322 // If server recv is complete, start the server send operation
323 if (i == 3) {
324 if (ping_pong_cnt == max_ping_pongs - 1) {
325 if (write_and_finish == 1) {
326 response_rw.WriteAndFinish(send_response, WriteOptions(),
327 Status::OK, tag(5));
328 expect_tags |= (1 << 5);
329 } else {
330 response_rw.WriteLast(send_response, WriteOptions(), tag(5));
331 // WriteLast buffers the write, so it's possible neither server
332 // write op nor client read op will finish inside the while
333 // loop.
334 await_tags &= ~(1 << 4);
335 await_tags &= ~(1 << 5);
336 expect_tags |= (1 << 5);
337 }
338 } else {
339 response_rw.Write(send_response, tag(5));
340 expect_tags |= (1 << 5);
341 }
342 }
343
344 GPR_ASSERT(expect_tags & (1 << i));
345 expect_tags &= ~(1 << i);
346 await_tags &= ~(1 << i);
347 }
348
349 ping_pong_cnt++;
350 }
351
352 if (max_ping_pongs == 0) {
353 expect_tags |= (1 << 6) | (1 << 7) | (1 << 8);
354 } else {
355 if (write_and_finish == 1) {
356 expect_tags |= (1 << 8);
357 } else {
358 // server's buffered write and the client's read of the buffered write
359 // tags should come up.
360 expect_tags |= (1 << 7) | (1 << 8);
361 }
362 }
363
364 // No message write or initial metadata write happened yet.
365 if (max_ping_pongs == 0) {
366 request_rw->WritesDone(tag(6));
367 // wait for server call data structure(call_hook, etc.) to be
368 // initialized, since initial metadata is corked.
369 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
370 while (static_cast<int>((intptr_t)t) != 0) {
371 int i = static_cast<int>((intptr_t)t);
372 GPR_ASSERT(expect_tags & (1 << i));
373 expect_tags &= ~(1 << i);
374 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
375 }
376 response_rw.Finish(Status::OK, tag(7));
377 } else {
378 if (write_and_finish != 1) {
379 response_rw.Finish(Status::OK, tag(7));
380 }
381 }
382
383 Status recv_status;
384 request_rw->Finish(&recv_status, tag(8));
385
386 while (expect_tags) {
387 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
388 int i = static_cast<int>((intptr_t)t);
389 GPR_ASSERT(expect_tags & (1 << i));
390 expect_tags &= ~(1 << i);
391 }
392
393 GPR_ASSERT(recv_status.ok());
394 }
395 }
396
397 fixture->Finish(state);
398 fixture.reset();
399 state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
400 }
401 } // namespace testing
402 } // namespace grpc
403
404 #endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
405