1 // Copyright 2017 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 // 15 16 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H 17 #define GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H 18 19 #include <gmock/gmock.h> 20 #include <grpc/grpc.h> 21 #include <grpc/grpc_security.h> 22 #include <grpcpp/channel.h> 23 #include <grpcpp/client_context.h> 24 #include <grpcpp/ext/call_metric_recorder.h> 25 #include <grpcpp/ext/server_metric_recorder.h> 26 #include <grpcpp/xds_server_builder.h> 27 #include <gtest/gtest.h> 28 29 #include <memory> 30 #include <set> 31 #include <string> 32 #include <thread> 33 #include <vector> 34 35 #include "absl/log/check.h" 36 #include "absl/log/log.h" 37 #include "absl/status/statusor.h" 38 #include "absl/strings/str_cat.h" 39 #include "absl/strings/string_view.h" 40 #include "absl/types/optional.h" 41 #include "envoy/config/rbac/v3/rbac.pb.h" 42 #include "envoy/extensions/filters/http/rbac/v3/rbac.pb.h" 43 #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" 44 #include "src/core/lib/security/credentials/fake/fake_credentials.h" 45 #include "src/core/lib/security/security_connector/ssl_utils.h" 46 #include "src/cpp/server/secure_server_credentials.h" 47 #include "src/proto/grpc/testing/echo.pb.h" 48 #include "test/core/test_util/port.h" 49 #include "test/core/test_util/resolve_localhost_ip46.h" 50 #include "test/cpp/end2end/counted_service.h" 51 
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/end2end/xds/xds_server.h"
#include "test/cpp/end2end/xds/xds_utils.h"
#include "xds/data/orca/v3/orca_load_report.pb.h"

namespace grpc {
namespace testing {

// The parameter type for INSTANTIATE_TEST_SUITE_P().
//
// Every set_*() method returns *this so that calls can be chained.
// AsString() renders the configuration as a name that always starts with
// "V3" and gains one suffix per option that differs from its default.
class XdsTestType {
 public:
  enum HttpFilterConfigLocation {
    // Set the HTTP filter config directly in LDS.
    kHttpFilterConfigInListener,
    // Enable the HTTP filter in LDS, but override the filter config in route.
    kHttpFilterConfigInRoute,
  };

  enum BootstrapSource {
    kBootstrapFromChannelArg,
    kBootstrapFromFile,
    kBootstrapFromEnvVar,
  };

  //
  // Chainable setters
  //

  XdsTestType& set_enable_load_reporting() {
    enable_load_reporting_ = true;
    return *this;
  }

  XdsTestType& set_enable_rds_testing() {
    enable_rds_testing_ = true;
    return *this;
  }

  XdsTestType& set_use_csds_streaming() {
    use_csds_streaming_ = true;
    return *this;
  }

  XdsTestType& set_filter_config_setup(HttpFilterConfigLocation setup) {
    filter_config_setup_ = setup;
    return *this;
  }

  XdsTestType& set_bootstrap_source(BootstrapSource bootstrap_source) {
    bootstrap_source_ = bootstrap_source;
    return *this;
  }

  XdsTestType& set_rbac_action(::envoy::config::rbac::v3::RBAC_Action action) {
    rbac_action_ = action;
    return *this;
  }

  XdsTestType& set_rbac_audit_condition(
      ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
          audit_condition) {
    rbac_audit_condition_ = audit_condition;
    return *this;
  }

  //
  // Accessors
  //

  bool enable_load_reporting() const { return enable_load_reporting_; }
  bool enable_rds_testing() const { return enable_rds_testing_; }
  bool use_csds_streaming() const { return use_csds_streaming_; }
  HttpFilterConfigLocation filter_config_setup() const {
    return filter_config_setup_;
  }
  BootstrapSource bootstrap_source() const { return bootstrap_source_; }
  ::envoy::config::rbac::v3::RBAC_Action rbac_action() const {
    return rbac_action_;
  }
  ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
  rbac_audit_condition() const {
    return rbac_audit_condition_;
  }

  // Builds the name for this parameter value: "V3" followed by one suffix
  // for each option that is not at its default.
  std::string AsString() const {
    std::string name = "V3";
    if (enable_load_reporting_) name += "WithLoadReporting";
    if (enable_rds_testing_) name += "Rds";
    if (use_csds_streaming_) name += "CsdsStreaming";
    if (filter_config_setup_ == kHttpFilterConfigInRoute) {
      name += "FilterPerRouteOverride";
    }
    switch (bootstrap_source_) {
      case kBootstrapFromFile:
        name += "BootstrapFromFile";
        break;
      case kBootstrapFromEnvVar:
        name += "BootstrapFromEnvVar";
        break;
      case kBootstrapFromChannelArg:
        // The default source contributes nothing to the name.
        break;
    }
    switch (rbac_action_) {
      case ::envoy::config::rbac::v3::RBAC_Action_ALLOW:
        name += "RbacAllow";
        break;
      case ::envoy::config::rbac::v3::RBAC_Action_DENY:
        name += "RbacDeny";
        break;
      default:
        // Any other action (including the default, LOG) adds no suffix.
        break;
    }
    const bool has_audit_condition =
        rbac_audit_condition_ !=
        ::envoy::config::rbac::v3::
            RBAC_AuditLoggingOptions_AuditCondition_NONE;
    if (has_audit_condition) {
      absl::StrAppend(&name, "AuditCondition",
                      ::envoy::config::rbac::v3::
                          RBAC_AuditLoggingOptions_AuditCondition_Name(
                              rbac_audit_condition_));
    }
    return name;
  }

