1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
20 #define TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
21
#include <cstdint>
#include <sstream>

#include <benchmark/benchmark.h>

#include "src/core/lib/profiling/timers.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/microbenchmarks/callback_test_service.h"
#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
29
30 namespace grpc {
31 namespace testing {
32
33 /*******************************************************************************
34 * BENCHMARKING KERNELS
35 */
36
37 class BidiClient
38 : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
39 public:
BidiClient(benchmark::State * state,EchoTestService::Stub * stub,ClientContext * cli_ctx,EchoRequest * request,EchoResponse * response)40 BidiClient(benchmark::State* state, EchoTestService::Stub* stub,
41 ClientContext* cli_ctx, EchoRequest* request,
42 EchoResponse* response)
43 : state_{state},
44 stub_{stub},
45 cli_ctx_{cli_ctx},
46 request_{request},
47 response_{response} {
48 msgs_size_ = state->range(0);
49 msgs_to_send_ = state->range(1);
50 StartNewRpc();
51 }
52
OnReadDone(bool ok)53 void OnReadDone(bool ok) override {
54 if (!ok) {
55 gpr_log(GPR_ERROR, "Client read failed");
56 return;
57 }
58 MaybeWrite();
59 }
60
OnWriteDone(bool ok)61 void OnWriteDone(bool ok) override {
62 if (!ok) {
63 gpr_log(GPR_ERROR, "Client write failed");
64 return;
65 }
66 writes_complete_++;
67 StartRead(response_);
68 }
69
OnDone(const Status & s)70 void OnDone(const Status& s) override {
71 GPR_ASSERT(s.ok());
72 GPR_ASSERT(writes_complete_ == msgs_to_send_);
73 if (state_->KeepRunning()) {
74 writes_complete_ = 0;
75 StartNewRpc();
76 } else {
77 std::unique_lock<std::mutex> l(mu);
78 done = true;
79 cv.notify_one();
80 }
81 }
82
StartNewRpc()83 void StartNewRpc() {
84 cli_ctx_->~ClientContext();
85 new (cli_ctx_) ClientContext();
86 cli_ctx_->AddMetadata(kServerMessageSize, std::to_string(msgs_size_));
87 stub_->experimental_async()->BidiStream(cli_ctx_, this);
88 MaybeWrite();
89 StartCall();
90 }
91
Await()92 void Await() {
93 std::unique_lock<std::mutex> l(mu);
94 while (!done) {
95 cv.wait(l);
96 }
97 }
98
99 private:
MaybeWrite()100 void MaybeWrite() {
101 if (writes_complete_ < msgs_to_send_) {
102 StartWrite(request_);
103 } else {
104 StartWritesDone();
105 }
106 }
107
108 benchmark::State* state_;
109 EchoTestService::Stub* stub_;
110 ClientContext* cli_ctx_;
111 EchoRequest* request_;
112 EchoResponse* response_;
113 int writes_complete_{0};
114 int msgs_to_send_;
115 int msgs_size_;
116 std::mutex mu;
117 std::condition_variable cv;
118 bool done = false;
119 };
120
121 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_CallbackBidiStreaming(benchmark::State & state)122 static void BM_CallbackBidiStreaming(benchmark::State& state) {
123 int message_size = state.range(0);
124 int max_ping_pongs = state.range(1);
125 CallbackStreamingTestService service;
126 std::unique_ptr<Fixture> fixture(new Fixture(&service));
127 std::unique_ptr<EchoTestService::Stub> stub_(
128 EchoTestService::NewStub(fixture->channel()));
129 EchoRequest request;
130 EchoResponse response;
131 ClientContext cli_ctx;
132 if (message_size > 0) {
133 request.set_message(std::string(message_size, 'a'));
134 } else {
135 request.set_message("");
136 }
137 if (state.KeepRunning()) {
138 GPR_TIMER_SCOPE("BenchmarkCycle", 0);
139 BidiClient test{&state, stub_.get(), &cli_ctx, &request, &response};
140 test.Await();
141 }
142 fixture->Finish(state);
143 fixture.reset();
144 state.SetBytesProcessed(2 * message_size * max_ping_pongs *
145 state.iterations());
146 }
147
148 } // namespace testing
149 } // namespace grpc
150 #endif // TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
151