• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include <gmock/gmock.h>
17 #include <grpcpp/create_channel.h>
18 #include <grpcpp/security/credentials.h>
19 #include <gtest/gtest.h>
20 
21 #include <memory>
22 #include <string>
23 #include <vector>
24 
25 #include "absl/log/log.h"
26 #include "absl/memory/memory.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/strip.h"
29 #include "envoy/config/cluster/v3/cluster.pb.h"
30 #include "envoy/config/endpoint/v3/endpoint.pb.h"
31 #include "envoy/config/listener/v3/listener.pb.h"
32 #include "envoy/config/route/v3/route.pb.h"
33 #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
34 #include "src/core/client_channel/backup_poller.h"
35 #include "src/core/config/config_vars.h"
36 #include "test/core/test_util/resolve_localhost_ip46.h"
37 #include "test/core/test_util/test_config.h"
38 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
39 #include "test/cpp/util/credentials.h"
40 
41 #ifndef DISABLED_XDS_PROTO_IN_CC
42 
43 #include "src/cpp/server/csds/csds.h"
44 #include "src/proto/grpc/testing/xds/v3/csds.grpc.pb.h"
45 
46 namespace grpc {
47 namespace testing {
48 namespace {
49 
50 using ::envoy::admin::v3::ClientResourceStatus;
51 using ::envoy::config::cluster::v3::Cluster;
52 using ::envoy::config::endpoint::v3::ClusterLoadAssignment;
53 using ::envoy::config::listener::v3::Listener;
54 using ::envoy::config::route::v3::RouteConfiguration;
55 using ::envoy::extensions::filters::network::http_connection_manager::v3::
56     HttpConnectionManager;
57 
58 MATCHER_P4(EqNode, id, user_agent_name, user_agent_version, client_features,
59            "equals Node") {
60   bool ok = true;
61   ok &= ::testing::ExplainMatchResult(id, arg.id(), result_listener);
62   ok &= ::testing::ExplainMatchResult(user_agent_name, arg.user_agent_name(),
63                                       result_listener);
64   ok &= ::testing::ExplainMatchResult(
65       user_agent_version, arg.user_agent_version(), result_listener);
66   ok &= ::testing::ExplainMatchResult(client_features, arg.client_features(),
67                                       result_listener);
68   return ok;
69 }
70 
71 MATCHER_P6(EqGenericXdsConfig, type_url, name, version_info, xds_config,
72            client_status, error_state, "equals GenericXdsConfig") {
73   bool ok = true;
74   ok &=
75       ::testing::ExplainMatchResult(type_url, arg.type_url(), result_listener);
76   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
77   ok &= ::testing::ExplainMatchResult(version_info, arg.version_info(),
78                                       result_listener);
79   ok &= ::testing::ExplainMatchResult(xds_config, arg.xds_config(),
80                                       result_listener);
81   ok &= ::testing::ExplainMatchResult(client_status, arg.client_status(),
82                                       result_listener);
83   ok &= ::testing::ExplainMatchResult(error_state, arg.error_state(),
84                                       result_listener);
85   return ok;
86 }
87 
88 MATCHER_P2(EqListener, name, api_listener, "equals Listener") {
89   bool ok = true;
90   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
91   ok &= ::testing::ExplainMatchResult(
92       api_listener, arg.api_listener().api_listener(), result_listener);
93   return ok;
94 }
95 
96 MATCHER_P(EqHttpConnectionManagerNotRds, route_config,
97           "equals HttpConnectionManager") {
98   bool ok = true;
99   ok &= ::testing::ExplainMatchResult(route_config, arg.route_config(),
100                                       result_listener);
101   return ok;
102 }
103 
104 MATCHER_P(EqRouteConfigurationName, name, "equals RouteConfiguration") {
105   bool ok = true;
106   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
107   return ok;
108 }
109 
110 MATCHER_P2(EqRouteConfiguration, name, cluster_name,
111            "equals RouteConfiguration") {
112   bool ok = true;
113   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
114   ok &= ::testing::ExplainMatchResult(
115       ::testing::ElementsAre(::testing::Property(
116           &envoy::config::route::v3::VirtualHost::routes,
117           ::testing::ElementsAre(::testing::Property(
118               &envoy::config::route::v3::Route::route,
119               ::testing::Property(
120                   &envoy::config::route::v3::RouteAction::cluster,
121                   cluster_name))))),
122       arg.virtual_hosts(), result_listener);
123   return ok;
124 }
125 
126 MATCHER_P(EqCluster, name, "equals Cluster") {
127   bool ok = true;
128   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
129   return ok;
130 }
131 
132 MATCHER_P(EqEndpoint, port, "equals Endpoint") {
133   bool ok = true;
134   ok &= ::testing::ExplainMatchResult(
135       port, arg.address().socket_address().port_value(), result_listener);
136   return ok;
137 }
138 
139 MATCHER_P2(EqLocalityLbEndpoints, port, weight, "equals LocalityLbEndpoints") {
140   bool ok = true;
141   ok &= ::testing::ExplainMatchResult(
142       ::testing::ElementsAre(::testing::Property(
143           &envoy::config::endpoint::v3::LbEndpoint::endpoint,
144           EqEndpoint(port))),
145       arg.lb_endpoints(), result_listener);
146   ok &= ::testing::ExplainMatchResult(
147       weight, arg.load_balancing_weight().value(), result_listener);
148   return ok;
149 }
150 
151 MATCHER_P(EqClusterLoadAssignmentName, cluster_name,
152           "equals ClusterLoadAssignment") {
153   bool ok = true;
154   ok &= ::testing::ExplainMatchResult(cluster_name, arg.cluster_name(),
155                                       result_listener);
156   return ok;
157 }
158 
159 MATCHER_P3(EqClusterLoadAssignment, cluster_name, port, weight,
160            "equals ClusterLoadAssignment") {
161   bool ok = true;
162   ok &= ::testing::ExplainMatchResult(cluster_name, arg.cluster_name(),
163                                       result_listener);
164   ok &= ::testing::ExplainMatchResult(
165       ::testing::ElementsAre(EqLocalityLbEndpoints(port, weight)),
166       arg.endpoints(), result_listener);
167   return ok;
168 }
169 
170 MATCHER_P2(EqUpdateFailureState, details, version_info,
171            "equals UpdateFailureState") {
172   bool ok = true;
173   ok &= ::testing::ExplainMatchResult(details, arg.details(), result_listener);
174   ok &= ::testing::ExplainMatchResult(version_info, arg.version_info(),
175                                       result_listener);
176   return ok;
177 }
178 
179 MATCHER_P(UnpackListener, matcher, "is a Listener") {
180   Listener config;
181   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
182                                      result_listener)) {
183     return false;
184   }
185   return ::testing::ExplainMatchResult(matcher, config, result_listener);
186 }
187 
188 MATCHER_P(UnpackRouteConfiguration, matcher, "is a RouteConfiguration") {
189   RouteConfiguration config;
190   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
191                                      result_listener)) {
192     return false;
193   }
194   return ::testing::ExplainMatchResult(matcher, config, result_listener);
195 }
196 
197 MATCHER_P(UnpackHttpConnectionManager, matcher, "is a HttpConnectionManager") {
198   HttpConnectionManager config;
199   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
200                                      result_listener)) {
201     return false;
202   }
203   return ::testing::ExplainMatchResult(matcher, config, result_listener);
204 }
205 
206 MATCHER_P(UnpackCluster, matcher, "is a Cluster") {
207   Cluster config;
208   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
209                                      result_listener)) {
210     return false;
211   }
212   return ::testing::ExplainMatchResult(matcher, config, result_listener);
213 }
214 
215 MATCHER_P(UnpackClusterLoadAssignment, matcher, "is a ClusterLoadAssignment") {
216   ClusterLoadAssignment config;
217   if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
218                                      result_listener)) {
219     return false;
220   }
221   return ::testing::ExplainMatchResult(matcher, config, result_listener);
222 }
223 
224 MATCHER(IsRdsEnabledHCM, "is a RDS enabled HttpConnectionManager") {
225   return ::testing::ExplainMatchResult(
226       UnpackHttpConnectionManager(
227           ::testing::Property(&HttpConnectionManager::has_rds, true)),
228       arg, result_listener);
229 }
230 
231 MATCHER_P2(EqNoRdsHCM, route_configuration_name, cluster_name,
232            "equals RDS disabled HttpConnectionManager") {
233   return ::testing::ExplainMatchResult(
234       UnpackHttpConnectionManager(EqHttpConnectionManagerNotRds(
235           EqRouteConfiguration(route_configuration_name, cluster_name))),
236       arg, result_listener);
237 }
238 
239 class ClientStatusDiscoveryServiceTest : public XdsEnd2endTest {
240  public:
ClientStatusDiscoveryServiceTest()241   ClientStatusDiscoveryServiceTest() {
242     admin_server_thread_ = std::make_unique<AdminServerThread>(this);
243     admin_server_thread_->Start();
244     std::string admin_server_address =
245         grpc_core::LocalIpAndPort(admin_server_thread_->port());
246     admin_channel_ = grpc::CreateChannel(
247         admin_server_address,
248         std::make_shared<FakeTransportSecurityChannelCredentials>());
249     csds_stub_ =
250         envoy::service::status::v3::ClientStatusDiscoveryService::NewStub(
251             admin_channel_);
252     if (GetParam().use_csds_streaming()) {
253       stream_ = csds_stub_->StreamClientStatus(&stream_context_);
254     }
255   }
256 
~ClientStatusDiscoveryServiceTest()257   ~ClientStatusDiscoveryServiceTest() override {
258     if (stream_ != nullptr) {
259       EXPECT_TRUE(stream_->WritesDone());
260       Status status = stream_->Finish();
261       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
262                                << " message=" << status.error_message();
263     }
264     admin_server_thread_->Shutdown();
265   }
266 
FetchCsdsResponse()267   envoy::service::status::v3::ClientStatusResponse FetchCsdsResponse() {
268     envoy::service::status::v3::ClientStatusResponse response;
269     if (!GetParam().use_csds_streaming()) {
270       // Fetch through unary pulls
271       ClientContext context;
272       Status status = csds_stub_->FetchClientStatus(
273           &context, envoy::service::status::v3::ClientStatusRequest(),
274           &response);
275       EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
276                                << " message=" << status.error_message();
277     } else {
278       // Fetch through streaming pulls
279       EXPECT_TRUE(
280           stream_->Write(envoy::service::status::v3::ClientStatusRequest()));
281       EXPECT_TRUE(stream_->Read(&response));
282     }
283     return response;
284   }
285 
286  private:
287   // Server thread for CSDS server.
288   class AdminServerThread : public ServerThread {
289    public:
AdminServerThread(XdsEnd2endTest * test_obj)290     explicit AdminServerThread(XdsEnd2endTest* test_obj)
291         : ServerThread(test_obj) {}
292 
293    private:
Type()294     const char* Type() override { return "Admin"; }
295 
RegisterAllServices(ServerBuilder * builder)296     void RegisterAllServices(ServerBuilder* builder) override {
297       builder->RegisterService(&csds_service_);
298     }
StartAllServices()299     void StartAllServices() override {}
ShutdownAllServices()300     void ShutdownAllServices() override {}
301 
302     grpc::xds::experimental::ClientStatusDiscoveryService csds_service_;
303   };
304 
305   std::unique_ptr<AdminServerThread> admin_server_thread_;
306   std::shared_ptr<Channel> admin_channel_;
307   std::unique_ptr<
308       envoy::service::status::v3::ClientStatusDiscoveryService::Stub>
309       csds_stub_;
310   ClientContext stream_context_;
311   std::unique_ptr<
312       ClientReaderWriter<envoy::service::status::v3::ClientStatusRequest,
313                          envoy::service::status::v3::ClientStatusResponse>>
314       stream_;
315 };
316 
// Run CSDS tests with and without the RDS-testing flag, and with and without
// the CSDS-streaming flag (four variants in total).
// These need to run with the bootstrap from an env var instead of from
// a channel arg, since there needs to be a global XdsClient instance.
INSTANTIATE_TEST_SUITE_P(
    XdsTest, ClientStatusDiscoveryServiceTest,
    ::testing::Values(
        // Bootstrap from env var only.
        XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
        // ... plus RDS testing.
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_enable_rds_testing(),
        // ... plus CSDS streaming.
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_use_csds_streaming(),
        // ... plus both RDS testing and CSDS streaming.
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_enable_rds_testing()
            .set_use_csds_streaming()),
    &XdsTestType::Name);
