1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* Benchmark gRPC end2end in various configurations */
20
21 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
22 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
23
24 #include <benchmark/benchmark.h>
25 #include <sstream>
26 #include "src/core/lib/profiling/timers.h"
27 #include "src/proto/grpc/testing/echo.grpc.pb.h"
28 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
29 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
30
31 namespace grpc {
32 namespace testing {
33
34 /*******************************************************************************
35 * BENCHMARKING KERNELS
36 */
37
tag(intptr_t x)38 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
39
40 template <class Fixture>
BM_PumpStreamClientToServer(benchmark::State & state)41 static void BM_PumpStreamClientToServer(benchmark::State& state) {
42 EchoTestService::AsyncService service;
43 std::unique_ptr<Fixture> fixture(new Fixture(&service));
44 {
45 EchoRequest send_request;
46 EchoRequest recv_request;
47 if (state.range(0) > 0) {
48 send_request.set_message(std::string(state.range(0), 'a'));
49 }
50 Status recv_status;
51 ServerContext svr_ctx;
52 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
53 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
54 fixture->cq(), tag(0));
55 std::unique_ptr<EchoTestService::Stub> stub(
56 EchoTestService::NewStub(fixture->channel()));
57 ClientContext cli_ctx;
58 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
59 int need_tags = (1 << 0) | (1 << 1);
60 void* t;
61 bool ok;
62 while (need_tags) {
63 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
64 GPR_ASSERT(ok);
65 int i = static_cast<int>((intptr_t)t);
66 GPR_ASSERT(need_tags & (1 << i));
67 need_tags &= ~(1 << i);
68 }
69 response_rw.Read(&recv_request, tag(0));
70 while (state.KeepRunning()) {
71 GPR_TIMER_SCOPE("BenchmarkCycle", 0);
72 request_rw->Write(send_request, tag(1));
73 while (true) {
74 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
75 if (t == tag(0)) {
76 response_rw.Read(&recv_request, tag(0));
77 } else if (t == tag(1)) {
78 break;
79 } else {
80 GPR_ASSERT(false);
81 }
82 }
83 }
84 request_rw->WritesDone(tag(1));
85 need_tags = (1 << 0) | (1 << 1);
86 while (need_tags) {
87 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
88 int i = static_cast<int>((intptr_t)t);
89 GPR_ASSERT(need_tags & (1 << i));
90 need_tags &= ~(1 << i);
91 }
92 response_rw.Finish(Status::OK, tag(0));
93 Status final_status;
94 request_rw->Finish(&final_status, tag(1));
95 need_tags = (1 << 0) | (1 << 1);
96 while (need_tags) {
97 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
98 int i = static_cast<int>((intptr_t)t);
99 GPR_ASSERT(need_tags & (1 << i));
100 need_tags &= ~(1 << i);
101 }
102 GPR_ASSERT(final_status.ok());
103 }
104 fixture->Finish(state);
105 fixture.reset();
106 state.SetBytesProcessed(state.range(0) * state.iterations());
107 }
108
109 template <class Fixture>
BM_PumpStreamServerToClient(benchmark::State & state)110 static void BM_PumpStreamServerToClient(benchmark::State& state) {
111 EchoTestService::AsyncService service;
112 std::unique_ptr<Fixture> fixture(new Fixture(&service));
113 {
114 EchoResponse send_response;
115 EchoResponse recv_response;
116 if (state.range(0) > 0) {
117 send_response.set_message(std::string(state.range(0), 'a'));
118 }
119 Status recv_status;
120 ServerContext svr_ctx;
121 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
122 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
123 fixture->cq(), tag(0));
124 std::unique_ptr<EchoTestService::Stub> stub(
125 EchoTestService::NewStub(fixture->channel()));
126 ClientContext cli_ctx;
127 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
128 int need_tags = (1 << 0) | (1 << 1);
129 void* t;
130 bool ok;
131 while (need_tags) {
132 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
133 GPR_ASSERT(ok);
134 int i = static_cast<int>((intptr_t)t);
135 GPR_ASSERT(need_tags & (1 << i));
136 need_tags &= ~(1 << i);
137 }
138 request_rw->Read(&recv_response, tag(0));
139 while (state.KeepRunning()) {
140 GPR_TIMER_SCOPE("BenchmarkCycle", 0);
141 response_rw.Write(send_response, tag(1));
142 while (true) {
143 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
144 if (t == tag(0)) {
145 request_rw->Read(&recv_response, tag(0));
146 } else if (t == tag(1)) {
147 break;
148 } else {
149 GPR_ASSERT(false);
150 }
151 }
152 }
153 response_rw.Finish(Status::OK, tag(1));
154 need_tags = (1 << 0) | (1 << 1);
155 while (need_tags) {
156 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
157 int i = static_cast<int>((intptr_t)t);
158 GPR_ASSERT(need_tags & (1 << i));
159 need_tags &= ~(1 << i);
160 }
161 }
162 fixture->Finish(state);
163 fixture.reset();
164 state.SetBytesProcessed(state.range(0) * state.iterations());
165 }
166 } // namespace testing
167 } // namespace grpc
168
169 #endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
170