• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H
17 #define GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H
18 
19 #include <gmock/gmock.h>
20 #include <grpc/grpc.h>
21 #include <grpc/grpc_security.h>
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/ext/call_metric_recorder.h>
25 #include <grpcpp/ext/server_metric_recorder.h>
26 #include <grpcpp/xds_server_builder.h>
27 #include <gtest/gtest.h>
28 
#include <cstring>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>
34 
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "absl/status/statusor.h"
38 #include "absl/strings/str_cat.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/types/optional.h"
41 #include "envoy/config/rbac/v3/rbac.pb.h"
42 #include "envoy/extensions/filters/http/rbac/v3/rbac.pb.h"
43 #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
44 #include "src/core/lib/security/credentials/fake/fake_credentials.h"
45 #include "src/core/lib/security/security_connector/ssl_utils.h"
46 #include "src/cpp/server/secure_server_credentials.h"
47 #include "src/proto/grpc/testing/echo.pb.h"
48 #include "test/core/test_util/port.h"
49 #include "test/core/test_util/resolve_localhost_ip46.h"
50 #include "test/cpp/end2end/counted_service.h"
51 #include "test/cpp/end2end/test_service_impl.h"
52 #include "test/cpp/end2end/xds/xds_server.h"
53 #include "test/cpp/end2end/xds/xds_utils.h"
54 #include "xds/data/orca/v3/orca_load_report.pb.h"
55 
56 namespace grpc {
57 namespace testing {
58 
59 // The parameter type for INSTANTIATE_TEST_SUITE_P().
60 class XdsTestType {
61  public:
62   enum HttpFilterConfigLocation {
63     // Set the HTTP filter config directly in LDS.
64     kHttpFilterConfigInListener,
65     // Enable the HTTP filter in LDS, but override the filter config in route.
66     kHttpFilterConfigInRoute,
67   };
68 
69   enum BootstrapSource {
70     kBootstrapFromChannelArg,
71     kBootstrapFromFile,
72     kBootstrapFromEnvVar,
73   };
74 
set_enable_load_reporting()75   XdsTestType& set_enable_load_reporting() {
76     enable_load_reporting_ = true;
77     return *this;
78   }
79 
set_enable_rds_testing()80   XdsTestType& set_enable_rds_testing() {
81     enable_rds_testing_ = true;
82     return *this;
83   }
84 
set_use_csds_streaming()85   XdsTestType& set_use_csds_streaming() {
86     use_csds_streaming_ = true;
87     return *this;
88   }
89 
set_filter_config_setup(HttpFilterConfigLocation setup)90   XdsTestType& set_filter_config_setup(HttpFilterConfigLocation setup) {
91     filter_config_setup_ = setup;
92     return *this;
93   }
94 
set_bootstrap_source(BootstrapSource bootstrap_source)95   XdsTestType& set_bootstrap_source(BootstrapSource bootstrap_source) {
96     bootstrap_source_ = bootstrap_source;
97     return *this;
98   }
99 
set_rbac_action(::envoy::config::rbac::v3::RBAC_Action action)100   XdsTestType& set_rbac_action(::envoy::config::rbac::v3::RBAC_Action action) {
101     rbac_action_ = action;
102     return *this;
103   }
104 
set_rbac_audit_condition(::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition audit_condition)105   XdsTestType& set_rbac_audit_condition(
106       ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
107           audit_condition) {
108     rbac_audit_condition_ = audit_condition;
109     return *this;
110   }
111 
enable_load_reporting()112   bool enable_load_reporting() const { return enable_load_reporting_; }
enable_rds_testing()113   bool enable_rds_testing() const { return enable_rds_testing_; }
use_csds_streaming()114   bool use_csds_streaming() const { return use_csds_streaming_; }
filter_config_setup()115   HttpFilterConfigLocation filter_config_setup() const {
116     return filter_config_setup_;
117   }
bootstrap_source()118   BootstrapSource bootstrap_source() const { return bootstrap_source_; }
rbac_action()119   ::envoy::config::rbac::v3::RBAC_Action rbac_action() const {
120     return rbac_action_;
121   }
122   ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
rbac_audit_condition()123   rbac_audit_condition() const {
124     return rbac_audit_condition_;
125   }
126 
AsString()127   std::string AsString() const {
128     std::string retval = "V3";
129     if (enable_load_reporting_) retval += "WithLoadReporting";
130     if (enable_rds_testing_) retval += "Rds";
131     if (use_csds_streaming_) retval += "CsdsStreaming";
132     if (filter_config_setup_ == kHttpFilterConfigInRoute) {
133       retval += "FilterPerRouteOverride";
134     }
135     if (bootstrap_source_ == kBootstrapFromFile) {
136       retval += "BootstrapFromFile";
137     } else if (bootstrap_source_ == kBootstrapFromEnvVar) {
138       retval += "BootstrapFromEnvVar";
139     }
140     if (rbac_action_ == ::envoy::config::rbac::v3::RBAC_Action_ALLOW) {
141       retval += "RbacAllow";
142     } else if (rbac_action_ == ::envoy::config::rbac::v3::RBAC_Action_DENY) {
143       retval += "RbacDeny";
144     }
145     if (rbac_audit_condition_ !=
146         ::envoy::config::rbac::v3::
147             RBAC_AuditLoggingOptions_AuditCondition_NONE) {
148       retval += absl::StrCat("AuditCondition",
149                              ::envoy::config::rbac::v3::
150                                  RBAC_AuditLoggingOptions_AuditCondition_Name(
151                                      rbac_audit_condition_));
152     }
153     return retval;
154   }
155 
156   // For use as the final parameter in INSTANTIATE_TEST_SUITE_P().
Name(const::testing::TestParamInfo<XdsTestType> & info)157   static std::string Name(const ::testing::TestParamInfo<XdsTestType>& info) {
158     return info.param.AsString();
159   }
160 
161  private:
162   bool enable_load_reporting_ = false;
163   bool enable_rds_testing_ = false;
164   bool use_csds_streaming_ = false;
165   HttpFilterConfigLocation filter_config_setup_ = kHttpFilterConfigInListener;
166   BootstrapSource bootstrap_source_ = kBootstrapFromChannelArg;
167   ::envoy::config::rbac::v3::RBAC_Action rbac_action_ =
168       ::envoy::config::rbac::v3::RBAC_Action_LOG;
169   ::envoy::config::rbac::v3::RBAC_AuditLoggingOptions_AuditCondition
170       rbac_audit_condition_ = ::envoy::config::rbac::v3::
171           RBAC_AuditLoggingOptions_AuditCondition_NONE;
172 };
173 
174 // A base class for xDS end-to-end tests.
175 //
176 // An xDS server is provided in balancer_.  It is automatically started
177 // for every test.  Additional xDS servers can be started if needed by
178 // calling CreateAndStartBalancer().
179 //
180 // A default set of LDS, RDS, and CDS resources are created for gRPC
181 // clients, available in default_listener_, default_route_config_, and
182 // default_cluster_.  These resources are automatically loaded into
183 // balancer_ but can be modified by individual tests.  No EDS resource
184 // is provided by default.  There are also default LDS and RDS resources
185 // for the gRPC server side in default_server_listener_ and
186 // default_server_route_config_.  Methods are provided for constructing new
187 // resources that can be added to the xDS server as needed.
188 //
189 // This class provides a mechanism for running backend servers, which will
190 // be stored in backends_.  No servers are created or started by default,
191 // but tests can call CreateAndStartBackends() to start however many
192 // backends they want.  There are also a number of methods for accessing
193 // backends by index, which is the index into the backends_ vector.
194 // For methods that take a start_index and stop_index, this refers to
195 // the indexes in the range [start_index, stop_index).  If stop_index
196 // is 0, backends_.size() is used.  Backends may or may not be
197 // xDS-enabled, at the discretion of the test.
198 class XdsEnd2endTest : public ::testing::TestWithParam<XdsTestType>,
199                        public XdsResourceUtils {
200  protected:
201   // TLS certificate paths.
202   static const char kCaCertPath[];
203   static const char kServerCertPath[];
204   static const char kServerKeyPath[];
205 
206   // Message used in EchoRequest to the backend.
207   static const char kRequestMessage[];
208 
209   // A base class for server threads.
210   class ServerThread {
211    public:
212     // A status notifier for xDS-enabled servers.
213     //
214     // TODO(yashykt): This notifier records the most recent state seen
215     // for every URI and then lets the caller wait until the status for
216     // that URI is the expected one.  If we are expecting an update that
217     // has the same status as the previous one, then we really have no
218     // way of knowing whether the second update has actually been sent.
219     // A better approach here would be to queue the updates received by
220     // the notifier and then have a method to get the next update from
221     // the queue, if any.
222     // Also, we should change the callers to check not just the status
223     // but also the corresponding error message, so that we can verify
224     // that we're emitting useful error messages for our users.
225     class XdsServingStatusNotifier
226         : public grpc::XdsServerServingStatusNotifierInterface {
227      public:
228       void OnServingStatusUpdate(std::string uri,
229                                  ServingStatusUpdate update) override;
230 
231       GRPC_MUST_USE_RESULT bool WaitOnServingStatusChange(
232           const std::string& uri, grpc::StatusCode expected_status,
233           absl::Duration timeout = absl::Seconds(10));
234 
235      private:
236       grpc_core::Mutex mu_;
237       grpc_core::CondVar cond_;
238       std::map<std::string, grpc::Status> status_map ABSL_GUARDED_BY(mu_);
239     };
240 
241     // If use_xds_enabled_server is true, the server will use xDS.
242     // If credentials is null, fake credentials will be used.
243     explicit ServerThread(
244         XdsEnd2endTest* test_obj, bool use_xds_enabled_server = false,
245         std::shared_ptr<ServerCredentials> credentials = nullptr);
246 
~ServerThread()247     virtual ~ServerThread() {
248       // Shutdown should be called manually. Shutdown calls virtual methods and
249       // can't be called from the base class destructor.
250       CHECK(!running_);
251     }
252 
253     void Start();
254     void Shutdown();
255 
target()256     std::string target() const { return absl::StrCat("localhost:", port_); }
257 
port()258     int port() const { return port_; }
259 
notifier()260     XdsServingStatusNotifier* notifier() { return &notifier_; }
261 
262     GRPC_MUST_USE_RESULT bool WaitOnServingStatusChange(
263         grpc::StatusCode expected_status,
264         absl::Duration timeout = absl::Seconds(10)) {
265       return notifier_.WaitOnServingStatusChange(
266           grpc_core::LocalIpAndPort(port_), expected_status, timeout);
267     }
268 
set_allow_put_requests(bool allow_put_requests)269     void set_allow_put_requests(bool allow_put_requests) {
270       allow_put_requests_ = allow_put_requests;
271     }
272 
273     void StopListening();
274 
275     void StopListeningAndSendGoaways();
276 
277    private:
278     class XdsChannelArgsServerBuilderOption;
279 
280     virtual const char* Type() = 0;
281     virtual void RegisterAllServices(ServerBuilder* builder) = 0;
282     virtual void StartAllServices() = 0;
283     virtual void ShutdownAllServices() = 0;
284 
285     void Serve(grpc_core::Mutex* mu, grpc_core::CondVar* cond);
286 
287     XdsEnd2endTest* test_obj_;
288     const bool use_xds_enabled_server_;
289     const std::shared_ptr<ServerCredentials> credentials_;
290     const int port_;
291     std::unique_ptr<Server> server_;
292     XdsServingStatusNotifier notifier_;
293     std::unique_ptr<std::thread> thread_;
294     bool running_ = false;
295     bool allow_put_requests_ = false;
296   };
297 
298   // A server thread for a backend server.
299   class BackendServerThread : public ServerThread {
300    public:
301     // A wrapper around the backend echo test service impl that counts
302     // requests and responses.
303     template <typename RpcService>
304     class BackendServiceImpl
305         : public CountedService<TestMultipleServiceImpl<RpcService>> {
306      public:
BackendServiceImpl()307       BackendServiceImpl() {}
308 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)309       Status Echo(ServerContext* context, const EchoRequest* request,
310                   EchoResponse* response) override {
311         auto peer_identity = context->auth_context()->GetPeerIdentity();
312         CountedService<
313             TestMultipleServiceImpl<RpcService>>::IncreaseRequestCount();
314         {
315           grpc_core::MutexLock lock(&mu_);
316           clients_.insert(context->peer());
317           last_peer_identity_.clear();
318           for (const auto& entry : peer_identity) {
319             last_peer_identity_.emplace_back(entry.data(), entry.size());
320           }
321         }
322         if (request->has_param() && request->param().has_backend_metrics()) {
323           const auto& request_metrics = request->param().backend_metrics();
324           auto* recorder = context->ExperimentalGetCallMetricRecorder();
325           if (request_metrics.cpu_utilization() != 0) {
326             recorder->RecordCpuUtilizationMetric(
327                 request_metrics.cpu_utilization());
328           }
329           if (request_metrics.mem_utilization() != 0) {
330             recorder->RecordMemoryUtilizationMetric(
331                 request_metrics.mem_utilization());
332           }
333           if (request_metrics.application_utilization() != 0) {
334             recorder->RecordApplicationUtilizationMetric(
335                 request_metrics.application_utilization());
336           }
337           for (const auto& p : request_metrics.named_metrics()) {
338             char* key = static_cast<char*>(
339                 grpc_call_arena_alloc(context->c_call(), p.first.size() + 1));
340             strncpy(key, p.first.data(), p.first.size());
341             key[p.first.size()] = '\0';
342             recorder->RecordNamedMetric(key, p.second);
343           }
344         }
345         const auto status = TestMultipleServiceImpl<RpcService>::Echo(
346             context, request, response);
347         CountedService<
348             TestMultipleServiceImpl<RpcService>>::IncreaseResponseCount();
349         return status;
350       }
351 
Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)352       Status Echo1(ServerContext* context, const EchoRequest* request,
353                    EchoResponse* response) override {
354         return Echo(context, request, response);
355       }
356 
Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)357       Status Echo2(ServerContext* context, const EchoRequest* request,
358                    EchoResponse* response) override {
359         return Echo(context, request, response);
360       }
361 
Start()362       void Start() {}
Shutdown()363       void Shutdown() {}
364 
clients()365       std::set<std::string> clients() {
366         grpc_core::MutexLock lock(&mu_);
367         return clients_;
368       }
369 
last_peer_identity()370       const std::vector<std::string>& last_peer_identity() {
371         grpc_core::MutexLock lock(&mu_);
372         return last_peer_identity_;
373       }
374 
375      private:
376       grpc_core::Mutex mu_;
377       std::set<std::string> clients_ ABSL_GUARDED_BY(&mu_);
378       std::vector<std::string> last_peer_identity_ ABSL_GUARDED_BY(&mu_);
379     };
380 
381     // If use_xds_enabled_server is true, the server will use xDS.
382     BackendServerThread(XdsEnd2endTest* test_obj, bool use_xds_enabled_server,
383                         std::shared_ptr<ServerCredentials> credentials);
384 
385     BackendServiceImpl<grpc::testing::EchoTestService::Service>*
backend_service()386     backend_service() {
387       return &backend_service_;
388     }
389     BackendServiceImpl<grpc::testing::EchoTest1Service::Service>*
backend_service1()390     backend_service1() {
391       return &backend_service1_;
392     }
393     BackendServiceImpl<grpc::testing::EchoTest2Service::Service>*
backend_service2()394     backend_service2() {
395       return &backend_service2_;
396     }
server_metric_recorder()397     grpc::experimental::ServerMetricRecorder* server_metric_recorder() const {
398       return server_metric_recorder_.get();
399     }
400 
401    private:
Type()402     const char* Type() override { return "Backend"; }
403     void RegisterAllServices(ServerBuilder* builder) override;
404     void StartAllServices() override;
405     void ShutdownAllServices() override;
406 
407     BackendServiceImpl<grpc::testing::EchoTestService::Service>
408         backend_service_;
409     BackendServiceImpl<grpc::testing::EchoTest1Service::Service>
410         backend_service1_;
411     BackendServiceImpl<grpc::testing::EchoTest2Service::Service>
412         backend_service2_;
413     std::unique_ptr<experimental::ServerMetricRecorder> server_metric_recorder_;
414   };
415 
416   // A server thread for the xDS server.
417   class BalancerServerThread : public ServerThread {
418    public:
419     explicit BalancerServerThread(
420         XdsEnd2endTest* test_obj, absl::string_view debug_label,
421         std::shared_ptr<ServerCredentials> credentials);
422 
ads_service()423     AdsServiceImpl* ads_service() { return ads_service_.get(); }
lrs_service()424     LrsServiceImpl* lrs_service() { return lrs_service_.get(); }
425 
426    private:
Type()427     const char* Type() override { return "Balancer"; }
428     void RegisterAllServices(ServerBuilder* builder) override;
429     void StartAllServices() override;
430     void ShutdownAllServices() override;
431 
432     std::shared_ptr<AdsServiceImpl> ads_service_;
433     std::shared_ptr<LrsServiceImpl> lrs_service_;
434   };
435 
// RPC services used to talk to the backends.
// The enumerator values are spelled out; they are the same values the
// compiler would assign implicitly.
enum RpcService {
  SERVICE_ECHO = 0,
  SERVICE_ECHO1 = 1,
  SERVICE_ECHO2 = 2,
};
442 
// RPC methods used to talk to the backends.
// The enumerator values are spelled out; they are the same values the
// compiler would assign implicitly.
enum RpcMethod {
  METHOD_ECHO = 0,
  METHOD_ECHO1 = 1,
  METHOD_ECHO2 = 2,
};
449 
450   // If balancer_credentials is null, it defaults to fake credentials.
451   explicit XdsEnd2endTest(
452       std::shared_ptr<ServerCredentials> balancer_credentials = nullptr);
453 
SetUp()454   void SetUp() override { InitClient(); }
455   void TearDown() override;
456 
457   //
458   // xDS server management
459   //
460 
461   // Creates and starts a new balancer, running in its own thread.
462   // Most tests will not need to call this; instead, they can use
463   // balancer_, which is already populated with default resources.
464   // If credentials is null, it defaults to fake credentials.
465   std::unique_ptr<BalancerServerThread> CreateAndStartBalancer(
466       absl::string_view debug_label = "",
467       std::shared_ptr<ServerCredentials> credentials = nullptr);
468 
469   // Sets the Listener and RouteConfiguration resource on the specified
470   // balancer.  If RDS is in use, they will be set as separate resources;
471   // otherwise, the RouteConfig will be inlined into the Listener.
472   void SetListenerAndRouteConfiguration(
473       BalancerServerThread* balancer, Listener listener,
474       const RouteConfiguration& route_config,
475       const HcmAccessor& hcm_accessor = ClientHcmAccessor()) {
476     XdsResourceUtils::SetListenerAndRouteConfiguration(
477         balancer->ads_service(), std::move(listener), route_config,
478         GetParam().enable_rds_testing(), hcm_accessor);
479   }
480 
481   // A convenient wrapper for setting the Listener and
482   // RouteConfiguration resources on the server side.
SetServerListenerNameAndRouteConfiguration(BalancerServerThread * balancer,Listener listener,int port,const RouteConfiguration & route_config)483   void SetServerListenerNameAndRouteConfiguration(
484       BalancerServerThread* balancer, Listener listener, int port,
485       const RouteConfiguration& route_config) {
486     SetListenerAndRouteConfiguration(
487         balancer, PopulateServerListenerNameAndPort(listener, port),
488         route_config, ServerHcmAccessor());
489   }
490 
491   // Sets the RouteConfiguration resource on the specified balancer.
492   // If RDS is in use, it will be set directly as an independent
493   // resource; otherwise, it will be inlined into a Listener resource
494   // (either listener_to_copy, or if that is null, default_listener_).
495   void SetRouteConfiguration(BalancerServerThread* balancer,
496                              const RouteConfiguration& route_config,
497                              const Listener* listener_to_copy = nullptr) {
498     XdsResourceUtils::SetRouteConfiguration(
499         balancer->ads_service(), route_config, GetParam().enable_rds_testing(),
500         listener_to_copy);
501   }
502 
503   // Helper method for generating an endpoint for a backend, for use in
504   // constructing an EDS resource.
505   EdsResourceArgs::Endpoint CreateEndpoint(
506       size_t backend_idx,
507       ::envoy::config::core::v3::HealthStatus health_status =
508           ::envoy::config::core::v3::HealthStatus::UNKNOWN,
509       int lb_weight = 1, std::vector<size_t> additional_backend_indexes = {},
510       absl::string_view hostname = "",
511       const std::map<std::string, std::string /*JSON*/>& metadata = {}) {
512     std::vector<int> additional_ports;
513     additional_ports.reserve(additional_backend_indexes.size());
514     for (size_t idx : additional_backend_indexes) {
515       additional_ports.push_back(backends_[idx]->port());
516     }
517     return EdsResourceArgs::Endpoint(backends_[backend_idx]->port(),
518                                      health_status, lb_weight, additional_ports,
519                                      hostname, metadata);
520   }
521 
522   // Creates a vector of endpoints for a specified range of backends,
523   // for use in constructing an EDS resource.
524   std::vector<EdsResourceArgs::Endpoint> CreateEndpointsForBackends(
525       size_t start_index = 0, size_t stop_index = 0,
526       ::envoy::config::core::v3::HealthStatus health_status =
527           ::envoy::config::core::v3::HealthStatus::UNKNOWN,
528       int lb_weight = 1);
529 
530   // Returns an endpoint for an unused port, for use in constructing an
531   // EDS resource.
MakeNonExistentEndpoint()532   EdsResourceArgs::Endpoint MakeNonExistentEndpoint() {
533     return EdsResourceArgs::Endpoint(grpc_pick_unused_port_or_die());
534   }
535 
536   //
537   // Backend management
538   //
539 
540   // Creates num_backends backends and stores them in backends_, but does
541   // not actually start them.  If xds_enabled is true, the backends are
542   // xDS-enabled.  If credentials is null, it defaults to fake credentials.
543   void CreateBackends(
544       size_t num_backends, bool xds_enabled = false,
545       std::shared_ptr<ServerCredentials> credentials = nullptr) {
546     for (size_t i = 0; i < num_backends; ++i) {
547       backends_.emplace_back(
548           new BackendServerThread(this, xds_enabled, credentials));
549     }
550   }
551 
552   // Starts all backends in backends_.
StartAllBackends()553   void StartAllBackends() {
554     for (auto& backend : backends_) backend->Start();
555   }
556 
557   // Same as CreateBackends(), but also starts the backends.
558   void CreateAndStartBackends(
559       size_t num_backends, bool xds_enabled = false,
560       std::shared_ptr<ServerCredentials> credentials = nullptr) {
561     CreateBackends(num_backends, xds_enabled, std::move(credentials));
562     StartAllBackends();
563   }
564 
565   // Starts the backend at backends_[index].
StartBackend(size_t index)566   void StartBackend(size_t index) { backends_[index]->Start(); }
567 
568   // Shuts down all backends in backends_.
ShutdownAllBackends()569   void ShutdownAllBackends() {
570     for (auto& backend : backends_) backend->Shutdown();
571   }
572 
573   // Shuts down the backend at backends_[index].
ShutdownBackend(size_t index)574   void ShutdownBackend(size_t index) { backends_[index]->Shutdown(); }
575 
576   // Resets the request counters for backends in the specified range.
577   void ResetBackendCounters(size_t start_index = 0, size_t stop_index = 0);
578 
579   // Returns true if the specified backend has received requests for the
580   // specified service.
581   bool SeenBackend(size_t backend_idx,
582                    const RpcService rpc_service = SERVICE_ECHO);
583 
584   // Returns true if all backends in the specified range have received
585   // requests for the specified service.
586   bool SeenAllBackends(size_t start_index = 0, size_t stop_index = 0,
587                        const RpcService rpc_service = SERVICE_ECHO);
588 
589   // Returns a vector containing the port for every backend in the
590   // specified range.
591   std::vector<int> GetBackendPorts(size_t start_index = 0,
592                                    size_t stop_index = 0) const;
593 
594   //
595   // Client management
596   //
597 
598   // Initializes global state for the client, such as the bootstrap file
599   // and channel args for the XdsClient.  Then calls ResetStub().
600   // All tests must call this exactly once at the start of the test.
601   // If credentials is null, fake credentials will be used.
602   void InitClient(absl::optional<XdsBootstrapBuilder> builder = absl::nullopt,
603                   std::string lb_expected_authority = "",
604                   int xds_resource_does_not_exist_timeout_ms = 0,
605                   std::string balancer_authority_override = "",
606                   ChannelArguments* args = nullptr,
607                   std::shared_ptr<ChannelCredentials> credentials = nullptr);
608 
MakeBootstrapBuilder()609   XdsBootstrapBuilder MakeBootstrapBuilder() {
610     return XdsBootstrapBuilder().SetServers(
611         {absl::StrCat("localhost:", balancer_->port())});
612   }
613 
614   // Sets channel_, stub_, stub1_, and stub2_.
615   // If credentials is null, fake credentials will be used.
616   void ResetStub(int failover_timeout_ms = 0, ChannelArguments* args = nullptr,
617                  std::shared_ptr<ChannelCredentials> credentials = nullptr);
618 
619   // Creates a new client channel.  Requires that InitClient() has
620   // already been called.
621   // If credentials is null, fake credentials will be used.
622   std::shared_ptr<Channel> CreateChannel(
623       int failover_timeout_ms = 0, const char* server_name = kServerName,
624       const char* xds_authority = "", ChannelArguments* args = nullptr,
625       std::shared_ptr<ChannelCredentials> credentials = nullptr);
626 
627   //
628   // Sending RPCs
629   //
630 
631   // Options used for sending an RPC.
632   struct RpcOptions {
633     RpcService service = SERVICE_ECHO;
634     RpcMethod method = METHOD_ECHO;
635     // Will be multiplied by grpc_test_slowdown_factor().
636     int timeout_ms = 5000;
637     bool wait_for_ready = false;
638     std::vector<std::pair<std::string, std::string>> metadata;
639     // These options are used by the backend service impl.
640     bool server_fail = false;
641     int server_sleep_us = 0;
642     int client_cancel_after_us = 0;
643     bool skip_cancelled_check = false;
644     StatusCode server_expected_error = StatusCode::OK;
645     absl::optional<xds::data::orca::v3::OrcaLoadReport> backend_metrics;
646     bool server_notify_client_when_started = false;
647     bool echo_host_from_authority_header = false;
648     bool echo_metadata_initially = false;
649 
RpcOptionsRpcOptions650     RpcOptions() {}
651 
set_rpc_serviceRpcOptions652     RpcOptions& set_rpc_service(RpcService rpc_service) {
653       service = rpc_service;
654       return *this;
655     }
656 
set_rpc_methodRpcOptions657     RpcOptions& set_rpc_method(RpcMethod rpc_method) {
658       method = rpc_method;
659       return *this;
660     }
661 
set_timeout_msRpcOptions662     RpcOptions& set_timeout_ms(int rpc_timeout_ms) {
663       timeout_ms = rpc_timeout_ms;
664       return *this;
665     }
666 
set_timeoutRpcOptions667     RpcOptions& set_timeout(grpc_core::Duration timeout) {
668       timeout_ms = timeout.millis();
669       return *this;
670     }
671 
set_wait_for_readyRpcOptions672     RpcOptions& set_wait_for_ready(bool rpc_wait_for_ready) {
673       wait_for_ready = rpc_wait_for_ready;
674       return *this;
675     }
676 
set_metadataRpcOptions677     RpcOptions& set_metadata(
678         std::vector<std::pair<std::string, std::string>> rpc_metadata) {
679       metadata = std::move(rpc_metadata);
680       return *this;
681     }
682 
set_server_failRpcOptions683     RpcOptions& set_server_fail(bool rpc_server_fail) {
684       server_fail = rpc_server_fail;
685       return *this;
686     }
687 
set_server_sleep_usRpcOptions688     RpcOptions& set_server_sleep_us(int rpc_server_sleep_us) {
689       server_sleep_us = rpc_server_sleep_us;
690       return *this;
691     }
692 
set_client_cancel_after_usRpcOptions693     RpcOptions& set_client_cancel_after_us(int rpc_client_cancel_after_us) {
694       client_cancel_after_us = rpc_client_cancel_after_us;
695       return *this;
696     }
697 
set_skip_cancelled_checkRpcOptions698     RpcOptions& set_skip_cancelled_check(bool rpc_skip_cancelled_check) {
699       skip_cancelled_check = rpc_skip_cancelled_check;
700       return *this;
701     }
702 
set_server_expected_errorRpcOptions703     RpcOptions& set_server_expected_error(StatusCode code) {
704       server_expected_error = code;
705       return *this;
706     }
707 
set_backend_metricsRpcOptions708     RpcOptions& set_backend_metrics(
709         absl::optional<xds::data::orca::v3::OrcaLoadReport> metrics) {
710       backend_metrics = std::move(metrics);
711       return *this;
712     }
713 
set_server_notify_client_when_startedRpcOptions714     RpcOptions& set_server_notify_client_when_started(
715         bool rpc_server_notify_client_when_started) {
716       server_notify_client_when_started = rpc_server_notify_client_when_started;
717       return *this;
718     }
719 
set_echo_host_from_authority_headerRpcOptions720     RpcOptions& set_echo_host_from_authority_header(bool value) {
721       echo_host_from_authority_header = value;
722       return *this;
723     }
724 
set_echo_metadata_initiallyRpcOptions725     RpcOptions& set_echo_metadata_initially(bool value) {
726       echo_metadata_initially = value;
727       return *this;
728     }
729 
730     // Populates context and request.
731     void SetupRpc(ClientContext* context, EchoRequest* request) const;
732   };
733 
734   // Sends an RPC with the specified options.
735   // If response is non-null, it will be populated with the response.
736   // Returns the status of the RPC.
      // All three parameters are optional: rpc_options defaults to a
      // default-constructed RpcOptions, and both pointers default to nullptr.
      // NOTE(review): server_initial_metadata looks like an optional
      // out-parameter for the initial metadata sent by the server -- confirm
      // in the definition.
