1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 // Benchmark gRPC end2end in various configurations
20
21 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
22 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
23
24 #include <benchmark/benchmark.h>
25
26 #include <sstream>
27
28 #include "absl/log/check.h"
29 #include "src/proto/grpc/testing/echo.grpc.pb.h"
30 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
31 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
32
33 namespace grpc {
34 namespace testing {
35
36 //******************************************************************************
37 // BENCHMARKING KERNELS
38 //
39
// Encodes an integer identifier as the opaque void* token that the gRPC
// completion-queue API uses for tags; recovered later with a
// reinterpret_cast back to intptr_t.
static void* tag(intptr_t i) {
  return reinterpret_cast<void*>(i);
}
41
42 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
43 // messages in each call) in a loop on a single channel
44 //
// First parameter (i.e. state.range(0)): Message size (in bytes) to use.
// Second parameter (i.e. state.range(1)): Number of ping-pong messages.
// Note: One ping-pong means two messages (one from client to server and
// the other from server to client).
template <class Fixture, class ClientContextMutator, class ServerContextMutator>
static void BM_StreamingPingPong(benchmark::State& state) {
  const int msg_size = state.range(0);        // payload bytes (0 = empty msg)
  const int max_ping_pongs = state.range(1);  // round trips per call

  EchoTestService::AsyncService service;
  std::unique_ptr<Fixture> fixture(new Fixture(&service));
  {
    EchoResponse send_response;
    EchoResponse recv_response;
    EchoRequest send_request;
    EchoRequest recv_request;

    if (msg_size > 0) {
      send_request.set_message(std::string(msg_size, 'a'));
      send_response.set_message(std::string(msg_size, 'b'));
    }

    std::unique_ptr<EchoTestService::Stub> stub(
        EchoTestService::NewStub(fixture->channel()));

    // Each benchmark iteration performs one complete bidi call:
    // stream setup, 'max_ping_pongs' round trips, then call shutdown.
    for (auto _ : state) {
      ServerContext svr_ctx;
      ServerContextMutator svr_ctx_mut(&svr_ctx);
      ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
      service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
                                fixture->cq(), tag(0));

      ClientContext cli_ctx;
      ClientContextMutator cli_ctx_mut(&cli_ctx);
      auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));

      // Establish async stream between client side and server side.
      // 'need_tags' is a bitmask of outstanding completion-queue tags
      // (bit i <=> tag(i)); tags may complete in any order.
      void* t;
      bool ok;
      int need_tags = (1 << 0) | (1 << 1);
      while (need_tags) {
        CHECK(fixture->cq()->Next(&t, &ok));
        CHECK(ok);
        int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
        CHECK(need_tags & (1 << i));
        need_tags &= ~(1 << i);
      }

      // Send 'max_ping_pongs' number of ping pong messages
      int ping_pong_cnt = 0;
      while (ping_pong_cnt < max_ping_pongs) {
        request_rw->Write(send_request, tag(0));   // Start client send
        response_rw.Read(&recv_request, tag(1));   // Start server recv
        request_rw->Read(&recv_response, tag(2));  // Start client recv

        // tag(3) (server send) is only started once the server recv (tag 1)
        // completes, inside the loop below.
        need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
        while (need_tags) {
          CHECK(fixture->cq()->Next(&t, &ok));
          CHECK(ok);
          int i = static_cast<int>(reinterpret_cast<intptr_t>(t));

          // If server recv is complete, start the server send operation
          if (i == 1) {
            response_rw.Write(send_response, tag(3));
          }

          CHECK(need_tags & (1 << i));
          need_tags &= ~(1 << i);
        }

        ping_pong_cnt++;
      }

      request_rw->WritesDone(tag(0));
      response_rw.Finish(Status::OK, tag(1));

      Status recv_status;
      request_rw->Finish(&recv_status, tag(2));

      // Drain the three shutdown tags. NOTE(review): unlike the loops above,
      // 'ok' is not CHECKed here — presumably some shutdown ops may complete
      // with ok == false during teardown; confirm before tightening.
      need_tags = (1 << 0) | (1 << 1) | (1 << 2);
      while (need_tags) {
        CHECK(fixture->cq()->Next(&t, &ok));
        int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
        CHECK(need_tags & (1 << i));
        need_tags &= ~(1 << i);
      }

      CHECK(recv_status.ok());
    }
  }

  fixture.reset();
  // msg_size bytes per message, 2 messages per ping-pong, per iteration.
  state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
}
139
140 // Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
141 // First parameter (i.e state.range(0)): Message size (in bytes) to use
template <class Fixture, class ClientContextMutator, class ServerContextMutator>
static void BM_StreamingPingPongMsgs(benchmark::State& state) {
  const int msg_size = state.range(0);  // payload bytes (0 = empty msg)

  EchoTestService::AsyncService service;
  std::unique_ptr<Fixture> fixture(new Fixture(&service));
  {
    EchoResponse send_response;
    EchoResponse recv_response;
    EchoRequest send_request;
    EchoRequest recv_request;

    if (msg_size > 0) {
      send_request.set_message(std::string(msg_size, 'a'));
      send_response.set_message(std::string(msg_size, 'b'));
    }

    std::unique_ptr<EchoTestService::Stub> stub(
        EchoTestService::NewStub(fixture->channel()));

    // Unlike BM_StreamingPingPong, a single bidi call is set up once here
    // and each benchmark iteration below exchanges one ping-pong on it.
    ServerContext svr_ctx;
    ServerContextMutator svr_ctx_mut(&svr_ctx);
    ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
    service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
                              fixture->cq(), tag(0));

    ClientContext cli_ctx;
    ClientContextMutator cli_ctx_mut(&cli_ctx);
    auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));

    // Establish async stream between client side and server side.
    // 'need_tags' is a bitmask of outstanding completion-queue tags
    // (bit i <=> tag(i)); tags may complete in any order.
    void* t;
    bool ok;
    int need_tags = (1 << 0) | (1 << 1);
    while (need_tags) {
      CHECK(fixture->cq()->Next(&t, &ok));
      CHECK(ok);
      int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
      CHECK(need_tags & (1 << i));
      need_tags &= ~(1 << i);
    }

    // One ping-pong (client send, server recv, server send, client recv)
    // per benchmark iteration.
    for (auto _ : state) {
      request_rw->Write(send_request, tag(0));   // Start client send
      response_rw.Read(&recv_request, tag(1));   // Start server recv
      request_rw->Read(&recv_response, tag(2));  // Start client recv

      // tag(3) (server send) is only started once the server recv (tag 1)
      // completes, inside the loop below.
      need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
      while (need_tags) {
        CHECK(fixture->cq()->Next(&t, &ok));
        CHECK(ok);
        int i = static_cast<int>(reinterpret_cast<intptr_t>(t));

        // If server recv is complete, start the server send operation
        if (i == 1) {
          response_rw.Write(send_response, tag(3));
        }

        CHECK(need_tags & (1 << i));
        need_tags &= ~(1 << i);
      }
    }

    request_rw->WritesDone(tag(0));
    response_rw.Finish(Status::OK, tag(1));
    Status recv_status;
    request_rw->Finish(&recv_status, tag(2));

    // Drain the three shutdown tags. NOTE(review): unlike the loops above,
    // 'ok' is not CHECKed here — presumably some shutdown ops may complete
    // with ok == false during teardown; confirm before tightening.
    need_tags = (1 << 0) | (1 << 1) | (1 << 2);
    while (need_tags) {
      CHECK(fixture->cq()->Next(&t, &ok));
      int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
      CHECK(need_tags & (1 << i));
      need_tags &= ~(1 << i);
    }

    CHECK(recv_status.ok());
  }

  fixture.reset();
  // msg_size bytes per message, 2 messages per ping-pong iteration.
  state.SetBytesProcessed(msg_size * state.iterations() * 2);
}
224
225 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
// messages in each call) in a loop on a single channel. Different from
// BM_StreamingPingPong, we are using the stream coalescing API, e.g.
// WriteLast, WriteAndFinish, set_initial_metadata_corked. These APIs aim to
// save sendmsg syscalls for streaming by coalescing: 1. initial metadata with
// the first message; 2. the final streaming message with trailing metadata.
231 //
232 // First parameter (i.e state.range(0)): Message size (in bytes) to use
233 // Second parameter (i.e state.range(1)): Number of ping pong messages.
234 // Note: One ping-pong means two messages (one from client to server and
235 // the other from server to client):
236 // Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
237 // API and WriteLast API for server.
template <class Fixture, class ClientContextMutator, class ServerContextMutator>
static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
  const int msg_size = state.range(0);        // payload bytes (0 = empty msg)
  const int max_ping_pongs = state.range(1);  // round trips per call
  // This option is used to test out server API: WriteLast and WriteAndFinish
  // respectively, since we can not use both of them on server side at the same
  // time. Value 1 means we are testing out the WriteAndFinish API, and
  // otherwise we are testing out the WriteLast API.
  const int write_and_finish = state.range(2);

  EchoTestService::AsyncService service;
  std::unique_ptr<Fixture> fixture(new Fixture(&service));
  {
    EchoResponse send_response;
    EchoResponse recv_response;
    EchoRequest send_request;
    EchoRequest recv_request;

    if (msg_size > 0) {
      send_request.set_message(std::string(msg_size, 'a'));
      send_response.set_message(std::string(msg_size, 'b'));
    }

    std::unique_ptr<EchoTestService::Stub> stub(
        EchoTestService::NewStub(fixture->channel()));

    // Each benchmark iteration performs one complete bidi call.
    for (auto _ : state) {
      ServerContext svr_ctx;
      ServerContextMutator svr_ctx_mut(&svr_ctx);
      ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
      service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
                                fixture->cq(), tag(0));

      ClientContext cli_ctx;
      ClientContextMutator cli_ctx_mut(&cli_ctx);
      cli_ctx.set_initial_metadata_corked(true);
      // tag:1 here will never comes up, since we are not performing any op due
      // to initial metadata coalescing.
      auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));

      // Two bitmasks track completion-queue tags (bit i <=> tag(i)):
      //  - await_tags (inside the loop below): tags that must complete
      //    before we proceed to the next ping-pong;
      //  - expect_tags: tags that are permitted to arrive; anything still
      //    set here is drained during call shutdown at the bottom.
      void* t;
      bool ok;
      int expect_tags = 0;

      // Send 'max_ping_pongs' number of ping pong messages
      int ping_pong_cnt = 0;
      while (ping_pong_cnt < max_ping_pongs) {
        if (ping_pong_cnt == max_ping_pongs - 1) {
          // Last client message: coalesce it with the half-close.
          request_rw->WriteLast(send_request, WriteOptions(), tag(2));
        } else {
          request_rw->Write(send_request, tag(2));  // Start client send
        }

        int await_tags = (1 << 2);

        if (ping_pong_cnt == 0) {
          // wait for the server call structure (call_hook, etc.) to be
          // initialized (async stream between client side and server side
          // established). It is necessary when client init metadata is
          // coalesced
          CHECK(fixture->cq()->Next(&t, &ok));
          while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
            // In some cases tag:2 comes before tag:0 (write tag comes out
            // first), this while loop is to make sure get tag:0.
            int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
            CHECK(await_tags & (1 << i));
            await_tags &= ~(1 << i);
            CHECK(fixture->cq()->Next(&t, &ok));
          }
        }

        response_rw.Read(&recv_request, tag(3));   // Start server recv
        request_rw->Read(&recv_response, tag(4));  // Start client recv

        // Wait for the client write and both reads; tag(5) (server send,
        // added to expect_tags when issued below) is also awaited so back
        // pressure doesn't build up across ping-pongs.
        await_tags |= (1 << 3) | (1 << 4);
        expect_tags = await_tags;
        await_tags |= (1 << 5);

        while (await_tags != 0) {
          CHECK(fixture->cq()->Next(&t, &ok));
          CHECK(ok);
          int i = static_cast<int>(reinterpret_cast<intptr_t>(t));

          // If server recv is complete, start the server send operation
          if (i == 3) {
            if (ping_pong_cnt == max_ping_pongs - 1) {
              if (write_and_finish == 1) {
                // Coalesce last server message with trailing metadata.
                response_rw.WriteAndFinish(send_response, WriteOptions(),
                                           Status::OK, tag(5));
                expect_tags |= (1 << 5);
              } else {
                response_rw.WriteLast(send_response, WriteOptions(), tag(5));
                // WriteLast buffers the write, so it's possible neither server
                // write op nor client read op will finish inside the while
                // loop.
                await_tags &= ~(1 << 4);
                await_tags &= ~(1 << 5);
                expect_tags |= (1 << 5);
              }
            } else {
              response_rw.Write(send_response, tag(5));
              expect_tags |= (1 << 5);
            }
          }

          CHECK(expect_tags & (1 << i));
          expect_tags &= ~(1 << i);
          await_tags &= ~(1 << i);
        }

        ping_pong_cnt++;
      }

      // Set up the tags expected during shutdown: tag(6) WritesDone,
      // tag(7) server Finish, tag(8) client Finish.
      if (max_ping_pongs == 0) {
        expect_tags |= (1 << 6) | (1 << 7) | (1 << 8);
      } else {
        if (write_and_finish == 1) {
          // WriteAndFinish already carried the server's trailing metadata,
          // so only the client Finish tag remains.
          expect_tags |= (1 << 8);
        } else {
          // server's buffered write and the client's read of the buffered write
          // tags should come up.
          expect_tags |= (1 << 7) | (1 << 8);
        }
      }

      // No message write or initial metadata write happened yet.
      if (max_ping_pongs == 0) {
        request_rw->WritesDone(tag(6));
        // wait for server call data structure(call_hook, etc.) to be
        // initialized, since initial metadata is corked.
        CHECK(fixture->cq()->Next(&t, &ok));
        while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
          int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
          CHECK(expect_tags & (1 << i));
          expect_tags &= ~(1 << i);
          CHECK(fixture->cq()->Next(&t, &ok));
        }
        response_rw.Finish(Status::OK, tag(7));
      } else {
        if (write_and_finish != 1) {
          response_rw.Finish(Status::OK, tag(7));
        }
      }

      Status recv_status;
      request_rw->Finish(&recv_status, tag(8));

      // Drain every remaining expected tag. NOTE(review): 'ok' is not
      // CHECKed here — presumably some shutdown ops may complete with
      // ok == false during teardown; confirm before tightening.
      while (expect_tags) {
        CHECK(fixture->cq()->Next(&t, &ok));
        int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
        CHECK(expect_tags & (1 << i));
        expect_tags &= ~(1 << i);
      }

      CHECK(recv_status.ok());
    }
  }

  fixture.reset();
  // msg_size bytes per message, 2 messages per ping-pong, per iteration.
  state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
}
399 } // namespace testing
400 } // namespace grpc
401
402 #endif // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
403