• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
20 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/atm.h>
24 #include <grpcpp/channel.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/security/credentials.h>
27 #include <grpcpp/security/server_credentials.h>
28 #include <grpcpp/server.h>
29 #include <grpcpp/server_builder.h>
30 
31 #include "absl/log/check.h"
32 #include "src/core/config/core_configuration.h"
33 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/endpoint_pair.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/iomgr/tcp_posix.h"
39 #include "src/core/lib/surface/channel.h"
40 #include "src/core/lib/surface/channel_create.h"
41 #include "src/core/lib/surface/completion_queue.h"
42 #include "src/core/server/server.h"
43 #include "src/core/util/crash.h"
44 #include "src/cpp/client/create_channel_internal.h"
45 #include "test/core/test_util/port.h"
46 #include "test/core/test_util/test_config.h"
47 #include "test/cpp/microbenchmarks/helpers.h"
48 
49 namespace grpc {
50 namespace testing {
51 
52 class FixtureConfiguration {
53  public:
~FixtureConfiguration()54   virtual ~FixtureConfiguration() {}
ApplyCommonChannelArguments(ChannelArguments * c)55   virtual void ApplyCommonChannelArguments(ChannelArguments* c) const {
56     c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
57     c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
58     c->SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
59     c->SetResourceQuota(ResourceQuota());
60   }
61 
ApplyCommonServerBuilderConfig(ServerBuilder * b)62   virtual void ApplyCommonServerBuilderConfig(ServerBuilder* b) const {
63     b->SetMaxReceiveMessageSize(INT_MAX);
64     b->SetMaxSendMessageSize(INT_MAX);
65   }
66 };
67 
68 class BaseFixture {
69  public:
70   virtual ~BaseFixture() = default;
71 };
72 
73 class FullstackFixture : public BaseFixture {
74  public:
FullstackFixture(Service * service,const FixtureConfiguration & config,const std::string & address)75   FullstackFixture(Service* service, const FixtureConfiguration& config,
76                    const std::string& address) {
77     ServerBuilder b;
78     if (!address.empty()) {
79       b.AddListeningPort(address, InsecureServerCredentials());
80     }
81     cq_ = b.AddCompletionQueue(true);
82     b.RegisterService(service);
83     config.ApplyCommonServerBuilderConfig(&b);
84     server_ = b.BuildAndStart();
85     ChannelArguments args;
86     config.ApplyCommonChannelArguments(&args);
87     if (!address.empty()) {
88       channel_ = grpc::CreateCustomChannel(address,
89                                            InsecureChannelCredentials(), args);
90     } else {
91       channel_ = server_->InProcessChannel(args);
92     }
93   }
94 
~FullstackFixture()95   ~FullstackFixture() override {
96     channel_.reset();
97     server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
98     cq_->Shutdown();
99     void* tag;
100     bool ok;
101     while (cq_->Next(&tag, &ok)) {
102     }
103   }
104 
cq()105   ServerCompletionQueue* cq() { return cq_.get(); }
channel()106   std::shared_ptr<Channel> channel() { return channel_; }
107 
108  private:
109   std::unique_ptr<Server> server_;
110   std::unique_ptr<ServerCompletionQueue> cq_;
111   std::shared_ptr<Channel> channel_;
112 };
113 
114 class TCP : public FullstackFixture {
115  public:
116   explicit TCP(Service* service,
117                const FixtureConfiguration& fixture_configuration =
118                    FixtureConfiguration())
FullstackFixture(service,fixture_configuration,MakeAddress (& port_))119       : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {}
120 
~TCP()121   ~TCP() override { grpc_recycle_unused_port(port_); }
122 
123  private:
124   int port_;
125 
MakeAddress(int * port)126   static std::string MakeAddress(int* port) {
127     *port = grpc_pick_unused_port_or_die();
128     std::stringstream addr;
129     addr << "localhost:" << *port;
130     return addr.str();
131   }
132 };
133 
134 class UDS : public FullstackFixture {
135  public:
136   explicit UDS(Service* service,
137                const FixtureConfiguration& fixture_configuration =
138                    FixtureConfiguration())
FullstackFixture(service,fixture_configuration,MakeAddress (& port_))139       : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {}
140 
~UDS()141   ~UDS() override { grpc_recycle_unused_port(port_); }
142 
143  private:
144   int port_;
145 
MakeAddress(int * port)146   static std::string MakeAddress(int* port) {
147     *port = grpc_pick_unused_port_or_die();  // just for a unique id - not a
148                                              // real port
149     std::stringstream addr;
150     addr << "unix:/tmp/bm_fullstack." << *port;
151     return addr.str();
152   }
153 };
154 
155 class InProcess : public FullstackFixture {
156  public:
157   explicit InProcess(Service* service,
158                      const FixtureConfiguration& fixture_configuration =
159                          FixtureConfiguration())
160       : FullstackFixture(service, fixture_configuration, "") {}
~InProcess()161   ~InProcess() override {}
162 };
163 
164 class EndpointPairFixture : public BaseFixture {
165  public:
EndpointPairFixture(Service * service,grpc_endpoint_pair endpoints,const FixtureConfiguration & fixture_configuration)166   EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints,
167                       const FixtureConfiguration& fixture_configuration)
168       : endpoint_pair_(endpoints) {
169     ServerBuilder b;
170     cq_ = b.AddCompletionQueue(true);
171     b.RegisterService(service);
172     fixture_configuration.ApplyCommonServerBuilderConfig(&b);
173     server_ = b.BuildAndStart();
174     grpc_core::ExecCtx exec_ctx;
175     // add server endpoint to server_
176     //
177     {
178       grpc_core::Server* core_server =
179           grpc_core::Server::FromC(server_->c_server());
180       grpc_core::ChannelArgs server_args = core_server->channel_args();
181       server_transport_ = grpc_create_chttp2_transport(
182           server_args,
183           grpc_core::OrphanablePtr<grpc_endpoint>(endpoints.server),
184           /*is_client=*/false);
185       for (grpc_pollset* pollset : core_server->pollsets()) {
186         grpc_endpoint_add_to_pollset(endpoints.server, pollset);
187       }
188 
189       CHECK(GRPC_LOG_IF_ERROR("SetupTransport", core_server->SetupTransport(
190                                                     server_transport_, nullptr,
191                                                     server_args, nullptr)));
192       grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr,
193                                           nullptr, nullptr);
194     }
195 
196     // create channel
197     {
198       grpc_core::ChannelArgs c_args;
199       {
200         ChannelArguments args;
201         args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
202         fixture_configuration.ApplyCommonChannelArguments(&args);
203         // precondition
204         grpc_channel_args tmp_args;
205         args.SetChannelArgs(&tmp_args);
206         c_args = grpc_core::CoreConfiguration::Get()
207                      .channel_args_preconditioning()
208                      .PreconditionChannelArgs(&tmp_args);
209       }
210       client_transport_ = grpc_create_chttp2_transport(
211           c_args, grpc_core::OrphanablePtr<grpc_endpoint>(endpoints.client),
212           /*is_client=*/true);
213       CHECK(client_transport_);
214       grpc_channel* channel =
215           grpc_core::ChannelCreate("target", c_args, GRPC_CLIENT_DIRECT_CHANNEL,
216                                    client_transport_)
217               ->release()
218               ->c_ptr();
219       grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr,
220                                           nullptr, nullptr);
221 
222       channel_ = grpc::CreateChannelInternal(
223           "", channel,
224           std::vector<std::unique_ptr<
225               experimental::ClientInterceptorFactoryInterface>>());
226     }
227   }
228 
~EndpointPairFixture()229   ~EndpointPairFixture() override {
230     server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0));
231     cq_->Shutdown();
232     void* tag;
233     bool ok;
234     while (cq_->Next(&tag, &ok)) {
235     }
236   }
237 
cq()238   ServerCompletionQueue* cq() { return cq_.get(); }
channel()239   std::shared_ptr<Channel> channel() { return channel_; }
240 
241  protected:
242   grpc_endpoint_pair endpoint_pair_;
243   grpc_core::Transport* client_transport_;
244   grpc_core::Transport* server_transport_;
245 
246  private:
247   std::unique_ptr<Server> server_;
248   std::unique_ptr<ServerCompletionQueue> cq_;
249   std::shared_ptr<Channel> channel_;
250 };
251 
252 class SockPair : public EndpointPairFixture {
253  public:
254   explicit SockPair(Service* service,
255                     const FixtureConfiguration& fixture_configuration =
256                         FixtureConfiguration())
257       : EndpointPairFixture(service,
258                             grpc_iomgr_create_endpoint_pair("test", nullptr),
259                             fixture_configuration) {}
260 };
261 
262 ////////////////////////////////////////////////////////////////////////////////
263 // Minimal stack fixtures
264 
265 class MinStackConfiguration : public FixtureConfiguration {
ApplyCommonChannelArguments(ChannelArguments * a)266   void ApplyCommonChannelArguments(ChannelArguments* a) const override {
267     a->SetInt(GRPC_ARG_MINIMAL_STACK, 1);
268     FixtureConfiguration::ApplyCommonChannelArguments(a);
269   }
270 
ApplyCommonServerBuilderConfig(ServerBuilder * b)271   void ApplyCommonServerBuilderConfig(ServerBuilder* b) const override {
272     b->AddChannelArgument(GRPC_ARG_MINIMAL_STACK, 1);
273     FixtureConfiguration::ApplyCommonServerBuilderConfig(b);
274   }
275 };
276 
277 template <class Base>
278 class MinStackize : public Base {
279  public:
MinStackize(Service * service)280   explicit MinStackize(Service* service)
281       : Base(service, MinStackConfiguration()) {}
282 };
283 
284 typedef MinStackize<TCP> MinTCP;
285 typedef MinStackize<UDS> MinUDS;
286 typedef MinStackize<InProcess> MinInProcess;
287 typedef MinStackize<SockPair> MinSockPair;
288 
289 }  // namespace testing
290 }  // namespace grpc
291 
292 #endif  // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
293