  // For use as the final parameter in INSTANTIATE_TEST_SUITE_P().
  static std::string Name(const ::testing::TestParamInfo<XdsTestType>& info) {
    return info.param.AsString();
  }

 private:
  bool enable_load_reporting_ = false;
  bool enable_rds_testing_ = false;
  bool use_csds_streaming_ = false;
  HttpFilterConfigLocation filter_config_setup_ = kHttpFilterConfigInListener;
  BootstrapSource bootstrap_source_ = kBootstrapFromChannelArg;
  ::envoy::config::rbac::v3::RBAC_Action rbac_action_ =
      ::envoy::config::rbac::v3::RBAC_Action_LOG;
  ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
      rbac_audit_condition_ = ::envoy::config::rbac::v3::
          RBAC_AuditLoggingOptions_AuditCondition_NONE;
};

// A base class for xDS end-to-end tests.
//
// An xDS server is provided in balancer_.  It is automatically started
// for every test.  Additional xDS servers can be started if needed by
// calling CreateAndStartBalancer().
//
// A default set of LDS, RDS, and CDS resources are created for gRPC
// clients, available in default_listener_, default_route_config_, and
// default_cluster_.  These resources are automatically loaded into
// balancer_ but can be modified by individual tests.  No EDS resource
// is provided by default.  There are also default LDS and RDS resources
// for the gRPC server side in default_server_listener_ and
// default_server_route_config_.  Methods are provided for constructing new
// resources that can be added to the xDS server as needed.
//
// This class provides a mechanism for running backend servers, which will
// be stored in backends_.  No servers are created or started by default,
// but tests can call CreateAndStartBackends() to start however many
// backends they want.  There are also a number of methods for accessing
// backends by index, which is the index into the backends_ vector.
194 // For methods that take a start_index and stop_index, this refers to 195 // the indexes in the range [start_index, stop_index). If stop_index 196 // is 0, backends_.size() is used. Backends may or may not be 197 // xDS-enabled, at the discretion of the test. 198 class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType>, 199 public XdsResourceUtils { 200 protected: 201 // TLS certificate paths. 202 static const char kCaCertPath[]; 203 static const char kServerCertPath[]; 204 static const char kServerKeyPath[]; 205 206 // Message used in EchoRequest to the backend. 207 static const char kRequestMessage[]; 208 209 // A base class for server threads. 210 class ServerThread { 211 public: 212 // A status notifier for xDS-enabled servers. 213 // 214 // TODO(yashykt): This notifier records the most recent state seen 215 // for every URI and then lets the caller wait until the status for 216 // that URI is the expected one. If we are expecting an update that 217 // has the same status as the previous one, then we really have no 218 // way of knowing whether the second update has actually been sent. 219 // A better approach here would be to queue the updates received by 220 // the notifier and then have a method to get the next update from 221 // the queue, if any. 222 // Also, we should change the callers to check not just the status 223 // but also the corresponding error message, so that we can verify 224 // that we're emitting useful error messages for our users. 
225 class XdsServingStatusNotifier 226 : public grpc::XdsServerServingStatusNotifierInterface { 227 public: 228 void OnServingStatusUpdate(std::string uri, 229 ServingStatusUpdate update) override; 230 231 GRPC_MUST_USE_RESULT bool WaitOnServingStatusChange( 232 const std::string& uri, grpc::StatusCode expected_status, 233 absl::Duration timeout = absl::Seconds(10)); 234 235 private: 236 grpc_core::Mutex mu_; 237 grpc_core::CondVar cond_; 238 std::map<std::string, grpc::Status> status_map ABSL_GUARDED_BY(mu_); 239 }; 240 241 // If use_xds_enabled_server is true, the server will use xDS. 242 // If credentials is null, fake credentials will be used. 243 explicit ServerThread( 244 XdsEnd2endTest* test_obj, bool use_xds_enabled_server = false, 245 std::shared_ptr<ServerCredentials> credentials = nullptr); 246 ~ServerThread()247 virtual ~ServerThread() { 248 // Shutdown should be called manually. Shutdown calls virtual methods and 249 // can't be called from the base class destructor. 250 CHECK(!running_); 251 } 252 253 void Start(); 254 void Shutdown(); 255 target()256 std::string target() const { return absl::StrCat("localhost:", port_); } 257 port()258 int port() const { return port_; } 259 notifier()260 XdsServingStatusNotifier* notifier() { return ¬ifier_; } 261 262 GRPC_MUST_USE_RESULT bool WaitOnServingStatusChange( 263 grpc::StatusCode expected_status, 264 absl::Duration timeout = absl::Seconds(10)) { 265 return notifier_.WaitOnServingStatusChange( 266 grpc_core::LocalIpAndPort(port_), expected_status, timeout); 267 } 268 set_allow_put_requests(bool allow_put_requests)269 void set_allow_put_requests(bool allow_put_requests) { 270 allow_put_requests_ = allow_put_requests; 271 } 272 273 void StopListening(); 274 275 void StopListeningAndSendGoaways(); 276 277 private: 278 class XdsChannelArgsServerBuilderOption; 279 280 virtual const char* Type() = 0; 281 virtual void RegisterAllServices(ServerBuilder* builder) = 0; 282 virtual void StartAllServices() = 0; 283 
virtual void ShutdownAllServices() = 0; 284 285 void Serve(grpc_core::Mutex* mu, grpc_core::CondVar* cond); 286 287 XdsEnd2endTest* test_obj_; 288 const bool use_xds_enabled_server_; 289 const std::shared_ptr<ServerCredentials> credentials_; 290 const int port_; 291 std::unique_ptr<Server> server_; 292 XdsServingStatusNotifier notifier_; 293 std::unique_ptr<std::thread> thread_; 294 bool running_ = false; 295 bool allow_put_requests_ = false; 296 }; 297 298 // A server thread for a backend server. 299 class BackendServerThread : public ServerThread { 300 public: 301 // A wrapper around the backend echo test service impl that counts 302 // requests and responses. 303 template <typename RpcService> 304 class BackendServiceImpl 305 : public CountedService<TestMultipleServiceImpl<RpcService>> { 306 public: BackendServiceImpl()307 BackendServiceImpl() {} 308 Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)309 Status Echo(ServerContext* context, const EchoRequest* request, 310 EchoResponse* response) override { 311 auto peer_identity = context->auth_context()->GetPeerIdentity(); 312 CountedService< 313 TestMultipleServiceImpl<RpcService>>::IncreaseRequestCount(); 314 { 315 grpc_core::MutexLock lock(&mu_); 316 clients_.insert(context->peer()); 317 last_peer_identity_.clear(); 318 for (const auto& entry : peer_identity) { 319 last_peer_identity_.emplace_back(entry.data(), entry.size()); 320 } 321 } 322 if (request->has_param() && request->param().has_backend_metrics()) { 323 const auto& request_metrics = request->param().backend_metrics(); 324 auto* recorder = context->ExperimentalGetCallMetricRecorder(); 325 if (request_metrics.cpu_utilization() != 0) { 326 recorder->RecordCpuUtilizationMetric( 327 request_metrics.cpu_utilization()); 328 } 329 if (request_metrics.mem_utilization() != 0) { 330 recorder->RecordMemoryUtilizationMetric( 331 request_metrics.mem_utilization()); 332 } 333 if (request_metrics.application_utilization() != 0) { 334 
recorder->RecordApplicationUtilizationMetric( 335 request_metrics.application_utilization()); 336 } 337 for (const auto& p : request_metrics.named_metrics()) { 338 char* key = static_cast<char*>( 339 grpc_call_arena_alloc(context->c_call(), p.first.size() + 1)); 340 strncpy(key, p.first.data(), p.first.size()); 341 key[p.first.size()] = '\0'; 342 recorder->RecordNamedMetric(key, p.second); 343 } 344 } 345 const auto status = TestMultipleServiceImpl<RpcService>::Echo( 346 context, request, response); 347 CountedService< 348 TestMultipleServiceImpl<RpcService>>::IncreaseResponseCount(); 349 return status; 350 } 351 Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)352 Status Echo1(ServerContext* context, const EchoRequest* request, 353 EchoResponse* response) override { 354 return Echo(context, request, response); 355 } 356 Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)357 Status Echo2(ServerContext* context, const EchoRequest* request, 358 EchoResponse* response) override { 359 return Echo(context, request, response); 360 } 361 Start()362 void Start() {} Shutdown()363 void Shutdown() {} 364 clients()365 std::set<std::string> clients() { 366 grpc_core::MutexLock lock(&mu_); 367 return clients_; 368 } 369 last_peer_identity()370 const std::vector<std::string>& last_peer_identity() { 371 grpc_core::MutexLock lock(&mu_); 372 return last_peer_identity_; 373 } 374 375 private: 376 grpc_core::Mutex mu_; 377 std::set<std::string> clients_ ABSL_GUARDED_BY(&mu_); 378 std::vector<std::string> last_peer_identity_ ABSL_GUARDED_BY(&mu_); 379 }; 380 381 // If use_xds_enabled_server is true, the server will use xDS. 
382 BackendServerThread(XdsEnd2endTest* test_obj, bool use_xds_enabled_server, 383 std::shared_ptr<ServerCredentials> credentials); 384 385 BackendServiceImpl<grpc::testing::EchoTestService::Service>* backend_service()386 backend_service() { 387 return &backend_service_; 388 } 389 BackendServiceImpl<grpc::testing::EchoTest1Service::Service>* backend_service1()390 backend_service1() { 391 return &backend_service1_; 392 } 393 BackendServiceImpl<grpc::testing::EchoTest2Service::Service>* backend_service2()394 backend_service2() { 395 return &backend_service2_; 396 } server_metric_recorder()397 grpc::experimental::ServerMetricRecorder* server_metric_recorder() const { 398 return server_metric_recorder_.get(); 399 } 400 401 private: Type()402 const char* Type() override { return "Backend"; } 403 void RegisterAllServices(ServerBuilder* builder) override; 404 void StartAllServices() override; 405 void ShutdownAllServices() override; 406 407 BackendServiceImpl<grpc::testing::EchoTestService::Service> 408 backend_service_; 409 BackendServiceImpl<grpc::testing::EchoTest1Service::Service> 410 backend_service1_; 411 BackendServiceImpl<grpc::testing::EchoTest2Service::Service> 412 backend_service2_; 413 std::unique_ptr<experimental::ServerMetricRecorder> server_metric_recorder_; 414 }; 415 416 // A server thread for the xDS server. 
417 class BalancerServerThread : public ServerThread { 418 public: 419 explicit BalancerServerThread( 420 XdsEnd2endTest* test_obj, absl::string_view debug_label, 421 std::shared_ptr<ServerCredentials> credentials); 422 ads_service()423 AdsServiceImpl* ads_service() { return ads_service_.get(); } lrs_service()424 LrsServiceImpl* lrs_service() { return lrs_service_.get(); } 425 426 private: Type()427 const char* Type() override { return "Balancer"; } 428 void RegisterAllServices(ServerBuilder* builder) override; 429 void StartAllServices() override; 430 void ShutdownAllServices() override; 431 432 std::shared_ptr<AdsServiceImpl> ads_service_; 433 std::shared_ptr<LrsServiceImpl> lrs_service_; 434 }; 435 436 // RPC services used to talk to the backends. 437 enum RpcService { 438 SERVICE_ECHO, 439 SERVICE_ECHO1, 440 SERVICE_ECHO2, 441 }; 442 443 // RPC methods used to talk to the backends. 444 enum RpcMethod { 445 METHOD_ECHO, 446 METHOD_ECHO1, 447 METHOD_ECHO2, 448 }; 449 450 // If balancer_credentials is null, it defaults to fake credentials. 451 explicit XdsEnd2endTest( 452 std::shared_ptr<ServerCredentials> balancer_credentials = nullptr); 453 SetUp()454 void SetUp() override { InitClient(); } 455 void TearDown() override; 456 457 // 458 // xDS server management 459 // 460 461 // Creates and starts a new balancer, running in its own thread. 462 // Most tests will not need to call this; instead, they can use 463 // balancer_, which is already populated with default resources. 464 // If credentials is null, it defaults to fake credentials. 465 std::unique_ptr<BalancerServerThread> CreateAndStartBalancer( 466 absl::string_view debug_label = "", 467 std::shared_ptr<ServerCredentials> credentials = nullptr); 468 469 // Sets the Listener and RouteConfiguration resource on the specified 470 // balancer. If RDS is in use, they will be set as separate resources; 471 // otherwise, the RouteConfig will be inlined into the Listener. 
472 void SetListenerAndRouteConfiguration( 473 BalancerServerThread* balancer, Listener listener, 474 const RouteConfiguration& route_config, 475 const HcmAccessor& hcm_accessor = ClientHcmAccessor()) { 476 XdsResourceUtils::SetListenerAndRouteConfiguration( 477 balancer->ads_service(), std::move(listener), route_config, 478 GetParam().enable_rds_testing(), hcm_accessor); 479 } 480 481 // A convenient wrapper for setting the Listener and 482 // RouteConfiguration resources on the server side. SetServerListenerNameAndRouteConfiguration(BalancerServerThread * balancer,Listener listener,int port,const RouteConfiguration & route_config)483 void SetServerListenerNameAndRouteConfiguration( 484 BalancerServerThread* balancer, Listener listener, int port, 485 const RouteConfiguration& route_config) { 486 SetListenerAndRouteConfiguration( 487 balancer, PopulateServerListenerNameAndPort(listener, port), 488 route_config, ServerHcmAccessor()); 489 } 490 491 // Sets the RouteConfiguration resource on the specified balancer. 492 // If RDS is in use, it will be set directly as an independent 493 // resource; otherwise, it will be inlined into a Listener resource 494 // (either listener_to_copy, or if that is null, default_listener_). 495 void SetRouteConfiguration(BalancerServerThread* balancer, 496 const RouteConfiguration& route_config, 497 const Listener* listener_to_copy = nullptr) { 498 XdsResourceUtils::SetRouteConfiguration( 499 balancer->ads_service(), route_config, GetParam().enable_rds_testing(), 500 listener_to_copy); 501 } 502 503 // Helper method for generating an endpoint for a backend, for use in 504 // constructing an EDS resource. 
505 EdsResourceArgs::Endpoint CreateEndpoint( 506 size_t backend_idx, 507 ::envoy::config::core::v3::HealthStatus health_status = 508 ::envoy::config::core::v3::HealthStatus::UNKNOWN, 509 int lb_weight = 1, std::vector<size_t> additional_backend_indexes = {}, 510 absl::string_view hostname = "", 511 const std::map<std::string, std::string /*JSON*/>& metadata = {}) { 512 std::vector<int> additional_ports; 513 additional_ports.reserve(additional_backend_indexes.size()); 514 for (size_t idx : additional_backend_indexes) { 515 additional_ports.push_back(backends_[idx]->port()); 516 } 517 return EdsResourceArgs::Endpoint(backends_[backend_idx]->port(), 518 health_status, lb_weight, additional_ports, 519 hostname, metadata); 520 } 521 522 // Creates a vector of endpoints for a specified range of backends, 523 // for use in constructing an EDS resource. 524 std::vector<EdsResourceArgs::Endpoint> CreateEndpointsForBackends( 525 size_t start_index = 0, size_t stop_index = 0, 526 ::envoy::config::core::v3::HealthStatus health_status = 527 ::envoy::config::core::v3::HealthStatus::UNKNOWN, 528 int lb_weight = 1); 529 530 // Returns an endpoint for an unused port, for use in constructing an 531 // EDS resource. MakeNonExistentEndpoint()532 EdsResourceArgs::Endpoint MakeNonExistentEndpoint() { 533 return EdsResourceArgs::Endpoint(grpc_pick_unused_port_or_die()); 534 } 535 536 // 537 // Backend management 538 // 539 540 // Creates num_backends backends and stores them in backends_, but does 541 // not actually start them. If xds_enabled is true, the backends are 542 // xDS-enabled. If credentials is null, it defaults to fake credentials. 543 void CreateBackends( 544 size_t num_backends, bool xds_enabled = false, 545 std::shared_ptr<ServerCredentials> credentials = nullptr) { 546 for (size_t i = 0; i < num_backends; ++i) { 547 backends_.emplace_back( 548 new BackendServerThread(this, xds_enabled, credentials)); 549 } 550 } 551 552 // Starts all backends in backends_. 
StartAllBackends()553 void StartAllBackends() { 554 for (auto& backend : backends_) backend->Start(); 555 } 556 557 // Same as CreateBackends(), but also starts the backends. 558 void CreateAndStartBackends( 559 size_t num_backends, bool xds_enabled = false, 560 std::shared_ptr<ServerCredentials> credentials = nullptr) { 561 CreateBackends(num_backends, xds_enabled, std::move(credentials)); 562 StartAllBackends(); 563 } 564 565 // Starts the backend at backends_[index]. StartBackend(size_t index)566 void StartBackend(size_t index) { backends_[index]->Start(); } 567 568 // Shuts down all backends in backends_. ShutdownAllBackends()569 void ShutdownAllBackends() { 570 for (auto& backend : backends_) backend->Shutdown(); 571 } 572 573 // Shuts down the backend at backends_[index]. ShutdownBackend(size_t index)574 void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); } 575 576 // Resets the request counters for backends in the specified range. 577 void ResetBackendCounters(size_t start_index = 0, size_t stop_index = 0); 578 579 // Returns true if the specified backend has received requests for the 580 // specified service. 581 bool SeenBackend(size_t backend_idx, 582 const RpcService rpc_service = SERVICE_ECHO); 583 584 // Returns true if all backends in the specified range have received 585 // requests for the specified service. 586 bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0, 587 const RpcService rpc_service = SERVICE_ECHO); 588 589 // Returns a vector containing the port for every backend in the 590 // specified range. 591 std::vector<int> GetBackendPorts(size_t start_index = 0, 592 size_t stop_index = 0) const; 593 594 // 595 // Client management 596 // 597 598 // Initializes global state for the client, such as the bootstrap file 599 // and channel args for the XdsClient. Then calls ResetStub(). 600 // All tests must call this exactly once at the start of the test. 601 // If credentials is null, fake credentials will be used. 
602 void InitClient(absl::optional<XdsBootstrapBuilder> builder = absl::nullopt, 603 std::string lb_expected_authority = "", 604 int xds_resource_does_not_exist_timeout_ms = 0, 605 std::string balancer_authority_override = "", 606 ChannelArguments* args = nullptr, 607 std::shared_ptr<ChannelCredentials> credentials = nullptr); 608 MakeBootstrapBuilder()609 XdsBootstrapBuilder MakeBootstrapBuilder() { 610 return XdsBootstrapBuilder().SetServers( 611 {absl::StrCat("localhost:", balancer_->port())}); 612 } 613 614 // Sets channel_, stub_, stub1_, and stub2_. 615 // If credentials is null, fake credentials will be used. 616 void ResetStub(int failover_timeout_ms = 0, ChannelArguments* args = nullptr, 617 std::shared_ptr<ChannelCredentials> credentials = nullptr); 618 619 // Creates a new client channel. Requires that InitClient() has 620 // already been called. 621 // If credentials is null, fake credentials will be used. 622 std::shared_ptr<Channel> CreateChannel( 623 int failover_timeout_ms = 0, const char* server_name = kServerName, 624 const char* xds_authority = "", ChannelArguments* args = nullptr, 625 std::shared_ptr<ChannelCredentials> credentials = nullptr); 626 627 // 628 // Sending RPCs 629 // 630 631 // Options used for sending an RPC. 632 struct RpcOptions { 633 RpcService service = SERVICE_ECHO; 634 RpcMethod method = METHOD_ECHO; 635 // Will be multiplied by grpc_test_slowdown_factor(). 636 int timeout_ms = 5000; 637 bool wait_for_ready = false; 638 std::vector<std::pair<std::string, std::string>> metadata; 639 // These options are used by the backend service impl. 
640 bool server_fail = false; 641 int server_sleep_us = 0; 642 int client_cancel_after_us = 0; 643 bool skip_cancelled_check = false; 644 StatusCode server_expected_error = StatusCode::OK; 645 absl::optional<xds::data::orca::v3::OrcaLoadReport> backend_metrics; 646 bool server_notify_client_when_started = false; 647 bool echo_host_from_authority_header = false; 648 bool echo_metadata_initially = false; 649 RpcOptionsRpcOptions650 RpcOptions() {} 651 set_rpc_serviceRpcOptions652 RpcOptions& set_rpc_service(RpcService rpc_service) { 653 service = rpc_service; 654 return *this; 655 } 656 set_rpc_methodRpcOptions657 RpcOptions& set_rpc_method(RpcMethod rpc_method) { 658 method = rpc_method; 659 return *this; 660 } 661 set_timeout_msRpcOptions662 RpcOptions& set_timeout_ms(int rpc_timeout_ms) { 663 timeout_ms = rpc_timeout_ms; 664 return *this; 665 } 666 set_timeoutRpcOptions667 RpcOptions& set_timeout(grpc_core::Duration timeout) { 668 timeout_ms = timeout.millis(); 669 return *this; 670 } 671 set_wait_for_readyRpcOptions672 RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) { 673 wait_for_ready = rpc_wait_for_ready; 674 return *this; 675 } 676 set_metadataRpcOptions677 RpcOptions& set_metadata( 678 std::vector<std::pair<std::string, std::string>> rpc_metadata) { 679 metadata = std::move(rpc_metadata); 680 return *this; 681 } 682 set_server_failRpcOptions683 RpcOptions& set_server_fail(bool rpc_server_fail) { 684 server_fail = rpc_server_fail; 685 return *this; 686 } 687 set_server_sleep_usRpcOptions688 RpcOptions& set_server_sleep_us(int rpc_server_sleep_us) { 689 server_sleep_us = rpc_server_sleep_us; 690 return *this; 691 } 692 set_client_cancel_after_usRpcOptions693 RpcOptions& set_client_cancel_after_us(int rpc_client_cancel_after_us) { 694 client_cancel_after_us = rpc_client_cancel_after_us; 695 return *this; 696 } 697 set_skip_cancelled_checkRpcOptions698 RpcOptions& set_skip_cancelled_check(bool rpc_skip_cancelled_check) { 699 skip_cancelled_check = 
rpc_skip_cancelled_check; 700 return *this; 701 } 702 set_server_expected_errorRpcOptions703 RpcOptions& set_server_expected_error(StatusCode code) { 704 server_expected_error = code; 705 return *this; 706 } 707 set_backend_metricsRpcOptions708 RpcOptions& set_backend_metrics( 709 absl::optional<xds::data::orca::v3::OrcaLoadReport> metrics) { 710 backend_metrics = std::move(metrics); 711 return *this; 712 } 713 set_server_notify_client_when_startedRpcOptions714 RpcOptions& set_server_notify_client_when_started( 715 bool rpc_server_notify_client_when_started) { 716 server_notify_client_when_started = rpc_server_notify_client_when_started; 717 return *this; 718 } 719 set_echo_host_from_authority_headerRpcOptions720 RpcOptions& set_echo_host_from_authority_header(bool value) { 721 echo_host_from_authority_header = value; 722 return *this; 723 } 724 set_echo_metadata_initiallyRpcOptions725 RpcOptions& set_echo_metadata_initially(bool value) { 726 echo_metadata_initially = value; 727 return *this; 728 } 729 730 // Populates context and request. 731 void SetupRpc(ClientContext* context, EchoRequest* request) const; 732 }; 733 734 // Sends an RPC with the specified options. 735 // If response is non-null, it will be populated with the response. 736 // Returns the status of the RPC. 737 Status SendRpc(const RpcOptions& rpc_options = RpcOptions(), 738 EchoResponse* response = nullptr, 739 std::multimap<std::string, std::string>* 740 server_initial_metadata = nullptr); 741 742 // Internal helper function for SendRpc(). 
743 template <typename Stub> SendRpcMethod(Stub * stub,const RpcOptions & rpc_options,ClientContext * context,EchoRequest & request,EchoResponse * response)744 static Status SendRpcMethod(Stub* stub, const RpcOptions& rpc_options, 745 ClientContext* context, EchoRequest& request, 746 EchoResponse* response) { 747 switch (rpc_options.method) { 748 case METHOD_ECHO: 749 return stub->Echo(context, request, response); 750 case METHOD_ECHO1: 751 return stub->Echo1(context, request, response); 752 case METHOD_ECHO2: 753 return stub->Echo2(context, request, response); 754 } 755 GPR_UNREACHABLE_CODE(return grpc::Status::OK); 756 } 757 758 // Send RPCs in a loop until either continue_predicate() returns false 759 // or timeout_ms elapses. 760 struct RpcResult { 761 Status status; 762 EchoResponse response; 763 }; 764 void SendRpcsUntil(const grpc_core::DebugLocation& debug_location, 765 std::function<bool(const RpcResult&)> continue_predicate, 766 int timeout_ms = 15000, 767 const RpcOptions& rpc_options = RpcOptions()); 768 769 // Sends the specified number of RPCs and fails if the RPC fails. 770 void CheckRpcSendOk(const grpc_core::DebugLocation& debug_location, 771 const size_t times = 1, 772 const RpcOptions& rpc_options = RpcOptions()); 773 774 // Sends one RPC, which must fail with the specified status code and 775 // a message matching the specified regex. 776 void CheckRpcSendFailure(const grpc_core::DebugLocation& debug_location, 777 StatusCode expected_status, 778 absl::string_view expected_message_regex, 779 const RpcOptions& rpc_options = RpcOptions()); 780 781 // Sends RPCs until either a timeout or an RPC fail, in which case the 782 // failure must match the specified status and message regex. 
783 void SendRpcsUntilFailure(const grpc_core::DebugLocation& debug_location, 784 StatusCode expected_status, 785 absl::string_view expected_message_regex, 786 int timeout_ms = 15000, 787 const RpcOptions& rpc_options = RpcOptions()); 788 789 // Sends num_rpcs RPCs, counting how many of them fail with a message 790 // matching the specified expected_message_prefix. 791 // Any failure with a non-matching status or message is a test failure. 792 size_t SendRpcsAndCountFailuresWithMessage( 793 const grpc_core::DebugLocation& debug_location, size_t num_rpcs, 794 StatusCode expected_status, absl::string_view expected_message_prefix, 795 const RpcOptions& rpc_options = RpcOptions()); 796 797 // A class for running a long-running RPC in its own thread. 798 // TODO(roth): Maybe consolidate this and SendConcurrentRpcs() 799 // somehow? LongRunningRpc has a cleaner API, but SendConcurrentRpcs() 800 // uses the callback API, which is probably better. 801 class LongRunningRpc { 802 public: 803 // Starts the RPC. 804 void StartRpc(grpc::testing::EchoTestService::Stub* stub, 805 const RpcOptions& rpc_options = 806 RpcOptions().set_timeout_ms(0).set_client_cancel_after_us( 807 1 * 1000 * 1000)); 808 809 // Cancels the RPC. 810 void CancelRpc(); 811 812 // Gets the RPC's status. Blocks if the RPC is not yet complete. 813 Status GetStatus(); 814 815 private: 816 std::thread sender_thread_; 817 ClientContext context_; 818 Status status_; 819 }; 820 821 // Starts a set of concurrent RPCs. 
822 struct ConcurrentRpc { 823 ClientContext context; 824 Status status; 825 grpc_core::Duration elapsed_time; 826 EchoResponse response; 827 }; 828 std::vector<std::unique_ptr<ConcurrentRpc>> SendConcurrentRpcs( 829 const grpc_core::DebugLocation& debug_location, 830 grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs, 831 const RpcOptions& rpc_options); 832 833 // 834 // Waiting for individual backends to be seen by the client 835 // 836 837 struct WaitForBackendOptions { 838 // If true, resets the backend counters before returning. 839 bool reset_counters = true; 840 // How long to wait for the backend(s) to see requests. 841 // Will be multiplied by grpc_test_slowdown_factor(). 842 int timeout_ms = 15000; 843 WaitForBackendOptionsWaitForBackendOptions844 WaitForBackendOptions() {} 845 set_reset_countersWaitForBackendOptions846 WaitForBackendOptions& set_reset_counters(bool enable) { 847 reset_counters = enable; 848 return *this; 849 } 850 set_timeout_msWaitForBackendOptions851 WaitForBackendOptions& set_timeout_ms(int ms) { 852 timeout_ms = ms; 853 return *this; 854 } 855 }; 856 857 // Sends RPCs until all of the backends in the specified range see requests. 858 // The check_status callback will be invoked to check the status of 859 // every RPC; if null, the default is to check that the RPC succeeded. 860 // Returns the total number of RPCs sent. 861 size_t WaitForAllBackends( 862 const grpc_core::DebugLocation& debug_location, size_t start_index = 0, 863 size_t stop_index = 0, 864 std::function<void(const RpcResult&)> check_status = nullptr, 865 const WaitForBackendOptions& wait_options = WaitForBackendOptions(), 866 const RpcOptions& rpc_options = RpcOptions()); 867 868 // Sends RPCs until the backend at index backend_idx sees requests. 
869 void WaitForBackend( 870 const grpc_core::DebugLocation& debug_location, size_t backend_idx, 871 std::function<void(const RpcResult&)> check_status = nullptr, 872 const WaitForBackendOptions& wait_options = WaitForBackendOptions(), 873 const RpcOptions& rpc_options = RpcOptions()) { 874 WaitForAllBackends(debug_location, backend_idx, backend_idx + 1, 875 check_status, wait_options, rpc_options); 876 } 877 878 // 879 // Waiting for xDS NACKs 880 // 881 // These methods send RPCs in a loop until they see a NACK from the 882 // xDS server, or until a timeout expires. 883 884 // Sends RPCs until get_state() returns a response. 885 // TODO(roth): Does this actually need to send RPCs, or can it just 886 // use a condition variable to wait? I suspect that we need to be 887 // sending RPCs for polling reasons, but that should go away when we 888 // finish the EventEngine migration. Once that's done, try changing 889 // this to not send RPCs. 890 // Also, consider refactoring to also support waiting for ACKs, since 891 // there are several use-cases where tests are doing that. 892 absl::optional<AdsServiceImpl::ResponseState> WaitForNack( 893 const grpc_core::DebugLocation& debug_location, 894 std::function<absl::optional<AdsServiceImpl::ResponseState>()> get_state, 895 const RpcOptions& rpc_options = RpcOptions(), 896 StatusCode expected_status = StatusCode::UNAVAILABLE); 897 898 // Sends RPCs until an LDS NACK is seen. 899 absl::optional<AdsServiceImpl::ResponseState> WaitForLdsNack( 900 const grpc_core::DebugLocation& debug_location, 901 const RpcOptions& rpc_options = RpcOptions(), 902 StatusCode expected_status = StatusCode::UNAVAILABLE) { 903 return WaitForNack( 904 debug_location, 905 [&]() { return balancer_->ads_service()->lds_response_state(); }, 906 rpc_options, expected_status); 907 } 908 909 // Sends RPCs until an RDS NACK is seen. 
910 absl::optional<AdsServiceImpl::ResponseState> WaitForRdsNack( 911 const grpc_core::DebugLocation& debug_location, 912 const RpcOptions& rpc_options = RpcOptions(), 913 StatusCode expected_status = StatusCode::UNAVAILABLE) { 914 return WaitForNack( 915 debug_location, 916 [&]() { return RouteConfigurationResponseState(balancer_.get()); }, 917 rpc_options, expected_status); 918 } 919 920 // Sends RPCs until a CDS NACK is seen. 921 absl::optional<AdsServiceImpl::ResponseState> WaitForCdsNack( 922 const grpc_core::DebugLocation& debug_location, 923 const RpcOptions& rpc_options = RpcOptions(), 924 StatusCode expected_status = StatusCode::UNAVAILABLE) { 925 return WaitForNack( 926 debug_location, 927 [&]() { return balancer_->ads_service()->cds_response_state(); }, 928 rpc_options, expected_status); 929 } 930 931 // Sends RPCs until an EDS NACK is seen. 932 absl::optional<AdsServiceImpl::ResponseState> WaitForEdsNack( 933 const grpc_core::DebugLocation& debug_location, 934 const RpcOptions& rpc_options = RpcOptions()) { 935 return WaitForNack( 936 debug_location, 937 [&]() { return balancer_->ads_service()->eds_response_state(); }, 938 rpc_options); 939 } 940 941 // Convenient front-end to wait for RouteConfiguration to be NACKed, 942 // regardless of whether it's sent in LDS or RDS. 943 absl::optional<AdsServiceImpl::ResponseState> WaitForRouteConfigNack( 944 const grpc_core::DebugLocation& debug_location, 945 const RpcOptions& rpc_options = RpcOptions(), 946 StatusCode expected_status = StatusCode::UNAVAILABLE) { 947 if (GetParam().enable_rds_testing()) { 948 return WaitForRdsNack(debug_location, rpc_options, expected_status); 949 } 950 return WaitForLdsNack(debug_location, rpc_options, expected_status); 951 } 952 953 // Convenient front-end for accessing xDS response state for a 954 // RouteConfiguration, regardless of whether it's sent in LDS or RDS. 
RouteConfigurationResponseState(BalancerServerThread * balancer)955 absl::optional<AdsServiceImpl::ResponseState> RouteConfigurationResponseState( 956 BalancerServerThread* balancer) const { 957 AdsServiceImpl* ads_service = balancer->ads_service(); 958 if (GetParam().enable_rds_testing()) { 959 return ads_service->rds_response_state(); 960 } 961 return ads_service->lds_response_state(); 962 } 963 964 // 965 // Miscellaneous helper methods 966 // 967 968 // There is slight difference between time fetched by GPR and by C++ system 969 // clock API. It's unclear if they are using the same syscall, but we do know 970 // GPR round the number at millisecond-level. This creates a 1ms difference, 971 // which could cause flake. NowFromCycleCounter()972 static grpc_core::Timestamp NowFromCycleCounter() { 973 return grpc_core::Timestamp::FromTimespecRoundDown( 974 gpr_now(GPR_CLOCK_MONOTONIC)); 975 } 976 977 // Sets duration_proto to duration times grpc_test_slowdown_factor(). 978 static void SetProtoDuration(grpc_core::Duration duration, 979 google::protobuf::Duration* duration_proto); 980 981 // Returns the number of RPCs needed to pass error_tolerance at 99.99994% 982 // chance. Rolling dices in drop/fault-injection generates a binomial 983 // distribution (if our code is not horribly wrong). Let's make "n" the number 984 // of samples, "p" the probability. If we have np>5 & n(1-p)>5, we can 985 // approximately treat the binomial distribution as a normal distribution. 986 // 987 // For normal distribution, we can easily look up how many standard deviation 988 // we need to reach 99.995%. Based on Wiki's table 989 // https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule, we need 5.00 990 // sigma (standard deviation) to cover the probability area of 99.99994%. In 991 // another word, for a sample with size "n" probability "p" error-tolerance 992 // "k", we want the error always land within 5.00 sigma. 
The sigma of 993 // binomial distribution and be computed as sqrt(np(1-p)). Hence, we have 994 // the equation: 995 // 996 // kn <= 5.00 * sqrt(np(1-p)) 997 // TODO(yashykt): The above explanation assumes a normal distribution, but we 998 // use a uniform distribution instead. We need a better estimate of how many 999 // RPCs are needed with what error tolerance. ComputeIdealNumRpcs(double p,double error_tolerance)1000 static size_t ComputeIdealNumRpcs(double p, double error_tolerance) { 1001 CHECK_GE(p, 0); 1002 CHECK_LE(p, 1); 1003 size_t num_rpcs = 1004 ceil(p * (1 - p) * 5.00 * 5.00 / error_tolerance / error_tolerance); 1005 num_rpcs += 1000; // Add 1K as a buffer to avoid flakiness. 1006 LOG(INFO) << "Sending " << num_rpcs << " RPCs for percentage=" << p 1007 << " error_tolerance=" << error_tolerance; 1008 return num_rpcs; 1009 } 1010 1011 // Returns a regex that can be matched against an RPC failure status 1012 // message for a connection failure. 1013 static std::string MakeConnectionFailureRegex( 1014 absl::string_view prefix, bool has_resolution_note = true); 1015 1016 // Returns a regex that can be matched against an RPC failure status 1017 // message for a Tls handshake failure. 1018 static std::string MakeTlsHandshakeFailureRegex(absl::string_view prefix); 1019 1020 // Returns a private key pair, read from local files. 1021 static grpc_core::PemKeyCertPairList ReadTlsIdentityPair( 1022 const char* key_path, const char* cert_path); 1023 1024 // Internal helper function for creating TLS and mTLS creds. 1025 // Not intended to be used by tests. 1026 static std::vector<experimental::IdentityKeyCertPair> 1027 MakeIdentityKeyCertPairForTlsCreds(); 1028 1029 // Returns XdsCredentials with mTLS fallback creds. 1030 static std::shared_ptr<ChannelCredentials> CreateXdsChannelCredentials(); 1031 static std::shared_ptr<ChannelCredentials> CreateTlsChannelCredentials(); 1032 1033 // Creates various types of server credentials. 
1034 static std::shared_ptr<ServerCredentials> CreateFakeServerCredentials(); 1035 static std::shared_ptr<ServerCredentials> CreateMtlsServerCredentials(); 1036 static std::shared_ptr<ServerCredentials> CreateTlsServerCredentials(); 1037 1038 std::unique_ptr<BalancerServerThread> balancer_; 1039 1040 std::shared_ptr<Channel> channel_; 1041 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; 1042 std::unique_ptr<grpc::testing::EchoTest1Service::Stub> stub1_; 1043 std::unique_ptr<grpc::testing::EchoTest2Service::Stub> stub2_; 1044 1045 std::vector<std::unique_ptr<BackendServerThread>> backends_; 1046 1047 // Default xDS resources. 1048 Listener default_listener_; 1049 RouteConfiguration default_route_config_; 1050 Listener default_server_listener_; 1051 RouteConfiguration default_server_route_config_; 1052 Cluster default_cluster_; 1053 1054 int xds_drain_grace_time_ms_ = 10 * 60 * 1000; // 10 mins 1055 1056 bool bootstrap_contents_from_env_var_; 1057 std::string bootstrap_; 1058 char* bootstrap_file_ = nullptr; 1059 absl::InlinedVector<grpc_arg, 3> xds_channel_args_to_add_; 1060 grpc_channel_args xds_channel_args_; 1061 }; 1062 1063 } // namespace testing 1064 } // namespace grpc 1065 1066 #endif // GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H 1067