• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
20 #define TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
21 
22 #include <benchmark/benchmark.h>
23 #include <sstream>
24 #include "src/core/lib/profiling/timers.h"
25 #include "src/proto/grpc/testing/echo.grpc.pb.h"
26 #include "test/cpp/microbenchmarks/callback_test_service.h"
27 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
28 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
29 
30 namespace grpc {
31 namespace testing {
32 
33 /*******************************************************************************
34  * BENCHMARKING KERNELS
35  */
36 
37 class BidiClient
38     : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
39  public:
BidiClient(benchmark::State * state,EchoTestService::Stub * stub,ClientContext * cli_ctx,EchoRequest * request,EchoResponse * response)40   BidiClient(benchmark::State* state, EchoTestService::Stub* stub,
41              ClientContext* cli_ctx, EchoRequest* request,
42              EchoResponse* response)
43       : state_{state},
44         stub_{stub},
45         cli_ctx_{cli_ctx},
46         request_{request},
47         response_{response} {
48     msgs_size_ = state->range(0);
49     msgs_to_send_ = state->range(1);
50     StartNewRpc();
51   }
52 
OnReadDone(bool ok)53   void OnReadDone(bool ok) override {
54     if (!ok) {
55       gpr_log(GPR_ERROR, "Client read failed");
56       return;
57     }
58     MaybeWrite();
59   }
60 
OnWriteDone(bool ok)61   void OnWriteDone(bool ok) override {
62     if (!ok) {
63       gpr_log(GPR_ERROR, "Client write failed");
64       return;
65     }
66     writes_complete_++;
67     StartRead(response_);
68   }
69 
OnDone(const Status & s)70   void OnDone(const Status& s) override {
71     GPR_ASSERT(s.ok());
72     GPR_ASSERT(writes_complete_ == msgs_to_send_);
73     if (state_->KeepRunning()) {
74       writes_complete_ = 0;
75       StartNewRpc();
76     } else {
77       std::unique_lock<std::mutex> l(mu);
78       done = true;
79       cv.notify_one();
80     }
81   }
82 
StartNewRpc()83   void StartNewRpc() {
84     cli_ctx_->~ClientContext();
85     new (cli_ctx_) ClientContext();
86     cli_ctx_->AddMetadata(kServerMessageSize, std::to_string(msgs_size_));
87     stub_->experimental_async()->BidiStream(cli_ctx_, this);
88     MaybeWrite();
89     StartCall();
90   }
91 
Await()92   void Await() {
93     std::unique_lock<std::mutex> l(mu);
94     while (!done) {
95       cv.wait(l);
96     }
97   }
98 
99  private:
MaybeWrite()100   void MaybeWrite() {
101     if (writes_complete_ < msgs_to_send_) {
102       StartWrite(request_);
103     } else {
104       StartWritesDone();
105     }
106   }
107 
108   benchmark::State* state_;
109   EchoTestService::Stub* stub_;
110   ClientContext* cli_ctx_;
111   EchoRequest* request_;
112   EchoResponse* response_;
113   int writes_complete_{0};
114   int msgs_to_send_;
115   int msgs_size_;
116   std::mutex mu;
117   std::condition_variable cv;
118   bool done = false;
119 };
120 
121 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
BM_CallbackBidiStreaming(benchmark::State & state)122 static void BM_CallbackBidiStreaming(benchmark::State& state) {
123   int message_size = state.range(0);
124   int max_ping_pongs = state.range(1);
125   CallbackStreamingTestService service;
126   std::unique_ptr<Fixture> fixture(new Fixture(&service));
127   std::unique_ptr<EchoTestService::Stub> stub_(
128       EchoTestService::NewStub(fixture->channel()));
129   EchoRequest request;
130   EchoResponse response;
131   ClientContext cli_ctx;
132   if (message_size > 0) {
133     request.set_message(std::string(message_size, 'a'));
134   } else {
135     request.set_message("");
136   }
137   if (state.KeepRunning()) {
138     GPR_TIMER_SCOPE("BenchmarkCycle", 0);
139     BidiClient test{&state, stub_.get(), &cli_ctx, &request, &response};
140     test.Await();
141   }
142   fixture->Finish(state);
143   fixture.reset();
144   state.SetBytesProcessed(2 * message_size * max_ping_pongs *
145                           state.iterations());
146 }
147 
148 }  // namespace testing
149 }  // namespace grpc
150 #endif  // TEST_CPP_MICROBENCHMARKS_CALLBACK_STREAMING_PING_PONG_H
151