• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
20 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
21 
22 #include <grpc/support/atm.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/channel.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/security/credentials.h>
27 #include <grpcpp/security/server_credentials.h>
28 #include <grpcpp/server.h>
29 #include <grpcpp/server_builder.h>
30 
31 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/iomgr/endpoint.h"
34 #include "src/core/lib/iomgr/endpoint_pair.h"
35 #include "src/core/lib/iomgr/exec_ctx.h"
36 #include "src/core/lib/iomgr/tcp_posix.h"
37 #include "src/core/lib/surface/channel.h"
38 #include "src/core/lib/surface/completion_queue.h"
39 #include "src/core/lib/surface/server.h"
40 #include "test/core/util/passthru_endpoint.h"
41 #include "test/core/util/port.h"
42 
43 #include "src/cpp/client/create_channel_internal.h"
44 #include "test/cpp/microbenchmarks/helpers.h"
45 
46 namespace grpc {
47 namespace testing {
48 
49 class FixtureConfiguration {
50  public:
~FixtureConfiguration()51   virtual ~FixtureConfiguration() {}
ApplyCommonChannelArguments(ChannelArguments * c)52   virtual void ApplyCommonChannelArguments(ChannelArguments* c) const {
53     c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
54     c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
55   }
56 
ApplyCommonServerBuilderConfig(ServerBuilder * b)57   virtual void ApplyCommonServerBuilderConfig(ServerBuilder* b) const {
58     b->SetMaxReceiveMessageSize(INT_MAX);
59     b->SetMaxSendMessageSize(INT_MAX);
60   }
61 };
62 
63 class BaseFixture : public TrackCounters {};
64 
65 class FullstackFixture : public BaseFixture {
66  public:
FullstackFixture(Service * service,const FixtureConfiguration & config,const std::string & address)67   FullstackFixture(Service* service, const FixtureConfiguration& config,
68                    const std::string& address) {
69     ServerBuilder b;
70     if (address.length() > 0) {
71       b.AddListeningPort(address, InsecureServerCredentials());
72     }
73     cq_ = b.AddCompletionQueue(true);
74     b.RegisterService(service);
75     config.ApplyCommonServerBuilderConfig(&b);
76     server_ = b.BuildAndStart();
77     ChannelArguments args;
78     config.ApplyCommonChannelArguments(&args);
79     if (address.length() > 0) {
80       channel_ = ::grpc::CreateCustomChannel(
81           address, InsecureChannelCredentials(), args);
82     } else {
83       channel_ = server_->InProcessChannel(args);
84     }
85   }
86 
~FullstackFixture()87   virtual ~FullstackFixture() {
88     server_->Shutdown();
89     cq_->Shutdown();
90     void* tag;
91     bool ok;
92     while (cq_->Next(&tag, &ok)) {
93     }
94   }
95 
AddToLabel(std::ostream & out,benchmark::State & state)96   void AddToLabel(std::ostream& out, benchmark::State& state) {
97     BaseFixture::AddToLabel(out, state);
98     out << " polls/iter:"
99         << static_cast<double>(grpc_get_cq_poll_num(this->cq()->cq())) /
100                state.iterations();
101   }
102 
cq()103   ServerCompletionQueue* cq() { return cq_.get(); }
channel()104   std::shared_ptr<Channel> channel() { return channel_; }
105 
106  private:
107   std::unique_ptr<Server> server_;
108   std::unique_ptr<ServerCompletionQueue> cq_;
109   std::shared_ptr<Channel> channel_;
110 };
111 
112 class TCP : public FullstackFixture {
113  public:
114   TCP(Service* service, const FixtureConfiguration& fixture_configuration =
115                             FixtureConfiguration())
FullstackFixture(service,fixture_configuration,MakeAddress (& port_))116       : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {}
117 
~TCP()118   ~TCP() { grpc_recycle_unused_port(port_); }
119 
120  private:
121   int port_;
122 
MakeAddress(int * port)123   static std::string MakeAddress(int* port) {
124     *port = grpc_pick_unused_port_or_die();
125     std::stringstream addr;
126     addr << "localhost:" << *port;
127     return addr.str();
128   }
129 };
130 
131 class UDS : public FullstackFixture {
132  public:
133   UDS(Service* service, const FixtureConfiguration& fixture_configuration =
134                             FixtureConfiguration())
FullstackFixture(service,fixture_configuration,MakeAddress (& port_))135       : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {}
136 
~UDS()137   ~UDS() { grpc_recycle_unused_port(port_); }
138 
139  private:
140   int port_;
141 
MakeAddress(int * port)142   static std::string MakeAddress(int* port) {
143     *port = grpc_pick_unused_port_or_die();  // just for a unique id - not a
144                                              // real port
145     std::stringstream addr;
146     addr << "unix:/tmp/bm_fullstack." << *port;
147     return addr.str();
148   }
149 };
150 
151 class InProcess : public FullstackFixture {
152  public:
153   InProcess(Service* service,
154             const FixtureConfiguration& fixture_configuration =
155                 FixtureConfiguration())
156       : FullstackFixture(service, fixture_configuration, "") {}
~InProcess()157   ~InProcess() {}
158 };
159 
160 class EndpointPairFixture : public BaseFixture {
161  public:
EndpointPairFixture(Service * service,grpc_endpoint_pair endpoints,const FixtureConfiguration & fixture_configuration)162   EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints,
163                       const FixtureConfiguration& fixture_configuration)
164       : endpoint_pair_(endpoints) {
165     ServerBuilder b;
166     cq_ = b.AddCompletionQueue(true);
167     b.RegisterService(service);
168     fixture_configuration.ApplyCommonServerBuilderConfig(&b);
169     server_ = b.BuildAndStart();
170 
171     grpc_core::ExecCtx exec_ctx;
172 
173     /* add server endpoint to server_
174      * */
175     {
176       const grpc_channel_args* server_args =
177           grpc_server_get_channel_args(server_->c_server());
178       server_transport_ = grpc_create_chttp2_transport(
179           server_args, endpoints.server, false /* is_client */);
180 
181       for (grpc_pollset* pollset :
182            grpc_server_get_pollsets(server_->c_server())) {
183         grpc_endpoint_add_to_pollset(endpoints.server, pollset);
184       }
185 
186       grpc_server_setup_transport(server_->c_server(), server_transport_,
187                                   nullptr, server_args, nullptr);
188       grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr);
189     }
190 
191     /* create channel */
192     {
193       ChannelArguments args;
194       args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
195       fixture_configuration.ApplyCommonChannelArguments(&args);
196 
197       grpc_channel_args c_args = args.c_channel_args();
198       client_transport_ =
199           grpc_create_chttp2_transport(&c_args, endpoints.client, true);
200       GPR_ASSERT(client_transport_);
201       grpc_channel* channel = grpc_channel_create(
202           "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_);
203       grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr);
204 
205       channel_ = ::grpc::CreateChannelInternal(
206           "", channel,
207           std::vector<std::unique_ptr<
208               experimental::ClientInterceptorFactoryInterface>>());
209     }
210   }
211 
~EndpointPairFixture()212   virtual ~EndpointPairFixture() {
213     server_->Shutdown();
214     cq_->Shutdown();
215     void* tag;
216     bool ok;
217     while (cq_->Next(&tag, &ok)) {
218     }
219   }
220 
AddToLabel(std::ostream & out,benchmark::State & state)221   void AddToLabel(std::ostream& out, benchmark::State& state) {
222     BaseFixture::AddToLabel(out, state);
223     out << " polls/iter:"
224         << static_cast<double>(grpc_get_cq_poll_num(this->cq()->cq())) /
225                state.iterations();
226   }
227 
cq()228   ServerCompletionQueue* cq() { return cq_.get(); }
channel()229   std::shared_ptr<Channel> channel() { return channel_; }
230 
231  protected:
232   grpc_endpoint_pair endpoint_pair_;
233   grpc_transport* client_transport_;
234   grpc_transport* server_transport_;
235 
236  private:
237   std::unique_ptr<Server> server_;
238   std::unique_ptr<ServerCompletionQueue> cq_;
239   std::shared_ptr<Channel> channel_;
240 };
241 
242 class SockPair : public EndpointPairFixture {
243  public:
244   SockPair(Service* service, const FixtureConfiguration& fixture_configuration =
245                                  FixtureConfiguration())
246       : EndpointPairFixture(service,
247                             grpc_iomgr_create_endpoint_pair("test", nullptr),
248                             fixture_configuration) {}
249 };
250 
251 /* Use InProcessCHTTP2 instead. This class (with stats as an explicit parameter)
252    is here only to be able to initialize both the base class and stats_ with the
253    same stats instance without accessing the stats_ fields before the object is
254    properly initialized. */
255 class InProcessCHTTP2WithExplicitStats : public EndpointPairFixture {
256  public:
InProcessCHTTP2WithExplicitStats(Service * service,grpc_passthru_endpoint_stats * stats,const FixtureConfiguration & fixture_configuration)257   InProcessCHTTP2WithExplicitStats(
258       Service* service, grpc_passthru_endpoint_stats* stats,
259       const FixtureConfiguration& fixture_configuration)
260       : EndpointPairFixture(service, MakeEndpoints(stats),
261                             fixture_configuration),
262         stats_(stats) {}
263 
~InProcessCHTTP2WithExplicitStats()264   virtual ~InProcessCHTTP2WithExplicitStats() {
265     if (stats_ != nullptr) {
266       grpc_passthru_endpoint_stats_destroy(stats_);
267     }
268   }
269 
AddToLabel(std::ostream & out,benchmark::State & state)270   void AddToLabel(std::ostream& out, benchmark::State& state) {
271     EndpointPairFixture::AddToLabel(out, state);
272     out << " writes/iter:"
273         << static_cast<double>(gpr_atm_no_barrier_load(&stats_->num_writes)) /
274                static_cast<double>(state.iterations());
275   }
276 
277  private:
278   grpc_passthru_endpoint_stats* stats_;
279 
MakeEndpoints(grpc_passthru_endpoint_stats * stats)280   static grpc_endpoint_pair MakeEndpoints(grpc_passthru_endpoint_stats* stats) {
281     grpc_endpoint_pair p;
282     grpc_passthru_endpoint_create(&p.client, &p.server,
283                                   LibraryInitializer::get().rq(), stats);
284     return p;
285   }
286 };
287 
288 class InProcessCHTTP2 : public InProcessCHTTP2WithExplicitStats {
289  public:
290   InProcessCHTTP2(Service* service,
291                   const FixtureConfiguration& fixture_configuration =
292                       FixtureConfiguration())
InProcessCHTTP2WithExplicitStats(service,grpc_passthru_endpoint_stats_create (),fixture_configuration)293       : InProcessCHTTP2WithExplicitStats(service,
294                                          grpc_passthru_endpoint_stats_create(),
295                                          fixture_configuration) {}
296 };
297 
298 ////////////////////////////////////////////////////////////////////////////////
299 // Minimal stack fixtures
300 
301 class MinStackConfiguration : public FixtureConfiguration {
ApplyCommonChannelArguments(ChannelArguments * a)302   void ApplyCommonChannelArguments(ChannelArguments* a) const override {
303     a->SetInt(GRPC_ARG_MINIMAL_STACK, 1);
304     FixtureConfiguration::ApplyCommonChannelArguments(a);
305   }
306 
ApplyCommonServerBuilderConfig(ServerBuilder * b)307   void ApplyCommonServerBuilderConfig(ServerBuilder* b) const override {
308     b->AddChannelArgument(GRPC_ARG_MINIMAL_STACK, 1);
309     FixtureConfiguration::ApplyCommonServerBuilderConfig(b);
310   }
311 };
312 
313 template <class Base>
314 class MinStackize : public Base {
315  public:
MinStackize(Service * service)316   MinStackize(Service* service) : Base(service, MinStackConfiguration()) {}
317 };
318 
319 typedef MinStackize<TCP> MinTCP;
320 typedef MinStackize<UDS> MinUDS;
321 typedef MinStackize<InProcess> MinInProcess;
322 typedef MinStackize<SockPair> MinSockPair;
323 typedef MinStackize<InProcessCHTTP2> MinInProcessCHTTP2;
324 
325 }  // namespace testing
326 }  // namespace grpc
327 
328 #endif
329