335 
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpVanilla)336 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpVanilla) {
337   CreateAndStartBackends(1);
338   const size_t kNumRpcs = 5;
339   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
340   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
341   // Send several RPCs to ensure the xDS setup works
342   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
343   // Fetches the client config
344   auto csds_response = FetchCsdsResponse();
345   LOG(INFO) << "xDS config dump: " << csds_response.DebugString();
346   ASSERT_EQ(1, csds_response.config_size());
347   const auto& client_config = csds_response.config(0);
348   // Validate the Node information
349   EXPECT_THAT(client_config.node(),
350               EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
351                      ::testing::HasSubstr(grpc_version_string()),
352                      ::testing::ElementsAre(
353                          "envoy.lb.does_not_support_overprovisioning")));
354   // Listener matcher depends on whether RDS is enabled.
355   ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
356   if (GetParam().enable_rds_testing()) {
357     api_listener_matcher = IsRdsEnabledHCM();
358   } else {
359     api_listener_matcher =
360         EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
361   }
362   // Construct list of all matchers.
363   std::vector<::testing::Matcher<
364       envoy::service::status::v3::ClientConfig_GenericXdsConfig>>
365       matchers = {
366           // Listener
367           EqGenericXdsConfig(
368               kLdsTypeUrl, kServerName, "1",
369               UnpackListener(EqListener(kServerName, api_listener_matcher)),
370               ClientResourceStatus::ACKED, ::testing::_),
371           // Cluster
372           EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
373                              UnpackCluster(EqCluster(kDefaultClusterName)),
374                              ClientResourceStatus::ACKED, ::testing::_),
375           // ClusterLoadAssignment
376           EqGenericXdsConfig(
377               kEdsTypeUrl, kDefaultEdsServiceName, "1",
378               UnpackClusterLoadAssignment(EqClusterLoadAssignment(
379                   kDefaultEdsServiceName, backends_[0]->port(),
380                   kDefaultLocalityWeight)),
381               ClientResourceStatus::ACKED, ::testing::_),
382       };
383   // If RDS is enabled, add matcher for RDS resource.
384   if (GetParam().enable_rds_testing()) {
385     matchers.push_back(EqGenericXdsConfig(
386         kRdsTypeUrl, kDefaultRouteConfigurationName, "1",
387         UnpackRouteConfiguration(EqRouteConfiguration(
388             kDefaultRouteConfigurationName, kDefaultClusterName)),
389         ClientResourceStatus::ACKED, ::testing::_));
390   }
391   // Validate the dumped xDS configs
392   EXPECT_THAT(client_config.generic_xds_configs(),
393               ::testing::UnorderedElementsAreArray(matchers))
394       << "Actual: " << client_config.DebugString();
395 }
396 
// Fetching the CSDS response must succeed even when the XdsClient is not
// initialized or there are no working xDS configs.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEmpty) {
  // Only the success of the fetch matters (it is checked inside
  // FetchCsdsResponse()); the response content is deliberately ignored.
  const auto ignored_response = FetchCsdsResponse();
  static_cast<void>(ignored_response);
}
402 
// Sends an invalid Listener update and verifies that CSDS reports the LDS
// resource as NACKED: the previously accepted config (version "1") is still
// shown, together with an error state describing the rejected version "2".
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpListenerError) {
  // The NACK may not be visible right away, so the dump is polled a few times.
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  CreateAndStartBackends(1);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad Listener should be rejected: only the name is set.
  Listener listener;
  listener.set_name(kServerName);
  balancer_->ads_service()->SetLdsResource(listener);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
  if (GetParam().enable_rds_testing()) {
    api_listener_matcher = IsRdsEnabledHCM();
  } else {
    api_listener_matcher =
        EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
  }
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    const auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. An empty response counts as "not
    // yet" and is retried instead of indexing config(0) out of range.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(
            csds_response.config(0).generic_xds_configs(),
            ::testing::Contains(EqGenericXdsConfig(
                kLdsTypeUrl, kServerName, "1",
                UnpackListener(EqListener(kServerName, api_listener_matcher)),
                ClientResourceStatus::NACKED,
                EqUpdateFailureState(
                    ::testing::HasSubstr(
                        "Listener has neither address nor ApiListener"),
                    "2"))));
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
443 
// Sends an invalid route configuration (a virtual host without domains) and
// verifies that CSDS reports the affected resource as NACKED. With RDS testing
// enabled the NACK is expected on the RDS resource; otherwise it is expected
// on the LDS resource, whose error path goes through the inlined route_config.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpRouteError) {
  // The NACK may not be visible right away, so the dump is polled a few times.
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  CreateAndStartBackends(1);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad route config will be rejected.
  RouteConfiguration route_config;
  route_config.set_name(kDefaultRouteConfigurationName);
  route_config.add_virtual_hosts();
  SetRouteConfiguration(balancer_.get(), route_config);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    const auto csds_response = FetchCsdsResponse();
    bool ok = false;
    if (csds_response.config_size() == 0) {
      // Nothing dumped yet: retry instead of indexing config(0) out of range.
    } else if (GetParam().enable_rds_testing()) {
      ok = ::testing::Value(
          csds_response.config(0).generic_xds_configs(),
          ::testing::Contains(EqGenericXdsConfig(
              kRdsTypeUrl, kDefaultRouteConfigurationName, "1",
              UnpackRouteConfiguration(EqRouteConfiguration(
                  kDefaultRouteConfigurationName, kDefaultClusterName)),
              ClientResourceStatus::NACKED,
              EqUpdateFailureState(
                  ::testing::HasSubstr(
                      "field:virtual_hosts[0].domains error:must be non-empty"),
                  "2"))));
    } else {
      ok = ::testing::Value(
          csds_response.config(0).generic_xds_configs(),
          ::testing::Contains(EqGenericXdsConfig(
              kLdsTypeUrl, kServerName, "1",
              UnpackListener(EqListener(
                  kServerName, EqNoRdsHCM(kDefaultRouteConfigurationName,
                                          kDefaultClusterName))),
              ClientResourceStatus::NACKED,
              EqUpdateFailureState(
                  ::testing::HasSubstr(
                      "field:api_listener.api_listener.value[envoy.extensions"
                      ".filters.network.http_connection_manager.v3"
                      ".HttpConnectionManager].route_config.virtual_hosts[0]"
                      ".domains error:must be non-empty"),
                  "2"))));
    }
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
497 
// Sends an invalid Cluster update and verifies that CSDS reports the CDS
// resource as NACKED: the previously accepted config (version "1") is still
// shown, together with an error state describing the rejected version "2".
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterError) {
  // The NACK may not be visible right away, so the dump is polled a few times.
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  CreateAndStartBackends(1);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Cluster with only a name set (no discovery type) will be rejected.
  Cluster cluster;
  cluster.set_name(kDefaultClusterName);
  balancer_->ads_service()->SetCdsResource(cluster);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    const auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. An empty response counts as "not
    // yet" and is retried instead of indexing config(0) out of range.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(
            csds_response.config(0).generic_xds_configs(),
            ::testing::Contains(EqGenericXdsConfig(
                kCdsTypeUrl, kDefaultClusterName, "1",
                UnpackCluster(EqCluster(kDefaultClusterName)),
                ClientResourceStatus::NACKED,
                EqUpdateFailureState(
                    ::testing::HasSubstr("unknown discovery type"), "2"))));
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
529 
// Sends an invalid ClusterLoadAssignment and verifies that CSDS reports the
// EDS resource as NACKED: the previously accepted config (version "1") is
// still shown, together with an error state describing the rejected
// version "2".
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEndpointError) {
  // The NACK may not be visible right away, so the dump is polled a few times.
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  CreateAndStartBackends(1);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad endpoint config will be rejected: the locality entry below never sets
  // its `locality` field, which is the error the test expects to see reported.
  ClusterLoadAssignment cluster_load_assignment;
  cluster_load_assignment.set_cluster_name(kDefaultEdsServiceName);
  auto* endpoints = cluster_load_assignment.add_endpoints();
  endpoints->mutable_load_balancing_weight()->set_value(1);
  auto* endpoint = endpoints->add_lb_endpoints()->mutable_endpoint();
  endpoint->mutable_address()->mutable_socket_address()->set_port_value(1 << 1);
  balancer_->ads_service()->SetEdsResource(cluster_load_assignment);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    const auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. An empty response counts as "not
    // yet" and is retried instead of indexing config(0) out of range.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(
            csds_response.config(0).generic_xds_configs(),
            ::testing::Contains(EqGenericXdsConfig(
                kEdsTypeUrl, kDefaultEdsServiceName, "1",
                UnpackClusterLoadAssignment(EqClusterLoadAssignment(
                    kDefaultEdsServiceName, backends_[0]->port(),
                    kDefaultLocalityWeight)),
                ClientResourceStatus::NACKED,
                EqUpdateFailureState(
                    ::testing::HasSubstr(
                        "errors parsing EDS resource: ["
                        "field:endpoints[0].locality error:field not present]"),
                    "2"))));
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
570 
// Removes the Listener resource from the balancer and verifies that, after an
// RPC has timed out waiting for it, CSDS reports the LDS resource in the
// REQUESTED state.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpListenerRequested) {
  constexpr int kTimeoutMillisecond = 1000;
  balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::DEADLINE_EXCEEDED,
                      "Deadline Exceeded",
                      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  const auto csds_response = FetchCsdsResponse();
  // Fail cleanly instead of indexing config(0) out of range on an empty dump.
  ASSERT_GT(csds_response.config_size(), 0);
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::Contains(EqGenericXdsConfig(
                  kLdsTypeUrl, kServerName, ::testing::_, ::testing::_,
                  ClientResourceStatus::REQUESTED, ::testing::_)));
}
583 
// Installs a route config that points at two clusters the balancer never
// serves and verifies that, after an RPC has timed out, CSDS reports both CDS
// resources in the REQUESTED state.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterRequested) {
  constexpr int kTimeoutMillisecond = 1000;
  const std::string kClusterName1 = "cluster-1";
  const std::string kClusterName2 = "cluster-2";
  // Create a route config requesting two non-existing clusters
  RouteConfiguration route_config;
  route_config.set_name(kDefaultRouteConfigurationName);
  auto* vh = route_config.add_virtual_hosts();
  // The VirtualHost must match the domain name, otherwise will cause resolver
  // transient failure.
  vh->add_domains("*");
  auto* routes1 = vh->add_routes();
  routes1->mutable_match()->set_prefix("");
  routes1->mutable_route()->set_cluster(kClusterName1);
  auto* routes2 = vh->add_routes();
  routes2->mutable_match()->set_prefix("");
  routes2->mutable_route()->set_cluster(kClusterName2);
  SetRouteConfiguration(balancer_.get(), route_config);
  // Give the configs a chance to propagate; the RPC itself is expected to
  // time out because the clusters never arrive.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::DEADLINE_EXCEEDED,
                      "Deadline Exceeded",
                      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  const auto csds_response = FetchCsdsResponse();
  // Fail cleanly instead of indexing config(0) out of range on an empty dump.
  ASSERT_GT(csds_response.config_size(), 0);
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::AllOf(
                  ::testing::Contains(EqGenericXdsConfig(
                      kCdsTypeUrl, kClusterName1, ::testing::_, ::testing::_,
                      ClientResourceStatus::REQUESTED, ::testing::_)),
                  ::testing::Contains(EqGenericXdsConfig(
                      kCdsTypeUrl, kClusterName2, ::testing::_, ::testing::_,
                      ClientResourceStatus::REQUESTED, ::testing::_))));
}
616 
TEST_P(ClientStatusDiscoveryServiceTest,XdsConfigDumpMultiClient)617 TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpMultiClient) {
618   Listener listener = default_listener_;
619   const char* kServer2Name = "server2.example.com";
620   listener.set_name(kServer2Name);
621   balancer_->ads_service()->SetLdsResource(listener);
622   SetListenerAndRouteConfiguration(balancer_.get(), listener,
623                                    default_route_config_);
624   CreateAndStartBackends(1);
625   const size_t kNumRpcs = 5;
626   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
627   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
628   // Send several RPCs to ensure the xDS setup works
629   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
630   // Connect to a second server
631   auto channel2 = CreateChannel(0, kServer2Name);
632   channel2->GetState(/*try_to_connect=*/true);
633   ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
634   // Fetches the client config
635   auto csds_response = FetchCsdsResponse();
636   ASSERT_EQ(2, csds_response.config_size());
637   std::vector<std::string> scopes;
638   for (const auto& client_config : csds_response.config()) {
639     // Validate the Node information
640     EXPECT_THAT(client_config.node(),
641                 EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
642                        ::testing::HasSubstr(grpc_version_string()),
643                        ::testing::ElementsAre(
644                            "envoy.lb.does_not_support_overprovisioning")));
645     scopes.emplace_back(client_config.client_scope());
646     absl::string_view server = client_config.client_scope();
647     // Listener matcher depends on whether RDS is enabled.
648     ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
649     if (GetParam().enable_rds_testing()) {
650       api_listener_matcher = IsRdsEnabledHCM();
651     } else {
652       api_listener_matcher =
653           EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
654     }
655     // Construct list of all matchers.
656     std::vector<::testing::Matcher<
657         envoy::service::status::v3::ClientConfig_GenericXdsConfig>>
658         matchers = {
659             // Listener
660             EqGenericXdsConfig(
661                 kLdsTypeUrl, absl::StripPrefix(server, "xds:"), "3",
662                 UnpackListener(EqListener(absl::StripPrefix(server, "xds:"),
663                                           api_listener_matcher)),
664                 ClientResourceStatus::ACKED, ::testing::_),
665             // Cluster
666             EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
667                                UnpackCluster(EqCluster(kDefaultClusterName)),
668                                ClientResourceStatus::ACKED, ::testing::_),
669             // ClusterLoadAssignment
670             EqGenericXdsConfig(
671                 kEdsTypeUrl, kDefaultEdsServiceName, "1",
672                 UnpackClusterLoadAssignment(EqClusterLoadAssignment(
673                     kDefaultEdsServiceName, backends_[0]->port(),
674                     kDefaultLocalityWeight)),
675                 ClientResourceStatus::ACKED, ::testing::_),
676         };
677     // If RDS is enabled, add matcher for RDS resource.
678     if (GetParam().enable_rds_testing()) {
679       matchers.push_back(EqGenericXdsConfig(
680           kRdsTypeUrl, kDefaultRouteConfigurationName, "2",
681           UnpackRouteConfiguration(EqRouteConfiguration(
682               kDefaultRouteConfigurationName, kDefaultClusterName)),
683           ClientResourceStatus::ACKED, ::testing::_));
684     }
685     // Validate the dumped xDS configs
686     EXPECT_THAT(client_config.generic_xds_configs(),
687                 ::testing::UnorderedElementsAreArray(matchers));
688   }
689   EXPECT_THAT(scopes, ::testing::UnorderedElementsAre(
690                           "xds:server.example.com", "xds:server2.example.com"));
691 }
692 
693 class CsdsShortAdsTimeoutTest : public ClientStatusDiscoveryServiceTest {
694  protected:
SetUp()695   void SetUp() override {
696     // Shorten the ADS subscription timeout to speed up the test run.
697     InitClient(absl::nullopt, /*lb_expected_authority=*/"",
698                /*xds_resource_does_not_exist_timeout_ms=*/2000);
699   }
700 };
701 
702 // Run CSDS tests with RDS enabled and disabled.
703 // These need to run with the bootstrap from an env var instead of from
704 // a channel arg, since there needs to be a global XdsClient instance.
705 INSTANTIATE_TEST_SUITE_P(
706     XdsTest, CsdsShortAdsTimeoutTest,
707     ::testing::Values(
708         XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
709         XdsTestType()
710             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
711             .set_enable_rds_testing(),
712         XdsTestType()
713             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
714             .set_use_csds_streaming(),
715         XdsTestType()
716             .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
717             .set_enable_rds_testing()
718             .set_use_csds_streaming()),
719     &XdsTestType::Name);
720 
// Removes the LDS resource for kServerName from the ADS service, expects the
// RPC to fail with UNAVAILABLE, and then checks that the CSDS dump contains
// that listener with client status DOES_NOT_EXIST.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpListenerDoesNotExist) {
  // RPC deadline of 1000s: wait for the transient failure.
  constexpr int kRpcDeadlineMs = 1000000;
  balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
  // Expected failure message pattern (parentheses are escaped).
  const std::string expected_message =
      absl::StrCat("empty address list \\(LDS resource ", kServerName,
                   ": does not exist \\(node ID:xds_end2end_test\\)\\)");
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      expected_message,
                      RpcOptions().set_timeout_ms(kRpcDeadlineMs));
  // Keep the response alive in a named object while inspecting its configs.
  const auto response = FetchCsdsResponse();
  const auto& dumped_configs = response.config(0).generic_xds_configs();
  EXPECT_THAT(dumped_configs,
              ::testing::Contains(EqGenericXdsConfig(
                  kLdsTypeUrl, kServerName, ::testing::_, ::testing::_,
                  ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
735 
// Removes the default RouteConfiguration (RDS) resource from the ADS service,
// expects the RPC to fail with UNAVAILABLE, and then checks that the CSDS
// dump contains that resource with client status DOES_NOT_EXIST.
// Only meaningful when RDS testing is enabled for the test parameter.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpRouteConfigDoesNotExist) {
  if (!GetParam().enable_rds_testing()) {
    // Report the run as skipped instead of silently passing, so the test
    // output shows that nothing was verified for this parameter.
    GTEST_SKIP() << "RDS testing is not enabled for this test parameter";
  }
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kRdsTypeUrl,
                                          kDefaultRouteConfigurationName);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrCat("empty address list \\(RDS resource ",
                   kDefaultRouteConfigurationName,
                   ": does not exist \\(node ID:xds_end2end_test\\)\\)"),
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  auto csds_response = FetchCsdsResponse();
  EXPECT_THAT(
      csds_response.config(0).generic_xds_configs(),
      ::testing::Contains(EqGenericXdsConfig(
          kRdsTypeUrl, kDefaultRouteConfigurationName, ::testing::_,
          ::testing::_, ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
754 
// Removes the default Cluster (CDS) resource from the ADS service, expects
// the RPC to fail with UNAVAILABLE, and then checks that the CSDS dump
// contains that cluster with client status DOES_NOT_EXIST.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpClusterDoesNotExist) {
  // RPC deadline of 1000s: wait for the transient failure.
  constexpr int kRpcDeadlineMs = 1000000;
  balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
  // Expected failure message pattern (parentheses are escaped).
  const std::string expected_message =
      absl::StrCat("CDS resource ", kDefaultClusterName,
                   ": does not exist \\(node ID:xds_end2end_test\\)");
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      expected_message,
                      RpcOptions().set_timeout_ms(kRpcDeadlineMs));
  // Keep the response alive in a named object while inspecting its configs.
  const auto response = FetchCsdsResponse();
  const auto& dumped_configs = response.config(0).generic_xds_configs();
  EXPECT_THAT(dumped_configs,
              ::testing::Contains(EqGenericXdsConfig(
                  kCdsTypeUrl, kDefaultClusterName, ::testing::_, ::testing::_,
                  ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
769 
// Removes the default ClusterLoadAssignment (EDS) resource from the ADS
// service, expects the RPC to fail with UNAVAILABLE, and then checks that the
// CSDS dump contains that resource with client status DOES_NOT_EXIST.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpEndpointDoesNotExist) {
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kEdsTypeUrl, kDefaultEdsServiceName);
  // Build the expected message from the same constant that names the removed
  // resource, as the sibling tests do, rather than repeating its value.
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrCat("no children in weighted_target policy: EDS resource ",
                   kDefaultEdsServiceName,
                   ": does not exist \\(node ID:xds_end2end_test\\)"),
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  auto csds_response = FetchCsdsResponse();
  EXPECT_THAT(
      csds_response.config(0).generic_xds_configs(),
      ::testing::Contains(EqGenericXdsConfig(
          kEdsTypeUrl, kDefaultEdsServiceName, ::testing::_, ::testing::_,
          ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
785 
786 }  // namespace
787 }  // namespace testing
788 }  // namespace grpc
789 
790 #endif  // DISABLED_XDS_PROTO_IN_CC
791 
main(int argc,char ** argv)792 int main(int argc, char** argv) {
793   grpc::testing::TestEnvironment env(&argc, argv);
794   ::testing::InitGoogleTest(&argc, argv);
795   // Make the backup poller poll very frequently in order to pick up
796   // updates from all the subchannels's FDs.
797   grpc_core::ConfigVars::Overrides overrides;
798   overrides.client_channel_backup_poll_interval_ms = 1;
799   grpc_core::ConfigVars::SetOverrides(overrides);
800 #if TARGET_OS_IPHONE
801   // Workaround Apple CFStream bug
802   grpc_core::SetEnv("grpc_cfstream", "0");
803 #endif
804   grpc_init();
805   const auto result = RUN_ALL_TESTS();
806   grpc_shutdown();
807   return result;
808 }
809