737   Status SendRpc(const RpcOptions& rpc_options = RpcOptions(),
738                  EchoResponse* response = nullptr,
739                  std::multimap<std::string, std::string>*
740                      server_initial_metadata = nullptr);
741 
742   // Internal helper function for SendRpc().
743   template <typename Stub>
SendRpcMethod(Stub * stub,const RpcOptions & rpc_options,ClientContext * context,EchoRequest & request,EchoResponse * response)744   static Status SendRpcMethod(Stub* stub, const RpcOptions& rpc_options,
745                               ClientContext* context, EchoRequest& request,
746                               EchoResponse* response) {
747     switch (rpc_options.method) {
748       case METHOD_ECHO:
749         return stub->Echo(context, request, response);
750       case METHOD_ECHO1:
751         return stub->Echo1(context, request, response);
752       case METHOD_ECHO2:
753         return stub->Echo2(context, request, response);
754     }
755     GPR_UNREACHABLE_CODE(return grpc::Status::OK);
756   }
757 
758   // Outcome of one RPC: its final status together with the response
759   // message.  Handed to the continue_predicate of SendRpcsUntil() below.
760   struct RpcResult {
761     Status status;
762     EchoResponse response;
763   };
      // Sends RPCs in a loop until either continue_predicate() returns false
      // or timeout_ms elapses (15000 by default).  rpc_options defaults to a
      // default-constructed RpcOptions.
