1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 #include <gmock/gmock.h>
17 #include <grpcpp/create_channel.h>
18 #include <grpcpp/security/credentials.h>
19 #include <gtest/gtest.h>
20
21 #include <memory>
22 #include <string>
23 #include <vector>
24
25 #include "absl/log/log.h"
26 #include "absl/memory/memory.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/strings/strip.h"
29 #include "envoy/config/cluster/v3/cluster.pb.h"
30 #include "envoy/config/endpoint/v3/endpoint.pb.h"
31 #include "envoy/config/listener/v3/listener.pb.h"
32 #include "envoy/config/route/v3/route.pb.h"
33 #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
34 #include "src/core/client_channel/backup_poller.h"
35 #include "src/core/config/config_vars.h"
36 #include "test/core/test_util/resolve_localhost_ip46.h"
37 #include "test/core/test_util/test_config.h"
38 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
39 #include "test/cpp/util/credentials.h"
40
41 #ifndef DISABLED_XDS_PROTO_IN_CC
42
43 #include "src/cpp/server/csds/csds.h"
44 #include "src/proto/grpc/testing/xds/v3/csds.grpc.pb.h"
45
46 namespace grpc {
47 namespace testing {
48 namespace {
49
50 using ::envoy::admin::v3::ClientResourceStatus;
51 using ::envoy::config::cluster::v3::Cluster;
52 using ::envoy::config::endpoint::v3::ClusterLoadAssignment;
53 using ::envoy::config::listener::v3::Listener;
54 using ::envoy::config::route::v3::RouteConfiguration;
55 using ::envoy::extensions::filters::network::http_connection_manager::v3::
56 HttpConnectionManager;
57
58 MATCHER_P4(EqNode, id, user_agent_name, user_agent_version, client_features,
59 "equals Node") {
60 bool ok = true;
61 ok &= ::testing::ExplainMatchResult(id, arg.id(), result_listener);
62 ok &= ::testing::ExplainMatchResult(user_agent_name, arg.user_agent_name(),
63 result_listener);
64 ok &= ::testing::ExplainMatchResult(
65 user_agent_version, arg.user_agent_version(), result_listener);
66 ok &= ::testing::ExplainMatchResult(client_features, arg.client_features(),
67 result_listener);
68 return ok;
69 }
70
71 MATCHER_P6(EqGenericXdsConfig, type_url, name, version_info, xds_config,
72 client_status, error_state, "equals GenericXdsConfig") {
73 bool ok = true;
74 ok &=
75 ::testing::ExplainMatchResult(type_url, arg.type_url(), result_listener);
76 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
77 ok &= ::testing::ExplainMatchResult(version_info, arg.version_info(),
78 result_listener);
79 ok &= ::testing::ExplainMatchResult(xds_config, arg.xds_config(),
80 result_listener);
81 ok &= ::testing::ExplainMatchResult(client_status, arg.client_status(),
82 result_listener);
83 ok &= ::testing::ExplainMatchResult(error_state, arg.error_state(),
84 result_listener);
85 return ok;
86 }
87
88 MATCHER_P2(EqListener, name, api_listener, "equals Listener") {
89 bool ok = true;
90 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
91 ok &= ::testing::ExplainMatchResult(
92 api_listener, arg.api_listener().api_listener(), result_listener);
93 return ok;
94 }
95
96 MATCHER_P(EqHttpConnectionManagerNotRds, route_config,
97 "equals HttpConnectionManager") {
98 bool ok = true;
99 ok &= ::testing::ExplainMatchResult(route_config, arg.route_config(),
100 result_listener);
101 return ok;
102 }
103
104 MATCHER_P(EqRouteConfigurationName, name, "equals RouteConfiguration") {
105 bool ok = true;
106 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
107 return ok;
108 }
109
110 MATCHER_P2(EqRouteConfiguration, name, cluster_name,
111 "equals RouteConfiguration") {
112 bool ok = true;
113 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
114 ok &= ::testing::ExplainMatchResult(
115 ::testing::ElementsAre(::testing::Property(
116 &envoy::config::route::v3::VirtualHost::routes,
117 ::testing::ElementsAre(::testing::Property(
118 &envoy::config::route::v3::Route::route,
119 ::testing::Property(
120 &envoy::config::route::v3::RouteAction::cluster,
121 cluster_name))))),
122 arg.virtual_hosts(), result_listener);
123 return ok;
124 }
125
126 MATCHER_P(EqCluster, name, "equals Cluster") {
127 bool ok = true;
128 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
129 return ok;
130 }
131
132 MATCHER_P(EqEndpoint, port, "equals Endpoint") {
133 bool ok = true;
134 ok &= ::testing::ExplainMatchResult(
135 port, arg.address().socket_address().port_value(), result_listener);
136 return ok;
137 }
138
139 MATCHER_P2(EqLocalityLbEndpoints, port, weight, "equals LocalityLbEndpoints") {
140 bool ok = true;
141 ok &= ::testing::ExplainMatchResult(
142 ::testing::ElementsAre(::testing::Property(
143 &envoy::config::endpoint::v3::LbEndpoint::endpoint,
144 EqEndpoint(port))),
145 arg.lb_endpoints(), result_listener);
146 ok &= ::testing::ExplainMatchResult(
147 weight, arg.load_balancing_weight().value(), result_listener);
148 return ok;
149 }
150
151 MATCHER_P(EqClusterLoadAssignmentName, cluster_name,
152 "equals ClusterLoadAssignment") {
153 bool ok = true;
154 ok &= ::testing::ExplainMatchResult(cluster_name, arg.cluster_name(),
155 result_listener);
156 return ok;
157 }
158
159 MATCHER_P3(EqClusterLoadAssignment, cluster_name, port, weight,
160 "equals ClusterLoadAssignment") {
161 bool ok = true;
162 ok &= ::testing::ExplainMatchResult(cluster_name, arg.cluster_name(),
163 result_listener);
164 ok &= ::testing::ExplainMatchResult(
165 ::testing::ElementsAre(EqLocalityLbEndpoints(port, weight)),
166 arg.endpoints(), result_listener);
167 return ok;
168 }
169
170 MATCHER_P2(EqUpdateFailureState, details, version_info,
171 "equals UpdateFailureState") {
172 bool ok = true;
173 ok &= ::testing::ExplainMatchResult(details, arg.details(), result_listener);
174 ok &= ::testing::ExplainMatchResult(version_info, arg.version_info(),
175 result_listener);
176 return ok;
177 }
178
179 MATCHER_P(UnpackListener, matcher, "is a Listener") {
180 Listener config;
181 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
182 result_listener)) {
183 return false;
184 }
185 return ::testing::ExplainMatchResult(matcher, config, result_listener);
186 }
187
188 MATCHER_P(UnpackRouteConfiguration, matcher, "is a RouteConfiguration") {
189 RouteConfiguration config;
190 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
191 result_listener)) {
192 return false;
193 }
194 return ::testing::ExplainMatchResult(matcher, config, result_listener);
195 }
196
197 MATCHER_P(UnpackHttpConnectionManager, matcher, "is a HttpConnectionManager") {
198 HttpConnectionManager config;
199 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
200 result_listener)) {
201 return false;
202 }
203 return ::testing::ExplainMatchResult(matcher, config, result_listener);
204 }
205
206 MATCHER_P(UnpackCluster, matcher, "is a Cluster") {
207 Cluster config;
208 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
209 result_listener)) {
210 return false;
211 }
212 return ::testing::ExplainMatchResult(matcher, config, result_listener);
213 }
214
215 MATCHER_P(UnpackClusterLoadAssignment, matcher, "is a ClusterLoadAssignment") {
216 ClusterLoadAssignment config;
217 if (!::testing::ExplainMatchResult(true, arg.UnpackTo(&config),
218 result_listener)) {
219 return false;
220 }
221 return ::testing::ExplainMatchResult(matcher, config, result_listener);
222 }
223
224 MATCHER(IsRdsEnabledHCM, "is a RDS enabled HttpConnectionManager") {
225 return ::testing::ExplainMatchResult(
226 UnpackHttpConnectionManager(
227 ::testing::Property(&HttpConnectionManager::has_rds, true)),
228 arg, result_listener);
229 }
230
231 MATCHER_P2(EqNoRdsHCM, route_configuration_name, cluster_name,
232 "equals RDS disabled HttpConnectionManager") {
233 return ::testing::ExplainMatchResult(
234 UnpackHttpConnectionManager(EqHttpConnectionManagerNotRds(
235 EqRouteConfiguration(route_configuration_name, cluster_name))),
236 arg, result_listener);
237 }
238
239 class ClientStatusDiscoveryServiceTest : public XdsEnd2endTest {
240 public:
ClientStatusDiscoveryServiceTest()241 ClientStatusDiscoveryServiceTest() {
242 admin_server_thread_ = std::make_unique<AdminServerThread>(this);
243 admin_server_thread_->Start();
244 std::string admin_server_address =
245 grpc_core::LocalIpAndPort(admin_server_thread_->port());
246 admin_channel_ = grpc::CreateChannel(
247 admin_server_address,
248 std::make_shared<FakeTransportSecurityChannelCredentials>());
249 csds_stub_ =
250 envoy::service::status::v3::ClientStatusDiscoveryService::NewStub(
251 admin_channel_);
252 if (GetParam().use_csds_streaming()) {
253 stream_ = csds_stub_->StreamClientStatus(&stream_context_);
254 }
255 }
256
~ClientStatusDiscoveryServiceTest()257 ~ClientStatusDiscoveryServiceTest() override {
258 if (stream_ != nullptr) {
259 EXPECT_TRUE(stream_->WritesDone());
260 Status status = stream_->Finish();
261 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
262 << " message=" << status.error_message();
263 }
264 admin_server_thread_->Shutdown();
265 }
266
FetchCsdsResponse()267 envoy::service::status::v3::ClientStatusResponse FetchCsdsResponse() {
268 envoy::service::status::v3::ClientStatusResponse response;
269 if (!GetParam().use_csds_streaming()) {
270 // Fetch through unary pulls
271 ClientContext context;
272 Status status = csds_stub_->FetchClientStatus(
273 &context, envoy::service::status::v3::ClientStatusRequest(),
274 &response);
275 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
276 << " message=" << status.error_message();
277 } else {
278 // Fetch through streaming pulls
279 EXPECT_TRUE(
280 stream_->Write(envoy::service::status::v3::ClientStatusRequest()));
281 EXPECT_TRUE(stream_->Read(&response));
282 }
283 return response;
284 }
285
286 private:
287 // Server thread for CSDS server.
288 class AdminServerThread : public ServerThread {
289 public:
AdminServerThread(XdsEnd2endTest * test_obj)290 explicit AdminServerThread(XdsEnd2endTest* test_obj)
291 : ServerThread(test_obj) {}
292
293 private:
Type()294 const char* Type() override { return "Admin"; }
295
RegisterAllServices(ServerBuilder * builder)296 void RegisterAllServices(ServerBuilder* builder) override {
297 builder->RegisterService(&csds_service_);
298 }
StartAllServices()299 void StartAllServices() override {}
ShutdownAllServices()300 void ShutdownAllServices() override {}
301
302 grpc::xds::experimental::ClientStatusDiscoveryService csds_service_;
303 };
304
305 std::unique_ptr<AdminServerThread> admin_server_thread_;
306 std::shared_ptr<Channel> admin_channel_;
307 std::unique_ptr<
308 envoy::service::status::v3::ClientStatusDiscoveryService::Stub>
309 csds_stub_;
310 ClientContext stream_context_;
311 std::unique_ptr<
312 ClientReaderWriter<envoy::service::status::v3::ClientStatusRequest,
313 envoy::service::status::v3::ClientStatusResponse>>
314 stream_;
315 };
316
// Run the CSDS tests over four configurations: every combination of RDS
// testing enabled/disabled and CSDS streaming enabled/disabled.
// These need to run with the bootstrap from an env var instead of from
// a channel arg, since there needs to be a global XdsClient instance.
INSTANTIATE_TEST_SUITE_P(
    XdsTest, ClientStatusDiscoveryServiceTest,
    ::testing::Values(
        XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_enable_rds_testing(),
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_use_csds_streaming(),
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_enable_rds_testing()
            .set_use_csds_streaming()),
    &XdsTestType::Name);
