• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 // Benchmark gRPC end2end in various configurations
20 
21 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
22 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
23 
24 #include <benchmark/benchmark.h>
25 
26 #include <sstream>
27 
28 #include "absl/log/check.h"
29 #include "src/proto/grpc/testing/echo.grpc.pb.h"
30 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
31 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
32 
33 namespace grpc {
34 namespace testing {
35 
36 //******************************************************************************
37 // BENCHMARKING KERNELS
38 //
39 
// Converts an integer identifier into the opaque void* tag handed to async
// gRPC operations. The completion queue later returns the same pointer, which
// the benchmarks cast back to the integer to learn which operation completed.
static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
41 
42 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
43 // messages in each call) in a loop on a single channel
44 //
45 //  First parameter (i.e state.range(0)):  Message size (in bytes) to use
46 //  Second parameter (i.e state.range(1)): Number of ping pong messages.
47 //      Note: One ping-pong means two messages (one from client to server and
48 //      the other from server to client):
49 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPong(benchmark::State & state)50 static void BM_StreamingPingPong(benchmark::State& state) {
51   const int msg_size = state.range(0);
52   const int max_ping_pongs = state.range(1);
53 
54   EchoTestService::AsyncService service;
55   std::unique_ptr<Fixture> fixture(new Fixture(&service));
56   {
57     EchoResponse send_response;
58     EchoResponse recv_response;
59     EchoRequest send_request;
60     EchoRequest recv_request;
61 
62     if (msg_size > 0) {
63       send_request.set_message(std::string(msg_size, 'a'));
64       send_response.set_message(std::string(msg_size, 'b'));
65     }
66 
67     std::unique_ptr<EchoTestService::Stub> stub(
68         EchoTestService::NewStub(fixture->channel()));
69 
70     for (auto _ : state) {
71       ServerContext svr_ctx;
72       ServerContextMutator svr_ctx_mut(&svr_ctx);
73       ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
74       service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
75                                 fixture->cq(), tag(0));
76 
77       ClientContext cli_ctx;
78       ClientContextMutator cli_ctx_mut(&cli_ctx);
79       auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
80 
81       // Establish async stream between client side and server side
82       void* t;
83       bool ok;
84       int need_tags = (1 << 0) | (1 << 1);
85       while (need_tags) {
86         CHECK(fixture->cq()->Next(&t, &ok));
87         CHECK(ok);
88         int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
89         CHECK(need_tags & (1 << i));
90         need_tags &= ~(1 << i);
91       }
92 
93       // Send 'max_ping_pongs' number of ping pong messages
94       int ping_pong_cnt = 0;
95       while (ping_pong_cnt < max_ping_pongs) {
96         request_rw->Write(send_request, tag(0));   // Start client send
97         response_rw.Read(&recv_request, tag(1));   // Start server recv
98         request_rw->Read(&recv_response, tag(2));  // Start client recv
99 
100         need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
101         while (need_tags) {
102           CHECK(fixture->cq()->Next(&t, &ok));
103           CHECK(ok);
104           int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
105 
106           // If server recv is complete, start the server send operation
107           if (i == 1) {
108             response_rw.Write(send_response, tag(3));
109           }
110 
111           CHECK(need_tags & (1 << i));
112           need_tags &= ~(1 << i);
113         }
114 
115         ping_pong_cnt++;
116       }
117 
118       request_rw->WritesDone(tag(0));
119       response_rw.Finish(Status::OK, tag(1));
120 
121       Status recv_status;
122       request_rw->Finish(&recv_status, tag(2));
123 
124       need_tags = (1 << 0) | (1 << 1) | (1 << 2);
125       while (need_tags) {
126         CHECK(fixture->cq()->Next(&t, &ok));
127         int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
128         CHECK(need_tags & (1 << i));
129         need_tags &= ~(1 << i);
130       }
131 
132       CHECK(recv_status.ok());
133     }
134   }
135 
136   fixture.reset();
137   state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
138 }
139 
140 // Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
141 //     First parameter (i.e state.range(0)):  Message size (in bytes) to use
142 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPongMsgs(benchmark::State & state)143 static void BM_StreamingPingPongMsgs(benchmark::State& state) {
144   const int msg_size = state.range(0);
145 
146   EchoTestService::AsyncService service;
147   std::unique_ptr<Fixture> fixture(new Fixture(&service));
148   {
149     EchoResponse send_response;
150     EchoResponse recv_response;
151     EchoRequest send_request;
152     EchoRequest recv_request;
153 
154     if (msg_size > 0) {
155       send_request.set_message(std::string(msg_size, 'a'));
156       send_response.set_message(std::string(msg_size, 'b'));
157     }
158 
159     std::unique_ptr<EchoTestService::Stub> stub(
160         EchoTestService::NewStub(fixture->channel()));
161 
162     ServerContext svr_ctx;
163     ServerContextMutator svr_ctx_mut(&svr_ctx);
164     ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
165     service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
166                               fixture->cq(), tag(0));
167 
168     ClientContext cli_ctx;
169     ClientContextMutator cli_ctx_mut(&cli_ctx);
170     auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
171 
172     // Establish async stream between client side and server side
173     void* t;
174     bool ok;
175     int need_tags = (1 << 0) | (1 << 1);
176     while (need_tags) {
177       CHECK(fixture->cq()->Next(&t, &ok));
178       CHECK(ok);
179       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
180       CHECK(need_tags & (1 << i));
181       need_tags &= ~(1 << i);
182     }
183 
184     for (auto _ : state) {
185       request_rw->Write(send_request, tag(0));   // Start client send
186       response_rw.Read(&recv_request, tag(1));   // Start server recv
187       request_rw->Read(&recv_response, tag(2));  // Start client recv
188 
189       need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
190       while (need_tags) {
191         CHECK(fixture->cq()->Next(&t, &ok));
192         CHECK(ok);
193         int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
194 
195         // If server recv is complete, start the server send operation
196         if (i == 1) {
197           response_rw.Write(send_response, tag(3));
198         }
199 
200         CHECK(need_tags & (1 << i));
201         need_tags &= ~(1 << i);
202       }
203     }
204 
205     request_rw->WritesDone(tag(0));
206     response_rw.Finish(Status::OK, tag(1));
207     Status recv_status;
208     request_rw->Finish(&recv_status, tag(2));
209 
210     need_tags = (1 << 0) | (1 << 1) | (1 << 2);
211     while (need_tags) {
212       CHECK(fixture->cq()->Next(&t, &ok));
213       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
214       CHECK(need_tags & (1 << i));
215       need_tags &= ~(1 << i);
216     }
217 
218     CHECK(recv_status.ok());
219   }
220 
221   fixture.reset();
222   state.SetBytesProcessed(msg_size * state.iterations() * 2);
223 }
224 
225 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
226 // messages in each call) in a loop on a single channel. Different from
227 // BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
228 // WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
229 // sendmsg syscalls for streaming by coalescing 1. initial metadata with first
230 // message; 2. final streaming message with trailing metadata.
231 //
232 //  First parameter (i.e state.range(0)):  Message size (in bytes) to use
233 //  Second parameter (i.e state.range(1)): Number of ping pong messages.
234 //      Note: One ping-pong means two messages (one from client to server and
235 //      the other from server to client):
236 //  Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
237 //  API and WriteLast API for server.
238 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPongWithCoalescingApi(benchmark::State & state)239 static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
240   const int msg_size = state.range(0);
241   const int max_ping_pongs = state.range(1);
242   // This options is used to test out server API: WriteLast and WriteAndFinish
243   // respectively, since we can not use both of them on server side at the same
244   // time. Value 1 means we are testing out the WriteAndFinish API, and
245   // otherwise we are testing out the WriteLast API.
246   const int write_and_finish = state.range(2);
247 
248   EchoTestService::AsyncService service;
249   std::unique_ptr<Fixture> fixture(new Fixture(&service));
250   {
251     EchoResponse send_response;
252     EchoResponse recv_response;
253     EchoRequest send_request;
254     EchoRequest recv_request;
255 
256     if (msg_size > 0) {
257       send_request.set_message(std::string(msg_size, 'a'));
258       send_response.set_message(std::string(msg_size, 'b'));
259     }
260 
261     std::unique_ptr<EchoTestService::Stub> stub(
262         EchoTestService::NewStub(fixture->channel()));
263 
264     for (auto _ : state) {
265       ServerContext svr_ctx;
266       ServerContextMutator svr_ctx_mut(&svr_ctx);
267       ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
268       service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
269                                 fixture->cq(), tag(0));
270 
271       ClientContext cli_ctx;
272       ClientContextMutator cli_ctx_mut(&cli_ctx);
273       cli_ctx.set_initial_metadata_corked(true);
274       // tag:1 here will never comes up, since we are not performing any op due
275       // to initial metadata coalescing.
276       auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
277 
278       void* t;
279       bool ok;
280       int expect_tags = 0;
281 
282       // Send 'max_ping_pongs' number of ping pong messages
283       int ping_pong_cnt = 0;
284       while (ping_pong_cnt < max_ping_pongs) {
285         if (ping_pong_cnt == max_ping_pongs - 1) {
286           request_rw->WriteLast(send_request, WriteOptions(), tag(2));
287         } else {
288           request_rw->Write(send_request, tag(2));  // Start client send
289         }
290 
291         int await_tags = (1 << 2);
292 
293         if (ping_pong_cnt == 0) {
294           // wait for the server call structure (call_hook, etc.) to be
295           // initialized (async stream between client side and server side
296           // established). It is necessary when client init metadata is
297           // coalesced
298           CHECK(fixture->cq()->Next(&t, &ok));
299           while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
300             // In some cases tag:2 comes before tag:0 (write tag comes out
301             // first), this while loop is to make sure get tag:0.
302             int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
303             CHECK(await_tags & (1 << i));
304             await_tags &= ~(1 << i);
305             CHECK(fixture->cq()->Next(&t, &ok));
306           }
307         }
308 
309         response_rw.Read(&recv_request, tag(3));   // Start server recv
310         request_rw->Read(&recv_response, tag(4));  // Start client recv
311 
312         await_tags |= (1 << 3) | (1 << 4);
313         expect_tags = await_tags;
314         await_tags |= (1 << 5);
315 
316         while (await_tags != 0) {
317           CHECK(fixture->cq()->Next(&t, &ok));
318           CHECK(ok);
319           int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
320 
321           // If server recv is complete, start the server send operation
322           if (i == 3) {
323             if (ping_pong_cnt == max_ping_pongs - 1) {
324               if (write_and_finish == 1) {
325                 response_rw.WriteAndFinish(send_response, WriteOptions(),
326                                            Status::OK, tag(5));
327                 expect_tags |= (1 << 5);
328               } else {
329                 response_rw.WriteLast(send_response, WriteOptions(), tag(5));
330                 // WriteLast buffers the write, so it's possible neither server
331                 // write op nor client read op will finish inside the while
332                 // loop.
333                 await_tags &= ~(1 << 4);
334                 await_tags &= ~(1 << 5);
335                 expect_tags |= (1 << 5);
336               }
337             } else {
338               response_rw.Write(send_response, tag(5));
339               expect_tags |= (1 << 5);
340             }
341           }
342 
343           CHECK(expect_tags & (1 << i));
344           expect_tags &= ~(1 << i);
345           await_tags &= ~(1 << i);
346         }
347 
348         ping_pong_cnt++;
349       }
350 
351       if (max_ping_pongs == 0) {
352         expect_tags |= (1 << 6) | (1 << 7) | (1 << 8);
353       } else {
354         if (write_and_finish == 1) {
355           expect_tags |= (1 << 8);
356         } else {
357           // server's buffered write and the client's read of the buffered write
358           // tags should come up.
359           expect_tags |= (1 << 7) | (1 << 8);
360         }
361       }
362 
363       // No message write or initial metadata write happened yet.
364       if (max_ping_pongs == 0) {
365         request_rw->WritesDone(tag(6));
366         // wait for server call data structure(call_hook, etc.) to be
367         // initialized, since initial metadata is corked.
368         CHECK(fixture->cq()->Next(&t, &ok));
369         while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
370           int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
371           CHECK(expect_tags & (1 << i));
372           expect_tags &= ~(1 << i);
373           CHECK(fixture->cq()->Next(&t, &ok));
374         }
375         response_rw.Finish(Status::OK, tag(7));
376       } else {
377         if (write_and_finish != 1) {
378           response_rw.Finish(Status::OK, tag(7));
379         }
380       }
381 
382       Status recv_status;
383       request_rw->Finish(&recv_status, tag(8));
384 
385       while (expect_tags) {
386         CHECK(fixture->cq()->Next(&t, &ok));
387         int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
388         CHECK(expect_tags & (1 << i));
389         expect_tags &= ~(1 << i);
390       }
391 
392       CHECK(recv_status.ok());
393     }
394   }
395 
396   fixture.reset();
397   state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
398 }
399 }  // namespace testing
400 }  // namespace grpc
401 
402 #endif  // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
403