1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 20 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 21 22 #include <grpc/support/atm.h> 23 #include <grpc/support/log.h> 24 #include <grpcpp/channel.h> 25 #include <grpcpp/create_channel.h> 26 #include <grpcpp/security/credentials.h> 27 #include <grpcpp/security/server_credentials.h> 28 #include <grpcpp/server.h> 29 #include <grpcpp/server_builder.h> 30 31 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" 32 #include "src/core/lib/channel/channel_args.h" 33 #include "src/core/lib/iomgr/endpoint.h" 34 #include "src/core/lib/iomgr/endpoint_pair.h" 35 #include "src/core/lib/iomgr/exec_ctx.h" 36 #include "src/core/lib/iomgr/tcp_posix.h" 37 #include "src/core/lib/surface/channel.h" 38 #include "src/core/lib/surface/completion_queue.h" 39 #include "src/core/lib/surface/server.h" 40 #include "test/core/util/passthru_endpoint.h" 41 #include "test/core/util/port.h" 42 43 #include "src/cpp/client/create_channel_internal.h" 44 #include "test/cpp/microbenchmarks/helpers.h" 45 46 namespace grpc { 47 namespace testing { 48 49 class FixtureConfiguration { 50 public: ~FixtureConfiguration()51 virtual ~FixtureConfiguration() {} ApplyCommonChannelArguments(ChannelArguments * c)52 virtual void ApplyCommonChannelArguments(ChannelArguments* c) const { 53 c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); 54 c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); 55 } 56 ApplyCommonServerBuilderConfig(ServerBuilder * b)57 virtual void ApplyCommonServerBuilderConfig(ServerBuilder* b) const { 58 b->SetMaxReceiveMessageSize(INT_MAX); 59 b->SetMaxSendMessageSize(INT_MAX); 60 } 61 }; 62 63 class BaseFixture : public TrackCounters {}; 64 65 class FullstackFixture : public BaseFixture { 66 public: FullstackFixture(Service * service,const FixtureConfiguration & config,const std::string & address)67 FullstackFixture(Service* service, const FixtureConfiguration& config, 68 const std::string& address) { 69 ServerBuilder b; 70 if (address.length() > 0) { 71 b.AddListeningPort(address, InsecureServerCredentials()); 72 } 73 cq_ = b.AddCompletionQueue(true); 74 b.RegisterService(service); 75 config.ApplyCommonServerBuilderConfig(&b); 76 server_ = b.BuildAndStart(); 77 ChannelArguments args; 78 config.ApplyCommonChannelArguments(&args); 79 if (address.length() > 0) { 80 channel_ = ::grpc::CreateCustomChannel( 81 address, InsecureChannelCredentials(), args); 82 } else { 83 channel_ = server_->InProcessChannel(args); 84 } 85 } 86 ~FullstackFixture()87 virtual ~FullstackFixture() { 88 server_->Shutdown(); 89 cq_->Shutdown(); 90 void* tag; 91 bool ok; 92 while (cq_->Next(&tag, &ok)) { 93 } 94 } 95 AddToLabel(std::ostream & out,benchmark::State & state)96 void AddToLabel(std::ostream& out, benchmark::State& state) { 97 BaseFixture::AddToLabel(out, state); 98 out << " polls/iter:" 99 << static_cast<double>(grpc_get_cq_poll_num(this->cq()->cq())) / 100 state.iterations(); 101 } 102 cq()103 ServerCompletionQueue* cq() { return cq_.get(); } channel()104 std::shared_ptr<Channel> channel() { return channel_; } 105 106 private: 107 std::unique_ptr<Server> server_; 108 std::unique_ptr<ServerCompletionQueue> cq_; 109 std::shared_ptr<Channel> channel_; 110 }; 111 112 class TCP : public FullstackFixture { 113 public: 114 TCP(Service* service, const FixtureConfiguration& fixture_configuration = 115 FixtureConfiguration()) FullstackFixture(service,fixture_configuration,MakeAddress (& port_))116 : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {} 117 ~TCP()118 ~TCP() { grpc_recycle_unused_port(port_); } 119 120 private: 121 int port_; 122 MakeAddress(int * port)123 static std::string MakeAddress(int* port) { 124 *port = grpc_pick_unused_port_or_die(); 125 std::stringstream addr; 126 addr << "localhost:" << *port; 127 return addr.str(); 128 } 129 }; 130 131 class UDS : public FullstackFixture { 132 public: 133 UDS(Service* service, const FixtureConfiguration& fixture_configuration = 134 FixtureConfiguration()) FullstackFixture(service,fixture_configuration,MakeAddress (& port_))135 : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {} 136 ~UDS()137 ~UDS() { grpc_recycle_unused_port(port_); } 138 139 private: 140 int port_; 141 MakeAddress(int * port)142 static std::string MakeAddress(int* port) { 143 *port = grpc_pick_unused_port_or_die(); // just for a unique id - not a 144 // real port 145 std::stringstream addr; 146 addr << "unix:/tmp/bm_fullstack." << *port; 147 return addr.str(); 148 } 149 }; 150 151 class InProcess : public FullstackFixture { 152 public: 153 InProcess(Service* service, 154 const FixtureConfiguration& fixture_configuration = 155 FixtureConfiguration()) 156 : FullstackFixture(service, fixture_configuration, "") {} ~InProcess()157 ~InProcess() {} 158 }; 159 160 class EndpointPairFixture : public BaseFixture { 161 public: EndpointPairFixture(Service * service,grpc_endpoint_pair endpoints,const FixtureConfiguration & fixture_configuration)162 EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints, 163 const FixtureConfiguration& fixture_configuration) 164 : endpoint_pair_(endpoints) { 165 ServerBuilder b; 166 cq_ = b.AddCompletionQueue(true); 167 b.RegisterService(service); 168 fixture_configuration.ApplyCommonServerBuilderConfig(&b); 169 server_ = b.BuildAndStart(); 170 171 grpc_core::ExecCtx exec_ctx; 172 173 /* add server endpoint to server_ 174 * */ 175 { 176 const grpc_channel_args* server_args = 177 grpc_server_get_channel_args(server_->c_server()); 178 server_transport_ = grpc_create_chttp2_transport( 179 server_args, endpoints.server, false /* is_client */); 180 181 for (grpc_pollset* pollset : 182 grpc_server_get_pollsets(server_->c_server())) { 183 grpc_endpoint_add_to_pollset(endpoints.server, pollset); 184 } 185 186 grpc_server_setup_transport(server_->c_server(), server_transport_, 187 nullptr, server_args, nullptr); 188 grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr); 189 } 190 191 /* create channel */ 192 { 193 ChannelArguments args; 194 args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); 195 fixture_configuration.ApplyCommonChannelArguments(&args); 196 197 grpc_channel_args c_args = args.c_channel_args(); 198 client_transport_ = 199 grpc_create_chttp2_transport(&c_args, endpoints.client, true); 200 GPR_ASSERT(client_transport_); 201 grpc_channel* channel = grpc_channel_create( 202 "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport_); 203 grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr); 204 205 channel_ = ::grpc::CreateChannelInternal( 206 "", channel, 207 std::vector<std::unique_ptr< 208 experimental::ClientInterceptorFactoryInterface>>()); 209 } 210 } 211 ~EndpointPairFixture()212 virtual ~EndpointPairFixture() { 213 server_->Shutdown(); 214 cq_->Shutdown(); 215 void* tag; 216 bool ok; 217 while (cq_->Next(&tag, &ok)) { 218 } 219 } 220 AddToLabel(std::ostream & out,benchmark::State & state)221 void AddToLabel(std::ostream& out, benchmark::State& state) { 222 BaseFixture::AddToLabel(out, state); 223 out << " polls/iter:" 224 << static_cast<double>(grpc_get_cq_poll_num(this->cq()->cq())) / 225 state.iterations(); 226 } 227 cq()228 ServerCompletionQueue* cq() { return cq_.get(); } channel()229 std::shared_ptr<Channel> channel() { return channel_; } 230 231 protected: 232 grpc_endpoint_pair endpoint_pair_; 233 grpc_transport* client_transport_; 234 grpc_transport* server_transport_; 235 236 private: 237 std::unique_ptr<Server> server_; 238 std::unique_ptr<ServerCompletionQueue> cq_; 239 std::shared_ptr<Channel> channel_; 240 }; 241 242 class SockPair : public EndpointPairFixture { 243 public: 244 SockPair(Service* service, const FixtureConfiguration& fixture_configuration = 245 FixtureConfiguration()) 246 : EndpointPairFixture(service, 247 grpc_iomgr_create_endpoint_pair("test", nullptr), 248 fixture_configuration) {} 249 }; 250 251 /* Use InProcessCHTTP2 instead. This class (with stats as an explicit parameter) 252 is here only to be able to initialize both the base class and stats_ with the 253 same stats instance without accessing the stats_ fields before the object is 254 properly initialized. */ 255 class InProcessCHTTP2WithExplicitStats : public EndpointPairFixture { 256 public: InProcessCHTTP2WithExplicitStats(Service * service,grpc_passthru_endpoint_stats * stats,const FixtureConfiguration & fixture_configuration)257 InProcessCHTTP2WithExplicitStats( 258 Service* service, grpc_passthru_endpoint_stats* stats, 259 const FixtureConfiguration& fixture_configuration) 260 : EndpointPairFixture(service, MakeEndpoints(stats), 261 fixture_configuration), 262 stats_(stats) {} 263 ~InProcessCHTTP2WithExplicitStats()264 virtual ~InProcessCHTTP2WithExplicitStats() { 265 if (stats_ != nullptr) { 266 grpc_passthru_endpoint_stats_destroy(stats_); 267 } 268 } 269 AddToLabel(std::ostream & out,benchmark::State & state)270 void AddToLabel(std::ostream& out, benchmark::State& state) { 271 EndpointPairFixture::AddToLabel(out, state); 272 out << " writes/iter:" 273 << static_cast<double>(gpr_atm_no_barrier_load(&stats_->num_writes)) / 274 static_cast<double>(state.iterations()); 275 } 276 277 private: 278 grpc_passthru_endpoint_stats* stats_; 279 MakeEndpoints(grpc_passthru_endpoint_stats * stats)280 static grpc_endpoint_pair MakeEndpoints(grpc_passthru_endpoint_stats* stats) { 281 grpc_endpoint_pair p; 282 grpc_passthru_endpoint_create(&p.client, &p.server, 283 LibraryInitializer::get().rq(), stats); 284 return p; 285 } 286 }; 287 288 class InProcessCHTTP2 : public InProcessCHTTP2WithExplicitStats { 289 public: 290 InProcessCHTTP2(Service* service, 291 const FixtureConfiguration& fixture_configuration = 292 FixtureConfiguration()) InProcessCHTTP2WithExplicitStats(service,grpc_passthru_endpoint_stats_create (),fixture_configuration)293 : InProcessCHTTP2WithExplicitStats(service, 294 grpc_passthru_endpoint_stats_create(), 295 fixture_configuration) {} 296 }; 297 298 //////////////////////////////////////////////////////////////////////////////// 299 // Minimal stack fixtures 300 301 class MinStackConfiguration : public FixtureConfiguration { ApplyCommonChannelArguments(ChannelArguments * a)302 void ApplyCommonChannelArguments(ChannelArguments* a) const override { 303 a->SetInt(GRPC_ARG_MINIMAL_STACK, 1); 304 FixtureConfiguration::ApplyCommonChannelArguments(a); 305 } 306 ApplyCommonServerBuilderConfig(ServerBuilder * b)307 void ApplyCommonServerBuilderConfig(ServerBuilder* b) const override { 308 b->AddChannelArgument(GRPC_ARG_MINIMAL_STACK, 1); 309 FixtureConfiguration::ApplyCommonServerBuilderConfig(b); 310 } 311 }; 312 313 template <class Base> 314 class MinStackize : public Base { 315 public: MinStackize(Service * service)316 MinStackize(Service* service) : Base(service, MinStackConfiguration()) {} 317 }; 318 319 typedef MinStackize<TCP> MinTCP; 320 typedef MinStackize<UDS> MinUDS; 321 typedef MinStackize<InProcess> MinInProcess; 322 typedef MinStackize<SockPair> MinSockPair; 323 typedef MinStackize<InProcessCHTTP2> MinInProcessCHTTP2; 324 325 } // namespace testing 326 } // namespace grpc 327 328 #endif 329