764   void SendRpcsUntil(const grpc_core::DebugLocation& debug_location,
765                      std::function<bool(const RpcResult&)> continue_predicate,
766                      int timeout_ms = 15000,
767                      const RpcOptions& rpc_options = RpcOptions());
768 
769   // Sends `times` RPCs (one by default) and fails if any of them fails.
      // rpc_options defaults to a default-constructed RpcOptions.
      // NOTE(review): debug_location is presumably the caller's source
      // location, used when reporting a failure -- confirm in the definition.
770   void CheckRpcSendOk(const grpc_core::DebugLocation& debug_location,
771                       const size_t times = 1,
772                       const RpcOptions& rpc_options = RpcOptions());
773 
774   // Sends one RPC, which must fail with the specified status code and
775   // a message matching the specified regex.
      // expected_status is the required status code, expected_message_regex
      // the pattern for the status message; rpc_options defaults to a
      // default-constructed RpcOptions.
776   void CheckRpcSendFailure(const grpc_core::DebugLocation& debug_location,
777                            StatusCode expected_status,
778                            absl::string_view expected_message_regex,
779                            const RpcOptions& rpc_options = RpcOptions());
780 
781   // Sends RPCs until either the timeout elapses or an RPC fails, in which
782   // case the failure must match the specified status and message regex.
      // timeout_ms defaults to 15000; rpc_options defaults to a
      // default-constructed RpcOptions.
