1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 // Benchmark gRPC end2end in various configurations
20
21 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
22 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
23
24 #include <benchmark/benchmark.h>
25
26 #include <sstream>
27
28 #include "absl/log/check.h"
29 #include "src/proto/grpc/testing/echo.grpc.pb.h"
30 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
31 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
32
33 namespace grpc {
34 namespace testing {
35
36 //******************************************************************************
37 // BENCHMARKING KERNELS
38 //
39
// Turns an integer id into the opaque void* form used to label asynchronous
// operations in this file. The integer is recovered by casting the pointer
// back to intptr_t.
static void* tag(intptr_t x) {
  void* const as_pointer = reinterpret_cast<void*>(x);
  return as_pointer;
}
41
42 template <class Fixture>
BM_PumpStreamClientToServer(benchmark::State & state)43 static void BM_PumpStreamClientToServer(benchmark::State& state) {
44 EchoTestService::AsyncService service;
45 std::unique_ptr<Fixture> fixture(new Fixture(&service));
46 {
47 EchoRequest send_request;
48 EchoRequest recv_request;
49 if (state.range(0) > 0) {
50 send_request.set_message(std::string(state.range(0), 'a'));
51 }
52 Status recv_status;
53 ServerContext svr_ctx;
54 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
55 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
56 fixture->cq(), tag(0));
57 std::unique_ptr<EchoTestService::Stub> stub(
58 EchoTestService::NewStub(fixture->channel()));
59 ClientContext cli_ctx;
60 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
61 int need_tags = (1 << 0) | (1 << 1);
62 void* t;
63 bool ok;
64 while (need_tags) {
65 CHECK(fixture->cq()->Next(&t, &ok));
66 CHECK(ok);
67 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
68 CHECK(need_tags & (1 << i));
69 need_tags &= ~(1 << i);
70 }
71 response_rw.Read(&recv_request, tag(0));
72 for (auto _ : state) {
73 request_rw->Write(send_request, tag(1));
74 while (true) {
75 CHECK(fixture->cq()->Next(&t, &ok));
76 if (t == tag(0)) {
77 response_rw.Read(&recv_request, tag(0));
78 } else if (t == tag(1)) {
79 break;
80 } else {
81 grpc_core::Crash("unreachable");
82 }
83 }
84 }
85 request_rw->WritesDone(tag(1));
86 need_tags = (1 << 0) | (1 << 1);
87 while (need_tags) {
88 CHECK(fixture->cq()->Next(&t, &ok));
89 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
90 CHECK(need_tags & (1 << i));
91 need_tags &= ~(1 << i);
92 }
93 response_rw.Finish(Status::OK, tag(0));
94 Status final_status;
95 request_rw->Finish(&final_status, tag(1));
96 need_tags = (1 << 0) | (1 << 1);
97 while (need_tags) {
98 CHECK(fixture->cq()->Next(&t, &ok));
99 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
100 CHECK(need_tags & (1 << i));
101 need_tags &= ~(1 << i);
102 }
103 CHECK(final_status.ok());
104 }
105 fixture.reset();
106 state.SetBytesProcessed(state.range(0) * state.iterations());
107 }
108
109 template <class Fixture>
BM_PumpStreamServerToClient(benchmark::State & state)110 static void BM_PumpStreamServerToClient(benchmark::State& state) {
111 EchoTestService::AsyncService service;
112 std::unique_ptr<Fixture> fixture(new Fixture(&service));
113 {
114 EchoResponse send_response;
115 EchoResponse recv_response;
116 if (state.range(0) > 0) {
117 send_response.set_message(std::string(state.range(0), 'a'));
118 }
119 Status recv_status;
120 ServerContext svr_ctx;
121 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
122 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
123 fixture->cq(), tag(0));
124 std::unique_ptr<EchoTestService::Stub> stub(
125 EchoTestService::NewStub(fixture->channel()));
126 ClientContext cli_ctx;
127 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
128 int need_tags = (1 << 0) | (1 << 1);
129 void* t;
130 bool ok;
131 while (need_tags) {
132 CHECK(fixture->cq()->Next(&t, &ok));
133 CHECK(ok);
134 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
135 CHECK(need_tags & (1 << i));
136 need_tags &= ~(1 << i);
137 }
138 request_rw->Read(&recv_response, tag(0));
139 for (auto _ : state) {
140 response_rw.Write(send_response, tag(1));
141 while (true) {
142 CHECK(fixture->cq()->Next(&t, &ok));
143 if (t == tag(0)) {
144 request_rw->Read(&recv_response, tag(0));
145 } else if (t == tag(1)) {
146 break;
147 } else {
148 grpc_core::Crash("unreachable");
149 }
150 }
151 }
152 response_rw.Finish(Status::OK, tag(1));
153 need_tags = (1 << 0) | (1 << 1);
154 while (need_tags) {
155 CHECK(fixture->cq()->Next(&t, &ok));
156 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
157 CHECK(need_tags & (1 << i));
158 need_tags &= ~(1 << i);
159 }
160 }
161 fixture.reset();
162 state.SetBytesProcessed(state.range(0) * state.iterations());
163 }
164 } // namespace testing
165 } // namespace grpc
166
167 #endif // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
168