335
// Happy path: after a working xDS setup has served a few RPCs, the CSDS dump
// must contain exactly one client config whose Node information is correct and
// whose LDS/CDS/EDS (and, with RDS testing, RDS) resources are all ACKED at
// version "1".
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpVanilla) {
  using GenericXdsConfigMatcher = ::testing::Matcher<
      envoy::service::status::v3::ClientConfig_GenericXdsConfig>;
  CreateAndStartBackends(1);
  const size_t kNumRpcs = 5;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Send several RPCs to make sure the xDS setup works.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Fetch the client config.
  const auto csds_response = FetchCsdsResponse();
  LOG(INFO) << "xDS config dump: " << csds_response.DebugString();
  ASSERT_EQ(1, csds_response.config_size());
  const auto& client_config = csds_response.config(0);
  // Validate the Node information.
  EXPECT_THAT(client_config.node(),
              EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
                     ::testing::HasSubstr(grpc_version_string()),
                     ::testing::ElementsAre(
                         "envoy.lb.does_not_support_overprovisioning")));
  // The ApiListener payload looks different depending on whether the route
  // config is delivered through RDS or inlined in the Listener.
  const auto api_listener_matcher =
      [&]() -> ::testing::Matcher<google::protobuf::Any> {
    if (GetParam().enable_rds_testing()) return IsRdsEnabledHCM();
    return EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
  }();
  // One matcher per expected resource.
  std::vector<GenericXdsConfigMatcher> expected_resources = {
      // Listener
      EqGenericXdsConfig(
          kLdsTypeUrl, kServerName, "1",
          UnpackListener(EqListener(kServerName, api_listener_matcher)),
          ClientResourceStatus::ACKED, ::testing::_),
      // Cluster
      EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
                         UnpackCluster(EqCluster(kDefaultClusterName)),
                         ClientResourceStatus::ACKED, ::testing::_),
      // ClusterLoadAssignment
      EqGenericXdsConfig(
          kEdsTypeUrl, kDefaultEdsServiceName, "1",
          UnpackClusterLoadAssignment(EqClusterLoadAssignment(
              kDefaultEdsServiceName, backends_[0]->port(),
              kDefaultLocalityWeight)),
          ClientResourceStatus::ACKED, ::testing::_),
  };
  // With RDS enabled the route configuration is a resource of its own.
  if (GetParam().enable_rds_testing()) {
    expected_resources.push_back(EqGenericXdsConfig(
        kRdsTypeUrl, kDefaultRouteConfigurationName, "1",
        UnpackRouteConfiguration(EqRouteConfiguration(
            kDefaultRouteConfigurationName, kDefaultClusterName)),
        ClientResourceStatus::ACKED, ::testing::_));
  }
  // Validate the dumped xDS configs.
  EXPECT_THAT(client_config.generic_xds_configs(),
              ::testing::UnorderedElementsAreArray(expected_resources))
      << "Actual: " << client_config.DebugString();
}
396
// Smoke test: fetching the CSDS response must succeed even when no RPC has
// been sent yet, i.e. the XdsClient may not be initialized or may have no
// working xDS configs. The response content is deliberately not inspected;
// FetchCsdsResponse() itself reports RPC failures.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEmpty) {
  static_cast<void>(FetchCsdsResponse());
}
402
// Verifies that a rejected Listener update is visible through CSDS: the dump
// must still carry the version "1" Listener, now with status NACKED and an
// error_state that names the rejected version "2" and the validation error.
// The NACK may take a moment to arrive, so the dump is polled a few times.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpListenerError) {
  CreateAndStartBackends(1);
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad Listener should be rejected.
  Listener listener;
  listener.set_name(kServerName);
  balancer_->ads_service()->SetLdsResource(listener);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  ::testing::Matcher<google::protobuf::Any> api_listener_matcher;
  if (GetParam().enable_rds_testing()) {
    api_listener_matcher = IsRdsEnabledHCM();
  } else {
    api_listener_matcher =
        EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
  }
  // The expectation does not change between polls, so build it once.
  const auto contains_nacked_listener = ::testing::Contains(EqGenericXdsConfig(
      kLdsTypeUrl, kServerName, "1",
      UnpackListener(EqListener(kServerName, api_listener_matcher)),
      ClientResourceStatus::NACKED,
      EqUpdateFailureState(
          ::testing::HasSubstr("Listener has neither address nor ApiListener"),
          "2")));
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    const auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. config(0) is only touched when the
    // response actually has a client config; an empty response just counts as
    // "not seen yet" instead of indexing out of range.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(csds_response.config(0).generic_xds_configs(),
                         contains_nacked_listener);
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
443
// Verifies that a rejected route configuration is visible through CSDS. With
// RDS testing the NACK is expected on the RDS resource; otherwise the route
// config is inlined in the Listener, so the NACK is expected on the LDS
// resource. In both cases the version "1" resource must still be reported,
// with status NACKED and an error_state for version "2". The dump is polled a
// few times because the NACK may take a moment to arrive.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpRouteError) {
  CreateAndStartBackends(1);
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad route config (a virtual host without domains) will be rejected.
  RouteConfiguration route_config;
  route_config.set_name(kDefaultRouteConfigurationName);
  route_config.add_virtual_hosts();
  SetRouteConfiguration(balancer_.get(), route_config);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    const auto csds_response = FetchCsdsResponse();
    bool ok = false;
    // Only look at config(0) when the response has a client config; an empty
    // response counts as "not seen yet" instead of indexing out of range.
    if (csds_response.config_size() > 0) {
      const auto& xds_configs = csds_response.config(0).generic_xds_configs();
      if (GetParam().enable_rds_testing()) {
        ok = ::testing::Value(
            xds_configs,
            ::testing::Contains(EqGenericXdsConfig(
                kRdsTypeUrl, kDefaultRouteConfigurationName, "1",
                UnpackRouteConfiguration(EqRouteConfiguration(
                    kDefaultRouteConfigurationName, kDefaultClusterName)),
                ClientResourceStatus::NACKED,
                EqUpdateFailureState(
                    ::testing::HasSubstr(
                        "field:virtual_hosts[0].domains error:must be "
                        "non-empty"),
                    "2"))));
      } else {
        ok = ::testing::Value(
            xds_configs,
            ::testing::Contains(EqGenericXdsConfig(
                kLdsTypeUrl, kServerName, "1",
                UnpackListener(EqListener(
                    kServerName, EqNoRdsHCM(kDefaultRouteConfigurationName,
                                            kDefaultClusterName))),
                ClientResourceStatus::NACKED,
                EqUpdateFailureState(
                    ::testing::HasSubstr(
                        "field:api_listener.api_listener.value[envoy.extensions"
                        ".filters.network.http_connection_manager.v3"
                        ".HttpConnectionManager].route_config.virtual_hosts[0]"
                        ".domains error:must be non-empty"),
                    "2"))));
      }
    }
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
497
// Verifies that a rejected Cluster update is visible through CSDS: the dump
// must still carry the version "1" Cluster, now with status NACKED and an
// error_state that names the rejected version "2" and the validation error.
// The NACK may take a moment to arrive, so the dump is polled a few times.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterError) {
  CreateAndStartBackends(1);
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // A Cluster with only a name (no discovery type) will be rejected.
  Cluster cluster;
  cluster.set_name(kDefaultClusterName);
  balancer_->ads_service()->SetCdsResource(cluster);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  // The expectation does not change between polls, so build it once.
  const auto contains_nacked_cluster = ::testing::Contains(EqGenericXdsConfig(
      kCdsTypeUrl, kDefaultClusterName, "1",
      UnpackCluster(EqCluster(kDefaultClusterName)),
      ClientResourceStatus::NACKED,
      EqUpdateFailureState(::testing::HasSubstr("unknown discovery type"),
                           "2")));
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    const auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. config(0) is only touched when the
    // response actually has a client config; an empty response just counts as
    // "not seen yet" instead of indexing out of range.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(csds_response.config(0).generic_xds_configs(),
                         contains_nacked_cluster);
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
529
// Verifies that a rejected ClusterLoadAssignment update is visible through
// CSDS: the dump must still carry the version "1" EDS resource, now with
// status NACKED and an error_state that names the rejected version "2" and
// the validation error. The NACK may take a moment to arrive, so the dump is
// polled a few times.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpEndpointError) {
  CreateAndStartBackends(1);
  constexpr int kFetchConfigRetries = 3;
  constexpr int kFetchIntervalMilliseconds = 200;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ensure the xDS resolver has working configs.
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bad endpoint config (the locality field is never set) will be rejected.
  ClusterLoadAssignment cluster_load_assignment;
  cluster_load_assignment.set_cluster_name(kDefaultEdsServiceName);
  auto* endpoints = cluster_load_assignment.add_endpoints();
  endpoints->mutable_load_balancing_weight()->set_value(1);
  auto* endpoint = endpoints->add_lb_endpoints()->mutable_endpoint();
  endpoint->mutable_address()->mutable_socket_address()->set_port_value(1 << 1);
  balancer_->ads_service()->SetEdsResource(cluster_load_assignment);
  // The old xDS configs should still be effective.
  CheckRpcSendOk(DEBUG_LOCATION);
  // The expectation does not change between polls, so build it once.
  const auto contains_nacked_endpoints =
      ::testing::Contains(EqGenericXdsConfig(
          kEdsTypeUrl, kDefaultEdsServiceName, "1",
          UnpackClusterLoadAssignment(EqClusterLoadAssignment(
              kDefaultEdsServiceName, backends_[0]->port(),
              kDefaultLocalityWeight)),
          ClientResourceStatus::NACKED,
          EqUpdateFailureState(
              ::testing::HasSubstr(
                  "errors parsing EDS resource: ["
                  "field:endpoints[0].locality error:field not present]"),
              "2")));
  for (int i = 0; i < kFetchConfigRetries; ++i) {
    const auto csds_response = FetchCsdsResponse();
    // Check if error state is propagated. config(0) is only touched when the
    // response actually has a client config; an empty response just counts as
    // "not seen yet" instead of indexing out of range.
    const bool ok =
        csds_response.config_size() > 0 &&
        ::testing::Value(csds_response.config(0).generic_xds_configs(),
                         contains_nacked_endpoints);
    if (ok) return;  // TEST PASSED!
    gpr_sleep_until(
        grpc_timeout_milliseconds_to_deadline(kFetchIntervalMilliseconds));
  }
  FAIL() << "error_state not seen in CSDS responses";
}
570
// Verifies that a Listener the server has not (yet) delivered is reported by
// CSDS with status REQUESTED: the LDS resource is removed from the balancer,
// an RPC is sent and expected to hit its deadline, and the dump is inspected.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpListenerRequested) {
  constexpr int kTimeoutMillisecond = 1000;
  balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::DEADLINE_EXCEEDED,
                      "Deadline Exceeded",
                      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  const auto csds_response = FetchCsdsResponse();
  // Stop here with a clear failure rather than indexing config(0) on an empty
  // response.
  ASSERT_GT(csds_response.config_size(), 0)
      << "CSDS response contains no client config";
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::Contains(EqGenericXdsConfig(
                  kLdsTypeUrl, kServerName, ::testing::_, ::testing::_,
                  ClientResourceStatus::REQUESTED, ::testing::_)));
}
583
// Verifies that clusters referenced by the route config but never delivered
// by the server are reported by CSDS with status REQUESTED. The route config
// points at two clusters that the balancer does not know about; an RPC is
// sent and expected to hit its deadline, and the dump is inspected.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpClusterRequested) {
  constexpr int kTimeoutMillisecond = 1000;
  const std::string kClusterName1 = "cluster-1";
  const std::string kClusterName2 = "cluster-2";
  // Create a route config requesting two non-existing clusters
  RouteConfiguration route_config;
  route_config.set_name(kDefaultRouteConfigurationName);
  auto* vh = route_config.add_virtual_hosts();
  // The VirtualHost must match the domain name, otherwise will cause resolver
  // transient failure.
  vh->add_domains("*");
  auto* routes1 = vh->add_routes();
  routes1->mutable_match()->set_prefix("");
  routes1->mutable_route()->set_cluster(kClusterName1);
  auto* routes2 = vh->add_routes();
  routes2->mutable_match()->set_prefix("");
  routes2->mutable_route()->set_cluster(kClusterName2);
  SetRouteConfiguration(balancer_.get(), route_config);
  // Send an RPC so that the configs get plumbed through to the client.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::DEADLINE_EXCEEDED,
                      "Deadline Exceeded",
                      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  const auto csds_response = FetchCsdsResponse();
  // Stop here with a clear failure rather than indexing config(0) on an empty
  // response.
  ASSERT_GT(csds_response.config_size(), 0)
      << "CSDS response contains no client config";
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::AllOf(
                  ::testing::Contains(EqGenericXdsConfig(
                      kCdsTypeUrl, kClusterName1, ::testing::_, ::testing::_,
                      ClientResourceStatus::REQUESTED, ::testing::_)),
                  ::testing::Contains(EqGenericXdsConfig(
                      kCdsTypeUrl, kClusterName2, ::testing::_, ::testing::_,
                      ClientResourceStatus::REQUESTED, ::testing::_))));
}
616
// Verifies the CSDS dump when two channels with different target names are
// active: the response must hold two client configs, one per client scope
// ("xds:<server name>"), each with correct Node information and a full set of
// ACKED resources.
TEST_P(ClientStatusDiscoveryServiceTest, XdsConfigDumpMultiClient) {
  using GenericXdsConfigMatcher = ::testing::Matcher<
      envoy::service::status::v3::ClientConfig_GenericXdsConfig>;
  // Publish a second Listener, identical to the default one except for name.
  const char* kServer2Name = "server2.example.com";
  Listener listener = default_listener_;
  listener.set_name(kServer2Name);
  balancer_->ads_service()->SetLdsResource(listener);
  SetListenerAndRouteConfiguration(balancer_.get(), listener,
                                   default_route_config_);
  CreateAndStartBackends(1);
  const size_t kNumRpcs = 5;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Send several RPCs to make sure the xDS setup works.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Connect to a second server.
  auto channel2 = CreateChannel(0, kServer2Name);
  channel2->GetState(/*try_to_connect=*/true);
  ASSERT_TRUE(channel2->WaitForConnected(grpc_timeout_seconds_to_deadline(1)));
  // Fetch the client configs.
  const auto csds_response = FetchCsdsResponse();
  ASSERT_EQ(2, csds_response.config_size());
  // The ApiListener payload depends only on whether RDS is enabled, so the
  // matcher is the same for every client config.
  const auto api_listener_matcher =
      [&]() -> ::testing::Matcher<google::protobuf::Any> {
    if (GetParam().enable_rds_testing()) return IsRdsEnabledHCM();
    return EqNoRdsHCM(kDefaultRouteConfigurationName, kDefaultClusterName);
  }();
  std::vector<std::string> scopes;
  for (const auto& client_config : csds_response.config()) {
    // Validate the Node information.
    EXPECT_THAT(client_config.node(),
                EqNode("xds_end2end_test", ::testing::HasSubstr("C-core"),
                       ::testing::HasSubstr(grpc_version_string()),
                       ::testing::ElementsAre(
                           "envoy.lb.does_not_support_overprovisioning")));
    scopes.emplace_back(client_config.client_scope());
    // The Listener name is the client scope without its "xds:" prefix.
    absl::string_view server = client_config.client_scope();
    const absl::string_view listener_name = absl::StripPrefix(server, "xds:");
    // One matcher per expected resource.
    std::vector<GenericXdsConfigMatcher> expected_resources = {
        // Listener
        EqGenericXdsConfig(
            kLdsTypeUrl, listener_name, "3",
            UnpackListener(EqListener(listener_name, api_listener_matcher)),
            ClientResourceStatus::ACKED, ::testing::_),
        // Cluster
        EqGenericXdsConfig(kCdsTypeUrl, kDefaultClusterName, "1",
                           UnpackCluster(EqCluster(kDefaultClusterName)),
                           ClientResourceStatus::ACKED, ::testing::_),
        // ClusterLoadAssignment
        EqGenericXdsConfig(
            kEdsTypeUrl, kDefaultEdsServiceName, "1",
            UnpackClusterLoadAssignment(EqClusterLoadAssignment(
                kDefaultEdsServiceName, backends_[0]->port(),
                kDefaultLocalityWeight)),
            ClientResourceStatus::ACKED, ::testing::_),
    };
    // With RDS enabled the route configuration is a resource of its own.
    if (GetParam().enable_rds_testing()) {
      expected_resources.push_back(EqGenericXdsConfig(
          kRdsTypeUrl, kDefaultRouteConfigurationName, "2",
          UnpackRouteConfiguration(EqRouteConfiguration(
              kDefaultRouteConfigurationName, kDefaultClusterName)),
          ClientResourceStatus::ACKED, ::testing::_));
    }
    // Validate the dumped xDS configs.
    EXPECT_THAT(client_config.generic_xds_configs(),
                ::testing::UnorderedElementsAreArray(expected_resources));
  }
  EXPECT_THAT(scopes, ::testing::UnorderedElementsAre(
                          "xds:server.example.com", "xds:server2.example.com"));
}
692
693 class CsdsShortAdsTimeoutTest : public ClientStatusDiscoveryServiceTest {
694 protected:
SetUp()695 void SetUp() override {
696 // Shorten the ADS subscription timeout to speed up the test run.
697 InitClient(absl::nullopt, /*lb_expected_authority=*/"",
698 /*xds_resource_does_not_exist_timeout_ms=*/2000);
699 }
700 };
701
// Run the short-timeout CSDS tests over four configurations: every
// combination of RDS testing enabled/disabled and CSDS streaming
// enabled/disabled.
// These need to run with the bootstrap from an env var instead of from
// a channel arg, since there needs to be a global XdsClient instance.
INSTANTIATE_TEST_SUITE_P(
    XdsTest, CsdsShortAdsTimeoutTest,
    ::testing::Values(
        XdsTestType().set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar),
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_enable_rds_testing(),
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_use_csds_streaming(),
        XdsTestType()
            .set_bootstrap_source(XdsTestType::kBootstrapFromEnvVar)
            .set_enable_rds_testing()
            .set_use_csds_streaming()),
    &XdsTestType::Name);