783   void SendRpcsUntilFailure(const grpc_core::DebugLocation& debug_location,
784                             StatusCode expected_status,
785                             absl::string_view expected_message_regex,
786                             int timeout_ms = 15000,
787                             const RpcOptions& rpc_options = RpcOptions());
788 
789   // Sends num_rpcs RPCs, counting how many of them fail with a message
790   // matching the specified expected_message_prefix.
791   // Any failure with a non-matching status or message is a test failure.
      // Returns the count described above as a size_t.  rpc_options defaults
      // to a default-constructed RpcOptions.
792   size_t SendRpcsAndCountFailuresWithMessage(
793       const grpc_core::DebugLocation& debug_location, size_t num_rpcs,
794       StatusCode expected_status, absl::string_view expected_message_prefix,
795       const RpcOptions& rpc_options = RpcOptions());
796 
797   // A class for running a long-running RPC in its own thread.
798   // TODO(roth): Maybe consolidate this and SendConcurrentRpcs()
799   // somehow?  LongRunningRpc has a cleaner API, but SendConcurrentRpcs()
800   // uses the callback API, which is probably better.
801   class LongRunningRpc {
802    public:
803     // Starts the RPC.
        // The default options are built with set_timeout_ms(0) and
        // set_client_cancel_after_us(1 * 1000 * 1000), i.e. a client-cancel
        // value of one second expressed in microseconds.
804     void StartRpc(grpc::testing::EchoTestService::Stub* stub,
805                   const RpcOptions& rpc_options =
806                       RpcOptions().set_timeout_ms(0).set_client_cancel_after_us(
807                           1 * 1000 * 1000));
808 
809     // Cancels the RPC.
810     void CancelRpc();
811 
812     // Gets the RPC's status.  Blocks if the RPC is not yet complete.
813     Status GetStatus();
814 
815    private:
        // NOTE(review): a std::thread that is still joinable when destroyed
        // terminates the process, and no destructor is declared here --
        // confirm that the definitions always join this thread.
816     std::thread sender_thread_;
        // Client context and resulting status of the RPC.
817     ClientContext context_;
818     Status status_;
819   };
820 
821   // Per-RPC record used by SendConcurrentRpcs(): client context, status,
      // elapsed time and response of one RPC.
