• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 // TODO(roth): Add the following tests:
18 // - tests for load-reporting APIs?  (or maybe move those out of XdsClient?)
19 
20 #include "src/core/xds/xds_client/xds_client.h"
21 
22 #include <google/protobuf/any.pb.h>
23 #include <google/protobuf/struct.pb.h>
24 #include <grpc/grpc.h>
25 #include <grpc/support/json.h>
26 #include <grpcpp/impl/codegen/config_protobuf.h>
27 #include <stdint.h>
28 
29 #include <algorithm>
30 #include <deque>
31 #include <fstream>
32 #include <map>
33 #include <memory>
34 #include <optional>
35 #include <string>
36 #include <vector>
37 
38 #include "absl/strings/str_cat.h"
39 #include "absl/time/time.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/variant.h"
42 #include "envoy/config/core/v3/base.pb.h"
43 #include "envoy/service/discovery/v3/discovery.pb.h"
44 #include "envoy/service/status/v3/csds.pb.h"
45 #include "gmock/gmock.h"
46 #include "gtest/gtest.h"
47 #include "src/core/lib/iomgr/timer_manager.h"
48 #include "src/core/util/debug_location.h"
49 #include "src/core/util/json/json.h"
50 #include "src/core/util/json/json_args.h"
51 #include "src/core/util/json/json_object_loader.h"
52 #include "src/core/util/json/json_reader.h"
53 #include "src/core/util/json/json_writer.h"
54 #include "src/core/util/match.h"
55 #include "src/core/util/sync.h"
56 #include "src/core/xds/xds_client/xds_bootstrap.h"
57 #include "src/core/xds/xds_client/xds_resource_type_impl.h"
58 #include "test/core/event_engine/event_engine_test_utils.h"
59 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
60 #include "test/core/test_util/scoped_env_var.h"
61 #include "test/core/test_util/test_config.h"
62 #include "test/core/xds/xds_client_test_peer.h"
63 #include "test/core/xds/xds_transport_fake.h"
64 #include "upb/reflection/def.h"
65 
66 // IWYU pragma: no_include <google/protobuf/message.h>
67 // IWYU pragma: no_include <google/protobuf/stubs/status.h>
68 // IWYU pragma: no_include <google/protobuf/unknown_field_set.h>
69 // IWYU pragma: no_include <google/protobuf/util/json_util.h>
70 // IWYU pragma: no_include "google/protobuf/json/json.h"
71 // IWYU pragma: no_include "google/protobuf/util/json_util.h"
72 
73 using envoy::admin::v3::ClientResourceStatus;
74 using envoy::service::discovery::v3::DiscoveryRequest;
75 using envoy::service::discovery::v3::DiscoveryResponse;
76 using envoy::service::status::v3::ClientConfig;
77 using grpc_event_engine::experimental::FuzzingEventEngine;
78 
79 namespace grpc_core {
80 namespace testing {
81 namespace {
82 
83 constexpr absl::string_view kDefaultXdsServerUrl = "default_xds_server";
84 
85 constexpr Timestamp kTime0 =
86     Timestamp::FromMillisecondsAfterProcessEpoch(10000);
87 constexpr Timestamp kTime1 =
88     Timestamp::FromMillisecondsAfterProcessEpoch(15000);
89 constexpr Timestamp kTime2 =
90     Timestamp::FromMillisecondsAfterProcessEpoch(20000);
91 
92 class XdsClientTest : public ::testing::Test {
93  protected:
94   // A fake bootstrap implementation that allows tests to populate the
95   // fields however they want.
96   class FakeXdsBootstrap : public XdsBootstrap {
97    public:
98     class FakeNode : public Node {
99      public:
FakeNode()100       FakeNode() : id_("xds_client_test") {}
id() const101       const std::string& id() const override { return id_; }
cluster() const102       const std::string& cluster() const override { return cluster_; }
locality_region() const103       const std::string& locality_region() const override {
104         return locality_region_;
105       }
locality_zone() const106       const std::string& locality_zone() const override {
107         return locality_zone_;
108       }
locality_sub_zone() const109       const std::string& locality_sub_zone() const override {
110         return locality_sub_zone_;
111       }
metadata() const112       const Json::Object& metadata() const override { return metadata_; }
113 
set_id(std::string id)114       void set_id(std::string id) { id_ = std::move(id); }
set_cluster(std::string cluster)115       void set_cluster(std::string cluster) { cluster_ = std::move(cluster); }
set_locality_region(std::string locality_region)116       void set_locality_region(std::string locality_region) {
117         locality_region_ = std::move(locality_region);
118       }
set_locality_zone(std::string locality_zone)119       void set_locality_zone(std::string locality_zone) {
120         locality_zone_ = std::move(locality_zone);
121       }
set_locality_sub_zone(std::string locality_sub_zone)122       void set_locality_sub_zone(std::string locality_sub_zone) {
123         locality_sub_zone_ = std::move(locality_sub_zone);
124       }
set_metadata(Json::Object metadata)125       void set_metadata(Json::Object metadata) {
126         metadata_ = std::move(metadata);
127       }
128 
129      private:
130       std::string id_;
131       std::string cluster_;
132       std::string locality_region_;
133       std::string locality_zone_;
134       std::string locality_sub_zone_;
135       Json::Object metadata_;
136     };
137 
138     class FakeXdsServer : public XdsServer {
139      public:
FakeXdsServer(absl::string_view server_uri=kDefaultXdsServerUrl,bool ignore_resource_deletion=false)140       explicit FakeXdsServer(
141           absl::string_view server_uri = kDefaultXdsServerUrl,
142           bool ignore_resource_deletion = false)
143           : server_uri_(server_uri),
144             ignore_resource_deletion_(ignore_resource_deletion) {}
server_uri() const145       const std::string& server_uri() const override { return server_uri_; }
IgnoreResourceDeletion() const146       bool IgnoreResourceDeletion() const override {
147         return ignore_resource_deletion_;
148       }
Equals(const XdsServer & other) const149       bool Equals(const XdsServer& other) const override {
150         const auto& o = static_cast<const FakeXdsServer&>(other);
151         return server_uri_ == o.server_uri_ &&
152                ignore_resource_deletion_ == o.ignore_resource_deletion_;
153       }
Key() const154       std::string Key() const override {
155         return absl::StrCat(server_uri_, "#", ignore_resource_deletion_);
156       }
157 
158      private:
159       std::string server_uri_;
160       bool ignore_resource_deletion_ = false;
161     };
162 
163     class FakeAuthority : public Authority {
164      public:
servers() const165       std::vector<const XdsServer*> servers() const override {
166         if (server_.has_value()) {
167           return {&*server_};
168         } else {
169           return {};
170         };
171       }
172 
set_server(absl::optional<FakeXdsServer> server)173       void set_server(absl::optional<FakeXdsServer> server) {
174         server_ = std::move(server);
175       }
176 
177      private:
178       absl::optional<FakeXdsServer> server_;
179     };
180 
181     class Builder {
182      public:
Builder()183       Builder() { node_.emplace(); }
set_node_id(std::string id)184       Builder& set_node_id(std::string id) {
185         if (!node_.has_value()) node_.emplace();
186         node_->set_id(std::move(id));
187         return *this;
188       }
AddAuthority(std::string name,FakeAuthority authority)189       Builder& AddAuthority(std::string name, FakeAuthority authority) {
190         authorities_[std::move(name)] = std::move(authority);
191         return *this;
192       }
SetServers(absl::Span<const FakeXdsServer> servers)193       Builder& SetServers(absl::Span<const FakeXdsServer> servers) {
194         servers_.assign(servers.begin(), servers.end());
195         return *this;
196       }
Build()197       std::unique_ptr<XdsBootstrap> Build() {
198         auto bootstrap = std::make_unique<FakeXdsBootstrap>();
199         bootstrap->servers_ = std::move(servers_);
200         bootstrap->node_ = std::move(node_);
201         bootstrap->authorities_ = std::move(authorities_);
202         return bootstrap;
203       }
204 
205      private:
206       std::vector<FakeXdsServer> servers_ = {FakeXdsServer()};
207       absl::optional<FakeNode> node_;
208       std::map<std::string, FakeAuthority> authorities_;
209     };
210 
ToString() const211     std::string ToString() const override { return "<fake>"; }
212 
servers() const213     std::vector<const XdsServer*> servers() const override {
214       std::vector<const XdsServer*> result;
215       result.reserve(servers_.size());
216       for (size_t i = 0; i < servers_.size(); ++i) {
217         result.emplace_back(&servers_[i]);
218       }
219       return result;
220     }
221 
node() const222     const Node* node() const override {
223       return node_.has_value() ? &*node_ : nullptr;
224     }
LookupAuthority(const std::string & name) const225     const Authority* LookupAuthority(const std::string& name) const override {
226       auto it = authorities_.find(name);
227       if (it == authorities_.end()) return nullptr;
228       return &it->second;
229     }
230 
231    private:
232     std::vector<FakeXdsServer> servers_;
233     absl::optional<FakeNode> node_;
234     std::map<std::string, FakeAuthority> authorities_;
235   };
236 
237   // A template for a test xDS resource type with an associated watcher impl.
238   // For simplicity, we use JSON instead of proto for serialization.
239   //
240   // The specified ResourceStruct must provide the following:
241   // - a static JsonLoader() method, as described in json_object_loader.h
242   // - an AsJsonString() method that returns the object in JSON string form
243   // - a static TypeUrl() method that returns the resource type
244   //
245   // The all_resources_required_in_sotw parameter indicates the value
246   // that should be returned by the AllResourcesRequiredInSotW() method.
247   template <typename ResourceStruct, bool all_resources_required_in_sotw>
248   class XdsTestResourceType
249       : public XdsResourceTypeImpl<
250             XdsTestResourceType<ResourceStruct, all_resources_required_in_sotw>,
251             ResourceStruct> {
252    public:
253     // A watcher implementation that queues delivered watches.
254     class Watcher : public XdsResourceTypeImpl<
255                         XdsTestResourceType<ResourceStruct,
256                                             all_resources_required_in_sotw>,
257                         ResourceStruct>::WatcherInterface {
258      public:
Watcher(std::shared_ptr<FuzzingEventEngine> event_engine)259       explicit Watcher(std::shared_ptr<FuzzingEventEngine> event_engine)
260           : event_engine_(std::move(event_engine)) {}
261 
~Watcher()262       ~Watcher() override {
263         MutexLock lock(&mu_);
264         EXPECT_THAT(queue_, ::testing::IsEmpty())
265             << this << " " << queue_[0].ToString();
266       }
267 
HasEvent()268       bool HasEvent() {
269         MutexLock lock(&mu_);
270         return !queue_.empty();
271       }
272 
273       // Returns true if no event is received after draining the fuzzing
274       // EE queue.
ExpectNoEvent()275       bool ExpectNoEvent() {
276         event_engine_->TickUntilIdle();
277         return !HasEvent();
278       }
279 
280       struct ResourceAndReadDelayHandle {
281         std::shared_ptr<const ResourceStruct> resource;
282         RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle;
283       };
WaitForNextResourceAndHandle(SourceLocation location=SourceLocation ())284       absl::optional<ResourceAndReadDelayHandle> WaitForNextResourceAndHandle(
285           SourceLocation location = SourceLocation()) {
286         auto event = WaitForNextOnResourceChangedEvent(location);
287         if (!event.has_value()) return absl::nullopt;
288         EXPECT_TRUE(event->resource.ok())
289             << "got unexpected error: " << event->resource.status() << " at "
290             << location.file() << ":" << location.line();
291         if (!event->resource.ok()) return absl::nullopt;
292         return ResourceAndReadDelayHandle{std::move(*event->resource),
293                                           std::move(event->read_delay_handle)};
294       }
295 
WaitForNextResource(SourceLocation location=SourceLocation ())296       std::shared_ptr<const ResourceStruct> WaitForNextResource(
297           SourceLocation location = SourceLocation()) {
298         auto resource_and_handle = WaitForNextResourceAndHandle(location);
299         if (!resource_and_handle.has_value()) return nullptr;
300         return std::move(resource_and_handle->resource);
301       }
302 
WaitForNextError(SourceLocation location=SourceLocation ())303       absl::optional<absl::Status> WaitForNextError(
304           SourceLocation location = SourceLocation()) {
305         auto event = WaitForNextOnResourceChangedEvent(location);
306         if (!event.has_value()) return absl::nullopt;
307         EXPECT_FALSE(event->resource.ok())
308             << "got unexpected resource: " << (*event->resource)->name << " at "
309             << location.file() << ":" << location.line();
310         if (event->resource.ok()) return absl::nullopt;
311         return event->resource.status();
312       }
313 
WaitForDoesNotExist(SourceLocation location=SourceLocation ())314       bool WaitForDoesNotExist(SourceLocation location = SourceLocation()) {
315         auto status = WaitForNextError(location);
316         if (!status.has_value()) return false;
317         EXPECT_EQ(status->code(), absl::StatusCode::kNotFound)
318             << "unexpected status: " << *status << " at " << location.file()
319             << ":" << location.line();
320         return status->code() == absl::StatusCode::kNotFound;
321       }
322 
WaitForNextAmbientError(SourceLocation location=SourceLocation ())323       absl::optional<absl::Status> WaitForNextAmbientError(
324           SourceLocation location = SourceLocation()) {
325         auto event = WaitForNextEvent();
326         if (!event.has_value()) return absl::nullopt;
327         return Match(
328             event->payload,
329             [&](const absl::StatusOr<std::shared_ptr<const ResourceStruct>>&
330                     resource) -> absl::optional<absl::Status> {
331               EXPECT_TRUE(false)
332                   << "got unexpected resource: " << event->ToString() << " at "
333                   << location.file() << ":" << location.line();
334               return absl::nullopt;
335             },
336             [&](const absl::Status& status) -> absl::optional<absl::Status> {
337               return status;
338             });
339       }
340 
341      private:
342       // An event delivered to the watcher.
343       struct Event {
344         absl::variant<
345             // OnResourceChanged()
346             absl::StatusOr<std::shared_ptr<const ResourceStruct>>,
347             // OnAmbientError()
348             absl::Status>
349             payload;
350         RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle;
351 
ToStringgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsTestResourceType::Watcher::Event352         std::string ToString() const {
353           return Match(
354               payload,
355               [&](const absl::StatusOr<std::shared_ptr<const ResourceStruct>>&
356                       resource) {
357                 return absl::StrCat(
358                     "{resource=",
359                     resource.ok() ? (*resource)->name
360                                   : resource.status().ToString(),
361                     ", read_delay_handle=", (read_delay_handle == nullptr),
362                     "}");
363               },
364               [&](const absl::Status& status) {
365                 return absl::StrCat("{ambient_error=", status.ToString(),
366                                     ", read_delay_handle=",
367                                     (read_delay_handle == nullptr), "}");
368               });
369         }
370       };
371 
WaitForNextEvent()372       absl::optional<Event> WaitForNextEvent() {
373         while (true) {
374           {
375             MutexLock lock(&mu_);
376             if (!queue_.empty()) {
377               Event event = std::move(queue_.front());
378               queue_.pop_front();
379               return event;
380             }
381             if (event_engine_->IsIdle()) return absl::nullopt;
382           }
383           event_engine_->Tick();
384         }
385       }
386 
387       struct OnResourceChangedEvent {
388         absl::StatusOr<std::shared_ptr<const ResourceStruct>> resource;
389         RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle;
390       };
WaitForNextOnResourceChangedEvent(SourceLocation location=SourceLocation ())391       absl::optional<OnResourceChangedEvent> WaitForNextOnResourceChangedEvent(
392           SourceLocation location = SourceLocation()) {
393         auto event = WaitForNextEvent();
394         if (!event.has_value()) return absl::nullopt;
395         return MatchMutable(
396             &event->payload,
397             [&](absl::StatusOr<std::shared_ptr<const ResourceStruct>>* resource)
398                 -> absl::optional<OnResourceChangedEvent> {
399               return OnResourceChangedEvent{
400                   std::move(*resource), std::move(event->read_delay_handle)};
401             },
402             [&](absl::Status* status)
403                 -> absl::optional<OnResourceChangedEvent> {
404               EXPECT_TRUE(false)
405                   << "got unexpected ambient error: " << status->ToString()
406                   << " at " << location.file() << ":" << location.line();
407               return absl::nullopt;
408             });
409       }
410 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const ResourceStruct>> resource,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)411       void OnResourceChanged(
412           absl::StatusOr<std::shared_ptr<const ResourceStruct>> resource,
413           RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)
414           override {
415         MutexLock lock(&mu_);
416         queue_.emplace_back(
417             Event{std::move(resource), std::move(read_delay_handle)});
418       }
419 
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)420       void OnAmbientError(absl::Status status,
421                           RefCountedPtr<XdsClient::ReadDelayHandle>
422                               read_delay_handle) override {
423         MutexLock lock(&mu_);
424         queue_.push_back(
425             Event{std::move(status), std::move(read_delay_handle)});
426       }
427 
428       std::shared_ptr<FuzzingEventEngine> event_engine_;
429 
430       Mutex mu_;
431       std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_);
432     };
433 
type_url() const434     absl::string_view type_url() const override {
435       return ResourceStruct::TypeUrl();
436     }
Decode(const XdsResourceType::DecodeContext &,absl::string_view serialized_resource) const437     XdsResourceType::DecodeResult Decode(
438         const XdsResourceType::DecodeContext& /*context*/,
439         absl::string_view serialized_resource) const override {
440       auto json = JsonParse(serialized_resource);
441       XdsResourceType::DecodeResult result;
442       if (!json.ok()) {
443         result.resource = json.status();
444       } else {
445         absl::StatusOr<ResourceStruct> foo =
446             LoadFromJson<ResourceStruct>(*json);
447         if (!foo.ok()) {
448           auto it = json->object().find("name");
449           if (it != json->object().end()) {
450             result.name = it->second.string();
451           }
452           result.resource = foo.status();
453         } else {
454           result.name = foo->name;
455           result.resource = std::make_unique<ResourceStruct>(std::move(*foo));
456         }
457       }
458       return result;
459     }
AllResourcesRequiredInSotW() const460     bool AllResourcesRequiredInSotW() const override {
461       return all_resources_required_in_sotw;
462     }
InitUpbSymtab(XdsClient *,upb_DefPool *) const463     void InitUpbSymtab(XdsClient*, upb_DefPool* /*symtab*/) const override {}
464 
EncodeAsAny(const ResourceStruct & resource)465     static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) {
466       google::protobuf::Any any;
467       any.set_type_url(
468           absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl()));
469       any.set_value(resource.AsJsonString());
470       return any;
471     }
472   };
473 
474   // A fake "Foo" xDS resource type.
475   struct XdsFooResource : public XdsResourceType::ResourceData {
476     std::string name;
477     uint32_t value;
478 
479     XdsFooResource() = default;
XdsFooResourcegrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource480     XdsFooResource(std::string name, uint32_t value)
481         : name(std::move(name)), value(value) {}
482 
operator ==grpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource483     bool operator==(const XdsFooResource& other) const {
484       return name == other.name && value == other.value;
485     }
486 
AsJsonStringgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource487     std::string AsJsonString() const {
488       return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}");
489     }
490 
JsonLoadergrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource491     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
492       static const auto* loader = JsonObjectLoader<XdsFooResource>()
493                                       .Field("name", &XdsFooResource::name)
494                                       .Field("value", &XdsFooResource::value)
495                                       .Finish();
496       return loader;
497     }
498 
TypeUrlgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource499     static absl::string_view TypeUrl() { return "test.v3.foo"; }
500   };
501   using XdsFooResourceType = XdsTestResourceType<XdsFooResource, false>;
502 
503   // A fake "Bar" xDS resource type.
504   struct XdsBarResource : public XdsResourceType::ResourceData {
505     std::string name;
506     std::string value;
507 
508     XdsBarResource() = default;
XdsBarResourcegrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource509     XdsBarResource(std::string name, std::string value)
510         : name(std::move(name)), value(std::move(value)) {}
511 
operator ==grpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource512     bool operator==(const XdsBarResource& other) const {
513       return name == other.name && value == other.value;
514     }
515 
AsJsonStringgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource516     std::string AsJsonString() const {
517       return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value,
518                           "\"}");
519     }
520 
JsonLoadergrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource521     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
522       static const auto* loader = JsonObjectLoader<XdsBarResource>()
523                                       .Field("name", &XdsBarResource::name)
524                                       .Field("value", &XdsBarResource::value)
525                                       .Finish();
526       return loader;
527     }
528 
TypeUrlgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource529     static absl::string_view TypeUrl() { return "test.v3.bar"; }
530   };
531   using XdsBarResourceType = XdsTestResourceType<XdsBarResource, false>;
532 
533   // A fake "WildcardCapable" xDS resource type.
534   // This resource type return true for AllResourcesRequiredInSotW(),
535   // just like LDS and CDS.
536   struct XdsWildcardCapableResource : public XdsResourceType::ResourceData {
537     std::string name;
538     uint32_t value;
539 
540     XdsWildcardCapableResource() = default;
XdsWildcardCapableResourcegrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource541     XdsWildcardCapableResource(std::string name, uint32_t value)
542         : name(std::move(name)), value(value) {}
543 
operator ==grpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource544     bool operator==(const XdsWildcardCapableResource& other) const {
545       return name == other.name && value == other.value;
546     }
547 
AsJsonStringgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource548     std::string AsJsonString() const {
549       return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value,
550                           "\"}");
551     }
552 
JsonLoadergrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource553     static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
554       static const auto* loader =
555           JsonObjectLoader<XdsWildcardCapableResource>()
556               .Field("name", &XdsWildcardCapableResource::name)
557               .Field("value", &XdsWildcardCapableResource::value)
558               .Finish();
559       return loader;
560     }
561 
TypeUrlgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource562     static absl::string_view TypeUrl() { return "test.v3.wildcard_capable"; }
563   };
564   using XdsWildcardCapableResourceType =
565       XdsTestResourceType<XdsWildcardCapableResource,
566                           /*all_resources_required_in_sotw=*/true>;
567 
568   // A helper class to build and serialize a DiscoveryResponse.
569   class ResponseBuilder {
570    public:
ResponseBuilder(absl::string_view type_url)571     explicit ResponseBuilder(absl::string_view type_url) {
572       response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url));
573     }
574 
set_version_info(absl::string_view version_info)575     ResponseBuilder& set_version_info(absl::string_view version_info) {
576       response_.set_version_info(std::string(version_info));
577       return *this;
578     }
set_nonce(absl::string_view nonce)579     ResponseBuilder& set_nonce(absl::string_view nonce) {
580       response_.set_nonce(std::string(nonce));
581       return *this;
582     }
583 
584     template <typename ResourceType>
AddResource(const typename ResourceType::ResourceType & resource,bool in_resource_wrapper=false)585     ResponseBuilder& AddResource(
586         const typename ResourceType::ResourceType& resource,
587         bool in_resource_wrapper = false) {
588       auto* res = response_.add_resources();
589       *res = ResourceType::EncodeAsAny(resource);
590       if (in_resource_wrapper) {
591         envoy::service::discovery::v3::Resource resource_wrapper;
592         resource_wrapper.set_name(resource.name);
593         *resource_wrapper.mutable_resource() = std::move(*res);
594         res->PackFrom(resource_wrapper);
595       }
596       return *this;
597     }
598 
AddFooResource(const XdsFooResource & resource,bool in_resource_wrapper=false)599     ResponseBuilder& AddFooResource(const XdsFooResource& resource,
600                                     bool in_resource_wrapper = false) {
601       return AddResource<XdsFooResourceType>(resource, in_resource_wrapper);
602     }
603 
AddBarResource(const XdsBarResource & resource,bool in_resource_wrapper=false)604     ResponseBuilder& AddBarResource(const XdsBarResource& resource,
605                                     bool in_resource_wrapper = false) {
606       return AddResource<XdsBarResourceType>(resource, in_resource_wrapper);
607     }
608 
AddWildcardCapableResource(const XdsWildcardCapableResource & resource,bool in_resource_wrapper=false)609     ResponseBuilder& AddWildcardCapableResource(
610         const XdsWildcardCapableResource& resource,
611         bool in_resource_wrapper = false) {
612       return AddResource<XdsWildcardCapableResourceType>(resource,
613                                                          in_resource_wrapper);
614     }
615 
AddInvalidResource(absl::string_view type_url,absl::string_view value,absl::string_view resource_wrapper_name="")616     ResponseBuilder& AddInvalidResource(
617         absl::string_view type_url, absl::string_view value,
618         absl::string_view resource_wrapper_name = "") {
619       auto* res = response_.add_resources();
620       res->set_type_url(absl::StrCat("type.googleapis.com/", type_url));
621       res->set_value(std::string(value));
622       if (!resource_wrapper_name.empty()) {
623         envoy::service::discovery::v3::Resource resource_wrapper;
624         resource_wrapper.set_name(std::string(resource_wrapper_name));
625         *resource_wrapper.mutable_resource() = std::move(*res);
626         res->PackFrom(resource_wrapper);
627       }
628       return *this;
629     }
630 
AddInvalidResourceWrapper()631     ResponseBuilder& AddInvalidResourceWrapper() {
632       auto* res = response_.add_resources();
633       res->set_type_url(
634           "type.googleapis.com/envoy.service.discovery.v3.Resource");
635       res->set_value(std::string("\0", 1));
636       return *this;
637     }
638 
AddEmptyResource()639     ResponseBuilder& AddEmptyResource() {
640       response_.add_resources();
641       return *this;
642     }
643 
Serialize()644     std::string Serialize() {
645       std::string serialized_response;
646       EXPECT_TRUE(response_.SerializeToString(&serialized_response));
647       return serialized_response;
648     }
649 
650    private:
651     DiscoveryResponse response_;
652   };
653 
654   class MetricsReporter : public XdsMetricsReporter {
655    public:
656     using ResourceUpdateMap = std::map<
657         std::pair<std::string /*xds_server*/, std::string /*resource_type*/>,
658         uint64_t>;
659     using ServerFailureMap = std::map<std::string /*xds_server*/, uint64_t>;
660 
MetricsReporter(std::shared_ptr<FuzzingEventEngine> event_engine)661     explicit MetricsReporter(std::shared_ptr<FuzzingEventEngine> event_engine)
662         : event_engine_(std::move(event_engine)) {}
663 
resource_updates_valid() const664     ResourceUpdateMap resource_updates_valid() const {
665       MutexLock lock(&mu_);
666       return resource_updates_valid_;
667     }
resource_updates_invalid() const668     ResourceUpdateMap resource_updates_invalid() const {
669       MutexLock lock(&mu_);
670       return resource_updates_invalid_;
671     }
server_failures() const672     const ServerFailureMap& server_failures() const { return server_failures_; }
673 
674     // Returns true if matchers return true before the timeout.
675     // Runs matchers once as soon as it is called and then again
676     // every time the metrics reporter sees an update.
WaitForMetricsReporterData(::testing::Matcher<ResourceUpdateMap> resource_updates_valid_matcher,::testing::Matcher<ResourceUpdateMap> resource_updates_invalid_matcher,::testing::Matcher<ServerFailureMap> server_failures_matcher,SourceLocation location=SourceLocation ())677     bool WaitForMetricsReporterData(
678         ::testing::Matcher<ResourceUpdateMap> resource_updates_valid_matcher,
679         ::testing::Matcher<ResourceUpdateMap> resource_updates_invalid_matcher,
680         ::testing::Matcher<ServerFailureMap> server_failures_matcher,
681         SourceLocation location = SourceLocation()) {
682       while (true) {
683         {
684           MutexLock lock(&mu_);
685           if (::testing::Matches(resource_updates_valid_matcher)(
686                   resource_updates_valid_) &&
687               ::testing::Matches(resource_updates_invalid_matcher)(
688                   resource_updates_invalid_) &&
689               ::testing::Matches(server_failures_matcher)(server_failures_)) {
690             return true;
691           }
692           if (event_engine_->IsIdle()) {
693             EXPECT_THAT(resource_updates_valid_, resource_updates_valid_matcher)
694                 << location.file() << ":" << location.line();
695             EXPECT_THAT(resource_updates_invalid_,
696                         resource_updates_invalid_matcher)
697                 << location.file() << ":" << location.line();
698             EXPECT_THAT(server_failures_, server_failures_matcher)
699                 << location.file() << ":" << location.line();
700             return false;
701           }
702         }
703         event_engine_->Tick();
704       }
705     }
706 
707    private:
ReportResourceUpdates(absl::string_view xds_server,absl::string_view resource_type,uint64_t num_resources_valid,uint64_t num_resources_invalid)708     void ReportResourceUpdates(absl::string_view xds_server,
709                                absl::string_view resource_type,
710                                uint64_t num_resources_valid,
711                                uint64_t num_resources_invalid) override {
712       MutexLock lock(&mu_);
713       auto key =
714           std::make_pair(std::string(xds_server), std::string(resource_type));
715       if (num_resources_valid > 0) {
716         resource_updates_valid_[key] += num_resources_valid;
717       }
718       if (num_resources_invalid > 0) {
719         resource_updates_invalid_[key] += num_resources_invalid;
720       }
721       cond_.SignalAll();
722     }
723 
ReportServerFailure(absl::string_view xds_server)724     void ReportServerFailure(absl::string_view xds_server) override {
725       MutexLock lock(&mu_);
726       ++server_failures_[std::string(xds_server)];
727       cond_.SignalAll();
728     }
729 
730     std::shared_ptr<FuzzingEventEngine> event_engine_;
731 
732     mutable Mutex mu_;
733     ResourceUpdateMap resource_updates_valid_ ABSL_GUARDED_BY(mu_);
734     ResourceUpdateMap resource_updates_invalid_ ABSL_GUARDED_BY(mu_);
735     ServerFailureMap server_failures_ ABSL_GUARDED_BY(mu_);
736     CondVar cond_;
737   };
738 
739   using ResourceCounts =
740       std::vector<std::pair<XdsClientTestPeer::ResourceCountLabels, uint64_t>>;
GetResourceCounts()741   ResourceCounts GetResourceCounts() {
742     ResourceCounts resource_counts;
743     XdsClientTestPeer(xds_client_.get())
744         .TestReportResourceCounts(
745             [&](const XdsClientTestPeer::ResourceCountLabels& labels,
746                 uint64_t count) {
747               resource_counts.emplace_back(labels, count);
748             });
749     return resource_counts;
750   }
751 
752   using ServerConnectionMap = std::map<std::string, bool>;
GetServerConnections()753   ServerConnectionMap GetServerConnections() {
754     ServerConnectionMap server_connection_map;
755     XdsClientTestPeer(xds_client_.get())
756         .TestReportServerConnections(
757             [&](absl::string_view xds_server, bool connected) {
758               std::string server(xds_server);
759               EXPECT_EQ(server_connection_map.find(server),
760                         server_connection_map.end());
761               server_connection_map[std::move(server)] = connected;
762             });
763     return server_connection_map;
764   }
765 
SetUp()766   void SetUp() override {
767     time_cache_.TestOnlySetNow(kTime0);
768     event_engine_ = std::make_shared<FuzzingEventEngine>(
769         FuzzingEventEngine::Options(), fuzzing_event_engine::Actions());
770     grpc_timer_manager_set_start_threaded(false);
771     grpc_init();
772   }
773 
TearDown()774   void TearDown() override {
775     transport_factory_.reset();
776     xds_client_.reset();
777     event_engine_->FuzzingDone();
778     event_engine_->TickUntilIdle();
779     event_engine_->UnsetGlobalHooks();
780     grpc_event_engine::experimental::WaitForSingleOwner(
781         std::move(event_engine_));
782     grpc_shutdown_blocking();
783   }
784 
785   // Sets transport_factory_ and initializes xds_client_ with the
786   // specified bootstrap config.
InitXdsClient(FakeXdsBootstrap::Builder bootstrap_builder=FakeXdsBootstrap::Builder ())787   void InitXdsClient(FakeXdsBootstrap::Builder bootstrap_builder =
788                          FakeXdsBootstrap::Builder()) {
789     transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
790         []() { FAIL() << "Multiple concurrent reads"; }, event_engine_);
791     auto metrics_reporter = std::make_unique<MetricsReporter>(event_engine_);
792     metrics_reporter_ = metrics_reporter.get();
793     xds_client_ = MakeRefCounted<XdsClient>(
794         bootstrap_builder.Build(), transport_factory_, event_engine_,
795         std::move(metrics_reporter), "foo agent", "foo version");
796   }
797 
798   // Starts and cancels a watch for a Foo resource.
StartFooWatch(absl::string_view resource_name)799   RefCountedPtr<XdsFooResourceType::Watcher> StartFooWatch(
800       absl::string_view resource_name) {
801     auto watcher = MakeRefCounted<XdsFooResourceType::Watcher>(event_engine_);
802     XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
803     return watcher;
804   }
CancelFooWatch(XdsFooResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)805   void CancelFooWatch(XdsFooResourceType::Watcher* watcher,
806                       absl::string_view resource_name,
807                       bool delay_unsubscription = false) {
808     XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher,
809                                     delay_unsubscription);
810   }
811 
812   // Starts and cancels a watch for a Bar resource.
StartBarWatch(absl::string_view resource_name)813   RefCountedPtr<XdsBarResourceType::Watcher> StartBarWatch(
814       absl::string_view resource_name) {
815     auto watcher = MakeRefCounted<XdsBarResourceType::Watcher>(event_engine_);
816     XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
817     return watcher;
818   }
CancelBarWatch(XdsBarResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)819   void CancelBarWatch(XdsBarResourceType::Watcher* watcher,
820                       absl::string_view resource_name,
821                       bool delay_unsubscription = false) {
822     XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher,
823                                     delay_unsubscription);
824   }
825 
826   // Starts and cancels a watch for a WildcardCapable resource.
827   RefCountedPtr<XdsWildcardCapableResourceType::Watcher>
StartWildcardCapableWatch(absl::string_view resource_name)828   StartWildcardCapableWatch(absl::string_view resource_name) {
829     auto watcher =
830         MakeRefCounted<XdsWildcardCapableResourceType::Watcher>(event_engine_);
831     XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name,
832                                                watcher);
833     return watcher;
834   }
CancelWildcardCapableWatch(XdsWildcardCapableResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)835   void CancelWildcardCapableWatch(
836       XdsWildcardCapableResourceType::Watcher* watcher,
837       absl::string_view resource_name, bool delay_unsubscription = false) {
838     XdsWildcardCapableResourceType::CancelWatch(
839         xds_client_.get(), resource_name, watcher, delay_unsubscription);
840   }
841 
WaitForAdsStream(const XdsBootstrap::XdsServer & xds_server)842   RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
843       const XdsBootstrap::XdsServer& xds_server) {
844     return transport_factory_->WaitForStream(
845         xds_server, FakeXdsTransportFactory::kAdsMethod);
846   }
847 
WaitForAdsStream()848   RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream() {
849     return WaitForAdsStream(*xds_client_->bootstrap().servers().front());
850   }
851 
TriggerConnectionFailure(const XdsBootstrap::XdsServer & xds_server,absl::Status status)852   void TriggerConnectionFailure(const XdsBootstrap::XdsServer& xds_server,
853                                 absl::Status status) {
854     transport_factory_->TriggerConnectionFailure(xds_server, std::move(status));
855   }
856 
857   // Gets the latest request sent to the fake xDS server.
WaitForRequest(FakeXdsTransportFactory::FakeStreamingCall * stream,SourceLocation location=SourceLocation ())858   absl::optional<DiscoveryRequest> WaitForRequest(
859       FakeXdsTransportFactory::FakeStreamingCall* stream,
860       SourceLocation location = SourceLocation()) {
861     auto message = stream->WaitForMessageFromClient();
862     if (!message.has_value()) return absl::nullopt;
863     DiscoveryRequest request;
864     bool success = request.ParseFromString(*message);
865     EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at "
866                          << location.file() << ":" << location.line();
867     if (!success) return absl::nullopt;
868     return std::move(request);
869   }
870 
871   // Helper function to check the fields of a DiscoveryRequest.
CheckRequest(const DiscoveryRequest & request,absl::string_view type_url,absl::string_view version_info,absl::string_view response_nonce,const absl::Status & error_detail,const std::set<absl::string_view> & resource_names,SourceLocation location=SourceLocation ())872   void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url,
873                     absl::string_view version_info,
874                     absl::string_view response_nonce,
875                     const absl::Status& error_detail,
876                     const std::set<absl::string_view>& resource_names,
877                     SourceLocation location = SourceLocation()) {
878     EXPECT_EQ(request.type_url(),
879               absl::StrCat("type.googleapis.com/", type_url))
880         << location.file() << ":" << location.line();
881     EXPECT_EQ(request.version_info(), version_info)
882         << location.file() << ":" << location.line();
883     EXPECT_EQ(request.response_nonce(), response_nonce)
884         << location.file() << ":" << location.line();
885     if (error_detail.ok()) {
886       EXPECT_FALSE(request.has_error_detail())
887           << location.file() << ":" << location.line();
888     } else {
889       EXPECT_EQ(request.error_detail().code(),
890                 static_cast<int>(error_detail.code()))
891           << location.file() << ":" << location.line();
892       EXPECT_EQ(request.error_detail().message(), error_detail.message())
893           << location.file() << ":" << location.line();
894     }
895     EXPECT_THAT(request.resource_names(),
896                 ::testing::UnorderedElementsAreArray(resource_names))
897         << location.file() << ":" << location.line();
898   }
899 
900   // Helper function to check the contents of the node message in a
901   // request against the client's node info.
CheckRequestNode(const DiscoveryRequest & request,SourceLocation location=SourceLocation ())902   void CheckRequestNode(const DiscoveryRequest& request,
903                         SourceLocation location = SourceLocation()) {
904     return CheckNode(request.node(), location);
905   }
906 
907   // Helper function to check the contents of a node message against the
908   // client's node info.
CheckNode(const envoy::config::core::v3::Node & node,SourceLocation location=SourceLocation ())909   void CheckNode(const envoy::config::core::v3::Node& node,
910                  SourceLocation location = SourceLocation()) {
911     // These fields come from the bootstrap config.
912     EXPECT_EQ(node.id(), xds_client_->bootstrap().node()->id())
913         << location.file() << ":" << location.line();
914     EXPECT_EQ(node.cluster(), xds_client_->bootstrap().node()->cluster())
915         << location.file() << ":" << location.line();
916     EXPECT_EQ(node.locality().region(),
917               xds_client_->bootstrap().node()->locality_region())
918         << location.file() << ":" << location.line();
919     EXPECT_EQ(node.locality().zone(),
920               xds_client_->bootstrap().node()->locality_zone())
921         << location.file() << ":" << location.line();
922     EXPECT_EQ(node.locality().sub_zone(),
923               xds_client_->bootstrap().node()->locality_sub_zone())
924         << location.file() << ":" << location.line();
925     if (xds_client_->bootstrap().node()->metadata().empty()) {
926       EXPECT_FALSE(node.has_metadata())
927           << location.file() << ":" << location.line();
928     } else {
929       std::string metadata_json_str;
930       auto status =
931           MessageToJsonString(node.metadata(), &metadata_json_str,
932                               GRPC_CUSTOM_JSONUTIL::JsonPrintOptions());
933       ASSERT_TRUE(status.ok())
934           << status << " on " << location.file() << ":" << location.line();
935       auto metadata_json = JsonParse(metadata_json_str);
936       ASSERT_TRUE(metadata_json.ok())
937           << metadata_json.status() << " on " << location.file() << ":"
938           << location.line();
939       Json expected =
940           Json::FromObject(xds_client_->bootstrap().node()->metadata());
941       EXPECT_EQ(*metadata_json, expected)
942           << location.file() << ":" << location.line()
943           << ":\nexpected: " << JsonDump(expected)
944           << "\nactual: " << JsonDump(*metadata_json);
945     }
946     EXPECT_EQ(node.user_agent_name(), "foo agent")
947         << location.file() << ":" << location.line();
948     EXPECT_EQ(node.user_agent_version(), "foo version")
949         << location.file() << ":" << location.line();
950   }
951 
DumpCsds(SourceLocation location=SourceLocation ())952   ClientConfig DumpCsds(SourceLocation location = SourceLocation()) {
953     std::string client_config_serialized =
954         XdsClientTestPeer(xds_client_.get()).TestDumpClientConfig();
955     ClientConfig client_config_proto;
956     CHECK(client_config_proto.ParseFromString(client_config_serialized))
957         << "at " << location.file() << ":" << location.line();
958     CheckNode(client_config_proto.node(), location);
959     return client_config_proto;
960   }
961 
962   ScopedTimeCache time_cache_;
963   std::shared_ptr<FuzzingEventEngine> event_engine_;
964   RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
965   RefCountedPtr<XdsClient> xds_client_;
966   MetricsReporter* metrics_reporter_ = nullptr;
967 };
968 
969 MATCHER_P3(ResourceCountLabelsEq, xds_authority, resource_type, cache_state,
970            "equals ResourceCountLabels") {
971   bool ok = true;
972   ok &= ::testing::ExplainMatchResult(xds_authority, arg.xds_authority,
973                                       result_listener);
974   ok &= ::testing::ExplainMatchResult(resource_type, arg.resource_type,
975                                       result_listener);
976   ok &= ::testing::ExplainMatchResult(cache_state, arg.cache_state,
977                                       result_listener);
978   return ok;
979 }
980 
981 MATCHER_P(TimestampProtoEq, timestamp, "equals timestamp") {
982   gpr_timespec ts = {arg.seconds(), arg.nanos(), GPR_CLOCK_REALTIME};
983   return ::testing::ExplainMatchResult(
984       timestamp, Timestamp::FromTimespecRoundDown(ts), result_listener);
985 }
986 
987 // Matches a CSDS ClientConfig proto.
988 //
989 // The resource_fields argument is a matcher that must validate the
990 // xds_config, version_info, and last_updated fields.  Examples are
991 // CsdsResourceFields() and CsdsNoResourceFields().
992 //
993 // The error_fields argument is a matcher that must validate the
994 // error_state field.  Examples are CsdsErrorFields(),
995 // CsdsErrorDetailsOnly(), and CsdsNoErrorFields().
996 MATCHER_P5(CsdsResourceEq, client_status, type_url, name, resource_fields,
997            error_fields, "equals CSDS resource") {
998   bool ok = true;
999   ok &= ::testing::ExplainMatchResult(arg.client_status(), client_status,
1000                                       result_listener);
1001   ok &= ::testing::ExplainMatchResult(
1002       absl::StrCat("type.googleapis.com/", type_url), arg.type_url(),
1003       result_listener);
1004   ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
1005   ok &= ::testing::ExplainMatchResult(resource_fields, arg, result_listener);
1006   ok &= ::testing::ExplainMatchResult(error_fields, arg, result_listener);
1007   return ok;
1008 }
1009 
1010 // Validates the resource fields in a CSDS ClientConfig proto.  Intended
1011 // for use with CsdsResourceEq().
1012 MATCHER_P3(CsdsResourceFields, resource, version, last_updated,
1013            "CSDS resource fields") {
1014   bool ok = true;
1015   ok &= ::testing::ExplainMatchResult(resource, arg.xds_config().value(),
1016                                       result_listener);
1017   ok &= ::testing::ExplainMatchResult(version, arg.version_info(),
1018                                       result_listener);
1019   ok &= ::testing::ExplainMatchResult(last_updated, arg.last_updated(),
1020                                       result_listener);
1021   return ok;
1022 }
1023 
1024 // Validates the resource fields are not present in a CSDS ClientConfig
1025 // proto.  Intended for use with CsdsResourceEq().
1026 MATCHER(CsdsNoResourceFields, "CSDS has no resource fields") {
1027   bool ok = true;
1028   ok &= ::testing::ExplainMatchResult(::testing::IsFalse(),
1029                                       arg.has_xds_config(), result_listener);
1030   ok &= ::testing::ExplainMatchResult("", arg.version_info(), result_listener);
1031   ok &= ::testing::ExplainMatchResult(::testing::IsFalse(),
1032                                       arg.has_last_updated(), result_listener);
1033   return ok;
1034 }
1035 
1036 // Validates the error fields in a CSDS ClientConfig proto.  Intended
1037 // for use with CsdsResourceEq().
1038 MATCHER_P3(CsdsErrorFields, error_details, error_version, error_time,
1039            "CSDS error fields") {
1040   bool ok = true;
1041   ok &= ::testing::ExplainMatchResult(
1042       error_details, arg.error_state().details(), result_listener);
1043   ok &= ::testing::ExplainMatchResult(
1044       error_version, arg.error_state().version_info(), result_listener);
1045   ok &= ::testing::ExplainMatchResult(
1046       error_time, arg.error_state().last_update_attempt(), result_listener);
1047   return ok;
1048 }
1049 
1050 // Same as CsdsErrorFields, but expects the error details without a
1051 // version or timestamp.
1052 MATCHER_P(CsdsErrorDetailsOnly, error_details, "CSDS error details only") {
1053   bool ok = true;
1054   ok &= ::testing::ExplainMatchResult(
1055       error_details, arg.error_state().details(), result_listener);
1056   ok &= ::testing::ExplainMatchResult("", arg.error_state().version_info(),
1057                                       result_listener);
1058   ok &= ::testing::ExplainMatchResult(
1059       ::testing::IsFalse(), arg.error_state().has_last_update_attempt(),
1060       result_listener);
1061   return ok;
1062 }
1063 
1064 // Validates that there is no error in a CSDS ClientConfig proto.  Intended
1065 // for use with CsdsResourceEq().
1066 MATCHER(CsdsNoErrorFields, "CSDS has no error fields") {
1067   return ::testing::ExplainMatchResult(::testing::IsFalse(),
1068                                        arg.has_error_state(), result_listener);
1069 }
1070 
1071 // Convenient wrapper for ACKED resources in CSDS.
1072 MATCHER_P5(CsdsResourceAcked, type_url, name, resource, version, last_updated,
1073            "equals CSDS ACKED resource") {
1074   return ::testing::ExplainMatchResult(
1075       CsdsResourceEq(ClientResourceStatus::ACKED, type_url, name,
1076                      CsdsResourceFields(resource, version, last_updated),
1077                      CsdsNoErrorFields()),
1078       arg, result_listener);
1079 }
1080 
1081 // Convenient wrapper for REQUESTED resources in CSDS.
1082 MATCHER_P2(CsdsResourceRequested, type_url, name,
1083            "equals CSDS requested resource") {
1084   return ::testing::ExplainMatchResult(
1085       CsdsResourceEq(ClientResourceStatus::REQUESTED, type_url, name,
1086                      CsdsNoResourceFields(), CsdsNoErrorFields()),
1087       arg, result_listener);
1088 }
1089 
1090 // Convenient wrapper for DOES_NOT_EXIST resources in CSDS.
1091 MATCHER_P2(CsdsResourceDoesNotExist, type_url, name,
1092            "equals CSDS does-not-exist resource") {
1093   return ::testing::ExplainMatchResult(
1094       CsdsResourceEq(ClientResourceStatus::DOES_NOT_EXIST, type_url, name,
1095                      CsdsNoResourceFields(), CsdsNoErrorFields()),
1096       arg, result_listener);
1097 }
1098 
TEST_F(XdsClientTest,BasicWatch)1099 TEST_F(XdsClientTest, BasicWatch) {
1100   InitXdsClient();
1101   // Metrics should initially be empty.
1102   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1103               ::testing::ElementsAre());
1104   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1105               ::testing::ElementsAre());
1106   EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
1107   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
1108   EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
1109   // CSDS should initially be empty.
1110   ClientConfig csds = DumpCsds();
1111   EXPECT_THAT(csds.generic_xds_configs(), ::testing::ElementsAre());
1112   // Start a watch for "foo1".
1113   auto watcher = StartFooWatch("foo1");
1114   // Check metrics.
1115   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1116               ::testing::ElementsAre());
1117   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1118               ::testing::ElementsAre());
1119   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
1120                                           kDefaultXdsServerUrl, true)));
1121   EXPECT_THAT(GetResourceCounts(),
1122               ::testing::ElementsAre(::testing::Pair(
1123                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1124                                         XdsFooResourceType::Get()->type_url(),
1125                                         "requested"),
1126                   1)));
1127   // CSDS should show that the resource has been requested.
1128   csds = DumpCsds();
1129   EXPECT_THAT(csds.generic_xds_configs(),
1130               ::testing::ElementsAre(CsdsResourceRequested(
1131                   XdsFooResourceType::Get()->type_url(), "foo1")));
1132   // Watcher should initially not see any resource reported.
1133   EXPECT_FALSE(watcher->HasEvent());
1134   // XdsClient should have created an ADS stream.
1135   auto stream = WaitForAdsStream();
1136   ASSERT_TRUE(stream != nullptr);
1137   // XdsClient should have sent a subscription request on the ADS stream.
1138   auto request = WaitForRequest(stream.get());
1139   ASSERT_TRUE(request.has_value());
1140   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1141                /*version_info=*/"", /*response_nonce=*/"",
1142                /*error_detail=*/absl::OkStatus(),
1143                /*resource_names=*/{"foo1"});
1144   CheckRequestNode(*request);  // Should be present on the first request.
1145   // Send a response.
1146   stream->SendMessageToClient(
1147       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1148           .set_version_info("1")
1149           .set_nonce("A")
1150           .AddFooResource(XdsFooResource("foo1", 6))
1151           .Serialize());
1152   // XdsClient should have delivered the response to the watcher.
1153   auto resource = watcher->WaitForNextResource();
1154   ASSERT_NE(resource, nullptr);
1155   EXPECT_EQ(resource->name, "foo1");
1156   EXPECT_EQ(resource->value, 6);
1157   // Check metric data.
1158   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1159       ::testing::ElementsAre(::testing::Pair(
1160           ::testing::Pair(kDefaultXdsServerUrl,
1161                           XdsFooResourceType::Get()->type_url()),
1162           1)),
1163       ::testing::ElementsAre(), ::testing::_));
1164   EXPECT_THAT(
1165       GetResourceCounts(),
1166       ::testing::ElementsAre(::testing::Pair(
1167           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1168                                 XdsFooResourceType::Get()->type_url(), "acked"),
1169           1)));
1170   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
1171                                           kDefaultXdsServerUrl, true)));
1172   // Check CSDS data.
1173   csds = DumpCsds();
1174   EXPECT_THAT(csds.generic_xds_configs(),
1175               ::testing::ElementsAre(CsdsResourceAcked(
1176                   XdsFooResourceType::Get()->type_url(), "foo1",
1177                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
1178   // XdsClient should have sent an ACK message to the xDS server.
1179   request = WaitForRequest(stream.get());
1180   ASSERT_TRUE(request.has_value());
1181   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1182                /*version_info=*/"1", /*response_nonce=*/"A",
1183                /*error_detail=*/absl::OkStatus(),
1184                /*resource_names=*/{"foo1"});
1185   // Cancel watch.
1186   CancelFooWatch(watcher.get(), "foo1");
1187   EXPECT_TRUE(stream->IsOrphaned());
1188   // Check metric data.
1189   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1190       ::testing::ElementsAre(::testing::Pair(
1191           ::testing::Pair(kDefaultXdsServerUrl,
1192                           XdsFooResourceType::Get()->type_url()),
1193           1)),
1194       ::testing::ElementsAre(), ::testing::_));
1195   EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
1196   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
1197   // Check CSDS data.
1198   csds = DumpCsds();
1199   EXPECT_THAT(csds.generic_xds_configs(), ::testing::ElementsAre());
1200 }
1201 
TEST_F(XdsClientTest,UpdateFromServer)1202 TEST_F(XdsClientTest, UpdateFromServer) {
1203   InitXdsClient();
1204   // Start a watch for "foo1".
1205   auto watcher = StartFooWatch("foo1");
1206   // Watcher should initially not see any resource reported.
1207   EXPECT_FALSE(watcher->HasEvent());
1208   // XdsClient should have created an ADS stream.
1209   auto stream = WaitForAdsStream();
1210   ASSERT_TRUE(stream != nullptr);
1211   // XdsClient should have sent a subscription request on the ADS stream.
1212   auto request = WaitForRequest(stream.get());
1213   ASSERT_TRUE(request.has_value());
1214   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1215                /*version_info=*/"", /*response_nonce=*/"",
1216                /*error_detail=*/absl::OkStatus(),
1217                /*resource_names=*/{"foo1"});
1218   CheckRequestNode(*request);  // Should be present on the first request.
1219   // Send a response.
1220   stream->SendMessageToClient(
1221       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1222           .set_version_info("1")
1223           .set_nonce("A")
1224           .AddFooResource(XdsFooResource("foo1", 6))
1225           .Serialize());
1226   // XdsClient should have delivered the response to the watcher.
1227   auto resource = watcher->WaitForNextResource();
1228   ASSERT_NE(resource, nullptr);
1229   EXPECT_EQ(resource->name, "foo1");
1230   EXPECT_EQ(resource->value, 6);
1231   // Check metric data.
1232   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1233       ::testing::ElementsAre(::testing::Pair(
1234           ::testing::Pair(kDefaultXdsServerUrl,
1235                           XdsFooResourceType::Get()->type_url()),
1236           1)),
1237       ::testing::ElementsAre(), ::testing::_));
1238   EXPECT_THAT(
1239       GetResourceCounts(),
1240       ::testing::ElementsAre(::testing::Pair(
1241           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1242                                 XdsFooResourceType::Get()->type_url(), "acked"),
1243           1)));
1244   // Check CSDS data.
1245   ClientConfig csds = DumpCsds();
1246   EXPECT_THAT(csds.generic_xds_configs(),
1247               ::testing::ElementsAre(CsdsResourceAcked(
1248                   XdsFooResourceType::Get()->type_url(), "foo1",
1249                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
1250   // XdsClient should have sent an ACK message to the xDS server.
1251   request = WaitForRequest(stream.get());
1252   ASSERT_TRUE(request.has_value());
1253   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1254                /*version_info=*/"1", /*response_nonce=*/"A",
1255                /*error_detail=*/absl::OkStatus(),
1256                /*resource_names=*/{"foo1"});
1257   // Server sends an updated version of the resource.
1258   // We increment time to make sure that the CSDS data gets a new timestamp.
1259   time_cache_.TestOnlySetNow(kTime1);
1260   stream->SendMessageToClient(
1261       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1262           .set_version_info("2")
1263           .set_nonce("B")
1264           .AddFooResource(XdsFooResource("foo1", 9))
1265           .Serialize());
1266   // XdsClient should have delivered the response to the watcher.
1267   resource = watcher->WaitForNextResource();
1268   ASSERT_NE(resource, nullptr);
1269   EXPECT_EQ(resource->name, "foo1");
1270   EXPECT_EQ(resource->value, 9);
1271   // Check metric data.
1272   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1273       ::testing::ElementsAre(::testing::Pair(
1274           ::testing::Pair(kDefaultXdsServerUrl,
1275                           XdsFooResourceType::Get()->type_url()),
1276           2)),
1277       ::testing::ElementsAre(), ::testing::_));
1278   EXPECT_THAT(
1279       GetResourceCounts(),
1280       ::testing::ElementsAre(::testing::Pair(
1281           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1282                                 XdsFooResourceType::Get()->type_url(), "acked"),
1283           1)));
1284   // Check CSDS data.
1285   csds = DumpCsds();
1286   EXPECT_THAT(csds.generic_xds_configs(),
1287               ::testing::ElementsAre(CsdsResourceAcked(
1288                   XdsFooResourceType::Get()->type_url(), "foo1",
1289                   resource->AsJsonString(), "2", TimestampProtoEq(kTime1))));
1290   // XdsClient should have sent an ACK message to the xDS server.
1291   request = WaitForRequest(stream.get());
1292   ASSERT_TRUE(request.has_value());
1293   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1294                /*version_info=*/"2", /*response_nonce=*/"B",
1295                /*error_detail=*/absl::OkStatus(),
1296                /*resource_names=*/{"foo1"});
1297   // Cancel watch.
1298   CancelFooWatch(watcher.get(), "foo1");
1299   EXPECT_TRUE(stream->IsOrphaned());
1300 }
1301 
TEST_F(XdsClientTest,MultipleWatchersForSameResource)1302 TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
1303   InitXdsClient();
1304   // Start a watch for "foo1".
1305   auto watcher = StartFooWatch("foo1");
1306   // Watcher should initially not see any resource reported.
1307   EXPECT_FALSE(watcher->HasEvent());
1308   // XdsClient should have created an ADS stream.
1309   auto stream = WaitForAdsStream();
1310   ASSERT_TRUE(stream != nullptr);
1311   // XdsClient should have sent a subscription request on the ADS stream.
1312   auto request = WaitForRequest(stream.get());
1313   ASSERT_TRUE(request.has_value());
1314   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1315                /*version_info=*/"", /*response_nonce=*/"",
1316                /*error_detail=*/absl::OkStatus(),
1317                /*resource_names=*/{"foo1"});
1318   CheckRequestNode(*request);  // Should be present on the first request.
1319   // Send a response.
1320   stream->SendMessageToClient(
1321       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1322           .set_version_info("1")
1323           .set_nonce("A")
1324           .AddFooResource(XdsFooResource("foo1", 6))
1325           .Serialize());
1326   // XdsClient should have delivered the response to the watcher.
1327   auto resource = watcher->WaitForNextResource();
1328   ASSERT_NE(resource, nullptr);
1329   EXPECT_EQ(resource->name, "foo1");
1330   EXPECT_EQ(resource->value, 6);
1331   // Check metric data.
1332   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1333       ::testing::ElementsAre(::testing::Pair(
1334           ::testing::Pair(kDefaultXdsServerUrl,
1335                           XdsFooResourceType::Get()->type_url()),
1336           1)),
1337       ::testing::ElementsAre(), ::testing::_));
1338   EXPECT_THAT(
1339       GetResourceCounts(),
1340       ::testing::ElementsAre(::testing::Pair(
1341           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1342                                 XdsFooResourceType::Get()->type_url(), "acked"),
1343           1)));
1344   // Check CSDS data.
1345   ClientConfig csds = DumpCsds();
1346   EXPECT_THAT(csds.generic_xds_configs(),
1347               ::testing::ElementsAre(CsdsResourceAcked(
1348                   XdsFooResourceType::Get()->type_url(), "foo1",
1349                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
1350   // XdsClient should have sent an ACK message to the xDS server.
1351   request = WaitForRequest(stream.get());
1352   ASSERT_TRUE(request.has_value());
1353   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1354                /*version_info=*/"1", /*response_nonce=*/"A",
1355                /*error_detail=*/absl::OkStatus(),
1356                /*resource_names=*/{"foo1"});
1357   // Start a second watcher for the same resource.
1358   auto watcher2 = StartFooWatch("foo1");
1359   // This watcher should get an immediate notification, because the
1360   // resource is already cached.
1361   resource = watcher2->WaitForNextResource();
1362   ASSERT_NE(resource, nullptr);
1363   EXPECT_EQ(resource->name, "foo1");
1364   EXPECT_EQ(resource->value, 6);
1365   // Server should not have seen another request from the client.
1366   ASSERT_FALSE(stream->HaveMessageFromClient());
1367   // Server sends an updated version of the resource.
1368   stream->SendMessageToClient(
1369       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1370           .set_version_info("2")
1371           .set_nonce("B")
1372           .AddFooResource(XdsFooResource("foo1", 9))
1373           .Serialize());
1374   // XdsClient should deliver the response to both watchers.
1375   resource = watcher->WaitForNextResource();
1376   ASSERT_NE(resource, nullptr);
1377   EXPECT_EQ(resource->name, "foo1");
1378   EXPECT_EQ(resource->value, 9);
1379   resource = watcher2->WaitForNextResource();
1380   ASSERT_NE(resource, nullptr);
1381   EXPECT_EQ(resource->name, "foo1");
1382   EXPECT_EQ(resource->value, 9);
1383   // Check metric data.
1384   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1385       ::testing::ElementsAre(::testing::Pair(
1386           ::testing::Pair(kDefaultXdsServerUrl,
1387                           XdsFooResourceType::Get()->type_url()),
1388           2)),
1389       ::testing::ElementsAre(), ::testing::_));
1390   EXPECT_THAT(
1391       GetResourceCounts(),
1392       ::testing::ElementsAre(::testing::Pair(
1393           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1394                                 XdsFooResourceType::Get()->type_url(), "acked"),
1395           1)));
1396   // Check CSDS data.
1397   csds = DumpCsds();
1398   EXPECT_THAT(csds.generic_xds_configs(),
1399               ::testing::ElementsAre(CsdsResourceAcked(
1400                   XdsFooResourceType::Get()->type_url(), "foo1",
1401                   resource->AsJsonString(), "2", TimestampProtoEq(kTime0))));
1402   // XdsClient should have sent an ACK message to the xDS server.
1403   request = WaitForRequest(stream.get());
1404   ASSERT_TRUE(request.has_value());
1405   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1406                /*version_info=*/"2", /*response_nonce=*/"B",
1407                /*error_detail=*/absl::OkStatus(),
1408                /*resource_names=*/{"foo1"});
1409   // Cancel one of the watchers.
1410   CancelFooWatch(watcher.get(), "foo1");
1411   // The server should not see any new request.
1412   ASSERT_FALSE(WaitForRequest(stream.get()));
1413   // Now cancel the second watcher.
1414   CancelFooWatch(watcher2.get(), "foo1");
1415   EXPECT_TRUE(stream->IsOrphaned());
1416 }
1417 
TEST_F(XdsClientTest,SubscribeToMultipleResources)1418 TEST_F(XdsClientTest, SubscribeToMultipleResources) {
1419   InitXdsClient();
1420   // Start a watch for "foo1".
1421   auto watcher = StartFooWatch("foo1");
1422   // Watcher should initially not see any resource reported.
1423   EXPECT_FALSE(watcher->HasEvent());
1424   // Check metrics.
1425   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1426               ::testing::ElementsAre());
1427   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1428               ::testing::ElementsAre());
1429   EXPECT_THAT(GetResourceCounts(),
1430               ::testing::ElementsAre(::testing::Pair(
1431                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1432                                         XdsFooResourceType::Get()->type_url(),
1433                                         "requested"),
1434                   1)));
1435   // CSDS should show that the resource has been requested.
1436   ClientConfig csds = DumpCsds();
1437   EXPECT_THAT(csds.generic_xds_configs(),
1438               ::testing::ElementsAre(CsdsResourceRequested(
1439                   XdsFooResourceType::Get()->type_url(), "foo1")));
1440   // XdsClient should have created an ADS stream.
1441   auto stream = WaitForAdsStream();
1442   ASSERT_TRUE(stream != nullptr);
1443   // XdsClient should have sent a subscription request on the ADS stream.
1444   auto request = WaitForRequest(stream.get());
1445   ASSERT_TRUE(request.has_value());
1446   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1447                /*version_info=*/"", /*response_nonce=*/"",
1448                /*error_detail=*/absl::OkStatus(),
1449                /*resource_names=*/{"foo1"});
1450   CheckRequestNode(*request);  // Should be present on the first request.
1451   // Send a response.
1452   stream->SendMessageToClient(
1453       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1454           .set_version_info("1")
1455           .set_nonce("A")
1456           .AddFooResource(XdsFooResource("foo1", 6))
1457           .Serialize());
1458   // XdsClient should have delivered the response to the watcher.
1459   auto resource = watcher->WaitForNextResource();
1460   ASSERT_NE(resource, nullptr);
1461   EXPECT_EQ(resource->name, "foo1");
1462   EXPECT_EQ(resource->value, 6);
1463   // Check metric data.
1464   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1465       ::testing::ElementsAre(::testing::Pair(
1466           ::testing::Pair(kDefaultXdsServerUrl,
1467                           XdsFooResourceType::Get()->type_url()),
1468           1)),
1469       ::testing::ElementsAre(), ::testing::_));
1470   EXPECT_THAT(
1471       GetResourceCounts(),
1472       ::testing::ElementsAre(::testing::Pair(
1473           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1474                                 XdsFooResourceType::Get()->type_url(), "acked"),
1475           1)));
1476   // Check CSDS data.
1477   csds = DumpCsds();
1478   EXPECT_THAT(csds.generic_xds_configs(),
1479               ::testing::ElementsAre(CsdsResourceAcked(
1480                   XdsFooResourceType::Get()->type_url(), "foo1",
1481                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
1482   // XdsClient should have sent an ACK message to the xDS server.
1483   request = WaitForRequest(stream.get());
1484   ASSERT_TRUE(request.has_value());
1485   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1486                /*version_info=*/"1", /*response_nonce=*/"A",
1487                /*error_detail=*/absl::OkStatus(),
1488                /*resource_names=*/{"foo1"});
1489   // Start a watch for "foo2".
1490   auto watcher2 = StartFooWatch("foo2");
1491   // Check metric data.
1492   EXPECT_THAT(
1493       GetResourceCounts(),
1494       ::testing::ElementsAre(
1495           ::testing::Pair(ResourceCountLabelsEq(
1496                               XdsClient::kOldStyleAuthority,
1497                               XdsFooResourceType::Get()->type_url(), "acked"),
1498                           1),
1499           ::testing::Pair(
1500               ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1501                                     XdsFooResourceType::Get()->type_url(),
1502                                     "requested"),
1503               1)));
1504   // Check CSDS data.
1505   csds = DumpCsds();
1506   EXPECT_THAT(csds.generic_xds_configs(),
1507               ::testing::UnorderedElementsAre(
1508                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1509                                     "foo1", resource->AsJsonString(), "1",
1510                                     TimestampProtoEq(kTime0)),
1511                   CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
1512                                         "foo2")));
1513   // XdsClient should have sent a subscription request on the ADS stream.
1514   request = WaitForRequest(stream.get());
1515   ASSERT_TRUE(request.has_value());
1516   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1517                /*version_info=*/"1", /*response_nonce=*/"A",
1518                /*error_detail=*/absl::OkStatus(),
1519                /*resource_names=*/{"foo1", "foo2"});
1520   // Send a response.
1521   // We increment time to make sure that the CSDS data gets a new timestamp.
1522   time_cache_.TestOnlySetNow(kTime1);
1523   stream->SendMessageToClient(
1524       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1525           .set_version_info("1")
1526           .set_nonce("B")
1527           .AddFooResource(XdsFooResource("foo2", 7))
1528           .Serialize());
1529   // XdsClient should have delivered the response to the watcher.
1530   auto resource2 = watcher2->WaitForNextResource();
1531   ASSERT_NE(resource2, nullptr);
1532   EXPECT_EQ(resource2->name, "foo2");
1533   EXPECT_EQ(resource2->value, 7);
1534   // Check metric data.
1535   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1536       ::testing::ElementsAre(::testing::Pair(
1537           ::testing::Pair(kDefaultXdsServerUrl,
1538                           XdsFooResourceType::Get()->type_url()),
1539           2)),
1540       ::testing::ElementsAre(), ::testing::_));
1541   EXPECT_THAT(
1542       GetResourceCounts(),
1543       ::testing::ElementsAre(::testing::Pair(
1544           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1545                                 XdsFooResourceType::Get()->type_url(), "acked"),
1546           2)));
1547   // Check CSDS data.
1548   csds = DumpCsds();
1549   EXPECT_THAT(csds.generic_xds_configs(),
1550               ::testing::UnorderedElementsAre(
1551                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1552                                     "foo1", resource->AsJsonString(), "1",
1553                                     TimestampProtoEq(kTime0)),
1554                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1555                                     "foo2", resource2->AsJsonString(), "1",
1556                                     TimestampProtoEq(kTime1))));
1557   // XdsClient should have sent an ACK message to the xDS server.
1558   request = WaitForRequest(stream.get());
1559   ASSERT_TRUE(request.has_value());
1560   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1561                /*version_info=*/"1", /*response_nonce=*/"B",
1562                /*error_detail=*/absl::OkStatus(),
1563                /*resource_names=*/{"foo1", "foo2"});
1564   // Cancel watch for "foo1".
1565   CancelFooWatch(watcher.get(), "foo1");
1566   // Check metric data.
1567   EXPECT_THAT(
1568       GetResourceCounts(),
1569       ::testing::ElementsAre(::testing::Pair(
1570           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1571                                 XdsFooResourceType::Get()->type_url(), "acked"),
1572           1)));
1573   // Check CSDS data.
1574   csds = DumpCsds();
1575   EXPECT_THAT(csds.generic_xds_configs(),
1576               ::testing::ElementsAre(CsdsResourceAcked(
1577                   XdsFooResourceType::Get()->type_url(), "foo2",
1578                   resource2->AsJsonString(), "1", TimestampProtoEq(kTime1))));
1579   // XdsClient should send an unsubscription request.
1580   request = WaitForRequest(stream.get());
1581   ASSERT_TRUE(request.has_value());
1582   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1583                /*version_info=*/"1", /*response_nonce=*/"B",
1584                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
1585   // Now cancel watch for "foo2".
1586   CancelFooWatch(watcher2.get(), "foo2");
1587   EXPECT_TRUE(stream->IsOrphaned());
1588 }
1589 
TEST_F(XdsClientTest,UpdateContainsOnlyChangedResource)1590 TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
1591   InitXdsClient();
1592   // Start a watch for "foo1".
1593   auto watcher = StartFooWatch("foo1");
1594   // Watcher should initially not see any resource reported.
1595   EXPECT_FALSE(watcher->HasEvent());
1596   // XdsClient should have created an ADS stream.
1597   auto stream = WaitForAdsStream();
1598   ASSERT_TRUE(stream != nullptr);
1599   // XdsClient should have sent a subscription request on the ADS stream.
1600   auto request = WaitForRequest(stream.get());
1601   ASSERT_TRUE(request.has_value());
1602   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1603                /*version_info=*/"", /*response_nonce=*/"",
1604                /*error_detail=*/absl::OkStatus(),
1605                /*resource_names=*/{"foo1"});
1606   CheckRequestNode(*request);  // Should be present on the first request.
1607   // Send a response.
1608   stream->SendMessageToClient(
1609       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1610           .set_version_info("1")
1611           .set_nonce("A")
1612           .AddFooResource(XdsFooResource("foo1", 6))
1613           .Serialize());
1614   // XdsClient should have delivered the response to the watcher.
1615   auto resource = watcher->WaitForNextResource();
1616   ASSERT_NE(resource, nullptr);
1617   EXPECT_EQ(resource->name, "foo1");
1618   EXPECT_EQ(resource->value, 6);
1619   // XdsClient should have sent an ACK message to the xDS server.
1620   request = WaitForRequest(stream.get());
1621   ASSERT_TRUE(request.has_value());
1622   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1623                /*version_info=*/"1", /*response_nonce=*/"A",
1624                /*error_detail=*/absl::OkStatus(),
1625                /*resource_names=*/{"foo1"});
1626   // Start a watch for "foo2".
1627   auto watcher2 = StartFooWatch("foo2");
1628   // XdsClient should have sent a subscription request on the ADS stream.
1629   request = WaitForRequest(stream.get());
1630   ASSERT_TRUE(request.has_value());
1631   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1632                /*version_info=*/"1", /*response_nonce=*/"A",
1633                /*error_detail=*/absl::OkStatus(),
1634                /*resource_names=*/{"foo1", "foo2"});
1635   // Send a response.
1636   // We increment time to make sure that the CSDS data gets a new timestamp.
1637   time_cache_.TestOnlySetNow(kTime1);
1638   stream->SendMessageToClient(
1639       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1640           .set_version_info("1")
1641           .set_nonce("B")
1642           .AddFooResource(XdsFooResource("foo2", 7))
1643           .Serialize());
1644   // XdsClient should have delivered the response to the watcher.
1645   auto resource2 = watcher2->WaitForNextResource();
1646   ASSERT_NE(resource2, nullptr);
1647   EXPECT_EQ(resource2->name, "foo2");
1648   EXPECT_EQ(resource2->value, 7);
1649   // Check metric data.
1650   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1651       ::testing::ElementsAre(::testing::Pair(
1652           ::testing::Pair(kDefaultXdsServerUrl,
1653                           XdsFooResourceType::Get()->type_url()),
1654           2)),
1655       ::testing::ElementsAre(), ::testing::_));
1656   EXPECT_THAT(
1657       GetResourceCounts(),
1658       ::testing::ElementsAre(::testing::Pair(
1659           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1660                                 XdsFooResourceType::Get()->type_url(), "acked"),
1661           2)));
1662   // Check CSDS data.
1663   ClientConfig csds = DumpCsds();
1664   EXPECT_THAT(csds.generic_xds_configs(),
1665               ::testing::UnorderedElementsAre(
1666                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1667                                     "foo1", resource->AsJsonString(), "1",
1668                                     TimestampProtoEq(kTime0)),
1669                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1670                                     "foo2", resource2->AsJsonString(), "1",
1671                                     TimestampProtoEq(kTime1))));
1672   // XdsClient should have sent an ACK message to the xDS server.
1673   request = WaitForRequest(stream.get());
1674   ASSERT_TRUE(request.has_value());
1675   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1676                /*version_info=*/"1", /*response_nonce=*/"B",
1677                /*error_detail=*/absl::OkStatus(),
1678                /*resource_names=*/{"foo1", "foo2"});
1679   // Server sends an update for "foo1".  The response does not contain "foo2".
1680   // We increment time to make sure that the CSDS data gets a new timestamp.
1681   time_cache_.TestOnlySetNow(kTime2);
1682   stream->SendMessageToClient(
1683       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1684           .set_version_info("2")
1685           .set_nonce("C")
1686           .AddFooResource(XdsFooResource("foo1", 9))
1687           .Serialize());
1688   // XdsClient should have delivered the response to the watcher.
1689   resource = watcher->WaitForNextResource();
1690   ASSERT_NE(resource, nullptr);
1691   EXPECT_EQ(resource->name, "foo1");
1692   EXPECT_EQ(resource->value, 9);
1693   // Check metric data.
1694   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1695       ::testing::ElementsAre(::testing::Pair(
1696           ::testing::Pair(kDefaultXdsServerUrl,
1697                           XdsFooResourceType::Get()->type_url()),
1698           3)),
1699       ::testing::ElementsAre(), ::testing::_));
1700   EXPECT_THAT(
1701       GetResourceCounts(),
1702       ::testing::ElementsAre(::testing::Pair(
1703           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1704                                 XdsFooResourceType::Get()->type_url(), "acked"),
1705           2)));
1706   // Check CSDS data.
1707   csds = DumpCsds();
1708   EXPECT_THAT(csds.generic_xds_configs(),
1709               ::testing::UnorderedElementsAre(
1710                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1711                                     "foo1", resource->AsJsonString(), "2",
1712                                     TimestampProtoEq(kTime2)),
1713                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1714                                     "foo2", resource2->AsJsonString(), "1",
1715                                     TimestampProtoEq(kTime1))));
1716   // XdsClient should have sent an ACK message to the xDS server.
1717   request = WaitForRequest(stream.get());
1718   ASSERT_TRUE(request.has_value());
1719   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1720                /*version_info=*/"2", /*response_nonce=*/"C",
1721                /*error_detail=*/absl::OkStatus(),
1722                /*resource_names=*/{"foo1", "foo2"});
1723   // Cancel watch for "foo1".
1724   CancelFooWatch(watcher.get(), "foo1");
1725   // XdsClient should send an unsubscription request.
1726   request = WaitForRequest(stream.get());
1727   ASSERT_TRUE(request.has_value());
1728   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1729                /*version_info=*/"2", /*response_nonce=*/"C",
1730                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
1731   // Now cancel watch for "foo2".
1732   CancelFooWatch(watcher2.get(), "foo2");
1733   EXPECT_TRUE(stream->IsOrphaned());
1734 }
1735 
TEST_F(XdsClientTest,ResourceValidationFailure)1736 TEST_F(XdsClientTest, ResourceValidationFailure) {
1737   InitXdsClient();
1738   // Start a watch for "foo1".
1739   auto watcher = StartFooWatch("foo1");
1740   // Check metric data.
1741   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1742               ::testing::ElementsAre());
1743   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1744               ::testing::ElementsAre());
1745   EXPECT_THAT(GetResourceCounts(),
1746               ::testing::ElementsAre(::testing::Pair(
1747                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1748                                         XdsFooResourceType::Get()->type_url(),
1749                                         "requested"),
1750                   1)));
1751   // CSDS should show that the resource has been requested.
1752   ClientConfig csds = DumpCsds();
1753   EXPECT_THAT(csds.generic_xds_configs(),
1754               ::testing::ElementsAre(CsdsResourceRequested(
1755                   XdsFooResourceType::Get()->type_url(), "foo1")));
1756   // Watcher should initially not see any resource reported.
1757   EXPECT_FALSE(watcher->HasEvent());
1758   // XdsClient should have created an ADS stream.
1759   auto stream = WaitForAdsStream();
1760   ASSERT_TRUE(stream != nullptr);
1761   // XdsClient should have sent a subscription request on the ADS stream.
1762   auto request = WaitForRequest(stream.get());
1763   ASSERT_TRUE(request.has_value());
1764   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1765                /*version_info=*/"", /*response_nonce=*/"",
1766                /*error_detail=*/absl::OkStatus(),
1767                /*resource_names=*/{"foo1"});
1768   CheckRequestNode(*request);  // Should be present on the first request.
1769   // Send a response containing an invalid resource.
1770   stream->SendMessageToClient(
1771       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1772           .set_version_info("1")
1773           .set_nonce("A")
1774           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
1775                               "{\"name\":\"foo1\",\"value\":[]}")
1776           .Serialize());
1777   // XdsClient should deliver an error to the watcher.
1778   auto error = watcher->WaitForNextError();
1779   ASSERT_TRUE(error.has_value());
1780   EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
1781   EXPECT_EQ(error->message(),
1782             "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1783             "[field:value error:is not a number] (node ID:xds_client_test)")
1784       << *error;
1785   // Check metric data.
1786   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1787       ::testing::ElementsAre(),
1788       ::testing::ElementsAre(::testing::Pair(
1789           ::testing::Pair(kDefaultXdsServerUrl,
1790                           XdsFooResourceType::Get()->type_url()),
1791           1)),
1792       ::testing::_));
1793   EXPECT_THAT(GetResourceCounts(),
1794               ::testing::ElementsAre(::testing::Pair(
1795                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1796                                         XdsFooResourceType::Get()->type_url(),
1797                                         "nacked"),
1798                   1)));
1799   // CSDS should show that the resource has been NACKed.
1800   csds = DumpCsds();
1801   EXPECT_THAT(
1802       csds.generic_xds_configs(),
1803       ::testing::ElementsAre(CsdsResourceEq(
1804           ClientResourceStatus::NACKED, XdsFooResourceType::Get()->type_url(),
1805           "foo1", CsdsNoResourceFields(),
1806           CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: "
1807                           "[field:value error:is not a number]",
1808                           "1", TimestampProtoEq(kTime0)))));
1809   // XdsClient should NACK the update.
1810   // Note that version_info is not populated in the request.
1811   request = WaitForRequest(stream.get());
1812   ASSERT_TRUE(request.has_value());
1813   CheckRequest(
1814       *request, XdsFooResourceType::Get()->type_url(),
1815       /*version_info=*/"", /*response_nonce=*/"A",
1816       // error_detail=
1817       absl::InvalidArgumentError(
1818           "xDS response validation errors: ["
1819           "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: "
1820           "[field:value error:is not a number]]"),
1821       /*resource_names=*/{"foo1"});
1822   // Start a second watch for the same resource.  It should immediately
1823   // receive the same error.
1824   auto watcher2 = StartFooWatch("foo1");
1825   error = watcher2->WaitForNextError();
1826   ASSERT_TRUE(error.has_value());
1827   EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
1828   EXPECT_EQ(error->message(),
1829             "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
1830             "[field:value error:is not a number] (node ID:xds_client_test)")
1831       << *error;
1832   // Now server sends an updated version of the resource.
1833   // We increment time to make sure that the CSDS data gets a new timestamp.
1834   time_cache_.TestOnlySetNow(kTime1);
1835   stream->SendMessageToClient(
1836       ResponseBuilder(XdsFooResourceType::Get()->type_url())
1837           .set_version_info("2")
1838           .set_nonce("B")
1839           .AddFooResource(XdsFooResource("foo1", 9))
1840           .Serialize());
1841   // XdsClient should deliver the response to both watchers.
1842   auto resource = watcher->WaitForNextResource();
1843   ASSERT_NE(resource, nullptr);
1844   EXPECT_EQ(resource->name, "foo1");
1845   EXPECT_EQ(resource->value, 9);
1846   resource = watcher2->WaitForNextResource();
1847   ASSERT_NE(resource, nullptr);
1848   EXPECT_EQ(resource->name, "foo1");
1849   EXPECT_EQ(resource->value, 9);
1850   // Check metric data.
1851   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1852       ::testing::ElementsAre(::testing::Pair(
1853           ::testing::Pair(kDefaultXdsServerUrl,
1854                           XdsFooResourceType::Get()->type_url()),
1855           1)),
1856       ::testing::ElementsAre(::testing::Pair(
1857           ::testing::Pair(kDefaultXdsServerUrl,
1858                           XdsFooResourceType::Get()->type_url()),
1859           1)),
1860       ::testing::_));
1861   EXPECT_THAT(
1862       GetResourceCounts(),
1863       ::testing::ElementsAre(::testing::Pair(
1864           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1865                                 XdsFooResourceType::Get()->type_url(), "acked"),
1866           1)));
1867   // Check CSDS data.
1868   csds = DumpCsds();
1869   EXPECT_THAT(csds.generic_xds_configs(),
1870               ::testing::UnorderedElementsAre(CsdsResourceAcked(
1871                   XdsFooResourceType::Get()->type_url(), "foo1",
1872                   resource->AsJsonString(), "2", TimestampProtoEq(kTime1))));
1873   // XdsClient should have sent an ACK message to the xDS server.
1874   request = WaitForRequest(stream.get());
1875   ASSERT_TRUE(request.has_value());
1876   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1877                /*version_info=*/"2", /*response_nonce=*/"B",
1878                /*error_detail=*/absl::OkStatus(),
1879                /*resource_names=*/{"foo1"});
1880   // Cancel watch.
1881   CancelFooWatch(watcher.get(), "foo1");
1882   CancelFooWatch(watcher2.get(), "foo1");
1883   EXPECT_TRUE(stream->IsOrphaned());
1884 }
1885 
TEST_F(XdsClientTest,ResourceValidationFailureMultipleResources)1886 TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) {
1887   InitXdsClient();
1888   // Start a watch for "foo1".
1889   auto watcher = StartFooWatch("foo1");
1890   // Watcher should initially not see any resource reported.
1891   EXPECT_FALSE(watcher->HasEvent());
1892   // Check metric data.
1893   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1894               ::testing::ElementsAre());
1895   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1896               ::testing::ElementsAre());
1897   EXPECT_THAT(GetResourceCounts(),
1898               ::testing::ElementsAre(::testing::Pair(
1899                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1900                                         XdsFooResourceType::Get()->type_url(),
1901                                         "requested"),
1902                   1)));
1903   // CSDS should show that the resource has been requested.
1904   ClientConfig csds = DumpCsds();
1905   EXPECT_THAT(csds.generic_xds_configs(),
1906               ::testing::ElementsAre(CsdsResourceRequested(
1907                   XdsFooResourceType::Get()->type_url(), "foo1")));
1908   // XdsClient should have created an ADS stream.
1909   auto stream = WaitForAdsStream();
1910   ASSERT_TRUE(stream != nullptr);
1911   // XdsClient should have sent a subscription request on the ADS stream.
1912   auto request = WaitForRequest(stream.get());
1913   ASSERT_TRUE(request.has_value());
1914   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1915                /*version_info=*/"", /*response_nonce=*/"",
1916                /*error_detail=*/absl::OkStatus(),
1917                /*resource_names=*/{"foo1"});
1918   CheckRequestNode(*request);  // Should be present on the first request.
1919   // Before the server responds, add a watch for another resource.
1920   auto watcher2 = StartFooWatch("foo2");
1921   // Check metric data.
1922   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1923       ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
1924   EXPECT_THAT(GetResourceCounts(),
1925               ::testing::ElementsAre(::testing::Pair(
1926                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1927                                         XdsFooResourceType::Get()->type_url(),
1928                                         "requested"),
1929                   2)));
1930   // CSDS should show that both resourcees have been requested.
1931   csds = DumpCsds();
1932   EXPECT_THAT(
1933       csds.generic_xds_configs(),
1934       ::testing::ElementsAre(
1935           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo1"),
1936           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
1937                                 "foo2")));
1938   // Client should send another request.
1939   request = WaitForRequest(stream.get());
1940   ASSERT_TRUE(request.has_value());
1941   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1942                /*version_info=*/"", /*response_nonce=*/"",
1943                /*error_detail=*/absl::OkStatus(),
1944                /*resource_names=*/{"foo1", "foo2"});
1945   // Add a watch for a third resource.
1946   auto watcher3 = StartFooWatch("foo3");
1947   // Check metric data.
1948   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1949       ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
1950   EXPECT_THAT(GetResourceCounts(),
1951               ::testing::ElementsAre(::testing::Pair(
1952                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1953                                         XdsFooResourceType::Get()->type_url(),
1954                                         "requested"),
1955                   3)));
1956   // Check CSDS.
1957   csds = DumpCsds();
1958   EXPECT_THAT(
1959       csds.generic_xds_configs(),
1960       ::testing::UnorderedElementsAre(
1961           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo1"),
1962           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo2"),
1963           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
1964                                 "foo3")));
1965   // Client should send another request.
1966   request = WaitForRequest(stream.get());
1967   ASSERT_TRUE(request.has_value());
1968   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1969                /*version_info=*/"", /*response_nonce=*/"",
1970                /*error_detail=*/absl::OkStatus(),
1971                /*resource_names=*/{"foo1", "foo2", "foo3"});
1972   // Add a watch for a fourth resource.
1973   auto watcher4 = StartFooWatch("foo4");
1974   // Check metric data.
1975   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1976       ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
1977   EXPECT_THAT(GetResourceCounts(),
1978               ::testing::ElementsAre(::testing::Pair(
1979                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1980                                         XdsFooResourceType::Get()->type_url(),
1981                                         "requested"),
1982                   4)));
1983   // Check CSDS.
1984   csds = DumpCsds();
1985   EXPECT_THAT(
1986       csds.generic_xds_configs(),
1987       ::testing::UnorderedElementsAre(
1988           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo1"),
1989           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo2"),
1990           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo3"),
1991           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
1992                                 "foo4")));
1993   // Client should send another request.
1994   request = WaitForRequest(stream.get());
1995   ASSERT_TRUE(request.has_value());
1996   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1997                /*version_info=*/"", /*response_nonce=*/"",
1998                /*error_detail=*/absl::OkStatus(),
1999                /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"});
2000   // Server sends a response containing three invalid resources and one
2001   // valid resource.
2002   stream->SendMessageToClient(
2003       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2004           .set_version_info("1")
2005           .set_nonce("A")
2006           // foo1: JSON parsing succeeds, so we know the resource name,
2007           // but validation fails.
2008           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
2009                               "{\"name\":\"foo1\",\"value\":[]}")
2010           // foo2: JSON parsing fails, and not wrapped in a Resource
2011           // wrapper, so we don't actually know the resource's name.
2012           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
2013                               "{\"name\":\"foo2,\"value\":6}")
2014           // Empty resource.  Will be included in NACK but will not
2015           // affect any watchers.
2016           .AddEmptyResource()
2017           // Invalid resource wrapper.  Will be included in NACK but
2018           // will not affect any watchers.
2019           .AddInvalidResourceWrapper()
2020           // foo3: JSON parsing fails, but it is wrapped in a Resource
2021           // wrapper, so we do know the resource's name.
2022           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
2023                               "{\"name\":\"foo3,\"value\":6}",
2024                               /*resource_wrapper_name=*/"foo3")
2025           // foo4: valid resource.
2026           .AddFooResource(XdsFooResource("foo4", 5))
2027           .Serialize());
2028   // XdsClient should deliver an error to the watchers for foo1 and foo3.
2029   auto error = watcher->WaitForNextError();
2030   ASSERT_TRUE(error.has_value());
2031   EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
2032   EXPECT_EQ(error->message(),
2033             "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
2034             "[field:value error:is not a number] (node ID:xds_client_test)")
2035       << *error;
2036   error = watcher3->WaitForNextError();
2037   ASSERT_TRUE(error.has_value());
2038   EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
2039   EXPECT_EQ(error->message(),
2040             "invalid resource: INVALID_ARGUMENT: JSON parsing failed: "
2041             "[JSON parse error at index 15] (node ID:xds_client_test)")
2042       << *error;
2043   // It cannot delivery an error for foo2, because the client doesn't know
2044   // that that resource in the response was actually supposed to be foo2.
2045   EXPECT_FALSE(watcher2->HasEvent());
2046   // It will delivery a valid resource update for foo4.
2047   auto resource = watcher4->WaitForNextResource();
2048   ASSERT_NE(resource, nullptr);
2049   EXPECT_EQ(resource->name, "foo4");
2050   EXPECT_EQ(resource->value, 5);
2051   // Check metric data.
2052   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2053       ::testing::ElementsAre(::testing::Pair(
2054           ::testing::Pair(kDefaultXdsServerUrl,
2055                           XdsFooResourceType::Get()->type_url()),
2056           1)),
2057       ::testing::ElementsAre(::testing::Pair(
2058           ::testing::Pair(kDefaultXdsServerUrl,
2059                           XdsFooResourceType::Get()->type_url()),
2060           5)),
2061       ::testing::_));
2062   EXPECT_THAT(
2063       GetResourceCounts(),
2064       ::testing::ElementsAre(
2065           // foo4
2066           ::testing::Pair(ResourceCountLabelsEq(
2067                               XdsClient::kOldStyleAuthority,
2068                               XdsFooResourceType::Get()->type_url(), "acked"),
2069                           1),
2070           // foo1 and foo3
2071           ::testing::Pair(ResourceCountLabelsEq(
2072                               XdsClient::kOldStyleAuthority,
2073                               XdsFooResourceType::Get()->type_url(), "nacked"),
2074                           2),
2075           // did not recognize response for foo2
2076           ::testing::Pair(
2077               ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2078                                     XdsFooResourceType::Get()->type_url(),
2079                                     "requested"),
2080               1)));
2081   // Check CSDS.
2082   csds = DumpCsds();
2083   EXPECT_THAT(
2084       csds.generic_xds_configs(),
2085       ::testing::UnorderedElementsAre(
2086           CsdsResourceEq(
2087               ClientResourceStatus::NACKED,
2088               XdsFooResourceType::Get()->type_url(), "foo1",
2089               CsdsNoResourceFields(),
2090               CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: "
2091                               "[field:value error:is not a number]",
2092                               "1", TimestampProtoEq(kTime0))),
2093           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo2"),
2094           CsdsResourceEq(
2095               ClientResourceStatus::NACKED,
2096               XdsFooResourceType::Get()->type_url(), "foo3",
2097               CsdsNoResourceFields(),
2098               CsdsErrorFields("INVALID_ARGUMENT: JSON parsing failed: "
2099                               "[JSON parse error at index 15]",
2100                               "1", TimestampProtoEq(kTime0))),
2101           CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo4",
2102                             resource->AsJsonString(), "1",
2103                             TimestampProtoEq(kTime0))));
2104   // XdsClient should NACK the update.
2105   // There was one good resource, so the version will be updated.
2106   request = WaitForRequest(stream.get());
2107   ASSERT_TRUE(request.has_value());
2108   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2109                /*version_info=*/"1", /*response_nonce=*/"A",
2110                // error_detail=
2111                absl::InvalidArgumentError(absl::StrCat(
2112                    "xDS response validation errors: ["
2113                    // foo1
2114                    "resource index 0: foo1: "
2115                    "INVALID_ARGUMENT: errors validating JSON: "
2116                    "[field:value error:is not a number]; "
2117                    // foo2 (name not known)
2118                    "resource index 1: INVALID_ARGUMENT: JSON parsing failed: "
2119                    "[JSON parse error at index 15]; "
2120                    // empty resource
2121                    "resource index 2: incorrect resource type \"\" "
2122                    "(should be \"",
2123                    XdsFooResourceType::Get()->type_url(),
2124                    "\"); "
2125                    // invalid resource wrapper
2126                    "resource index 3: Can't decode Resource proto wrapper; "
2127                    // foo3
2128                    "resource index 4: foo3: "
2129                    "INVALID_ARGUMENT: JSON parsing failed: "
2130                    "[JSON parse error at index 15]]")),
2131                /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"});
2132   // Cancel watches.
2133   CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
2134   CancelFooWatch(watcher2.get(), "foo2", /*delay_unsubscription=*/true);
2135   CancelFooWatch(watcher3.get(), "foo3", /*delay_unsubscription=*/true);
2136   CancelFooWatch(watcher4.get(), "foo4");
2137   EXPECT_TRUE(stream->IsOrphaned());
2138 }
2139 
TEST_F(XdsClientTest,ResourceValidationFailureForCachedResource)2140 TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
2141   InitXdsClient();
2142   // Start a watch for "foo1".
2143   auto watcher = StartFooWatch("foo1");
2144   // Watcher should initially not see any resource reported.
2145   EXPECT_FALSE(watcher->HasEvent());
2146   // XdsClient should have created an ADS stream.
2147   auto stream = WaitForAdsStream();
2148   ASSERT_TRUE(stream != nullptr);
2149   // XdsClient should have sent a subscription request on the ADS stream.
2150   auto request = WaitForRequest(stream.get());
2151   ASSERT_TRUE(request.has_value());
2152   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2153                /*version_info=*/"", /*response_nonce=*/"",
2154                /*error_detail=*/absl::OkStatus(),
2155                /*resource_names=*/{"foo1"});
2156   CheckRequestNode(*request);  // Should be present on the first request.
2157   // Send a response.
2158   stream->SendMessageToClient(
2159       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2160           .set_version_info("1")
2161           .set_nonce("A")
2162           .AddFooResource(XdsFooResource("foo1", 6))
2163           .Serialize());
2164   // XdsClient should have delivered the response to the watcher.
2165   auto resource = watcher->WaitForNextResource();
2166   ASSERT_NE(resource, nullptr);
2167   EXPECT_EQ(resource->name, "foo1");
2168   EXPECT_EQ(resource->value, 6);
2169   // Check metric data.
2170   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2171       ::testing::ElementsAre(::testing::Pair(
2172           ::testing::Pair(kDefaultXdsServerUrl,
2173                           XdsFooResourceType::Get()->type_url()),
2174           1)),
2175       ::testing::ElementsAre(), ::testing::_));
2176   EXPECT_THAT(
2177       GetResourceCounts(),
2178       ::testing::ElementsAre(::testing::Pair(
2179           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2180                                 XdsFooResourceType::Get()->type_url(), "acked"),
2181           1)));
2182   // Check CSDS data.
2183   ClientConfig csds = DumpCsds();
2184   EXPECT_THAT(csds.generic_xds_configs(),
2185               ::testing::UnorderedElementsAre(CsdsResourceAcked(
2186                   XdsFooResourceType::Get()->type_url(), "foo1",
2187                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
2188   // XdsClient should have sent an ACK message to the xDS server.
2189   request = WaitForRequest(stream.get());
2190   ASSERT_TRUE(request.has_value());
2191   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2192                /*version_info=*/"1", /*response_nonce=*/"A",
2193                /*error_detail=*/absl::OkStatus(),
2194                /*resource_names=*/{"foo1"});
2195   // Send an update containing an invalid resource.
2196   // We increment time to make sure that the CSDS data gets a new timestamp.
2197   time_cache_.TestOnlySetNow(kTime1);
2198   stream->SendMessageToClient(
2199       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2200           .set_version_info("2")
2201           .set_nonce("B")
2202           .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
2203                               "{\"name\":\"foo1\",\"value\":[]}")
2204           .Serialize());
2205   // XdsClient should deliver an error to the watcher.
2206   auto error = watcher->WaitForNextAmbientError();
2207   ASSERT_TRUE(error.has_value());
2208   EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
2209   EXPECT_EQ(error->message(),
2210             "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
2211             "[field:value error:is not a number] (node ID:xds_client_test)")
2212       << *error;
2213   // Check metric data.
2214   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2215       ::testing::ElementsAre(::testing::Pair(
2216           ::testing::Pair(kDefaultXdsServerUrl,
2217                           XdsFooResourceType::Get()->type_url()),
2218           1)),
2219       ::testing::ElementsAre(::testing::Pair(
2220           ::testing::Pair(kDefaultXdsServerUrl,
2221                           XdsFooResourceType::Get()->type_url()),
2222           1)),
2223       ::testing::_));
2224   EXPECT_THAT(GetResourceCounts(),
2225               ::testing::ElementsAre(::testing::Pair(
2226                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
2227                                         XdsFooResourceType::Get()->type_url(),
2228                                         "nacked_but_cached"),
2229                   1)));
2230   // Check CSDS data.
2231   csds = DumpCsds();
2232   EXPECT_THAT(csds.generic_xds_configs(),
2233               ::testing::UnorderedElementsAre(CsdsResourceEq(
2234                   ClientResourceStatus::NACKED,
2235                   XdsFooResourceType::Get()->type_url(), "foo1",
2236                   CsdsResourceFields(resource->AsJsonString(), "1",
2237                                      TimestampProtoEq(kTime0)),
2238                   CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: "
2239                                   "[field:value error:is not a number]",
2240                                   "2", TimestampProtoEq(kTime1)))));
2241   // XdsClient should NACK the update.
2242   // Note that version_info is set to the previous version in this request,
2243   // because there were no valid resources in it.
2244   request = WaitForRequest(stream.get());
2245   ASSERT_TRUE(request.has_value());
2246   CheckRequest(
2247       *request, XdsFooResourceType::Get()->type_url(),
2248       /*version_info=*/"1", /*response_nonce=*/"B",
2249       // error_detail=
2250       absl::InvalidArgumentError(
2251           "xDS response validation errors: ["
2252           "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: "
2253           "[field:value error:is not a number]]"),
2254       /*resource_names=*/{"foo1"});
2255   // Start a second watcher for the same resource.  Even though the last
2256   // update was a NACK, we should still deliver the cached resource to
2257   // the watcher.
2258   // TODO(roth): Consider what the right behavior is here.  It seems
2259   // inconsistent that the watcher sees the error if it had started
2260   // before the error was seen but does not if it was started afterwards.
2261   // One option is to not send errors at all for already-cached resources;
2262   // another option is to send the errors even for newly started watchers.
2263   auto watcher2 = StartFooWatch("foo1");
2264   resource = watcher2->WaitForNextResource();
2265   ASSERT_NE(resource, nullptr);
2266   EXPECT_EQ(resource->name, "foo1");
2267   EXPECT_EQ(resource->value, 6);
2268   // Cancel watches.
2269   CancelFooWatch(watcher.get(), "foo1");
2270   CancelFooWatch(watcher2.get(), "foo1");
2271   EXPECT_TRUE(stream->IsOrphaned());
2272 }
2273 
TEST_F(XdsClientTest,WildcardCapableResponseWithEmptyResource)2274 TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) {
2275   InitXdsClient();
2276   // Start a watch for "wc1".
2277   auto watcher = StartWildcardCapableWatch("wc1");
2278   // Watcher should initially not see any resource reported.
2279   EXPECT_FALSE(watcher->HasEvent());
2280   // XdsClient should have created an ADS stream.
2281   auto stream = WaitForAdsStream();
2282   ASSERT_TRUE(stream != nullptr);
2283   // XdsClient should have sent a subscription request on the ADS stream.
2284   auto request = WaitForRequest(stream.get());
2285   ASSERT_TRUE(request.has_value());
2286   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2287                /*version_info=*/"", /*response_nonce=*/"",
2288                /*error_detail=*/absl::OkStatus(),
2289                /*resource_names=*/{"wc1"});
2290   CheckRequestNode(*request);  // Should be present on the first request.
2291   // Server sends a response containing the requested resources plus an
2292   // empty resource.
2293   stream->SendMessageToClient(
2294       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2295           .set_version_info("1")
2296           .set_nonce("A")
2297           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
2298           .AddEmptyResource()
2299           .Serialize());
2300   // XdsClient will delivery a valid resource update for wc1.
2301   auto resource = watcher->WaitForNextResource();
2302   ASSERT_NE(resource, nullptr);
2303   EXPECT_EQ(resource->name, "wc1");
2304   EXPECT_EQ(resource->value, 6);
2305   // Check metric data.
2306   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2307       ::testing::ElementsAre(::testing::Pair(
2308           ::testing::Pair(kDefaultXdsServerUrl,
2309                           XdsWildcardCapableResourceType::Get()->type_url()),
2310           1)),
2311       ::testing::ElementsAre(::testing::Pair(
2312           ::testing::Pair(kDefaultXdsServerUrl,
2313                           XdsWildcardCapableResourceType::Get()->type_url()),
2314           1)),
2315       ::testing::_));
2316   EXPECT_THAT(
2317       GetResourceCounts(),
2318       ::testing::ElementsAre(::testing::Pair(
2319           ResourceCountLabelsEq(
2320               XdsClient::kOldStyleAuthority,
2321               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2322           1)));
2323   // Check CSDS data.
2324   ClientConfig csds = DumpCsds();
2325   EXPECT_THAT(csds.generic_xds_configs(),
2326               ::testing::UnorderedElementsAre(CsdsResourceAcked(
2327                   XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2328                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
2329   // XdsClient should NACK the update.
2330   // There was one good resource, so the version will be updated.
2331   request = WaitForRequest(stream.get());
2332   ASSERT_TRUE(request.has_value());
2333   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2334                /*version_info=*/"1", /*response_nonce=*/"A",
2335                // error_detail=
2336                absl::InvalidArgumentError(absl::StrCat(
2337                    "xDS response validation errors: ["
2338                    "resource index 1: incorrect resource type \"\" "
2339                    "(should be \"",
2340                    XdsWildcardCapableResourceType::Get()->type_url(), "\")]")),
2341                /*resource_names=*/{"wc1"});
2342   // Cancel watch.
2343   CancelWildcardCapableWatch(watcher.get(), "wc1");
2344   EXPECT_TRUE(stream->IsOrphaned());
2345 }
2346 
2347 // This tests resource removal triggered by the server when using a
2348 // resource type that requires all resources to be present in every
2349 // response, similar to LDS and CDS.
TEST_F(XdsClientTest,ResourceDeletion)2350 TEST_F(XdsClientTest, ResourceDeletion) {
2351   InitXdsClient();
2352   // Start a watch for "wc1".
2353   auto watcher = StartWildcardCapableWatch("wc1");
2354   // Watcher should initially not see any resource reported.
2355   EXPECT_FALSE(watcher->HasEvent());
2356   // XdsClient should have created an ADS stream.
2357   auto stream = WaitForAdsStream();
2358   ASSERT_TRUE(stream != nullptr);
2359   // XdsClient should have sent a subscription request on the ADS stream.
2360   auto request = WaitForRequest(stream.get());
2361   ASSERT_TRUE(request.has_value());
2362   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2363                /*version_info=*/"", /*response_nonce=*/"",
2364                /*error_detail=*/absl::OkStatus(),
2365                /*resource_names=*/{"wc1"});
2366   CheckRequestNode(*request);  // Should be present on the first request.
2367   // Server sends a response.
2368   stream->SendMessageToClient(
2369       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2370           .set_version_info("1")
2371           .set_nonce("A")
2372           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
2373           .Serialize());
2374   // XdsClient should have delivered the response to the watcher.
2375   auto resource = watcher->WaitForNextResource();
2376   ASSERT_NE(resource, nullptr);
2377   EXPECT_EQ(resource->name, "wc1");
2378   EXPECT_EQ(resource->value, 6);
2379   // Check metric data.
2380   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2381       ::testing::ElementsAre(::testing::Pair(
2382           ::testing::Pair(kDefaultXdsServerUrl,
2383                           XdsWildcardCapableResourceType::Get()->type_url()),
2384           1)),
2385       ::testing::ElementsAre(), ::testing::_));
2386   EXPECT_THAT(
2387       GetResourceCounts(),
2388       ::testing::ElementsAre(::testing::Pair(
2389           ResourceCountLabelsEq(
2390               XdsClient::kOldStyleAuthority,
2391               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2392           1)));
2393   // Check CSDS data.
2394   ClientConfig csds = DumpCsds();
2395   EXPECT_THAT(csds.generic_xds_configs(),
2396               ::testing::UnorderedElementsAre(CsdsResourceAcked(
2397                   XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2398                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
2399   // XdsClient should have sent an ACK message to the xDS server.
2400   request = WaitForRequest(stream.get());
2401   ASSERT_TRUE(request.has_value());
2402   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2403                /*version_info=*/"1", /*response_nonce=*/"A",
2404                /*error_detail=*/absl::OkStatus(),
2405                /*resource_names=*/{"wc1"});
2406   // Server now sends a response without the resource, thus indicating
2407   // it's been deleted.
2408   stream->SendMessageToClient(
2409       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2410           .set_version_info("2")
2411           .set_nonce("B")
2412           .Serialize());
2413   // Watcher should see the does-not-exist event.
2414   EXPECT_TRUE(watcher->WaitForDoesNotExist());
2415   // Check metric data.
2416   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2417       ::testing::ElementsAre(::testing::Pair(
2418           ::testing::Pair(kDefaultXdsServerUrl,
2419                           XdsWildcardCapableResourceType::Get()->type_url()),
2420           1)),
2421       ::testing::ElementsAre(), ::testing::_));
2422   EXPECT_THAT(GetResourceCounts(),
2423               ::testing::ElementsAre(::testing::Pair(
2424                   ResourceCountLabelsEq(
2425                       XdsClient::kOldStyleAuthority,
2426                       XdsWildcardCapableResourceType::Get()->type_url(),
2427                       "does_not_exist"),
2428                   1)));
2429   // Check CSDS data.
2430   csds = DumpCsds();
2431   EXPECT_THAT(csds.generic_xds_configs(),
2432               ::testing::UnorderedElementsAre(CsdsResourceDoesNotExist(
2433                   XdsWildcardCapableResourceType::Get()->type_url(), "wc1")));
2434   // Start a new watcher for the same resource.  It should immediately
2435   // receive the same does-not-exist notification.
2436   auto watcher2 = StartWildcardCapableWatch("wc1");
2437   EXPECT_TRUE(watcher2->WaitForDoesNotExist());
2438   // XdsClient should have sent an ACK message to the xDS server.
2439   request = WaitForRequest(stream.get());
2440   ASSERT_TRUE(request.has_value());
2441   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2442                /*version_info=*/"2", /*response_nonce=*/"B",
2443                /*error_detail=*/absl::OkStatus(),
2444                /*resource_names=*/{"wc1"});
2445   // Server sends the resource again.
2446   // We increment time to make sure that the CSDS data gets a new timestamp.
2447   time_cache_.TestOnlySetNow(kTime1);
2448   stream->SendMessageToClient(
2449       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2450           .set_version_info("3")
2451           .set_nonce("C")
2452           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7))
2453           .Serialize());
2454   // XdsClient should have delivered the response to the watchers.
2455   resource = watcher->WaitForNextResource();
2456   ASSERT_NE(resource, nullptr);
2457   EXPECT_EQ(resource->name, "wc1");
2458   EXPECT_EQ(resource->value, 7);
2459   resource = watcher2->WaitForNextResource();
2460   ASSERT_NE(resource, nullptr);
2461   EXPECT_EQ(resource->name, "wc1");
2462   EXPECT_EQ(resource->value, 7);
2463   // Check metric data.
2464   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2465       ::testing::ElementsAre(::testing::Pair(
2466           ::testing::Pair(kDefaultXdsServerUrl,
2467                           XdsWildcardCapableResourceType::Get()->type_url()),
2468           2)),
2469       ::testing::ElementsAre(), ::testing::_));
2470   EXPECT_THAT(
2471       GetResourceCounts(),
2472       ::testing::ElementsAre(::testing::Pair(
2473           ResourceCountLabelsEq(
2474               XdsClient::kOldStyleAuthority,
2475               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2476           1)));
2477   // Check CSDS data.
2478   csds = DumpCsds();
2479   EXPECT_THAT(csds.generic_xds_configs(),
2480               ::testing::UnorderedElementsAre(CsdsResourceAcked(
2481                   XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2482                   resource->AsJsonString(), "3", TimestampProtoEq(kTime1))));
2483   // XdsClient should have sent an ACK message to the xDS server.
2484   request = WaitForRequest(stream.get());
2485   ASSERT_TRUE(request.has_value());
2486   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2487                /*version_info=*/"3", /*response_nonce=*/"C",
2488                /*error_detail=*/absl::OkStatus(),
2489                /*resource_names=*/{"wc1"});
2490   // Cancel watch.
2491   CancelWildcardCapableWatch(watcher.get(), "wc1");
2492   CancelWildcardCapableWatch(watcher2.get(), "wc1");
2493   EXPECT_TRUE(stream->IsOrphaned());
2494 }
2495 
2496 // This tests that when we ignore resource deletions from the server
2497 // when configured to do so.
TEST_F(XdsClientTest,ResourceDeletionIgnoredWhenConfigured)2498 TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
2499   InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
2500       {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)}));
2501   // Start a watch for "wc1".
2502   auto watcher = StartWildcardCapableWatch("wc1");
2503   // Watcher should initially not see any resource reported.
2504   EXPECT_FALSE(watcher->HasEvent());
2505   // XdsClient should have created an ADS stream.
2506   auto stream = WaitForAdsStream();
2507   ASSERT_TRUE(stream != nullptr);
2508   // XdsClient should have sent a subscription request on the ADS stream.
2509   auto request = WaitForRequest(stream.get());
2510   ASSERT_TRUE(request.has_value());
2511   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2512                /*version_info=*/"", /*response_nonce=*/"",
2513                /*error_detail=*/absl::OkStatus(),
2514                /*resource_names=*/{"wc1"});
2515   CheckRequestNode(*request);  // Should be present on the first request.
2516   // Server sends a response.
2517   stream->SendMessageToClient(
2518       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2519           .set_version_info("1")
2520           .set_nonce("A")
2521           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
2522           .Serialize());
2523   // XdsClient should have delivered the response to the watcher.
2524   auto resource = watcher->WaitForNextResource();
2525   ASSERT_NE(resource, nullptr);
2526   EXPECT_EQ(resource->name, "wc1");
2527   EXPECT_EQ(resource->value, 6);
2528   // Check metric data.
2529   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2530       ::testing::ElementsAre(::testing::Pair(
2531           ::testing::Pair(kDefaultXdsServerUrl,
2532                           XdsWildcardCapableResourceType::Get()->type_url()),
2533           1)),
2534       ::testing::ElementsAre(), ::testing::_));
2535   EXPECT_THAT(
2536       GetResourceCounts(),
2537       ::testing::ElementsAre(::testing::Pair(
2538           ResourceCountLabelsEq(
2539               XdsClient::kOldStyleAuthority,
2540               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2541           1)));
2542   // Check CSDS data.
2543   ClientConfig csds = DumpCsds();
2544   EXPECT_THAT(csds.generic_xds_configs(),
2545               ::testing::UnorderedElementsAre(CsdsResourceAcked(
2546                   XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2547                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
2548   // XdsClient should have sent an ACK message to the xDS server.
2549   request = WaitForRequest(stream.get());
2550   ASSERT_TRUE(request.has_value());
2551   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2552                /*version_info=*/"1", /*response_nonce=*/"A",
2553                /*error_detail=*/absl::OkStatus(),
2554                /*resource_names=*/{"wc1"});
2555   // Server now sends a response without the resource, thus indicating
2556   // it's been deleted.
2557   // We increment time to make sure that the CSDS data does NOT get a
2558   // new timestamp.
2559   time_cache_.TestOnlySetNow(kTime1);
2560   stream->SendMessageToClient(
2561       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2562           .set_version_info("2")
2563           .set_nonce("B")
2564           .Serialize());
2565   // Watcher should not see any update, since we should have ignored the
2566   // deletion.
2567   EXPECT_TRUE(watcher->ExpectNoEvent());
2568   // Check metric data.
2569   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2570       ::testing::ElementsAre(::testing::Pair(
2571           ::testing::Pair(kDefaultXdsServerUrl,
2572                           XdsWildcardCapableResourceType::Get()->type_url()),
2573           1)),
2574       ::testing::ElementsAre(), ::testing::_));
2575   EXPECT_THAT(
2576       GetResourceCounts(),
2577       ::testing::ElementsAre(::testing::Pair(
2578           ResourceCountLabelsEq(
2579               XdsClient::kOldStyleAuthority,
2580               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2581           1)));
2582   // Check CSDS data.
2583   csds = DumpCsds();
2584   EXPECT_THAT(csds.generic_xds_configs(),
2585               ::testing::UnorderedElementsAre(CsdsResourceAcked(
2586                   XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2587                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
2588   // Start a new watcher for the same resource.  It should immediately
2589   // receive the cached resource.
2590   auto watcher2 = StartWildcardCapableWatch("wc1");
2591   resource = watcher2->WaitForNextResource();
2592   ASSERT_NE(resource, nullptr);
2593   EXPECT_EQ(resource->name, "wc1");
2594   EXPECT_EQ(resource->value, 6);
2595   // XdsClient should have sent an ACK message to the xDS server.
2596   request = WaitForRequest(stream.get());
2597   ASSERT_TRUE(request.has_value());
2598   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2599                /*version_info=*/"2", /*response_nonce=*/"B",
2600                /*error_detail=*/absl::OkStatus(),
2601                /*resource_names=*/{"wc1"});
2602   // Server sends a new value for the resource.
2603   // We increment time to make sure that the CSDS data gets a new timestamp.
2604   time_cache_.TestOnlySetNow(kTime2);
2605   stream->SendMessageToClient(
2606       ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2607           .set_version_info("3")
2608           .set_nonce("C")
2609           .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7))
2610           .Serialize());
2611   // XdsClient should have delivered the response to the watchers.
2612   resource = watcher->WaitForNextResource();
2613   ASSERT_NE(resource, nullptr);
2614   EXPECT_EQ(resource->name, "wc1");
2615   EXPECT_EQ(resource->value, 7);
2616   resource = watcher2->WaitForNextResource();
2617   ASSERT_NE(resource, nullptr);
2618   EXPECT_EQ(resource->name, "wc1");
2619   EXPECT_EQ(resource->value, 7);
2620   // Check metric data.
2621   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2622       ::testing::ElementsAre(::testing::Pair(
2623           ::testing::Pair(kDefaultXdsServerUrl,
2624                           XdsWildcardCapableResourceType::Get()->type_url()),
2625           2)),
2626       ::testing::ElementsAre(), ::testing::_));
2627   EXPECT_THAT(
2628       GetResourceCounts(),
2629       ::testing::ElementsAre(::testing::Pair(
2630           ResourceCountLabelsEq(
2631               XdsClient::kOldStyleAuthority,
2632               XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2633           1)));
2634   // Check CSDS data.
2635   csds = DumpCsds();
2636   EXPECT_THAT(csds.generic_xds_configs(),
2637               ::testing::UnorderedElementsAre(CsdsResourceAcked(
2638                   XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2639                   resource->AsJsonString(), "3", TimestampProtoEq(kTime2))));
2640   // XdsClient should have sent an ACK message to the xDS server.
2641   request = WaitForRequest(stream.get());
2642   ASSERT_TRUE(request.has_value());
2643   CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2644                /*version_info=*/"3", /*response_nonce=*/"C",
2645                /*error_detail=*/absl::OkStatus(),
2646                /*resource_names=*/{"wc1"});
2647   // Cancel watch.
2648   CancelWildcardCapableWatch(watcher.get(), "wc1");
2649   CancelWildcardCapableWatch(watcher2.get(), "wc1");
2650   EXPECT_TRUE(stream->IsOrphaned());
2651 }
2652 
TEST_F(XdsClientTest,StreamClosedByServer)2653 TEST_F(XdsClientTest, StreamClosedByServer) {
2654   InitXdsClient();
2655   // Metrics should initially be empty.
2656   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2657   // Start a watch for "foo1".
2658   auto watcher = StartFooWatch("foo1");
2659   // Check metric data.
2660   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2661                                           kDefaultXdsServerUrl, true)));
2662   // Watcher should initially not see any resource reported.
2663   EXPECT_FALSE(watcher->HasEvent());
2664   // XdsClient should have created an ADS stream.
2665   auto stream = WaitForAdsStream();
2666   ASSERT_TRUE(stream != nullptr);
2667   // XdsClient should have sent a subscription request on the ADS stream.
2668   auto request = WaitForRequest(stream.get());
2669   ASSERT_TRUE(request.has_value());
2670   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2671                /*version_info=*/"", /*response_nonce=*/"",
2672                /*error_detail=*/absl::OkStatus(),
2673                /*resource_names=*/{"foo1"});
2674   CheckRequestNode(*request);  // Should be present on the first request.
2675   // Server sends a response.
2676   stream->SendMessageToClient(
2677       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2678           .set_version_info("1")
2679           .set_nonce("A")
2680           .AddFooResource(XdsFooResource("foo1", 6))
2681           .Serialize());
2682   // XdsClient should have delivered the response to the watcher.
2683   auto resource = watcher->WaitForNextResource();
2684   ASSERT_NE(resource, nullptr);
2685   EXPECT_EQ(resource->name, "foo1");
2686   EXPECT_EQ(resource->value, 6);
2687   // XdsClient should have sent an ACK message to the xDS server.
2688   request = WaitForRequest(stream.get());
2689   ASSERT_TRUE(request.has_value());
2690   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2691                /*version_info=*/"1", /*response_nonce=*/"A",
2692                /*error_detail=*/absl::OkStatus(),
2693                /*resource_names=*/{"foo1"});
2694   // Now server closes the stream.
2695   stream->MaybeSendStatusToClient(absl::OkStatus());
2696   // XdsClient should NOT report error to watcher, because we saw a
2697   // response on the stream before it failed.
2698   // Stream should be orphaned.
2699   EXPECT_TRUE(stream->IsOrphaned());
2700   // Check metric data.
2701   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2702                                           kDefaultXdsServerUrl, true)));
2703   // XdsClient should create a new stream.
2704   stream = WaitForAdsStream();
2705   ASSERT_TRUE(stream != nullptr);
2706   // XdsClient sends a subscription request.
2707   // Note that the version persists from the previous stream, but the
2708   // nonce does not.
2709   request = WaitForRequest(stream.get());
2710   ASSERT_TRUE(request.has_value());
2711   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2712                /*version_info=*/"1", /*response_nonce=*/"",
2713                /*error_detail=*/absl::OkStatus(),
2714                /*resource_names=*/{"foo1"});
2715   CheckRequestNode(*request);  // Should be present on the first request.
2716   // Before the server resends the resource, start a new watcher for the
2717   // same resource.  This watcher should immediately receive the cached
2718   // resource.
2719   auto watcher2 = StartFooWatch("foo1");
2720   resource = watcher2->WaitForNextResource();
2721   ASSERT_NE(resource, nullptr);
2722   EXPECT_EQ(resource->name, "foo1");
2723   EXPECT_EQ(resource->value, 6);
2724   // Server now sends the requested resource.
2725   stream->SendMessageToClient(
2726       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2727           .set_version_info("1")
2728           .set_nonce("B")
2729           .AddFooResource(XdsFooResource("foo1", 6))
2730           .Serialize());
2731   // Watcher does NOT get an update, since the resource has not changed.
2732   EXPECT_FALSE(watcher->WaitForNextResource());
2733   // XdsClient sends an ACK.
2734   request = WaitForRequest(stream.get());
2735   ASSERT_TRUE(request.has_value());
2736   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2737                /*version_info=*/"1", /*response_nonce=*/"B",
2738                /*error_detail=*/absl::OkStatus(),
2739                /*resource_names=*/{"foo1"});
2740   // Cancel watcher.
2741   CancelFooWatch(watcher.get(), "foo1");
2742   CancelFooWatch(watcher2.get(), "foo1");
2743   EXPECT_TRUE(stream->IsOrphaned());
2744 }
2745 
TEST_F(XdsClientTest,StreamClosedByServerWithoutSeeingResponse)2746 TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
2747   InitXdsClient();
2748   // Metrics should initially be empty.
2749   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2750   // Start a watch for "foo1".
2751   auto watcher = StartFooWatch("foo1");
2752   // Check metric data.
2753   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2754                                           kDefaultXdsServerUrl, true)));
2755   EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
2756   // Watcher should initially not see any resource reported.
2757   EXPECT_FALSE(watcher->HasEvent());
2758   // XdsClient should have created an ADS stream.
2759   auto stream = WaitForAdsStream();
2760   ASSERT_TRUE(stream != nullptr);
2761   // XdsClient should have sent a subscription request on the ADS stream.
2762   auto request = WaitForRequest(stream.get());
2763   ASSERT_TRUE(request.has_value());
2764   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2765                /*version_info=*/"", /*response_nonce=*/"",
2766                /*error_detail=*/absl::OkStatus(),
2767                /*resource_names=*/{"foo1"});
2768   CheckRequestNode(*request);  // Should be present on the first request.
2769   // Server closes the stream without sending a response.
2770   stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
2771   // Check metric data.
2772   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2773                                           kDefaultXdsServerUrl, false)));
2774   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2775       ::testing::_, ::testing::_,
2776       ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
2777   // XdsClient should report an error to the watcher.
2778   auto error = watcher->WaitForNextError();
2779   ASSERT_TRUE(error.has_value());
2780   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2781   EXPECT_EQ(error->message(),
2782             "xDS channel for server default_xds_server: xDS call failed "
2783             "with no responses received; status: UNAVAILABLE: ugh "
2784             "(node ID:xds_client_test)")
2785       << *error;
2786   // XdsClient should create a new stream.
2787   stream = WaitForAdsStream();
2788   ASSERT_TRUE(stream != nullptr);
2789   // XdsClient sends a subscription request.
2790   request = WaitForRequest(stream.get());
2791   ASSERT_TRUE(request.has_value());
2792   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2793                /*version_info=*/"", /*response_nonce=*/"",
2794                /*error_detail=*/absl::OkStatus(),
2795                /*resource_names=*/{"foo1"});
2796   CheckRequestNode(*request);  // Should be present on the first request.
2797   // Connection still reported as unhappy until we get a response.
2798   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2799                                           kDefaultXdsServerUrl, false)));
2800   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2801       ::testing::_, ::testing::_,
2802       ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
2803   // Server now sends the requested resource.
2804   stream->SendMessageToClient(
2805       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2806           .set_version_info("1")
2807           .set_nonce("A")
2808           .AddFooResource(XdsFooResource("foo1", 6))
2809           .Serialize());
2810   // Watcher gets the resource.
2811   auto resource = watcher->WaitForNextResource();
2812   ASSERT_NE(resource, nullptr);
2813   EXPECT_EQ(resource->name, "foo1");
2814   EXPECT_EQ(resource->value, 6);
2815   // Connection now reported as happy.
2816   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2817                                           kDefaultXdsServerUrl, true)));
2818   // XdsClient sends an ACK.
2819   request = WaitForRequest(stream.get());
2820   ASSERT_TRUE(request.has_value());
2821   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2822                /*version_info=*/"1", /*response_nonce=*/"A",
2823                /*error_detail=*/absl::OkStatus(),
2824                /*resource_names=*/{"foo1"});
2825   // Cancel watcher.
2826   CancelFooWatch(watcher.get(), "foo1");
2827   EXPECT_TRUE(stream->IsOrphaned());
2828 }
2829 
TEST_F(XdsClientTest,ConnectionFails)2830 TEST_F(XdsClientTest, ConnectionFails) {
2831   InitXdsClient();
2832   // Tell transport to let us manually trigger completion of the
2833   // send_message ops to XdsClient.
2834   transport_factory_->SetAutoCompleteMessagesFromClient(false);
2835   // Metrics should initially be empty.
2836   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
2837   EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
2838   // Start a watch for "foo1".
2839   auto watcher = StartFooWatch("foo1");
2840   // Check metric data.
2841   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2842                                           kDefaultXdsServerUrl, true)));
2843   // Watcher should initially not see any resource reported.
2844   EXPECT_FALSE(watcher->HasEvent());
2845   // XdsClient should have created an ADS stream.
2846   auto stream = WaitForAdsStream();
2847   ASSERT_TRUE(stream != nullptr);
2848   // XdsClient should have sent a subscription request on the ADS stream.
2849   auto request = WaitForRequest(stream.get());
2850   ASSERT_TRUE(request.has_value());
2851   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2852                /*version_info=*/"", /*response_nonce=*/"",
2853                /*error_detail=*/absl::OkStatus(),
2854                /*resource_names=*/{"foo1"});
2855   CheckRequestNode(*request);  // Should be present on the first request.
2856   // Transport reports connection failure.
2857   TriggerConnectionFailure(*xds_client_->bootstrap().servers().front(),
2858                            absl::UnavailableError("connection failed"));
2859   // XdsClient should report an error to the watcher.
2860   auto error = watcher->WaitForNextError();
2861   ASSERT_TRUE(error.has_value());
2862   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2863   EXPECT_EQ(error->message(),
2864             "xDS channel for server default_xds_server: "
2865             "connection failed (node ID:xds_client_test)")
2866       << *error;
2867   // Connection reported as unhappy.
2868   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2869                                           kDefaultXdsServerUrl, false)));
2870   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2871       ::testing::_, ::testing::_,
2872       ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
2873   // We should not see a resource-does-not-exist event, because the
2874   // timer should not be running while the channel is disconnected.
2875   EXPECT_TRUE(watcher->ExpectNoEvent());
2876   // Start a new watch.  This watcher should be given the same error,
2877   // since we have not yet recovered.
2878   auto watcher2 = StartFooWatch("foo1");
2879   error = watcher2->WaitForNextError();
2880   ASSERT_TRUE(error.has_value());
2881   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2882   EXPECT_EQ(error->message(),
2883             "xDS channel for server default_xds_server: "
2884             "connection failed (node ID:xds_client_test)")
2885       << *error;
2886   // Second watcher should not see resource-does-not-exist either.
2887   EXPECT_FALSE(watcher2->HasEvent());
2888   // The ADS stream uses wait_for_ready inside the XdsTransport interface,
2889   // so when the channel reconnects, the already-started stream will proceed.
2890   stream->CompleteSendMessageFromClient();
2891   // Server sends a response.
2892   stream->SendMessageToClient(
2893       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2894           .set_version_info("1")
2895           .set_nonce("A")
2896           .AddFooResource(XdsFooResource("foo1", 6))
2897           .Serialize());
2898   // Connection now reported as happy.
2899   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
2900                                           kDefaultXdsServerUrl, true)));
2901   // XdsClient should have delivered the response to the watchers.
2902   auto resource = watcher->WaitForNextResource();
2903   ASSERT_NE(resource, nullptr);
2904   EXPECT_EQ(resource->name, "foo1");
2905   EXPECT_EQ(resource->value, 6);
2906   resource = watcher2->WaitForNextResource();
2907   ASSERT_NE(resource, nullptr);
2908   EXPECT_EQ(resource->name, "foo1");
2909   EXPECT_EQ(resource->value, 6);
2910   // XdsClient should have sent an ACK message to the xDS server.
2911   request = WaitForRequest(stream.get());
2912   ASSERT_TRUE(request.has_value());
2913   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2914                /*version_info=*/"1", /*response_nonce=*/"A",
2915                /*error_detail=*/absl::OkStatus(),
2916                /*resource_names=*/{"foo1"});
2917   stream->CompleteSendMessageFromClient();
2918   // Cancel watches.
2919   CancelFooWatch(watcher.get(), "foo1");
2920   CancelFooWatch(watcher2.get(), "foo1");
2921   EXPECT_TRUE(stream->IsOrphaned());
2922 }
2923 
TEST_F(XdsClientTest,ConnectionFailsWithCachedResource)2924 TEST_F(XdsClientTest, ConnectionFailsWithCachedResource) {
2925   InitXdsClient();
2926   // Start a watch for "foo1".
2927   auto watcher = StartFooWatch("foo1");
2928   // XdsClient should have created an ADS stream.
2929   auto stream = WaitForAdsStream();
2930   ASSERT_TRUE(stream != nullptr);
2931   // XdsClient should have sent a subscription request on the ADS stream.
2932   auto request = WaitForRequest(stream.get());
2933   ASSERT_TRUE(request.has_value());
2934   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2935                /*version_info=*/"", /*response_nonce=*/"",
2936                /*error_detail=*/absl::OkStatus(),
2937                /*resource_names=*/{"foo1"});
2938   CheckRequestNode(*request);  // Should be present on the first request.
2939   // Server sends a response.
2940   stream->SendMessageToClient(
2941       ResponseBuilder(XdsFooResourceType::Get()->type_url())
2942           .set_version_info("1")
2943           .set_nonce("A")
2944           .AddFooResource(XdsFooResource("foo1", 6))
2945           .Serialize());
2946   // XdsClient should have delivered the resource to the watcher.
2947   auto resource = watcher->WaitForNextResource();
2948   ASSERT_NE(resource, nullptr);
2949   EXPECT_EQ(resource->name, "foo1");
2950   EXPECT_EQ(resource->value, 6);
2951   // XdsClient should have sent an ACK message to the xDS server.
2952   request = WaitForRequest(stream.get());
2953   ASSERT_TRUE(request.has_value());
2954   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2955                /*version_info=*/"1", /*response_nonce=*/"A",
2956                /*error_detail=*/absl::OkStatus(),
2957                /*resource_names=*/{"foo1"});
2958   // Transport reports connection failure.
2959   TriggerConnectionFailure(*xds_client_->bootstrap().servers().front(),
2960                            absl::UnavailableError("connection failed"));
2961   // XdsClient should report an ambient error to the watcher.
2962   auto error = watcher->WaitForNextAmbientError();
2963   ASSERT_TRUE(error.has_value());
2964   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2965   EXPECT_EQ(error->message(),
2966             "xDS channel for server default_xds_server: "
2967             "connection failed (node ID:xds_client_test)")
2968       << *error;
2969   // The transport failing should also cause the stream to terminate.
2970   stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
2971   // The XdsClient will create a new stream.
2972   stream = WaitForAdsStream();
2973   ASSERT_TRUE(stream != nullptr);
2974   // XdsClient should have sent a subscription request on the new stream
2975   // that includes the last seen version.
2976   request = WaitForRequest(stream.get());
2977   ASSERT_TRUE(request.has_value());
2978   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2979                /*version_info=*/"1", /*response_nonce=*/"",
2980                /*error_detail=*/absl::OkStatus(),
2981                /*resource_names=*/{"foo1"});
2982   CheckRequestNode(*request);  // Should be present on the first request.
2983   // Start a new watch.  This watcher should be given the cached resource
2984   // followed by the ambient error.
2985   auto watcher2 = StartFooWatch("foo1");
2986   resource = watcher2->WaitForNextResource();
2987   ASSERT_NE(resource, nullptr);
2988   EXPECT_EQ(resource->name, "foo1");
2989   EXPECT_EQ(resource->value, 6);
2990   error = watcher2->WaitForNextAmbientError();
2991   ASSERT_TRUE(error.has_value());
2992   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2993   EXPECT_EQ(error->message(),
2994             "xDS channel for server default_xds_server: "
2995             "connection failed (node ID:xds_client_test)")
2996       << *error;
2997   // The ADS stream uses wait_for_ready inside the XdsTransport interface,
2998   // so when the channel reconnects, the already-started stream will proceed.
2999   // Server sends a response with the same resource contents as before.
3000   stream->SendMessageToClient(
3001       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3002           .set_version_info("1")
3003           .set_nonce("A")
3004           .AddFooResource(XdsFooResource("foo1", 6))
3005           .Serialize());
3006   // The resource hasn't changed, so XdsClient will not call the
3007   // watchers' OnResourceChanged() methods.  However, it will call
3008   // OnAmbientError() with an OK status to let them know that the
3009   // ambient error is gone.
3010   error = watcher->WaitForNextAmbientError();
3011   ASSERT_TRUE(error.has_value());
3012   EXPECT_EQ(*error, absl::OkStatus());
3013   error = watcher2->WaitForNextAmbientError();
3014   ASSERT_TRUE(error.has_value());
3015   EXPECT_EQ(*error, absl::OkStatus());
3016   // XdsClient should have sent an ACK message to the xDS server.
3017   request = WaitForRequest(stream.get());
3018   ASSERT_TRUE(request.has_value());
3019   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3020                /*version_info=*/"1", /*response_nonce=*/"A",
3021                /*error_detail=*/absl::OkStatus(),
3022                /*resource_names=*/{"foo1"});
3023   // Cancel watches.
3024   CancelFooWatch(watcher.get(), "foo1");
3025   CancelFooWatch(watcher2.get(), "foo1");
3026   EXPECT_TRUE(stream->IsOrphaned());
3027 }
3028 
TEST_F(XdsClientTest,ResourceDoesNotExistUponTimeout)3029 TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
3030   InitXdsClient();
3031   // Start a watch for "foo1".
3032   auto watcher = StartFooWatch("foo1");
3033   // Watcher should initially not see any resource reported.
3034   EXPECT_FALSE(watcher->HasEvent());
3035   // Check metric data.
3036   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3037               ::testing::ElementsAre());
3038   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3039               ::testing::ElementsAre());
3040   EXPECT_THAT(GetResourceCounts(),
3041               ::testing::ElementsAre(::testing::Pair(
3042                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3043                                         XdsFooResourceType::Get()->type_url(),
3044                                         "requested"),
3045                   1)));
3046   // CSDS should show that the resource has been requested.
3047   ClientConfig csds = DumpCsds();
3048   EXPECT_THAT(csds.generic_xds_configs(),
3049               ::testing::ElementsAre(CsdsResourceRequested(
3050                   XdsFooResourceType::Get()->type_url(), "foo1")));
3051   // XdsClient should have created an ADS stream.
3052   auto stream = WaitForAdsStream();
3053   ASSERT_TRUE(stream != nullptr);
3054   // XdsClient should have sent a subscription request on the ADS stream.
3055   auto request = WaitForRequest(stream.get());
3056   ASSERT_TRUE(request.has_value());
3057   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3058                /*version_info=*/"", /*response_nonce=*/"",
3059                /*error_detail=*/absl::OkStatus(),
3060                /*resource_names=*/{"foo1"});
3061   CheckRequestNode(*request);  // Should be present on the first request.
3062   // Do not send a response, but wait for the resource to be reported as
3063   // not existing.
3064   EXPECT_TRUE(watcher->WaitForDoesNotExist());
3065   // Check metric data.
3066   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3067       ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
3068   EXPECT_THAT(GetResourceCounts(),
3069               ::testing::ElementsAre(::testing::Pair(
3070                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3071                                         XdsFooResourceType::Get()->type_url(),
3072                                         "does_not_exist"),
3073                   1)));
3074   // CSDS should show that the resource has been requested.
3075   csds = DumpCsds();
3076   EXPECT_THAT(csds.generic_xds_configs(),
3077               ::testing::ElementsAre(CsdsResourceDoesNotExist(
3078                   XdsFooResourceType::Get()->type_url(), "foo1")));
3079   // Start a new watcher for the same resource.  It should immediately
3080   // receive the same does-not-exist notification.
3081   auto watcher2 = StartFooWatch("foo1");
3082   EXPECT_TRUE(watcher2->WaitForDoesNotExist());
3083   // Now server sends a response.
3084   stream->SendMessageToClient(
3085       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3086           .set_version_info("1")
3087           .set_nonce("A")
3088           .AddFooResource(XdsFooResource("foo1", 6))
3089           .Serialize());
3090   // XdsClient should have delivered the response to the watchers.
3091   auto resource = watcher->WaitForNextResource();
3092   ASSERT_NE(resource, nullptr);
3093   EXPECT_EQ(resource->name, "foo1");
3094   EXPECT_EQ(resource->value, 6);
3095   resource = watcher2->WaitForNextResource();
3096   ASSERT_NE(resource, nullptr);
3097   EXPECT_EQ(resource->name, "foo1");
3098   EXPECT_EQ(resource->value, 6);
3099   // Check metric data.
3100   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3101       ::testing::ElementsAre(::testing::Pair(
3102           ::testing::Pair(kDefaultXdsServerUrl,
3103                           XdsFooResourceType::Get()->type_url()),
3104           1)),
3105       ::testing::ElementsAre(), ::testing::_));
3106   EXPECT_THAT(
3107       GetResourceCounts(),
3108       ::testing::ElementsAre(::testing::Pair(
3109           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3110                                 XdsFooResourceType::Get()->type_url(), "acked"),
3111           1)));
3112   // Check CSDS data.
3113   csds = DumpCsds();
3114   EXPECT_THAT(csds.generic_xds_configs(),
3115               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3116                   XdsFooResourceType::Get()->type_url(), "foo1",
3117                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3118   // XdsClient should have sent an ACK message to the xDS server.
3119   request = WaitForRequest(stream.get());
3120   ASSERT_TRUE(request.has_value());
3121   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3122                /*version_info=*/"1", /*response_nonce=*/"A",
3123                /*error_detail=*/absl::OkStatus(),
3124                /*resource_names=*/{"foo1"});
3125   // Cancel watch.
3126   CancelFooWatch(watcher.get(), "foo1");
3127   CancelFooWatch(watcher2.get(), "foo1");
3128   EXPECT_TRUE(stream->IsOrphaned());
3129 }
3130 
TEST_F(XdsClientTest,ResourceDoesNotExistAfterStreamRestart)3131 TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
3132   InitXdsClient();
3133   // Metrics should initially be empty.
3134   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3135               ::testing::ElementsAre());
3136   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3137               ::testing::ElementsAre());
3138   EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
3139   // Start a watch for "foo1".
3140   auto watcher = StartFooWatch("foo1");
3141   // Check metric data.
3142   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3143               ::testing::ElementsAre());
3144   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3145               ::testing::ElementsAre());
3146   EXPECT_THAT(GetResourceCounts(),
3147               ::testing::ElementsAre(::testing::Pair(
3148                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3149                                         XdsFooResourceType::Get()->type_url(),
3150                                         "requested"),
3151                   1)));
3152   // CSDS should show that the resource has been requested.
3153   ClientConfig csds = DumpCsds();
3154   EXPECT_THAT(csds.generic_xds_configs(),
3155               ::testing::ElementsAre(CsdsResourceRequested(
3156                   XdsFooResourceType::Get()->type_url(), "foo1")));
3157   // Watcher should initially not see any resource reported.
3158   EXPECT_FALSE(watcher->HasEvent());
3159   // XdsClient should have created an ADS stream.
3160   auto stream = WaitForAdsStream();
3161   ASSERT_TRUE(stream != nullptr);
3162   // XdsClient should have sent a subscription request on the ADS stream.
3163   auto request = WaitForRequest(stream.get());
3164   ASSERT_TRUE(request.has_value());
3165   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3166                /*version_info=*/"", /*response_nonce=*/"",
3167                /*error_detail=*/absl::OkStatus(),
3168                /*resource_names=*/{"foo1"});
3169   CheckRequestNode(*request);  // Should be present on the first request.
3170   // Stream fails.
3171   stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
3172   // XdsClient should report error to watcher.
3173   auto error = watcher->WaitForNextError();
3174   ASSERT_TRUE(error.has_value());
3175   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
3176   EXPECT_EQ(error->message(),
3177             "xDS channel for server default_xds_server: xDS call failed "
3178             "with no responses received; status: UNAVAILABLE: ugh "
3179             "(node ID:xds_client_test)")
3180       << *error;
3181   // Check metric data.
3182   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3183       ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
3184   EXPECT_THAT(GetResourceCounts(),
3185               ::testing::ElementsAre(::testing::Pair(
3186                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3187                                         XdsFooResourceType::Get()->type_url(),
3188                                         "requested"),
3189                   1)));
3190   // CSDS should show that the resource has been requested.
3191   csds = DumpCsds();
3192   EXPECT_THAT(csds.generic_xds_configs(),
3193               ::testing::ElementsAre(CsdsResourceRequested(
3194                   XdsFooResourceType::Get()->type_url(), "foo1")));
3195   // XdsClient should create a new stream.
3196   stream = WaitForAdsStream();
3197   ASSERT_TRUE(stream != nullptr);
3198   // XdsClient sends a subscription request.
3199   request = WaitForRequest(stream.get());
3200   ASSERT_TRUE(request.has_value());
3201   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3202                /*version_info=*/"", /*response_nonce=*/"",
3203                /*error_detail=*/absl::OkStatus(),
3204                /*resource_names=*/{"foo1"});
3205   CheckRequestNode(*request);  // Should be present on the first request.
3206   // Server does NOT send a response immediately.
3207   // Client should receive a resource does-not-exist.
3208   ASSERT_TRUE(watcher->WaitForDoesNotExist());
3209   // Check metric data.
3210   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3211       ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
3212   EXPECT_THAT(GetResourceCounts(),
3213               ::testing::ElementsAre(::testing::Pair(
3214                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3215                                         XdsFooResourceType::Get()->type_url(),
3216                                         "does_not_exist"),
3217                   1)));
3218   // CSDS should show that the resource has been requested.
3219   csds = DumpCsds();
3220   EXPECT_THAT(csds.generic_xds_configs(),
3221               ::testing::ElementsAre(CsdsResourceDoesNotExist(
3222                   XdsFooResourceType::Get()->type_url(), "foo1")));
3223   // Server now sends the requested resource.
3224   stream->SendMessageToClient(
3225       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3226           .set_version_info("1")
3227           .set_nonce("A")
3228           .AddFooResource(XdsFooResource("foo1", 6))
3229           .Serialize());
3230   // The resource is delivered to the watcher.
3231   auto resource = watcher->WaitForNextResource();
3232   ASSERT_NE(resource, nullptr);
3233   EXPECT_EQ(resource->name, "foo1");
3234   EXPECT_EQ(resource->value, 6);
3235   // Check metric data.
3236   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3237       ::testing::ElementsAre(::testing::Pair(
3238           ::testing::Pair(kDefaultXdsServerUrl,
3239                           XdsFooResourceType::Get()->type_url()),
3240           1)),
3241       ::testing::ElementsAre(), ::testing::_));
3242   EXPECT_THAT(
3243       GetResourceCounts(),
3244       ::testing::ElementsAre(::testing::Pair(
3245           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3246                                 XdsFooResourceType::Get()->type_url(), "acked"),
3247           1)));
3248   // Check CSDS data.
3249   csds = DumpCsds();
3250   EXPECT_THAT(csds.generic_xds_configs(),
3251               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3252                   XdsFooResourceType::Get()->type_url(), "foo1",
3253                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3254   // XdsClient sends an ACK.
3255   request = WaitForRequest(stream.get());
3256   ASSERT_TRUE(request.has_value());
3257   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3258                /*version_info=*/"1", /*response_nonce=*/"A",
3259                /*error_detail=*/absl::OkStatus(),
3260                /*resource_names=*/{"foo1"});
3261   // Cancel watcher.
3262   CancelFooWatch(watcher.get(), "foo1");
3263   EXPECT_TRUE(stream->IsOrphaned());
3264 }
3265 
TEST_F(XdsClientTest,DoesNotExistTimerNotStartedUntilSendCompletes)3266 TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
3267   InitXdsClient();
3268   // Tell transport to let us manually trigger completion of the
3269   // send_message ops to XdsClient.
3270   transport_factory_->SetAutoCompleteMessagesFromClient(false);
3271   // Start a watch for "foo1".
3272   auto watcher = StartFooWatch("foo1");
3273   // Watcher should initially not see any resource reported.
3274   EXPECT_FALSE(watcher->HasEvent());
3275   // XdsClient should have created an ADS stream.
3276   auto stream = WaitForAdsStream();
3277   ASSERT_TRUE(stream != nullptr);
3278   // XdsClient should have sent a subscription request on the ADS stream.
3279   auto request = WaitForRequest(stream.get());
3280   ASSERT_TRUE(request.has_value());
3281   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3282                /*version_info=*/"", /*response_nonce=*/"",
3283                /*error_detail=*/absl::OkStatus(),
3284                /*resource_names=*/{"foo1"});
3285   CheckRequestNode(*request);  // Should be present on the first request.
3286   // Server does NOT send a response.
3287   // We should not see a resource-does-not-exist event, because the
3288   // timer should not be running while the channel is disconnected.
3289   EXPECT_TRUE(watcher->ExpectNoEvent());
3290   // Check metric data.
3291   EXPECT_THAT(GetResourceCounts(),
3292               ::testing::ElementsAre(::testing::Pair(
3293                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3294                                         XdsFooResourceType::Get()->type_url(),
3295                                         "requested"),
3296                   1)));
3297   // CSDS should show that the resource has been requested.
3298   ClientConfig csds = DumpCsds();
3299   EXPECT_THAT(csds.generic_xds_configs(),
3300               ::testing::ElementsAre(CsdsResourceRequested(
3301                   XdsFooResourceType::Get()->type_url(), "foo1")));
3302   // The ADS stream uses wait_for_ready inside the XdsTransport interface,
3303   // so when the channel connects, the already-started stream will proceed.
3304   stream->CompleteSendMessageFromClient();
3305   // Server does NOT send a response.
3306   // Watcher should see a does-not-exist event.
3307   EXPECT_TRUE(watcher->WaitForDoesNotExist());
3308   // Check metric data.
3309   EXPECT_THAT(GetResourceCounts(),
3310               ::testing::ElementsAre(::testing::Pair(
3311                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3312                                         XdsFooResourceType::Get()->type_url(),
3313                                         "does_not_exist"),
3314                   1)));
3315   // CSDS should show that the resource has been requested.
3316   csds = DumpCsds();
3317   EXPECT_THAT(csds.generic_xds_configs(),
3318               ::testing::ElementsAre(CsdsResourceDoesNotExist(
3319                   XdsFooResourceType::Get()->type_url(), "foo1")));
3320   // Now server sends a response.
3321   stream->SendMessageToClient(
3322       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3323           .set_version_info("1")
3324           .set_nonce("A")
3325           .AddFooResource(XdsFooResource("foo1", 6))
3326           .Serialize());
3327   // XdsClient should have delivered the response to the watcher.
3328   auto resource = watcher->WaitForNextResource();
3329   ASSERT_NE(resource, nullptr);
3330   EXPECT_EQ(resource->name, "foo1");
3331   EXPECT_EQ(resource->value, 6);
3332   // Check metric data.
3333   EXPECT_THAT(
3334       GetResourceCounts(),
3335       ::testing::ElementsAre(::testing::Pair(
3336           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3337                                 XdsFooResourceType::Get()->type_url(), "acked"),
3338           1)));
3339   // Check CSDS data.
3340   csds = DumpCsds();
3341   EXPECT_THAT(csds.generic_xds_configs(),
3342               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3343                   XdsFooResourceType::Get()->type_url(), "foo1",
3344                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3345   // XdsClient should have sent an ACK message to the xDS server.
3346   request = WaitForRequest(stream.get());
3347   ASSERT_TRUE(request.has_value());
3348   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3349                /*version_info=*/"1", /*response_nonce=*/"A",
3350                /*error_detail=*/absl::OkStatus(),
3351                /*resource_names=*/{"foo1"});
3352   stream->CompleteSendMessageFromClient();
3353   // Cancel watch.
3354   CancelFooWatch(watcher.get(), "foo1");
3355   EXPECT_TRUE(stream->IsOrphaned());
3356 }
3357 
3358 // In https://github.com/grpc/grpc/issues/29583, we ran into a case
3359 // where we wound up starting a timer after we had already received the
3360 // resource, thus incorrectly reporting the resource as not existing.
3361 // This happened when unsubscribing and then resubscribing to the same
3362 // resource a send_message op was already in flight and then receiving an
3363 // update containing that resource.
TEST_F(XdsClientTest,ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending)3364 TEST_F(XdsClientTest,
3365        ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) {
3366   InitXdsClient();
3367   // Tell transport to let us manually trigger completion of the
3368   // send_message ops to XdsClient.
3369   transport_factory_->SetAutoCompleteMessagesFromClient(false);
3370   // Start a watch for "foo1".
3371   auto watcher = StartFooWatch("foo1");
3372   // Watcher should initially not see any resource reported.
3373   EXPECT_FALSE(watcher->HasEvent());
3374   // XdsClient should have created an ADS stream.
3375   auto stream = WaitForAdsStream();
3376   ASSERT_TRUE(stream != nullptr);
3377   // XdsClient should have sent a subscription request on the ADS stream.
3378   auto request = WaitForRequest(stream.get());
3379   ASSERT_TRUE(request.has_value());
3380   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3381                /*version_info=*/"", /*response_nonce=*/"",
3382                /*error_detail=*/absl::OkStatus(),
3383                /*resource_names=*/{"foo1"});
3384   CheckRequestNode(*request);  // Should be present on the first request.
3385   stream->CompleteSendMessageFromClient();
3386   // Server sends a response.
3387   stream->SendMessageToClient(
3388       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3389           .set_version_info("1")
3390           .set_nonce("A")
3391           .AddFooResource(XdsFooResource("foo1", 6))
3392           .Serialize());
3393   // XdsClient should have delivered the response to the watchers.
3394   auto resource = watcher->WaitForNextResource();
3395   ASSERT_NE(resource, nullptr);
3396   EXPECT_EQ(resource->name, "foo1");
3397   EXPECT_EQ(resource->value, 6);
3398   // Check metric data.
3399   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3400       ::testing::ElementsAre(::testing::Pair(
3401           ::testing::Pair(kDefaultXdsServerUrl,
3402                           XdsFooResourceType::Get()->type_url()),
3403           1)),
3404       ::testing::ElementsAre(), ::testing::_));
3405   EXPECT_THAT(
3406       GetResourceCounts(),
3407       ::testing::ElementsAre(::testing::Pair(
3408           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3409                                 XdsFooResourceType::Get()->type_url(), "acked"),
3410           1)));
3411   // Check CSDS data.
3412   ClientConfig csds = DumpCsds();
3413   EXPECT_THAT(csds.generic_xds_configs(),
3414               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3415                   XdsFooResourceType::Get()->type_url(), "foo1",
3416                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3417   // XdsClient should have sent an ACK message to the xDS server.
3418   request = WaitForRequest(stream.get());
3419   ASSERT_TRUE(request.has_value());
3420   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3421                /*version_info=*/"1", /*response_nonce=*/"A",
3422                /*error_detail=*/absl::OkStatus(),
3423                /*resource_names=*/{"foo1"});
3424   stream->CompleteSendMessageFromClient();
3425   // Start a watch for a second resource.
3426   auto watcher2 = StartFooWatch("foo2");
3427   // Watcher should initially not see any resource reported.
3428   EXPECT_FALSE(watcher2->HasEvent());
3429   // Check metric data.
3430   EXPECT_THAT(
3431       GetResourceCounts(),
3432       ::testing::ElementsAre(
3433           ::testing::Pair(ResourceCountLabelsEq(
3434                               XdsClient::kOldStyleAuthority,
3435                               XdsFooResourceType::Get()->type_url(), "acked"),
3436                           1),
3437           ::testing::Pair(
3438               ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3439                                     XdsFooResourceType::Get()->type_url(),
3440                                     "requested"),
3441               1)));
3442   // Check CSDS data.
3443   csds = DumpCsds();
3444   EXPECT_THAT(csds.generic_xds_configs(),
3445               ::testing::UnorderedElementsAre(
3446                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
3447                                     "foo1", resource->AsJsonString(), "1",
3448                                     TimestampProtoEq(kTime0)),
3449                   CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
3450                                         "foo2")));
3451   // XdsClient sends a request to subscribe to the new resource.
3452   request = WaitForRequest(stream.get());
3453   ASSERT_TRUE(request.has_value());
3454   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3455                /*version_info=*/"1", /*response_nonce=*/"A",
3456                /*error_detail=*/absl::OkStatus(),
3457                /*resource_names=*/{"foo1", "foo2"});
3458   // NOTE: We do NOT yet tell the XdsClient that the send_message op is
3459   // complete.
3460   // Unsubscribe from foo1 and then re-subscribe to it.
3461   CancelFooWatch(watcher.get(), "foo1");
3462   EXPECT_THAT(GetResourceCounts(),
3463               ::testing::ElementsAre(::testing::Pair(
3464                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3465                                         XdsFooResourceType::Get()->type_url(),
3466                                         "requested"),
3467                   1)));
3468   csds = DumpCsds();
3469   EXPECT_THAT(csds.generic_xds_configs(),
3470               ::testing::ElementsAre(CsdsResourceRequested(
3471                   XdsFooResourceType::Get()->type_url(), "foo2")));
3472   watcher = StartFooWatch("foo1");
3473   EXPECT_THAT(GetResourceCounts(),
3474               ::testing::ElementsAre(::testing::Pair(
3475                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3476                                         XdsFooResourceType::Get()->type_url(),
3477                                         "requested"),
3478                   2)));
3479   csds = DumpCsds();
3480   EXPECT_THAT(
3481       csds.generic_xds_configs(),
3482       ::testing::ElementsAre(
3483           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo1"),
3484           CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
3485                                 "foo2")));
3486   // Now send a response from the server containing both foo1 and foo2.
3487   // We increment time to make sure that the CSDS data gets a new timestamp.
3488   time_cache_.TestOnlySetNow(kTime1);
3489   stream->SendMessageToClient(
3490       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3491           .set_version_info("1")
3492           .set_nonce("B")
3493           .AddFooResource(XdsFooResource("foo1", 6))
3494           .AddFooResource(XdsFooResource("foo2", 7))
3495           .Serialize());
3496   // The watcher for foo1 will receive an update even if the resource
3497   // has not changed, since the previous value was removed from the
3498   // cache when we unsubscribed.
3499   resource = watcher->WaitForNextResource();
3500   ASSERT_NE(resource, nullptr);
3501   EXPECT_EQ(resource->name, "foo1");
3502   EXPECT_EQ(resource->value, 6);
3503   // For foo2, the watcher should receive notification for the new resource.
3504   auto resource2 = watcher2->WaitForNextResource();
3505   ASSERT_NE(resource2, nullptr);
3506   EXPECT_EQ(resource2->name, "foo2");
3507   EXPECT_EQ(resource2->value, 7);
3508   // Check metric data.
3509   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3510       ::testing::ElementsAre(::testing::Pair(
3511           ::testing::Pair(kDefaultXdsServerUrl,
3512                           XdsFooResourceType::Get()->type_url()),
3513           3)),
3514       ::testing::ElementsAre(), ::testing::_));
3515   EXPECT_THAT(
3516       GetResourceCounts(),
3517       ::testing::ElementsAre(::testing::Pair(
3518           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3519                                 XdsFooResourceType::Get()->type_url(), "acked"),
3520           2)));
3521   // Check CSDS data.
3522   csds = DumpCsds();
3523   EXPECT_THAT(csds.generic_xds_configs(),
3524               ::testing::UnorderedElementsAre(
3525                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
3526                                     "foo1", resource->AsJsonString(), "1",
3527                                     TimestampProtoEq(kTime1)),
3528                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
3529                                     "foo2", resource2->AsJsonString(), "1",
3530                                     TimestampProtoEq(kTime1))));
3531   // Now we finally tell XdsClient that its previous send_message op is
3532   // complete.
3533   stream->CompleteSendMessageFromClient();
3534   // XdsClient should send an ACK with the updated subscription list
3535   // (which happens to be identical to the old list), and it should not
3536   // restart the does-not-exist timer.
3537   request = WaitForRequest(stream.get());
3538   ASSERT_TRUE(request.has_value());
3539   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3540                /*version_info=*/"1", /*response_nonce=*/"B",
3541                /*error_detail=*/absl::OkStatus(),
3542                /*resource_names=*/{"foo1", "foo2"});
3543   stream->CompleteSendMessageFromClient();
3544   // Make sure the watcher for foo1 does not see a does-not-exist event.
3545   EXPECT_TRUE(watcher->ExpectNoEvent());
3546   // Cancel watches.
3547   CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
3548   CancelFooWatch(watcher2.get(), "foo2");
3549   EXPECT_TRUE(stream->IsOrphaned());
3550 }
3551 
TEST_F(XdsClientTest,DoNotSendDoesNotExistForCachedResource)3552 TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
3553   InitXdsClient();
3554   // Start a watch for "foo1".
3555   auto watcher = StartFooWatch("foo1");
3556   // Watcher should initially not see any resource reported.
3557   EXPECT_FALSE(watcher->HasEvent());
3558   // XdsClient should have created an ADS stream.
3559   auto stream = WaitForAdsStream();
3560   ASSERT_TRUE(stream != nullptr);
3561   // XdsClient should have sent a subscription request on the ADS stream.
3562   auto request = WaitForRequest(stream.get());
3563   ASSERT_TRUE(request.has_value());
3564   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3565                /*version_info=*/"", /*response_nonce=*/"",
3566                /*error_detail=*/absl::OkStatus(),
3567                /*resource_names=*/{"foo1"});
3568   CheckRequestNode(*request);  // Should be present on the first request.
3569   // Server sends a response.
3570   stream->SendMessageToClient(
3571       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3572           .set_version_info("1")
3573           .set_nonce("A")
3574           .AddFooResource(XdsFooResource("foo1", 6))
3575           .Serialize());
3576   // XdsClient should have delivered the response to the watcher.
3577   auto resource = watcher->WaitForNextResource();
3578   ASSERT_NE(resource, nullptr);
3579   EXPECT_EQ(resource->name, "foo1");
3580   EXPECT_EQ(resource->value, 6);
3581   // Check metric data.
3582   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3583       ::testing::ElementsAre(::testing::Pair(
3584           ::testing::Pair(kDefaultXdsServerUrl,
3585                           XdsFooResourceType::Get()->type_url()),
3586           1)),
3587       ::testing::ElementsAre(), ::testing::_));
3588   EXPECT_THAT(
3589       GetResourceCounts(),
3590       ::testing::ElementsAre(::testing::Pair(
3591           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3592                                 XdsFooResourceType::Get()->type_url(), "acked"),
3593           1)));
3594   // Check CSDS data.
3595   ClientConfig csds = DumpCsds();
3596   EXPECT_THAT(csds.generic_xds_configs(),
3597               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3598                   XdsFooResourceType::Get()->type_url(), "foo1",
3599                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3600   // XdsClient should have sent an ACK message to the xDS server.
3601   request = WaitForRequest(stream.get());
3602   ASSERT_TRUE(request.has_value());
3603   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3604                /*version_info=*/"1", /*response_nonce=*/"A",
3605                /*error_detail=*/absl::OkStatus(),
3606                /*resource_names=*/{"foo1"});
3607   // Stream fails because of transport disconnection.
3608   stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed"));
3609   // XdsClient should NOT report error to watcher, because we saw a
3610   // response on the stream before it failed.
3611   // XdsClient creates a new stream.
3612   stream = WaitForAdsStream();
3613   ASSERT_TRUE(stream != nullptr);
3614   // XdsClient should have sent a subscription request on the ADS stream.
3615   request = WaitForRequest(stream.get());
3616   ASSERT_TRUE(request.has_value());
3617   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3618                /*version_info=*/"1", /*response_nonce=*/"",
3619                /*error_detail=*/absl::OkStatus(),
3620                /*resource_names=*/{"foo1"});
3621   CheckRequestNode(*request);  // Should be present on the first request.
3622   // Server does NOT send a response.
3623   // We should not see a resource-does-not-exist event, because the
3624   // resource was already cached, so the server can optimize by not
3625   // resending it.
3626   EXPECT_TRUE(watcher->ExpectNoEvent());
3627   // Check metric data.
3628   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3629       ::testing::ElementsAre(::testing::Pair(
3630           ::testing::Pair(kDefaultXdsServerUrl,
3631                           XdsFooResourceType::Get()->type_url()),
3632           1)),
3633       ::testing::ElementsAre(), ::testing::_));
3634   EXPECT_THAT(
3635       GetResourceCounts(),
3636       ::testing::ElementsAre(::testing::Pair(
3637           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3638                                 XdsFooResourceType::Get()->type_url(), "acked"),
3639           1)));
3640   // Check CSDS data.
3641   csds = DumpCsds();
3642   EXPECT_THAT(csds.generic_xds_configs(),
3643               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3644                   XdsFooResourceType::Get()->type_url(), "foo1",
3645                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3646   // Now server sends a response.
3647   // We increment time to make sure that the CSDS data gets a new timestamp.
3648   time_cache_.TestOnlySetNow(kTime1);
3649   stream->SendMessageToClient(
3650       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3651           .set_version_info("1")
3652           .set_nonce("A")
3653           .AddFooResource(XdsFooResource("foo1", 6))
3654           .Serialize());
3655   // Watcher will not see any update, since the resource is unchanged.
3656   EXPECT_TRUE(watcher->ExpectNoEvent());
3657   // Check metric data.
3658   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3659       ::testing::ElementsAre(::testing::Pair(
3660           ::testing::Pair(kDefaultXdsServerUrl,
3661                           XdsFooResourceType::Get()->type_url()),
3662           2)),
3663       ::testing::ElementsAre(), ::testing::_));
3664   EXPECT_THAT(
3665       GetResourceCounts(),
3666       ::testing::ElementsAre(::testing::Pair(
3667           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3668                                 XdsFooResourceType::Get()->type_url(), "acked"),
3669           1)));
3670   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3671                                           kDefaultXdsServerUrl, true)));
3672   // Check CSDS data.
3673   csds = DumpCsds();
3674   EXPECT_THAT(csds.generic_xds_configs(),
3675               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3676                   XdsFooResourceType::Get()->type_url(), "foo1",
3677                   resource->AsJsonString(), "1", TimestampProtoEq(kTime1))));
3678   // XdsClient should have sent an ACK message to the xDS server.
3679   request = WaitForRequest(stream.get());
3680   ASSERT_TRUE(request.has_value());
3681   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3682                /*version_info=*/"1", /*response_nonce=*/"A",
3683                /*error_detail=*/absl::OkStatus(),
3684                /*resource_names=*/{"foo1"});
3685   // Cancel watch.
3686   CancelFooWatch(watcher.get(), "foo1");
3687   EXPECT_TRUE(stream->IsOrphaned());
3688 }
3689 
TEST_F(XdsClientTest,ResourceWrappedInResourceMessage)3690 TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) {
3691   InitXdsClient();
3692   // Start a watch for "foo1".
3693   auto watcher = StartFooWatch("foo1");
3694   // Watcher should initially not see any resource reported.
3695   EXPECT_FALSE(watcher->HasEvent());
3696   // XdsClient should have created an ADS stream.
3697   auto stream = WaitForAdsStream();
3698   ASSERT_TRUE(stream != nullptr);
3699   // XdsClient should have sent a subscription request on the ADS stream.
3700   auto request = WaitForRequest(stream.get());
3701   ASSERT_TRUE(request.has_value());
3702   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3703                /*version_info=*/"", /*response_nonce=*/"",
3704                /*error_detail=*/absl::OkStatus(),
3705                /*resource_names=*/{"foo1"});
3706   CheckRequestNode(*request);  // Should be present on the first request.
3707   // Send a response with the resource wrapped in a Resource message.
3708   stream->SendMessageToClient(
3709       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3710           .set_version_info("1")
3711           .set_nonce("A")
3712           .AddFooResource(XdsFooResource("foo1", 6),
3713                           /*in_resource_wrapper=*/true)
3714           .Serialize());
3715   // XdsClient should have delivered the response to the watcher.
3716   auto resource = watcher->WaitForNextResource();
3717   ASSERT_NE(resource, nullptr);
3718   EXPECT_EQ(resource->name, "foo1");
3719   EXPECT_EQ(resource->value, 6);
3720   // Check metric data.
3721   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3722       ::testing::ElementsAre(::testing::Pair(
3723           ::testing::Pair(kDefaultXdsServerUrl,
3724                           XdsFooResourceType::Get()->type_url()),
3725           1)),
3726       ::testing::ElementsAre(), ::testing::_));
3727   EXPECT_THAT(
3728       GetResourceCounts(),
3729       ::testing::ElementsAre(::testing::Pair(
3730           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3731                                 XdsFooResourceType::Get()->type_url(), "acked"),
3732           1)));
3733   // Check CSDS data.
3734   ClientConfig csds = DumpCsds();
3735   EXPECT_THAT(csds.generic_xds_configs(),
3736               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3737                   XdsFooResourceType::Get()->type_url(), "foo1",
3738                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3739   // XdsClient should have sent an ACK message to the xDS server.
3740   request = WaitForRequest(stream.get());
3741   ASSERT_TRUE(request.has_value());
3742   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3743                /*version_info=*/"1", /*response_nonce=*/"A",
3744                /*error_detail=*/absl::OkStatus(),
3745                /*resource_names=*/{"foo1"});
3746   // Cancel watch.
3747   CancelFooWatch(watcher.get(), "foo1");
3748   EXPECT_TRUE(stream->IsOrphaned());
3749 }
3750 
TEST_F(XdsClientTest,MultipleResourceTypes)3751 TEST_F(XdsClientTest, MultipleResourceTypes) {
3752   InitXdsClient();
3753   // Start a watch for "foo1".
3754   auto watcher = StartFooWatch("foo1");
3755   // Watcher should initially not see any resource reported.
3756   EXPECT_FALSE(watcher->HasEvent());
3757   // XdsClient should have created an ADS stream.
3758   auto stream = WaitForAdsStream();
3759   ASSERT_TRUE(stream != nullptr);
3760   // XdsClient should have sent a subscription request on the ADS stream.
3761   auto request = WaitForRequest(stream.get());
3762   ASSERT_TRUE(request.has_value());
3763   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3764                /*version_info=*/"", /*response_nonce=*/"",
3765                /*error_detail=*/absl::OkStatus(),
3766                /*resource_names=*/{"foo1"});
3767   CheckRequestNode(*request);  // Should be present on the first request.
3768   // Send a response.
3769   stream->SendMessageToClient(
3770       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3771           .set_version_info("1")
3772           .set_nonce("A")
3773           .AddFooResource(XdsFooResource("foo1", 6))
3774           .Serialize());
3775   // XdsClient should have delivered the response to the watcher.
3776   auto resource = watcher->WaitForNextResource();
3777   ASSERT_NE(resource, nullptr);
3778   EXPECT_EQ(resource->name, "foo1");
3779   EXPECT_EQ(resource->value, 6);
3780   // Check metric data.
3781   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3782       ::testing::ElementsAre(::testing::Pair(
3783           ::testing::Pair(kDefaultXdsServerUrl,
3784                           XdsFooResourceType::Get()->type_url()),
3785           1)),
3786       ::testing::ElementsAre(), ::testing::_));
3787   EXPECT_THAT(
3788       GetResourceCounts(),
3789       ::testing::ElementsAre(::testing::Pair(
3790           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3791                                 XdsFooResourceType::Get()->type_url(), "acked"),
3792           1)));
3793   // Check CSDS data.
3794   ClientConfig csds = DumpCsds();
3795   EXPECT_THAT(csds.generic_xds_configs(),
3796               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3797                   XdsFooResourceType::Get()->type_url(), "foo1",
3798                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3799   // XdsClient should have sent an ACK message to the xDS server.
3800   request = WaitForRequest(stream.get());
3801   ASSERT_TRUE(request.has_value());
3802   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3803                /*version_info=*/"1", /*response_nonce=*/"A",
3804                /*error_detail=*/absl::OkStatus(),
3805                /*resource_names=*/{"foo1"});
3806   // Start a watch for "bar1".
3807   auto watcher2 = StartBarWatch("bar1");
3808   // XdsClient should have sent a subscription request on the ADS stream.
3809   // Note that version and nonce here do NOT use the values for Foo,
3810   // since each resource type has its own state.
3811   request = WaitForRequest(stream.get());
3812   ASSERT_TRUE(request.has_value());
3813   CheckRequest(*request, XdsBarResourceType::Get()->type_url(),
3814                /*version_info=*/"", /*response_nonce=*/"",
3815                /*error_detail=*/absl::OkStatus(),
3816                /*resource_names=*/{"bar1"});
3817   // Send a response.
3818   // We increment time to make sure that the CSDS data gets a new timestamp.
3819   time_cache_.TestOnlySetNow(kTime1);
3820   stream->SendMessageToClient(
3821       ResponseBuilder(XdsBarResourceType::Get()->type_url())
3822           .set_version_info("2")
3823           .set_nonce("B")
3824           .AddBarResource(XdsBarResource("bar1", "whee"))
3825           .Serialize());
3826   // XdsClient should have delivered the response to the watcher.
3827   auto resource2 = watcher2->WaitForNextResource();
3828   ASSERT_NE(resource, nullptr);
3829   EXPECT_EQ(resource2->name, "bar1");
3830   EXPECT_EQ(resource2->value, "whee");
3831   // Check metric data.
3832   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3833       ::testing::ElementsAre(
3834           ::testing::Pair(
3835               ::testing::Pair(kDefaultXdsServerUrl,
3836                               XdsBarResourceType::Get()->type_url()),
3837               1),
3838           ::testing::Pair(
3839               ::testing::Pair(kDefaultXdsServerUrl,
3840                               XdsFooResourceType::Get()->type_url()),
3841               1)),
3842       ::testing::ElementsAre(), ::testing::_));
3843   EXPECT_THAT(
3844       GetResourceCounts(),
3845       ::testing::UnorderedElementsAre(
3846           ::testing::Pair(ResourceCountLabelsEq(
3847                               XdsClient::kOldStyleAuthority,
3848                               XdsBarResourceType::Get()->type_url(), "acked"),
3849                           1),
3850           ::testing::Pair(ResourceCountLabelsEq(
3851                               XdsClient::kOldStyleAuthority,
3852                               XdsFooResourceType::Get()->type_url(), "acked"),
3853                           1)));
3854   // Check CSDS data.
3855   csds = DumpCsds();
3856   EXPECT_THAT(csds.generic_xds_configs(),
3857               ::testing::UnorderedElementsAre(
3858                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
3859                                     "foo1", resource->AsJsonString(), "1",
3860                                     TimestampProtoEq(kTime0)),
3861                   CsdsResourceAcked(XdsBarResourceType::Get()->type_url(),
3862                                     "bar1", resource2->AsJsonString(), "2",
3863                                     TimestampProtoEq(kTime1))));
3864   // XdsClient should have sent an ACK message to the xDS server.
3865   request = WaitForRequest(stream.get());
3866   ASSERT_TRUE(request.has_value());
3867   CheckRequest(*request, XdsBarResourceType::Get()->type_url(),
3868                /*version_info=*/"2", /*response_nonce=*/"B",
3869                /*error_detail=*/absl::OkStatus(),
3870                /*resource_names=*/{"bar1"});
3871   // Cancel watch for "foo1".
3872   CancelFooWatch(watcher.get(), "foo1");
3873   // XdsClient should send an unsubscription request.
3874   request = WaitForRequest(stream.get());
3875   ASSERT_TRUE(request.has_value());
3876   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3877                /*version_info=*/"1", /*response_nonce=*/"A",
3878                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
3879   // Server sends an empty response for the resource type.
3880   // (The server doesn't need to do this, but it may.)
3881   stream->SendMessageToClient(
3882       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3883           .set_version_info("1")
3884           .set_nonce("C")
3885           .Serialize());
3886   // Client should ACK.
3887   request = WaitForRequest(stream.get());
3888   ASSERT_TRUE(request.has_value());
3889   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3890                /*version_info=*/"1", /*response_nonce=*/"C",
3891                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
3892   // Now subscribe to foo2.
3893   watcher = StartFooWatch("foo2");
3894   // Client sends a subscription request, which retains the nonce and
3895   // version seen previously.
3896   request = WaitForRequest(stream.get());
3897   ASSERT_TRUE(request.has_value());
3898   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3899                /*version_info=*/"1", /*response_nonce=*/"C",
3900                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
3901   // Server sends foo2.
3902   stream->SendMessageToClient(
3903       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3904           .set_version_info("1")
3905           .set_nonce("D")
3906           .AddFooResource(XdsFooResource("foo2", 8))
3907           .Serialize());
3908   // Watcher receives the resource.
3909   resource = watcher->WaitForNextResource();
3910   ASSERT_NE(resource, nullptr);
3911   EXPECT_EQ(resource->name, "foo2");
3912   EXPECT_EQ(resource->value, 8);
3913   // Client ACKs.
3914   request = WaitForRequest(stream.get());
3915   ASSERT_TRUE(request.has_value());
3916   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3917                /*version_info=*/"1", /*response_nonce=*/"D",
3918                /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
3919   // Cancel watches.
3920   CancelFooWatch(watcher.get(), "foo2", /*delay_unsubscription=*/true);
3921   CancelBarWatch(watcher2.get(), "bar1");
3922   EXPECT_TRUE(stream->IsOrphaned());
3923 }
3924 
TEST_F(XdsClientTest,Federation)3925 TEST_F(XdsClientTest, Federation) {
3926   constexpr char kAuthority[] = "xds.example.com";
3927   const std::string kXdstpResourceName = absl::StrCat(
3928       "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
3929   FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server");
3930   FakeXdsBootstrap::FakeAuthority authority;
3931   authority.set_server(authority_server);
3932   InitXdsClient(
3933       FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority));
3934   // Metrics should initially be empty.
3935   EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3936               ::testing::ElementsAre());
3937   EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3938               ::testing::ElementsAre());
3939   EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
3940   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
3941   // Start a watch for "foo1".
3942   auto watcher = StartFooWatch("foo1");
3943   // Watcher should initially not see any resource reported.
3944   EXPECT_FALSE(watcher->HasEvent());
3945   // XdsClient should have created an ADS stream to the top-level xDS server.
3946   auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
3947   ASSERT_TRUE(stream != nullptr);
3948   // XdsClient should have sent a subscription request on the ADS stream.
3949   auto request = WaitForRequest(stream.get());
3950   ASSERT_TRUE(request.has_value());
3951   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3952                /*version_info=*/"", /*response_nonce=*/"",
3953                /*error_detail=*/absl::OkStatus(),
3954                /*resource_names=*/{"foo1"});
3955   CheckRequestNode(*request);  // Should be present on the first request.
3956   // Send a response.
3957   stream->SendMessageToClient(
3958       ResponseBuilder(XdsFooResourceType::Get()->type_url())
3959           .set_version_info("1")
3960           .set_nonce("A")
3961           .AddFooResource(XdsFooResource("foo1", 6))
3962           .Serialize());
3963   // XdsClient should have delivered the response to the watcher.
3964   auto resource = watcher->WaitForNextResource();
3965   ASSERT_NE(resource, nullptr);
3966   EXPECT_EQ(resource->name, "foo1");
3967   EXPECT_EQ(resource->value, 6);
3968   // Check metric data.
3969   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3970       ::testing::ElementsAre(::testing::Pair(
3971           ::testing::Pair(kDefaultXdsServerUrl,
3972                           XdsFooResourceType::Get()->type_url()),
3973           1)),
3974       ::testing::ElementsAre(), ::testing::_));
3975   EXPECT_THAT(
3976       GetResourceCounts(),
3977       ::testing::ElementsAre(::testing::Pair(
3978           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3979                                 XdsFooResourceType::Get()->type_url(), "acked"),
3980           1)));
3981   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3982                                           kDefaultXdsServerUrl, true)));
3983   // Check CSDS data.
3984   ClientConfig csds = DumpCsds();
3985   EXPECT_THAT(csds.generic_xds_configs(),
3986               ::testing::UnorderedElementsAre(CsdsResourceAcked(
3987                   XdsFooResourceType::Get()->type_url(), "foo1",
3988                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3989   // XdsClient should have sent an ACK message to the xDS server.
3990   request = WaitForRequest(stream.get());
3991   ASSERT_TRUE(request.has_value());
3992   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3993                /*version_info=*/"1", /*response_nonce=*/"A",
3994                /*error_detail=*/absl::OkStatus(),
3995                /*resource_names=*/{"foo1"});
3996   // Start a watch for the xdstp resource name.
3997   auto watcher2 = StartFooWatch(kXdstpResourceName);
3998   // Watcher should initially not see any resource reported.
3999   EXPECT_FALSE(watcher2->HasEvent());
4000   // Check metric data.
4001   EXPECT_THAT(
4002       GetResourceCounts(),
4003       ::testing::ElementsAre(
4004           ::testing::Pair(ResourceCountLabelsEq(
4005                               XdsClient::kOldStyleAuthority,
4006                               XdsFooResourceType::Get()->type_url(), "acked"),
4007                           1),
4008           ::testing::Pair(ResourceCountLabelsEq(
4009                               kAuthority, XdsFooResourceType::Get()->type_url(),
4010                               "requested"),
4011                           1)));
4012   EXPECT_THAT(GetServerConnections(),
4013               ::testing::ElementsAre(
4014                   ::testing::Pair(kDefaultXdsServerUrl, true),
4015                   ::testing::Pair(authority_server.server_uri(), true)));
4016   // Check CSDS data.
4017   csds = DumpCsds();
4018   EXPECT_THAT(csds.generic_xds_configs(),
4019               ::testing::UnorderedElementsAre(
4020                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4021                                     "foo1", resource->AsJsonString(), "1",
4022                                     TimestampProtoEq(kTime0)),
4023                   CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
4024                                         kXdstpResourceName)));
4025   // XdsClient will create a new stream to the server for this authority.
4026   auto stream2 = WaitForAdsStream(authority_server);
4027   ASSERT_TRUE(stream2 != nullptr);
4028   // XdsClient should have sent a subscription request on the ADS stream.
4029   // Note that version and nonce here do NOT use the values for Foo,
4030   // since each authority has its own state.
4031   request = WaitForRequest(stream2.get());
4032   ASSERT_TRUE(request.has_value());
4033   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4034                /*version_info=*/"", /*response_nonce=*/"",
4035                /*error_detail=*/absl::OkStatus(),
4036                /*resource_names=*/{kXdstpResourceName});
4037   CheckRequestNode(*request);  // Should be present on the first request.
4038   // Send a response.
4039   // We increment time to make sure that the CSDS data gets a new timestamp.
4040   time_cache_.TestOnlySetNow(kTime1);
4041   stream2->SendMessageToClient(
4042       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4043           .set_version_info("2")
4044           .set_nonce("B")
4045           .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
4046           .Serialize());
4047   // XdsClient should have delivered the response to the watcher.
4048   auto resource2 = watcher2->WaitForNextResource();
4049   ASSERT_NE(resource2, nullptr);
4050   EXPECT_EQ(resource2->name, kXdstpResourceName);
4051   EXPECT_EQ(resource2->value, 3);
4052   // Check metric data.
4053   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4054       ::testing::ElementsAre(
4055           ::testing::Pair(
4056               ::testing::Pair(kDefaultXdsServerUrl,
4057                               XdsFooResourceType::Get()->type_url()),
4058               1),
4059           ::testing::Pair(
4060               ::testing::Pair(authority_server.server_uri(),
4061                               XdsFooResourceType::Get()->type_url()),
4062               1)),
4063       ::testing::ElementsAre(), ::testing::_));
4064   EXPECT_THAT(
4065       GetResourceCounts(),
4066       ::testing::ElementsAre(
4067           ::testing::Pair(ResourceCountLabelsEq(
4068                               XdsClient::kOldStyleAuthority,
4069                               XdsFooResourceType::Get()->type_url(), "acked"),
4070                           1),
4071           ::testing::Pair(
4072               ResourceCountLabelsEq(
4073                   kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
4074               1)));
4075   EXPECT_THAT(GetServerConnections(),
4076               ::testing::ElementsAre(
4077                   ::testing::Pair(kDefaultXdsServerUrl, true),
4078                   ::testing::Pair(authority_server.server_uri(), true)));
4079   // Check CSDS data.
4080   csds = DumpCsds();
4081   EXPECT_THAT(
4082       csds.generic_xds_configs(),
4083       ::testing::UnorderedElementsAre(
4084           CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo1",
4085                             resource->AsJsonString(), "1",
4086                             TimestampProtoEq(kTime0)),
4087           CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4088                             kXdstpResourceName, resource2->AsJsonString(), "2",
4089                             TimestampProtoEq(kTime1))));
4090   // XdsClient should have sent an ACK message to the xDS server.
4091   request = WaitForRequest(stream2.get());
4092   ASSERT_TRUE(request.has_value());
4093   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4094                /*version_info=*/"2", /*response_nonce=*/"B",
4095                /*error_detail=*/absl::OkStatus(),
4096                /*resource_names=*/{kXdstpResourceName});
4097   // Cancel watch for "foo1".
4098   CancelFooWatch(watcher.get(), "foo1");
4099   EXPECT_TRUE(stream->IsOrphaned());
4100   // Now cancel watch for xdstp resource name.
4101   CancelFooWatch(watcher2.get(), kXdstpResourceName);
4102   EXPECT_TRUE(stream2->IsOrphaned());
4103 }
4104 
TEST_F(XdsClientTest,FederationAuthorityDefaultsToTopLevelXdsServer)4105 TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) {
4106   constexpr char kAuthority[] = "xds.example.com";
4107   const std::string kXdstpResourceName = absl::StrCat(
4108       "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
4109   // Authority does not specify any xDS servers, so XdsClient will use
4110   // the top-level xDS server in the bootstrap config for this authority.
4111   InitXdsClient(FakeXdsBootstrap::Builder().AddAuthority(
4112       kAuthority, FakeXdsBootstrap::FakeAuthority()));
4113   // Start a watch for "foo1".
4114   auto watcher = StartFooWatch("foo1");
4115   // Watcher should initially not see any resource reported.
4116   EXPECT_FALSE(watcher->HasEvent());
4117   // XdsClient should have created an ADS stream to the top-level xDS server.
4118   auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
4119   ASSERT_TRUE(stream != nullptr);
4120   // XdsClient should have sent a subscription request on the ADS stream.
4121   auto request = WaitForRequest(stream.get());
4122   ASSERT_TRUE(request.has_value());
4123   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4124                /*version_info=*/"", /*response_nonce=*/"",
4125                /*error_detail=*/absl::OkStatus(),
4126                /*resource_names=*/{"foo1"});
4127   CheckRequestNode(*request);  // Should be present on the first request.
4128   // Send a response.
4129   stream->SendMessageToClient(
4130       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4131           .set_version_info("1")
4132           .set_nonce("A")
4133           .AddFooResource(XdsFooResource("foo1", 6))
4134           .Serialize());
4135   // XdsClient should have delivered the response to the watcher.
4136   auto resource = watcher->WaitForNextResource();
4137   ASSERT_NE(resource, nullptr);
4138   EXPECT_EQ(resource->name, "foo1");
4139   EXPECT_EQ(resource->value, 6);
4140   // Check metric data.
4141   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4142       ::testing::ElementsAre(::testing::Pair(
4143           ::testing::Pair(kDefaultXdsServerUrl,
4144                           XdsFooResourceType::Get()->type_url()),
4145           1)),
4146       ::testing::ElementsAre(), ::testing::_));
4147   EXPECT_THAT(
4148       GetResourceCounts(),
4149       ::testing::ElementsAre(::testing::Pair(
4150           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4151                                 XdsFooResourceType::Get()->type_url(), "acked"),
4152           1)));
4153   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4154                                           kDefaultXdsServerUrl, true)));
4155   // Check CSDS data.
4156   ClientConfig csds = DumpCsds();
4157   EXPECT_THAT(csds.generic_xds_configs(),
4158               ::testing::UnorderedElementsAre(CsdsResourceAcked(
4159                   XdsFooResourceType::Get()->type_url(), "foo1",
4160                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
4161   // XdsClient should have sent an ACK message to the xDS server.
4162   request = WaitForRequest(stream.get());
4163   ASSERT_TRUE(request.has_value());
4164   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4165                /*version_info=*/"1", /*response_nonce=*/"A",
4166                /*error_detail=*/absl::OkStatus(),
4167                /*resource_names=*/{"foo1"});
4168   // Start a watch for the xdstp resource name.
4169   auto watcher2 = StartFooWatch(kXdstpResourceName);
4170   // Watcher should initially not see any resource reported.
4171   EXPECT_FALSE(watcher2->HasEvent());
4172   // XdsClient will send a subscription request on the ADS stream that
4173   // includes both resources, since both are being obtained from the
4174   // same server.
4175   request = WaitForRequest(stream.get());
4176   ASSERT_TRUE(request.has_value());
4177   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4178                /*version_info=*/"1", /*response_nonce=*/"A",
4179                /*error_detail=*/absl::OkStatus(),
4180                /*resource_names=*/{"foo1", kXdstpResourceName});
4181   // Send a response.
4182   // We increment time to make sure that the CSDS data gets a new timestamp.
4183   time_cache_.TestOnlySetNow(kTime1);
4184   stream->SendMessageToClient(
4185       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4186           .set_version_info("2")
4187           .set_nonce("B")
4188           .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
4189           .Serialize());
4190   // XdsClient should have delivered the response to the watcher.
4191   auto resource2 = watcher2->WaitForNextResource();
4192   ASSERT_NE(resource2, nullptr);
4193   EXPECT_EQ(resource2->name, kXdstpResourceName);
4194   EXPECT_EQ(resource2->value, 3);
4195   // Check metric data.
4196   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4197       ::testing::ElementsAre(::testing::Pair(
4198           ::testing::Pair(kDefaultXdsServerUrl,
4199                           XdsFooResourceType::Get()->type_url()),
4200           2)),
4201       ::testing::ElementsAre(), ::testing::_));
4202   EXPECT_THAT(
4203       GetResourceCounts(),
4204       ::testing::ElementsAre(
4205           ::testing::Pair(ResourceCountLabelsEq(
4206                               XdsClient::kOldStyleAuthority,
4207                               XdsFooResourceType::Get()->type_url(), "acked"),
4208                           1),
4209           ::testing::Pair(
4210               ResourceCountLabelsEq(
4211                   kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
4212               1)));
4213   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4214                                           kDefaultXdsServerUrl, true)));
4215   // Check CSDS data.
4216   csds = DumpCsds();
4217   EXPECT_THAT(
4218       csds.generic_xds_configs(),
4219       ::testing::UnorderedElementsAre(
4220           CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo1",
4221                             resource->AsJsonString(), "1",
4222                             TimestampProtoEq(kTime0)),
4223           CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4224                             kXdstpResourceName, resource2->AsJsonString(), "2",
4225                             TimestampProtoEq(kTime1))));
4226   // XdsClient should have sent an ACK message to the xDS server.
4227   request = WaitForRequest(stream.get());
4228   ASSERT_TRUE(request.has_value());
4229   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4230                /*version_info=*/"2", /*response_nonce=*/"B",
4231                /*error_detail=*/absl::OkStatus(),
4232                /*resource_names=*/{"foo1", kXdstpResourceName});
4233   // Cancel watch for "foo1".
4234   CancelFooWatch(watcher.get(), "foo1");
4235   // XdsClient should send an unsubscription request.
4236   request = WaitForRequest(stream.get());
4237   ASSERT_TRUE(request.has_value());
4238   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4239                /*version_info=*/"2", /*response_nonce=*/"B",
4240                /*error_detail=*/absl::OkStatus(),
4241                /*resource_names=*/{kXdstpResourceName});
4242   // Now cancel watch for xdstp resource name.
4243   CancelFooWatch(watcher2.get(), kXdstpResourceName);
4244   EXPECT_TRUE(stream->IsOrphaned());
4245 }
4246 
TEST_F(XdsClientTest,FederationWithUnknownAuthority)4247 TEST_F(XdsClientTest, FederationWithUnknownAuthority) {
4248   constexpr char kAuthority[] = "xds.example.com";
4249   const std::string kXdstpResourceName = absl::StrCat(
4250       "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
4251   // Note: Not adding authority to bootstrap config.
4252   InitXdsClient();
4253   // Start a watch for the xdstp resource name.
4254   auto watcher = StartFooWatch(kXdstpResourceName);
4255   // Watcher should immediately get an error about the unknown authority.
4256   auto error = watcher->WaitForNextError();
4257   ASSERT_TRUE(error.has_value());
4258   EXPECT_EQ(error->code(), absl::StatusCode::kFailedPrecondition);
4259   EXPECT_EQ(error->message(),
4260             "authority \"xds.example.com\" not present in bootstrap config "
4261             "(node ID:xds_client_test)")
4262       << *error;
4263 }
4264 
TEST_F(XdsClientTest,FederationWithUnparseableXdstpResourceName)4265 TEST_F(XdsClientTest, FederationWithUnparseableXdstpResourceName) {
4266   // Note: Not adding authority to bootstrap config.
4267   InitXdsClient();
4268   // Start a watch for the xdstp resource name.
4269   auto watcher = StartFooWatch("xdstp://x");
4270   // Watcher should immediately get an error about the unknown authority.
4271   auto error = watcher->WaitForNextError();
4272   ASSERT_TRUE(error.has_value());
4273   EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
4274   EXPECT_EQ(error->message(),
4275             "Unable to parse resource name xdstp://x "
4276             "(node ID:xds_client_test)")
4277       << *error;
4278 }
4279 
4280 // TODO(roth,apolcyn): remove this test when the
4281 // GRPC_EXPERIMENTAL_XDS_FEDERATION env var is removed.
TEST_F(XdsClientTest,FederationDisabledWithNewStyleName)4282 TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) {
4283   testing::ScopedEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION", "false");
4284   // We will use this xdstp name, whose authority is not present in
4285   // the bootstrap config.  But since federation is not enabled, we
4286   // will treat this as an opaque old-style name, so we'll send it to
4287   // the default server.
4288   constexpr char kXdstpResourceName[] =
4289       "xdstp://xds.example.com/test.v3.foo/foo1";
4290   InitXdsClient();
4291   // Start a watch for the xdstp name.
4292   auto watcher = StartFooWatch(kXdstpResourceName);
4293   // Watcher should initially not see any resource reported.
4294   EXPECT_FALSE(watcher->HasEvent());
4295   // XdsClient should have created an ADS stream.
4296   auto stream = WaitForAdsStream();
4297   ASSERT_TRUE(stream != nullptr);
4298   // XdsClient should have sent a subscription request on the ADS stream.
4299   auto request = WaitForRequest(stream.get());
4300   ASSERT_TRUE(request.has_value());
4301   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4302                /*version_info=*/"", /*response_nonce=*/"",
4303                /*error_detail=*/absl::OkStatus(),
4304                /*resource_names=*/{kXdstpResourceName});
4305   CheckRequestNode(*request);  // Should be present on the first request.
4306   // Send a response.
4307   stream->SendMessageToClient(
4308       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4309           .set_version_info("1")
4310           .set_nonce("A")
4311           .AddFooResource(XdsFooResource(kXdstpResourceName, 6))
4312           .Serialize());
4313   // XdsClient should have delivered the response to the watcher.
4314   auto resource = watcher->WaitForNextResource();
4315   ASSERT_NE(resource, nullptr);
4316   EXPECT_EQ(resource->name, kXdstpResourceName);
4317   EXPECT_EQ(resource->value, 6);
4318   // XdsClient should have sent an ACK message to the xDS server.
4319   request = WaitForRequest(stream.get());
4320   ASSERT_TRUE(request.has_value());
4321   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4322                /*version_info=*/"1", /*response_nonce=*/"A",
4323                /*error_detail=*/absl::OkStatus(),
4324                /*resource_names=*/{kXdstpResourceName});
4325   // Cancel watch.
4326   CancelFooWatch(watcher.get(), kXdstpResourceName);
4327   EXPECT_TRUE(stream->IsOrphaned());
4328 }
4329 
TEST_F(XdsClientTest,FederationChannelFailureReportedToWatchers)4330 TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
4331   constexpr char kAuthority[] = "xds.example.com";
4332   const std::string kXdstpResourceName = absl::StrCat(
4333       "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
4334   FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server");
4335   FakeXdsBootstrap::FakeAuthority authority;
4336   authority.set_server(authority_server);
4337   InitXdsClient(
4338       FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority));
4339   // Start a watch for "foo1".
4340   auto watcher = StartFooWatch("foo1");
4341   // Watcher should initially not see any resource reported.
4342   EXPECT_FALSE(watcher->HasEvent());
4343   // XdsClient should have created an ADS stream to the top-level xDS server.
4344   auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
4345   ASSERT_TRUE(stream != nullptr);
4346   // XdsClient should have sent a subscription request on the ADS stream.
4347   auto request = WaitForRequest(stream.get());
4348   ASSERT_TRUE(request.has_value());
4349   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4350                /*version_info=*/"", /*response_nonce=*/"",
4351                /*error_detail=*/absl::OkStatus(),
4352                /*resource_names=*/{"foo1"});
4353   CheckRequestNode(*request);  // Should be present on the first request.
4354   // Send a response.
4355   stream->SendMessageToClient(
4356       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4357           .set_version_info("1")
4358           .set_nonce("A")
4359           .AddFooResource(XdsFooResource("foo1", 6))
4360           .Serialize());
4361   // XdsClient should have delivered the response to the watcher.
4362   auto resource = watcher->WaitForNextResource();
4363   ASSERT_NE(resource, nullptr);
4364   EXPECT_EQ(resource->name, "foo1");
4365   EXPECT_EQ(resource->value, 6);
4366   // Check metric data.
4367   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4368       ::testing::ElementsAre(::testing::Pair(
4369           ::testing::Pair(kDefaultXdsServerUrl,
4370                           XdsFooResourceType::Get()->type_url()),
4371           1)),
4372       ::testing::ElementsAre(), ::testing::_));
4373   EXPECT_THAT(
4374       GetResourceCounts(),
4375       ::testing::ElementsAre(::testing::Pair(
4376           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4377                                 XdsFooResourceType::Get()->type_url(), "acked"),
4378           1)));
4379   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4380                                           kDefaultXdsServerUrl, true)));
4381   // Check CSDS data.
4382   ClientConfig csds = DumpCsds();
4383   EXPECT_THAT(csds.generic_xds_configs(),
4384               ::testing::UnorderedElementsAre(CsdsResourceAcked(
4385                   XdsFooResourceType::Get()->type_url(), "foo1",
4386                   resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
4387   // XdsClient should have sent an ACK message to the xDS server.
4388   request = WaitForRequest(stream.get());
4389   ASSERT_TRUE(request.has_value());
4390   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4391                /*version_info=*/"1", /*response_nonce=*/"A",
4392                /*error_detail=*/absl::OkStatus(),
4393                /*resource_names=*/{"foo1"});
4394   // Start a watch for the xdstp resource name.
4395   auto watcher2 = StartFooWatch(kXdstpResourceName);
4396   // Check metric data.
4397   EXPECT_THAT(GetServerConnections(),
4398               ::testing::ElementsAre(
4399                   ::testing::Pair(kDefaultXdsServerUrl, true),
4400                   ::testing::Pair(authority_server.server_uri(), true)));
4401   // Watcher should initially not see any resource reported.
4402   EXPECT_FALSE(watcher2->HasEvent());
4403   // XdsClient will create a new stream to the server for this authority.
4404   auto stream2 = WaitForAdsStream(authority_server);
4405   ASSERT_TRUE(stream2 != nullptr);
4406   // XdsClient should have sent a subscription request on the ADS stream.
4407   // Note that version and nonce here do NOT use the values for Foo,
4408   // since each authority has its own state.
4409   request = WaitForRequest(stream2.get());
4410   ASSERT_TRUE(request.has_value());
4411   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4412                /*version_info=*/"", /*response_nonce=*/"",
4413                /*error_detail=*/absl::OkStatus(),
4414                /*resource_names=*/{kXdstpResourceName});
4415   CheckRequestNode(*request);  // Should be present on the first request.
4416   // Send a response.
4417   // We increment time to make sure that the CSDS data gets a new timestamp.
4418   time_cache_.TestOnlySetNow(kTime1);
4419   stream2->SendMessageToClient(
4420       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4421           .set_version_info("2")
4422           .set_nonce("B")
4423           .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
4424           .Serialize());
4425   // XdsClient should have delivered the response to the watcher.
4426   auto resource2 = watcher2->WaitForNextResource();
4427   ASSERT_NE(resource2, nullptr);
4428   EXPECT_EQ(resource2->name, kXdstpResourceName);
4429   EXPECT_EQ(resource2->value, 3);
4430   // Check metric data.
4431   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4432       ::testing::ElementsAre(
4433           ::testing::Pair(
4434               ::testing::Pair(kDefaultXdsServerUrl,
4435                               XdsFooResourceType::Get()->type_url()),
4436               1),
4437           ::testing::Pair(
4438               ::testing::Pair(authority_server.server_uri(),
4439                               XdsFooResourceType::Get()->type_url()),
4440               1)),
4441       ::testing::ElementsAre(), ::testing::_));
4442   EXPECT_THAT(
4443       GetResourceCounts(),
4444       ::testing::ElementsAre(
4445           ::testing::Pair(ResourceCountLabelsEq(
4446                               XdsClient::kOldStyleAuthority,
4447                               XdsFooResourceType::Get()->type_url(), "acked"),
4448                           1),
4449           ::testing::Pair(
4450               ResourceCountLabelsEq(
4451                   kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
4452               1)));
4453   EXPECT_THAT(GetServerConnections(),
4454               ::testing::ElementsAre(
4455                   ::testing::Pair(kDefaultXdsServerUrl, true),
4456                   ::testing::Pair(authority_server.server_uri(), true)));
4457   // Check CSDS data.
4458   csds = DumpCsds();
4459   EXPECT_THAT(
4460       csds.generic_xds_configs(),
4461       ::testing::UnorderedElementsAre(
4462           CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo1",
4463                             resource->AsJsonString(), "1",
4464                             TimestampProtoEq(kTime0)),
4465           CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4466                             kXdstpResourceName, resource2->AsJsonString(), "2",
4467                             TimestampProtoEq(kTime1))));
4468   // XdsClient should have sent an ACK message to the xDS server.
4469   request = WaitForRequest(stream2.get());
4470   ASSERT_TRUE(request.has_value());
4471   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4472                /*version_info=*/"2", /*response_nonce=*/"B",
4473                /*error_detail=*/absl::OkStatus(),
4474                /*resource_names=*/{kXdstpResourceName});
4475   // Now cause a channel failure on the stream to the authority's xDS server.
4476   TriggerConnectionFailure(authority_server,
4477                            absl::UnavailableError("connection failed"));
4478   // The watcher for the xdstp resource name should see the error.
4479   auto error = watcher2->WaitForNextAmbientError();
4480   ASSERT_TRUE(error.has_value());
4481   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
4482   EXPECT_EQ(error->message(),
4483             "xDS channel for server other_xds_server: connection failed "
4484             "(node ID:xds_client_test)")
4485       << *error;
4486   // The watcher for "foo1" should not see any error.
4487   EXPECT_FALSE(watcher->HasEvent());
4488   // Check metric data.
4489   EXPECT_THAT(GetServerConnections(),
4490               ::testing::ElementsAre(
4491                   ::testing::Pair(kDefaultXdsServerUrl, true),
4492                   ::testing::Pair(authority_server.server_uri(), false)));
4493   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4494       ::testing::_, ::testing::_,
4495       ::testing::ElementsAre(
4496           ::testing::Pair(authority_server.server_uri(), 1))));
4497   // Cancel watch for "foo1".
4498   CancelFooWatch(watcher.get(), "foo1");
4499   EXPECT_TRUE(stream->IsOrphaned());
4500   // Now cancel watch for xdstp resource name.
4501   CancelFooWatch(watcher2.get(), kXdstpResourceName);
4502   EXPECT_TRUE(stream2->IsOrphaned());
4503 }
4504 
TEST_F(XdsClientTest,AdsReadWaitsForHandleRelease)4505 TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
4506   InitXdsClient();
4507   // Start watches for "foo1" and "foo2".
4508   auto watcher1 = StartFooWatch("foo1");
4509   // XdsClient should have created an ADS stream.
4510   auto stream = WaitForAdsStream();
4511   ASSERT_TRUE(stream != nullptr);
4512   // XdsClient should have sent a subscription request on the ADS stream.
4513   auto request = WaitForRequest(stream.get());
4514   ASSERT_TRUE(request.has_value());
4515   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4516                /*version_info=*/"", /*response_nonce=*/"",
4517                /*error_detail=*/absl::OkStatus(),
4518                /*resource_names=*/{"foo1"});
4519   auto watcher2 = StartFooWatch("foo2");
4520   request = WaitForRequest(stream.get());
4521   ASSERT_TRUE(request.has_value());
4522   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4523                /*version_info=*/"", /*response_nonce=*/"",
4524                /*error_detail=*/absl::OkStatus(),
4525                /*resource_names=*/{"foo1", "foo2"});
4526   // Send a response with 2 resources.
4527   stream->SendMessageToClient(
4528       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4529           .set_version_info("1")
4530           .set_nonce("A")
4531           .AddFooResource(XdsFooResource("foo1", 6))
4532           .AddFooResource(XdsFooResource("foo2", 10))
4533           .Serialize());
4534   // Send a response with a single resource, will not be read until the handle
4535   // is released
4536   stream->SendMessageToClient(
4537       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4538           .set_version_info("2")
4539           .set_nonce("B")
4540           .AddFooResource(XdsFooResource("foo1", 8))
4541           .Serialize());
4542   // XdsClient should have delivered the response to the watcher.
4543   auto resource1 = watcher1->WaitForNextResourceAndHandle();
4544   ASSERT_NE(resource1, absl::nullopt);
4545   EXPECT_EQ(resource1->resource->name, "foo1");
4546   EXPECT_EQ(resource1->resource->value, 6);
4547   auto resource2 = watcher2->WaitForNextResourceAndHandle();
4548   ASSERT_NE(resource2, absl::nullopt);
4549   EXPECT_EQ(resource2->resource->name, "foo2");
4550   EXPECT_EQ(resource2->resource->value, 10);
4551   // XdsClient should have sent an ACK message to the xDS server.
4552   request = WaitForRequest(stream.get());
4553   ASSERT_TRUE(request.has_value());
4554   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4555                /*version_info=*/"1", /*response_nonce=*/"A",
4556                /*error_detail=*/absl::OkStatus(),
4557                /*resource_names=*/{"foo1", "foo2"});
4558   EXPECT_TRUE(stream->WaitForReadsStarted(1));
4559   resource1->read_delay_handle.reset();
4560   EXPECT_TRUE(stream->WaitForReadsStarted(1));
4561   resource2->read_delay_handle.reset();
4562   EXPECT_TRUE(stream->WaitForReadsStarted(2));
4563   resource1 = watcher1->WaitForNextResourceAndHandle();
4564   ASSERT_NE(resource1, absl::nullopt);
4565   EXPECT_EQ(resource1->resource->name, "foo1");
4566   EXPECT_EQ(resource1->resource->value, 8);
4567   EXPECT_EQ(watcher2->WaitForNextResourceAndHandle(), absl::nullopt);
4568   // XdsClient should have sent an ACK message to the xDS server.
4569   request = WaitForRequest(stream.get());
4570   ASSERT_TRUE(request.has_value());
4571   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4572                /*version_info=*/"2", /*response_nonce=*/"B",
4573                /*error_detail=*/absl::OkStatus(),
4574                /*resource_names=*/{"foo1", "foo2"});
4575   EXPECT_TRUE(stream->WaitForReadsStarted(2));
4576   resource1->read_delay_handle.reset();
4577   EXPECT_TRUE(stream->WaitForReadsStarted(3));
4578   // Cancel watch.
4579   CancelFooWatch(watcher1.get(), "foo1");
4580   request = WaitForRequest(stream.get());
4581   ASSERT_TRUE(request.has_value());
4582   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4583                /*version_info=*/"2", /*response_nonce=*/"B",
4584                /*error_detail=*/absl::OkStatus(),
4585                /*resource_names=*/{"foo2"});
4586   CancelFooWatch(watcher2.get(), "foo2");
4587   EXPECT_TRUE(stream->IsOrphaned());
4588 }
4589 
TEST_F(XdsClientTest,FallbackAndRecover)4590 TEST_F(XdsClientTest, FallbackAndRecover) {
4591   FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
4592   FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
4593   // Regular operation
4594   InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
4595       {primary_server, fallback_server}));
4596   // Start a watch for "foo1".
4597   auto watcher = StartFooWatch("foo1");
4598   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4599                                           kDefaultXdsServerUrl, true)));
4600   EXPECT_THAT(GetResourceCounts(),
4601               ::testing::ElementsAre(::testing::Pair(
4602                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4603                                         XdsFooResourceType::Get()->type_url(),
4604                                         "requested"),
4605                   1)));
4606   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4607       ::testing::IsEmpty(), ::testing::_, ::testing::ElementsAre()));
4608   // CSDS should show that the resource has been requested.
4609   ClientConfig csds = DumpCsds();
4610   EXPECT_THAT(csds.generic_xds_configs(),
4611               ::testing::ElementsAre(CsdsResourceRequested(
4612                   XdsFooResourceType::Get()->type_url(), "foo1")));
4613   // XdsClient should have created an ADS stream.
4614   auto stream = WaitForAdsStream();
4615   ASSERT_TRUE(stream != nullptr);
4616   // XdsClient should have sent a subscription request on the ADS stream.
4617   auto request = WaitForRequest(stream.get());
4618   ASSERT_TRUE(request.has_value());
4619   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4620                /*version_info=*/"", /*response_nonce=*/"",
4621                /*error_detail=*/absl::OkStatus(),
4622                /*resource_names=*/{"foo1"});
4623   // Input: Get initial response from primary server.
4624   stream->SendMessageToClient(
4625       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4626           .set_version_info("20")
4627           .set_nonce("O")
4628           .AddFooResource(XdsFooResource("foo1", 6))
4629           .Serialize());
4630   // Result (local): Resource is delivered to watcher.
4631   auto resource = watcher->WaitForNextResource();
4632   ASSERT_NE(resource, nullptr);
4633   EXPECT_EQ(resource->name, "foo1");
4634   EXPECT_EQ(resource->value, 6);
4635   // Result (local): Metrics show 1 resource update and 1 cached resource.
4636   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4637       ::testing::ElementsAre(::testing::Pair(
4638           ::testing::Pair(kDefaultXdsServerUrl,
4639                           XdsFooResourceType::Get()->type_url()),
4640           1)),
4641       ::testing::_, ::testing::_));
4642   EXPECT_THAT(
4643       GetResourceCounts(),
4644       ::testing::ElementsAre(::testing::Pair(
4645           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4646                                 XdsFooResourceType::Get()->type_url(), "acked"),
4647           1)));
4648   // Check CSDS data.
4649   csds = DumpCsds();
4650   EXPECT_THAT(csds.generic_xds_configs(),
4651               ::testing::UnorderedElementsAre(CsdsResourceAcked(
4652                   XdsFooResourceType::Get()->type_url(), "foo1",
4653                   resource->AsJsonString(), "20", TimestampProtoEq(kTime0))));
4654   // Result (remote): Client sends ACK to server.
4655   request = WaitForRequest(stream.get());
4656   ASSERT_TRUE(request.has_value());
4657   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4658                /*version_info=*/"20", /*response_nonce=*/"O",
4659                /*error_detail=*/absl::OkStatus(),
4660                /*resource_names=*/{"foo1"});
4661   // Input: Trigger connection failure to primary.
4662   TriggerConnectionFailure(primary_server,
4663                            absl::UnavailableError("Server down"));
4664   // Result (local): The error is reported to the watcher.
4665   auto error = watcher->WaitForNextAmbientError();
4666   ASSERT_TRUE(error.has_value());
4667   EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
4668   EXPECT_EQ(error->message(),
4669             "xDS channel for server default_xds_server: Server down (node "
4670             "ID:xds_client_test)");
4671   // Result (local): The metrics show the channel as being unhealthy.
4672   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4673                                           kDefaultXdsServerUrl, false)));
4674   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4675       ::testing::_, ::testing::_,
4676       ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
4677   // Input: Trigger stream failure.
4678   stream->MaybeSendStatusToClient(absl::UnavailableError("Stream failure"));
4679   // Result (local): The metrics still show the channel as being unhealthy.
4680   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4681                                           kDefaultXdsServerUrl, false)));
4682   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4683       ::testing::_, ::testing::_,
4684       ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
4685   // Result (remote): The client starts a new stream and sends a subscription
4686   //   message. Note that the server does not respond, so the channel will still
4687   //   have non-OK status.
4688   stream = WaitForAdsStream();
4689   ASSERT_NE(stream, nullptr);
4690   request = WaitForRequest(stream.get());
4691   ASSERT_TRUE(request.has_value());
4692   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4693                /*version_info=*/"20", /*response_nonce=*/"",
4694                /*error_detail=*/absl::OkStatus(),
4695                /*resource_names=*/{"foo1"});
4696   // Input: Start second watch for foo1 (already cached).
4697   auto watcher_cached = StartFooWatch("foo1");
4698   // Result (local): New watcher gets the cached resource.
4699   resource = watcher_cached->WaitForNextResource();
4700   ASSERT_NE(resource, nullptr);
4701   EXPECT_EQ(resource->name, "foo1");
4702   EXPECT_EQ(resource->value, 6);
4703   // Result (local): New watcher gets the error from the channel state.
4704   error = watcher_cached->WaitForNextAmbientError();
4705   ASSERT_TRUE(error.has_value());
4706   EXPECT_EQ(error->message(),
4707             "xDS channel for server default_xds_server: Server down (node "
4708             "ID:xds_client_test)")
4709       << error->message();
4710   CancelFooWatch(watcher_cached.get(), "foo1");
4711   // Input: Start watch for foo2 (not already cached).
4712   auto watcher2 = StartFooWatch("foo2");
4713   // Result (local): Metrics show a healthy channel to the fallback server.
4714   EXPECT_THAT(GetServerConnections(),
4715               ::testing::ElementsAre(
4716                   ::testing::Pair(kDefaultXdsServerUrl, false),
4717                   ::testing::Pair(fallback_server.server_uri(), true)));
4718   // Result (remote): Client sent a new request for both resources on the
4719   //   stream to the primary.
4720   request = WaitForRequest(stream.get());
4721   ASSERT_TRUE(request.has_value());
4722   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4723                /*version_info=*/"20", /*response_nonce=*/"",
4724                /*error_detail=*/absl::OkStatus(),
4725                /*resource_names=*/{"foo1", "foo2"});
4726   // Result (remote): Client created a stream to the fallback server and sent a
4727   //   request on that stream for both resources.
4728   auto stream2 = WaitForAdsStream(fallback_server);
4729   ASSERT_TRUE(stream2 != nullptr);
4730   request = WaitForRequest(stream2.get());
4731   ASSERT_TRUE(request.has_value());
4732   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4733                /*version_info=*/"", /*response_nonce=*/"",
4734                /*error_detail=*/absl::OkStatus(),
4735                /*resource_names=*/{"foo1", "foo2"});
4736   // Input: Fallback server sends a response with both resources.
4737   // We increment time to make sure that the CSDS data gets a new timestamp.
4738   time_cache_.TestOnlySetNow(kTime1);
4739   stream2->SendMessageToClient(
4740       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4741           .set_version_info("5")
4742           .set_nonce("A")
4743           .AddFooResource(XdsFooResource("foo1", 20))
4744           .AddFooResource(XdsFooResource("foo2", 30))
4745           .Serialize());
4746   // Result (local): Resources are delivered to watchers.
4747   resource = watcher->WaitForNextResource();
4748   ASSERT_NE(resource, nullptr);
4749   EXPECT_EQ(resource->name, "foo1");
4750   EXPECT_EQ(resource->value, 20);
4751   auto resource2 = watcher2->WaitForNextResource();
4752   ASSERT_NE(resource2, nullptr);
4753   EXPECT_EQ(resource2->name, "foo2");
4754   EXPECT_EQ(resource2->value, 30);
4755   // Result (local): Metrics show an update from fallback server.
4756   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4757       ::testing::ElementsAre(
4758           ::testing::Pair(
4759               ::testing::Pair(kDefaultXdsServerUrl,
4760                               XdsFooResourceType::Get()->type_url()),
4761               1),
4762           ::testing::Pair(
4763               ::testing::Pair(fallback_server.server_uri(),
4764                               XdsFooResourceType::Get()->type_url()),
4765               2)),
4766       ::testing::_, ::testing::_));
4767   EXPECT_THAT(GetServerConnections(),
4768               ::testing::ElementsAre(
4769                   ::testing::Pair(kDefaultXdsServerUrl, false),
4770                   ::testing::Pair(fallback_server.server_uri(), true)));
4771   EXPECT_THAT(
4772       GetResourceCounts(),
4773       ::testing::ElementsAre(::testing::Pair(
4774           ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4775                                 XdsFooResourceType::Get()->type_url(), "acked"),
4776           2)));
4777   // Check CSDS data.
4778   csds = DumpCsds();
4779   EXPECT_THAT(csds.generic_xds_configs(),
4780               ::testing::UnorderedElementsAre(
4781                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4782                                     "foo1", resource->AsJsonString(), "5",
4783                                     TimestampProtoEq(kTime1)),
4784                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4785                                     "foo2", resource2->AsJsonString(), "5",
4786                                     TimestampProtoEq(kTime1))));
4787   // Result (remote): Client sends ACK to fallback server.
4788   request = WaitForRequest(stream2.get());
4789   ASSERT_TRUE(request.has_value());
4790   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4791                /*version_info=*/"5", /*response_nonce=*/"A",
4792                /*error_detail=*/absl::OkStatus(),
4793                /*resource_names=*/{"foo1", "foo2"});
4794   // Input: Primary server sends a response containing both resources.
4795   // We increment time to make sure that the CSDS data gets a new timestamp.
4796   time_cache_.TestOnlySetNow(kTime2);
4797   stream->SendMessageToClient(
4798       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4799           .set_version_info("15")
4800           .set_nonce("B")
4801           .AddFooResource(XdsFooResource("foo1", 35))
4802           .AddFooResource(XdsFooResource("foo2", 25))
4803           .Serialize());
4804   // Result (local): Resources are delivered to watchers.
4805   resource = watcher->WaitForNextResource();
4806   ASSERT_NE(resource, nullptr);
4807   EXPECT_EQ(resource->name, "foo1");
4808   EXPECT_EQ(resource->value, 35);
4809   resource2 = watcher2->WaitForNextResource();
4810   ASSERT_NE(resource2, nullptr);
4811   EXPECT_EQ(resource2->name, "foo2");
4812   EXPECT_EQ(resource2->value, 25);
4813   // Result (local): Metrics show that we've closed the channel to the fallback
4814   //   server and received resource updates from the primary server.
4815   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4816       ::testing::ElementsAre(
4817           ::testing::Pair(
4818               ::testing::Pair(kDefaultXdsServerUrl,
4819                               XdsFooResourceType::Get()->type_url()),
4820               3),
4821           ::testing::Pair(
4822               ::testing::Pair(fallback_server.server_uri(),
4823                               XdsFooResourceType::Get()->type_url()),
4824               2)),
4825       ::testing::_,
4826       ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
4827   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4828                                           kDefaultXdsServerUrl, true)));
4829   // Check CSDS data.
4830   csds = DumpCsds();
4831   EXPECT_THAT(csds.generic_xds_configs(),
4832               ::testing::UnorderedElementsAre(
4833                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4834                                     "foo1", resource->AsJsonString(), "15",
4835                                     TimestampProtoEq(kTime2)),
4836                   CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4837                                     "foo2", resource2->AsJsonString(), "15",
4838                                     TimestampProtoEq(kTime2))));
4839   // Result (remote): The stream to the fallback server has been orphaned.
4840   EXPECT_TRUE(stream2->IsOrphaned());
4841   // Result (remote): Client sends ACK to server.
4842   request = WaitForRequest(stream.get());
4843   ASSERT_TRUE(request.has_value());
4844   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4845                /*version_info=*/"15", /*response_nonce=*/"B",
4846                /*error_detail=*/absl::OkStatus(),
4847                /*resource_names=*/{"foo1", "foo2"});
4848   // Clean up.
4849   CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
4850   CancelFooWatch(watcher2.get(), "foo2");
4851   // Result (remote): The stream to the primary server has been orphaned.
4852   EXPECT_TRUE(stream->IsOrphaned());
4853 }
4854 
4855 // Test for both servers being unavailable
TEST_F(XdsClientTest,FallbackReportsError)4856 TEST_F(XdsClientTest, FallbackReportsError) {
4857   FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
4858   FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
4859   InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
4860       {primary_server, fallback_server}));
4861   auto watcher = StartFooWatch("foo1");
4862   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4863                                           kDefaultXdsServerUrl, true)));
4864   auto stream = WaitForAdsStream();
4865   ASSERT_TRUE(stream != nullptr);
4866   // XdsClient should have sent a subscription request on the ADS stream.
4867   auto request = WaitForRequest(stream.get());
4868   ASSERT_TRUE(request.has_value());
4869   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4870                /*version_info=*/"", /*response_nonce=*/"",
4871                /*error_detail=*/absl::OkStatus(),
4872                /*resource_names=*/{"foo1"});
4873   EXPECT_THAT(GetResourceCounts(),
4874               ::testing::ElementsAre(::testing::Pair(
4875                   ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4876                                         XdsFooResourceType::Get()->type_url(),
4877                                         "requested"),
4878                   1)));
4879   TriggerConnectionFailure(primary_server,
4880                            absl::UnavailableError("Server down"));
4881   EXPECT_THAT(GetServerConnections(),
4882               ::testing::ElementsAre(
4883                   ::testing::Pair(kDefaultXdsServerUrl, false),
4884                   ::testing::Pair(fallback_server.server_uri(), true)));
4885   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4886       ::testing::_, ::testing::_,
4887       ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
4888   // CSDS should show that the resource has been requested.
4889   ClientConfig csds = DumpCsds();
4890   EXPECT_THAT(csds.generic_xds_configs(),
4891               ::testing::ElementsAre(CsdsResourceRequested(
4892                   XdsFooResourceType::Get()->type_url(), "foo1")));
4893   // Fallback happens now
4894   stream = WaitForAdsStream(fallback_server);
4895   ASSERT_NE(stream, nullptr);
4896   request = WaitForRequest(stream.get());
4897   ASSERT_TRUE(request.has_value());
4898   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4899                /*version_info=*/"", /*response_nonce=*/"",
4900                /*error_detail=*/absl::OkStatus(),
4901                /*resource_names=*/{"foo1"});
4902   TriggerConnectionFailure(fallback_server,
4903                            absl::UnavailableError("Another server down"));
4904   EXPECT_THAT(GetServerConnections(),
4905               ::testing::ElementsAre(
4906                   ::testing::Pair(kDefaultXdsServerUrl, false),
4907                   ::testing::Pair(fallback_server.server_uri(), false)));
4908   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4909       ::testing::_, ::testing::_,
4910       ::testing::ElementsAre(
4911           ::testing::Pair(kDefaultXdsServerUrl, 1),
4912           ::testing::Pair(fallback_server.server_uri(), 1))));
4913   csds = DumpCsds();
4914   EXPECT_THAT(csds.generic_xds_configs(),
4915               ::testing::ElementsAre(CsdsResourceRequested(
4916                   XdsFooResourceType::Get()->type_url(), "foo1")));
4917   auto error = watcher->WaitForNextError();
4918   ASSERT_TRUE(error.has_value());
4919   EXPECT_THAT(error->code(), absl::StatusCode::kUnavailable);
4920   EXPECT_EQ(error->message(),
4921             "xDS channel for server fallback_xds_server: Another server down "
4922             "(node ID:xds_client_test)")
4923       << error->message();
4924 }
4925 
TEST_F(XdsClientTest,FallbackOnStartup)4926 TEST_F(XdsClientTest, FallbackOnStartup) {
4927   FakeXdsBootstrap::FakeXdsServer primary_server;
4928   FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
4929   // Regular operation
4930   InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
4931       {primary_server, fallback_server}));
4932   // Start a watch for "foo1".
4933   auto watcher = StartFooWatch("foo1");
4934   auto primary_stream = WaitForAdsStream(primary_server);
4935   ASSERT_NE(primary_stream, nullptr);
4936   // XdsClient should have sent a subscription request on the ADS stream.
4937   auto request = WaitForRequest(primary_stream.get());
4938   ASSERT_TRUE(request.has_value());
4939   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4940                /*version_info=*/"", /*response_nonce=*/"",
4941                /*error_detail=*/absl::OkStatus(),
4942                /*resource_names=*/{"foo1"});
4943   TriggerConnectionFailure(primary_server,
4944                            absl::UnavailableError("Primary server is down"));
4945   // XdsClient should have created an ADS stream.
4946   auto fallback_stream = WaitForAdsStream(fallback_server);
4947   ASSERT_NE(fallback_stream, nullptr);
4948   // XdsClient should have sent a subscription request on the ADS stream.
4949   request = WaitForRequest(fallback_stream.get());
4950   ASSERT_TRUE(request.has_value());
4951   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4952                /*version_info=*/"", /*response_nonce=*/"",
4953                /*error_detail=*/absl::OkStatus(),
4954                /*resource_names=*/{"foo1"});
4955   // Send a response.
4956   fallback_stream->SendMessageToClient(
4957       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4958           .set_version_info("1")
4959           .set_nonce("A")
4960           .AddFooResource(XdsFooResource("foo1", 6))
4961           .Serialize());
4962   EXPECT_THAT(GetServerConnections(),
4963               ::testing::ElementsAre(
4964                   ::testing::Pair(kDefaultXdsServerUrl, false),
4965                   ::testing::Pair(fallback_server.server_uri(), true)));
4966   EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4967       ::testing::_, ::testing::_,
4968       ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
4969   // XdsClient should have delivered the response to the watcher.
4970   auto resource = watcher->WaitForNextResource();
4971   ASSERT_NE(resource, nullptr);
4972   EXPECT_EQ(resource->name, "foo1");
4973   EXPECT_EQ(resource->value, 6);
4974   // Client sends an ACK.
4975   request = WaitForRequest(fallback_stream.get());
4976   ASSERT_TRUE(request.has_value());
4977   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4978                /*version_info=*/"1", /*response_nonce=*/"A",
4979                /*error_detail=*/absl::OkStatus(),
4980                /*resource_names=*/{"foo1"});
4981   // Recover to primary
4982   primary_stream->SendMessageToClient(
4983       ResponseBuilder(XdsFooResourceType::Get()->type_url())
4984           .set_version_info("5")
4985           .set_nonce("D")
4986           .AddFooResource(XdsFooResource("foo1", 42))
4987           .Serialize());
4988   EXPECT_TRUE(fallback_stream->IsOrphaned());
4989   resource = watcher->WaitForNextResource();
4990   ASSERT_NE(resource, nullptr);
4991   EXPECT_EQ(resource->name, "foo1");
4992   EXPECT_EQ(resource->value, 42);
4993   EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4994                                           kDefaultXdsServerUrl, true)));
4995   request = WaitForRequest(primary_stream.get());
4996   ASSERT_TRUE(request.has_value());
4997   CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4998                /*version_info=*/"5", /*response_nonce=*/"D",
4999                /*error_detail=*/absl::OkStatus(),
5000                /*resource_names=*/{"foo1"});
5001 }
5002 
5003 }  // namespace
5004 }  // namespace testing
5005 }  // namespace grpc_core
5006 
main(int argc,char ** argv)5007 int main(int argc, char** argv) {
5008   ::testing::InitGoogleTest(&argc, argv);
5009   grpc::testing::TestEnvironment env(&argc, argv);
5010   return RUN_ALL_TESTS();
5011 }
5012