• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 // Benchmark gRPC end2end in various configurations
20 
21 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
22 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
23 
#include <benchmark/benchmark.h>

#include <cstdint>
#include <memory>
#include <sstream>
#include <string>

#include "absl/log/check.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
32 
33 namespace grpc {
34 namespace testing {
35 
36 //******************************************************************************
37 // BENCHMARKING KERNELS
38 //
39 
// Wraps an integer id in the opaque void* form used as a completion-queue tag
// by the benchmarks in this file. The value is only reinterpreted, never
// dereferenced here.
static void* tag(intptr_t x) {
  void* const as_pointer = reinterpret_cast<void*>(x);
  return as_pointer;
}
41 
42 template <class Fixture>
BM_PumpStreamClientToServer(benchmark::State & state)43 static void BM_PumpStreamClientToServer(benchmark::State& state) {
44   EchoTestService::AsyncService service;
45   std::unique_ptr<Fixture> fixture(new Fixture(&service));
46   {
47     EchoRequest send_request;
48     EchoRequest recv_request;
49     if (state.range(0) > 0) {
50       send_request.set_message(std::string(state.range(0), 'a'));
51     }
52     Status recv_status;
53     ServerContext svr_ctx;
54     ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
55     service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
56                               fixture->cq(), tag(0));
57     std::unique_ptr<EchoTestService::Stub> stub(
58         EchoTestService::NewStub(fixture->channel()));
59     ClientContext cli_ctx;
60     auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
61     int need_tags = (1 << 0) | (1 << 1);
62     void* t;
63     bool ok;
64     while (need_tags) {
65       CHECK(fixture->cq()->Next(&t, &ok));
66       CHECK(ok);
67       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
68       CHECK(need_tags & (1 << i));
69       need_tags &= ~(1 << i);
70     }
71     response_rw.Read(&recv_request, tag(0));
72     for (auto _ : state) {
73       request_rw->Write(send_request, tag(1));
74       while (true) {
75         CHECK(fixture->cq()->Next(&t, &ok));
76         if (t == tag(0)) {
77           response_rw.Read(&recv_request, tag(0));
78         } else if (t == tag(1)) {
79           break;
80         } else {
81           grpc_core::Crash("unreachable");
82         }
83       }
84     }
85     request_rw->WritesDone(tag(1));
86     need_tags = (1 << 0) | (1 << 1);
87     while (need_tags) {
88       CHECK(fixture->cq()->Next(&t, &ok));
89       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
90       CHECK(need_tags & (1 << i));
91       need_tags &= ~(1 << i);
92     }
93     response_rw.Finish(Status::OK, tag(0));
94     Status final_status;
95     request_rw->Finish(&final_status, tag(1));
96     need_tags = (1 << 0) | (1 << 1);
97     while (need_tags) {
98       CHECK(fixture->cq()->Next(&t, &ok));
99       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
100       CHECK(need_tags & (1 << i));
101       need_tags &= ~(1 << i);
102     }
103     CHECK(final_status.ok());
104   }
105   fixture.reset();
106   state.SetBytesProcessed(state.range(0) * state.iterations());
107 }
108 
109 template <class Fixture>
BM_PumpStreamServerToClient(benchmark::State & state)110 static void BM_PumpStreamServerToClient(benchmark::State& state) {
111   EchoTestService::AsyncService service;
112   std::unique_ptr<Fixture> fixture(new Fixture(&service));
113   {
114     EchoResponse send_response;
115     EchoResponse recv_response;
116     if (state.range(0) > 0) {
117       send_response.set_message(std::string(state.range(0), 'a'));
118     }
119     Status recv_status;
120     ServerContext svr_ctx;
121     ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
122     service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
123                               fixture->cq(), tag(0));
124     std::unique_ptr<EchoTestService::Stub> stub(
125         EchoTestService::NewStub(fixture->channel()));
126     ClientContext cli_ctx;
127     auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
128     int need_tags = (1 << 0) | (1 << 1);
129     void* t;
130     bool ok;
131     while (need_tags) {
132       CHECK(fixture->cq()->Next(&t, &ok));
133       CHECK(ok);
134       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
135       CHECK(need_tags & (1 << i));
136       need_tags &= ~(1 << i);
137     }
138     request_rw->Read(&recv_response, tag(0));
139     for (auto _ : state) {
140       response_rw.Write(send_response, tag(1));
141       while (true) {
142         CHECK(fixture->cq()->Next(&t, &ok));
143         if (t == tag(0)) {
144           request_rw->Read(&recv_response, tag(0));
145         } else if (t == tag(1)) {
146           break;
147         } else {
148           grpc_core::Crash("unreachable");
149         }
150       }
151     }
152     response_rw.Finish(Status::OK, tag(1));
153     need_tags = (1 << 0) | (1 << 1);
154     while (need_tags) {
155       CHECK(fixture->cq()->Next(&t, &ok));
156       int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
157       CHECK(need_tags & (1 << i));
158       need_tags &= ~(1 << i);
159     }
160   }
161   fixture.reset();
162   state.SetBytesProcessed(state.range(0) * state.iterations());
163 }
164 }  // namespace testing
165 }  // namespace grpc
166 
167 #endif  // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
168