822   struct ConcurrentRpc {
823     ClientContext context;
824     Status status;
825     grpc_core::Duration elapsed_time;
826     EchoResponse response;
827   };
      // Starts a set of concurrent RPCs.
      // Takes the stub to send on, the number of RPCs and the options to use
      // (no default here), and returns a vector of heap-allocated
      // ConcurrentRpc records.
      // NOTE(review): presumably one record per RPC, returned after all of
      // them have completed -- confirm in the definition.
828   std::vector<std::unique_ptr<ConcurrentRpc>> SendConcurrentRpcs(
829       const grpc_core::DebugLocation& debug_location,
830       grpc::testing::EchoTestService::Stub* stub, size_t num_rpcs,
831       const RpcOptions& rpc_options);
832 
833   //
834   // Waiting for individual backends to be seen by the client
835   //
836 
// Options for the helpers that wait for backends to see requests.
// The setters return *this so that they can be chained.
struct WaitForBackendOptions {
  // Whether the backend counters are reset before returning.
  bool reset_counters = true;
  // Upper bound, in milliseconds, on waiting for the backend(s) to see
  // requests.  Will be multiplied by grpc_test_slowdown_factor().
  int timeout_ms = 15000;

  // Leaves both fields at the defaults given above.
  WaitForBackendOptions() {}

