• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* Benchmark gRPC end2end in various configurations */
20 
21 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
22 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
23 
24 #include <benchmark/benchmark.h>
25 #include <sstream>
26 #include "src/core/lib/profiling/timers.h"
27 #include "src/proto/grpc/testing/echo.grpc.pb.h"
28 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
29 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
30 
31 namespace grpc {
32 namespace testing {
33 
34 /*******************************************************************************
35  * BENCHMARKING KERNELS
36  */
37 
tag(intptr_t x)38 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
39 
40 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
41 // messages in each call) in a loop on a single channel
42 //
43 //  First parmeter (i.e state.range(0)):  Message size (in bytes) to use
44 //  Second parameter (i.e state.range(1)): Number of ping pong messages.
45 //      Note: One ping-pong means two messages (one from client to server and
46 //      the other from server to client):
47 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPong(benchmark::State & state)48 static void BM_StreamingPingPong(benchmark::State& state) {
49   const int msg_size = state.range(0);
50   const int max_ping_pongs = state.range(1);
51 
52   EchoTestService::AsyncService service;
53   std::unique_ptr<Fixture> fixture(new Fixture(&service));
54   {
55     EchoResponse send_response;
56     EchoResponse recv_response;
57     EchoRequest send_request;
58     EchoRequest recv_request;
59 
60     if (msg_size > 0) {
61       send_request.set_message(std::string(msg_size, 'a'));
62       send_response.set_message(std::string(msg_size, 'b'));
63     }
64 
65     std::unique_ptr<EchoTestService::Stub> stub(
66         EchoTestService::NewStub(fixture->channel()));
67 
68     while (state.KeepRunning()) {
69       ServerContext svr_ctx;
70       ServerContextMutator svr_ctx_mut(&svr_ctx);
71       ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
72       service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
73                                 fixture->cq(), tag(0));
74 
75       ClientContext cli_ctx;
76       ClientContextMutator cli_ctx_mut(&cli_ctx);
77       auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
78 
79       // Establish async stream between client side and server side
80       void* t;
81       bool ok;
82       int need_tags = (1 << 0) | (1 << 1);
83       while (need_tags) {
84         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
85         GPR_ASSERT(ok);
86         int i = static_cast<int>((intptr_t)t);
87         GPR_ASSERT(need_tags & (1 << i));
88         need_tags &= ~(1 << i);
89       }
90 
91       // Send 'max_ping_pongs' number of ping pong messages
92       int ping_pong_cnt = 0;
93       while (ping_pong_cnt < max_ping_pongs) {
94         request_rw->Write(send_request, tag(0));   // Start client send
95         response_rw.Read(&recv_request, tag(1));   // Start server recv
96         request_rw->Read(&recv_response, tag(2));  // Start client recv
97 
98         need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
99         while (need_tags) {
100           GPR_ASSERT(fixture->cq()->Next(&t, &ok));
101           GPR_ASSERT(ok);
102           int i = static_cast<int>((intptr_t)t);
103 
104           // If server recv is complete, start the server send operation
105           if (i == 1) {
106             response_rw.Write(send_response, tag(3));
107           }
108 
109           GPR_ASSERT(need_tags & (1 << i));
110           need_tags &= ~(1 << i);
111         }
112 
113         ping_pong_cnt++;
114       }
115 
116       request_rw->WritesDone(tag(0));
117       response_rw.Finish(Status::OK, tag(1));
118 
119       Status recv_status;
120       request_rw->Finish(&recv_status, tag(2));
121 
122       need_tags = (1 << 0) | (1 << 1) | (1 << 2);
123       while (need_tags) {
124         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
125         int i = static_cast<int>((intptr_t)t);
126         GPR_ASSERT(need_tags & (1 << i));
127         need_tags &= ~(1 << i);
128       }
129 
130       GPR_ASSERT(recv_status.ok());
131     }
132   }
133 
134   fixture->Finish(state);
135   fixture.reset();
136   state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
137 }
138 
139 // Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
140 //     First parmeter (i.e state.range(0)):  Message size (in bytes) to use
141 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPongMsgs(benchmark::State & state)142 static void BM_StreamingPingPongMsgs(benchmark::State& state) {
143   const int msg_size = state.range(0);
144 
145   EchoTestService::AsyncService service;
146   std::unique_ptr<Fixture> fixture(new Fixture(&service));
147   {
148     EchoResponse send_response;
149     EchoResponse recv_response;
150     EchoRequest send_request;
151     EchoRequest recv_request;
152 
153     if (msg_size > 0) {
154       send_request.set_message(std::string(msg_size, 'a'));
155       send_response.set_message(std::string(msg_size, 'b'));
156     }
157 
158     std::unique_ptr<EchoTestService::Stub> stub(
159         EchoTestService::NewStub(fixture->channel()));
160 
161     ServerContext svr_ctx;
162     ServerContextMutator svr_ctx_mut(&svr_ctx);
163     ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
164     service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
165                               fixture->cq(), tag(0));
166 
167     ClientContext cli_ctx;
168     ClientContextMutator cli_ctx_mut(&cli_ctx);
169     auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
170 
171     // Establish async stream between client side and server side
172     void* t;
173     bool ok;
174     int need_tags = (1 << 0) | (1 << 1);
175     while (need_tags) {
176       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
177       GPR_ASSERT(ok);
178       int i = static_cast<int>((intptr_t)t);
179       GPR_ASSERT(need_tags & (1 << i));
180       need_tags &= ~(1 << i);
181     }
182 
183     while (state.KeepRunning()) {
184       GPR_TIMER_SCOPE("BenchmarkCycle", 0);
185       request_rw->Write(send_request, tag(0));   // Start client send
186       response_rw.Read(&recv_request, tag(1));   // Start server recv
187       request_rw->Read(&recv_response, tag(2));  // Start client recv
188 
189       need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
190       while (need_tags) {
191         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
192         GPR_ASSERT(ok);
193         int i = static_cast<int>((intptr_t)t);
194 
195         // If server recv is complete, start the server send operation
196         if (i == 1) {
197           response_rw.Write(send_response, tag(3));
198         }
199 
200         GPR_ASSERT(need_tags & (1 << i));
201         need_tags &= ~(1 << i);
202       }
203     }
204 
205     request_rw->WritesDone(tag(0));
206     response_rw.Finish(Status::OK, tag(1));
207     Status recv_status;
208     request_rw->Finish(&recv_status, tag(2));
209 
210     need_tags = (1 << 0) | (1 << 1) | (1 << 2);
211     while (need_tags) {
212       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
213       int i = static_cast<int>((intptr_t)t);
214       GPR_ASSERT(need_tags & (1 << i));
215       need_tags &= ~(1 << i);
216     }
217 
218     GPR_ASSERT(recv_status.ok());
219   }
220 
221   fixture->Finish(state);
222   fixture.reset();
223   state.SetBytesProcessed(msg_size * state.iterations() * 2);
224 }
225 
226 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
227 // messages in each call) in a loop on a single channel. Different from
228 // BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
229 // WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
230 // sendmsg syscalls for streaming by coalescing 1. initial metadata with first
231 // message; 2. final streaming message with trailing metadata.
232 //
233 //  First parmeter (i.e state.range(0)):  Message size (in bytes) to use
234 //  Second parameter (i.e state.range(1)): Number of ping pong messages.
235 //      Note: One ping-pong means two messages (one from client to server and
236 //      the other from server to client):
237 //  Third parameter (i.e state.range(2)): Switch between using WriteAndFinish
238 //  API and WriteLast API for server.
239 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_StreamingPingPongWithCoalescingApi(benchmark::State & state)240 static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
241   const int msg_size = state.range(0);
242   const int max_ping_pongs = state.range(1);
243   // This options is used to test out server API: WriteLast and WriteAndFinish
244   // respectively, since we can not use both of them on server side at the same
245   // time. Value 1 means we are testing out the WriteAndFinish API, and
246   // otherwise we are testing out the WriteLast API.
247   const int write_and_finish = state.range(2);
248 
249   EchoTestService::AsyncService service;
250   std::unique_ptr<Fixture> fixture(new Fixture(&service));
251   {
252     EchoResponse send_response;
253     EchoResponse recv_response;
254     EchoRequest send_request;
255     EchoRequest recv_request;
256 
257     if (msg_size > 0) {
258       send_request.set_message(std::string(msg_size, 'a'));
259       send_response.set_message(std::string(msg_size, 'b'));
260     }
261 
262     std::unique_ptr<EchoTestService::Stub> stub(
263         EchoTestService::NewStub(fixture->channel()));
264 
265     while (state.KeepRunning()) {
266       ServerContext svr_ctx;
267       ServerContextMutator svr_ctx_mut(&svr_ctx);
268       ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
269       service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
270                                 fixture->cq(), tag(0));
271 
272       ClientContext cli_ctx;
273       ClientContextMutator cli_ctx_mut(&cli_ctx);
274       cli_ctx.set_initial_metadata_corked(true);
275       // tag:1 here will never comes up, since we are not performing any op due
276       // to initial metadata coalescing.
277       auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
278 
279       void* t;
280       bool ok;
281       int expect_tags = 0;
282 
283       // Send 'max_ping_pongs' number of ping pong messages
284       int ping_pong_cnt = 0;
285       while (ping_pong_cnt < max_ping_pongs) {
286         if (ping_pong_cnt == max_ping_pongs - 1) {
287           request_rw->WriteLast(send_request, WriteOptions(), tag(2));
288         } else {
289           request_rw->Write(send_request, tag(2));  // Start client send
290         }
291 
292         int await_tags = (1 << 2);
293 
294         if (ping_pong_cnt == 0) {
295           // wait for the server call structure (call_hook, etc.) to be
296           // initialized (async stream between client side and server side
297           // established). It is necessary when client init metadata is
298           // coalesced
299           GPR_ASSERT(fixture->cq()->Next(&t, &ok));
300           while (static_cast<int>((intptr_t)t) != 0) {
301             // In some cases tag:2 comes before tag:0 (write tag comes out
302             // first), this while loop is to make sure get tag:0.
303             int i = static_cast<int>((intptr_t)t);
304             GPR_ASSERT(await_tags & (1 << i));
305             await_tags &= ~(1 << i);
306             GPR_ASSERT(fixture->cq()->Next(&t, &ok));
307           }
308         }
309 
310         response_rw.Read(&recv_request, tag(3));   // Start server recv
311         request_rw->Read(&recv_response, tag(4));  // Start client recv
312 
313         await_tags |= (1 << 3) | (1 << 4);
314         expect_tags = await_tags;
315         await_tags |= (1 << 5);
316 
317         while (await_tags != 0) {
318           GPR_ASSERT(fixture->cq()->Next(&t, &ok));
319           GPR_ASSERT(ok);
320           int i = static_cast<int>((intptr_t)t);
321 
322           // If server recv is complete, start the server send operation
323           if (i == 3) {
324             if (ping_pong_cnt == max_ping_pongs - 1) {
325               if (write_and_finish == 1) {
326                 response_rw.WriteAndFinish(send_response, WriteOptions(),
327                                            Status::OK, tag(5));
328                 expect_tags |= (1 << 5);
329               } else {
330                 response_rw.WriteLast(send_response, WriteOptions(), tag(5));
331                 // WriteLast buffers the write, so it's possible neither server
332                 // write op nor client read op will finish inside the while
333                 // loop.
334                 await_tags &= ~(1 << 4);
335                 await_tags &= ~(1 << 5);
336                 expect_tags |= (1 << 5);
337               }
338             } else {
339               response_rw.Write(send_response, tag(5));
340               expect_tags |= (1 << 5);
341             }
342           }
343 
344           GPR_ASSERT(expect_tags & (1 << i));
345           expect_tags &= ~(1 << i);
346           await_tags &= ~(1 << i);
347         }
348 
349         ping_pong_cnt++;
350       }
351 
352       if (max_ping_pongs == 0) {
353         expect_tags |= (1 << 6) | (1 << 7) | (1 << 8);
354       } else {
355         if (write_and_finish == 1) {
356           expect_tags |= (1 << 8);
357         } else {
358           // server's buffered write and the client's read of the buffered write
359           // tags should come up.
360           expect_tags |= (1 << 7) | (1 << 8);
361         }
362       }
363 
364       // No message write or initial metadata write happened yet.
365       if (max_ping_pongs == 0) {
366         request_rw->WritesDone(tag(6));
367         // wait for server call data structure(call_hook, etc.) to be
368         // initialized, since initial metadata is corked.
369         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
370         while (static_cast<int>((intptr_t)t) != 0) {
371           int i = static_cast<int>((intptr_t)t);
372           GPR_ASSERT(expect_tags & (1 << i));
373           expect_tags &= ~(1 << i);
374           GPR_ASSERT(fixture->cq()->Next(&t, &ok));
375         }
376         response_rw.Finish(Status::OK, tag(7));
377       } else {
378         if (write_and_finish != 1) {
379           response_rw.Finish(Status::OK, tag(7));
380         }
381       }
382 
383       Status recv_status;
384       request_rw->Finish(&recv_status, tag(8));
385 
386       while (expect_tags) {
387         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
388         int i = static_cast<int>((intptr_t)t);
389         GPR_ASSERT(expect_tags & (1 << i));
390         expect_tags &= ~(1 << i);
391       }
392 
393       GPR_ASSERT(recv_status.ok());
394     }
395   }
396 
397   fixture->Finish(state);
398   fixture.reset();
399   state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
400 }
401 }  // namespace testing
402 }  // namespace grpc
403 
404 #endif  // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
405