// // Copyright 2022 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // TODO(roth): Add the following tests: // - tests for DumpClientConfigBinary() // - tests for load-reporting APIs? (or maybe move those out of XdsClient?) #include "src/core/ext/xds/xds_client.h" #include #include #include #include #include #include #include #include #include #include #include #include "absl/strings/str_cat.h" #include "absl/time/time.h" #include "absl/types/optional.h" #include "absl/types/variant.h" #include "gmock/gmock.h" #include "gtest/gtest.h" #include "upb/reflection/def.h" #include #include #include #include #include "src/core/ext/xds/xds_bootstrap.h" #include "src/core/ext/xds/xds_resource_type_impl.h" #include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/match.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/json/json.h" #include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_object_loader.h" #include "src/core/lib/json/json_reader.h" #include "src/core/lib/json/json_writer.h" #include "src/proto/grpc/testing/xds/v3/base.pb.h" #include "src/proto/grpc/testing/xds/v3/discovery.pb.h" #include "test/core/util/scoped_env_var.h" #include "test/core/util/test_config.h" #include "test/core/xds/xds_client_test_peer.h" #include "test/core/xds/xds_transport_fake.h" // IWYU pragma: no_include // IWYU pragma: no_include // IWYU pragma: no_include // IWYU pragma: no_include // IWYU pragma: no_include "google/protobuf/json/json.h" // IWYU pragma: no_include "google/protobuf/util/json_util.h" using envoy::service::discovery::v3::DiscoveryRequest; using envoy::service::discovery::v3::DiscoveryResponse; namespace grpc_core { namespace testing { namespace { constexpr absl::string_view kDefaultXdsServerUrl = "default_xds_server"; class XdsClientTest : public ::testing::Test { protected: // A fake bootstrap implementation that allows tests to populate the // fields however they want. class FakeXdsBootstrap : public XdsBootstrap { public: class FakeNode : public Node { public: FakeNode() : id_("xds_client_test") {} const std::string& id() const override { return id_; } const std::string& cluster() const override { return cluster_; } const std::string& locality_region() const override { return locality_region_; } const std::string& locality_zone() const override { return locality_zone_; } const std::string& locality_sub_zone() const override { return locality_sub_zone_; } const Json::Object& metadata() const override { return metadata_; } void set_id(std::string id) { id_ = std::move(id); } void set_cluster(std::string cluster) { cluster_ = std::move(cluster); } void set_locality_region(std::string locality_region) { locality_region_ = std::move(locality_region); } void set_locality_zone(std::string locality_zone) { locality_zone_ = std::move(locality_zone); } void set_locality_sub_zone(std::string locality_sub_zone) { locality_sub_zone_ = std::move(locality_sub_zone); } void set_metadata(Json::Object metadata) { metadata_ = std::move(metadata); } private: std::string id_; std::string cluster_; std::string locality_region_; std::string locality_zone_; std::string locality_sub_zone_; Json::Object metadata_; }; class FakeXdsServer : public XdsServer { public: explicit FakeXdsServer( absl::string_view server_uri = kDefaultXdsServerUrl, bool ignore_resource_deletion = false) : server_uri_(server_uri), ignore_resource_deletion_(ignore_resource_deletion) {} const std::string& server_uri() const override { return server_uri_; } bool IgnoreResourceDeletion() const override { return ignore_resource_deletion_; } bool Equals(const XdsServer& other) const override { const auto& o = static_cast(other); return server_uri_ == o.server_uri_ && ignore_resource_deletion_ == o.ignore_resource_deletion_; } std::string Key() const override { return absl::StrCat(server_uri_, "#", ignore_resource_deletion_); } private: std::string server_uri_; bool ignore_resource_deletion_ = false; }; class FakeAuthority : public Authority { public: std::vector servers() const override { if (server_.has_value()) { return {&*server_}; } else { return {}; }; } void set_server(absl::optional server) { server_ = std::move(server); } private: absl::optional server_; }; class Builder { public: Builder() { node_.emplace(); } Builder& set_node_id(std::string id) { if (!node_.has_value()) node_.emplace(); node_->set_id(std::move(id)); return *this; } Builder& AddAuthority(std::string name, FakeAuthority authority) { authorities_[std::move(name)] = std::move(authority); return *this; } Builder& SetServers(absl::Span servers) { servers_.assign(servers.begin(), servers.end()); return *this; } std::unique_ptr Build() { auto bootstrap = std::make_unique(); bootstrap->servers_ = std::move(servers_); bootstrap->node_ = std::move(node_); bootstrap->authorities_ = std::move(authorities_); return bootstrap; } private: std::vector servers_ = {FakeXdsServer()}; absl::optional node_; std::map authorities_; }; std::string ToString() const override { return ""; } std::vector servers() const override { std::vector result; result.reserve(servers_.size()); for (size_t i = 0; i < servers_.size(); ++i) { result.emplace_back(&servers_[i]); } return result; } const Node* node() const override { return node_.has_value() ? &*node_ : nullptr; } const Authority* LookupAuthority(const std::string& name) const override { auto it = authorities_.find(name); if (it == authorities_.end()) return nullptr; return &it->second; } private: std::vector servers_; absl::optional node_; std::map authorities_; }; // A template for a test xDS resource type with an associated watcher impl. // For simplicity, we use JSON instead of proto for serialization. // // The specified ResourceStruct must provide the following: // - a static JsonLoader() method, as described in json_object_loader.h // - an AsJsonString() method that returns the object in JSON string form // - a static TypeUrl() method that returns the resource type // // The all_resources_required_in_sotw parameter indicates the value // that should be returned by the AllResourcesRequiredInSotW() method. template class XdsTestResourceType : public XdsResourceTypeImpl< XdsTestResourceType, ResourceStruct> { public: struct ResourceAndReadDelayHandle { std::shared_ptr resource; RefCountedPtr read_delay_handle; }; // A watcher implementation that queues delivered watches. class Watcher : public XdsResourceTypeImpl< XdsTestResourceType, ResourceStruct>::WatcherInterface { public: ~Watcher() override { MutexLock lock(&mu_); EXPECT_THAT(queue_, ::testing::IsEmpty()) << this << " " << Match( queue_[0], [&](const ResourceAndReadDelayHandle& resource) { return absl::StrFormat("Resource %s", resource.resource->name); }, [&](const absl::Status& status) { return status.ToString(); }, [&](const DoesNotExist& /* tag */) -> std::string { return ""; }); } // Returns true if no event is received during the timeout period. bool ExpectNoEvent(absl::Duration timeout) { MutexLock lock(&mu_); return !WaitForEventLocked(timeout); } bool HasEvent() { MutexLock lock(&mu_); return !queue_.empty(); } absl::optional WaitForNextResourceAndHandle( absl::Duration timeout = absl::Seconds(1), SourceLocation location = SourceLocation()) { MutexLock lock(&mu_); if (!WaitForEventLocked(timeout)) return absl::nullopt; Event& event = queue_.front(); if (!absl::holds_alternative(event)) { EXPECT_TRUE(false) << "got unexpected event " << (absl::holds_alternative(event) ? "error" : "does-not-exist") << " at " << location.file() << ":" << location.line(); return absl::nullopt; } auto foo = std::move(absl::get(event)); queue_.pop_front(); return foo; } std::shared_ptr WaitForNextResource( absl::Duration timeout = absl::Seconds(1), SourceLocation location = SourceLocation()) { auto resource_and_handle = WaitForNextResourceAndHandle(timeout, location); if (!resource_and_handle.has_value()) { return nullptr; } return std::move(resource_and_handle->resource); } absl::optional WaitForNextError( absl::Duration timeout = absl::Seconds(1), SourceLocation location = SourceLocation()) { MutexLock lock(&mu_); if (!WaitForEventLocked(timeout)) return absl::nullopt; Event& event = queue_.front(); if (!absl::holds_alternative(event)) { EXPECT_TRUE(false) << "got unexpected event " << (absl::holds_alternative(event) ? "resource" : "does-not-exist") << " at " << location.file() << ":" << location.line(); return absl::nullopt; } absl::Status error = std::move(absl::get(event)); queue_.pop_front(); return std::move(error); } bool WaitForDoesNotExist(absl::Duration timeout, SourceLocation location = SourceLocation()) { MutexLock lock(&mu_); if (!WaitForEventLocked(timeout)) return false; Event& event = queue_.front(); if (!absl::holds_alternative(event)) { EXPECT_TRUE(false) << "got unexpected event " << (absl::holds_alternative(event) ? "error" : "resource") << " at " << location.file() << ":" << location.line(); return false; } queue_.pop_front(); return true; } private: struct DoesNotExist {}; using Event = absl::variant; void OnResourceChanged(std::shared_ptr foo, RefCountedPtr read_delay_handle) override { MutexLock lock(&mu_); ResourceAndReadDelayHandle event_details = { std::move(foo), std::move(read_delay_handle)}; queue_.emplace_back(std::move(event_details)); cv_.Signal(); } void OnError( absl::Status status, RefCountedPtr /* read_delay_handle */) override { MutexLock lock(&mu_); queue_.push_back(std::move(status)); cv_.Signal(); } void OnResourceDoesNotExist( RefCountedPtr /* read_delay_handle */) override { MutexLock lock(&mu_); queue_.push_back(DoesNotExist()); cv_.Signal(); } // Returns true if an event was received, or false if the timeout // expires before any event is received. bool WaitForEventLocked(absl::Duration timeout) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { while (queue_.empty()) { if (cv_.WaitWithTimeout(&mu_, timeout * grpc_test_slowdown_factor())) { return false; } } return true; } Mutex mu_; CondVar cv_; std::deque queue_ ABSL_GUARDED_BY(&mu_); }; absl::string_view type_url() const override { return ResourceStruct::TypeUrl(); } XdsResourceType::DecodeResult Decode( const XdsResourceType::DecodeContext& /*context*/, absl::string_view serialized_resource) const override { auto json = JsonParse(serialized_resource); XdsResourceType::DecodeResult result; if (!json.ok()) { result.resource = json.status(); } else { absl::StatusOr foo = LoadFromJson(*json); if (!foo.ok()) { auto it = json->object().find("name"); if (it != json->object().end()) { result.name = it->second.string(); } result.resource = foo.status(); } else { result.name = foo->name; result.resource = std::make_unique(std::move(*foo)); } } return result; } bool AllResourcesRequiredInSotW() const override { return all_resources_required_in_sotw; } void InitUpbSymtab(XdsClient*, upb_DefPool* /*symtab*/) const override {} static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) { google::protobuf::Any any; any.set_type_url( absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl())); any.set_value(resource.AsJsonString()); return any; } }; // A fake "Foo" xDS resource type. struct XdsFooResource : public XdsResourceType::ResourceData { std::string name; uint32_t value; XdsFooResource() = default; XdsFooResource(std::string name, uint32_t value) : name(std::move(name)), value(value) {} bool operator==(const XdsFooResource& other) const { return name == other.name && value == other.value; } std::string AsJsonString() const { return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}"); } static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { static const auto* loader = JsonObjectLoader() .Field("name", &XdsFooResource::name) .Field("value", &XdsFooResource::value) .Finish(); return loader; } static absl::string_view TypeUrl() { return "test.v3.foo"; } }; using XdsFooResourceType = XdsTestResourceType; // A fake "Bar" xDS resource type. struct XdsBarResource : public XdsResourceType::ResourceData { std::string name; std::string value; XdsBarResource() = default; XdsBarResource(std::string name, std::string value) : name(std::move(name)), value(std::move(value)) {} bool operator==(const XdsBarResource& other) const { return name == other.name && value == other.value; } std::string AsJsonString() const { return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value, "\"}"); } static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { static const auto* loader = JsonObjectLoader() .Field("name", &XdsBarResource::name) .Field("value", &XdsBarResource::value) .Finish(); return loader; } static absl::string_view TypeUrl() { return "test.v3.bar"; } }; using XdsBarResourceType = XdsTestResourceType; // A fake "WildcardCapable" xDS resource type. // This resource type return true for AllResourcesRequiredInSotW(), // just like LDS and CDS. struct XdsWildcardCapableResource : public XdsResourceType::ResourceData { std::string name; uint32_t value; XdsWildcardCapableResource() = default; XdsWildcardCapableResource(std::string name, uint32_t value) : name(std::move(name)), value(value) {} bool operator==(const XdsWildcardCapableResource& other) const { return name == other.name && value == other.value; } std::string AsJsonString() const { return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value, "\"}"); } static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { static const auto* loader = JsonObjectLoader() .Field("name", &XdsWildcardCapableResource::name) .Field("value", &XdsWildcardCapableResource::value) .Finish(); return loader; } static absl::string_view TypeUrl() { return "test.v3.wildcard_capable"; } }; using XdsWildcardCapableResourceType = XdsTestResourceType; // A helper class to build and serialize a DiscoveryResponse. class ResponseBuilder { public: explicit ResponseBuilder(absl::string_view type_url) { response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url)); } ResponseBuilder& set_version_info(absl::string_view version_info) { response_.set_version_info(std::string(version_info)); return *this; } ResponseBuilder& set_nonce(absl::string_view nonce) { response_.set_nonce(std::string(nonce)); return *this; } template ResponseBuilder& AddResource( const typename ResourceType::ResourceType& resource, bool in_resource_wrapper = false) { auto* res = response_.add_resources(); *res = ResourceType::EncodeAsAny(resource); if (in_resource_wrapper) { envoy::service::discovery::v3::Resource resource_wrapper; resource_wrapper.set_name(resource.name); *resource_wrapper.mutable_resource() = std::move(*res); res->PackFrom(resource_wrapper); } return *this; } ResponseBuilder& AddFooResource(const XdsFooResource& resource, bool in_resource_wrapper = false) { return AddResource(resource, in_resource_wrapper); } ResponseBuilder& AddBarResource(const XdsBarResource& resource, bool in_resource_wrapper = false) { return AddResource(resource, in_resource_wrapper); } ResponseBuilder& AddWildcardCapableResource( const XdsWildcardCapableResource& resource, bool in_resource_wrapper = false) { return AddResource(resource, in_resource_wrapper); } ResponseBuilder& AddInvalidResource( absl::string_view type_url, absl::string_view value, absl::string_view resource_wrapper_name = "") { auto* res = response_.add_resources(); res->set_type_url(absl::StrCat("type.googleapis.com/", type_url)); res->set_value(std::string(value)); if (!resource_wrapper_name.empty()) { envoy::service::discovery::v3::Resource resource_wrapper; resource_wrapper.set_name(std::string(resource_wrapper_name)); *resource_wrapper.mutable_resource() = std::move(*res); res->PackFrom(resource_wrapper); } return *this; } ResponseBuilder& AddInvalidResourceWrapper() { auto* res = response_.add_resources(); res->set_type_url( "type.googleapis.com/envoy.service.discovery.v3.Resource"); res->set_value(std::string("\0", 1)); return *this; } ResponseBuilder& AddEmptyResource() { response_.add_resources(); return *this; } std::string Serialize() { std::string serialized_response; EXPECT_TRUE(response_.SerializeToString(&serialized_response)); return serialized_response; } private: DiscoveryResponse response_; }; class MetricsReporter : public XdsMetricsReporter { public: using ResourceUpdateMap = std::map< std::pair, uint64_t>; const ResourceUpdateMap& resource_updates_valid() const { return resource_updates_valid_; } const ResourceUpdateMap& resource_updates_invalid() const { return resource_updates_invalid_; } private: void ReportResourceUpdates(absl::string_view xds_server, absl::string_view resource_type, uint64_t num_resources_valid, uint64_t num_resources_invalid) override { auto key = std::make_pair(std::string(xds_server), std::string(resource_type)); if (num_resources_valid > 0) { resource_updates_valid_[key] += num_resources_valid; } if (num_resources_invalid > 0) { resource_updates_invalid_[key] += num_resources_invalid; } } ResourceUpdateMap resource_updates_valid_; ResourceUpdateMap resource_updates_invalid_; }; using ResourceCounts = std::vector>; ResourceCounts GetResourceCounts() { ResourceCounts resource_counts; XdsClientTestPeer(xds_client_.get()) .TestReportResourceCounts( [&](const XdsClientTestPeer::ResourceCountLabels& labels, uint64_t count) { resource_counts.emplace_back(labels, count); }); return resource_counts; } using ServerConnectionMap = std::map; ServerConnectionMap GetServerConnections() { ServerConnectionMap server_connection_map; XdsClientTestPeer(xds_client_.get()) .TestReportServerConnections( [&](absl::string_view xds_server, bool connected) { std::string server(xds_server); EXPECT_EQ(server_connection_map.find(server), server_connection_map.end()); server_connection_map[std::move(server)] = connected; }); return server_connection_map; } // Sets transport_factory_ and initializes xds_client_ with the // specified bootstrap config. void InitXdsClient( FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(), Duration resource_request_timeout = Duration::Seconds(15)) { auto transport_factory = MakeOrphanable( []() { FAIL() << "Multiple concurrent reads"; }); transport_factory_ = transport_factory->Ref().TakeAsSubclass(); auto metrics_reporter = std::make_unique(); metrics_reporter_ = metrics_reporter.get(); xds_client_ = MakeRefCounted( bootstrap_builder.Build(), std::move(transport_factory), grpc_event_engine::experimental::GetDefaultEventEngine(), std::move(metrics_reporter), "foo agent", "foo version", resource_request_timeout * grpc_test_slowdown_factor()); } // Starts and cancels a watch for a Foo resource. RefCountedPtr StartFooWatch( absl::string_view resource_name) { auto watcher = MakeRefCounted(); XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher); return watcher; } void CancelFooWatch(XdsFooResourceType::Watcher* watcher, absl::string_view resource_name, bool delay_unsubscription = false) { XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, delay_unsubscription); } // Starts and cancels a watch for a Bar resource. RefCountedPtr StartBarWatch( absl::string_view resource_name) { auto watcher = MakeRefCounted(); XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher); return watcher; } void CancelBarWatch(XdsBarResourceType::Watcher* watcher, absl::string_view resource_name, bool delay_unsubscription = false) { XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, delay_unsubscription); } // Starts and cancels a watch for a WildcardCapable resource. RefCountedPtr StartWildcardCapableWatch(absl::string_view resource_name) { auto watcher = MakeRefCounted(); XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name, watcher); return watcher; } void CancelWildcardCapableWatch( XdsWildcardCapableResourceType::Watcher* watcher, absl::string_view resource_name, bool delay_unsubscription = false) { XdsWildcardCapableResourceType::CancelWatch( xds_client_.get(), resource_name, watcher, delay_unsubscription); } RefCountedPtr WaitForAdsStream( const XdsBootstrap::XdsServer& xds_server, absl::Duration timeout = absl::Seconds(5)) { return transport_factory_->WaitForStream( xds_server, FakeXdsTransportFactory::kAdsMethod, timeout * grpc_test_slowdown_factor()); } void TriggerConnectionFailure(const XdsBootstrap::XdsServer& xds_server, absl::Status status) { transport_factory_->TriggerConnectionFailure(xds_server, std::move(status)); } RefCountedPtr WaitForAdsStream( absl::Duration timeout = absl::Seconds(5)) { return WaitForAdsStream(*xds_client_->bootstrap().servers().front(), timeout); } // Gets the latest request sent to the fake xDS server. absl::optional WaitForRequest( FakeXdsTransportFactory::FakeStreamingCall* stream, absl::Duration timeout = absl::Seconds(3), SourceLocation location = SourceLocation()) { auto message = stream->WaitForMessageFromClient(timeout * grpc_test_slowdown_factor()); if (!message.has_value()) return absl::nullopt; DiscoveryRequest request; bool success = request.ParseFromString(*message); EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at " << location.file() << ":" << location.line(); if (!success) return absl::nullopt; return std::move(request); } // Helper function to check the fields of a DiscoveryRequest. void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url, absl::string_view version_info, absl::string_view response_nonce, const absl::Status& error_detail, const std::set& resource_names, SourceLocation location = SourceLocation()) { EXPECT_EQ(request.type_url(), absl::StrCat("type.googleapis.com/", type_url)) << location.file() << ":" << location.line(); EXPECT_EQ(request.version_info(), version_info) << location.file() << ":" << location.line(); EXPECT_EQ(request.response_nonce(), response_nonce) << location.file() << ":" << location.line(); if (error_detail.ok()) { EXPECT_FALSE(request.has_error_detail()) << location.file() << ":" << location.line(); } else { EXPECT_EQ(request.error_detail().code(), static_cast(error_detail.code())) << location.file() << ":" << location.line(); EXPECT_EQ(request.error_detail().message(), error_detail.message()) << location.file() << ":" << location.line(); } EXPECT_THAT(request.resource_names(), ::testing::UnorderedElementsAreArray(resource_names)) << location.file() << ":" << location.line(); } // Helper function to check the contents of the node message in a // request against the client's node info. void CheckRequestNode(const DiscoveryRequest& request, SourceLocation location = SourceLocation()) { // These fields come from the bootstrap config. EXPECT_EQ(request.node().id(), xds_client_->bootstrap().node()->id()) << location.file() << ":" << location.line(); EXPECT_EQ(request.node().cluster(), xds_client_->bootstrap().node()->cluster()) << location.file() << ":" << location.line(); EXPECT_EQ(request.node().locality().region(), xds_client_->bootstrap().node()->locality_region()) << location.file() << ":" << location.line(); EXPECT_EQ(request.node().locality().zone(), xds_client_->bootstrap().node()->locality_zone()) << location.file() << ":" << location.line(); EXPECT_EQ(request.node().locality().sub_zone(), xds_client_->bootstrap().node()->locality_sub_zone()) << location.file() << ":" << location.line(); if (xds_client_->bootstrap().node()->metadata().empty()) { EXPECT_FALSE(request.node().has_metadata()) << location.file() << ":" << location.line(); } else { std::string metadata_json_str; auto status = MessageToJsonString(request.node().metadata(), &metadata_json_str, GRPC_CUSTOM_JSONUTIL::JsonPrintOptions()); ASSERT_TRUE(status.ok()) << status << " on " << location.file() << ":" << location.line(); auto metadata_json = JsonParse(metadata_json_str); ASSERT_TRUE(metadata_json.ok()) << metadata_json.status() << " on " << location.file() << ":" << location.line(); Json expected = Json::FromObject(xds_client_->bootstrap().node()->metadata()); EXPECT_EQ(*metadata_json, expected) << location.file() << ":" << location.line() << ":\nexpected: " << JsonDump(expected) << "\nactual: " << JsonDump(*metadata_json); } EXPECT_EQ(request.node().user_agent_name(), "foo agent") << location.file() << ":" << location.line(); EXPECT_EQ(request.node().user_agent_version(), "foo version") << location.file() << ":" << location.line(); } RefCountedPtr transport_factory_; RefCountedPtr xds_client_; MetricsReporter* metrics_reporter_ = nullptr; }; MATCHER_P3(ResourceCountLabelsEq, xds_authority, resource_type, cache_state, "equals ResourceCountLabels") { bool ok = true; ok &= ::testing::ExplainMatchResult(xds_authority, arg.xds_authority, result_listener); ok &= ::testing::ExplainMatchResult(resource_type, arg.resource_type, result_listener); ok &= ::testing::ExplainMatchResult(cache_state, arg.cache_state, result_listener); return ok; } TEST_F(XdsClientTest, BasicWatch) { InitXdsClient(); // Metrics should initially be empty. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre()); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre()); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Check metrics. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watch. CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre()); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre()); } TEST_F(XdsClientTest, UpdateFromServer) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Server sends an updated version of the resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddFooResource(XdsFooResource("foo1", 9)) .Serialize()); // XdsClient should have delivered the response to the watcher. resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 9); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 2))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watch. CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, MultipleWatchersForSameResource) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Start a second watcher for the same resource. auto watcher2 = StartFooWatch("foo1"); // This watcher should get an immediate notification, because the // resource is already cached. resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Server should not have seen another request from the client. ASSERT_FALSE(stream->HaveMessageFromClient()); // Server sends an updated version of the resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddFooResource(XdsFooResource("foo1", 9)) .Serialize()); // XdsClient should deliver the response to both watchers. resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 9); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 9); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 2))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel one of the watchers. CancelFooWatch(watcher.get(), "foo1"); // The server should not see any new request. ASSERT_FALSE(WaitForRequest(stream.get())); // Now cancel the second watcher. CancelFooWatch(watcher2.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, SubscribeToMultipleResources) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // Check metrics. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Start a watch for "foo2". auto watcher2 = StartFooWatch("foo2"); // Check metric data. EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre( ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1), ::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // XdsClient should have sent a subscription request on the ADS stream. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("B") .AddFooResource(XdsFooResource("foo2", 7)) .Serialize()); // XdsClient should have delivered the response to the watcher. resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo2"); EXPECT_EQ(resource->value, 7); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 2))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 2))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Cancel watch for "foo1". CancelFooWatch(watcher.get(), "foo1"); // Check metric data. EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should send an unsubscription request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"}); // Now cancel watch for "foo2". CancelFooWatch(watcher2.get(), "foo2"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Start a watch for "foo2". auto watcher2 = StartFooWatch("foo2"); // XdsClient should have sent a subscription request on the ADS stream. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("B") .AddFooResource(XdsFooResource("foo2", 7)) .Serialize()); // XdsClient should have delivered the response to the watcher. resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo2"); EXPECT_EQ(resource->value, 7); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 2))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 2))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Server sends an update for "foo1". The response does not contain "foo2". stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("C") .AddFooResource(XdsFooResource("foo1", 9)) .Serialize()); // XdsClient should have delivered the response to the watcher. resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 9); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 3))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 2))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"C", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Cancel watch for "foo1". CancelFooWatch(watcher.get(), "foo1"); // XdsClient should send an unsubscription request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"C", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"}); // Now cancel watch for "foo2". CancelFooWatch(watcher2.get(), "foo2"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, ResourceValidationFailure) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response containing an invalid resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddInvalidResource(XdsFooResourceType::Get()->type_url(), "{\"name\":\"foo1\",\"value\":[]}") .Serialize()); // XdsClient should deliver an error to the watcher. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "invalid resource: INVALID_ARGUMENT: errors validating JSON: " "[field:value error:is not a number] (node ID:xds_client_test)") << *error; // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "nacked"), 1))); // XdsClient should NACK the update. // Note that version_info is not populated in the request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest( *request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"A", // error_detail= absl::InvalidArgumentError( "xDS response validation errors: [" "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " "[field:value error:is not a number]]"), /*resource_names=*/{"foo1"}); // Start a second watch for the same resource. It should immediately // receive the same error. auto watcher2 = StartFooWatch("foo1"); error = watcher2->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "invalid resource: INVALID_ARGUMENT: errors validating JSON: " "[field:value error:is not a number] (node ID:xds_client_test)") << *error; // Now server sends an updated version of the resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddFooResource(XdsFooResource("foo1", 9)) .Serialize()); // XdsClient should deliver the response to both watchers. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 9); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 9); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watch. CancelFooWatch(watcher.get(), "foo1"); CancelFooWatch(watcher2.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Before the server responds, add a watch for another resource. auto watcher2 = StartFooWatch("foo2"); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 2))); // Client should send another request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Add a watch for a third resource. auto watcher3 = StartFooWatch("foo3"); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 3))); // Client should send another request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2", "foo3"}); // Add a watch for a fourth resource. auto watcher4 = StartFooWatch("foo4"); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 4))); // Client should send another request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"}); // Server sends a response containing three invalid resources and one // valid resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") // foo1: JSON parsing succeeds, so we know the resource name, // but validation fails. .AddInvalidResource(XdsFooResourceType::Get()->type_url(), "{\"name\":\"foo1\",\"value\":[]}") // foo2: JSON parsing fails, and not wrapped in a Resource // wrapper, so we don't actually know the resource's name. .AddInvalidResource(XdsFooResourceType::Get()->type_url(), "{\"name\":\"foo2,\"value\":6}") // Empty resource. Will be included in NACK but will not // affect any watchers. .AddEmptyResource() // Invalid resource wrapper. Will be included in NACK but // will not affect any watchers. .AddInvalidResourceWrapper() // foo3: JSON parsing fails, but it is wrapped in a Resource // wrapper, so we do know the resource's name. .AddInvalidResource(XdsFooResourceType::Get()->type_url(), "{\"name\":\"foo3,\"value\":6}", /*resource_wrapper_name=*/"foo3") // foo4: valid resource. .AddFooResource(XdsFooResource("foo4", 5)) .Serialize()); // XdsClient should deliver an error to the watchers for foo1 and foo3. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "invalid resource: INVALID_ARGUMENT: errors validating JSON: " "[field:value error:is not a number] (node ID:xds_client_test)") << *error; error = watcher3->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "invalid resource: INVALID_ARGUMENT: JSON parsing failed: " "[JSON parse error at index 15] (node ID:xds_client_test)") << *error; // It cannot delivery an error for foo2, because the client doesn't know // that that resource in the response was actually supposed to be foo2. EXPECT_FALSE(watcher2->HasEvent()); // It will delivery a valid resource update for foo4. auto resource = watcher4->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo4"); EXPECT_EQ(resource->value, 5); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 5))); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre( // foo4 ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1), // foo1 and foo3 ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "nacked"), 2), // did not recognize response for foo2 ::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // XdsClient should NACK the update. // There was one good resource, so the version will be updated. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", // error_detail= absl::InvalidArgumentError(absl::StrCat( "xDS response validation errors: [" // foo1 "resource index 0: foo1: " "INVALID_ARGUMENT: errors validating JSON: " "[field:value error:is not a number]; " // foo2 (name not known) "resource index 1: INVALID_ARGUMENT: JSON parsing failed: " "[JSON parse error at index 15]; " // empty resource "resource index 2: incorrect resource type \"\" " "(should be \"", XdsFooResourceType::Get()->type_url(), "\"); " // invalid resource wrapper "resource index 3: Can't decode Resource proto wrapper; " // foo3 "resource index 4: foo3: " "INVALID_ARGUMENT: JSON parsing failed: " "[JSON parse error at index 15]]")), /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"}); // Cancel watches. CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); CancelFooWatch(watcher2.get(), "foo2", /*delay_unsubscription=*/true); CancelFooWatch(watcher3.get(), "foo3", /*delay_unsubscription=*/true); CancelFooWatch(watcher4.get(), "foo4"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Send an update containing an invalid resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddInvalidResource(XdsFooResourceType::Get()->type_url(), "{\"name\":\"foo1\",\"value\":[]}") .Serialize()); // XdsClient should deliver an error to the watcher. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "invalid resource: INVALID_ARGUMENT: errors validating JSON: " "[field:value error:is not a number] (node ID:xds_client_test)") << *error; // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "nacked_but_cached"), 1))); // XdsClient should NACK the update. // Note that version_info is set to the previous version in this request, // because there were no valid resources in it. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest( *request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"B", // error_detail= absl::InvalidArgumentError( "xDS response validation errors: [" "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " "[field:value error:is not a number]]"), /*resource_names=*/{"foo1"}); // Start a second watcher for the same resource. Even though the last // update was a NACK, we should still deliver the cached resource to // the watcher. // TODO(roth): Consider what the right behavior is here. It seems // inconsistent that the watcher sees the error if it had started // before the error was seen but does not if it was started afterwards. // One option is to not send errors at all for already-cached resources; // another option is to send the errors even for newly started watchers. auto watcher2 = StartFooWatch("foo1"); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Cancel watches. CancelFooWatch(watcher.get(), "foo1"); CancelFooWatch(watcher2.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) { InitXdsClient(); // Start a watch for "wc1". auto watcher = StartWildcardCapableWatch("wc1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); CheckRequestNode(*request); // Should be present on the first request. // Server sends a response containing the requested resources plus an // empty resource. stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6)) .AddEmptyResource() .Serialize()); // XdsClient will delivery a valid resource update for wc1. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT( metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsWildcardCapableResourceType::Get()->type_url()), 1))); EXPECT_THAT( metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsWildcardCapableResourceType::Get()->type_url()), 1))); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsWildcardCapableResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should NACK the update. // There was one good resource, so the version will be updated. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", // error_detail= absl::InvalidArgumentError(absl::StrCat( "xDS response validation errors: [" "resource index 1: incorrect resource type \"\" " "(should be \"", XdsWildcardCapableResourceType::Get()->type_url(), "\")]")), /*resource_names=*/{"wc1"}); // Cancel watch. CancelWildcardCapableWatch(watcher.get(), "wc1"); EXPECT_TRUE(stream->Orphaned()); } // This tests resource removal triggered by the server when using a // resource type that requires all resources to be present in every // response, similar to LDS and CDS. TEST_F(XdsClientTest, ResourceDeletion) { InitXdsClient(); // Start a watch for "wc1". auto watcher = StartWildcardCapableWatch("wc1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); CheckRequestNode(*request); // Should be present on the first request. // Server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT( metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsWildcardCapableResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsWildcardCapableResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); // Server now sends a response without the resource, thus indicating // it's been deleted. stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .Serialize()); // Watcher should see the does-not-exist event. EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(1))); // Check metric data. EXPECT_THAT( metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsWildcardCapableResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsWildcardCapableResourceType::Get()->type_url(), "does_not_exist"), 1))); // Start a new watcher for the same resource. It should immediately // receive the same does-not-exist notification. auto watcher2 = StartWildcardCapableWatch("wc1"); EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); // Server sends the resource again. stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("3") .set_nonce("C") .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7)) .Serialize()); // XdsClient should have delivered the response to the watchers. resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 7); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 7); // Check metric data. EXPECT_THAT( metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsWildcardCapableResourceType::Get()->type_url()), 2))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsWildcardCapableResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"3", /*response_nonce=*/"C", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); // Cancel watch. CancelWildcardCapableWatch(watcher.get(), "wc1"); CancelWildcardCapableWatch(watcher2.get(), "wc1"); EXPECT_TRUE(stream->Orphaned()); } // This tests that when we ignore resource deletions from the server // when configured to do so. TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) { InitXdsClient(FakeXdsBootstrap::Builder().SetServers( {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)})); // Start a watch for "wc1". auto watcher = StartWildcardCapableWatch("wc1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); CheckRequestNode(*request); // Should be present on the first request. // Server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT( metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsWildcardCapableResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsWildcardCapableResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); // Server now sends a response without the resource, thus indicating // it's been deleted. stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .Serialize()); // Watcher should not see any update, since we should have ignored the // deletion. EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1))); // Check metric data. EXPECT_THAT( metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsWildcardCapableResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsWildcardCapableResourceType::Get()->type_url(), "acked"), 1))); // Start a new watcher for the same resource. It should immediately // receive the cached resource. auto watcher2 = StartWildcardCapableWatch("wc1"); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 6); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); // Server sends a new value for the resource. stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("3") .set_nonce("C") .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7)) .Serialize()); // XdsClient should have delivered the response to the watchers. resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 7); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 7); // Check metric data. EXPECT_THAT( metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsWildcardCapableResourceType::Get()->type_url()), 2))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsWildcardCapableResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(), /*version_info=*/"3", /*response_nonce=*/"C", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"wc1"}); // Cancel watch. CancelWildcardCapableWatch(watcher.get(), "wc1"); CancelWildcardCapableWatch(watcher2.get(), "wc1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, StreamClosedByServer) { InitXdsClient(); // Metrics should initially be empty. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre()); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Check metric data. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Now server closes the stream. stream->MaybeSendStatusToClient(absl::OkStatus()); // XdsClient should NOT report error to watcher, because we saw a // response on the stream before it failed. // Stream should be orphaned. EXPECT_TRUE(stream->Orphaned()); // Check metric data. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient should create a new stream. stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient sends a subscription request. // Note that the version persists from the previous stream, but the // nonce does not. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Before the server resends the resource, start a new watcher for the // same resource. This watcher should immediately receive the cached // resource. auto watcher2 = StartFooWatch("foo1"); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Server now sends the requested resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("B") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // Watcher does NOT get an update, since the resource has not changed. EXPECT_FALSE(watcher->WaitForNextResource()); // XdsClient sends an ACK. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watcher. CancelFooWatch(watcher.get(), "foo1"); CancelFooWatch(watcher2.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) { InitXdsClient(); // Metrics should initially be empty. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre()); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Check metric data. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Server closes the stream without sending a response. stream->MaybeSendStatusToClient(absl::UnavailableError("ugh")); // Check metric data. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, false))); // XdsClient should report an error to the watcher. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "xDS channel for server default_xds_server: xDS call failed " "with no responses received; status: UNAVAILABLE: ugh " "(node ID:xds_client_test)") << *error; // XdsClient should create a new stream. stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient sends a subscription request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Connection still reported as unhappy until we get a response. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, false))); // Server now sends the requested resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // Watcher gets the resource. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Connection now reported as happy. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient sends an ACK. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watcher. CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, ConnectionFails) { // Lower resources-does-not-exist timeout, to make sure that we're not // triggering that here. InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); // Tell transport to let us manually trigger completion of the // send_message ops to XdsClient. transport_factory_->SetAutoCompleteMessagesFromClient(false); // Metrics should initially be empty. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre()); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Check metric data. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Transport reports connection failure. TriggerConnectionFailure(*xds_client_->bootstrap().servers().front(), absl::UnavailableError("connection failed")); // XdsClient should report an error to the watcher. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "xDS channel for server default_xds_server: " "connection failed (node ID:xds_client_test)") << *error; // Connection reported as unhappy. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, false))); // We should not see a resource-does-not-exist event, because the // timer should not be running while the channel is disconnected. EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); // Start a new watch. This watcher should be given the same error, // since we have not yet recovered. auto watcher2 = StartFooWatch("foo1"); error = watcher2->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "xDS channel for server default_xds_server: " "connection failed (node ID:xds_client_test)") << *error; // Second watcher should not see resource-does-not-exist either. EXPECT_FALSE(watcher2->HasEvent()); // The ADS stream uses wait_for_ready inside the XdsTransport interface, // so when the channel reconnects, the already-started stream will proceed. stream->CompleteSendMessageFromClient(); // Server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // Connection now reported as happy. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient should have delivered the response to the watchers. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); stream->CompleteSendMessageFromClient(); // Cancel watches. CancelFooWatch(watcher.get(), "foo1"); CancelFooWatch(watcher2.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) { InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Do not send a response, but wait for the resource to be reported as // not existing. EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(5))); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "does_not_exist"), 1))); // Start a new watcher for the same resource. It should immediately // receive the same does-not-exist notification. auto watcher2 = StartFooWatch("foo1"); EXPECT_TRUE(watcher2->WaitForDoesNotExist(absl::Seconds(1))); // Now server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watchers. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watch. CancelFooWatch(watcher.get(), "foo1"); CancelFooWatch(watcher2.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) { // Lower resources-does-not-exist timeout so test finishes faster. InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); // Metrics should initially be empty. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre()); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Stream fails. stream->MaybeSendStatusToClient(absl::UnavailableError("ugh")); // XdsClient should report error to watcher. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "xDS channel for server default_xds_server: xDS call failed " "with no responses received; status: UNAVAILABLE: ugh " "(node ID:xds_client_test)") << *error; // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // XdsClient should create a new stream. stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient sends a subscription request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Server does NOT send a response immediately. // Client should receive a resource does-not-exist. ASSERT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4))); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "does_not_exist"), 1))); // Server now sends the requested resource. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // The resource is delivered to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient sends an ACK. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watcher. CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) { // Lower resources-does-not-exist timeout, to make sure that we're not // triggering that here. InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); // Tell transport to let us manually trigger completion of the // send_message ops to XdsClient. transport_factory_->SetAutoCompleteMessagesFromClient(false); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Server does NOT send a response. // We should not see a resource-does-not-exist event, because the // timer should not be running while the channel is disconnected. EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); // Check metric data. EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // The ADS stream uses wait_for_ready inside the XdsTransport interface, // so when the channel connects, the already-started stream will proceed. stream->CompleteSendMessageFromClient(); // Server does NOT send a response. // Watcher should see a does-not-exist event. EXPECT_TRUE(watcher->WaitForDoesNotExist(absl::Seconds(4))); // Check metric data. EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "does_not_exist"), 1))); // Now server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); stream->CompleteSendMessageFromClient(); // Cancel watch. CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } // In https://github.com/grpc/grpc/issues/29583, we ran into a case // where we wound up starting a timer after we had already received the // resource, thus incorrectly reporting the resource as not existing. // This happened when unsubscribing and then resubscribing to the same // resource a send_message op was already in flight and then receiving an // update containing that resource. TEST_F(XdsClientTest, ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) { InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(1)); // Tell transport to let us manually trigger completion of the // send_message ops to XdsClient. transport_factory_->SetAutoCompleteMessagesFromClient(false); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. stream->CompleteSendMessageFromClient(); // Server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watchers. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); stream->CompleteSendMessageFromClient(); // Start a watch for a second resource. auto watcher2 = StartFooWatch("foo2"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher2->HasEvent()); // Check metric data. EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre( ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1), ::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); // XdsClient sends a request to subscribe to the new resource. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // NOTE: We do NOT yet tell the XdsClient that the send_message op is // complete. // Unsubscribe from foo1 and then re-subscribe to it. CancelFooWatch(watcher.get(), "foo1"); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); watcher = StartFooWatch("foo1"); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 2))); // Now send a response from the server containing both foo1 and foo2. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("B") .AddFooResource(XdsFooResource("foo1", 6)) .AddFooResource(XdsFooResource("foo2", 7)) .Serialize()); // The watcher for foo1 will receive an update even if the resource // has not changed, since the previous value was removed from the // cache when we unsubscribed. resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // For foo2, the watcher should receive notification for the new resource. resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo2"); EXPECT_EQ(resource->value, 7); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 3))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 2))); // Now we finally tell XdsClient that its previous send_message op is // complete. stream->CompleteSendMessageFromClient(); // XdsClient should send an ACK with the updated subscription list // (which happens to be identical to the old list), and it should not // restart the does-not-exist timer. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); stream->CompleteSendMessageFromClient(); // Make sure the watcher for foo1 does not see a does-not-exist event. EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(5))); // Cancel watches. CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); CancelFooWatch(watcher2.get(), "foo2"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) { // Lower resources-does-not-exist timeout, to make sure that we're not // triggering that here. InitXdsClient(FakeXdsBootstrap::Builder(), Duration::Seconds(3)); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Stream fails because of transport disconnection. stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed")); // XdsClient should NOT report error to watcher, because we saw a // response on the stream before it failed. // XdsClient creates a new stream. stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Server does NOT send a response. // We should not see a resource-does-not-exist event, because the // resource was already cached, so the server can optimize by not // resending it. EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(4))); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // Now server sends a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // Watcher will not see any update, since the resource is unchanged. EXPECT_TRUE(watcher->ExpectNoEvent(absl::Seconds(1))); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 2))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watch. CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response with the resource wrapped in a Resource message. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6), /*in_resource_wrapper=*/true) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Cancel watch. CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, MultipleResourceTypes) { InitXdsClient(); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Start a watch for "bar1". auto watcher2 = StartBarWatch("bar1"); // XdsClient should have sent a subscription request on the ADS stream. // Note that version and nonce here do NOT use the values for Foo, // since each resource type has its own state. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsBarResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"bar1"}); // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsBarResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddBarResource(XdsBarResource("bar1", "whee")) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource2 = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource2->name, "bar1"); EXPECT_EQ(resource2->value, "whee"); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre( ::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsBarResourceType::Get()->type_url()), 1), ::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::UnorderedElementsAre( ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsBarResourceType::Get()->type_url(), "acked"), 1), ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsBarResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"bar1"}); // Cancel watch for "foo1". CancelFooWatch(watcher.get(), "foo1"); // XdsClient should send an unsubscription request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{}); // Now cancel watch for "bar1". CancelBarWatch(watcher2.get(), "bar1"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, Federation) { constexpr char kAuthority[] = "xds.example.com"; const std::string kXdstpResourceName = absl::StrCat( "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server"); FakeXdsBootstrap::FakeAuthority authority; authority.set_server(authority_server); InitXdsClient( FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority)); // Metrics should initially be empty. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre()); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre()); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre()); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream to the top-level xDS server. auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front()); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Start a watch for the xdstp resource name. auto watcher2 = StartFooWatch(kXdstpResourceName); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher2->HasEvent()); // Check metric data. EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre( ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1), ::testing::Pair(ResourceCountLabelsEq( kAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, true), ::testing::Pair(authority_server.server_uri(), true))); // XdsClient will create a new stream to the server for this authority. auto stream2 = WaitForAdsStream(authority_server); ASSERT_TRUE(stream2 != nullptr); // XdsClient should have sent a subscription request on the ADS stream. // Note that version and nonce here do NOT use the values for Foo, // since each authority has its own state. request = WaitForRequest(stream2.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{kXdstpResourceName}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream2->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddFooResource(XdsFooResource(kXdstpResourceName, 3)) .Serialize()); // XdsClient should have delivered the response to the watcher. resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, kXdstpResourceName); EXPECT_EQ(resource->value, 3); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre( ::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1), ::testing::Pair( ::testing::Pair(authority_server.server_uri(), XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre( ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1), ::testing::Pair( ResourceCountLabelsEq( kAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, true), ::testing::Pair(authority_server.server_uri(), true))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream2.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{kXdstpResourceName}); // Cancel watch for "foo1". CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); // Now cancel watch for xdstp resource name. CancelFooWatch(watcher2.get(), kXdstpResourceName); EXPECT_TRUE(stream2->Orphaned()); } TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) { constexpr char kAuthority[] = "xds.example.com"; const std::string kXdstpResourceName = absl::StrCat( "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); // Authority does not specify any xDS servers, so XdsClient will use // the top-level xDS server in the bootstrap config for this authority. InitXdsClient(FakeXdsBootstrap::Builder().AddAuthority( kAuthority, FakeXdsBootstrap::FakeAuthority())); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream to the top-level xDS server. auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front()); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Start a watch for the xdstp resource name. auto watcher2 = StartFooWatch(kXdstpResourceName); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher2->HasEvent()); // XdsClient will send a subscription request on the ADS stream that // includes both resources, since both are being obtained from the // same server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", kXdstpResourceName}); // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddFooResource(XdsFooResource(kXdstpResourceName, 3)) .Serialize()); // XdsClient should have delivered the response to the watcher. resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, kXdstpResourceName); EXPECT_EQ(resource->value, 3); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 2))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre( ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1), ::testing::Pair( ResourceCountLabelsEq( kAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", kXdstpResourceName}); // Cancel watch for "foo1". CancelFooWatch(watcher.get(), "foo1"); // XdsClient should send an unsubscription request. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{kXdstpResourceName}); // Now cancel watch for xdstp resource name. CancelFooWatch(watcher2.get(), kXdstpResourceName); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, FederationWithUnknownAuthority) { constexpr char kAuthority[] = "xds.example.com"; const std::string kXdstpResourceName = absl::StrCat( "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); // Note: Not adding authority to bootstrap config. InitXdsClient(); // Start a watch for the xdstp resource name. auto watcher = StartFooWatch(kXdstpResourceName); // Watcher should immediately get an error about the unknown authority. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "authority \"xds.example.com\" not present in bootstrap config") << *error; } TEST_F(XdsClientTest, FederationWithUnparseableXdstpResourceName) { // Note: Not adding authority to bootstrap config. InitXdsClient(); // Start a watch for the xdstp resource name. auto watcher = StartFooWatch("xdstp://x"); // Watcher should immediately get an error about the unknown authority. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "Unable to parse resource name xdstp://x") << *error; } // TODO(roth,apolcyn): remove this test when the // GRPC_EXPERIMENTAL_XDS_FEDERATION env var is removed. TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) { testing::ScopedEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION", "false"); // We will use this xdstp name, whose authority is not present in // the bootstrap config. But since federation is not enabled, we // will treat this as an opaque old-style name, so we'll send it to // the default server. constexpr char kXdstpResourceName[] = "xdstp://xds.example.com/test.v3.foo/foo1"; InitXdsClient(); // Start a watch for the xdstp name. auto watcher = StartFooWatch(kXdstpResourceName); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{kXdstpResourceName}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource(kXdstpResourceName, 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, kXdstpResourceName); EXPECT_EQ(resource->value, 6); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{kXdstpResourceName}); // Cancel watch. CancelFooWatch(watcher.get(), kXdstpResourceName); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) { constexpr char kAuthority[] = "xds.example.com"; const std::string kXdstpResourceName = absl::StrCat( "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2"); FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server"); FakeXdsBootstrap::FakeAuthority authority; authority.set_server(authority_server); InitXdsClient( FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority)); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher->HasEvent()); // XdsClient should have created an ADS stream to the top-level xDS server. auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front()); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Start a watch for the xdstp resource name. auto watcher2 = StartFooWatch(kXdstpResourceName); // Check metric data. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, true), ::testing::Pair(authority_server.server_uri(), true))); // Watcher should initially not see any resource reported. EXPECT_FALSE(watcher2->HasEvent()); // XdsClient will create a new stream to the server for this authority. auto stream2 = WaitForAdsStream(authority_server); ASSERT_TRUE(stream2 != nullptr); // XdsClient should have sent a subscription request on the ADS stream. // Note that version and nonce here do NOT use the values for Foo, // since each authority has its own state. request = WaitForRequest(stream2.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{kXdstpResourceName}); CheckRequestNode(*request); // Should be present on the first request. // Send a response. stream2->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddFooResource(XdsFooResource(kXdstpResourceName, 3)) .Serialize()); // XdsClient should have delivered the response to the watcher. resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, kXdstpResourceName); EXPECT_EQ(resource->value, 3); // Check metric data. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre( ::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1), ::testing::Pair( ::testing::Pair(authority_server.server_uri(), XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ::testing::ElementsAre()); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre( ::testing::Pair(ResourceCountLabelsEq( XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1), ::testing::Pair( ResourceCountLabelsEq( kAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, true), ::testing::Pair(authority_server.server_uri(), true))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream2.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{kXdstpResourceName}); // Now cause a channel failure on the stream to the authority's xDS server. TriggerConnectionFailure(authority_server, absl::UnavailableError("connection failed")); // The watcher for the xdstp resource name should see the error. auto error = watcher2->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "xDS channel for server other_xds_server: connection failed " "(node ID:xds_client_test)") << *error; // The watcher for "foo1" should not see any error. EXPECT_FALSE(watcher->HasEvent()); // Check metric data. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, true), ::testing::Pair(authority_server.server_uri(), false))); // Cancel watch for "foo1". CancelFooWatch(watcher.get(), "foo1"); EXPECT_TRUE(stream->Orphaned()); // Now cancel watch for xdstp resource name. CancelFooWatch(watcher2.get(), kXdstpResourceName); EXPECT_TRUE(stream2->Orphaned()); } TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) { const absl::Duration timeout = absl::Seconds(5) * grpc_test_slowdown_factor(); InitXdsClient(); // Start watches for "foo1" and "foo2". auto watcher1 = StartFooWatch("foo1"); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); auto watcher2 = StartFooWatch("foo2"); request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Send a response with 2 resources. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .AddFooResource(XdsFooResource("foo2", 10)) .Serialize()); // Send a response with a single resource, will not be read until the handle // is released stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("2") .set_nonce("B") .AddFooResource(XdsFooResource("foo1", 8)) .Serialize()); // XdsClient should have delivered the response to the watcher. auto resource1 = watcher1->WaitForNextResourceAndHandle(); ASSERT_NE(resource1, absl::nullopt); EXPECT_EQ(resource1->resource->name, "foo1"); EXPECT_EQ(resource1->resource->value, 6); auto resource2 = watcher2->WaitForNextResourceAndHandle(); ASSERT_NE(resource2, absl::nullopt); EXPECT_EQ(resource2->resource->name, "foo2"); EXPECT_EQ(resource2->resource->value, 10); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout)); resource1->read_delay_handle.reset(); EXPECT_TRUE(stream->WaitForReadsStarted(1, timeout)); resource2->read_delay_handle.reset(); EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout)); resource1 = watcher1->WaitForNextResourceAndHandle(); ASSERT_NE(resource1, absl::nullopt); EXPECT_EQ(resource1->resource->name, "foo1"); EXPECT_EQ(resource1->resource->value, 8); EXPECT_EQ(watcher2->WaitForNextResourceAndHandle(), absl::nullopt); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); EXPECT_TRUE(stream->WaitForReadsStarted(2, timeout)); resource1->read_delay_handle.reset(); EXPECT_TRUE(stream->WaitForReadsStarted(3, timeout)); // Cancel watch. CancelFooWatch(watcher1.get(), "foo1"); request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"2", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"}); CancelFooWatch(watcher2.get(), "foo2"); EXPECT_TRUE(stream->Orphaned()); } TEST_F(XdsClientTest, FallbackAndRecover) { FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl); FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server"); // Regular operation InitXdsClient(FakeXdsBootstrap::Builder().SetServers( {primary_server, fallback_server})); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::IsEmpty()); // XdsClient should have created an ADS stream. auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Input: Get initial response from primary server. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("20") .set_nonce("O") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); // Result (local): Resource is delivered to watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Result (local): Metrics show 1 resource update and 1 cached resource. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre(::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1))); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 1))); // Result (remote): Client sends ACK to server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"20", /*response_nonce=*/"O", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Input: Trigger connection failure to primary. TriggerConnectionFailure(primary_server, absl::UnavailableError("Server down")); // Result (local): The error is reported to the watcher. auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "xDS channel for server default_xds_server: Server down (node " "ID:xds_client_test)"); // Result (local): The metrics show the channel as being unhealthy. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, false))); // Input: Trigger stream failure. stream->MaybeSendStatusToClient(absl::UnavailableError("Stream failure")); // Result (local): The metrics still show the channel as being unhealthy. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, false))); // Result (remote): The client starts a new stream and sends a subscription // message. Note that the server does not respond, so the channel will still // have non-OK status. stream = WaitForAdsStream(); ASSERT_NE(stream, nullptr); request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"20", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Input: Start second watch for foo1 (already cached). auto watcher_cached = StartFooWatch("foo1"); // Result (local): New watcher gets the cached resource. resource = watcher_cached->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Result (local): New watcher gets the error from the channel state. error = watcher_cached->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->message(), "xDS channel for server default_xds_server: Server down (node " "ID:xds_client_test)") << error->message(); CancelFooWatch(watcher_cached.get(), "foo1"); // Input: Start watch for foo2 (not already cached). auto watcher2 = StartFooWatch("foo2"); // Result (local): Metrics show a healthy channel to the fallback server. EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, false), ::testing::Pair(fallback_server.server_uri(), true))); // Result (remote): Client sent a new request for both resources on the // stream to the primary. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"20", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Result (remote): Client created a stream to the fallback server and sent a // request on that stream for both resources. auto stream2 = WaitForAdsStream(fallback_server); ASSERT_TRUE(stream2 != nullptr); request = WaitForRequest(stream2.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Input: Fallback server sends a response with both resources. stream2->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("5") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 20)) .AddFooResource(XdsFooResource("foo2", 30)) .Serialize()); // Result (local): Resources are delivered to watchers. resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 20); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo2"); EXPECT_EQ(resource->value, 30); // Result (local): Metrics show an update from fallback server. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre( ::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 1), ::testing::Pair( ::testing::Pair(fallback_server.server_uri(), XdsFooResourceType::Get()->type_url()), 2))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, false), ::testing::Pair(fallback_server.server_uri(), true))); EXPECT_THAT( GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "acked"), 2))); // Result (remote): Client sends ACK to fallback server. request = WaitForRequest(stream2.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"5", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Input: Primary server sends a response containing both resources. stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("15") .set_nonce("B") .AddFooResource(XdsFooResource("foo1", 35)) .AddFooResource(XdsFooResource("foo2", 25)) .Serialize()); // Result (local): Metrics show that we've closed the channel to the fallback // server and received resource updates from the primary server. EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ::testing::ElementsAre( ::testing::Pair( ::testing::Pair(kDefaultXdsServerUrl, XdsFooResourceType::Get()->type_url()), 3), ::testing::Pair( ::testing::Pair(fallback_server.server_uri(), XdsFooResourceType::Get()->type_url()), 2))); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); // Result (remote): The stream to the fallback server has been orphaned. EXPECT_TRUE(stream2->Orphaned()); // Result (local): Resources are delivered to watchers. resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 35); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo2"); EXPECT_EQ(resource->value, 25); // Result (remote): Client sends ACK to server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"15", /*response_nonce=*/"B", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1", "foo2"}); // Clean up. CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true); CancelFooWatch(watcher2.get(), "foo2"); // Result (remote): The stream to the primary server has been orphaned. EXPECT_TRUE(stream->Orphaned()); } // Test for both servers being unavailable TEST_F(XdsClientTest, FallbackReportsError) { FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl); FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server"); InitXdsClient(FakeXdsBootstrap::Builder().SetServers( {primary_server, fallback_server})); auto watcher = StartFooWatch("foo1"); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); auto stream = WaitForAdsStream(); ASSERT_TRUE(stream != nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre(::testing::Pair( ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, XdsFooResourceType::Get()->type_url(), "requested"), 1))); TriggerConnectionFailure(primary_server, absl::UnavailableError("Server down")); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, false), ::testing::Pair(fallback_server.server_uri(), true))); // Fallback happens now stream = WaitForAdsStream(fallback_server); ASSERT_NE(stream, nullptr); request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); TriggerConnectionFailure(fallback_server, absl::UnavailableError("Another server down")); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, false), ::testing::Pair(fallback_server.server_uri(), false))); auto error = watcher->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_THAT(error->code(), absl::StatusCode::kUnavailable); EXPECT_EQ(error->message(), "xDS channel for server fallback_xds_server: Another server down " "(node ID:xds_client_test)") << error->message(); } TEST_F(XdsClientTest, FallbackOnStartup) { FakeXdsBootstrap::FakeXdsServer primary_server; FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server"); // Regular operation InitXdsClient(FakeXdsBootstrap::Builder().SetServers( {primary_server, fallback_server})); // Start a watch for "foo1". auto watcher = StartFooWatch("foo1"); auto primary_stream = WaitForAdsStream(primary_server); ASSERT_NE(primary_stream, nullptr); // XdsClient should have sent a subscription request on the ADS stream. auto request = WaitForRequest(primary_stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); TriggerConnectionFailure(primary_server, absl::UnavailableError("Primary server is down")); // XdsClient should have created an ADS stream. auto fallback_stream = WaitForAdsStream(fallback_server); ASSERT_NE(fallback_stream, nullptr); // XdsClient should have sent a subscription request on the ADS stream. request = WaitForRequest(fallback_stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"", /*response_nonce=*/"", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Send a response. fallback_stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("1") .set_nonce("A") .AddFooResource(XdsFooResource("foo1", 6)) .Serialize()); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre( ::testing::Pair(kDefaultXdsServerUrl, false), ::testing::Pair(fallback_server.server_uri(), true))); // XdsClient should have delivered the response to the watcher. auto resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); // Client sends an ACK. request = WaitForRequest(fallback_stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"1", /*response_nonce=*/"A", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); // Recover to primary primary_stream->SendMessageToClient( ResponseBuilder(XdsFooResourceType::Get()->type_url()) .set_version_info("5") .set_nonce("D") .AddFooResource(XdsFooResource("foo1", 42)) .Serialize()); EXPECT_TRUE(fallback_stream->Orphaned()); resource = watcher->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 42); EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair( kDefaultXdsServerUrl, true))); request = WaitForRequest(primary_stream.get()); ASSERT_TRUE(request.has_value()); CheckRequest(*request, XdsFooResourceType::Get()->type_url(), /*version_info=*/"5", /*response_nonce=*/"D", /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo1"}); } } // namespace } // namespace testing } // namespace grpc_core int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); grpc::testing::TestEnvironment env(&argc, argv); grpc_init(); int ret = RUN_ALL_TESTS(); grpc_shutdown(); return ret; }