1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
20 #define GRPC_TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
21
22 #include <benchmark/benchmark.h>
23
24 #include <sstream>
25
26 #include "absl/log/check.h"
27 #include "absl/log/log.h"
28 #include "src/proto/grpc/testing/echo.grpc.pb.h"
29 #include "test/cpp/microbenchmarks/callback_test_service.h"
30 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
31 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
32
33 namespace grpc {
34 namespace testing {
35
36 //******************************************************************************
37 // BENCHMARKING KERNELS
38 //
39
40 class BidiClient : public grpc::ClientBidiReactor<EchoRequest, EchoResponse> {
41 public:
BidiClient(benchmark::State * state,EchoTestService::Stub * stub,ClientContext * cli_ctx,EchoRequest * request,EchoResponse * response)42 BidiClient(benchmark::State* state, EchoTestService::Stub* stub,
43 ClientContext* cli_ctx, EchoRequest* request,
44 EchoResponse* response)
45 : state_{state},
46 stub_{stub},
47 cli_ctx_{cli_ctx},
48 request_{request},
49 response_{response} {
50 msgs_size_ = state->range(0);
51 msgs_to_send_ = state->range(1);
52 StartNewRpc();
53 }
54
OnReadDone(bool ok)55 void OnReadDone(bool ok) override {
56 if (!ok) {
57 LOG(ERROR) << "Client read failed";
58 return;
59 }
60 MaybeWrite();
61 }
62
OnWriteDone(bool ok)63 void OnWriteDone(bool ok) override {
64 if (!ok) {
65 LOG(ERROR) << "Client write failed";
66 return;
67 }
68 writes_complete_++;
69 StartRead(response_);
70 }
71
OnDone(const Status & s)72 void OnDone(const Status& s) override {
73 CHECK(s.ok());
74 CHECK_EQ(writes_complete_, msgs_to_send_);
75 if (state_->KeepRunning()) {
76 writes_complete_ = 0;
77 StartNewRpc();
78 } else {
79 std::unique_lock<std::mutex> l(mu);
80 done = true;
81 cv.notify_one();
82 }
83 }
84
StartNewRpc()85 void StartNewRpc() {
86 cli_ctx_->~ClientContext();
87 new (cli_ctx_) ClientContext();
88 cli_ctx_->AddMetadata(kServerMessageSize, std::to_string(msgs_size_));
89 stub_->async()->BidiStream(cli_ctx_, this);
90 MaybeWrite();
91 StartCall();
92 }
93
Await()94 void Await() {
95 std::unique_lock<std::mutex> l(mu);
96 while (!done) {
97 cv.wait(l);
98 }
99 }
100
101 private:
MaybeWrite()102 void MaybeWrite() {
103 if (writes_complete_ < msgs_to_send_) {
104 StartWrite(request_);
105 } else {
106 StartWritesDone();
107 }
108 }
109
110 benchmark::State* state_;
111 EchoTestService::Stub* stub_;
112 ClientContext* cli_ctx_;
113 EchoRequest* request_;
114 EchoResponse* response_;
115 int writes_complete_{0};
116 int msgs_to_send_;
117 int msgs_size_;
118 std::mutex mu;
119 std::condition_variable cv;
120 bool done = false;
121 };
122
123 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_CallbackBidiStreaming(benchmark::State & state)124 static void BM_CallbackBidiStreaming(benchmark::State& state) {
125 int message_size = state.range(0);
126 int max_ping_pongs = state.range(1);
127 CallbackStreamingTestService service;
128 std::unique_ptr<Fixture> fixture(new Fixture(&service));
129 std::unique_ptr<EchoTestService::Stub> stub_(
130 EchoTestService::NewStub(fixture->channel()));
131 EchoRequest request;
132 EchoResponse response;
133 ClientContext cli_ctx;
134 if (message_size > 0) {
135 request.set_message(std::string(message_size, 'a'));
136 } else {
137 request.set_message("");
138 }
139 if (state.KeepRunning()) {
140 BidiClient test{&state, stub_.get(), &cli_ctx, &request, &response};
141 test.Await();
142 }
143 fixture.reset();
144 state.SetBytesProcessed(2 * message_size * max_ping_pongs *
145 state.iterations());
146 }
147
148 } // namespace testing
149 } // namespace grpc
150 #endif // GRPC_TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
151