  WaitForBackendOptions& set_reset_counters(bool enable) {
    this->reset_counters = enable;
    return *this;
  }

  WaitForBackendOptions& set_timeout_ms(int ms) {
    this->timeout_ms = ms;
    return *this;
  }
};
856 
857   // Sends RPCs until all of the backends in the specified range see requests.
858   // The check_status callback will be invoked to check the status of
859   // every RPC; if null, the default is to check that the RPC succeeded.
860   // Returns the total number of RPCs sent.
      // WaitForBackend() calls this with (backend_idx, backend_idx + 1), so
      // the range is treated as half-open: [start_index, stop_index).
      // NOTE(review): both indices default to 0; presumably a stop_index of
      // 0 means "up to the last backend" -- confirm in the definition.
861   size_t WaitForAllBackends(
862       const grpc_core::DebugLocation& debug_location, size_t start_index = 0,
863       size_t stop_index = 0,
864       std::function<void(const RpcResult&)> check_status = nullptr,
865       const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
866       const RpcOptions& rpc_options = RpcOptions());
867 
868   // Sends RPCs until the backend at index backend_idx sees requests.
869   void WaitForBackend(
870       const grpc_core::DebugLocation& debug_location, size_t backend_idx,
871       std::function<void(const RpcResult&)> check_status = nullptr,
872       const WaitForBackendOptions& wait_options = WaitForBackendOptions(),
873       const RpcOptions& rpc_options = RpcOptions()) {
874     WaitForAllBackends(debug_location, backend_idx, backend_idx + 1,
875                        check_status, wait_options, rpc_options);
876   }
877 
878   //
879   // Waiting for xDS NACKs
880   //
881   // These methods send RPCs in a loop until they see a NACK from the
882   // xDS server, or until a timeout expires.
883 
884   // Sends RPCs until get_state() returns a response.
885   // TODO(roth): Does this actually need to send RPCs, or can it just
886   // use a condition variable to wait?  I suspect that we need to be
887   // sending RPCs for polling reasons, but that should go away when we
888   // finish the EventEngine migration.  Once that's done, try changing
889   // this to not send RPCs.
890   // Also, consider refactoring to also support waiting for ACKs, since
891   // there are several use-cases where tests are doing that.
      //
      // get_state is a callback yielding an optional ResponseState, and the
      // function itself returns an optional ResponseState as well.
      // rpc_options defaults to a default-constructed RpcOptions and
      // expected_status to StatusCode::UNAVAILABLE.
      // NOTE(review): "returns a response" presumably means a non-empty
      // optional, and expected_status presumably is the status the RPCs sent
      // while waiting must fail with -- confirm in the definition.
