• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
18 
19 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
20 #define GRPC_TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
21 
#include <benchmark/benchmark.h>

#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <new>
#include <sstream>
#include <string>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/microbenchmarks/callback_test_service.h"
#include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
#include "test/cpp/microbenchmarks/fullstack_fixtures.h"
32 
33 namespace grpc {
34 namespace testing {
35 
//******************************************************************************
// BENCHMARKING KERNELS
//
39 
40 class BidiClient : public grpc::ClientBidiReactor<EchoRequest, EchoResponse> {
41  public:
BidiClient(benchmark::State * state,EchoTestService::Stub * stub,ClientContext * cli_ctx,EchoRequest * request,EchoResponse * response)42   BidiClient(benchmark::State* state, EchoTestService::Stub* stub,
43              ClientContext* cli_ctx, EchoRequest* request,
44              EchoResponse* response)
45       : state_{state},
46         stub_{stub},
47         cli_ctx_{cli_ctx},
48         request_{request},
49         response_{response} {
50     msgs_size_ = state->range(0);
51     msgs_to_send_ = state->range(1);
52     StartNewRpc();
53   }
54 
OnReadDone(bool ok)55   void OnReadDone(bool ok) override {
56     if (!ok) {
57       LOG(ERROR) << "Client read failed";
58       return;
59     }
60     MaybeWrite();
61   }
62 
OnWriteDone(bool ok)63   void OnWriteDone(bool ok) override {
64     if (!ok) {
65       LOG(ERROR) << "Client write failed";
66       return;
67     }
68     writes_complete_++;
69     StartRead(response_);
70   }
71 
OnDone(const Status & s)72   void OnDone(const Status& s) override {
73     CHECK(s.ok());
74     CHECK_EQ(writes_complete_, msgs_to_send_);
75     if (state_->KeepRunning()) {
76       writes_complete_ = 0;
77       StartNewRpc();
78     } else {
79       std::unique_lock<std::mutex> l(mu);
80       done = true;
81       cv.notify_one();
82     }
83   }
84 
StartNewRpc()85   void StartNewRpc() {
86     cli_ctx_->~ClientContext();
87     new (cli_ctx_) ClientContext();
88     cli_ctx_->AddMetadata(kServerMessageSize, std::to_string(msgs_size_));
89     stub_->async()->BidiStream(cli_ctx_, this);
90     MaybeWrite();
91     StartCall();
92   }
93 
Await()94   void Await() {
95     std::unique_lock<std::mutex> l(mu);
96     while (!done) {
97       cv.wait(l);
98     }
99   }
100 
101  private:
MaybeWrite()102   void MaybeWrite() {
103     if (writes_complete_ < msgs_to_send_) {
104       StartWrite(request_);
105     } else {
106       StartWritesDone();
107     }
108   }
109 
110   benchmark::State* state_;
111   EchoTestService::Stub* stub_;
112   ClientContext* cli_ctx_;
113   EchoRequest* request_;
114   EchoResponse* response_;
115   int writes_complete_{0};
116   int msgs_to_send_;
117   int msgs_size_;
118   std::mutex mu;
119   std::condition_variable cv;
120   bool done = false;
121 };
122 
123 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_CallbackBidiStreaming(benchmark::State & state)124 static void BM_CallbackBidiStreaming(benchmark::State& state) {
125   int message_size = state.range(0);
126   int max_ping_pongs = state.range(1);
127   CallbackStreamingTestService service;
128   std::unique_ptr<Fixture> fixture(new Fixture(&service));
129   std::unique_ptr<EchoTestService::Stub> stub_(
130       EchoTestService::NewStub(fixture->channel()));
131   EchoRequest request;
132   EchoResponse response;
133   ClientContext cli_ctx;
134   if (message_size > 0) {
135     request.set_message(std::string(message_size, 'a'));
136   } else {
137     request.set_message("");
138   }
139   if (state.KeepRunning()) {
140     BidiClient test{&state, stub_.get(), &cli_ctx, &request, &response};
141     test.Await();
142   }
143   fixture.reset();
144   state.SetBytesProcessed(2 * message_size * max_ping_pongs *
145                           state.iterations());
146 }
147 
148 }  // namespace testing
149 }  // namespace grpc
150 #endif  // GRPC_TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
151