720
// Verifies that a Listener missing on the server is reported by CSDS with
// status DOES_NOT_EXIST: the LDS resource is removed from the balancer, an
// RPC is expected to fail with UNAVAILABLE and a "does not exist" message,
// and the dump is inspected.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpListenerDoesNotExist) {
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrCat("empty address list \\(LDS resource ", kServerName,
                   ": does not exist \\(node ID:xds_end2end_test\\)\\)"),
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  const auto csds_response = FetchCsdsResponse();
  // Stop here with a clear failure rather than indexing config(0) on an empty
  // response.
  ASSERT_GT(csds_response.config_size(), 0)
      << "CSDS response contains no client config";
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::Contains(EqGenericXdsConfig(
                  kLdsTypeUrl, kServerName, ::testing::_, ::testing::_,
                  ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
735
// Verifies that a RouteConfiguration missing on the server is reported by
// CSDS with status DOES_NOT_EXIST: the RDS resource is removed from the
// balancer, an RPC is expected to fail with UNAVAILABLE and a "does not
// exist" message, and the dump is inspected. Only meaningful when the route
// config is delivered through RDS.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpRouteConfigDoesNotExist) {
  // Report the non-RDS variants as skipped instead of silently passing them.
  if (!GetParam().enable_rds_testing()) {
    GTEST_SKIP() << "test only applies when RDS testing is enabled";
  }
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kRdsTypeUrl,
                                          kDefaultRouteConfigurationName);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrCat("empty address list \\(RDS resource ",
                   kDefaultRouteConfigurationName,
                   ": does not exist \\(node ID:xds_end2end_test\\)\\)"),
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  const auto csds_response = FetchCsdsResponse();
  // Stop here with a clear failure rather than indexing config(0) on an empty
  // response.
  ASSERT_GT(csds_response.config_size(), 0)
      << "CSDS response contains no client config";
  EXPECT_THAT(
      csds_response.config(0).generic_xds_configs(),
      ::testing::Contains(EqGenericXdsConfig(
          kRdsTypeUrl, kDefaultRouteConfigurationName, ::testing::_,
          ::testing::_, ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
754
// Verifies that a Cluster missing on the server is reported by CSDS with
// status DOES_NOT_EXIST: the CDS resource is removed from the balancer, an
// RPC is expected to fail with UNAVAILABLE and a "does not exist" message,
// and the dump is inspected.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpClusterDoesNotExist) {
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      absl::StrCat("CDS resource ", kDefaultClusterName,
                   ": does not exist \\(node ID:xds_end2end_test\\)"),
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  const auto csds_response = FetchCsdsResponse();
  // Stop here with a clear failure rather than indexing config(0) on an empty
  // response.
  ASSERT_GT(csds_response.config_size(), 0)
      << "CSDS response contains no client config";
  EXPECT_THAT(csds_response.config(0).generic_xds_configs(),
              ::testing::Contains(EqGenericXdsConfig(
                  kCdsTypeUrl, kDefaultClusterName, ::testing::_, ::testing::_,
                  ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
769
// Verifies that an EDS resource missing on the server is reported by CSDS
// with status DOES_NOT_EXIST: the EDS resource is removed from the balancer,
// an RPC is expected to fail with UNAVAILABLE and a "does not exist" message,
// and the dump is inspected.
TEST_P(CsdsShortAdsTimeoutTest, XdsConfigDumpEndpointDoesNotExist) {
  // 1000s wait for the transient failure.
  constexpr int kTimeoutMillisecond = 1000000;
  balancer_->ads_service()->UnsetResource(kEdsTypeUrl, kDefaultEdsServiceName);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "no children in weighted_target policy: EDS resource eds_service_name: "
      "does not exist \\(node ID:xds_end2end_test\\)",
      RpcOptions().set_timeout_ms(kTimeoutMillisecond));
  const auto csds_response = FetchCsdsResponse();
  // Stop here with a clear failure rather than indexing config(0) on an empty
  // response.
  ASSERT_GT(csds_response.config_size(), 0)
      << "CSDS response contains no client config";
  EXPECT_THAT(
      csds_response.config(0).generic_xds_configs(),
      ::testing::Contains(EqGenericXdsConfig(
          kEdsTypeUrl, kDefaultEdsServiceName, ::testing::_, ::testing::_,
          ClientResourceStatus::DOES_NOT_EXIST, ::testing::_)));
}
785
786 } // namespace
787 } // namespace testing
788 } // namespace grpc
789
790 #endif // DISABLED_XDS_PROTO_IN_CC
791
main(int argc,char ** argv)792 int main(int argc, char** argv) {
793 grpc::testing::TestEnvironment env(&argc, argv);
794 ::testing::InitGoogleTest(&argc, argv);
795 // Make the backup poller poll very frequently in order to pick up
796 // updates from all the subchannels's FDs.
797 grpc_core::ConfigVars::Overrides overrides;
798 overrides.client_channel_backup_poll_interval_ms = 1;
799 grpc_core::ConfigVars::SetOverrides(overrides);
800 #if TARGET_OS_IPHONE
801 // Workaround Apple CFStream bug
802 grpc_core::SetEnv("grpc_cfstream", "0");
803 #endif
804 grpc_init();
805 const auto result = RUN_ALL_TESTS();
806 grpc_shutdown();
807 return result;
808 }
809