892   absl::optional<AdsServiceImpl::ResponseState> WaitForNack(
893       const grpc_core::DebugLocation& debug_location,
894       std::function<absl::optional<AdsServiceImpl::ResponseState>()> get_state,
895       const RpcOptions& rpc_options = RpcOptions(),
896       StatusCode expected_status = StatusCode::UNAVAILABLE);
897 
898   // Sends RPCs until an LDS NACK is seen.
899   absl::optional<AdsServiceImpl::ResponseState> WaitForLdsNack(
900       const grpc_core::DebugLocation& debug_location,
901       const RpcOptions& rpc_options = RpcOptions(),
902       StatusCode expected_status = StatusCode::UNAVAILABLE) {
903     return WaitForNack(
904         debug_location,
905         [&]() { return balancer_->ads_service()->lds_response_state(); },
906         rpc_options, expected_status);
907   }
908 
909   // Sends RPCs until an RDS NACK is seen.
910   absl::optional<AdsServiceImpl::ResponseState> WaitForRdsNack(
911       const grpc_core::DebugLocation& debug_location,
912       const RpcOptions& rpc_options = RpcOptions(),
913       StatusCode expected_status = StatusCode::UNAVAILABLE) {
914     return WaitForNack(
915         debug_location,
916         [&]() { return RouteConfigurationResponseState(balancer_.get()); },
917         rpc_options, expected_status);
918   }
919 
920   // Sends RPCs until a CDS NACK is seen.
921   absl::optional<AdsServiceImpl::ResponseState> WaitForCdsNack(
922       const grpc_core::DebugLocation& debug_location,
923       const RpcOptions& rpc_options = RpcOptions(),
924       StatusCode expected_status = StatusCode::UNAVAILABLE) {
925     return WaitForNack(
926         debug_location,
927         [&]() { return balancer_->ads_service()->cds_response_state(); },
928         rpc_options, expected_status);
929   }
930 
931   // Sends RPCs until an EDS NACK is seen.
932   absl::optional<AdsServiceImpl::ResponseState> WaitForEdsNack(
933       const grpc_core::DebugLocation& debug_location,
934       const RpcOptions& rpc_options = RpcOptions()) {
935     return WaitForNack(
936         debug_location,
937         [&]() { return balancer_->ads_service()->eds_response_state(); },
938         rpc_options);
939   }
940 
941   // Convenient front-end to wait for RouteConfiguration to be NACKed,
942   // regardless of whether it's sent in LDS or RDS.
943   absl::optional<AdsServiceImpl::ResponseState> WaitForRouteConfigNack(
944       const grpc_core::DebugLocation& debug_location,
945       const RpcOptions& rpc_options = RpcOptions(),
946       StatusCode expected_status = StatusCode::UNAVAILABLE) {
947     if (GetParam().enable_rds_testing()) {
948       return WaitForRdsNack(debug_location, rpc_options, expected_status);
949     }
950     return WaitForLdsNack(debug_location, rpc_options, expected_status);
951   }
952 
953   // Convenient front-end for accessing xDS response state for a
954   // RouteConfiguration, regardless of whether it's sent in LDS or RDS.
RouteConfigurationResponseState(BalancerServerThread * balancer)955   absl::optional<AdsServiceImpl::ResponseState> RouteConfigurationResponseState(
956       BalancerServerThread* balancer) const {
957     AdsServiceImpl* ads_service = balancer->ads_service();
958     if (GetParam().enable_rds_testing()) {
959       return ads_service->rds_response_state();
960     }
961     return ads_service->lds_response_state();
962   }
963 
964   //
965   // Miscellaneous helper methods
966   //
967 
968   // There is slight difference between time fetched by GPR and by C++ system
969   // clock API. It's unclear if they are using the same syscall, but we do know
970   // GPR round the number at millisecond-level. This creates a 1ms difference,
971   // which could cause flake.
NowFromCycleCounter()972   static grpc_core::Timestamp NowFromCycleCounter() {
973     return grpc_core::Timestamp::FromTimespecRoundDown(
974         gpr_now(GPR_CLOCK_MONOTONIC));
975   }
976 
977   // Sets duration_proto to duration times grpc_test_slowdown_factor().
      // duration_proto is the output parameter; nothing is returned.
