1 // 2 // 3 // Copyright 2017 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 20 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 21 22 #include <grpc/grpc.h> 23 #include <grpc/support/atm.h> 24 #include <grpcpp/channel.h> 25 #include <grpcpp/create_channel.h> 26 #include <grpcpp/security/credentials.h> 27 #include <grpcpp/security/server_credentials.h> 28 #include <grpcpp/server.h> 29 #include <grpcpp/server_builder.h> 30 31 #include "absl/log/check.h" 32 #include "src/core/config/core_configuration.h" 33 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" 34 #include "src/core/lib/channel/channel_args.h" 35 #include "src/core/lib/iomgr/endpoint.h" 36 #include "src/core/lib/iomgr/endpoint_pair.h" 37 #include "src/core/lib/iomgr/exec_ctx.h" 38 #include "src/core/lib/iomgr/tcp_posix.h" 39 #include "src/core/lib/surface/channel.h" 40 #include "src/core/lib/surface/channel_create.h" 41 #include "src/core/lib/surface/completion_queue.h" 42 #include "src/core/server/server.h" 43 #include "src/core/util/crash.h" 44 #include "src/cpp/client/create_channel_internal.h" 45 #include "test/core/test_util/port.h" 46 #include "test/core/test_util/test_config.h" 47 #include "test/cpp/microbenchmarks/helpers.h" 48 49 namespace grpc { 50 namespace testing { 51 52 class FixtureConfiguration { 53 public: ~FixtureConfiguration()54 virtual ~FixtureConfiguration() {} ApplyCommonChannelArguments(ChannelArguments * c)55 virtual void ApplyCommonChannelArguments(ChannelArguments* c) const { 56 c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); 57 c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); 58 c->SetInt(GRPC_ARG_ENABLE_RETRIES, 0); 59 c->SetResourceQuota(ResourceQuota()); 60 } 61 ApplyCommonServerBuilderConfig(ServerBuilder * b)62 virtual void ApplyCommonServerBuilderConfig(ServerBuilder* b) const { 63 b->SetMaxReceiveMessageSize(INT_MAX); 64 b->SetMaxSendMessageSize(INT_MAX); 65 } 66 }; 67 68 class BaseFixture { 69 public: 70 virtual ~BaseFixture() = default; 71 }; 72 73 class FullstackFixture : public BaseFixture { 74 public: FullstackFixture(Service * service,const FixtureConfiguration & config,const std::string & address)75 FullstackFixture(Service* service, const FixtureConfiguration& config, 76 const std::string& address) { 77 ServerBuilder b; 78 if (!address.empty()) { 79 b.AddListeningPort(address, InsecureServerCredentials()); 80 } 81 cq_ = b.AddCompletionQueue(true); 82 b.RegisterService(service); 83 config.ApplyCommonServerBuilderConfig(&b); 84 server_ = b.BuildAndStart(); 85 ChannelArguments args; 86 config.ApplyCommonChannelArguments(&args); 87 if (!address.empty()) { 88 channel_ = grpc::CreateCustomChannel(address, 89 InsecureChannelCredentials(), args); 90 } else { 91 channel_ = server_->InProcessChannel(args); 92 } 93 } 94 ~FullstackFixture()95 ~FullstackFixture() override { 96 channel_.reset(); 97 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); 98 cq_->Shutdown(); 99 void* tag; 100 bool ok; 101 while (cq_->Next(&tag, &ok)) { 102 } 103 } 104 cq()105 ServerCompletionQueue* cq() { return cq_.get(); } channel()106 std::shared_ptr<Channel> channel() { return channel_; } 107 108 private: 109 std::unique_ptr<Server> server_; 110 std::unique_ptr<ServerCompletionQueue> cq_; 111 std::shared_ptr<Channel> channel_; 112 }; 113 114 class TCP : public FullstackFixture { 115 public: 116 explicit TCP(Service* service, 117 const FixtureConfiguration& fixture_configuration = 118 FixtureConfiguration()) FullstackFixture(service,fixture_configuration,MakeAddress (& port_))119 : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {} 120 ~TCP()121 ~TCP() override { grpc_recycle_unused_port(port_); } 122 123 private: 124 int port_; 125 MakeAddress(int * port)126 static std::string MakeAddress(int* port) { 127 *port = grpc_pick_unused_port_or_die(); 128 std::stringstream addr; 129 addr << "localhost:" << *port; 130 return addr.str(); 131 } 132 }; 133 134 class UDS : public FullstackFixture { 135 public: 136 explicit UDS(Service* service, 137 const FixtureConfiguration& fixture_configuration = 138 FixtureConfiguration()) FullstackFixture(service,fixture_configuration,MakeAddress (& port_))139 : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {} 140 ~UDS()141 ~UDS() override { grpc_recycle_unused_port(port_); } 142 143 private: 144 int port_; 145 MakeAddress(int * port)146 static std::string MakeAddress(int* port) { 147 *port = grpc_pick_unused_port_or_die(); // just for a unique id - not a 148 // real port 149 std::stringstream addr; 150 addr << "unix:/tmp/bm_fullstack." << *port; 151 return addr.str(); 152 } 153 }; 154 155 class InProcess : public FullstackFixture { 156 public: 157 explicit InProcess(Service* service, 158 const FixtureConfiguration& fixture_configuration = 159 FixtureConfiguration()) 160 : FullstackFixture(service, fixture_configuration, "") {} ~InProcess()161 ~InProcess() override {} 162 }; 163 164 class EndpointPairFixture : public BaseFixture { 165 public: EndpointPairFixture(Service * service,grpc_endpoint_pair endpoints,const FixtureConfiguration & fixture_configuration)166 EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints, 167 const FixtureConfiguration& fixture_configuration) 168 : endpoint_pair_(endpoints) { 169 ServerBuilder b; 170 cq_ = b.AddCompletionQueue(true); 171 b.RegisterService(service); 172 fixture_configuration.ApplyCommonServerBuilderConfig(&b); 173 server_ = b.BuildAndStart(); 174 grpc_core::ExecCtx exec_ctx; 175 // add server endpoint to server_ 176 // 177 { 178 grpc_core::Server* core_server = 179 grpc_core::Server::FromC(server_->c_server()); 180 grpc_core::ChannelArgs server_args = core_server->channel_args(); 181 server_transport_ = grpc_create_chttp2_transport( 182 server_args, 183 grpc_core::OrphanablePtr<grpc_endpoint>(endpoints.server), 184 /*is_client=*/false); 185 for (grpc_pollset* pollset : core_server->pollsets()) { 186 grpc_endpoint_add_to_pollset(endpoints.server, pollset); 187 } 188 189 CHECK(GRPC_LOG_IF_ERROR("SetupTransport", core_server->SetupTransport( 190 server_transport_, nullptr, 191 server_args, nullptr))); 192 grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr, 193 nullptr, nullptr); 194 } 195 196 // create channel 197 { 198 grpc_core::ChannelArgs c_args; 199 { 200 ChannelArguments args; 201 args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); 202 fixture_configuration.ApplyCommonChannelArguments(&args); 203 // precondition 204 grpc_channel_args tmp_args; 205 args.SetChannelArgs(&tmp_args); 206 c_args = grpc_core::CoreConfiguration::Get() 207 .channel_args_preconditioning() 208 .PreconditionChannelArgs(&tmp_args); 209 } 210 client_transport_ = grpc_create_chttp2_transport( 211 c_args, grpc_core::OrphanablePtr<grpc_endpoint>(endpoints.client), 212 /*is_client=*/true); 213 CHECK(client_transport_); 214 grpc_channel* channel = 215 grpc_core::ChannelCreate("target", c_args, GRPC_CLIENT_DIRECT_CHANNEL, 216 client_transport_) 217 ->release() 218 ->c_ptr(); 219 grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr, 220 nullptr, nullptr); 221 222 channel_ = grpc::CreateChannelInternal( 223 "", channel, 224 std::vector<std::unique_ptr< 225 experimental::ClientInterceptorFactoryInterface>>()); 226 } 227 } 228 ~EndpointPairFixture()229 ~EndpointPairFixture() override { 230 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); 231 cq_->Shutdown(); 232 void* tag; 233 bool ok; 234 while (cq_->Next(&tag, &ok)) { 235 } 236 } 237 cq()238 ServerCompletionQueue* cq() { return cq_.get(); } channel()239 std::shared_ptr<Channel> channel() { return channel_; } 240 241 protected: 242 grpc_endpoint_pair endpoint_pair_; 243 grpc_core::Transport* client_transport_; 244 grpc_core::Transport* server_transport_; 245 246 private: 247 std::unique_ptr<Server> server_; 248 std::unique_ptr<ServerCompletionQueue> cq_; 249 std::shared_ptr<Channel> channel_; 250 }; 251 252 class SockPair : public EndpointPairFixture { 253 public: 254 explicit SockPair(Service* service, 255 const FixtureConfiguration& fixture_configuration = 256 FixtureConfiguration()) 257 : EndpointPairFixture(service, 258 grpc_iomgr_create_endpoint_pair("test", nullptr), 259 fixture_configuration) {} 260 }; 261 262 //////////////////////////////////////////////////////////////////////////////// 263 // Minimal stack fixtures 264 265 class MinStackConfiguration : public FixtureConfiguration { ApplyCommonChannelArguments(ChannelArguments * a)266 void ApplyCommonChannelArguments(ChannelArguments* a) const override { 267 a->SetInt(GRPC_ARG_MINIMAL_STACK, 1); 268 FixtureConfiguration::ApplyCommonChannelArguments(a); 269 } 270 ApplyCommonServerBuilderConfig(ServerBuilder * b)271 void ApplyCommonServerBuilderConfig(ServerBuilder* b) const override { 272 b->AddChannelArgument(GRPC_ARG_MINIMAL_STACK, 1); 273 FixtureConfiguration::ApplyCommonServerBuilderConfig(b); 274 } 275 }; 276 277 template <class Base> 278 class MinStackize : public Base { 279 public: MinStackize(Service * service)280 explicit MinStackize(Service* service) 281 : Base(service, MinStackConfiguration()) {} 282 }; 283 284 typedef MinStackize<TCP> MinTCP; 285 typedef MinStackize<UDS> MinUDS; 286 typedef MinStackize<InProcess> MinInProcess; 287 typedef MinStackize<SockPair> MinSockPair; 288 289 } // namespace testing 290 } // namespace grpc 291 292 #endif // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 293