1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 // TODO(roth): Add the following tests:
18 // - tests for load-reporting APIs? (or maybe move those out of XdsClient?)
19
20 #include "src/core/xds/xds_client/xds_client.h"
21
22 #include <google/protobuf/any.pb.h>
23 #include <google/protobuf/struct.pb.h>
24 #include <grpc/grpc.h>
25 #include <grpc/support/json.h>
26 #include <grpcpp/impl/codegen/config_protobuf.h>
27 #include <stdint.h>
28
29 #include <algorithm>
30 #include <deque>
31 #include <fstream>
32 #include <map>
33 #include <memory>
34 #include <optional>
35 #include <string>
36 #include <vector>
37
38 #include "absl/strings/str_cat.h"
39 #include "absl/time/time.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/variant.h"
42 #include "envoy/config/core/v3/base.pb.h"
43 #include "envoy/service/discovery/v3/discovery.pb.h"
44 #include "envoy/service/status/v3/csds.pb.h"
45 #include "gmock/gmock.h"
46 #include "gtest/gtest.h"
47 #include "src/core/lib/iomgr/timer_manager.h"
48 #include "src/core/util/debug_location.h"
49 #include "src/core/util/json/json.h"
50 #include "src/core/util/json/json_args.h"
51 #include "src/core/util/json/json_object_loader.h"
52 #include "src/core/util/json/json_reader.h"
53 #include "src/core/util/json/json_writer.h"
54 #include "src/core/util/match.h"
55 #include "src/core/util/sync.h"
56 #include "src/core/xds/xds_client/xds_bootstrap.h"
57 #include "src/core/xds/xds_client/xds_resource_type_impl.h"
58 #include "test/core/event_engine/event_engine_test_utils.h"
59 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
60 #include "test/core/test_util/scoped_env_var.h"
61 #include "test/core/test_util/test_config.h"
62 #include "test/core/xds/xds_client_test_peer.h"
63 #include "test/core/xds/xds_transport_fake.h"
64 #include "upb/reflection/def.h"
65
66 // IWYU pragma: no_include <google/protobuf/message.h>
67 // IWYU pragma: no_include <google/protobuf/stubs/status.h>
68 // IWYU pragma: no_include <google/protobuf/unknown_field_set.h>
69 // IWYU pragma: no_include <google/protobuf/util/json_util.h>
70 // IWYU pragma: no_include "google/protobuf/json/json.h"
71 // IWYU pragma: no_include "google/protobuf/util/json_util.h"
72
73 using envoy::admin::v3::ClientResourceStatus;
74 using envoy::service::discovery::v3::DiscoveryRequest;
75 using envoy::service::discovery::v3::DiscoveryResponse;
76 using envoy::service::status::v3::ClientConfig;
77 using grpc_event_engine::experimental::FuzzingEventEngine;
78
79 namespace grpc_core {
80 namespace testing {
81 namespace {
82
83 constexpr absl::string_view kDefaultXdsServerUrl = "default_xds_server";
84
85 constexpr Timestamp kTime0 =
86 Timestamp::FromMillisecondsAfterProcessEpoch(10000);
87 constexpr Timestamp kTime1 =
88 Timestamp::FromMillisecondsAfterProcessEpoch(15000);
89 constexpr Timestamp kTime2 =
90 Timestamp::FromMillisecondsAfterProcessEpoch(20000);
91
92 class XdsClientTest : public ::testing::Test {
93 protected:
94 // A fake bootstrap implementation that allows tests to populate the
95 // fields however they want.
96 class FakeXdsBootstrap : public XdsBootstrap {
97 public:
98 class FakeNode : public Node {
99 public:
FakeNode()100 FakeNode() : id_("xds_client_test") {}
id() const101 const std::string& id() const override { return id_; }
cluster() const102 const std::string& cluster() const override { return cluster_; }
locality_region() const103 const std::string& locality_region() const override {
104 return locality_region_;
105 }
locality_zone() const106 const std::string& locality_zone() const override {
107 return locality_zone_;
108 }
locality_sub_zone() const109 const std::string& locality_sub_zone() const override {
110 return locality_sub_zone_;
111 }
metadata() const112 const Json::Object& metadata() const override { return metadata_; }
113
set_id(std::string id)114 void set_id(std::string id) { id_ = std::move(id); }
set_cluster(std::string cluster)115 void set_cluster(std::string cluster) { cluster_ = std::move(cluster); }
set_locality_region(std::string locality_region)116 void set_locality_region(std::string locality_region) {
117 locality_region_ = std::move(locality_region);
118 }
set_locality_zone(std::string locality_zone)119 void set_locality_zone(std::string locality_zone) {
120 locality_zone_ = std::move(locality_zone);
121 }
set_locality_sub_zone(std::string locality_sub_zone)122 void set_locality_sub_zone(std::string locality_sub_zone) {
123 locality_sub_zone_ = std::move(locality_sub_zone);
124 }
set_metadata(Json::Object metadata)125 void set_metadata(Json::Object metadata) {
126 metadata_ = std::move(metadata);
127 }
128
129 private:
130 std::string id_;
131 std::string cluster_;
132 std::string locality_region_;
133 std::string locality_zone_;
134 std::string locality_sub_zone_;
135 Json::Object metadata_;
136 };
137
138 class FakeXdsServer : public XdsServer {
139 public:
FakeXdsServer(absl::string_view server_uri=kDefaultXdsServerUrl,bool ignore_resource_deletion=false)140 explicit FakeXdsServer(
141 absl::string_view server_uri = kDefaultXdsServerUrl,
142 bool ignore_resource_deletion = false)
143 : server_uri_(server_uri),
144 ignore_resource_deletion_(ignore_resource_deletion) {}
server_uri() const145 const std::string& server_uri() const override { return server_uri_; }
IgnoreResourceDeletion() const146 bool IgnoreResourceDeletion() const override {
147 return ignore_resource_deletion_;
148 }
Equals(const XdsServer & other) const149 bool Equals(const XdsServer& other) const override {
150 const auto& o = static_cast<const FakeXdsServer&>(other);
151 return server_uri_ == o.server_uri_ &&
152 ignore_resource_deletion_ == o.ignore_resource_deletion_;
153 }
Key() const154 std::string Key() const override {
155 return absl::StrCat(server_uri_, "#", ignore_resource_deletion_);
156 }
157
158 private:
159 std::string server_uri_;
160 bool ignore_resource_deletion_ = false;
161 };
162
163 class FakeAuthority : public Authority {
164 public:
servers() const165 std::vector<const XdsServer*> servers() const override {
166 if (server_.has_value()) {
167 return {&*server_};
168 } else {
169 return {};
170 };
171 }
172
set_server(absl::optional<FakeXdsServer> server)173 void set_server(absl::optional<FakeXdsServer> server) {
174 server_ = std::move(server);
175 }
176
177 private:
178 absl::optional<FakeXdsServer> server_;
179 };
180
181 class Builder {
182 public:
Builder()183 Builder() { node_.emplace(); }
set_node_id(std::string id)184 Builder& set_node_id(std::string id) {
185 if (!node_.has_value()) node_.emplace();
186 node_->set_id(std::move(id));
187 return *this;
188 }
AddAuthority(std::string name,FakeAuthority authority)189 Builder& AddAuthority(std::string name, FakeAuthority authority) {
190 authorities_[std::move(name)] = std::move(authority);
191 return *this;
192 }
SetServers(absl::Span<const FakeXdsServer> servers)193 Builder& SetServers(absl::Span<const FakeXdsServer> servers) {
194 servers_.assign(servers.begin(), servers.end());
195 return *this;
196 }
Build()197 std::unique_ptr<XdsBootstrap> Build() {
198 auto bootstrap = std::make_unique<FakeXdsBootstrap>();
199 bootstrap->servers_ = std::move(servers_);
200 bootstrap->node_ = std::move(node_);
201 bootstrap->authorities_ = std::move(authorities_);
202 return bootstrap;
203 }
204
205 private:
206 std::vector<FakeXdsServer> servers_ = {FakeXdsServer()};
207 absl::optional<FakeNode> node_;
208 std::map<std::string, FakeAuthority> authorities_;
209 };
210
ToString() const211 std::string ToString() const override { return "<fake>"; }
212
servers() const213 std::vector<const XdsServer*> servers() const override {
214 std::vector<const XdsServer*> result;
215 result.reserve(servers_.size());
216 for (size_t i = 0; i < servers_.size(); ++i) {
217 result.emplace_back(&servers_[i]);
218 }
219 return result;
220 }
221
node() const222 const Node* node() const override {
223 return node_.has_value() ? &*node_ : nullptr;
224 }
LookupAuthority(const std::string & name) const225 const Authority* LookupAuthority(const std::string& name) const override {
226 auto it = authorities_.find(name);
227 if (it == authorities_.end()) return nullptr;
228 return &it->second;
229 }
230
231 private:
232 std::vector<FakeXdsServer> servers_;
233 absl::optional<FakeNode> node_;
234 std::map<std::string, FakeAuthority> authorities_;
235 };
236
237 // A template for a test xDS resource type with an associated watcher impl.
238 // For simplicity, we use JSON instead of proto for serialization.
239 //
240 // The specified ResourceStruct must provide the following:
241 // - a static JsonLoader() method, as described in json_object_loader.h
242 // - an AsJsonString() method that returns the object in JSON string form
243 // - a static TypeUrl() method that returns the resource type
244 //
245 // The all_resources_required_in_sotw parameter indicates the value
246 // that should be returned by the AllResourcesRequiredInSotW() method.
247 template <typename ResourceStruct, bool all_resources_required_in_sotw>
248 class XdsTestResourceType
249 : public XdsResourceTypeImpl<
250 XdsTestResourceType<ResourceStruct, all_resources_required_in_sotw>,
251 ResourceStruct> {
252 public:
253 // A watcher implementation that queues delivered watches.
254 class Watcher : public XdsResourceTypeImpl<
255 XdsTestResourceType<ResourceStruct,
256 all_resources_required_in_sotw>,
257 ResourceStruct>::WatcherInterface {
258 public:
Watcher(std::shared_ptr<FuzzingEventEngine> event_engine)259 explicit Watcher(std::shared_ptr<FuzzingEventEngine> event_engine)
260 : event_engine_(std::move(event_engine)) {}
261
~Watcher()262 ~Watcher() override {
263 MutexLock lock(&mu_);
264 EXPECT_THAT(queue_, ::testing::IsEmpty())
265 << this << " " << queue_[0].ToString();
266 }
267
HasEvent()268 bool HasEvent() {
269 MutexLock lock(&mu_);
270 return !queue_.empty();
271 }
272
273 // Returns true if no event is received after draining the fuzzing
274 // EE queue.
ExpectNoEvent()275 bool ExpectNoEvent() {
276 event_engine_->TickUntilIdle();
277 return !HasEvent();
278 }
279
280 struct ResourceAndReadDelayHandle {
281 std::shared_ptr<const ResourceStruct> resource;
282 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle;
283 };
WaitForNextResourceAndHandle(SourceLocation location=SourceLocation ())284 absl::optional<ResourceAndReadDelayHandle> WaitForNextResourceAndHandle(
285 SourceLocation location = SourceLocation()) {
286 auto event = WaitForNextOnResourceChangedEvent(location);
287 if (!event.has_value()) return absl::nullopt;
288 EXPECT_TRUE(event->resource.ok())
289 << "got unexpected error: " << event->resource.status() << " at "
290 << location.file() << ":" << location.line();
291 if (!event->resource.ok()) return absl::nullopt;
292 return ResourceAndReadDelayHandle{std::move(*event->resource),
293 std::move(event->read_delay_handle)};
294 }
295
WaitForNextResource(SourceLocation location=SourceLocation ())296 std::shared_ptr<const ResourceStruct> WaitForNextResource(
297 SourceLocation location = SourceLocation()) {
298 auto resource_and_handle = WaitForNextResourceAndHandle(location);
299 if (!resource_and_handle.has_value()) return nullptr;
300 return std::move(resource_and_handle->resource);
301 }
302
WaitForNextError(SourceLocation location=SourceLocation ())303 absl::optional<absl::Status> WaitForNextError(
304 SourceLocation location = SourceLocation()) {
305 auto event = WaitForNextOnResourceChangedEvent(location);
306 if (!event.has_value()) return absl::nullopt;
307 EXPECT_FALSE(event->resource.ok())
308 << "got unexpected resource: " << (*event->resource)->name << " at "
309 << location.file() << ":" << location.line();
310 if (event->resource.ok()) return absl::nullopt;
311 return event->resource.status();
312 }
313
WaitForDoesNotExist(SourceLocation location=SourceLocation ())314 bool WaitForDoesNotExist(SourceLocation location = SourceLocation()) {
315 auto status = WaitForNextError(location);
316 if (!status.has_value()) return false;
317 EXPECT_EQ(status->code(), absl::StatusCode::kNotFound)
318 << "unexpected status: " << *status << " at " << location.file()
319 << ":" << location.line();
320 return status->code() == absl::StatusCode::kNotFound;
321 }
322
WaitForNextAmbientError(SourceLocation location=SourceLocation ())323 absl::optional<absl::Status> WaitForNextAmbientError(
324 SourceLocation location = SourceLocation()) {
325 auto event = WaitForNextEvent();
326 if (!event.has_value()) return absl::nullopt;
327 return Match(
328 event->payload,
329 [&](const absl::StatusOr<std::shared_ptr<const ResourceStruct>>&
330 resource) -> absl::optional<absl::Status> {
331 EXPECT_TRUE(false)
332 << "got unexpected resource: " << event->ToString() << " at "
333 << location.file() << ":" << location.line();
334 return absl::nullopt;
335 },
336 [&](const absl::Status& status) -> absl::optional<absl::Status> {
337 return status;
338 });
339 }
340
341 private:
342 // An event delivered to the watcher.
343 struct Event {
344 absl::variant<
345 // OnResourceChanged()
346 absl::StatusOr<std::shared_ptr<const ResourceStruct>>,
347 // OnAmbientError()
348 absl::Status>
349 payload;
350 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle;
351
ToStringgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsTestResourceType::Watcher::Event352 std::string ToString() const {
353 return Match(
354 payload,
355 [&](const absl::StatusOr<std::shared_ptr<const ResourceStruct>>&
356 resource) {
357 return absl::StrCat(
358 "{resource=",
359 resource.ok() ? (*resource)->name
360 : resource.status().ToString(),
361 ", read_delay_handle=", (read_delay_handle == nullptr),
362 "}");
363 },
364 [&](const absl::Status& status) {
365 return absl::StrCat("{ambient_error=", status.ToString(),
366 ", read_delay_handle=",
367 (read_delay_handle == nullptr), "}");
368 });
369 }
370 };
371
WaitForNextEvent()372 absl::optional<Event> WaitForNextEvent() {
373 while (true) {
374 {
375 MutexLock lock(&mu_);
376 if (!queue_.empty()) {
377 Event event = std::move(queue_.front());
378 queue_.pop_front();
379 return event;
380 }
381 if (event_engine_->IsIdle()) return absl::nullopt;
382 }
383 event_engine_->Tick();
384 }
385 }
386
387 struct OnResourceChangedEvent {
388 absl::StatusOr<std::shared_ptr<const ResourceStruct>> resource;
389 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle;
390 };
WaitForNextOnResourceChangedEvent(SourceLocation location=SourceLocation ())391 absl::optional<OnResourceChangedEvent> WaitForNextOnResourceChangedEvent(
392 SourceLocation location = SourceLocation()) {
393 auto event = WaitForNextEvent();
394 if (!event.has_value()) return absl::nullopt;
395 return MatchMutable(
396 &event->payload,
397 [&](absl::StatusOr<std::shared_ptr<const ResourceStruct>>* resource)
398 -> absl::optional<OnResourceChangedEvent> {
399 return OnResourceChangedEvent{
400 std::move(*resource), std::move(event->read_delay_handle)};
401 },
402 [&](absl::Status* status)
403 -> absl::optional<OnResourceChangedEvent> {
404 EXPECT_TRUE(false)
405 << "got unexpected ambient error: " << status->ToString()
406 << " at " << location.file() << ":" << location.line();
407 return absl::nullopt;
408 });
409 }
410
OnResourceChanged(absl::StatusOr<std::shared_ptr<const ResourceStruct>> resource,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)411 void OnResourceChanged(
412 absl::StatusOr<std::shared_ptr<const ResourceStruct>> resource,
413 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)
414 override {
415 MutexLock lock(&mu_);
416 queue_.emplace_back(
417 Event{std::move(resource), std::move(read_delay_handle)});
418 }
419
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)420 void OnAmbientError(absl::Status status,
421 RefCountedPtr<XdsClient::ReadDelayHandle>
422 read_delay_handle) override {
423 MutexLock lock(&mu_);
424 queue_.push_back(
425 Event{std::move(status), std::move(read_delay_handle)});
426 }
427
428 std::shared_ptr<FuzzingEventEngine> event_engine_;
429
430 Mutex mu_;
431 std::deque<Event> queue_ ABSL_GUARDED_BY(&mu_);
432 };
433
type_url() const434 absl::string_view type_url() const override {
435 return ResourceStruct::TypeUrl();
436 }
Decode(const XdsResourceType::DecodeContext &,absl::string_view serialized_resource) const437 XdsResourceType::DecodeResult Decode(
438 const XdsResourceType::DecodeContext& /*context*/,
439 absl::string_view serialized_resource) const override {
440 auto json = JsonParse(serialized_resource);
441 XdsResourceType::DecodeResult result;
442 if (!json.ok()) {
443 result.resource = json.status();
444 } else {
445 absl::StatusOr<ResourceStruct> foo =
446 LoadFromJson<ResourceStruct>(*json);
447 if (!foo.ok()) {
448 auto it = json->object().find("name");
449 if (it != json->object().end()) {
450 result.name = it->second.string();
451 }
452 result.resource = foo.status();
453 } else {
454 result.name = foo->name;
455 result.resource = std::make_unique<ResourceStruct>(std::move(*foo));
456 }
457 }
458 return result;
459 }
AllResourcesRequiredInSotW() const460 bool AllResourcesRequiredInSotW() const override {
461 return all_resources_required_in_sotw;
462 }
InitUpbSymtab(XdsClient *,upb_DefPool *) const463 void InitUpbSymtab(XdsClient*, upb_DefPool* /*symtab*/) const override {}
464
EncodeAsAny(const ResourceStruct & resource)465 static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) {
466 google::protobuf::Any any;
467 any.set_type_url(
468 absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl()));
469 any.set_value(resource.AsJsonString());
470 return any;
471 }
472 };
473
474 // A fake "Foo" xDS resource type.
475 struct XdsFooResource : public XdsResourceType::ResourceData {
476 std::string name;
477 uint32_t value;
478
479 XdsFooResource() = default;
XdsFooResourcegrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource480 XdsFooResource(std::string name, uint32_t value)
481 : name(std::move(name)), value(value) {}
482
operator ==grpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource483 bool operator==(const XdsFooResource& other) const {
484 return name == other.name && value == other.value;
485 }
486
AsJsonStringgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource487 std::string AsJsonString() const {
488 return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}");
489 }
490
JsonLoadergrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource491 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
492 static const auto* loader = JsonObjectLoader<XdsFooResource>()
493 .Field("name", &XdsFooResource::name)
494 .Field("value", &XdsFooResource::value)
495 .Finish();
496 return loader;
497 }
498
TypeUrlgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsFooResource499 static absl::string_view TypeUrl() { return "test.v3.foo"; }
500 };
501 using XdsFooResourceType = XdsTestResourceType<XdsFooResource, false>;
502
503 // A fake "Bar" xDS resource type.
504 struct XdsBarResource : public XdsResourceType::ResourceData {
505 std::string name;
506 std::string value;
507
508 XdsBarResource() = default;
XdsBarResourcegrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource509 XdsBarResource(std::string name, std::string value)
510 : name(std::move(name)), value(std::move(value)) {}
511
operator ==grpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource512 bool operator==(const XdsBarResource& other) const {
513 return name == other.name && value == other.value;
514 }
515
AsJsonStringgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource516 std::string AsJsonString() const {
517 return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value,
518 "\"}");
519 }
520
JsonLoadergrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource521 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
522 static const auto* loader = JsonObjectLoader<XdsBarResource>()
523 .Field("name", &XdsBarResource::name)
524 .Field("value", &XdsBarResource::value)
525 .Finish();
526 return loader;
527 }
528
TypeUrlgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsBarResource529 static absl::string_view TypeUrl() { return "test.v3.bar"; }
530 };
531 using XdsBarResourceType = XdsTestResourceType<XdsBarResource, false>;
532
533 // A fake "WildcardCapable" xDS resource type.
// This resource type returns true for AllResourcesRequiredInSotW(),
535 // just like LDS and CDS.
536 struct XdsWildcardCapableResource : public XdsResourceType::ResourceData {
537 std::string name;
538 uint32_t value;
539
540 XdsWildcardCapableResource() = default;
XdsWildcardCapableResourcegrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource541 XdsWildcardCapableResource(std::string name, uint32_t value)
542 : name(std::move(name)), value(value) {}
543
operator ==grpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource544 bool operator==(const XdsWildcardCapableResource& other) const {
545 return name == other.name && value == other.value;
546 }
547
AsJsonStringgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource548 std::string AsJsonString() const {
549 return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value,
550 "\"}");
551 }
552
JsonLoadergrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource553 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
554 static const auto* loader =
555 JsonObjectLoader<XdsWildcardCapableResource>()
556 .Field("name", &XdsWildcardCapableResource::name)
557 .Field("value", &XdsWildcardCapableResource::value)
558 .Finish();
559 return loader;
560 }
561
TypeUrlgrpc_core::testing::__anonc009e8040111::XdsClientTest::XdsWildcardCapableResource562 static absl::string_view TypeUrl() { return "test.v3.wildcard_capable"; }
563 };
564 using XdsWildcardCapableResourceType =
565 XdsTestResourceType<XdsWildcardCapableResource,
566 /*all_resources_required_in_sotw=*/true>;
567
568 // A helper class to build and serialize a DiscoveryResponse.
569 class ResponseBuilder {
570 public:
ResponseBuilder(absl::string_view type_url)571 explicit ResponseBuilder(absl::string_view type_url) {
572 response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url));
573 }
574
set_version_info(absl::string_view version_info)575 ResponseBuilder& set_version_info(absl::string_view version_info) {
576 response_.set_version_info(std::string(version_info));
577 return *this;
578 }
set_nonce(absl::string_view nonce)579 ResponseBuilder& set_nonce(absl::string_view nonce) {
580 response_.set_nonce(std::string(nonce));
581 return *this;
582 }
583
584 template <typename ResourceType>
AddResource(const typename ResourceType::ResourceType & resource,bool in_resource_wrapper=false)585 ResponseBuilder& AddResource(
586 const typename ResourceType::ResourceType& resource,
587 bool in_resource_wrapper = false) {
588 auto* res = response_.add_resources();
589 *res = ResourceType::EncodeAsAny(resource);
590 if (in_resource_wrapper) {
591 envoy::service::discovery::v3::Resource resource_wrapper;
592 resource_wrapper.set_name(resource.name);
593 *resource_wrapper.mutable_resource() = std::move(*res);
594 res->PackFrom(resource_wrapper);
595 }
596 return *this;
597 }
598
AddFooResource(const XdsFooResource & resource,bool in_resource_wrapper=false)599 ResponseBuilder& AddFooResource(const XdsFooResource& resource,
600 bool in_resource_wrapper = false) {
601 return AddResource<XdsFooResourceType>(resource, in_resource_wrapper);
602 }
603
AddBarResource(const XdsBarResource & resource,bool in_resource_wrapper=false)604 ResponseBuilder& AddBarResource(const XdsBarResource& resource,
605 bool in_resource_wrapper = false) {
606 return AddResource<XdsBarResourceType>(resource, in_resource_wrapper);
607 }
608
AddWildcardCapableResource(const XdsWildcardCapableResource & resource,bool in_resource_wrapper=false)609 ResponseBuilder& AddWildcardCapableResource(
610 const XdsWildcardCapableResource& resource,
611 bool in_resource_wrapper = false) {
612 return AddResource<XdsWildcardCapableResourceType>(resource,
613 in_resource_wrapper);
614 }
615
AddInvalidResource(absl::string_view type_url,absl::string_view value,absl::string_view resource_wrapper_name="")616 ResponseBuilder& AddInvalidResource(
617 absl::string_view type_url, absl::string_view value,
618 absl::string_view resource_wrapper_name = "") {
619 auto* res = response_.add_resources();
620 res->set_type_url(absl::StrCat("type.googleapis.com/", type_url));
621 res->set_value(std::string(value));
622 if (!resource_wrapper_name.empty()) {
623 envoy::service::discovery::v3::Resource resource_wrapper;
624 resource_wrapper.set_name(std::string(resource_wrapper_name));
625 *resource_wrapper.mutable_resource() = std::move(*res);
626 res->PackFrom(resource_wrapper);
627 }
628 return *this;
629 }
630
AddInvalidResourceWrapper()631 ResponseBuilder& AddInvalidResourceWrapper() {
632 auto* res = response_.add_resources();
633 res->set_type_url(
634 "type.googleapis.com/envoy.service.discovery.v3.Resource");
635 res->set_value(std::string("\0", 1));
636 return *this;
637 }
638
AddEmptyResource()639 ResponseBuilder& AddEmptyResource() {
640 response_.add_resources();
641 return *this;
642 }
643
Serialize()644 std::string Serialize() {
645 std::string serialized_response;
646 EXPECT_TRUE(response_.SerializeToString(&serialized_response));
647 return serialized_response;
648 }
649
650 private:
651 DiscoveryResponse response_;
652 };
653
654 class MetricsReporter : public XdsMetricsReporter {
655 public:
656 using ResourceUpdateMap = std::map<
657 std::pair<std::string /*xds_server*/, std::string /*resource_type*/>,
658 uint64_t>;
659 using ServerFailureMap = std::map<std::string /*xds_server*/, uint64_t>;
660
MetricsReporter(std::shared_ptr<FuzzingEventEngine> event_engine)661 explicit MetricsReporter(std::shared_ptr<FuzzingEventEngine> event_engine)
662 : event_engine_(std::move(event_engine)) {}
663
resource_updates_valid() const664 ResourceUpdateMap resource_updates_valid() const {
665 MutexLock lock(&mu_);
666 return resource_updates_valid_;
667 }
resource_updates_invalid() const668 ResourceUpdateMap resource_updates_invalid() const {
669 MutexLock lock(&mu_);
670 return resource_updates_invalid_;
671 }
server_failures() const672 const ServerFailureMap& server_failures() const { return server_failures_; }
673
674 // Returns true if matchers return true before the timeout.
675 // Runs matchers once as soon as it is called and then again
676 // every time the metrics reporter sees an update.
WaitForMetricsReporterData(::testing::Matcher<ResourceUpdateMap> resource_updates_valid_matcher,::testing::Matcher<ResourceUpdateMap> resource_updates_invalid_matcher,::testing::Matcher<ServerFailureMap> server_failures_matcher,SourceLocation location=SourceLocation ())677 bool WaitForMetricsReporterData(
678 ::testing::Matcher<ResourceUpdateMap> resource_updates_valid_matcher,
679 ::testing::Matcher<ResourceUpdateMap> resource_updates_invalid_matcher,
680 ::testing::Matcher<ServerFailureMap> server_failures_matcher,
681 SourceLocation location = SourceLocation()) {
682 while (true) {
683 {
684 MutexLock lock(&mu_);
685 if (::testing::Matches(resource_updates_valid_matcher)(
686 resource_updates_valid_) &&
687 ::testing::Matches(resource_updates_invalid_matcher)(
688 resource_updates_invalid_) &&
689 ::testing::Matches(server_failures_matcher)(server_failures_)) {
690 return true;
691 }
692 if (event_engine_->IsIdle()) {
693 EXPECT_THAT(resource_updates_valid_, resource_updates_valid_matcher)
694 << location.file() << ":" << location.line();
695 EXPECT_THAT(resource_updates_invalid_,
696 resource_updates_invalid_matcher)
697 << location.file() << ":" << location.line();
698 EXPECT_THAT(server_failures_, server_failures_matcher)
699 << location.file() << ":" << location.line();
700 return false;
701 }
702 }
703 event_engine_->Tick();
704 }
705 }
706
707 private:
ReportResourceUpdates(absl::string_view xds_server,absl::string_view resource_type,uint64_t num_resources_valid,uint64_t num_resources_invalid)708 void ReportResourceUpdates(absl::string_view xds_server,
709 absl::string_view resource_type,
710 uint64_t num_resources_valid,
711 uint64_t num_resources_invalid) override {
712 MutexLock lock(&mu_);
713 auto key =
714 std::make_pair(std::string(xds_server), std::string(resource_type));
715 if (num_resources_valid > 0) {
716 resource_updates_valid_[key] += num_resources_valid;
717 }
718 if (num_resources_invalid > 0) {
719 resource_updates_invalid_[key] += num_resources_invalid;
720 }
721 cond_.SignalAll();
722 }
723
ReportServerFailure(absl::string_view xds_server)724 void ReportServerFailure(absl::string_view xds_server) override {
725 MutexLock lock(&mu_);
726 ++server_failures_[std::string(xds_server)];
727 cond_.SignalAll();
728 }
729
730 std::shared_ptr<FuzzingEventEngine> event_engine_;
731
732 mutable Mutex mu_;
733 ResourceUpdateMap resource_updates_valid_ ABSL_GUARDED_BY(mu_);
734 ResourceUpdateMap resource_updates_invalid_ ABSL_GUARDED_BY(mu_);
735 ServerFailureMap server_failures_ ABSL_GUARDED_BY(mu_);
736 CondVar cond_;
737 };
738
739 using ResourceCounts =
740 std::vector<std::pair<XdsClientTestPeer::ResourceCountLabels, uint64_t>>;
GetResourceCounts()741 ResourceCounts GetResourceCounts() {
742 ResourceCounts resource_counts;
743 XdsClientTestPeer(xds_client_.get())
744 .TestReportResourceCounts(
745 [&](const XdsClientTestPeer::ResourceCountLabels& labels,
746 uint64_t count) {
747 resource_counts.emplace_back(labels, count);
748 });
749 return resource_counts;
750 }
751
752 using ServerConnectionMap = std::map<std::string, bool>;
GetServerConnections()753 ServerConnectionMap GetServerConnections() {
754 ServerConnectionMap server_connection_map;
755 XdsClientTestPeer(xds_client_.get())
756 .TestReportServerConnections(
757 [&](absl::string_view xds_server, bool connected) {
758 std::string server(xds_server);
759 EXPECT_EQ(server_connection_map.find(server),
760 server_connection_map.end());
761 server_connection_map[std::move(server)] = connected;
762 });
763 return server_connection_map;
764 }
765
SetUp()766 void SetUp() override {
767 time_cache_.TestOnlySetNow(kTime0);
768 event_engine_ = std::make_shared<FuzzingEventEngine>(
769 FuzzingEventEngine::Options(), fuzzing_event_engine::Actions());
770 grpc_timer_manager_set_start_threaded(false);
771 grpc_init();
772 }
773
TearDown()774 void TearDown() override {
775 transport_factory_.reset();
776 xds_client_.reset();
777 event_engine_->FuzzingDone();
778 event_engine_->TickUntilIdle();
779 event_engine_->UnsetGlobalHooks();
780 grpc_event_engine::experimental::WaitForSingleOwner(
781 std::move(event_engine_));
782 grpc_shutdown_blocking();
783 }
784
785 // Sets transport_factory_ and initializes xds_client_ with the
786 // specified bootstrap config.
InitXdsClient(FakeXdsBootstrap::Builder bootstrap_builder=FakeXdsBootstrap::Builder ())787 void InitXdsClient(FakeXdsBootstrap::Builder bootstrap_builder =
788 FakeXdsBootstrap::Builder()) {
789 transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
790 []() { FAIL() << "Multiple concurrent reads"; }, event_engine_);
791 auto metrics_reporter = std::make_unique<MetricsReporter>(event_engine_);
792 metrics_reporter_ = metrics_reporter.get();
793 xds_client_ = MakeRefCounted<XdsClient>(
794 bootstrap_builder.Build(), transport_factory_, event_engine_,
795 std::move(metrics_reporter), "foo agent", "foo version");
796 }
797
798 // Starts and cancels a watch for a Foo resource.
StartFooWatch(absl::string_view resource_name)799 RefCountedPtr<XdsFooResourceType::Watcher> StartFooWatch(
800 absl::string_view resource_name) {
801 auto watcher = MakeRefCounted<XdsFooResourceType::Watcher>(event_engine_);
802 XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
803 return watcher;
804 }
CancelFooWatch(XdsFooResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)805 void CancelFooWatch(XdsFooResourceType::Watcher* watcher,
806 absl::string_view resource_name,
807 bool delay_unsubscription = false) {
808 XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher,
809 delay_unsubscription);
810 }
811
812 // Starts and cancels a watch for a Bar resource.
StartBarWatch(absl::string_view resource_name)813 RefCountedPtr<XdsBarResourceType::Watcher> StartBarWatch(
814 absl::string_view resource_name) {
815 auto watcher = MakeRefCounted<XdsBarResourceType::Watcher>(event_engine_);
816 XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher);
817 return watcher;
818 }
CancelBarWatch(XdsBarResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)819 void CancelBarWatch(XdsBarResourceType::Watcher* watcher,
820 absl::string_view resource_name,
821 bool delay_unsubscription = false) {
822 XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher,
823 delay_unsubscription);
824 }
825
826 // Starts and cancels a watch for a WildcardCapable resource.
827 RefCountedPtr<XdsWildcardCapableResourceType::Watcher>
StartWildcardCapableWatch(absl::string_view resource_name)828 StartWildcardCapableWatch(absl::string_view resource_name) {
829 auto watcher =
830 MakeRefCounted<XdsWildcardCapableResourceType::Watcher>(event_engine_);
831 XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name,
832 watcher);
833 return watcher;
834 }
CancelWildcardCapableWatch(XdsWildcardCapableResourceType::Watcher * watcher,absl::string_view resource_name,bool delay_unsubscription=false)835 void CancelWildcardCapableWatch(
836 XdsWildcardCapableResourceType::Watcher* watcher,
837 absl::string_view resource_name, bool delay_unsubscription = false) {
838 XdsWildcardCapableResourceType::CancelWatch(
839 xds_client_.get(), resource_name, watcher, delay_unsubscription);
840 }
841
WaitForAdsStream(const XdsBootstrap::XdsServer & xds_server)842 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream(
843 const XdsBootstrap::XdsServer& xds_server) {
844 return transport_factory_->WaitForStream(
845 xds_server, FakeXdsTransportFactory::kAdsMethod);
846 }
847
WaitForAdsStream()848 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> WaitForAdsStream() {
849 return WaitForAdsStream(*xds_client_->bootstrap().servers().front());
850 }
851
TriggerConnectionFailure(const XdsBootstrap::XdsServer & xds_server,absl::Status status)852 void TriggerConnectionFailure(const XdsBootstrap::XdsServer& xds_server,
853 absl::Status status) {
854 transport_factory_->TriggerConnectionFailure(xds_server, std::move(status));
855 }
856
857 // Gets the latest request sent to the fake xDS server.
WaitForRequest(FakeXdsTransportFactory::FakeStreamingCall * stream,SourceLocation location=SourceLocation ())858 absl::optional<DiscoveryRequest> WaitForRequest(
859 FakeXdsTransportFactory::FakeStreamingCall* stream,
860 SourceLocation location = SourceLocation()) {
861 auto message = stream->WaitForMessageFromClient();
862 if (!message.has_value()) return absl::nullopt;
863 DiscoveryRequest request;
864 bool success = request.ParseFromString(*message);
865 EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at "
866 << location.file() << ":" << location.line();
867 if (!success) return absl::nullopt;
868 return std::move(request);
869 }
870
871 // Helper function to check the fields of a DiscoveryRequest.
CheckRequest(const DiscoveryRequest & request,absl::string_view type_url,absl::string_view version_info,absl::string_view response_nonce,const absl::Status & error_detail,const std::set<absl::string_view> & resource_names,SourceLocation location=SourceLocation ())872 void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url,
873 absl::string_view version_info,
874 absl::string_view response_nonce,
875 const absl::Status& error_detail,
876 const std::set<absl::string_view>& resource_names,
877 SourceLocation location = SourceLocation()) {
878 EXPECT_EQ(request.type_url(),
879 absl::StrCat("type.googleapis.com/", type_url))
880 << location.file() << ":" << location.line();
881 EXPECT_EQ(request.version_info(), version_info)
882 << location.file() << ":" << location.line();
883 EXPECT_EQ(request.response_nonce(), response_nonce)
884 << location.file() << ":" << location.line();
885 if (error_detail.ok()) {
886 EXPECT_FALSE(request.has_error_detail())
887 << location.file() << ":" << location.line();
888 } else {
889 EXPECT_EQ(request.error_detail().code(),
890 static_cast<int>(error_detail.code()))
891 << location.file() << ":" << location.line();
892 EXPECT_EQ(request.error_detail().message(), error_detail.message())
893 << location.file() << ":" << location.line();
894 }
895 EXPECT_THAT(request.resource_names(),
896 ::testing::UnorderedElementsAreArray(resource_names))
897 << location.file() << ":" << location.line();
898 }
899
900 // Helper function to check the contents of the node message in a
901 // request against the client's node info.
CheckRequestNode(const DiscoveryRequest & request,SourceLocation location=SourceLocation ())902 void CheckRequestNode(const DiscoveryRequest& request,
903 SourceLocation location = SourceLocation()) {
904 return CheckNode(request.node(), location);
905 }
906
907 // Helper function to check the contents of a node message against the
908 // client's node info.
CheckNode(const envoy::config::core::v3::Node & node,SourceLocation location=SourceLocation ())909 void CheckNode(const envoy::config::core::v3::Node& node,
910 SourceLocation location = SourceLocation()) {
911 // These fields come from the bootstrap config.
912 EXPECT_EQ(node.id(), xds_client_->bootstrap().node()->id())
913 << location.file() << ":" << location.line();
914 EXPECT_EQ(node.cluster(), xds_client_->bootstrap().node()->cluster())
915 << location.file() << ":" << location.line();
916 EXPECT_EQ(node.locality().region(),
917 xds_client_->bootstrap().node()->locality_region())
918 << location.file() << ":" << location.line();
919 EXPECT_EQ(node.locality().zone(),
920 xds_client_->bootstrap().node()->locality_zone())
921 << location.file() << ":" << location.line();
922 EXPECT_EQ(node.locality().sub_zone(),
923 xds_client_->bootstrap().node()->locality_sub_zone())
924 << location.file() << ":" << location.line();
925 if (xds_client_->bootstrap().node()->metadata().empty()) {
926 EXPECT_FALSE(node.has_metadata())
927 << location.file() << ":" << location.line();
928 } else {
929 std::string metadata_json_str;
930 auto status =
931 MessageToJsonString(node.metadata(), &metadata_json_str,
932 GRPC_CUSTOM_JSONUTIL::JsonPrintOptions());
933 ASSERT_TRUE(status.ok())
934 << status << " on " << location.file() << ":" << location.line();
935 auto metadata_json = JsonParse(metadata_json_str);
936 ASSERT_TRUE(metadata_json.ok())
937 << metadata_json.status() << " on " << location.file() << ":"
938 << location.line();
939 Json expected =
940 Json::FromObject(xds_client_->bootstrap().node()->metadata());
941 EXPECT_EQ(*metadata_json, expected)
942 << location.file() << ":" << location.line()
943 << ":\nexpected: " << JsonDump(expected)
944 << "\nactual: " << JsonDump(*metadata_json);
945 }
946 EXPECT_EQ(node.user_agent_name(), "foo agent")
947 << location.file() << ":" << location.line();
948 EXPECT_EQ(node.user_agent_version(), "foo version")
949 << location.file() << ":" << location.line();
950 }
951
DumpCsds(SourceLocation location=SourceLocation ())952 ClientConfig DumpCsds(SourceLocation location = SourceLocation()) {
953 std::string client_config_serialized =
954 XdsClientTestPeer(xds_client_.get()).TestDumpClientConfig();
955 ClientConfig client_config_proto;
956 CHECK(client_config_proto.ParseFromString(client_config_serialized))
957 << "at " << location.file() << ":" << location.line();
958 CheckNode(client_config_proto.node(), location);
959 return client_config_proto;
960 }
961
// Cached "now" used by the test; SetUp() pins it to kTime0 and individual
// tests advance it via TestOnlySetNow().
ScopedTimeCache time_cache_;
// Event engine created in SetUp() and released in TearDown().
std::shared_ptr<FuzzingEventEngine> event_engine_;
// Fake transport factory; set by InitXdsClient().
RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
// The XdsClient under test; set by InitXdsClient().
RefCountedPtr<XdsClient> xds_client_;
// Raw pointer to the metrics reporter whose ownership InitXdsClient()
// passes to the XdsClient.
MetricsReporter* metrics_reporter_ = nullptr;
967 };
968
969 MATCHER_P3(ResourceCountLabelsEq, xds_authority, resource_type, cache_state,
970 "equals ResourceCountLabels") {
971 bool ok = true;
972 ok &= ::testing::ExplainMatchResult(xds_authority, arg.xds_authority,
973 result_listener);
974 ok &= ::testing::ExplainMatchResult(resource_type, arg.resource_type,
975 result_listener);
976 ok &= ::testing::ExplainMatchResult(cache_state, arg.cache_state,
977 result_listener);
978 return ok;
979 }
980
981 MATCHER_P(TimestampProtoEq, timestamp, "equals timestamp") {
982 gpr_timespec ts = {arg.seconds(), arg.nanos(), GPR_CLOCK_REALTIME};
983 return ::testing::ExplainMatchResult(
984 timestamp, Timestamp::FromTimespecRoundDown(ts), result_listener);
985 }
986
987 // Matches a CSDS ClientConfig proto.
988 //
989 // The resource_fields argument is a matcher that must validate the
990 // xds_config, version_info, and last_updated fields. Examples are
991 // CsdsResourceFields() and CsdsNoResourceFields().
992 //
993 // The error_fields argument is a matcher that must validate the
994 // error_state field. Examples are CsdsErrorFields(),
995 // CsdsErrorDetailsOnly(), and CsdsNoErrorFields().
996 MATCHER_P5(CsdsResourceEq, client_status, type_url, name, resource_fields,
997 error_fields, "equals CSDS resource") {
998 bool ok = true;
999 ok &= ::testing::ExplainMatchResult(arg.client_status(), client_status,
1000 result_listener);
1001 ok &= ::testing::ExplainMatchResult(
1002 absl::StrCat("type.googleapis.com/", type_url), arg.type_url(),
1003 result_listener);
1004 ok &= ::testing::ExplainMatchResult(name, arg.name(), result_listener);
1005 ok &= ::testing::ExplainMatchResult(resource_fields, arg, result_listener);
1006 ok &= ::testing::ExplainMatchResult(error_fields, arg, result_listener);
1007 return ok;
1008 }
1009
1010 // Validates the resource fields in a CSDS ClientConfig proto. Intended
1011 // for use with CsdsResourceEq().
1012 MATCHER_P3(CsdsResourceFields, resource, version, last_updated,
1013 "CSDS resource fields") {
1014 bool ok = true;
1015 ok &= ::testing::ExplainMatchResult(resource, arg.xds_config().value(),
1016 result_listener);
1017 ok &= ::testing::ExplainMatchResult(version, arg.version_info(),
1018 result_listener);
1019 ok &= ::testing::ExplainMatchResult(last_updated, arg.last_updated(),
1020 result_listener);
1021 return ok;
1022 }
1023
1024 // Validates the resource fields are not present in a CSDS ClientConfig
1025 // proto. Intended for use with CsdsResourceEq().
1026 MATCHER(CsdsNoResourceFields, "CSDS has no resource fields") {
1027 bool ok = true;
1028 ok &= ::testing::ExplainMatchResult(::testing::IsFalse(),
1029 arg.has_xds_config(), result_listener);
1030 ok &= ::testing::ExplainMatchResult("", arg.version_info(), result_listener);
1031 ok &= ::testing::ExplainMatchResult(::testing::IsFalse(),
1032 arg.has_last_updated(), result_listener);
1033 return ok;
1034 }
1035
1036 // Validates the error fields in a CSDS ClientConfig proto. Intended
1037 // for use with CsdsResourceEq().
1038 MATCHER_P3(CsdsErrorFields, error_details, error_version, error_time,
1039 "CSDS error fields") {
1040 bool ok = true;
1041 ok &= ::testing::ExplainMatchResult(
1042 error_details, arg.error_state().details(), result_listener);
1043 ok &= ::testing::ExplainMatchResult(
1044 error_version, arg.error_state().version_info(), result_listener);
1045 ok &= ::testing::ExplainMatchResult(
1046 error_time, arg.error_state().last_update_attempt(), result_listener);
1047 return ok;
1048 }
1049
1050 // Same as CsdsErrorFields, but expects the error details without a
1051 // version or timestamp.
1052 MATCHER_P(CsdsErrorDetailsOnly, error_details, "CSDS error details only") {
1053 bool ok = true;
1054 ok &= ::testing::ExplainMatchResult(
1055 error_details, arg.error_state().details(), result_listener);
1056 ok &= ::testing::ExplainMatchResult("", arg.error_state().version_info(),
1057 result_listener);
1058 ok &= ::testing::ExplainMatchResult(
1059 ::testing::IsFalse(), arg.error_state().has_last_update_attempt(),
1060 result_listener);
1061 return ok;
1062 }
1063
1064 // Validates that there is no error in a CSDS ClientConfig proto. Intended
1065 // for use with CsdsResourceEq().
1066 MATCHER(CsdsNoErrorFields, "CSDS has no error fields") {
1067 return ::testing::ExplainMatchResult(::testing::IsFalse(),
1068 arg.has_error_state(), result_listener);
1069 }
1070
1071 // Convenient wrapper for ACKED resources in CSDS.
1072 MATCHER_P5(CsdsResourceAcked, type_url, name, resource, version, last_updated,
1073 "equals CSDS ACKED resource") {
1074 return ::testing::ExplainMatchResult(
1075 CsdsResourceEq(ClientResourceStatus::ACKED, type_url, name,
1076 CsdsResourceFields(resource, version, last_updated),
1077 CsdsNoErrorFields()),
1078 arg, result_listener);
1079 }
1080
1081 // Convenient wrapper for REQUESTED resources in CSDS.
1082 MATCHER_P2(CsdsResourceRequested, type_url, name,
1083 "equals CSDS requested resource") {
1084 return ::testing::ExplainMatchResult(
1085 CsdsResourceEq(ClientResourceStatus::REQUESTED, type_url, name,
1086 CsdsNoResourceFields(), CsdsNoErrorFields()),
1087 arg, result_listener);
1088 }
1089
1090 // Convenient wrapper for DOES_NOT_EXIST resources in CSDS.
1091 MATCHER_P2(CsdsResourceDoesNotExist, type_url, name,
1092 "equals CSDS does-not-exist resource") {
1093 return ::testing::ExplainMatchResult(
1094 CsdsResourceEq(ClientResourceStatus::DOES_NOT_EXIST, type_url, name,
1095 CsdsNoResourceFields(), CsdsNoErrorFields()),
1096 arg, result_listener);
1097 }
1098
// Walks a single Foo watch through its whole life cycle (subscribe,
// receive a resource, ACK, cancel) and checks the metrics and the CSDS
// dump at every stage.
TEST_F(XdsClientTest, BasicWatch) {
  using ::testing::ElementsAre;
  using ::testing::Pair;
  InitXdsClient();
  const auto foo_type_url = XdsFooResourceType::Get()->type_url();
  // Nothing has happened yet, so all metrics must be empty.
  EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ElementsAre());
  EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ElementsAre());
  EXPECT_THAT(GetResourceCounts(), ElementsAre());
  EXPECT_THAT(GetServerConnections(), ElementsAre());
  EXPECT_THAT(metrics_reporter_->server_failures(), ElementsAre());
  // The CSDS dump must be empty as well.
  ClientConfig csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(), ElementsAre());
  // Subscribe to "foo1".
  auto foo_watcher = StartFooWatch("foo1");
  // No updates have been counted yet, but the server connection and the
  // requested resource must now be reported.
  EXPECT_THAT(metrics_reporter_->resource_updates_valid(), ElementsAre());
  EXPECT_THAT(metrics_reporter_->resource_updates_invalid(), ElementsAre());
  EXPECT_THAT(GetServerConnections(),
              ElementsAre(Pair(kDefaultXdsServerUrl, true)));
  EXPECT_THAT(GetResourceCounts(),
              ElementsAre(Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "requested"),
                  1)));
  // The CSDS dump must list the resource as requested.
  csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(),
              ElementsAre(CsdsResourceRequested(foo_type_url, "foo1")));
  // The watcher has nothing to report yet.
  EXPECT_FALSE(foo_watcher->HasEvent());
  // An ADS stream must have been created.
  auto ads_stream = WaitForAdsStream();
  ASSERT_TRUE(ads_stream != nullptr);
  // The subscription request must have been sent on that stream.
  auto ads_request = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(ads_request.has_value());
  CheckRequest(*ads_request, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*ads_request);  // Should be present on the first request.
  // The server answers with the resource.
  ads_stream->SendMessageToClient(ResponseBuilder(foo_type_url)
                                      .set_version_info("1")
                                      .set_nonce("A")
                                      .AddFooResource(XdsFooResource("foo1", 6))
                                      .Serialize());
  // The watcher must receive it.
  auto foo_resource = foo_watcher->WaitForNextResource();
  ASSERT_NE(foo_resource, nullptr);
  EXPECT_EQ(foo_resource->name, "foo1");
  EXPECT_EQ(foo_resource->value, 6);
  // One valid update must have been counted and the resource is now acked.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ElementsAre(Pair(Pair(kDefaultXdsServerUrl, foo_type_url), 1)),
      ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ElementsAre(Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "acked"),
                  1)));
  EXPECT_THAT(GetServerConnections(),
              ElementsAre(Pair(kDefaultXdsServerUrl, true)));
  // The CSDS dump must show the acked resource.
  csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(),
              ElementsAre(CsdsResourceAcked(
                  foo_type_url, "foo1", foo_resource->AsJsonString(), "1",
                  TimestampProtoEq(kTime0))));
  // An ACK must have gone back to the server.
  ads_request = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(ads_request.has_value());
  CheckRequest(*ads_request, foo_type_url,
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Unsubscribe; the stream must be orphaned as a result.
  CancelFooWatch(foo_watcher.get(), "foo1");
  EXPECT_TRUE(ads_stream->IsOrphaned());
  // The update counter is unchanged, while the resource counts and server
  // connections are empty again.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ElementsAre(Pair(Pair(kDefaultXdsServerUrl, foo_type_url), 1)),
      ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(), ElementsAre());
  EXPECT_THAT(GetServerConnections(), ElementsAre());
  // The CSDS dump must be empty again.
  csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(), ElementsAre());
}
1201
// Checks that a second version of a watched resource sent by the server
// reaches the watcher, bumps the metrics, refreshes the CSDS dump (with a
// new timestamp), and is ACKed.
TEST_F(XdsClientTest, UpdateFromServer) {
  using ::testing::ElementsAre;
  using ::testing::Pair;
  InitXdsClient();
  const auto foo_type_url = XdsFooResourceType::Get()->type_url();
  // Subscribe to "foo1".
  auto foo_watcher = StartFooWatch("foo1");
  // The watcher has nothing to report yet.
  EXPECT_FALSE(foo_watcher->HasEvent());
  // An ADS stream must have been created.
  auto ads_stream = WaitForAdsStream();
  ASSERT_TRUE(ads_stream != nullptr);
  // The subscription request must have been sent on that stream.
  auto ads_request = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(ads_request.has_value());
  CheckRequest(*ads_request, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*ads_request);  // Should be present on the first request.
  // The server answers with version 1 of the resource.
  ads_stream->SendMessageToClient(ResponseBuilder(foo_type_url)
                                      .set_version_info("1")
                                      .set_nonce("A")
                                      .AddFooResource(XdsFooResource("foo1", 6))
                                      .Serialize());
  // The watcher must receive it.
  auto foo_resource = foo_watcher->WaitForNextResource();
  ASSERT_NE(foo_resource, nullptr);
  EXPECT_EQ(foo_resource->name, "foo1");
  EXPECT_EQ(foo_resource->value, 6);
  // One valid update must have been counted and the resource is acked.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ElementsAre(Pair(Pair(kDefaultXdsServerUrl, foo_type_url), 1)),
      ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ElementsAre(Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "acked"),
                  1)));
  // The CSDS dump must show version 1, stamped with kTime0.
  ClientConfig csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(),
              ElementsAre(CsdsResourceAcked(
                  foo_type_url, "foo1", foo_resource->AsJsonString(), "1",
                  TimestampProtoEq(kTime0))));
  // An ACK for version 1 must have gone back to the server.
  ads_request = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(ads_request.has_value());
  CheckRequest(*ads_request, foo_type_url,
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // The server now sends version 2.  Time is advanced first so that the
  // CSDS entry gets a new timestamp.
  time_cache_.TestOnlySetNow(kTime1);
  ads_stream->SendMessageToClient(ResponseBuilder(foo_type_url)
                                      .set_version_info("2")
                                      .set_nonce("B")
                                      .AddFooResource(XdsFooResource("foo1", 9))
                                      .Serialize());
  // The watcher must receive the new value.
  foo_resource = foo_watcher->WaitForNextResource();
  ASSERT_NE(foo_resource, nullptr);
  EXPECT_EQ(foo_resource->name, "foo1");
  EXPECT_EQ(foo_resource->value, 9);
  // Two valid updates have been counted; there is still one acked resource.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ElementsAre(Pair(Pair(kDefaultXdsServerUrl, foo_type_url), 2)),
      ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ElementsAre(Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "acked"),
                  1)));
  // The CSDS dump must show version 2, stamped with kTime1.
  csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(),
              ElementsAre(CsdsResourceAcked(
                  foo_type_url, "foo1", foo_resource->AsJsonString(), "2",
                  TimestampProtoEq(kTime1))));
  // An ACK for version 2 must have gone back to the server.
  ads_request = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(ads_request.has_value());
  CheckRequest(*ads_request, foo_type_url,
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Unsubscribe; the stream must be orphaned as a result.
  CancelFooWatch(foo_watcher.get(), "foo1");
  EXPECT_TRUE(ads_stream->IsOrphaned());
}
1301
// Checks that two watchers of the same resource share one subscription:
// the second watcher is served from the cache without a new request,
// both watchers receive updates, and the stream is orphaned only once
// the last watcher has been cancelled.
TEST_F(XdsClientTest, MultipleWatchersForSameResource) {
  using ::testing::ElementsAre;
  using ::testing::Pair;
  InitXdsClient();
  const auto foo_type_url = XdsFooResourceType::Get()->type_url();
  // First watcher subscribes to "foo1".
  auto first_watcher = StartFooWatch("foo1");
  // It has nothing to report yet.
  EXPECT_FALSE(first_watcher->HasEvent());
  // An ADS stream must have been created.
  auto ads_stream = WaitForAdsStream();
  ASSERT_TRUE(ads_stream != nullptr);
  // The subscription request must have been sent on that stream.
  auto ads_request = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(ads_request.has_value());
  CheckRequest(*ads_request, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*ads_request);  // Should be present on the first request.
  // The server answers with version 1 of the resource.
  ads_stream->SendMessageToClient(ResponseBuilder(foo_type_url)
                                      .set_version_info("1")
                                      .set_nonce("A")
                                      .AddFooResource(XdsFooResource("foo1", 6))
                                      .Serialize());
  // The first watcher must receive it.
  auto foo_resource = first_watcher->WaitForNextResource();
  ASSERT_NE(foo_resource, nullptr);
  EXPECT_EQ(foo_resource->name, "foo1");
  EXPECT_EQ(foo_resource->value, 6);
  // One valid update must have been counted and the resource is acked.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ElementsAre(Pair(Pair(kDefaultXdsServerUrl, foo_type_url), 1)),
      ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ElementsAre(Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "acked"),
                  1)));
  // The CSDS dump must show version 1.
  ClientConfig csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(),
              ElementsAre(CsdsResourceAcked(
                  foo_type_url, "foo1", foo_resource->AsJsonString(), "1",
                  TimestampProtoEq(kTime0))));
  // An ACK for version 1 must have gone back to the server.
  ads_request = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(ads_request.has_value());
  CheckRequest(*ads_request, foo_type_url,
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // A second watcher subscribes to the same resource.
  auto second_watcher = StartFooWatch("foo1");
  // Because the resource is already cached, it is notified right away.
  foo_resource = second_watcher->WaitForNextResource();
  ASSERT_NE(foo_resource, nullptr);
  EXPECT_EQ(foo_resource->name, "foo1");
  EXPECT_EQ(foo_resource->value, 6);
  // No additional request may have reached the server.
  ASSERT_FALSE(ads_stream->HaveMessageFromClient());
  // The server now sends version 2.
  ads_stream->SendMessageToClient(ResponseBuilder(foo_type_url)
                                      .set_version_info("2")
                                      .set_nonce("B")
                                      .AddFooResource(XdsFooResource("foo1", 9))
                                      .Serialize());
  // Both watchers must receive the new value.
  foo_resource = first_watcher->WaitForNextResource();
  ASSERT_NE(foo_resource, nullptr);
  EXPECT_EQ(foo_resource->name, "foo1");
  EXPECT_EQ(foo_resource->value, 9);
  foo_resource = second_watcher->WaitForNextResource();
  ASSERT_NE(foo_resource, nullptr);
  EXPECT_EQ(foo_resource->name, "foo1");
  EXPECT_EQ(foo_resource->value, 9);
  // Two valid updates have been counted; there is still one acked resource.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ElementsAre(Pair(Pair(kDefaultXdsServerUrl, foo_type_url), 2)),
      ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ElementsAre(Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "acked"),
                  1)));
  // The CSDS dump must show version 2.  Time was not advanced in this
  // test, so the timestamp is still kTime0.
  csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(),
              ElementsAre(CsdsResourceAcked(
                  foo_type_url, "foo1", foo_resource->AsJsonString(), "2",
                  TimestampProtoEq(kTime0))));
  // An ACK for version 2 must have gone back to the server.
  ads_request = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(ads_request.has_value());
  CheckRequest(*ads_request, foo_type_url,
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Cancelling only the first watcher must not produce a new request.
  CancelFooWatch(first_watcher.get(), "foo1");
  ASSERT_FALSE(WaitForRequest(ads_stream.get()));
  // Cancelling the last watcher must orphan the stream.
  CancelFooWatch(second_watcher.get(), "foo1");
  EXPECT_TRUE(ads_stream->IsOrphaned());
}
1417
TEST_F(XdsClientTest,SubscribeToMultipleResources)1418 TEST_F(XdsClientTest, SubscribeToMultipleResources) {
1419 InitXdsClient();
1420 // Start a watch for "foo1".
1421 auto watcher = StartFooWatch("foo1");
1422 // Watcher should initially not see any resource reported.
1423 EXPECT_FALSE(watcher->HasEvent());
1424 // Check metrics.
1425 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
1426 ::testing::ElementsAre());
1427 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
1428 ::testing::ElementsAre());
1429 EXPECT_THAT(GetResourceCounts(),
1430 ::testing::ElementsAre(::testing::Pair(
1431 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1432 XdsFooResourceType::Get()->type_url(),
1433 "requested"),
1434 1)));
1435 // CSDS should show that the resource has been requested.
1436 ClientConfig csds = DumpCsds();
1437 EXPECT_THAT(csds.generic_xds_configs(),
1438 ::testing::ElementsAre(CsdsResourceRequested(
1439 XdsFooResourceType::Get()->type_url(), "foo1")));
1440 // XdsClient should have created an ADS stream.
1441 auto stream = WaitForAdsStream();
1442 ASSERT_TRUE(stream != nullptr);
1443 // XdsClient should have sent a subscription request on the ADS stream.
1444 auto request = WaitForRequest(stream.get());
1445 ASSERT_TRUE(request.has_value());
1446 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1447 /*version_info=*/"", /*response_nonce=*/"",
1448 /*error_detail=*/absl::OkStatus(),
1449 /*resource_names=*/{"foo1"});
1450 CheckRequestNode(*request); // Should be present on the first request.
1451 // Send a response.
1452 stream->SendMessageToClient(
1453 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1454 .set_version_info("1")
1455 .set_nonce("A")
1456 .AddFooResource(XdsFooResource("foo1", 6))
1457 .Serialize());
1458 // XdsClient should have delivered the response to the watcher.
1459 auto resource = watcher->WaitForNextResource();
1460 ASSERT_NE(resource, nullptr);
1461 EXPECT_EQ(resource->name, "foo1");
1462 EXPECT_EQ(resource->value, 6);
1463 // Check metric data.
1464 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1465 ::testing::ElementsAre(::testing::Pair(
1466 ::testing::Pair(kDefaultXdsServerUrl,
1467 XdsFooResourceType::Get()->type_url()),
1468 1)),
1469 ::testing::ElementsAre(), ::testing::_));
1470 EXPECT_THAT(
1471 GetResourceCounts(),
1472 ::testing::ElementsAre(::testing::Pair(
1473 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1474 XdsFooResourceType::Get()->type_url(), "acked"),
1475 1)));
1476 // Check CSDS data.
1477 csds = DumpCsds();
1478 EXPECT_THAT(csds.generic_xds_configs(),
1479 ::testing::ElementsAre(CsdsResourceAcked(
1480 XdsFooResourceType::Get()->type_url(), "foo1",
1481 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
1482 // XdsClient should have sent an ACK message to the xDS server.
1483 request = WaitForRequest(stream.get());
1484 ASSERT_TRUE(request.has_value());
1485 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1486 /*version_info=*/"1", /*response_nonce=*/"A",
1487 /*error_detail=*/absl::OkStatus(),
1488 /*resource_names=*/{"foo1"});
1489 // Start a watch for "foo2".
1490 auto watcher2 = StartFooWatch("foo2");
1491 // Check metric data.
1492 EXPECT_THAT(
1493 GetResourceCounts(),
1494 ::testing::ElementsAre(
1495 ::testing::Pair(ResourceCountLabelsEq(
1496 XdsClient::kOldStyleAuthority,
1497 XdsFooResourceType::Get()->type_url(), "acked"),
1498 1),
1499 ::testing::Pair(
1500 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1501 XdsFooResourceType::Get()->type_url(),
1502 "requested"),
1503 1)));
1504 // Check CSDS data.
1505 csds = DumpCsds();
1506 EXPECT_THAT(csds.generic_xds_configs(),
1507 ::testing::UnorderedElementsAre(
1508 CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1509 "foo1", resource->AsJsonString(), "1",
1510 TimestampProtoEq(kTime0)),
1511 CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
1512 "foo2")));
1513 // XdsClient should have sent a subscription request on the ADS stream.
1514 request = WaitForRequest(stream.get());
1515 ASSERT_TRUE(request.has_value());
1516 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1517 /*version_info=*/"1", /*response_nonce=*/"A",
1518 /*error_detail=*/absl::OkStatus(),
1519 /*resource_names=*/{"foo1", "foo2"});
1520 // Send a response.
1521 // We increment time to make sure that the CSDS data gets a new timestamp.
1522 time_cache_.TestOnlySetNow(kTime1);
1523 stream->SendMessageToClient(
1524 ResponseBuilder(XdsFooResourceType::Get()->type_url())
1525 .set_version_info("1")
1526 .set_nonce("B")
1527 .AddFooResource(XdsFooResource("foo2", 7))
1528 .Serialize());
1529 // XdsClient should have delivered the response to the watcher.
1530 auto resource2 = watcher2->WaitForNextResource();
1531 ASSERT_NE(resource2, nullptr);
1532 EXPECT_EQ(resource2->name, "foo2");
1533 EXPECT_EQ(resource2->value, 7);
1534 // Check metric data.
1535 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
1536 ::testing::ElementsAre(::testing::Pair(
1537 ::testing::Pair(kDefaultXdsServerUrl,
1538 XdsFooResourceType::Get()->type_url()),
1539 2)),
1540 ::testing::ElementsAre(), ::testing::_));
1541 EXPECT_THAT(
1542 GetResourceCounts(),
1543 ::testing::ElementsAre(::testing::Pair(
1544 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1545 XdsFooResourceType::Get()->type_url(), "acked"),
1546 2)));
1547 // Check CSDS data.
1548 csds = DumpCsds();
1549 EXPECT_THAT(csds.generic_xds_configs(),
1550 ::testing::UnorderedElementsAre(
1551 CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1552 "foo1", resource->AsJsonString(), "1",
1553 TimestampProtoEq(kTime0)),
1554 CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
1555 "foo2", resource2->AsJsonString(), "1",
1556 TimestampProtoEq(kTime1))));
1557 // XdsClient should have sent an ACK message to the xDS server.
1558 request = WaitForRequest(stream.get());
1559 ASSERT_TRUE(request.has_value());
1560 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1561 /*version_info=*/"1", /*response_nonce=*/"B",
1562 /*error_detail=*/absl::OkStatus(),
1563 /*resource_names=*/{"foo1", "foo2"});
1564 // Cancel watch for "foo1".
1565 CancelFooWatch(watcher.get(), "foo1");
1566 // Check metric data.
1567 EXPECT_THAT(
1568 GetResourceCounts(),
1569 ::testing::ElementsAre(::testing::Pair(
1570 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
1571 XdsFooResourceType::Get()->type_url(), "acked"),
1572 1)));
1573 // Check CSDS data.
1574 csds = DumpCsds();
1575 EXPECT_THAT(csds.generic_xds_configs(),
1576 ::testing::ElementsAre(CsdsResourceAcked(
1577 XdsFooResourceType::Get()->type_url(), "foo2",
1578 resource2->AsJsonString(), "1", TimestampProtoEq(kTime1))));
1579 // XdsClient should send an unsubscription request.
1580 request = WaitForRequest(stream.get());
1581 ASSERT_TRUE(request.has_value());
1582 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
1583 /*version_info=*/"1", /*response_nonce=*/"B",
1584 /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
1585 // Now cancel watch for "foo2".
1586 CancelFooWatch(watcher2.get(), "foo2");
1587 EXPECT_TRUE(stream->IsOrphaned());
1588 }
1589
// Verifies that when the server sends an update containing only one of two
// subscribed resources ("foo1"), the watcher for that resource sees the new
// value while the other cached resource ("foo2") keeps its previous value,
// version, and timestamp in CSDS.
TEST_F(XdsClientTest, UpdateContainsOnlyChangedResource) {
  InitXdsClient();
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Send a response.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Start a watch for "foo2".
  auto watcher2 = StartFooWatch("foo2");
  // XdsClient should have sent a subscription request on the ADS stream.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Send a response.
  // We increment time to make sure that the CSDS data gets a new timestamp.
  time_cache_.TestOnlySetNow(kTime1);
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("B")
          .AddFooResource(XdsFooResource("foo2", 7))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource2 = watcher2->WaitForNextResource();
  ASSERT_NE(resource2, nullptr);
  EXPECT_EQ(resource2->name, "foo2");
  EXPECT_EQ(resource2->value, 7);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          2)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                XdsFooResourceType::Get()->type_url(), "acked"),
          2)));
  // Check CSDS data.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(
                  CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
                                    "foo1", resource->AsJsonString(), "1",
                                    TimestampProtoEq(kTime0)),
                  CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
                                    "foo2", resource2->AsJsonString(), "1",
                                    TimestampProtoEq(kTime1))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Server sends an update for "foo1".  The response does not contain "foo2".
  // We increment time to make sure that the CSDS data gets a new timestamp.
  time_cache_.TestOnlySetNow(kTime2);
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("2")
          .set_nonce("C")
          .AddFooResource(XdsFooResource("foo1", 9))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 9);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          3)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                XdsFooResourceType::Get()->type_url(), "acked"),
          2)));
  // Check CSDS data.  "foo1" should show the new version and timestamp,
  // while "foo2" should still show the data from the earlier response.
  csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(
                  CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
                                    "foo1", resource->AsJsonString(), "2",
                                    TimestampProtoEq(kTime2)),
                  CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
                                    "foo2", resource2->AsJsonString(), "1",
                                    TimestampProtoEq(kTime1))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"2", /*response_nonce=*/"C",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Cancel watch for "foo1".
  CancelFooWatch(watcher.get(), "foo1");
  // XdsClient should send an unsubscription request.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"2", /*response_nonce=*/"C",
               /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
  // Now cancel watch for "foo2".
  CancelFooWatch(watcher2.get(), "foo2");
  EXPECT_TRUE(stream->IsOrphaned());
}
1735
// Verifies handling of an invalid resource when nothing is cached yet: the
// watcher gets an error, metrics and CSDS report the resource as NACKed, the
// client NACKs with an empty version, a watcher started afterwards gets the
// same error, and a subsequent valid update is delivered to both watchers.
TEST_F(XdsClientTest, ResourceValidationFailure) {
  InitXdsClient();
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Check metric data.
  EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
              ::testing::ElementsAre());
  EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
              ::testing::ElementsAre());
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        XdsFooResourceType::Get()->type_url(),
                                        "requested"),
                  1)));
  // CSDS should show that the resource has been requested.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::ElementsAre(CsdsResourceRequested(
                  XdsFooResourceType::Get()->type_url(), "foo1")));
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Send a response containing an invalid resource.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
                              "{\"name\":\"foo1\",\"value\":[]}")
          .Serialize());
  // XdsClient should deliver an error to the watcher.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
  EXPECT_EQ(error->message(),
            "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
            "[field:value error:is not a number] (node ID:xds_client_test)")
      << *error;
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(),
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        XdsFooResourceType::Get()->type_url(),
                                        "nacked"),
                  1)));
  // CSDS should show that the resource has been NACKed.
  csds = DumpCsds();
  EXPECT_THAT(
      csds.generic_xds_configs(),
      ::testing::ElementsAre(CsdsResourceEq(
          ClientResourceStatus::NACKED, XdsFooResourceType::Get()->type_url(),
          "foo1", CsdsNoResourceFields(),
          CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: "
                          "[field:value error:is not a number]",
                          "1", TimestampProtoEq(kTime0)))));
  // XdsClient should NACK the update.
  // Note that version_info is not populated in the request.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(
      *request, XdsFooResourceType::Get()->type_url(),
      /*version_info=*/"", /*response_nonce=*/"A",
      // error_detail=
      absl::InvalidArgumentError(
          "xDS response validation errors: ["
          "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: "
          "[field:value error:is not a number]]"),
      /*resource_names=*/{"foo1"});
  // Start a second watch for the same resource.  It should immediately
  // receive the same error.
  auto watcher2 = StartFooWatch("foo1");
  error = watcher2->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
  EXPECT_EQ(error->message(),
            "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
            "[field:value error:is not a number] (node ID:xds_client_test)")
      << *error;
  // Now server sends an updated version of the resource.
  // We increment time to make sure that the CSDS data gets a new timestamp.
  time_cache_.TestOnlySetNow(kTime1);
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("2")
          .set_nonce("B")
          .AddFooResource(XdsFooResource("foo1", 9))
          .Serialize());
  // XdsClient should deliver the response to both watchers.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 9);
  resource = watcher2->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 9);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                XdsFooResourceType::Get()->type_url(), "acked"),
          1)));
  // Check CSDS data.
  csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceAcked(
                  XdsFooResourceType::Get()->type_url(), "foo1",
                  resource->AsJsonString(), "2", TimestampProtoEq(kTime1))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Cancel watch.
  CancelFooWatch(watcher.get(), "foo1");
  CancelFooWatch(watcher2.get(), "foo1");
  EXPECT_TRUE(stream->IsOrphaned());
}
1885
// Verifies handling of a single response that mixes valid and invalid
// entries: watchers whose resource name can be determined get an error, the
// watcher for the valid resource gets its update, a watcher whose resource
// could not be identified sees nothing, and the NACK lists every failing
// entry by index while still advancing the version.
TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) {
  InitXdsClient();
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // Check metric data.
  EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
              ::testing::ElementsAre());
  EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
              ::testing::ElementsAre());
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        XdsFooResourceType::Get()->type_url(),
                                        "requested"),
                  1)));
  // CSDS should show that the resource has been requested.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::ElementsAre(CsdsResourceRequested(
                  XdsFooResourceType::Get()->type_url(), "foo1")));
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Before the server responds, add a watch for another resource.
  auto watcher2 = StartFooWatch("foo2");
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        XdsFooResourceType::Get()->type_url(),
                                        "requested"),
                  2)));
  // CSDS should show that both resources have been requested.
  // Uses an unordered match, like the other multi-resource CSDS checks in
  // this file.
  csds = DumpCsds();
  EXPECT_THAT(
      csds.generic_xds_configs(),
      ::testing::UnorderedElementsAre(
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo1"),
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
                                "foo2")));
  // Client should send another request.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Add a watch for a third resource.
  auto watcher3 = StartFooWatch("foo3");
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        XdsFooResourceType::Get()->type_url(),
                                        "requested"),
                  3)));
  // Check CSDS.
  csds = DumpCsds();
  EXPECT_THAT(
      csds.generic_xds_configs(),
      ::testing::UnorderedElementsAre(
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo1"),
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo2"),
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
                                "foo3")));
  // Client should send another request.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2", "foo3"});
  // Add a watch for a fourth resource.
  auto watcher4 = StartFooWatch("foo4");
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        XdsFooResourceType::Get()->type_url(),
                                        "requested"),
                  4)));
  // Check CSDS.
  csds = DumpCsds();
  EXPECT_THAT(
      csds.generic_xds_configs(),
      ::testing::UnorderedElementsAre(
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo1"),
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo2"),
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo3"),
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
                                "foo4")));
  // Client should send another request.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"});
  // Server sends a response containing three invalid resources, an empty
  // resource, an invalid resource wrapper, and one valid resource.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          // foo1: JSON parsing succeeds, so we know the resource name,
          // but validation fails.
          .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
                              "{\"name\":\"foo1\",\"value\":[]}")
          // foo2: JSON parsing fails, and not wrapped in a Resource
          // wrapper, so we don't actually know the resource's name.
          .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
                              "{\"name\":\"foo2,\"value\":6}")
          // Empty resource.  Will be included in NACK but will not
          // affect any watchers.
          .AddEmptyResource()
          // Invalid resource wrapper.  Will be included in NACK but
          // will not affect any watchers.
          .AddInvalidResourceWrapper()
          // foo3: JSON parsing fails, but it is wrapped in a Resource
          // wrapper, so we do know the resource's name.
          .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
                              "{\"name\":\"foo3,\"value\":6}",
                              /*resource_wrapper_name=*/"foo3")
          // foo4: valid resource.
          .AddFooResource(XdsFooResource("foo4", 5))
          .Serialize());
  // XdsClient should deliver an error to the watchers for foo1 and foo3.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
  EXPECT_EQ(error->message(),
            "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
            "[field:value error:is not a number] (node ID:xds_client_test)")
      << *error;
  error = watcher3->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
  EXPECT_EQ(error->message(),
            "invalid resource: INVALID_ARGUMENT: JSON parsing failed: "
            "[JSON parse error at index 15] (node ID:xds_client_test)")
      << *error;
  // It cannot deliver an error for foo2, because the client doesn't know
  // that that resource in the response was actually supposed to be foo2.
  EXPECT_FALSE(watcher2->HasEvent());
  // It will deliver a valid resource update for foo4.
  auto resource = watcher4->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo4");
  EXPECT_EQ(resource->value, 5);
  // Check metric data.
  // The invalid count covers every failing entry in the response: foo1,
  // foo2, the empty resource, the invalid wrapper, and foo3.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          5)),
      ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(
          // foo4
          ::testing::Pair(ResourceCountLabelsEq(
                              XdsClient::kOldStyleAuthority,
                              XdsFooResourceType::Get()->type_url(), "acked"),
                          1),
          // foo1 and foo3
          ::testing::Pair(ResourceCountLabelsEq(
                              XdsClient::kOldStyleAuthority,
                              XdsFooResourceType::Get()->type_url(), "nacked"),
                          2),
          // did not recognize response for foo2
          ::testing::Pair(
              ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                    XdsFooResourceType::Get()->type_url(),
                                    "requested"),
              1)));
  // Check CSDS.
  csds = DumpCsds();
  EXPECT_THAT(
      csds.generic_xds_configs(),
      ::testing::UnorderedElementsAre(
          CsdsResourceEq(
              ClientResourceStatus::NACKED,
              XdsFooResourceType::Get()->type_url(), "foo1",
              CsdsNoResourceFields(),
              CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: "
                              "[field:value error:is not a number]",
                              "1", TimestampProtoEq(kTime0))),
          CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo2"),
          CsdsResourceEq(
              ClientResourceStatus::NACKED,
              XdsFooResourceType::Get()->type_url(), "foo3",
              CsdsNoResourceFields(),
              CsdsErrorFields("INVALID_ARGUMENT: JSON parsing failed: "
                              "[JSON parse error at index 15]",
                              "1", TimestampProtoEq(kTime0))),
          CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo4",
                            resource->AsJsonString(), "1",
                            TimestampProtoEq(kTime0))));
  // XdsClient should NACK the update.
  // There was one good resource, so the version will be updated.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               // error_detail=
               absl::InvalidArgumentError(absl::StrCat(
                   "xDS response validation errors: ["
                   // foo1
                   "resource index 0: foo1: "
                   "INVALID_ARGUMENT: errors validating JSON: "
                   "[field:value error:is not a number]; "
                   // foo2 (name not known)
                   "resource index 1: INVALID_ARGUMENT: JSON parsing failed: "
                   "[JSON parse error at index 15]; "
                   // empty resource
                   "resource index 2: incorrect resource type \"\" "
                   "(should be \"",
                   XdsFooResourceType::Get()->type_url(),
                   "\"); "
                   // invalid resource wrapper
                   "resource index 3: Can't decode Resource proto wrapper; "
                   // foo3
                   "resource index 4: foo3: "
                   "INVALID_ARGUMENT: JSON parsing failed: "
                   "[JSON parse error at index 15]]")),
               /*resource_names=*/{"foo1", "foo2", "foo3", "foo4"});
  // Cancel watches.
  CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
  CancelFooWatch(watcher2.get(), "foo2", /*delay_unsubscription=*/true);
  CancelFooWatch(watcher3.get(), "foo3", /*delay_unsubscription=*/true);
  CancelFooWatch(watcher4.get(), "foo4");
  EXPECT_TRUE(stream->IsOrphaned());
}
2139
// Verifies handling of an invalid update for a resource that is already
// cached: the existing watcher gets an ambient error, metrics report
// "nacked_but_cached", CSDS shows the NACK alongside the still-cached
// resource, the NACK carries the previous version, and a watcher started
// afterwards is given the cached resource.
TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) {
  InitXdsClient();
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Send a response.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                XdsFooResourceType::Get()->type_url(), "acked"),
          1)));
  // Check CSDS data.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceAcked(
                  XdsFooResourceType::Get()->type_url(), "foo1",
                  resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Send an update containing an invalid resource.
  // We increment time to make sure that the CSDS data gets a new timestamp.
  time_cache_.TestOnlySetNow(kTime1);
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("2")
          .set_nonce("B")
          .AddInvalidResource(XdsFooResourceType::Get()->type_url(),
                              "{\"name\":\"foo1\",\"value\":[]}")
          .Serialize());
  // XdsClient should deliver an ambient error to the watcher.
  auto error = watcher->WaitForNextAmbientError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
  EXPECT_EQ(error->message(),
            "invalid resource: INVALID_ARGUMENT: errors validating JSON: "
            "[field:value error:is not a number] (node ID:xds_client_test)")
      << *error;
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        XdsFooResourceType::Get()->type_url(),
                                        "nacked_but_cached"),
                  1)));
  // Check CSDS data.  The entry is NACKED but still carries the cached
  // resource (version "1", kTime0) next to the error (version "2", kTime1).
  csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceEq(
                  ClientResourceStatus::NACKED,
                  XdsFooResourceType::Get()->type_url(), "foo1",
                  CsdsResourceFields(resource->AsJsonString(), "1",
                                     TimestampProtoEq(kTime0)),
                  CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: "
                                  "[field:value error:is not a number]",
                                  "2", TimestampProtoEq(kTime1)))));
  // XdsClient should NACK the update.
  // Note that version_info is set to the previous version in this request,
  // because there were no valid resources in it.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(
      *request, XdsFooResourceType::Get()->type_url(),
      /*version_info=*/"1", /*response_nonce=*/"B",
      // error_detail=
      absl::InvalidArgumentError(
          "xDS response validation errors: ["
          "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: "
          "[field:value error:is not a number]]"),
      /*resource_names=*/{"foo1"});
  // Start a second watcher for the same resource.  Even though the last
  // update was a NACK, we should still deliver the cached resource to
  // the watcher.
  // TODO(roth): Consider what the right behavior is here.  It seems
  // inconsistent that the watcher sees the error if it had started
  // before the error was seen but does not if it was started afterwards.
  // One option is to not send errors at all for already-cached resources;
  // another option is to send the errors even for newly started watchers.
  auto watcher2 = StartFooWatch("foo1");
  resource = watcher2->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // Cancel watches.
  CancelFooWatch(watcher.get(), "foo1");
  CancelFooWatch(watcher2.get(), "foo1");
  EXPECT_TRUE(stream->IsOrphaned());
}
2273
// Tests a response for a wildcard-capable resource type that contains one
// valid resource plus an empty resource entry.  The valid resource must
// still be delivered to the watcher, and the request that follows must
// carry an error detail describing the empty entry (i.e., a NACK).
TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) {
  InitXdsClient();
  // Start a watch for "wc1".
  auto watcher = StartWildcardCapableWatch("wc1");
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"wc1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Server sends a response containing the requested resources plus an
  // empty resource.
  stream->SendMessageToClient(
      ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
          .AddEmptyResource()
          .Serialize());
  // XdsClient will deliver a valid resource update for wc1.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "wc1");
  EXPECT_EQ(resource->value, 6);
  // Check metric data.  Both of the first two matchers expect a count of 1
  // for this server and resource type.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsWildcardCapableResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsWildcardCapableResourceType::Get()->type_url()),
          1)),
      ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(
              XdsClient::kOldStyleAuthority,
              XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
          1)));
  // Check CSDS data.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceAcked(
                  XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
                  resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
  // XdsClient should NACK the update.
  // There was one good resource, so the version will be updated.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               // error_detail=
               absl::InvalidArgumentError(absl::StrCat(
                   "xDS response validation errors: ["
                   "resource index 1: incorrect resource type \"\" "
                   "(should be \"",
                   XdsWildcardCapableResourceType::Get()->type_url(), "\")]")),
               /*resource_names=*/{"wc1"});
  // Cancel watch.
  CancelWildcardCapableWatch(watcher.get(), "wc1");
  EXPECT_TRUE(stream->IsOrphaned());
}
2346
2347 // This tests resource removal triggered by the server when using a
2348 // resource type that requires all resources to be present in every
2349 // response, similar to LDS and CDS.
TEST_F(XdsClientTest,ResourceDeletion)2350 TEST_F(XdsClientTest, ResourceDeletion) {
2351 InitXdsClient();
2352 // Start a watch for "wc1".
2353 auto watcher = StartWildcardCapableWatch("wc1");
2354 // Watcher should initially not see any resource reported.
2355 EXPECT_FALSE(watcher->HasEvent());
2356 // XdsClient should have created an ADS stream.
2357 auto stream = WaitForAdsStream();
2358 ASSERT_TRUE(stream != nullptr);
2359 // XdsClient should have sent a subscription request on the ADS stream.
2360 auto request = WaitForRequest(stream.get());
2361 ASSERT_TRUE(request.has_value());
2362 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2363 /*version_info=*/"", /*response_nonce=*/"",
2364 /*error_detail=*/absl::OkStatus(),
2365 /*resource_names=*/{"wc1"});
2366 CheckRequestNode(*request); // Should be present on the first request.
2367 // Server sends a response.
2368 stream->SendMessageToClient(
2369 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2370 .set_version_info("1")
2371 .set_nonce("A")
2372 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
2373 .Serialize());
2374 // XdsClient should have delivered the response to the watcher.
2375 auto resource = watcher->WaitForNextResource();
2376 ASSERT_NE(resource, nullptr);
2377 EXPECT_EQ(resource->name, "wc1");
2378 EXPECT_EQ(resource->value, 6);
2379 // Check metric data.
2380 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2381 ::testing::ElementsAre(::testing::Pair(
2382 ::testing::Pair(kDefaultXdsServerUrl,
2383 XdsWildcardCapableResourceType::Get()->type_url()),
2384 1)),
2385 ::testing::ElementsAre(), ::testing::_));
2386 EXPECT_THAT(
2387 GetResourceCounts(),
2388 ::testing::ElementsAre(::testing::Pair(
2389 ResourceCountLabelsEq(
2390 XdsClient::kOldStyleAuthority,
2391 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2392 1)));
2393 // Check CSDS data.
2394 ClientConfig csds = DumpCsds();
2395 EXPECT_THAT(csds.generic_xds_configs(),
2396 ::testing::UnorderedElementsAre(CsdsResourceAcked(
2397 XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2398 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
2399 // XdsClient should have sent an ACK message to the xDS server.
2400 request = WaitForRequest(stream.get());
2401 ASSERT_TRUE(request.has_value());
2402 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2403 /*version_info=*/"1", /*response_nonce=*/"A",
2404 /*error_detail=*/absl::OkStatus(),
2405 /*resource_names=*/{"wc1"});
2406 // Server now sends a response without the resource, thus indicating
2407 // it's been deleted.
2408 stream->SendMessageToClient(
2409 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2410 .set_version_info("2")
2411 .set_nonce("B")
2412 .Serialize());
2413 // Watcher should see the does-not-exist event.
2414 EXPECT_TRUE(watcher->WaitForDoesNotExist());
2415 // Check metric data.
2416 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2417 ::testing::ElementsAre(::testing::Pair(
2418 ::testing::Pair(kDefaultXdsServerUrl,
2419 XdsWildcardCapableResourceType::Get()->type_url()),
2420 1)),
2421 ::testing::ElementsAre(), ::testing::_));
2422 EXPECT_THAT(GetResourceCounts(),
2423 ::testing::ElementsAre(::testing::Pair(
2424 ResourceCountLabelsEq(
2425 XdsClient::kOldStyleAuthority,
2426 XdsWildcardCapableResourceType::Get()->type_url(),
2427 "does_not_exist"),
2428 1)));
2429 // Check CSDS data.
2430 csds = DumpCsds();
2431 EXPECT_THAT(csds.generic_xds_configs(),
2432 ::testing::UnorderedElementsAre(CsdsResourceDoesNotExist(
2433 XdsWildcardCapableResourceType::Get()->type_url(), "wc1")));
2434 // Start a new watcher for the same resource. It should immediately
2435 // receive the same does-not-exist notification.
2436 auto watcher2 = StartWildcardCapableWatch("wc1");
2437 EXPECT_TRUE(watcher2->WaitForDoesNotExist());
2438 // XdsClient should have sent an ACK message to the xDS server.
2439 request = WaitForRequest(stream.get());
2440 ASSERT_TRUE(request.has_value());
2441 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2442 /*version_info=*/"2", /*response_nonce=*/"B",
2443 /*error_detail=*/absl::OkStatus(),
2444 /*resource_names=*/{"wc1"});
2445 // Server sends the resource again.
2446 // We increment time to make sure that the CSDS data gets a new timestamp.
2447 time_cache_.TestOnlySetNow(kTime1);
2448 stream->SendMessageToClient(
2449 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2450 .set_version_info("3")
2451 .set_nonce("C")
2452 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7))
2453 .Serialize());
2454 // XdsClient should have delivered the response to the watchers.
2455 resource = watcher->WaitForNextResource();
2456 ASSERT_NE(resource, nullptr);
2457 EXPECT_EQ(resource->name, "wc1");
2458 EXPECT_EQ(resource->value, 7);
2459 resource = watcher2->WaitForNextResource();
2460 ASSERT_NE(resource, nullptr);
2461 EXPECT_EQ(resource->name, "wc1");
2462 EXPECT_EQ(resource->value, 7);
2463 // Check metric data.
2464 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2465 ::testing::ElementsAre(::testing::Pair(
2466 ::testing::Pair(kDefaultXdsServerUrl,
2467 XdsWildcardCapableResourceType::Get()->type_url()),
2468 2)),
2469 ::testing::ElementsAre(), ::testing::_));
2470 EXPECT_THAT(
2471 GetResourceCounts(),
2472 ::testing::ElementsAre(::testing::Pair(
2473 ResourceCountLabelsEq(
2474 XdsClient::kOldStyleAuthority,
2475 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2476 1)));
2477 // Check CSDS data.
2478 csds = DumpCsds();
2479 EXPECT_THAT(csds.generic_xds_configs(),
2480 ::testing::UnorderedElementsAre(CsdsResourceAcked(
2481 XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2482 resource->AsJsonString(), "3", TimestampProtoEq(kTime1))));
2483 // XdsClient should have sent an ACK message to the xDS server.
2484 request = WaitForRequest(stream.get());
2485 ASSERT_TRUE(request.has_value());
2486 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2487 /*version_info=*/"3", /*response_nonce=*/"C",
2488 /*error_detail=*/absl::OkStatus(),
2489 /*resource_names=*/{"wc1"});
2490 // Cancel watch.
2491 CancelWildcardCapableWatch(watcher.get(), "wc1");
2492 CancelWildcardCapableWatch(watcher2.get(), "wc1");
2493 EXPECT_TRUE(stream->IsOrphaned());
2494 }
2495
2496 // This tests that when we ignore resource deletions from the server
2497 // when configured to do so.
TEST_F(XdsClientTest,ResourceDeletionIgnoredWhenConfigured)2498 TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) {
2499 InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
2500 {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)}));
2501 // Start a watch for "wc1".
2502 auto watcher = StartWildcardCapableWatch("wc1");
2503 // Watcher should initially not see any resource reported.
2504 EXPECT_FALSE(watcher->HasEvent());
2505 // XdsClient should have created an ADS stream.
2506 auto stream = WaitForAdsStream();
2507 ASSERT_TRUE(stream != nullptr);
2508 // XdsClient should have sent a subscription request on the ADS stream.
2509 auto request = WaitForRequest(stream.get());
2510 ASSERT_TRUE(request.has_value());
2511 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2512 /*version_info=*/"", /*response_nonce=*/"",
2513 /*error_detail=*/absl::OkStatus(),
2514 /*resource_names=*/{"wc1"});
2515 CheckRequestNode(*request); // Should be present on the first request.
2516 // Server sends a response.
2517 stream->SendMessageToClient(
2518 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2519 .set_version_info("1")
2520 .set_nonce("A")
2521 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 6))
2522 .Serialize());
2523 // XdsClient should have delivered the response to the watcher.
2524 auto resource = watcher->WaitForNextResource();
2525 ASSERT_NE(resource, nullptr);
2526 EXPECT_EQ(resource->name, "wc1");
2527 EXPECT_EQ(resource->value, 6);
2528 // Check metric data.
2529 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2530 ::testing::ElementsAre(::testing::Pair(
2531 ::testing::Pair(kDefaultXdsServerUrl,
2532 XdsWildcardCapableResourceType::Get()->type_url()),
2533 1)),
2534 ::testing::ElementsAre(), ::testing::_));
2535 EXPECT_THAT(
2536 GetResourceCounts(),
2537 ::testing::ElementsAre(::testing::Pair(
2538 ResourceCountLabelsEq(
2539 XdsClient::kOldStyleAuthority,
2540 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2541 1)));
2542 // Check CSDS data.
2543 ClientConfig csds = DumpCsds();
2544 EXPECT_THAT(csds.generic_xds_configs(),
2545 ::testing::UnorderedElementsAre(CsdsResourceAcked(
2546 XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2547 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
2548 // XdsClient should have sent an ACK message to the xDS server.
2549 request = WaitForRequest(stream.get());
2550 ASSERT_TRUE(request.has_value());
2551 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2552 /*version_info=*/"1", /*response_nonce=*/"A",
2553 /*error_detail=*/absl::OkStatus(),
2554 /*resource_names=*/{"wc1"});
2555 // Server now sends a response without the resource, thus indicating
2556 // it's been deleted.
2557 // We increment time to make sure that the CSDS data does NOT get a
2558 // new timestamp.
2559 time_cache_.TestOnlySetNow(kTime1);
2560 stream->SendMessageToClient(
2561 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2562 .set_version_info("2")
2563 .set_nonce("B")
2564 .Serialize());
2565 // Watcher should not see any update, since we should have ignored the
2566 // deletion.
2567 EXPECT_TRUE(watcher->ExpectNoEvent());
2568 // Check metric data.
2569 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2570 ::testing::ElementsAre(::testing::Pair(
2571 ::testing::Pair(kDefaultXdsServerUrl,
2572 XdsWildcardCapableResourceType::Get()->type_url()),
2573 1)),
2574 ::testing::ElementsAre(), ::testing::_));
2575 EXPECT_THAT(
2576 GetResourceCounts(),
2577 ::testing::ElementsAre(::testing::Pair(
2578 ResourceCountLabelsEq(
2579 XdsClient::kOldStyleAuthority,
2580 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2581 1)));
2582 // Check CSDS data.
2583 csds = DumpCsds();
2584 EXPECT_THAT(csds.generic_xds_configs(),
2585 ::testing::UnorderedElementsAre(CsdsResourceAcked(
2586 XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2587 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
2588 // Start a new watcher for the same resource. It should immediately
2589 // receive the cached resource.
2590 auto watcher2 = StartWildcardCapableWatch("wc1");
2591 resource = watcher2->WaitForNextResource();
2592 ASSERT_NE(resource, nullptr);
2593 EXPECT_EQ(resource->name, "wc1");
2594 EXPECT_EQ(resource->value, 6);
2595 // XdsClient should have sent an ACK message to the xDS server.
2596 request = WaitForRequest(stream.get());
2597 ASSERT_TRUE(request.has_value());
2598 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2599 /*version_info=*/"2", /*response_nonce=*/"B",
2600 /*error_detail=*/absl::OkStatus(),
2601 /*resource_names=*/{"wc1"});
2602 // Server sends a new value for the resource.
2603 // We increment time to make sure that the CSDS data gets a new timestamp.
2604 time_cache_.TestOnlySetNow(kTime2);
2605 stream->SendMessageToClient(
2606 ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url())
2607 .set_version_info("3")
2608 .set_nonce("C")
2609 .AddWildcardCapableResource(XdsWildcardCapableResource("wc1", 7))
2610 .Serialize());
2611 // XdsClient should have delivered the response to the watchers.
2612 resource = watcher->WaitForNextResource();
2613 ASSERT_NE(resource, nullptr);
2614 EXPECT_EQ(resource->name, "wc1");
2615 EXPECT_EQ(resource->value, 7);
2616 resource = watcher2->WaitForNextResource();
2617 ASSERT_NE(resource, nullptr);
2618 EXPECT_EQ(resource->name, "wc1");
2619 EXPECT_EQ(resource->value, 7);
2620 // Check metric data.
2621 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
2622 ::testing::ElementsAre(::testing::Pair(
2623 ::testing::Pair(kDefaultXdsServerUrl,
2624 XdsWildcardCapableResourceType::Get()->type_url()),
2625 2)),
2626 ::testing::ElementsAre(), ::testing::_));
2627 EXPECT_THAT(
2628 GetResourceCounts(),
2629 ::testing::ElementsAre(::testing::Pair(
2630 ResourceCountLabelsEq(
2631 XdsClient::kOldStyleAuthority,
2632 XdsWildcardCapableResourceType::Get()->type_url(), "acked"),
2633 1)));
2634 // Check CSDS data.
2635 csds = DumpCsds();
2636 EXPECT_THAT(csds.generic_xds_configs(),
2637 ::testing::UnorderedElementsAre(CsdsResourceAcked(
2638 XdsWildcardCapableResourceType::Get()->type_url(), "wc1",
2639 resource->AsJsonString(), "3", TimestampProtoEq(kTime2))));
2640 // XdsClient should have sent an ACK message to the xDS server.
2641 request = WaitForRequest(stream.get());
2642 ASSERT_TRUE(request.has_value());
2643 CheckRequest(*request, XdsWildcardCapableResourceType::Get()->type_url(),
2644 /*version_info=*/"3", /*response_nonce=*/"C",
2645 /*error_detail=*/absl::OkStatus(),
2646 /*resource_names=*/{"wc1"});
2647 // Cancel watch.
2648 CancelWildcardCapableWatch(watcher.get(), "wc1");
2649 CancelWildcardCapableWatch(watcher2.get(), "wc1");
2650 EXPECT_TRUE(stream->IsOrphaned());
2651 }
2652
// Tests the case where the server closes the ADS stream after a response
// has already been received on it.  The old stream must be orphaned and a
// new one created whose first request carries the previously seen version
// but an empty nonce.  A watcher started before the server resends the
// resource gets the cached value.
TEST_F(XdsClientTest, StreamClosedByServer) {
  InitXdsClient();
  // Metrics should initially be empty.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Check metric data.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Server sends a response.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Now server closes the stream.
  stream->MaybeSendStatusToClient(absl::OkStatus());
  // XdsClient should NOT report error to watcher, because we saw a
  // response on the stream before it failed.
  // Stream should be orphaned.
  EXPECT_TRUE(stream->IsOrphaned());
  // Check metric data.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  // XdsClient should create a new stream.
  stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient sends a subscription request.
  // Note that the version persists from the previous stream, but the
  // nonce does not.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Before the server resends the resource, start a new watcher for the
  // same resource.  This watcher should immediately receive the cached
  // resource.
  auto watcher2 = StartFooWatch("foo1");
  resource = watcher2->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // Server now sends the requested resource.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("B")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // Watcher does NOT get an update, since the resource has not changed.
  EXPECT_FALSE(watcher->WaitForNextResource());
  // XdsClient sends an ACK.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Cancel watchers.
  CancelFooWatch(watcher.get(), "foo1");
  CancelFooWatch(watcher2.get(), "foo1");
  EXPECT_TRUE(stream->IsOrphaned());
}
2745
// Tests the case where the server closes the ADS stream with a non-OK
// status before any response has been received.  The watcher must get an
// error, the server connection must be reported as failed until a
// response arrives on the replacement stream, and the resource must then
// be delivered and ACKed as usual.
TEST_F(XdsClientTest, StreamClosedByServerWithoutSeeingResponse) {
  InitXdsClient();
  // Metrics should initially be empty.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Check metric data.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Server closes the stream without sending a response.
  stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
  // Check metric data.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, false)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::_, ::testing::_,
      ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
  // XdsClient should report an error to the watcher.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
  EXPECT_EQ(error->message(),
            "xDS channel for server default_xds_server: xDS call failed "
            "with no responses received; status: UNAVAILABLE: ugh "
            "(node ID:xds_client_test)")
      << *error;
  // XdsClient should create a new stream.
  stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient sends a subscription request.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Connection still reported as unhappy until we get a response.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, false)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::_, ::testing::_,
      ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
  // Server now sends the requested resource.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // Watcher gets the resource.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // Connection now reported as happy.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  // XdsClient sends an ACK.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Cancel watcher.
  CancelFooWatch(watcher.get(), "foo1");
  EXPECT_TRUE(stream->IsOrphaned());
}
2829
// Tests a transport-level connection failure reported before any resource
// has been received.  Existing and newly started watchers must both get
// the connection error, and no further event is expected on the first
// watcher while disconnected.  Once the pending send completes and the
// server responds, both watchers receive the resource.
TEST_F(XdsClientTest, ConnectionFails) {
  InitXdsClient();
  // Tell transport to let us manually trigger completion of the
  // send_message ops to XdsClient.
  transport_factory_->SetAutoCompleteMessagesFromClient(false);
  // Metrics should initially be empty.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
  EXPECT_THAT(metrics_reporter_->server_failures(), ::testing::ElementsAre());
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Check metric data.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Transport reports connection failure.
  TriggerConnectionFailure(*xds_client_->bootstrap().servers().front(),
                           absl::UnavailableError("connection failed"));
  // XdsClient should report an error to the watcher.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
  EXPECT_EQ(error->message(),
            "xDS channel for server default_xds_server: "
            "connection failed (node ID:xds_client_test)")
      << *error;
  // Connection reported as unhappy.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, false)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::_, ::testing::_,
      ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
  // We should not see a resource-does-not-exist event, because the
  // timer should not be running while the channel is disconnected.
  EXPECT_TRUE(watcher->ExpectNoEvent());
  // Start a new watch.  This watcher should be given the same error,
  // since we have not yet recovered.
  auto watcher2 = StartFooWatch("foo1");
  error = watcher2->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
  EXPECT_EQ(error->message(),
            "xDS channel for server default_xds_server: "
            "connection failed (node ID:xds_client_test)")
      << *error;
  // Second watcher should not see resource-does-not-exist either.
  EXPECT_FALSE(watcher2->HasEvent());
  // The ADS stream uses wait_for_ready inside the XdsTransport interface,
  // so when the channel reconnects, the already-started stream will proceed.
  stream->CompleteSendMessageFromClient();
  // Server sends a response.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // Connection now reported as happy.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  // XdsClient should have delivered the response to the watchers.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  resource = watcher2->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Auto-completion was disabled at the top of the test, so the ACK's
  // send_message op is completed by hand.
  stream->CompleteSendMessageFromClient();
  // Cancel watches.
  CancelFooWatch(watcher.get(), "foo1");
  CancelFooWatch(watcher2.get(), "foo1");
  EXPECT_TRUE(stream->IsOrphaned());
}
2923
TEST_F(XdsClientTest,ConnectionFailsWithCachedResource)2924 TEST_F(XdsClientTest, ConnectionFailsWithCachedResource) {
// Verifies the behavior when the transport reports a connection failure
// after a resource has already been cached: the existing watcher gets an
// ambient error, a newly started watcher gets the cached resource followed
// by the same ambient error, and once the server re-sends the unchanged
// resource on the new stream both watchers get an OK ambient status.
2925 InitXdsClient();
2926 // Start a watch for "foo1".
2927 auto watcher = StartFooWatch("foo1");
2928 // XdsClient should have created an ADS stream.
2929 auto stream = WaitForAdsStream();
2930 ASSERT_TRUE(stream != nullptr);
2931 // XdsClient should have sent a subscription request on the ADS stream.
2932 auto request = WaitForRequest(stream.get());
2933 ASSERT_TRUE(request.has_value());
2934 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2935 /*version_info=*/"", /*response_nonce=*/"",
2936 /*error_detail=*/absl::OkStatus(),
2937 /*resource_names=*/{"foo1"});
2938 CheckRequestNode(*request); // Should be present on the first request.
2939 // Server sends a response.
2940 stream->SendMessageToClient(
2941 ResponseBuilder(XdsFooResourceType::Get()->type_url())
2942 .set_version_info("1")
2943 .set_nonce("A")
2944 .AddFooResource(XdsFooResource("foo1", 6))
2945 .Serialize());
2946 // XdsClient should have delivered the resource to the watcher.
2947 auto resource = watcher->WaitForNextResource();
2948 ASSERT_NE(resource, nullptr);
2949 EXPECT_EQ(resource->name, "foo1");
2950 EXPECT_EQ(resource->value, 6);
2951 // XdsClient should have sent an ACK message to the xDS server.
2952 request = WaitForRequest(stream.get());
2953 ASSERT_TRUE(request.has_value());
2954 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2955 /*version_info=*/"1", /*response_nonce=*/"A",
2956 /*error_detail=*/absl::OkStatus(),
2957 /*resource_names=*/{"foo1"});
2958 // Transport reports connection failure.
2959 TriggerConnectionFailure(*xds_client_->bootstrap().servers().front(),
2960 absl::UnavailableError("connection failed"));
2961 // XdsClient should report an ambient error to the watcher.
2962 auto error = watcher->WaitForNextAmbientError();
2963 ASSERT_TRUE(error.has_value());
2964 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2965 EXPECT_EQ(error->message(),
2966 "xDS channel for server default_xds_server: "
2967 "connection failed (node ID:xds_client_test)")
2968 << *error;
2969 // The transport failing should also cause the stream to terminate.
2970 stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
2971 // The XdsClient will create a new stream.
2972 stream = WaitForAdsStream();
2973 ASSERT_TRUE(stream != nullptr);
2974 // XdsClient should have sent a subscription request on the new stream
2975 // that includes the last seen version.
2976 request = WaitForRequest(stream.get());
2977 ASSERT_TRUE(request.has_value());
2978 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
2979 /*version_info=*/"1", /*response_nonce=*/"",
2980 /*error_detail=*/absl::OkStatus(),
2981 /*resource_names=*/{"foo1"});
2982 CheckRequestNode(*request); // Should be present on the first request on the new stream.
2983 // Start a new watch. This watcher should be given the cached resource
2984 // followed by the ambient error.
2985 auto watcher2 = StartFooWatch("foo1");
2986 resource = watcher2->WaitForNextResource();
2987 ASSERT_NE(resource, nullptr);
2988 EXPECT_EQ(resource->name, "foo1");
2989 EXPECT_EQ(resource->value, 6);
2990 error = watcher2->WaitForNextAmbientError();
2991 ASSERT_TRUE(error.has_value());
2992 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
2993 EXPECT_EQ(error->message(),
2994 "xDS channel for server default_xds_server: "
2995 "connection failed (node ID:xds_client_test)")
2996 << *error;
2997 // The ADS stream uses wait_for_ready inside the XdsTransport interface,
2998 // so when the channel reconnects, the already-started stream will proceed.
2999 // Server sends a response with the same resource contents as before.
3000 stream->SendMessageToClient(
3001 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3002 .set_version_info("1")
3003 .set_nonce("A")
3004 .AddFooResource(XdsFooResource("foo1", 6))
3005 .Serialize());
3006 // The resource hasn't changed, so XdsClient will not call the
3007 // watchers' OnResourceChanged() methods. However, it will call
3008 // OnAmbientError() with an OK status to let them know that the
3009 // ambient error is gone.
3010 error = watcher->WaitForNextAmbientError();
3011 ASSERT_TRUE(error.has_value());
3012 EXPECT_EQ(*error, absl::OkStatus());
3013 error = watcher2->WaitForNextAmbientError();
3014 ASSERT_TRUE(error.has_value());
3015 EXPECT_EQ(*error, absl::OkStatus());
3016 // XdsClient should have sent an ACK message to the xDS server.
3017 request = WaitForRequest(stream.get());
3018 ASSERT_TRUE(request.has_value());
3019 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3020 /*version_info=*/"1", /*response_nonce=*/"A",
3021 /*error_detail=*/absl::OkStatus(),
3022 /*resource_names=*/{"foo1"});
3023 // Cancel watches. Once both are cancelled, the stream should be orphaned.
3024 CancelFooWatch(watcher.get(), "foo1");
3025 CancelFooWatch(watcher2.get(), "foo1");
3026 EXPECT_TRUE(stream->IsOrphaned());
3027 }
3028
TEST_F(XdsClientTest,ResourceDoesNotExistUponTimeout)3029 TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) {
// Verifies that when the server never answers a subscription, the watcher
// is told that the resource does not exist, that a second watcher started
// afterwards gets the same notification, and that a later response from
// the server still delivers the resource to both watchers. Metrics and
// CSDS output are checked at each stage.
3030 InitXdsClient();
3031 // Start a watch for "foo1".
3032 auto watcher = StartFooWatch("foo1");
3033 // Watcher should initially not see any resource reported.
3034 EXPECT_FALSE(watcher->HasEvent());
3035 // Check metric data: no updates yet, one resource in the requested state.
3036 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3037 ::testing::ElementsAre());
3038 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3039 ::testing::ElementsAre());
3040 EXPECT_THAT(GetResourceCounts(),
3041 ::testing::ElementsAre(::testing::Pair(
3042 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3043 XdsFooResourceType::Get()->type_url(),
3044 "requested"),
3045 1)));
3046 // CSDS should show that the resource has been requested.
3047 ClientConfig csds = DumpCsds();
3048 EXPECT_THAT(csds.generic_xds_configs(),
3049 ::testing::ElementsAre(CsdsResourceRequested(
3050 XdsFooResourceType::Get()->type_url(), "foo1")));
3051 // XdsClient should have created an ADS stream.
3052 auto stream = WaitForAdsStream();
3053 ASSERT_TRUE(stream != nullptr);
3054 // XdsClient should have sent a subscription request on the ADS stream.
3055 auto request = WaitForRequest(stream.get());
3056 ASSERT_TRUE(request.has_value());
3057 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3058 /*version_info=*/"", /*response_nonce=*/"",
3059 /*error_detail=*/absl::OkStatus(),
3060 /*resource_names=*/{"foo1"});
3061 CheckRequestNode(*request); // Should be present on the first request.
3062 // Do not send a response, but wait for the resource to be reported as
3063 // not existing.
3064 EXPECT_TRUE(watcher->WaitForDoesNotExist());
3065 // Check metric data: still no updates, and the resource has moved to the
3066 // does_not_exist state.
3066 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3067 ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
3068 EXPECT_THAT(GetResourceCounts(),
3069 ::testing::ElementsAre(::testing::Pair(
3070 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3071 XdsFooResourceType::Get()->type_url(),
3072 "does_not_exist"),
3073 1)));
3074 // CSDS should show that the resource does not exist.
3075 csds = DumpCsds();
3076 EXPECT_THAT(csds.generic_xds_configs(),
3077 ::testing::ElementsAre(CsdsResourceDoesNotExist(
3078 XdsFooResourceType::Get()->type_url(), "foo1")));
3079 // Start a new watcher for the same resource. It should immediately
3080 // receive the same does-not-exist notification.
3081 auto watcher2 = StartFooWatch("foo1");
3082 EXPECT_TRUE(watcher2->WaitForDoesNotExist());
3083 // Now server sends a response.
3084 stream->SendMessageToClient(
3085 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3086 .set_version_info("1")
3087 .set_nonce("A")
3088 .AddFooResource(XdsFooResource("foo1", 6))
3089 .Serialize());
3090 // XdsClient should have delivered the response to the watchers.
3091 auto resource = watcher->WaitForNextResource();
3092 ASSERT_NE(resource, nullptr);
3093 EXPECT_EQ(resource->name, "foo1");
3094 EXPECT_EQ(resource->value, 6);
3095 resource = watcher2->WaitForNextResource();
3096 ASSERT_NE(resource, nullptr);
3097 EXPECT_EQ(resource->name, "foo1");
3098 EXPECT_EQ(resource->value, 6);
3099 // Check metric data: one valid update, and the resource is now acked.
3100 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3101 ::testing::ElementsAre(::testing::Pair(
3102 ::testing::Pair(kDefaultXdsServerUrl,
3103 XdsFooResourceType::Get()->type_url()),
3104 1)),
3105 ::testing::ElementsAre(), ::testing::_));
3106 EXPECT_THAT(
3107 GetResourceCounts(),
3108 ::testing::ElementsAre(::testing::Pair(
3109 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3110 XdsFooResourceType::Get()->type_url(), "acked"),
3111 1)));
3112 // Check CSDS data.
3113 csds = DumpCsds();
3114 EXPECT_THAT(csds.generic_xds_configs(),
3115 ::testing::UnorderedElementsAre(CsdsResourceAcked(
3116 XdsFooResourceType::Get()->type_url(), "foo1",
3117 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3118 // XdsClient should have sent an ACK message to the xDS server.
3119 request = WaitForRequest(stream.get());
3120 ASSERT_TRUE(request.has_value());
3121 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3122 /*version_info=*/"1", /*response_nonce=*/"A",
3123 /*error_detail=*/absl::OkStatus(),
3124 /*resource_names=*/{"foo1"});
3125 // Cancel watches. Once both are cancelled, the stream should be orphaned.
3126 CancelFooWatch(watcher.get(), "foo1");
3127 CancelFooWatch(watcher2.get(), "foo1");
3128 EXPECT_TRUE(stream->IsOrphaned());
3129 }
3130
TEST_F(XdsClientTest,ResourceDoesNotExistAfterStreamRestart)3131 TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) {
// Verifies that a resource can still be reported as non-existent after
// the ADS stream has failed and been re-created: the first stream fails
// before any response (the watcher sees an error and the resource stays
// "requested"), the second stream gets no response (the watcher sees
// does-not-exist), and a later response on that stream is delivered and
// ACKed normally.
3132 InitXdsClient();
3133 // Metrics should initially be empty.
3134 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3135 ::testing::ElementsAre());
3136 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3137 ::testing::ElementsAre());
3138 EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
3139 // Start a watch for "foo1".
3140 auto watcher = StartFooWatch("foo1");
3141 // Check metric data: no updates yet, one resource in the requested state.
3142 EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
3143 ::testing::ElementsAre());
3144 EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
3145 ::testing::ElementsAre());
3146 EXPECT_THAT(GetResourceCounts(),
3147 ::testing::ElementsAre(::testing::Pair(
3148 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3149 XdsFooResourceType::Get()->type_url(),
3150 "requested"),
3151 1)));
3152 // CSDS should show that the resource has been requested.
3153 ClientConfig csds = DumpCsds();
3154 EXPECT_THAT(csds.generic_xds_configs(),
3155 ::testing::ElementsAre(CsdsResourceRequested(
3156 XdsFooResourceType::Get()->type_url(), "foo1")));
3157 // Watcher should initially not see any resource reported.
3158 EXPECT_FALSE(watcher->HasEvent());
3159 // XdsClient should have created an ADS stream.
3160 auto stream = WaitForAdsStream();
3161 ASSERT_TRUE(stream != nullptr);
3162 // XdsClient should have sent a subscription request on the ADS stream.
3163 auto request = WaitForRequest(stream.get());
3164 ASSERT_TRUE(request.has_value());
3165 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3166 /*version_info=*/"", /*response_nonce=*/"",
3167 /*error_detail=*/absl::OkStatus(),
3168 /*resource_names=*/{"foo1"});
3169 CheckRequestNode(*request); // Should be present on the first request.
3170 // Stream fails.
3171 stream->MaybeSendStatusToClient(absl::UnavailableError("ugh"));
3172 // XdsClient should report error to watcher.
3173 auto error = watcher->WaitForNextError();
3174 ASSERT_TRUE(error.has_value());
3175 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
3176 EXPECT_EQ(error->message(),
3177 "xDS channel for server default_xds_server: xDS call failed "
3178 "with no responses received; status: UNAVAILABLE: ugh "
3179 "(node ID:xds_client_test)")
3180 << *error;
3181 // Check metric data: still no updates, and the resource is still in the
3182 // requested state despite the stream failure.
3182 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3183 ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
3184 EXPECT_THAT(GetResourceCounts(),
3185 ::testing::ElementsAre(::testing::Pair(
3186 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3187 XdsFooResourceType::Get()->type_url(),
3188 "requested"),
3189 1)));
3190 // CSDS should show that the resource has been requested.
3191 csds = DumpCsds();
3192 EXPECT_THAT(csds.generic_xds_configs(),
3193 ::testing::ElementsAre(CsdsResourceRequested(
3194 XdsFooResourceType::Get()->type_url(), "foo1")));
3195 // XdsClient should create a new stream.
3196 stream = WaitForAdsStream();
3197 ASSERT_TRUE(stream != nullptr);
3198 // XdsClient sends a subscription request. No version or nonce is set,
3199 // since no response was ever received for this resource type.
3199 request = WaitForRequest(stream.get());
3200 ASSERT_TRUE(request.has_value());
3201 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3202 /*version_info=*/"", /*response_nonce=*/"",
3203 /*error_detail=*/absl::OkStatus(),
3204 /*resource_names=*/{"foo1"});
3205 CheckRequestNode(*request); // Should be present on the first request on the new stream.
3206 // Server does NOT send a response immediately.
3207 // Client should receive a resource does-not-exist.
3208 ASSERT_TRUE(watcher->WaitForDoesNotExist());
3209 // Check metric data: still no updates, and the resource has moved to the
3210 // does_not_exist state.
3210 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3211 ::testing::ElementsAre(), ::testing::ElementsAre(), ::testing::_));
3212 EXPECT_THAT(GetResourceCounts(),
3213 ::testing::ElementsAre(::testing::Pair(
3214 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3215 XdsFooResourceType::Get()->type_url(),
3216 "does_not_exist"),
3217 1)));
3218 // CSDS should show that the resource does not exist.
3219 csds = DumpCsds();
3220 EXPECT_THAT(csds.generic_xds_configs(),
3221 ::testing::ElementsAre(CsdsResourceDoesNotExist(
3222 XdsFooResourceType::Get()->type_url(), "foo1")));
3223 // Server now sends the requested resource.
3224 stream->SendMessageToClient(
3225 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3226 .set_version_info("1")
3227 .set_nonce("A")
3228 .AddFooResource(XdsFooResource("foo1", 6))
3229 .Serialize());
3230 // The resource is delivered to the watcher.
3231 auto resource = watcher->WaitForNextResource();
3232 ASSERT_NE(resource, nullptr);
3233 EXPECT_EQ(resource->name, "foo1");
3234 EXPECT_EQ(resource->value, 6);
3235 // Check metric data: one valid update, and the resource is now acked.
3236 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3237 ::testing::ElementsAre(::testing::Pair(
3238 ::testing::Pair(kDefaultXdsServerUrl,
3239 XdsFooResourceType::Get()->type_url()),
3240 1)),
3241 ::testing::ElementsAre(), ::testing::_));
3242 EXPECT_THAT(
3243 GetResourceCounts(),
3244 ::testing::ElementsAre(::testing::Pair(
3245 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3246 XdsFooResourceType::Get()->type_url(), "acked"),
3247 1)));
3248 // Check CSDS data.
3249 csds = DumpCsds();
3250 EXPECT_THAT(csds.generic_xds_configs(),
3251 ::testing::UnorderedElementsAre(CsdsResourceAcked(
3252 XdsFooResourceType::Get()->type_url(), "foo1",
3253 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3254 // XdsClient sends an ACK.
3255 request = WaitForRequest(stream.get());
3256 ASSERT_TRUE(request.has_value());
3257 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3258 /*version_info=*/"1", /*response_nonce=*/"A",
3259 /*error_detail=*/absl::OkStatus(),
3260 /*resource_names=*/{"foo1"});
3261 // Cancel watcher. With no watchers left, the stream should be orphaned.
3262 CancelFooWatch(watcher.get(), "foo1");
3263 EXPECT_TRUE(stream->IsOrphaned());
3264 }
3265
TEST_F(XdsClientTest,DoesNotExistTimerNotStartedUntilSendCompletes)3266 TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) {
// Verifies that the watcher does not get a does-not-exist notification
// while the subscription request's send_message op is still pending, and
// that it does get one (absent a server response) after the test
// completes that op. A later response is then delivered and ACKed.
3267 InitXdsClient();
3268 // Tell transport to let us manually trigger completion of the
3269 // send_message ops to XdsClient.
3270 transport_factory_->SetAutoCompleteMessagesFromClient(false);
3271 // Start a watch for "foo1".
3272 auto watcher = StartFooWatch("foo1");
3273 // Watcher should initially not see any resource reported.
3274 EXPECT_FALSE(watcher->HasEvent());
3275 // XdsClient should have created an ADS stream.
3276 auto stream = WaitForAdsStream();
3277 ASSERT_TRUE(stream != nullptr);
3278 // XdsClient should have sent a subscription request on the ADS stream.
3279 auto request = WaitForRequest(stream.get());
3280 ASSERT_TRUE(request.has_value());
3281 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3282 /*version_info=*/"", /*response_nonce=*/"",
3283 /*error_detail=*/absl::OkStatus(),
3284 /*resource_names=*/{"foo1"});
3285 CheckRequestNode(*request); // Should be present on the first request.
3286 // Server does NOT send a response.
3287 // We should not see a resource-does-not-exist event, because the
3288 // send_message op has not been completed yet (which is how this test
3289 // models a channel that is still disconnected), so the does-not-exist
3290 // timer should not be running.
3289 EXPECT_TRUE(watcher->ExpectNoEvent());
3290 // Check metric data: the resource is still in the requested state.
3291 EXPECT_THAT(GetResourceCounts(),
3292 ::testing::ElementsAre(::testing::Pair(
3293 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3294 XdsFooResourceType::Get()->type_url(),
3295 "requested"),
3296 1)));
3297 // CSDS should show that the resource has been requested.
3298 ClientConfig csds = DumpCsds();
3299 EXPECT_THAT(csds.generic_xds_configs(),
3300 ::testing::ElementsAre(CsdsResourceRequested(
3301 XdsFooResourceType::Get()->type_url(), "foo1")));
3302 // The ADS stream uses wait_for_ready inside the XdsTransport interface,
3303 // so when the channel connects, the already-started stream will proceed.
// Simulate that by completing the pending send_message op.
3304 stream->CompleteSendMessageFromClient();
3305 // Server does NOT send a response.
3306 // Watcher should see a does-not-exist event.
3307 EXPECT_TRUE(watcher->WaitForDoesNotExist());
3308 // Check metric data: the resource has moved to the does_not_exist state.
3309 EXPECT_THAT(GetResourceCounts(),
3310 ::testing::ElementsAre(::testing::Pair(
3311 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3312 XdsFooResourceType::Get()->type_url(),
3313 "does_not_exist"),
3314 1)));
3315 // CSDS should show that the resource does not exist.
3316 csds = DumpCsds();
3317 EXPECT_THAT(csds.generic_xds_configs(),
3318 ::testing::ElementsAre(CsdsResourceDoesNotExist(
3319 XdsFooResourceType::Get()->type_url(), "foo1")));
3320 // Now server sends a response.
3321 stream->SendMessageToClient(
3322 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3323 .set_version_info("1")
3324 .set_nonce("A")
3325 .AddFooResource(XdsFooResource("foo1", 6))
3326 .Serialize());
3327 // XdsClient should have delivered the response to the watcher.
3328 auto resource = watcher->WaitForNextResource();
3329 ASSERT_NE(resource, nullptr);
3330 EXPECT_EQ(resource->name, "foo1");
3331 EXPECT_EQ(resource->value, 6);
3332 // Check metric data: the resource is now acked.
3333 EXPECT_THAT(
3334 GetResourceCounts(),
3335 ::testing::ElementsAre(::testing::Pair(
3336 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3337 XdsFooResourceType::Get()->type_url(), "acked"),
3338 1)));
3339 // Check CSDS data.
3340 csds = DumpCsds();
3341 EXPECT_THAT(csds.generic_xds_configs(),
3342 ::testing::UnorderedElementsAre(CsdsResourceAcked(
3343 XdsFooResourceType::Get()->type_url(), "foo1",
3344 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3345 // XdsClient should have sent an ACK message to the xDS server.
3346 request = WaitForRequest(stream.get());
3347 ASSERT_TRUE(request.has_value());
3348 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3349 /*version_info=*/"1", /*response_nonce=*/"A",
3350 /*error_detail=*/absl::OkStatus(),
3351 /*resource_names=*/{"foo1"});
// Auto-completion is disabled, so complete the ACK's send_message op
// explicitly.
3352 stream->CompleteSendMessageFromClient();
3353 // Cancel watch. With no watchers left, the stream should be orphaned.
3354 CancelFooWatch(watcher.get(), "foo1");
3355 EXPECT_TRUE(stream->IsOrphaned());
3356 }
3357
3358 // In https://github.com/grpc/grpc/issues/29583, we ran into a case
3359 // where we wound up starting a timer after we had already received the
3360 // resource, thus incorrectly reporting the resource as not existing.
3361 // This happened when unsubscribing and then resubscribing to the same
3362 // resource while a send_message op was already in flight and then
3363 // receiving an update containing that resource.
TEST_F(XdsClientTest,ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending)3364 TEST_F(XdsClientTest,
3365 ResourceDoesNotExistUnsubscribeAndResubscribeWhileSendMessagePending) {
// Regression test for https://github.com/grpc/grpc/issues/29583.
// Sequence exercised: subscribe to foo1 and receive it; subscribe to foo2
// but leave that request's send_message op pending; unsubscribe from and
// re-subscribe to foo1; receive a response containing both resources;
// only then complete the pending send. The foo1 watcher must not see a
// does-not-exist event afterwards.
3366 InitXdsClient();
3367 // Tell transport to let us manually trigger completion of the
3368 // send_message ops to XdsClient.
3369 transport_factory_->SetAutoCompleteMessagesFromClient(false);
3370 // Start a watch for "foo1".
3371 auto watcher = StartFooWatch("foo1");
3372 // Watcher should initially not see any resource reported.
3373 EXPECT_FALSE(watcher->HasEvent());
3374 // XdsClient should have created an ADS stream.
3375 auto stream = WaitForAdsStream();
3376 ASSERT_TRUE(stream != nullptr);
3377 // XdsClient should have sent a subscription request on the ADS stream.
3378 auto request = WaitForRequest(stream.get());
3379 ASSERT_TRUE(request.has_value());
3380 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3381 /*version_info=*/"", /*response_nonce=*/"",
3382 /*error_detail=*/absl::OkStatus(),
3383 /*resource_names=*/{"foo1"});
3384 CheckRequestNode(*request); // Should be present on the first request.
3385 stream->CompleteSendMessageFromClient();
3386 // Server sends a response.
3387 stream->SendMessageToClient(
3388 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3389 .set_version_info("1")
3390 .set_nonce("A")
3391 .AddFooResource(XdsFooResource("foo1", 6))
3392 .Serialize());
3393 // XdsClient should have delivered the response to the watcher.
3394 auto resource = watcher->WaitForNextResource();
3395 ASSERT_NE(resource, nullptr);
3396 EXPECT_EQ(resource->name, "foo1");
3397 EXPECT_EQ(resource->value, 6);
3398 // Check metric data: one valid update, and foo1 is acked.
3399 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3400 ::testing::ElementsAre(::testing::Pair(
3401 ::testing::Pair(kDefaultXdsServerUrl,
3402 XdsFooResourceType::Get()->type_url()),
3403 1)),
3404 ::testing::ElementsAre(), ::testing::_));
3405 EXPECT_THAT(
3406 GetResourceCounts(),
3407 ::testing::ElementsAre(::testing::Pair(
3408 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3409 XdsFooResourceType::Get()->type_url(), "acked"),
3410 1)));
3411 // Check CSDS data.
3412 ClientConfig csds = DumpCsds();
3413 EXPECT_THAT(csds.generic_xds_configs(),
3414 ::testing::UnorderedElementsAre(CsdsResourceAcked(
3415 XdsFooResourceType::Get()->type_url(), "foo1",
3416 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3417 // XdsClient should have sent an ACK message to the xDS server.
3418 request = WaitForRequest(stream.get());
3419 ASSERT_TRUE(request.has_value());
3420 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3421 /*version_info=*/"1", /*response_nonce=*/"A",
3422 /*error_detail=*/absl::OkStatus(),
3423 /*resource_names=*/{"foo1"});
3424 stream->CompleteSendMessageFromClient();
3425 // Start a watch for a second resource.
3426 auto watcher2 = StartFooWatch("foo2");
3427 // Watcher should initially not see any resource reported.
3428 EXPECT_FALSE(watcher2->HasEvent());
3429 // Check metric data: foo1 is acked and foo2 is in the requested state.
3430 EXPECT_THAT(
3431 GetResourceCounts(),
3432 ::testing::ElementsAre(
3433 ::testing::Pair(ResourceCountLabelsEq(
3434 XdsClient::kOldStyleAuthority,
3435 XdsFooResourceType::Get()->type_url(), "acked"),
3436 1),
3437 ::testing::Pair(
3438 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3439 XdsFooResourceType::Get()->type_url(),
3440 "requested"),
3441 1)));
3442 // Check CSDS data.
3443 csds = DumpCsds();
3444 EXPECT_THAT(csds.generic_xds_configs(),
3445 ::testing::UnorderedElementsAre(
3446 CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
3447 "foo1", resource->AsJsonString(), "1",
3448 TimestampProtoEq(kTime0)),
3449 CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
3450 "foo2")));
3451 // XdsClient sends a request to subscribe to the new resource.
3452 request = WaitForRequest(stream.get());
3453 ASSERT_TRUE(request.has_value());
3454 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3455 /*version_info=*/"1", /*response_nonce=*/"A",
3456 /*error_detail=*/absl::OkStatus(),
3457 /*resource_names=*/{"foo1", "foo2"});
3458 // NOTE: We do NOT yet tell the XdsClient that the send_message op is
3459 // complete.
3460 // Unsubscribe from foo1 and then re-subscribe to it.
3461 CancelFooWatch(watcher.get(), "foo1");
// With foo1 unsubscribed, only foo2 (still in the requested state) should
// show up in the metrics and in CSDS.
3462 EXPECT_THAT(GetResourceCounts(),
3463 ::testing::ElementsAre(::testing::Pair(
3464 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3465 XdsFooResourceType::Get()->type_url(),
3466 "requested"),
3467 1)));
3468 csds = DumpCsds();
3469 EXPECT_THAT(csds.generic_xds_configs(),
3470 ::testing::ElementsAre(CsdsResourceRequested(
3471 XdsFooResourceType::Get()->type_url(), "foo2")));
3472 watcher = StartFooWatch("foo1");
// After re-subscribing, foo1 is back in the requested state alongside
// foo2; it is no longer reported as acked.
3473 EXPECT_THAT(GetResourceCounts(),
3474 ::testing::ElementsAre(::testing::Pair(
3475 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3476 XdsFooResourceType::Get()->type_url(),
3477 "requested"),
3478 2)));
3479 csds = DumpCsds();
3480 EXPECT_THAT(
3481 csds.generic_xds_configs(),
3482 ::testing::ElementsAre(
3483 CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo1"),
3484 CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
3485 "foo2")));
3486 // Now send a response from the server containing both foo1 and foo2.
3487 // We increment time to make sure that the CSDS data gets a new timestamp.
3488 time_cache_.TestOnlySetNow(kTime1);
3489 stream->SendMessageToClient(
3490 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3491 .set_version_info("1")
3492 .set_nonce("B")
3493 .AddFooResource(XdsFooResource("foo1", 6))
3494 .AddFooResource(XdsFooResource("foo2", 7))
3495 .Serialize());
3496 // The watcher for foo1 will receive an update even if the resource
3497 // has not changed, since the previous value was removed from the
3498 // cache when we unsubscribed.
3499 resource = watcher->WaitForNextResource();
3500 ASSERT_NE(resource, nullptr);
3501 EXPECT_EQ(resource->name, "foo1");
3502 EXPECT_EQ(resource->value, 6);
3503 // For foo2, the watcher should receive notification for the new resource.
3504 auto resource2 = watcher2->WaitForNextResource();
3505 ASSERT_NE(resource2, nullptr);
3506 EXPECT_EQ(resource2->name, "foo2");
3507 EXPECT_EQ(resource2->value, 7);
3508 // Check metric data. The valid-update count is expected to be 3:
// presumably 1 from the first response (foo1) plus 2 from this response
// (foo1 and foo2), i.e. the counter is cumulative -- confirm against the
// metrics reporter.
3509 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3510 ::testing::ElementsAre(::testing::Pair(
3511 ::testing::Pair(kDefaultXdsServerUrl,
3512 XdsFooResourceType::Get()->type_url()),
3513 3)),
3514 ::testing::ElementsAre(), ::testing::_));
3515 EXPECT_THAT(
3516 GetResourceCounts(),
3517 ::testing::ElementsAre(::testing::Pair(
3518 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3519 XdsFooResourceType::Get()->type_url(), "acked"),
3520 2)));
3521 // Check CSDS data. Both resources should carry the new timestamp.
3522 csds = DumpCsds();
3523 EXPECT_THAT(csds.generic_xds_configs(),
3524 ::testing::UnorderedElementsAre(
3525 CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
3526 "foo1", resource->AsJsonString(), "1",
3527 TimestampProtoEq(kTime1)),
3528 CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
3529 "foo2", resource2->AsJsonString(), "1",
3530 TimestampProtoEq(kTime1))));
3531 // Now we finally tell XdsClient that its previous send_message op is
3532 // complete.
3533 stream->CompleteSendMessageFromClient();
3534 // XdsClient should send an ACK with the updated subscription list
3535 // (which happens to be identical to the old list), and it should not
3536 // restart the does-not-exist timer.
3537 request = WaitForRequest(stream.get());
3538 ASSERT_TRUE(request.has_value());
3539 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3540 /*version_info=*/"1", /*response_nonce=*/"B",
3541 /*error_detail=*/absl::OkStatus(),
3542 /*resource_names=*/{"foo1", "foo2"});
3543 stream->CompleteSendMessageFromClient();
3544 // Make sure the watcher for foo1 does not see a does-not-exist event.
3545 EXPECT_TRUE(watcher->ExpectNoEvent());
3546 // Cancel watches.
// NOTE(review): delay_unsubscription is set for foo1 -- presumably so that
// no separate unsubscription request is sent before the last watch is
// cancelled; confirm against CancelFooWatch().
3547 CancelFooWatch(watcher.get(), "foo1", /*delay_unsubscription=*/true);
3548 CancelFooWatch(watcher2.get(), "foo2");
3549 EXPECT_TRUE(stream->IsOrphaned());
3550 }
3551
TEST_F(XdsClientTest,DoNotSendDoesNotExistForCachedResource)3552 TEST_F(XdsClientTest, DoNotSendDoesNotExistForCachedResource) {
// Verifies that a resource which is already cached is not reported as
// non-existent when the stream is re-created and the server stays silent,
// and that re-sending the unchanged resource produces no watcher event
// while still updating metrics and the CSDS timestamp.
3553 InitXdsClient();
3554 // Start a watch for "foo1".
3555 auto watcher = StartFooWatch("foo1");
3556 // Watcher should initially not see any resource reported.
3557 EXPECT_FALSE(watcher->HasEvent());
3558 // XdsClient should have created an ADS stream.
3559 auto stream = WaitForAdsStream();
3560 ASSERT_TRUE(stream != nullptr);
3561 // XdsClient should have sent a subscription request on the ADS stream.
3562 auto request = WaitForRequest(stream.get());
3563 ASSERT_TRUE(request.has_value());
3564 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3565 /*version_info=*/"", /*response_nonce=*/"",
3566 /*error_detail=*/absl::OkStatus(),
3567 /*resource_names=*/{"foo1"});
3568 CheckRequestNode(*request); // Should be present on the first request.
3569 // Server sends a response.
3570 stream->SendMessageToClient(
3571 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3572 .set_version_info("1")
3573 .set_nonce("A")
3574 .AddFooResource(XdsFooResource("foo1", 6))
3575 .Serialize());
3576 // XdsClient should have delivered the response to the watcher.
3577 auto resource = watcher->WaitForNextResource();
3578 ASSERT_NE(resource, nullptr);
3579 EXPECT_EQ(resource->name, "foo1");
3580 EXPECT_EQ(resource->value, 6);
3581 // Check metric data: one valid update, and the resource is acked.
3582 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3583 ::testing::ElementsAre(::testing::Pair(
3584 ::testing::Pair(kDefaultXdsServerUrl,
3585 XdsFooResourceType::Get()->type_url()),
3586 1)),
3587 ::testing::ElementsAre(), ::testing::_));
3588 EXPECT_THAT(
3589 GetResourceCounts(),
3590 ::testing::ElementsAre(::testing::Pair(
3591 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3592 XdsFooResourceType::Get()->type_url(), "acked"),
3593 1)));
3594 // Check CSDS data.
3595 ClientConfig csds = DumpCsds();
3596 EXPECT_THAT(csds.generic_xds_configs(),
3597 ::testing::UnorderedElementsAre(CsdsResourceAcked(
3598 XdsFooResourceType::Get()->type_url(), "foo1",
3599 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3600 // XdsClient should have sent an ACK message to the xDS server.
3601 request = WaitForRequest(stream.get());
3602 ASSERT_TRUE(request.has_value());
3603 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3604 /*version_info=*/"1", /*response_nonce=*/"A",
3605 /*error_detail=*/absl::OkStatus(),
3606 /*resource_names=*/{"foo1"});
3607 // Stream fails because of transport disconnection.
3608 stream->MaybeSendStatusToClient(absl::UnavailableError("connection failed"));
3609 // XdsClient should NOT report error to watcher, because we saw a
3610 // response on the stream before it failed.
3611 // XdsClient creates a new stream.
3612 stream = WaitForAdsStream();
3613 ASSERT_TRUE(stream != nullptr);
3614 // XdsClient should have sent a subscription request on the new ADS
3615 // stream, carrying the last seen version but no nonce.
3615 request = WaitForRequest(stream.get());
3616 ASSERT_TRUE(request.has_value());
3617 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3618 /*version_info=*/"1", /*response_nonce=*/"",
3619 /*error_detail=*/absl::OkStatus(),
3620 /*resource_names=*/{"foo1"});
3621 CheckRequestNode(*request); // Should be present on the first request on the new stream.
3622 // Server does NOT send a response.
3623 // We should not see a resource-does-not-exist event, because the
3624 // resource was already cached, so the server can optimize by not
3625 // resending it.
3626 EXPECT_TRUE(watcher->ExpectNoEvent());
3627 // Check metric data: unchanged from before the stream restart.
3628 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3629 ::testing::ElementsAre(::testing::Pair(
3630 ::testing::Pair(kDefaultXdsServerUrl,
3631 XdsFooResourceType::Get()->type_url()),
3632 1)),
3633 ::testing::ElementsAre(), ::testing::_));
3634 EXPECT_THAT(
3635 GetResourceCounts(),
3636 ::testing::ElementsAre(::testing::Pair(
3637 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3638 XdsFooResourceType::Get()->type_url(), "acked"),
3639 1)));
3640 // Check CSDS data: still acked with the original timestamp.
3641 csds = DumpCsds();
3642 EXPECT_THAT(csds.generic_xds_configs(),
3643 ::testing::UnorderedElementsAre(CsdsResourceAcked(
3644 XdsFooResourceType::Get()->type_url(), "foo1",
3645 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
3646 // Now server sends a response.
3647 // We increment time to make sure that the CSDS data gets a new timestamp.
3648 time_cache_.TestOnlySetNow(kTime1);
3649 stream->SendMessageToClient(
3650 ResponseBuilder(XdsFooResourceType::Get()->type_url())
3651 .set_version_info("1")
3652 .set_nonce("A")
3653 .AddFooResource(XdsFooResource("foo1", 6))
3654 .Serialize());
3655 // Watcher will not see any update, since the resource is unchanged.
3656 EXPECT_TRUE(watcher->ExpectNoEvent());
3657 // Check metric data. The valid-update count goes to 2 even though the
// watcher saw no change: the re-sent resource is still counted as a
// valid update.
3658 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
3659 ::testing::ElementsAre(::testing::Pair(
3660 ::testing::Pair(kDefaultXdsServerUrl,
3661 XdsFooResourceType::Get()->type_url()),
3662 2)),
3663 ::testing::ElementsAre(), ::testing::_));
3664 EXPECT_THAT(
3665 GetResourceCounts(),
3666 ::testing::ElementsAre(::testing::Pair(
3667 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
3668 XdsFooResourceType::Get()->type_url(), "acked"),
3669 1)));
// NOTE(review): the `true` paired with the server URL presumably marks
// the connection to that server as working -- confirm against
// GetServerConnections().
3670 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
3671 kDefaultXdsServerUrl, true)));
3672 // Check CSDS data: the timestamp should now reflect the new response.
3673 csds = DumpCsds();
3674 EXPECT_THAT(csds.generic_xds_configs(),
3675 ::testing::UnorderedElementsAre(CsdsResourceAcked(
3676 XdsFooResourceType::Get()->type_url(), "foo1",
3677 resource->AsJsonString(), "1", TimestampProtoEq(kTime1))));
3678 // XdsClient should have sent an ACK message to the xDS server.
3679 request = WaitForRequest(stream.get());
3680 ASSERT_TRUE(request.has_value());
3681 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
3682 /*version_info=*/"1", /*response_nonce=*/"A",
3683 /*error_detail=*/absl::OkStatus(),
3684 /*resource_names=*/{"foo1"});
3685 // Cancel watch. With no watchers left, the stream should be orphaned.
3686 CancelFooWatch(watcher.get(), "foo1");
3687 EXPECT_TRUE(stream->IsOrphaned());
3688 }
3689
// Verifies that a resource sent inside a Resource wrapper message
// (AddFooResource() with in_resource_wrapper=true) is delivered to the
// watcher, counted as a valid update, shown as acked in CSDS, and ACKed.
TEST_F(XdsClientTest, ResourceWrappedInResourceMessage) {
  InitXdsClient();
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Send a response with the resource wrapped in a Resource message.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6),
                          /*in_resource_wrapper=*/true)
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                XdsFooResourceType::Get()->type_url(), "acked"),
          1)));
  // Check CSDS data.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceAcked(
                  XdsFooResourceType::Get()->type_url(), "foo1",
                  resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Cancel watch.
  CancelFooWatch(watcher.get(), "foo1");
  EXPECT_TRUE(stream->IsOrphaned());
}
3750
// Verifies that two resource types (Foo and Bar) watched on the same ADS
// stream keep independent version/nonce state, and that a Foo subscription
// started after the previous Foo watch was cancelled reuses the version and
// nonce last seen for the Foo type.
TEST_F(XdsClientTest, MultipleResourceTypes) {
  InitXdsClient();
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream.
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Send a response.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                XdsFooResourceType::Get()->type_url(), "acked"),
          1)));
  // Check CSDS data.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceAcked(
                  XdsFooResourceType::Get()->type_url(), "foo1",
                  resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Start a watch for "bar1".
  auto watcher2 = StartBarWatch("bar1");
  // XdsClient should have sent a subscription request on the ADS stream.
  // Note that version and nonce here do NOT use the values for Foo,
  // since each resource type has its own state.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsBarResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"bar1"});
  // Send a response.
  // We increment time to make sure that the CSDS data gets a new timestamp.
  time_cache_.TestOnlySetNow(kTime1);
  stream->SendMessageToClient(
      ResponseBuilder(XdsBarResourceType::Get()->type_url())
          .set_version_info("2")
          .set_nonce("B")
          .AddBarResource(XdsBarResource("bar1", "whee"))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource2 = watcher2->WaitForNextResource();
  // Guard the Bar resource (not the Foo one) before dereferencing it below.
  ASSERT_NE(resource2, nullptr);
  EXPECT_EQ(resource2->name, "bar1");
  EXPECT_EQ(resource2->value, "whee");
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(
          ::testing::Pair(
              ::testing::Pair(kDefaultXdsServerUrl,
                              XdsBarResourceType::Get()->type_url()),
              1),
          ::testing::Pair(
              ::testing::Pair(kDefaultXdsServerUrl,
                              XdsFooResourceType::Get()->type_url()),
              1)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::UnorderedElementsAre(
          ::testing::Pair(ResourceCountLabelsEq(
                              XdsClient::kOldStyleAuthority,
                              XdsBarResourceType::Get()->type_url(), "acked"),
                          1),
          ::testing::Pair(ResourceCountLabelsEq(
                              XdsClient::kOldStyleAuthority,
                              XdsFooResourceType::Get()->type_url(), "acked"),
                          1)));
  // Check CSDS data.
  csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(
                  CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
                                    "foo1", resource->AsJsonString(), "1",
                                    TimestampProtoEq(kTime0)),
                  CsdsResourceAcked(XdsBarResourceType::Get()->type_url(),
                                    "bar1", resource2->AsJsonString(), "2",
                                    TimestampProtoEq(kTime1))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsBarResourceType::Get()->type_url(),
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"bar1"});
  // Cancel watch for "foo1".
  CancelFooWatch(watcher.get(), "foo1");
  // XdsClient should send an unsubscription request.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
  // Server sends an empty response for the resource type.
  // (The server doesn't need to do this, but it may.)
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("C")
          .Serialize());
  // Client should ACK.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"C",
               /*error_detail=*/absl::OkStatus(), /*resource_names=*/{});
  // Now subscribe to foo2.
  watcher = StartFooWatch("foo2");
  // Client sends a subscription request, which retains the nonce and
  // version seen previously.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"C",
               /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
  // Server sends foo2.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("D")
          .AddFooResource(XdsFooResource("foo2", 8))
          .Serialize());
  // Watcher receives the resource.
  resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo2");
  EXPECT_EQ(resource->value, 8);
  // Client ACKs.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"D",
               /*error_detail=*/absl::OkStatus(), /*resource_names=*/{"foo2"});
  // Cancel watches.
  CancelFooWatch(watcher.get(), "foo2", /*delay_unsubscription=*/true);
  CancelBarWatch(watcher2.get(), "bar1");
  EXPECT_TRUE(stream->IsOrphaned());
}
3924
// Verifies federation with an authority that has its own xDS server: an
// old-style name is fetched from the top-level server, while an xdstp name
// for the authority is fetched over a separate ADS stream to the authority's
// server, with its own version/nonce state, metrics, and CSDS entries.
TEST_F(XdsClientTest, Federation) {
  constexpr char kAuthority[] = "xds.example.com";
  const std::string kXdstpResourceName = absl::StrCat(
      "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
  FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server");
  FakeXdsBootstrap::FakeAuthority authority;
  authority.set_server(authority_server);
  InitXdsClient(
      FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority));
  // Metrics should initially be empty.
  EXPECT_THAT(metrics_reporter_->resource_updates_valid(),
              ::testing::ElementsAre());
  EXPECT_THAT(metrics_reporter_->resource_updates_invalid(),
              ::testing::ElementsAre());
  EXPECT_THAT(GetResourceCounts(), ::testing::ElementsAre());
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre());
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream to the top-level xDS server.
  auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Send a response.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                XdsFooResourceType::Get()->type_url(), "acked"),
          1)));
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  // Check CSDS data.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceAcked(
                  XdsFooResourceType::Get()->type_url(), "foo1",
                  resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Start a watch for the xdstp resource name.
  auto watcher2 = StartFooWatch(kXdstpResourceName);
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher2->HasEvent());
  // Check metric data.
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(
          ::testing::Pair(ResourceCountLabelsEq(
                              XdsClient::kOldStyleAuthority,
                              XdsFooResourceType::Get()->type_url(), "acked"),
                          1),
          ::testing::Pair(ResourceCountLabelsEq(
                              kAuthority, XdsFooResourceType::Get()->type_url(),
                              "requested"),
                          1)));
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, true),
                  ::testing::Pair(authority_server.server_uri(), true)));
  // Check CSDS data.
  csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(
                  CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
                                    "foo1", resource->AsJsonString(), "1",
                                    TimestampProtoEq(kTime0)),
                  CsdsResourceRequested(XdsFooResourceType::Get()->type_url(),
                                        kXdstpResourceName)));
  // XdsClient will create a new stream to the server for this authority.
  auto stream2 = WaitForAdsStream(authority_server);
  ASSERT_TRUE(stream2 != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  // Note that version and nonce here do NOT use the values for Foo,
  // since each authority has its own state.
  request = WaitForRequest(stream2.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{kXdstpResourceName});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Send a response.
  // We increment time to make sure that the CSDS data gets a new timestamp.
  time_cache_.TestOnlySetNow(kTime1);
  stream2->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("2")
          .set_nonce("B")
          .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource2 = watcher2->WaitForNextResource();
  ASSERT_NE(resource2, nullptr);
  EXPECT_EQ(resource2->name, kXdstpResourceName);
  EXPECT_EQ(resource2->value, 3);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(
          ::testing::Pair(
              ::testing::Pair(kDefaultXdsServerUrl,
                              XdsFooResourceType::Get()->type_url()),
              1),
          ::testing::Pair(
              ::testing::Pair(authority_server.server_uri(),
                              XdsFooResourceType::Get()->type_url()),
              1)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(
          ::testing::Pair(ResourceCountLabelsEq(
                              XdsClient::kOldStyleAuthority,
                              XdsFooResourceType::Get()->type_url(), "acked"),
                          1),
          ::testing::Pair(
              ResourceCountLabelsEq(
                  kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
              1)));
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, true),
                  ::testing::Pair(authority_server.server_uri(), true)));
  // Check CSDS data.
  csds = DumpCsds();
  EXPECT_THAT(
      csds.generic_xds_configs(),
      ::testing::UnorderedElementsAre(
          CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo1",
                            resource->AsJsonString(), "1",
                            TimestampProtoEq(kTime0)),
          CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
                            kXdstpResourceName, resource2->AsJsonString(), "2",
                            TimestampProtoEq(kTime1))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream2.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{kXdstpResourceName});
  // Cancel watch for "foo1".
  CancelFooWatch(watcher.get(), "foo1");
  EXPECT_TRUE(stream->IsOrphaned());
  // Now cancel watch for xdstp resource name.
  CancelFooWatch(watcher2.get(), kXdstpResourceName);
  EXPECT_TRUE(stream2->IsOrphaned());
}
4104
// Verifies that an authority configured without its own xDS server falls back
// to the top-level server: the old-style and xdstp resources are requested
// together on the same ADS stream and share one server connection, while
// resource counts are still tracked per authority.
TEST_F(XdsClientTest, FederationAuthorityDefaultsToTopLevelXdsServer) {
  constexpr char kAuthority[] = "xds.example.com";
  const std::string kXdstpResourceName = absl::StrCat(
      "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
  // Authority does not specify any xDS servers, so XdsClient will use
  // the top-level xDS server in the bootstrap config for this authority.
  InitXdsClient(FakeXdsBootstrap::Builder().AddAuthority(
      kAuthority, FakeXdsBootstrap::FakeAuthority()));
  // Start a watch for "foo1".
  auto watcher = StartFooWatch("foo1");
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher->HasEvent());
  // XdsClient should have created an ADS stream to the top-level xDS server.
  auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  CheckRequestNode(*request);  // Should be present on the first request.
  // Send a response.
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource = watcher->WaitForNextResource();
  ASSERT_NE(resource, nullptr);
  EXPECT_EQ(resource->name, "foo1");
  EXPECT_EQ(resource->value, 6);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          1)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(::testing::Pair(
          ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                XdsFooResourceType::Get()->type_url(), "acked"),
          1)));
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  // Check CSDS data.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceAcked(
                  XdsFooResourceType::Get()->type_url(), "foo1",
                  resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Start a watch for the xdstp resource name.
  auto watcher2 = StartFooWatch(kXdstpResourceName);
  // Watcher should initially not see any resource reported.
  EXPECT_FALSE(watcher2->HasEvent());
  // XdsClient will send a subscription request on the ADS stream that
  // includes both resources, since both are being obtained from the
  // same server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", kXdstpResourceName});
  // Send a response.
  // We increment time to make sure that the CSDS data gets a new timestamp.
  time_cache_.TestOnlySetNow(kTime1);
  stream->SendMessageToClient(
      ResponseBuilder(XdsFooResourceType::Get()->type_url())
          .set_version_info("2")
          .set_nonce("B")
          .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
          .Serialize());
  // XdsClient should have delivered the response to the watcher.
  auto resource2 = watcher2->WaitForNextResource();
  ASSERT_NE(resource2, nullptr);
  EXPECT_EQ(resource2->name, kXdstpResourceName);
  EXPECT_EQ(resource2->value, 3);
  // Check metric data.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl,
                          XdsFooResourceType::Get()->type_url()),
          2)),
      ::testing::ElementsAre(), ::testing::_));
  EXPECT_THAT(
      GetResourceCounts(),
      ::testing::ElementsAre(
          ::testing::Pair(ResourceCountLabelsEq(
                              XdsClient::kOldStyleAuthority,
                              XdsFooResourceType::Get()->type_url(), "acked"),
                          1),
          ::testing::Pair(
              ResourceCountLabelsEq(
                  kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
              1)));
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  // Check CSDS data.
  csds = DumpCsds();
  EXPECT_THAT(
      csds.generic_xds_configs(),
      ::testing::UnorderedElementsAre(
          CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo1",
                            resource->AsJsonString(), "1",
                            TimestampProtoEq(kTime0)),
          CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
                            kXdstpResourceName, resource2->AsJsonString(), "2",
                            TimestampProtoEq(kTime1))));
  // XdsClient should have sent an ACK message to the xDS server.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", kXdstpResourceName});
  // Cancel watch for "foo1".
  CancelFooWatch(watcher.get(), "foo1");
  // XdsClient should send an unsubscription request.
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{kXdstpResourceName});
  // Now cancel watch for xdstp resource name.
  CancelFooWatch(watcher2.get(), kXdstpResourceName);
  EXPECT_TRUE(stream->IsOrphaned());
}
4246
// Verifies that watching an xdstp resource whose authority is absent from the
// bootstrap config reports a kFailedPrecondition error to the watcher.
TEST_F(XdsClientTest, FederationWithUnknownAuthority) {
  constexpr char kAuthority[] = "xds.example.com";
  const std::string kXdstpResourceName = absl::StrCat(
      "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
  // Note: Not adding authority to bootstrap config.
  InitXdsClient();
  // Start a watch for the xdstp resource name.
  auto watcher = StartFooWatch(kXdstpResourceName);
  // Watcher should immediately get an error about the unknown authority.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kFailedPrecondition);
  EXPECT_EQ(error->message(),
            "authority \"xds.example.com\" not present in bootstrap config "
            "(node ID:xds_client_test)")
      << *error;
}
4264
// Verifies that watching an xdstp resource name that cannot be parsed reports
// a kInvalidArgument error to the watcher.
TEST_F(XdsClientTest, FederationWithUnparseableXdstpResourceName) {
  // Note: Not adding authority to bootstrap config.
  InitXdsClient();
  // Start a watch for a malformed xdstp resource name.
  auto watcher = StartFooWatch("xdstp://x");
  // Watcher should immediately get an error about the unparseable
  // resource name.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument);
  EXPECT_EQ(error->message(),
            "Unable to parse resource name xdstp://x "
            "(node ID:xds_client_test)")
      << *error;
}
4279
4280 // TODO(roth,apolcyn): remove this test when the
4281 // GRPC_EXPERIMENTAL_XDS_FEDERATION env var is removed.
TEST_F(XdsClientTest,FederationDisabledWithNewStyleName)4282 TEST_F(XdsClientTest, FederationDisabledWithNewStyleName) {
4283 testing::ScopedEnvVar env_var("GRPC_EXPERIMENTAL_XDS_FEDERATION", "false");
4284 // We will use this xdstp name, whose authority is not present in
4285 // the bootstrap config. But since federation is not enabled, we
4286 // will treat this as an opaque old-style name, so we'll send it to
4287 // the default server.
4288 constexpr char kXdstpResourceName[] =
4289 "xdstp://xds.example.com/test.v3.foo/foo1";
4290 InitXdsClient();
4291 // Start a watch for the xdstp name.
4292 auto watcher = StartFooWatch(kXdstpResourceName);
4293 // Watcher should initially not see any resource reported.
4294 EXPECT_FALSE(watcher->HasEvent());
4295 // XdsClient should have created an ADS stream.
4296 auto stream = WaitForAdsStream();
4297 ASSERT_TRUE(stream != nullptr);
4298 // XdsClient should have sent a subscription request on the ADS stream.
4299 auto request = WaitForRequest(stream.get());
4300 ASSERT_TRUE(request.has_value());
4301 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4302 /*version_info=*/"", /*response_nonce=*/"",
4303 /*error_detail=*/absl::OkStatus(),
4304 /*resource_names=*/{kXdstpResourceName});
4305 CheckRequestNode(*request); // Should be present on the first request.
4306 // Send a response.
4307 stream->SendMessageToClient(
4308 ResponseBuilder(XdsFooResourceType::Get()->type_url())
4309 .set_version_info("1")
4310 .set_nonce("A")
4311 .AddFooResource(XdsFooResource(kXdstpResourceName, 6))
4312 .Serialize());
4313 // XdsClient should have delivered the response to the watcher.
4314 auto resource = watcher->WaitForNextResource();
4315 ASSERT_NE(resource, nullptr);
4316 EXPECT_EQ(resource->name, kXdstpResourceName);
4317 EXPECT_EQ(resource->value, 6);
4318 // XdsClient should have sent an ACK message to the xDS server.
4319 request = WaitForRequest(stream.get());
4320 ASSERT_TRUE(request.has_value());
4321 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4322 /*version_info=*/"1", /*response_nonce=*/"A",
4323 /*error_detail=*/absl::OkStatus(),
4324 /*resource_names=*/{kXdstpResourceName});
4325 // Cancel watch.
4326 CancelFooWatch(watcher.get(), kXdstpResourceName);
4327 EXPECT_TRUE(stream->IsOrphaned());
4328 }
4329
TEST_F(XdsClientTest,FederationChannelFailureReportedToWatchers)4330 TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) {
4331 constexpr char kAuthority[] = "xds.example.com";
4332 const std::string kXdstpResourceName = absl::StrCat(
4333 "xdstp://", kAuthority, "/", XdsFooResource::TypeUrl(), "/foo2");
4334 FakeXdsBootstrap::FakeXdsServer authority_server("other_xds_server");
4335 FakeXdsBootstrap::FakeAuthority authority;
4336 authority.set_server(authority_server);
4337 InitXdsClient(
4338 FakeXdsBootstrap::Builder().AddAuthority(kAuthority, authority));
4339 // Start a watch for "foo1".
4340 auto watcher = StartFooWatch("foo1");
4341 // Watcher should initially not see any resource reported.
4342 EXPECT_FALSE(watcher->HasEvent());
4343 // XdsClient should have created an ADS stream to the top-level xDS server.
4344 auto stream = WaitForAdsStream(*xds_client_->bootstrap().servers().front());
4345 ASSERT_TRUE(stream != nullptr);
4346 // XdsClient should have sent a subscription request on the ADS stream.
4347 auto request = WaitForRequest(stream.get());
4348 ASSERT_TRUE(request.has_value());
4349 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4350 /*version_info=*/"", /*response_nonce=*/"",
4351 /*error_detail=*/absl::OkStatus(),
4352 /*resource_names=*/{"foo1"});
4353 CheckRequestNode(*request); // Should be present on the first request.
4354 // Send a response.
4355 stream->SendMessageToClient(
4356 ResponseBuilder(XdsFooResourceType::Get()->type_url())
4357 .set_version_info("1")
4358 .set_nonce("A")
4359 .AddFooResource(XdsFooResource("foo1", 6))
4360 .Serialize());
4361 // XdsClient should have delivered the response to the watcher.
4362 auto resource = watcher->WaitForNextResource();
4363 ASSERT_NE(resource, nullptr);
4364 EXPECT_EQ(resource->name, "foo1");
4365 EXPECT_EQ(resource->value, 6);
4366 // Check metric data.
4367 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4368 ::testing::ElementsAre(::testing::Pair(
4369 ::testing::Pair(kDefaultXdsServerUrl,
4370 XdsFooResourceType::Get()->type_url()),
4371 1)),
4372 ::testing::ElementsAre(), ::testing::_));
4373 EXPECT_THAT(
4374 GetResourceCounts(),
4375 ::testing::ElementsAre(::testing::Pair(
4376 ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
4377 XdsFooResourceType::Get()->type_url(), "acked"),
4378 1)));
4379 EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
4380 kDefaultXdsServerUrl, true)));
4381 // Check CSDS data.
4382 ClientConfig csds = DumpCsds();
4383 EXPECT_THAT(csds.generic_xds_configs(),
4384 ::testing::UnorderedElementsAre(CsdsResourceAcked(
4385 XdsFooResourceType::Get()->type_url(), "foo1",
4386 resource->AsJsonString(), "1", TimestampProtoEq(kTime0))));
4387 // XdsClient should have sent an ACK message to the xDS server.
4388 request = WaitForRequest(stream.get());
4389 ASSERT_TRUE(request.has_value());
4390 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4391 /*version_info=*/"1", /*response_nonce=*/"A",
4392 /*error_detail=*/absl::OkStatus(),
4393 /*resource_names=*/{"foo1"});
4394 // Start a watch for the xdstp resource name.
4395 auto watcher2 = StartFooWatch(kXdstpResourceName);
4396 // Check metric data.
4397 EXPECT_THAT(GetServerConnections(),
4398 ::testing::ElementsAre(
4399 ::testing::Pair(kDefaultXdsServerUrl, true),
4400 ::testing::Pair(authority_server.server_uri(), true)));
4401 // Watcher should initially not see any resource reported.
4402 EXPECT_FALSE(watcher2->HasEvent());
4403 // XdsClient will create a new stream to the server for this authority.
4404 auto stream2 = WaitForAdsStream(authority_server);
4405 ASSERT_TRUE(stream2 != nullptr);
4406 // XdsClient should have sent a subscription request on the ADS stream.
4407 // Note that version and nonce here do NOT use the values for Foo,
4408 // since each authority has its own state.
4409 request = WaitForRequest(stream2.get());
4410 ASSERT_TRUE(request.has_value());
4411 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4412 /*version_info=*/"", /*response_nonce=*/"",
4413 /*error_detail=*/absl::OkStatus(),
4414 /*resource_names=*/{kXdstpResourceName});
4415 CheckRequestNode(*request); // Should be present on the first request.
4416 // Send a response.
4417 // We increment time to make sure that the CSDS data gets a new timestamp.
4418 time_cache_.TestOnlySetNow(kTime1);
4419 stream2->SendMessageToClient(
4420 ResponseBuilder(XdsFooResourceType::Get()->type_url())
4421 .set_version_info("2")
4422 .set_nonce("B")
4423 .AddFooResource(XdsFooResource(kXdstpResourceName, 3))
4424 .Serialize());
4425 // XdsClient should have delivered the response to the watcher.
4426 auto resource2 = watcher2->WaitForNextResource();
4427 ASSERT_NE(resource2, nullptr);
4428 EXPECT_EQ(resource2->name, kXdstpResourceName);
4429 EXPECT_EQ(resource2->value, 3);
4430 // Check metric data.
4431 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4432 ::testing::ElementsAre(
4433 ::testing::Pair(
4434 ::testing::Pair(kDefaultXdsServerUrl,
4435 XdsFooResourceType::Get()->type_url()),
4436 1),
4437 ::testing::Pair(
4438 ::testing::Pair(authority_server.server_uri(),
4439 XdsFooResourceType::Get()->type_url()),
4440 1)),
4441 ::testing::ElementsAre(), ::testing::_));
4442 EXPECT_THAT(
4443 GetResourceCounts(),
4444 ::testing::ElementsAre(
4445 ::testing::Pair(ResourceCountLabelsEq(
4446 XdsClient::kOldStyleAuthority,
4447 XdsFooResourceType::Get()->type_url(), "acked"),
4448 1),
4449 ::testing::Pair(
4450 ResourceCountLabelsEq(
4451 kAuthority, XdsFooResourceType::Get()->type_url(), "acked"),
4452 1)));
4453 EXPECT_THAT(GetServerConnections(),
4454 ::testing::ElementsAre(
4455 ::testing::Pair(kDefaultXdsServerUrl, true),
4456 ::testing::Pair(authority_server.server_uri(), true)));
4457 // Check CSDS data.
4458 csds = DumpCsds();
4459 EXPECT_THAT(
4460 csds.generic_xds_configs(),
4461 ::testing::UnorderedElementsAre(
4462 CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo1",
4463 resource->AsJsonString(), "1",
4464 TimestampProtoEq(kTime0)),
4465 CsdsResourceAcked(XdsFooResourceType::Get()->type_url(),
4466 kXdstpResourceName, resource2->AsJsonString(), "2",
4467 TimestampProtoEq(kTime1))));
4468 // XdsClient should have sent an ACK message to the xDS server.
4469 request = WaitForRequest(stream2.get());
4470 ASSERT_TRUE(request.has_value());
4471 CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
4472 /*version_info=*/"2", /*response_nonce=*/"B",
4473 /*error_detail=*/absl::OkStatus(),
4474 /*resource_names=*/{kXdstpResourceName});
4475 // Now cause a channel failure on the stream to the authority's xDS server.
4476 TriggerConnectionFailure(authority_server,
4477 absl::UnavailableError("connection failed"));
4478 // The watcher for the xdstp resource name should see the error.
4479 auto error = watcher2->WaitForNextAmbientError();
4480 ASSERT_TRUE(error.has_value());
4481 EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
4482 EXPECT_EQ(error->message(),
4483 "xDS channel for server other_xds_server: connection failed "
4484 "(node ID:xds_client_test)")
4485 << *error;
4486 // The watcher for "foo1" should not see any error.
4487 EXPECT_FALSE(watcher->HasEvent());
4488 // Check metric data.
4489 EXPECT_THAT(GetServerConnections(),
4490 ::testing::ElementsAre(
4491 ::testing::Pair(kDefaultXdsServerUrl, true),
4492 ::testing::Pair(authority_server.server_uri(), false)));
4493 EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
4494 ::testing::_, ::testing::_,
4495 ::testing::ElementsAre(
4496 ::testing::Pair(authority_server.server_uri(), 1))));
4497 // Cancel watch for "foo1".
4498 CancelFooWatch(watcher.get(), "foo1");
4499 EXPECT_TRUE(stream->IsOrphaned());
4500 // Now cancel watch for xdstp resource name.
4501 CancelFooWatch(watcher2.get(), kXdstpResourceName);
4502 EXPECT_TRUE(stream2->IsOrphaned());
4503 }
4504
// Exercises the read-delay handle delivered alongside each resource update:
// the test queues two responses back to back and checks, via
// WaitForReadsStarted(), how the stream's started-read count evolves as the
// watchers drop their handles.
TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) {
  InitXdsClient();
  const auto foo_type_url = XdsFooResourceType::Get()->type_url();
  // Watch "foo1" first; the watch for "foo2" is added further down.
  auto foo1_watcher = StartFooWatch("foo1");
  // The first watch should have caused an ADS stream to be created.
  auto ads_stream = WaitForAdsStream();
  ASSERT_TRUE(ads_stream != nullptr);
  // The initial subscription request names only "foo1".
  auto req = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Adding the second watch produces a request naming both resources.
  auto foo2_watcher = StartFooWatch("foo2");
  req = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // First response: carries both resources.
  const auto both_resources_payload =
      ResponseBuilder(foo_type_url)
          .set_version_info("1")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 6))
          .AddFooResource(XdsFooResource("foo2", 10))
          .Serialize();
  ads_stream->SendMessageToClient(both_resources_payload);
  // Second response: carries only "foo1".  It will not be read until the
  // handles from the first response have been released.
  const auto foo1_only_payload =
      ResponseBuilder(foo_type_url)
          .set_version_info("2")
          .set_nonce("B")
          .AddFooResource(XdsFooResource("foo1", 8))
          .Serialize();
  ads_stream->SendMessageToClient(foo1_only_payload);
  // Each watcher receives its resource from the first response, together
  // with a read-delay handle.
  auto foo1_update = foo1_watcher->WaitForNextResourceAndHandle();
  ASSERT_NE(foo1_update, absl::nullopt);
  EXPECT_EQ(foo1_update->resource->name, "foo1");
  EXPECT_EQ(foo1_update->resource->value, 6);
  auto foo2_update = foo2_watcher->WaitForNextResourceAndHandle();
  ASSERT_NE(foo2_update, absl::nullopt);
  EXPECT_EQ(foo2_update->resource->name, "foo2");
  EXPECT_EQ(foo2_update->resource->value, 10);
  // The first response is ACKed.
  req = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // The started-read count stays at 1 while either handle is outstanding
  // and only advances to 2 once both have been dropped.
  EXPECT_TRUE(ads_stream->WaitForReadsStarted(1));
  foo1_update->read_delay_handle.reset();
  EXPECT_TRUE(ads_stream->WaitForReadsStarted(1));
  foo2_update->read_delay_handle.reset();
  EXPECT_TRUE(ads_stream->WaitForReadsStarted(2));
  // The second response now reaches the "foo1" watcher; the "foo2" watcher
  // gets nothing new.
  foo1_update = foo1_watcher->WaitForNextResourceAndHandle();
  ASSERT_NE(foo1_update, absl::nullopt);
  EXPECT_EQ(foo1_update->resource->name, "foo1");
  EXPECT_EQ(foo1_update->resource->value, 8);
  EXPECT_EQ(foo2_watcher->WaitForNextResourceAndHandle(), absl::nullopt);
  // The second response is ACKed.
  req = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Dropping the one outstanding handle moves the count from 2 to 3.
  EXPECT_TRUE(ads_stream->WaitForReadsStarted(2));
  foo1_update->read_delay_handle.reset();
  EXPECT_TRUE(ads_stream->WaitForReadsStarted(3));
  // Cancelling the "foo1" watch leaves a subscription for "foo2" only.
  CancelFooWatch(foo1_watcher.get(), "foo1");
  req = WaitForRequest(ads_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"2", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo2"});
  // Cancelling the last watch orphans the stream.
  CancelFooWatch(foo2_watcher.get(), "foo2");
  EXPECT_TRUE(ads_stream->IsOrphaned());
}
4589
// Walks through a full fallback cycle with two configured servers:
// normal operation against the primary, a primary connection failure, a
// switch to the fallback server when an uncached resource is requested, and
// finally a return to the primary once it starts sending resources again.
TEST_F(XdsClientTest, FallbackAndRecover) {
  FakeXdsBootstrap::FakeXdsServer primary(kDefaultXdsServerUrl);
  FakeXdsBootstrap::FakeXdsServer fallback("fallback_xds_server");
  // Regular operation: primary listed first, fallback second.
  InitXdsClient(FakeXdsBootstrap::Builder().SetServers({primary, fallback}));
  const auto foo_type_url = XdsFooResourceType::Get()->type_url();
  // Begin watching "foo1".
  auto foo1_watcher = StartFooWatch("foo1");
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, true)));
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "requested"),
                  1)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::IsEmpty(), ::testing::_, ::testing::ElementsAre()));
  // CSDS reports the resource as requested.
  ClientConfig csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(),
              ::testing::ElementsAre(
                  CsdsResourceRequested(foo_type_url, "foo1")));
  // An ADS stream to the primary should exist.
  auto primary_stream = WaitForAdsStream();
  ASSERT_TRUE(primary_stream != nullptr);
  // The subscription request for "foo1" goes out on it.
  auto req = WaitForRequest(primary_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Input: the primary answers with the initial resource.
  const auto initial_payload = ResponseBuilder(foo_type_url)
                                   .set_version_info("20")
                                   .set_nonce("O")
                                   .AddFooResource(XdsFooResource("foo1", 6))
                                   .Serialize();
  primary_stream->SendMessageToClient(initial_payload);
  // Result (local): the watcher receives the resource.
  auto foo1 = foo1_watcher->WaitForNextResource();
  ASSERT_NE(foo1, nullptr);
  EXPECT_EQ(foo1->name, "foo1");
  EXPECT_EQ(foo1->value, 6);
  // Result (local): metrics report 1 resource update and 1 cached resource.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(::testing::Pair(
          ::testing::Pair(kDefaultXdsServerUrl, foo_type_url), 1)),
      ::testing::_, ::testing::_));
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "acked"),
                  1)));
  // CSDS reports the resource as acked.
  csds_dump = DumpCsds();
  EXPECT_THAT(csds_dump.generic_xds_configs(),
              ::testing::UnorderedElementsAre(CsdsResourceAcked(
                  foo_type_url, "foo1", foo1->AsJsonString(), "20",
                  TimestampProtoEq(kTime0))));
  // Result (remote): the client ACKs.
  req = WaitForRequest(primary_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"20", /*response_nonce=*/"O",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Input: the connection to the primary fails.
  TriggerConnectionFailure(primary, absl::UnavailableError("Server down"));
  // Result (local): the watcher is told about it as an ambient error.
  auto status = foo1_watcher->WaitForNextAmbientError();
  ASSERT_TRUE(status.has_value());
  EXPECT_EQ(status->code(), absl::StatusCode::kUnavailable);
  EXPECT_EQ(status->message(),
            "xDS channel for server default_xds_server: Server down (node "
            "ID:xds_client_test)");
  // Result (local): metrics report the channel as unhealthy.
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, false)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::_, ::testing::_,
      ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
  // Input: the stream itself fails as well.
  primary_stream->MaybeSendStatusToClient(
      absl::UnavailableError("Stream failure"));
  // Result (local): metrics are unchanged -- still unhealthy.
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, false)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::_, ::testing::_,
      ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
  // Result (remote): a replacement stream is opened and the subscription is
  // re-sent.  The server stays silent, so the channel status remains non-OK.
  primary_stream = WaitForAdsStream();
  ASSERT_NE(primary_stream, nullptr);
  req = WaitForRequest(primary_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"20", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Input: a second watch on "foo1", which is already cached.
  auto late_foo1_watcher = StartFooWatch("foo1");
  // Result (local): the new watcher is handed the cached value...
  foo1 = late_foo1_watcher->WaitForNextResource();
  ASSERT_NE(foo1, nullptr);
  EXPECT_EQ(foo1->name, "foo1");
  EXPECT_EQ(foo1->value, 6);
  // ...followed by the error reflecting the current channel state.
  status = late_foo1_watcher->WaitForNextAmbientError();
  ASSERT_TRUE(status.has_value());
  EXPECT_EQ(status->message(),
            "xDS channel for server default_xds_server: Server down (node "
            "ID:xds_client_test)")
      << status->message();
  CancelFooWatch(late_foo1_watcher.get(), "foo1");
  // Input: a watch on "foo2", which is not cached.
  auto foo2_watcher = StartFooWatch("foo2");
  // Result (local): metrics now show a healthy channel to the fallback.
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, false),
                  ::testing::Pair(fallback.server_uri(), true)));
  // Result (remote): the primary stream sees a request for both resources.
  req = WaitForRequest(primary_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"20", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Result (remote): a stream to the fallback is created and carries a
  // request for both resources too.
  auto fallback_stream = WaitForAdsStream(fallback);
  ASSERT_TRUE(fallback_stream != nullptr);
  req = WaitForRequest(fallback_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Input: the fallback answers with both resources.  The clock is advanced
  // first so that the CSDS entries pick up a new timestamp.
  time_cache_.TestOnlySetNow(kTime1);
  const auto fallback_payload =
      ResponseBuilder(foo_type_url)
          .set_version_info("5")
          .set_nonce("A")
          .AddFooResource(XdsFooResource("foo1", 20))
          .AddFooResource(XdsFooResource("foo2", 30))
          .Serialize();
  fallback_stream->SendMessageToClient(fallback_payload);
  // Result (local): both watchers receive the fallback's values.
  foo1 = foo1_watcher->WaitForNextResource();
  ASSERT_NE(foo1, nullptr);
  EXPECT_EQ(foo1->name, "foo1");
  EXPECT_EQ(foo1->value, 20);
  auto foo2 = foo2_watcher->WaitForNextResource();
  ASSERT_NE(foo2, nullptr);
  EXPECT_EQ(foo2->name, "foo2");
  EXPECT_EQ(foo2->value, 30);
  // Result (local): metrics attribute the update to the fallback server.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(
          ::testing::Pair(
              ::testing::Pair(kDefaultXdsServerUrl, foo_type_url), 1),
          ::testing::Pair(
              ::testing::Pair(fallback.server_uri(), foo_type_url), 2)),
      ::testing::_, ::testing::_));
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, false),
                  ::testing::Pair(fallback.server_uri(), true)));
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        foo_type_url, "acked"),
                  2)));
  // CSDS reports both resources at the fallback's version.
  csds_dump = DumpCsds();
  EXPECT_THAT(
      csds_dump.generic_xds_configs(),
      ::testing::UnorderedElementsAre(
          CsdsResourceAcked(foo_type_url, "foo1", foo1->AsJsonString(), "5",
                            TimestampProtoEq(kTime1)),
          CsdsResourceAcked(foo_type_url, "foo2", foo2->AsJsonString(), "5",
                            TimestampProtoEq(kTime1))));
  // Result (remote): the client ACKs to the fallback.
  req = WaitForRequest(fallback_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"5", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Input: the primary now answers with both resources.  The clock is
  // advanced again so that the CSDS entries pick up a new timestamp.
  time_cache_.TestOnlySetNow(kTime2);
  const auto recovery_payload =
      ResponseBuilder(foo_type_url)
          .set_version_info("15")
          .set_nonce("B")
          .AddFooResource(XdsFooResource("foo1", 35))
          .AddFooResource(XdsFooResource("foo2", 25))
          .Serialize();
  primary_stream->SendMessageToClient(recovery_payload);
  // Result (local): both watchers receive the primary's values.
  foo1 = foo1_watcher->WaitForNextResource();
  ASSERT_NE(foo1, nullptr);
  EXPECT_EQ(foo1->name, "foo1");
  EXPECT_EQ(foo1->value, 35);
  foo2 = foo2_watcher->WaitForNextResource();
  ASSERT_NE(foo2, nullptr);
  EXPECT_EQ(foo2->name, "foo2");
  EXPECT_EQ(foo2->value, 25);
  // Result (local): metrics show the fallback channel closed and the new
  // resource updates credited to the primary.
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::ElementsAre(
          ::testing::Pair(
              ::testing::Pair(kDefaultXdsServerUrl, foo_type_url), 3),
          ::testing::Pair(
              ::testing::Pair(fallback.server_uri(), foo_type_url), 2)),
      ::testing::_,
      ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, true)));
  // CSDS reports both resources at the primary's version.
  csds_dump = DumpCsds();
  EXPECT_THAT(
      csds_dump.generic_xds_configs(),
      ::testing::UnorderedElementsAre(
          CsdsResourceAcked(foo_type_url, "foo1", foo1->AsJsonString(), "15",
                            TimestampProtoEq(kTime2)),
          CsdsResourceAcked(foo_type_url, "foo2", foo2->AsJsonString(), "15",
                            TimestampProtoEq(kTime2))));
  // Result (remote): the fallback stream has been orphaned.
  EXPECT_TRUE(fallback_stream->IsOrphaned());
  // Result (remote): the client ACKs to the primary.
  req = WaitForRequest(primary_stream.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"15", /*response_nonce=*/"B",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1", "foo2"});
  // Tear down both watches.
  CancelFooWatch(foo1_watcher.get(), "foo1", /*delay_unsubscription=*/true);
  CancelFooWatch(foo2_watcher.get(), "foo2");
  // Result (remote): the primary stream has been orphaned.
  EXPECT_TRUE(primary_stream->IsOrphaned());
}
4854
4855 // Test for both servers being unavailable
// Scenario: two servers are configured and both become unreachable before
// any resource has been delivered.  The test fails the primary connection,
// checks that a stream to the fallback is created and carries the same
// subscription, fails the fallback connection too, and then expects the
// watcher to be notified (via WaitForNextError()) with the fallback
// server's failure status.
TEST_F(XdsClientTest, FallbackReportsError) {
  FakeXdsBootstrap::FakeXdsServer primary_server(kDefaultXdsServerUrl);
  FakeXdsBootstrap::FakeXdsServer fallback_server("fallback_xds_server");
  InitXdsClient(FakeXdsBootstrap::Builder().SetServers(
      {primary_server, fallback_server}));
  auto watcher = StartFooWatch("foo1");
  // Only the primary server is connected at this point.
  EXPECT_THAT(GetServerConnections(), ::testing::ElementsAre(::testing::Pair(
                                          kDefaultXdsServerUrl, true)));
  auto stream = WaitForAdsStream();
  ASSERT_TRUE(stream != nullptr);
  // XdsClient should have sent a subscription request on the ADS stream.
  auto request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  EXPECT_THAT(GetResourceCounts(),
              ::testing::ElementsAre(::testing::Pair(
                  ResourceCountLabelsEq(XdsClient::kOldStyleAuthority,
                                        XdsFooResourceType::Get()->type_url(),
                                        "requested"),
                  1)));
  // Take the primary down; no response was ever sent for "foo1".
  TriggerConnectionFailure(primary_server,
                           absl::UnavailableError("Server down"));
  // Metrics: primary unhealthy, fallback connected, one failure recorded
  // against the primary.
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, false),
                  ::testing::Pair(fallback_server.server_uri(), true)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::_, ::testing::_,
      ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
  // CSDS should show that the resource has been requested.
  ClientConfig csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::ElementsAre(CsdsResourceRequested(
                  XdsFooResourceType::Get()->type_url(), "foo1")));
  // Fallback happens now: a stream to the fallback server carries the same
  // subscription.
  stream = WaitForAdsStream(fallback_server);
  ASSERT_NE(stream, nullptr);
  request = WaitForRequest(stream.get());
  ASSERT_TRUE(request.has_value());
  CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Take the fallback down as well.
  TriggerConnectionFailure(fallback_server,
                           absl::UnavailableError("Another server down"));
  // Metrics: both channels unhealthy, one failure recorded against each.
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, false),
                  ::testing::Pair(fallback_server.server_uri(), false)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::_, ::testing::_,
      ::testing::ElementsAre(
          ::testing::Pair(kDefaultXdsServerUrl, 1),
          ::testing::Pair(fallback_server.server_uri(), 1))));
  // CSDS still shows the resource as merely requested.
  csds = DumpCsds();
  EXPECT_THAT(csds.generic_xds_configs(),
              ::testing::ElementsAre(CsdsResourceRequested(
                  XdsFooResourceType::Get()->type_url(), "foo1")));
  // The watcher is notified with the fallback server's failure.
  auto error = watcher->WaitForNextError();
  ASSERT_TRUE(error.has_value());
  // Plain equality check, matching the other status-code checks in this
  // file (EXPECT_THAT expects a matcher as its second argument).
  EXPECT_EQ(error->code(), absl::StatusCode::kUnavailable);
  EXPECT_EQ(error->message(),
            "xDS channel for server fallback_xds_server: Another server down "
            "(node ID:xds_client_test)")
      << error->message();
}
4925
// Scenario: the primary server's connection fails before it has sent any
// resource.  The test expects the subscription to be served through the
// fallback server, and then expects the fallback stream to be orphaned once
// the primary delivers the resource.
TEST_F(XdsClientTest, FallbackOnStartup) {
  FakeXdsBootstrap::FakeXdsServer primary;
  FakeXdsBootstrap::FakeXdsServer fallback("fallback_xds_server");
  // Regular operation: primary listed first, fallback second.
  InitXdsClient(FakeXdsBootstrap::Builder().SetServers({primary, fallback}));
  const auto foo_type_url = XdsFooResourceType::Get()->type_url();
  // Begin watching "foo1".
  auto foo1_watcher = StartFooWatch("foo1");
  auto stream_to_primary = WaitForAdsStream(primary);
  ASSERT_NE(stream_to_primary, nullptr);
  // The subscription request goes out on the primary's stream.
  auto req = WaitForRequest(stream_to_primary.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Fail the primary connection before it has answered.
  TriggerConnectionFailure(primary,
                           absl::UnavailableError("Primary server is down"));
  // A stream to the fallback server should be created...
  auto stream_to_fallback = WaitForAdsStream(fallback);
  ASSERT_NE(stream_to_fallback, nullptr);
  // ...and carry the same subscription request.
  req = WaitForRequest(stream_to_fallback.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"", /*response_nonce=*/"",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // The fallback answers with the resource.
  const auto fallback_payload = ResponseBuilder(foo_type_url)
                                    .set_version_info("1")
                                    .set_nonce("A")
                                    .AddFooResource(XdsFooResource("foo1", 6))
                                    .Serialize();
  stream_to_fallback->SendMessageToClient(fallback_payload);
  // Metrics: primary unhealthy with one recorded failure, fallback healthy.
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, false),
                  ::testing::Pair(fallback.server_uri(), true)));
  EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData(
      ::testing::_, ::testing::_,
      ::testing::ElementsAre(::testing::Pair(kDefaultXdsServerUrl, 1))));
  // The watcher receives the fallback's value.
  auto foo1 = foo1_watcher->WaitForNextResource();
  ASSERT_NE(foo1, nullptr);
  EXPECT_EQ(foo1->name, "foo1");
  EXPECT_EQ(foo1->value, 6);
  // The client ACKs to the fallback.
  req = WaitForRequest(stream_to_fallback.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"1", /*response_nonce=*/"A",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
  // Recovery: the primary sends the resource on its original stream.
  const auto primary_payload = ResponseBuilder(foo_type_url)
                                   .set_version_info("5")
                                   .set_nonce("D")
                                   .AddFooResource(XdsFooResource("foo1", 42))
                                   .Serialize();
  stream_to_primary->SendMessageToClient(primary_payload);
  // The fallback stream is dropped and the watcher sees the primary's value.
  EXPECT_TRUE(stream_to_fallback->IsOrphaned());
  foo1 = foo1_watcher->WaitForNextResource();
  ASSERT_NE(foo1, nullptr);
  EXPECT_EQ(foo1->name, "foo1");
  EXPECT_EQ(foo1->value, 42);
  // Only the primary connection remains, and it is healthy.
  EXPECT_THAT(GetServerConnections(),
              ::testing::ElementsAre(
                  ::testing::Pair(kDefaultXdsServerUrl, true)));
  // The client ACKs to the primary.
  req = WaitForRequest(stream_to_primary.get());
  ASSERT_TRUE(req.has_value());
  CheckRequest(*req, foo_type_url,
               /*version_info=*/"5", /*response_nonce=*/"D",
               /*error_detail=*/absl::OkStatus(),
               /*resource_names=*/{"foo1"});
}
5002
5003 } // namespace
5004 } // namespace testing
5005 } // namespace grpc_core
5006
main(int argc,char ** argv)5007 int main(int argc, char** argv) {
5008 ::testing::InitGoogleTest(&argc, argv);
5009 grpc::testing::TestEnvironment env(&argc, argv);
5010 return RUN_ALL_TESTS();
5011 }
5012