978   static void SetProtoDuration(grpc_core::Duration duration,
979                                google::protobuf::Duration* duration_proto);
980 
981   // Returns the number of RPCs needed to pass error_tolerance at 99.99994%
982   // chance. Rolling dices in drop/fault-injection generates a binomial
983   // distribution (if our code is not horribly wrong). Let's make "n" the number
984   // of samples, "p" the probability. If we have np>5 & n(1-p)>5, we can
985   // approximately treat the binomial distribution as a normal distribution.
986   //
987   // For normal distribution, we can easily look up how many standard deviation
988   // we need to reach 99.995%. Based on Wiki's table
989   // https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule, we need 5.00
990   // sigma (standard deviation) to cover the probability area of 99.99994%. In
991   // another word, for a sample with size "n" probability "p" error-tolerance
992   // "k", we want the error always land within 5.00 sigma. The sigma of
993   // binomial distribution and be computed as sqrt(np(1-p)). Hence, we have
994   // the equation:
995   //
996   //   kn <= 5.00 * sqrt(np(1-p))
997   // TODO(yashykt): The above explanation assumes a normal distribution, but we
998   // use a uniform distribution instead. We need a better estimate of how many
999   // RPCs are needed with what error tolerance.
ComputeIdealNumRpcs(double p,double error_tolerance)1000   static size_t ComputeIdealNumRpcs(double p, double error_tolerance) {
1001     CHECK_GE(p, 0);
1002     CHECK_LE(p, 1);
1003     size_t num_rpcs =
1004         ceil(p * (1 - p) * 5.00 * 5.00 / error_tolerance / error_tolerance);
1005     num_rpcs += 1000;  // Add 1K as a buffer to avoid flakiness.
1006     LOG(INFO) << "Sending " << num_rpcs << " RPCs for percentage=" << p
1007               << " error_tolerance=" << error_tolerance;
1008     return num_rpcs;
1009   }
1010 
1011   // Returns a regex that can be matched against an RPC failure status
1012   // message for a connection failure.
       // has_resolution_note defaults to true.
       // NOTE(review): prefix is presumably text the message must start with,
       // and has_resolution_note presumably toggles a resolution-note part of
       // the pattern -- confirm in the definition.
1013   static std::string MakeConnectionFailureRegex(
1014       absl::string_view prefix, bool has_resolution_note = true);
1015 
1016   // Returns a regex that can be matched against an RPC failure status
1017   // message for a TLS handshake failure.
1018   static std::string MakeTlsHandshakeFailureRegex(absl::string_view prefix);
1019 
1020   // Returns a PEM key/cert pair list, read from the local files at
       // key_path and cert_path.
1021   static grpc_core::PemKeyCertPairList ReadTlsIdentityPair(
1022       const char* key_path, const char* cert_path);
1023 
1024   // Internal helper function for creating TLS and mTLS creds.
1025   // Not intended to be used by tests.
1026   static std::vector<experimental::IdentityKeyCertPair>
1027   MakeIdentityKeyCertPairForTlsCreds();
1028 
1029   // Returns XdsCredentials with mTLS fallback creds.
1030   static std::shared_ptr<ChannelCredentials> CreateXdsChannelCredentials();
       // NOTE(review): undocumented in the original; by its name this returns
       // TLS channel credentials -- confirm in the definition.
1031   static std::shared_ptr<ChannelCredentials> CreateTlsChannelCredentials();
1032 
1033   // Creates various types of server credentials.
1034   static std::shared_ptr<ServerCredentials> CreateFakeServerCredentials();
1035   static std::shared_ptr<ServerCredentials> CreateMtlsServerCredentials();
1036   static std::shared_ptr<ServerCredentials> CreateTlsServerCredentials();
1037 
1038   std::unique_ptr<BalancerServerThread> balancer_;
1039 
1040   std::shared_ptr<Channel> channel_;
1041   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
1042   std::unique_ptr<grpc::testing::EchoTest1Service::Stub> stub1_;
1043   std::unique_ptr<grpc::testing::EchoTest2Service::Stub> stub2_;
1044 
1045   std::vector<std::unique_ptr<BackendServerThread>> backends_;
1046 
1047   // Default xDS resources.
1048   Listener default_listener_;
1049   RouteConfiguration default_route_config_;
1050   Listener default_server_listener_;
1051   RouteConfiguration default_server_route_config_;
1052   Cluster default_cluster_;
1053 
1054   int xds_drain_grace_time_ms_ = 10 * 60 * 1000;  // 10 mins
1055 
1056   bool bootstrap_contents_from_env_var_;
1057   std::string bootstrap_;
1058   char* bootstrap_file_ = nullptr;
1059   absl::InlinedVector<grpc_arg, 3> xds_channel_args_to_add_;
1060   grpc_channel_args xds_channel_args_;
1061 };
1062 
1063 }  // namespace testing
1064 }  // namespace grpc
1065 
1066 #endif  // GRPC_TEST_CPP_END2END_XDS_XDS_END2END_TEST_LIB_H
1067