1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/xds/xds_client/xds_client.h"
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/support/port_platform.h>
21 #include <inttypes.h>
22 #include <string.h>
23
24 #include <algorithm>
25 #include <functional>
26 #include <memory>
27 #include <string>
28 #include <type_traits>
29 #include <vector>
30
31 #include "absl/cleanup/cleanup.h"
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/strings/match.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/str_join.h"
37 #include "absl/strings/str_split.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/strings/strip.h"
40 #include "absl/types/optional.h"
41 #include "envoy/config/core/v3/base.upb.h"
42 #include "envoy/service/discovery/v3/discovery.upb.h"
43 #include "envoy/service/discovery/v3/discovery.upbdefs.h"
44 #include "google/protobuf/any.upb.h"
45 #include "google/protobuf/timestamp.upb.h"
46 #include "google/rpc/status.upb.h"
47 #include "src/core/lib/iomgr/exec_ctx.h"
48 #include "src/core/util/backoff.h"
49 #include "src/core/util/debug_location.h"
50 #include "src/core/util/orphanable.h"
51 #include "src/core/util/ref_counted_ptr.h"
52 #include "src/core/util/sync.h"
53 #include "src/core/util/upb_utils.h"
54 #include "src/core/util/uri.h"
55 #include "src/core/xds/xds_client/xds_api.h"
56 #include "src/core/xds/xds_client/xds_bootstrap.h"
57 #include "src/core/xds/xds_client/xds_locality.h"
58 #include "upb/base/string_view.h"
59 #include "upb/mem/arena.h"
60 #include "upb/reflection/def.h"
61 #include "upb/text/encode.h"
62
// Backoff parameters used by RetryableCall<> when restarting a failed call
// to the xDS server.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): not referenced in the visible part of this file; presumably
// a lower bound (in ms) for the load reporting interval -- confirm.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
68
69 namespace grpc_core {
70
71 using ::grpc_event_engine::experimental::EventEngine;
72
73 //
74 // Internal class declarations
75 //
76
77 // An xds call wrapper that can restart a call upon failure. Holds a ref to
78 // the xds channel. The template parameter is the kind of wrapped xds call.
79 // TODO(roth): This is basically the same code as in LrsClient, and
80 // probably very similar to many other places in the codebase.
81 // Consider refactoring this into a common utility library somehow.
82 template <typename T>
83 class XdsClient::XdsChannel::RetryableCall final
84 : public InternallyRefCounted<RetryableCall<T>> {
85 public:
86 explicit RetryableCall(WeakRefCountedPtr<XdsChannel> xds_channel);
87
88 // Disable thread-safety analysis because this method is called via
89 // OrphanablePtr<>, but there's no way to pass the lock annotation
90 // through there.
91 void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
92
93 void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
94
call() const95 T* call() const { return call_.get(); }
xds_channel() const96 XdsChannel* xds_channel() const { return xds_channel_.get(); }
97
98 bool IsCurrentCallOnChannel() const;
99
100 private:
101 void StartNewCallLocked();
102 void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
103
104 void OnRetryTimer();
105
106 // The wrapped xds call that talks to the xds server. It's instantiated
107 // every time we start a new call. It's null during call retry backoff.
108 OrphanablePtr<T> call_;
109 // The owning xds channel.
110 WeakRefCountedPtr<XdsChannel> xds_channel_;
111
112 // Retry state.
113 BackOff backoff_;
114 absl::optional<EventEngine::TaskHandle> timer_handle_
115 ABSL_GUARDED_BY(&XdsClient::mu_);
116
117 bool shutting_down_ = false;
118 };
119
120 // Contains an ADS call to the xds server.
121 class XdsClient::XdsChannel::AdsCall final
122 : public InternallyRefCounted<AdsCall> {
123 public:
124 // The ctor and dtor should not be used directly.
125 explicit AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call);
126
127 void Orphan() override;
128
retryable_call() const129 RetryableCall<AdsCall>* retryable_call() const {
130 return retryable_call_.get();
131 }
xds_channel() const132 XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); }
xds_client() const133 XdsClient* xds_client() const { return xds_channel()->xds_client(); }
seen_response() const134 bool seen_response() const { return seen_response_; }
135
136 void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name,
137 bool delay_send)
138 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
139 void UnsubscribeLocked(const XdsResourceType* type,
140 const XdsResourceName& name, bool delay_unsubscription)
141 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
142
143 bool HasSubscribedResources() const;
144
145 private:
146 class AdsReadDelayHandle;
147
148 class ResourceTimer final : public InternallyRefCounted<ResourceTimer> {
149 public:
ResourceTimer(const XdsResourceType * type,const XdsResourceName & name)150 ResourceTimer(const XdsResourceType* type, const XdsResourceName& name)
151 : type_(type), name_(name) {}
152
153 // Disable thread-safety analysis because this method is called via
154 // OrphanablePtr<>, but there's no way to pass the lock annotation
155 // through there.
Orphan()156 void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS {
157 MaybeCancelTimer();
158 Unref(DEBUG_LOCATION, "Orphan");
159 }
160
MarkSubscriptionSendStarted()161 void MarkSubscriptionSendStarted()
162 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
163 subscription_sent_ = true;
164 }
165
MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCall> ads_call)166 void MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCall> ads_call)
167 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
168 if (subscription_sent_) MaybeStartTimer(std::move(ads_call));
169 }
170
MarkSeen()171 void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
172 resource_seen_ = true;
173 MaybeCancelTimer();
174 }
175
MaybeCancelTimer()176 void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
177 if (timer_handle_.has_value() &&
178 ads_call_->xds_client()->engine()->Cancel(*timer_handle_)) {
179 timer_handle_.reset();
180 ads_call_.reset();
181 }
182 }
183
184 private:
MaybeStartTimer(RefCountedPtr<AdsCall> ads_call)185 void MaybeStartTimer(RefCountedPtr<AdsCall> ads_call)
186 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
187 // Don't start timer if we've already either seen the resource or
188 // marked it as non-existing.
189 // Note: There are edge cases where we can have seen the resource
190 // before we have sent the initial subscription request, such as
191 // when we unsubscribe and then resubscribe to a given resource
192 // and then get a response containing that resource, all while a
193 // send_message op is in flight.
194 if (resource_seen_) return;
195 // Don't start timer if we haven't yet sent the initial subscription
196 // request for the resource.
197 if (!subscription_sent_) return;
198 // Don't start timer if it's already running.
199 if (timer_handle_.has_value()) return;
200 // Check if we already have a cached version of this resource
201 // (i.e., if this is the initial request for the resource after an
202 // ADS stream restart). If so, we don't start the timer, because
203 // (a) we already have the resource and (b) the server may
204 // optimize by not resending the resource that we already have.
205 auto& authority_state =
206 ads_call->xds_client()->authority_state_map_[name_.authority];
207 ResourceState& state = authority_state.resource_map[type_][name_.key];
208 if (state.HasResource()) return;
209 // Start timer.
210 ads_call_ = std::move(ads_call);
211 timer_handle_ = ads_call_->xds_client()->engine()->RunAfter(
212 ads_call_->xds_client()->request_timeout_,
213 [self = Ref(DEBUG_LOCATION, "timer")]() {
214 ApplicationCallbackExecCtx callback_exec_ctx;
215 ExecCtx exec_ctx;
216 self->OnTimer();
217 });
218 }
219
OnTimer()220 void OnTimer() {
221 {
222 MutexLock lock(&ads_call_->xds_client()->mu_);
223 timer_handle_.reset();
224 auto& authority_state =
225 ads_call_->xds_client()->authority_state_map_[name_.authority];
226 ResourceState& state = authority_state.resource_map[type_][name_.key];
227 // We might have received the resource after the timer fired but before
228 // the callback ran.
229 if (!state.HasResource()) {
230 GRPC_TRACE_LOG(xds_client, INFO)
231 << "[xds_client " << ads_call_->xds_client() << "] xds server "
232 << ads_call_->xds_channel()->server_.server_uri()
233 << ": timeout obtaining resource {type=" << type_->type_url()
234 << " name="
235 << XdsClient::ConstructFullXdsResourceName(
236 name_.authority, type_->type_url(), name_.key)
237 << "} from xds server";
238 resource_seen_ = true;
239 state.SetDoesNotExist();
240 ads_call_->xds_client()->NotifyWatchersOnResourceChanged(
241 absl::NotFoundError("does not exist"), state.watchers(),
242 ReadDelayHandle::NoWait());
243 }
244 }
245 ads_call_->xds_client()->work_serializer_.DrainQueue();
246 ads_call_.reset();
247 }
248
249 const XdsResourceType* type_;
250 const XdsResourceName name_;
251
252 RefCountedPtr<AdsCall> ads_call_;
253 // True if we have sent the initial subscription request for this
254 // resource on this ADS stream.
255 bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
256 // True if we have either (a) seen the resource in a response on this
257 // stream or (b) declared the resource to not exist due to the timer
258 // firing.
259 bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
260 absl::optional<EventEngine::TaskHandle> timer_handle_
261 ABSL_GUARDED_BY(&XdsClient::mu_);
262 };
263
264 class StreamEventHandler final
265 : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
266 public:
StreamEventHandler(RefCountedPtr<AdsCall> ads_call)267 explicit StreamEventHandler(RefCountedPtr<AdsCall> ads_call)
268 : ads_call_(std::move(ads_call)) {}
269
OnRequestSent(bool ok)270 void OnRequestSent(bool ok) override { ads_call_->OnRequestSent(ok); }
OnRecvMessage(absl::string_view payload)271 void OnRecvMessage(absl::string_view payload) override {
272 ads_call_->OnRecvMessage(payload);
273 }
OnStatusReceived(absl::Status status)274 void OnStatusReceived(absl::Status status) override {
275 ads_call_->OnStatusReceived(std::move(status));
276 }
277
278 private:
279 RefCountedPtr<AdsCall> ads_call_;
280 };
281
282 struct ResourceTypeState {
283 // Nonce and status for this resource type.
284 std::string nonce;
285 absl::Status status;
286
287 // Subscribed resources of this type.
288 std::map<std::string /*authority*/,
289 std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
290 subscribed_resources;
291 };
292
293 std::string CreateAdsRequest(absl::string_view type_url,
294 absl::string_view version,
295 absl::string_view nonce,
296 const std::vector<std::string>& resource_names,
297 absl::Status status) const
298 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
299
300 void SendMessageLocked(const XdsResourceType* type)
301 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
302
303 struct DecodeContext {
304 upb::Arena arena;
305 const XdsResourceType* type;
306 std::string type_url;
307 std::string version;
308 std::string nonce;
309 std::vector<std::string> errors;
310 std::map<std::string /*authority*/, std::set<XdsResourceKey>>
311 resources_seen;
312 uint64_t num_valid_resources = 0;
313 uint64_t num_invalid_resources = 0;
314 Timestamp update_time = Timestamp::Now();
315 RefCountedPtr<ReadDelayHandle> read_delay_handle;
316 };
317 void ParseResource(size_t idx, absl::string_view type_url,
318 absl::string_view resource_name,
319 absl::string_view serialized_resource,
320 DecodeContext* context)
321 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
322 absl::Status DecodeAdsResponse(absl::string_view encoded_response,
323 DecodeContext* context)
324 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
325
326 void OnRequestSent(bool ok);
327 void OnRecvMessage(absl::string_view payload);
328 void OnStatusReceived(absl::Status status);
329
330 bool IsCurrentCallOnChannel() const;
331
332 // Constructs a list of resource names of a given type for an ADS
333 // request. Also starts the timer for each resource if needed.
334 std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type)
335 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
336
337 // The owning RetryableCall<>.
338 RefCountedPtr<RetryableCall<AdsCall>> retryable_call_;
339
340 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
341 streaming_call_;
342
343 bool sent_initial_message_ = false;
344 bool seen_response_ = false;
345
346 const XdsResourceType* send_message_pending_
347 ABSL_GUARDED_BY(&XdsClient::mu_) = nullptr;
348
349 // Resource types for which requests need to be sent.
350 std::set<const XdsResourceType*> buffered_requests_;
351
352 // State for each resource type.
353 std::map<const XdsResourceType*, ResourceTypeState> state_map_;
354 };
355
356 //
357 // XdsClient::XdsChannel::ConnectivityFailureWatcher
358 //
359
360 class XdsClient::XdsChannel::ConnectivityFailureWatcher
361 : public XdsTransportFactory::XdsTransport::ConnectivityFailureWatcher {
362 public:
ConnectivityFailureWatcher(WeakRefCountedPtr<XdsChannel> xds_channel)363 explicit ConnectivityFailureWatcher(WeakRefCountedPtr<XdsChannel> xds_channel)
364 : xds_channel_(std::move(xds_channel)) {}
365
OnConnectivityFailure(absl::Status status)366 void OnConnectivityFailure(absl::Status status) override {
367 xds_channel_->OnConnectivityFailure(std::move(status));
368 }
369
370 private:
371 WeakRefCountedPtr<XdsChannel> xds_channel_;
372 };
373
374 //
375 // XdsClient::XdsChannel
376 //
377
XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)378 XdsClient::XdsChannel::XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,
379 const XdsBootstrap::XdsServer& server)
380 : DualRefCounted<XdsChannel>(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
381 ? "XdsChannel"
382 : nullptr),
383 xds_client_(std::move(xds_client)),
384 server_(server) {
385 GRPC_TRACE_LOG(xds_client, INFO)
386 << "[xds_client " << xds_client_.get() << "] creating channel " << this
387 << " for server " << server.server_uri();
388 absl::Status status;
389 transport_ = xds_client_->transport_factory_->GetTransport(server, &status);
390 CHECK(transport_ != nullptr);
391 if (!status.ok()) {
392 SetChannelStatusLocked(std::move(status));
393 } else {
394 failure_watcher_ = MakeRefCounted<ConnectivityFailureWatcher>(
395 WeakRef(DEBUG_LOCATION, "OnConnectivityFailure"));
396 transport_->StartConnectivityFailureWatch(failure_watcher_);
397 }
398 }
399
~XdsChannel()400 XdsClient::XdsChannel::~XdsChannel() {
401 GRPC_TRACE_LOG(xds_client, INFO)
402 << "[xds_client " << xds_client() << "] destroying xds channel " << this
403 << " for server " << server_.server_uri();
404 xds_client_.reset(DEBUG_LOCATION, "XdsChannel");
405 }
406
407 // This method should only ever be called when holding the lock, but we can't
408 // use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
409 // called from DualRefCounted::Unref, which cannot have a lock annotation for
410 // a lock in this subclass.
Orphaned()411 void XdsClient::XdsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
412 GRPC_TRACE_LOG(xds_client, INFO)
413 << "[xds_client " << xds_client() << "] orphaning xds channel " << this
414 << " for server " << server_.server_uri();
415 shutting_down_ = true;
416 if (failure_watcher_ != nullptr) {
417 transport_->StopConnectivityFailureWatch(failure_watcher_);
418 failure_watcher_.reset();
419 }
420 transport_.reset();
421 // At this time, all strong refs are removed, remove from channel map to
422 // prevent subsequent subscription from trying to use this XdsChannel as
423 // it is shutting down.
424 xds_client_->xds_channel_map_.erase(server_.Key());
425 ads_call_.reset();
426 }
427
ResetBackoff()428 void XdsClient::XdsChannel::ResetBackoff() { transport_->ResetBackoff(); }
429
ads_call() const430 XdsClient::XdsChannel::AdsCall* XdsClient::XdsChannel::ads_call() const {
431 return ads_call_->call();
432 }
433
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name)434 void XdsClient::XdsChannel::SubscribeLocked(const XdsResourceType* type,
435 const XdsResourceName& name) {
436 if (ads_call_ == nullptr) {
437 // Start the ADS call if this is the first request.
438 ads_call_.reset(
439 new RetryableCall<AdsCall>(WeakRef(DEBUG_LOCATION, "XdsChannel+ads")));
440 // Note: AdsCall's ctor will automatically subscribe to all
441 // resources that the XdsClient already has watchers for, so we can
442 // return here.
443 return;
444 }
445 // If the ADS call is in backoff state, we don't need to do anything now
446 // because when the call is restarted it will resend all necessary requests.
447 if (ads_call() == nullptr) return;
448 // Subscribe to this resource if the ADS call is active.
449 ads_call()->SubscribeLocked(type, name, /*delay_send=*/false);
450 }
451
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)452 void XdsClient::XdsChannel::UnsubscribeLocked(const XdsResourceType* type,
453 const XdsResourceName& name,
454 bool delay_unsubscription) {
455 if (ads_call_ != nullptr) {
456 auto* call = ads_call_->call();
457 if (call != nullptr) {
458 call->UnsubscribeLocked(type, name, delay_unsubscription);
459 if (!call->HasSubscribedResources()) {
460 ads_call_.reset();
461 }
462 }
463 }
464 }
465
MaybeFallbackLocked(const std::string & authority,AuthorityState & authority_state)466 bool XdsClient::XdsChannel::MaybeFallbackLocked(
467 const std::string& authority, AuthorityState& authority_state) {
468 if (!xds_client_->HasUncachedResources(authority_state)) {
469 return false;
470 }
471 std::vector<const XdsBootstrap::XdsServer*> xds_servers;
472 if (authority != kOldStyleAuthority) {
473 xds_servers =
474 xds_client_->bootstrap().LookupAuthority(authority)->servers();
475 }
476 if (xds_servers.empty()) xds_servers = xds_client_->bootstrap().servers();
477 for (size_t i = authority_state.xds_channels.size(); i < xds_servers.size();
478 ++i) {
479 authority_state.xds_channels.emplace_back(
480 xds_client_->GetOrCreateXdsChannelLocked(*xds_servers[i], "fallback"));
481 for (const auto& type_resource : authority_state.resource_map) {
482 for (const auto& key_state : type_resource.second) {
483 authority_state.xds_channels.back()->SubscribeLocked(
484 type_resource.first, {authority, key_state.first});
485 }
486 }
487 GRPC_TRACE_LOG(xds_client, INFO)
488 << "[xds_client " << xds_client_.get() << "] authority " << authority
489 << ": added fallback server " << xds_servers[i]->server_uri() << " ("
490 << authority_state.xds_channels.back()->status().ToString() << ")";
491 if (authority_state.xds_channels.back()->status().ok()) return true;
492 }
493 GRPC_TRACE_LOG(xds_client, INFO)
494 << "[xds_client " << xds_client_.get() << "] authority " << authority
495 << ": No fallback server";
496 return false;
497 }
498
SetHealthyLocked()499 void XdsClient::XdsChannel::SetHealthyLocked() {
500 status_ = absl::OkStatus();
501 // Make this channel active iff:
502 // 1. Channel is on the list of authority channels
503 // 2. Channel is not the last channel on the list (i.e. not the active
504 // channel)
505 for (auto& authority : xds_client_->authority_state_map_) {
506 auto& channels = authority.second.xds_channels;
507 // Skip if channel is active.
508 if (channels.back() == this) continue;
509 auto channel_it = std::find(channels.begin(), channels.end(), this);
510 // Skip if this is not on the list
511 if (channel_it != channels.end()) {
512 GRPC_TRACE_LOG(xds_client, INFO)
513 << "[xds_client " << xds_client_.get() << "] authority "
514 << authority.first << ": Falling forward to " << server_.server_uri();
515 // Lower priority channels are no longer needed, connection is back!
516 channels.erase(channel_it + 1, channels.end());
517 }
518 }
519 }
520
OnConnectivityFailure(absl::Status status)521 void XdsClient::XdsChannel::OnConnectivityFailure(absl::Status status) {
522 {
523 MutexLock lock(&xds_client_->mu_);
524 SetChannelStatusLocked(std::move(status));
525 }
526 xds_client_->work_serializer_.DrainQueue();
527 }
528
SetChannelStatusLocked(absl::Status status)529 void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
530 if (shutting_down_) return;
531 status = absl::Status(status.code(), absl::StrCat("xDS channel for server ",
532 server_.server_uri(), ": ",
533 status.message()));
534 LOG(INFO) << "[xds_client " << xds_client() << "] " << status;
535 // If status was previously OK, report that the channel has gone unhealthy.
536 if (status_.ok() && xds_client_->metrics_reporter_ != nullptr) {
537 xds_client_->metrics_reporter_->ReportServerFailure(server_.server_uri());
538 }
539 // Save status in channel, so that we can immediately generate an
540 // error for any new watchers that may be started.
541 status_ = status;
542 // Find all watchers for this channel.
543 WatcherSet watchers_cached;
544 WatcherSet watchers_uncached;
545 for (auto& a : xds_client_->authority_state_map_) { // authority
546 if (a.second.xds_channels.empty() || a.second.xds_channels.back() != this ||
547 MaybeFallbackLocked(a.first, a.second)) {
548 continue;
549 }
550 for (const auto& t : a.second.resource_map) { // type
551 for (const auto& r : t.second) { // resource id
552 auto& watchers =
553 r.second.HasResource() ? watchers_cached : watchers_uncached;
554 for (const auto& w : r.second.watchers()) { // watchers
555 watchers.insert(w);
556 }
557 }
558 }
559 }
560 // Enqueue notifications for the watchers.
561 if (!watchers_cached.empty()) {
562 xds_client_->NotifyWatchersOnAmbientError(
563 status, std::move(watchers_cached), ReadDelayHandle::NoWait());
564 }
565 if (!watchers_uncached.empty()) {
566 xds_client_->NotifyWatchersOnResourceChanged(
567 status, std::move(watchers_uncached), ReadDelayHandle::NoWait());
568 }
569 }
570
571 //
572 // XdsClient::XdsChannel::RetryableCall<>
573 //
574
575 template <typename T>
RetryableCall(WeakRefCountedPtr<XdsChannel> xds_channel)576 XdsClient::XdsChannel::RetryableCall<T>::RetryableCall(
577 WeakRefCountedPtr<XdsChannel> xds_channel)
578 : xds_channel_(std::move(xds_channel)),
579 backoff_(BackOff::Options()
580 .set_initial_backoff(Duration::Seconds(
581 GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
582 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
583 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
584 .set_max_backoff(Duration::Seconds(
585 GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
586 StartNewCallLocked();
587 }
588
589 template <typename T>
Orphan()590 void XdsClient::XdsChannel::RetryableCall<T>::Orphan() {
591 shutting_down_ = true;
592 call_.reset();
593 if (timer_handle_.has_value()) {
594 xds_channel()->xds_client()->engine()->Cancel(*timer_handle_);
595 timer_handle_.reset();
596 }
597 this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
598 }
599
600 template <typename T>
OnCallFinishedLocked()601 void XdsClient::XdsChannel::RetryableCall<T>::OnCallFinishedLocked() {
602 // If we saw a response on the current stream, reset backoff.
603 if (call_->seen_response()) backoff_.Reset();
604 call_.reset();
605 // Start retry timer.
606 StartRetryTimerLocked();
607 }
608
609 template <typename T>
StartNewCallLocked()610 void XdsClient::XdsChannel::RetryableCall<T>::StartNewCallLocked() {
611 if (shutting_down_) return;
612 CHECK(xds_channel_->transport_ != nullptr);
613 CHECK(call_ == nullptr);
614 GRPC_TRACE_LOG(xds_client, INFO)
615 << "[xds_client " << xds_channel()->xds_client() << "] xds server "
616 << xds_channel()->server_.server_uri()
617 << ": start new call from retryable call " << this;
618 call_ = MakeOrphanable<T>(
619 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
620 }
621
622 template <typename T>
StartRetryTimerLocked()623 void XdsClient::XdsChannel::RetryableCall<T>::StartRetryTimerLocked() {
624 if (shutting_down_) return;
625 const Duration delay = backoff_.NextAttemptDelay();
626 GRPC_TRACE_LOG(xds_client, INFO)
627 << "[xds_client " << xds_channel()->xds_client() << "] xds server "
628 << xds_channel()->server_.server_uri()
629 << ": call attempt failed; retry timer will fire in " << delay.millis()
630 << "ms.";
631 timer_handle_ = xds_channel()->xds_client()->engine()->RunAfter(
632 delay,
633 [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
634 ApplicationCallbackExecCtx callback_exec_ctx;
635 ExecCtx exec_ctx;
636 self->OnRetryTimer();
637 });
638 }
639
640 template <typename T>
OnRetryTimer()641 void XdsClient::XdsChannel::RetryableCall<T>::OnRetryTimer() {
642 MutexLock lock(&xds_channel_->xds_client()->mu_);
643 if (timer_handle_.has_value()) {
644 timer_handle_.reset();
645 if (shutting_down_) return;
646 GRPC_TRACE_LOG(xds_client, INFO)
647 << "[xds_client " << xds_channel()->xds_client() << "] xds server "
648 << xds_channel()->server_.server_uri()
649 << ": retry timer fired (retryable call: " << this << ")";
650 StartNewCallLocked();
651 }
652 }
653
654 //
655 // XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle
656 //
657
658 class XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle final
659 : public XdsClient::ReadDelayHandle {
660 public:
AdsReadDelayHandle(RefCountedPtr<AdsCall> ads_call)661 explicit AdsReadDelayHandle(RefCountedPtr<AdsCall> ads_call)
662 : ads_call_(std::move(ads_call)) {}
663
~AdsReadDelayHandle()664 ~AdsReadDelayHandle() override {
665 MutexLock lock(&ads_call_->xds_client()->mu_);
666 auto call = ads_call_->streaming_call_.get();
667 if (call != nullptr) call->StartRecvMessage();
668 }
669
670 private:
671 RefCountedPtr<AdsCall> ads_call_;
672 };
673
674 //
675 // XdsClient::XdsChannel::AdsCall
676 //
677
AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call)678 XdsClient::XdsChannel::AdsCall::AdsCall(
679 RefCountedPtr<RetryableCall<AdsCall>> retryable_call)
680 : InternallyRefCounted<AdsCall>(
681 GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "AdsCall" : nullptr),
682 retryable_call_(std::move(retryable_call)) {
683 CHECK_NE(xds_client(), nullptr);
684 // Init the ADS call.
685 const char* method =
686 "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
687 "StreamAggregatedResources";
688 streaming_call_ = xds_channel()->transport_->CreateStreamingCall(
689 method, std::make_unique<StreamEventHandler>(
690 // Passing the initial ref here. This ref will go away when
691 // the StreamEventHandler is destroyed.
692 RefCountedPtr<AdsCall>(this)));
693 CHECK(streaming_call_ != nullptr);
694 // Start the call.
695 GRPC_TRACE_LOG(xds_client, INFO)
696 << "[xds_client " << xds_client() << "] xds server "
697 << xds_channel()->server_.server_uri()
698 << ": starting ADS call (ads_call: " << this
699 << ", streaming_call: " << streaming_call_.get() << ")";
700 // If this is a reconnect, add any necessary subscriptions from what's
701 // already in the cache.
702 for (auto& a : xds_client()->authority_state_map_) {
703 const std::string& authority = a.first;
704 auto it = std::find(a.second.xds_channels.begin(),
705 a.second.xds_channels.end(), xds_channel());
706 // Skip authorities that are not using this xDS channel. The channel can be
707 // anywhere in the list.
708 if (it == a.second.xds_channels.end()) continue;
709 for (const auto& t : a.second.resource_map) {
710 const XdsResourceType* type = t.first;
711 for (const auto& r : t.second) {
712 const XdsResourceKey& resource_key = r.first;
713 SubscribeLocked(type, {authority, resource_key}, /*delay_send=*/true);
714 }
715 }
716 }
717 // Send initial message if we added any subscriptions above.
718 for (const auto& p : state_map_) {
719 SendMessageLocked(p.first);
720 }
721 streaming_call_->StartRecvMessage();
722 }
723
Orphan()724 void XdsClient::XdsChannel::AdsCall::Orphan() {
725 state_map_.clear();
726 // Note that the initial ref is held by the StreamEventHandler, which
727 // will be destroyed when streaming_call_ is destroyed, which may not happen
728 // here, since there may be other refs held to streaming_call_ by internal
729 // callbacks.
730 streaming_call_.reset();
731 }
732
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_send)733 void XdsClient::XdsChannel::AdsCall::SubscribeLocked(
734 const XdsResourceType* type, const XdsResourceName& name, bool delay_send) {
735 auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
736 if (state == nullptr) {
737 state = MakeOrphanable<ResourceTimer>(type, name);
738 if (!delay_send) SendMessageLocked(type);
739 }
740 }
741
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)742 void XdsClient::XdsChannel::AdsCall::UnsubscribeLocked(
743 const XdsResourceType* type, const XdsResourceName& name,
744 bool delay_unsubscription) {
745 auto& type_state_map = state_map_[type];
746 auto& authority_map = type_state_map.subscribed_resources[name.authority];
747 authority_map.erase(name.key);
748 if (authority_map.empty()) {
749 type_state_map.subscribed_resources.erase(name.authority);
750 // Note: We intentionally do not remove the top-level map entry for
751 // the resource type even if the authority map for the type is empty,
752 // because we need to retain the nonce in case a new watch is
753 // started for a resource of this type while this stream is still open.
754 }
755 // Don't need to send unsubscription message if this was the last
756 // resource we were subscribed to, since we'll be closing the stream
757 // immediately in that case.
758 if (!delay_unsubscription && HasSubscribedResources()) {
759 SendMessageLocked(type);
760 }
761 }
762
HasSubscribedResources() const763 bool XdsClient::XdsChannel::AdsCall::HasSubscribedResources() const {
764 for (const auto& p : state_map_) {
765 if (!p.second.subscribed_resources.empty()) return true;
766 }
767 return false;
768 }
769
770 namespace {
771
MaybeLogDiscoveryRequest(const XdsClient * client,upb_DefPool * def_pool,const envoy_service_discovery_v3_DiscoveryRequest * request)772 void MaybeLogDiscoveryRequest(
773 const XdsClient* client, upb_DefPool* def_pool,
774 const envoy_service_discovery_v3_DiscoveryRequest* request) {
775 if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
776 const upb_MessageDef* msg_type =
777 envoy_service_discovery_v3_DiscoveryRequest_getmsgdef(def_pool);
778 char buf[10240];
779 upb_TextEncode(reinterpret_cast<const upb_Message*>(request), msg_type,
780 nullptr, 0, buf, sizeof(buf));
781 VLOG(2) << "[xds_client " << client << "] constructed ADS request: " << buf;
782 }
783 }
784
SerializeDiscoveryRequest(upb_Arena * arena,envoy_service_discovery_v3_DiscoveryRequest * request)785 std::string SerializeDiscoveryRequest(
786 upb_Arena* arena, envoy_service_discovery_v3_DiscoveryRequest* request) {
787 size_t output_length;
788 char* output = envoy_service_discovery_v3_DiscoveryRequest_serialize(
789 request, arena, &output_length);
790 return std::string(output, output_length);
791 }
792
793 } // namespace
794
CreateAdsRequest(absl::string_view type_url,absl::string_view version,absl::string_view nonce,const std::vector<std::string> & resource_names,absl::Status status) const795 std::string XdsClient::XdsChannel::AdsCall::CreateAdsRequest(
796 absl::string_view type_url, absl::string_view version,
797 absl::string_view nonce, const std::vector<std::string>& resource_names,
798 absl::Status status) const {
799 upb::Arena arena;
800 // Create a request.
801 envoy_service_discovery_v3_DiscoveryRequest* request =
802 envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr());
803 // Set type_url.
804 std::string type_url_str = absl::StrCat("type.googleapis.com/", type_url);
805 envoy_service_discovery_v3_DiscoveryRequest_set_type_url(
806 request, StdStringToUpbString(type_url_str));
807 // Set version_info.
808 if (!version.empty()) {
809 envoy_service_discovery_v3_DiscoveryRequest_set_version_info(
810 request, StdStringToUpbString(version));
811 }
812 // Set nonce.
813 if (!nonce.empty()) {
814 envoy_service_discovery_v3_DiscoveryRequest_set_response_nonce(
815 request, StdStringToUpbString(nonce));
816 }
817 // Set error_detail if it's a NACK.
818 std::string error_string_storage;
819 if (!status.ok()) {
820 google_rpc_Status* error_detail =
821 envoy_service_discovery_v3_DiscoveryRequest_mutable_error_detail(
822 request, arena.ptr());
823 // Hard-code INVALID_ARGUMENT as the status code.
824 // TODO(roth): If at some point we decide we care about this value,
825 // we could attach a status code to the individual errors where we
826 // generate them in the parsing code, and then use that here.
827 google_rpc_Status_set_code(error_detail, GRPC_STATUS_INVALID_ARGUMENT);
828 // Error description comes from the status that was passed in.
829 error_string_storage = std::string(status.message());
830 upb_StringView error_description =
831 StdStringToUpbString(error_string_storage);
832 google_rpc_Status_set_message(error_detail, error_description);
833 }
834 // Populate node.
835 if (!sent_initial_message_) {
836 envoy_config_core_v3_Node* node_msg =
837 envoy_service_discovery_v3_DiscoveryRequest_mutable_node(request,
838 arena.ptr());
839 PopulateXdsNode(xds_client()->bootstrap_->node(),
840 xds_client()->user_agent_name_,
841 xds_client()->user_agent_version_, node_msg, arena.ptr());
842 envoy_config_core_v3_Node_add_client_features(
843 node_msg, upb_StringView_FromString("xds.config.resource-in-sotw"),
844 arena.ptr());
845 }
846 // Add resource_names.
847 for (const std::string& resource_name : resource_names) {
848 envoy_service_discovery_v3_DiscoveryRequest_add_resource_names(
849 request, StdStringToUpbString(resource_name), arena.ptr());
850 }
851 MaybeLogDiscoveryRequest(xds_client(), xds_client()->def_pool_.ptr(),
852 request);
853 return SerializeDiscoveryRequest(arena.ptr(), request);
854 }
855
SendMessageLocked(const XdsResourceType * type)856 void XdsClient::XdsChannel::AdsCall::SendMessageLocked(
857 const XdsResourceType* type)
858 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
859 // Buffer message sending if an existing message is in flight.
860 if (send_message_pending_ != nullptr) {
861 buffered_requests_.insert(type);
862 return;
863 }
864 auto& state = state_map_[type];
865 std::string serialized_message = CreateAdsRequest(
866 type->type_url(), xds_channel()->resource_type_version_map_[type],
867 state.nonce, ResourceNamesForRequest(type), state.status);
868 sent_initial_message_ = true;
869 GRPC_TRACE_LOG(xds_client, INFO)
870 << "[xds_client " << xds_client() << "] xds server "
871 << xds_channel()->server_.server_uri()
872 << ": sending ADS request: type=" << type->type_url()
873 << " version=" << xds_channel()->resource_type_version_map_[type]
874 << " nonce=" << state.nonce << " error=" << state.status;
875 state.status = absl::OkStatus();
876 streaming_call_->SendMessage(std::move(serialized_message));
877 send_message_pending_ = type;
878 }
879
OnRequestSent(bool ok)880 void XdsClient::XdsChannel::AdsCall::OnRequestSent(bool ok) {
881 MutexLock lock(&xds_client()->mu_);
882 // For each resource that was in the message we just sent, start the
883 // resource timer if needed.
884 if (ok) {
885 auto& resource_type_state = state_map_[send_message_pending_];
886 for (const auto& p : resource_type_state.subscribed_resources) {
887 for (auto& q : p.second) {
888 q.second->MaybeMarkSubscriptionSendComplete(
889 Ref(DEBUG_LOCATION, "ResourceTimer"));
890 }
891 }
892 }
893 send_message_pending_ = nullptr;
894 if (ok && IsCurrentCallOnChannel()) {
895 // Continue to send another pending message if any.
896 // TODO(roth): The current code to handle buffered messages has the
897 // advantage of sending only the most recent list of resource names for
898 // each resource type (no matter how many times that resource type has
899 // been requested to send while the current message sending is still
900 // pending). But its disadvantage is that we send the requests in fixed
901 // order of resource types. We need to fix this if we are seeing some
902 // resource type(s) starved due to frequent requests of other resource
903 // type(s).
904 auto it = buffered_requests_.begin();
905 if (it != buffered_requests_.end()) {
906 SendMessageLocked(*it);
907 buffered_requests_.erase(it);
908 }
909 }
910 }
911
ParseResource(size_t idx,absl::string_view type_url,absl::string_view resource_name,absl::string_view serialized_resource,DecodeContext * context)912 void XdsClient::XdsChannel::AdsCall::ParseResource(
913 size_t idx, absl::string_view type_url, absl::string_view resource_name,
914 absl::string_view serialized_resource, DecodeContext* context) {
915 std::string error_prefix = absl::StrCat(
916 "resource index ", idx, ": ",
917 resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
918 // Check the type_url of the resource.
919 if (context->type_url != type_url) {
920 context->errors.emplace_back(
921 absl::StrCat(error_prefix, "incorrect resource type \"", type_url,
922 "\" (should be \"", context->type_url, "\")"));
923 ++context->num_invalid_resources;
924 return;
925 }
926 // Parse the resource.
927 XdsResourceType::DecodeContext resource_type_context = {
928 xds_client(), xds_channel()->server_, &xds_client_trace,
929 xds_client()->def_pool_.ptr(), context->arena.ptr()};
930 XdsResourceType::DecodeResult decode_result =
931 context->type->Decode(resource_type_context, serialized_resource);
932 // If we didn't already have the resource name from the Resource
933 // wrapper, try to get it from the decoding result.
934 if (resource_name.empty()) {
935 if (decode_result.name.has_value()) {
936 resource_name = *decode_result.name;
937 error_prefix =
938 absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
939 } else {
940 // We don't have any way of determining the resource name, so
941 // there's nothing more we can do here.
942 context->errors.emplace_back(absl::StrCat(
943 error_prefix, decode_result.resource.status().ToString()));
944 ++context->num_invalid_resources;
945 return;
946 }
947 }
948 // If decoding failed, make sure we include the error in the NACK.
949 const absl::Status& decode_status = decode_result.resource.status();
950 if (!decode_status.ok()) {
951 context->errors.emplace_back(
952 absl::StrCat(error_prefix, decode_status.ToString()));
953 }
954 // Check the resource name.
955 auto parsed_resource_name =
956 xds_client()->ParseXdsResourceName(resource_name, context->type);
957 if (!parsed_resource_name.ok()) {
958 context->errors.emplace_back(
959 absl::StrCat(error_prefix, "Cannot parse xDS resource name"));
960 ++context->num_invalid_resources;
961 return;
962 }
963 // Cancel resource-does-not-exist timer, if needed.
964 auto timer_it = state_map_.find(context->type);
965 if (timer_it != state_map_.end()) {
966 auto it = timer_it->second.subscribed_resources.find(
967 parsed_resource_name->authority);
968 if (it != timer_it->second.subscribed_resources.end()) {
969 auto res_it = it->second.find(parsed_resource_name->key);
970 if (res_it != it->second.end()) {
971 res_it->second->MarkSeen();
972 }
973 }
974 }
975 // Lookup the authority in the cache.
976 auto authority_it =
977 xds_client()->authority_state_map_.find(parsed_resource_name->authority);
978 if (authority_it == xds_client()->authority_state_map_.end()) {
979 return; // Skip resource -- we don't have a subscription for it.
980 }
981 // Found authority, so look up type.
982 AuthorityState& authority_state = authority_it->second;
983 auto type_it = authority_state.resource_map.find(context->type);
984 if (type_it == authority_state.resource_map.end()) {
985 return; // Skip resource -- we don't have a subscription for it.
986 }
987 auto& type_map = type_it->second;
988 // Found type, so look up resource key.
989 auto it = type_map.find(parsed_resource_name->key);
990 if (it == type_map.end()) {
991 return; // Skip resource -- we don't have a subscription for it.
992 }
993 ResourceState& resource_state = it->second;
994 // If needed, record that we've seen this resource.
995 if (context->type->AllResourcesRequiredInSotW()) {
996 context->resources_seen[parsed_resource_name->authority].insert(
997 parsed_resource_name->key);
998 }
999 // If we previously ignored the resource's deletion, log that we're
1000 // now re-adding it.
1001 if (resource_state.ignored_deletion()) {
1002 LOG(INFO) << "[xds_client " << xds_client() << "] xds server "
1003 << xds_channel()->server_.server_uri()
1004 << ": server returned new version of resource for which we "
1005 "previously ignored a deletion: type "
1006 << type_url << " name " << resource_name;
1007 resource_state.set_ignored_deletion(false);
1008 }
1009 // Update resource state based on whether the resource is valid.
1010 absl::Status status = absl::InvalidArgumentError(
1011 absl::StrCat("invalid resource: ", decode_status.ToString()));
1012 if (!decode_status.ok()) {
1013 if (!resource_state.HasResource()) {
1014 xds_client()->NotifyWatchersOnResourceChanged(std::move(status),
1015 resource_state.watchers(),
1016 context->read_delay_handle);
1017 } else {
1018 xds_client()->NotifyWatchersOnAmbientError(std::move(status),
1019 resource_state.watchers(),
1020 context->read_delay_handle);
1021 }
1022 resource_state.SetNacked(context->version, decode_status.ToString(),
1023 context->update_time);
1024 ++context->num_invalid_resources;
1025 return;
1026 }
1027 // Resource is valid.
1028 ++context->num_valid_resources;
1029 // Check if the resource has changed.
1030 const bool resource_identical =
1031 resource_state.HasResource() &&
1032 context->type->ResourcesEqual(resource_state.resource().get(),
1033 decode_result.resource->get());
1034 // If not changed, keep using the current decoded resource object.
1035 // This should avoid wasting memory, since external watchers may be
1036 // holding refs to the current object.
1037 if (resource_identical) decode_result.resource = resource_state.resource();
1038 // Update the resource state.
1039 resource_state.SetAcked(std::move(*decode_result.resource),
1040 std::string(serialized_resource), context->version,
1041 context->update_time);
1042 // If the resource didn't change, inhibit watcher notifications.
1043 if (resource_identical) {
1044 GRPC_TRACE_LOG(xds_client, INFO)
1045 << "[xds_client " << xds_client() << "] " << context->type_url
1046 << " resource " << resource_name << " identical to current, ignoring.";
1047 // If we previously had connectivity problems, notify watchers that
1048 // the ambient error has been cleared.
1049 if (!xds_channel()->status().ok()) {
1050 xds_client()->NotifyWatchersOnAmbientError(absl::OkStatus(),
1051 resource_state.watchers(),
1052 context->read_delay_handle);
1053 }
1054 return;
1055 }
1056 // Notify watchers.
1057 xds_client()->NotifyWatchersOnResourceChanged(resource_state.resource(),
1058 resource_state.watchers(),
1059 context->read_delay_handle);
1060 }
1061
1062 namespace {
1063
MaybeLogDiscoveryResponse(const XdsClient * client,upb_DefPool * def_pool,const envoy_service_discovery_v3_DiscoveryResponse * response)1064 void MaybeLogDiscoveryResponse(
1065 const XdsClient* client, upb_DefPool* def_pool,
1066 const envoy_service_discovery_v3_DiscoveryResponse* response) {
1067 if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
1068 const upb_MessageDef* msg_type =
1069 envoy_service_discovery_v3_DiscoveryResponse_getmsgdef(def_pool);
1070 char buf[10240];
1071 upb_TextEncode(reinterpret_cast<const upb_Message*>(response), msg_type,
1072 nullptr, 0, buf, sizeof(buf));
1073 VLOG(2) << "[xds_client " << client << "] received response: " << buf;
1074 }
1075 }
1076
1077 } // namespace
1078
DecodeAdsResponse(absl::string_view encoded_response,DecodeContext * context)1079 absl::Status XdsClient::XdsChannel::AdsCall::DecodeAdsResponse(
1080 absl::string_view encoded_response, DecodeContext* context) {
1081 // Decode the response.
1082 const envoy_service_discovery_v3_DiscoveryResponse* response =
1083 envoy_service_discovery_v3_DiscoveryResponse_parse(
1084 encoded_response.data(), encoded_response.size(),
1085 context->arena.ptr());
1086 // If decoding fails, report a fatal error and return.
1087 if (response == nullptr) {
1088 return absl::InvalidArgumentError("Can't decode DiscoveryResponse.");
1089 }
1090 MaybeLogDiscoveryResponse(xds_client(), xds_client()->def_pool_.ptr(),
1091 response);
1092 // Get the type_url, version, nonce, and number of resources.
1093 context->type_url = std::string(absl::StripPrefix(
1094 UpbStringToAbsl(
1095 envoy_service_discovery_v3_DiscoveryResponse_type_url(response)),
1096 "type.googleapis.com/"));
1097 context->version = UpbStringToStdString(
1098 envoy_service_discovery_v3_DiscoveryResponse_version_info(response));
1099 context->nonce = UpbStringToStdString(
1100 envoy_service_discovery_v3_DiscoveryResponse_nonce(response));
1101 size_t num_resources;
1102 const google_protobuf_Any* const* resources =
1103 envoy_service_discovery_v3_DiscoveryResponse_resources(response,
1104 &num_resources);
1105 GRPC_TRACE_LOG(xds_client, INFO)
1106 << "[xds_client " << xds_client() << "] xds server "
1107 << xds_channel()->server_.server_uri()
1108 << ": received ADS response: type_url=" << context->type_url
1109 << ", version=" << context->version << ", nonce=" << context->nonce
1110 << ", num_resources=" << num_resources;
1111 context->type = xds_client()->GetResourceTypeLocked(context->type_url);
1112 if (context->type == nullptr) {
1113 return absl::InvalidArgumentError(
1114 absl::StrCat("unknown resource type ", context->type_url));
1115 }
1116 context->read_delay_handle = MakeRefCounted<AdsReadDelayHandle>(Ref());
1117 // Process each resource.
1118 for (size_t i = 0; i < num_resources; ++i) {
1119 absl::string_view type_url = absl::StripPrefix(
1120 UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])),
1121 "type.googleapis.com/");
1122 absl::string_view serialized_resource =
1123 UpbStringToAbsl(google_protobuf_Any_value(resources[i]));
1124 // Unwrap Resource messages, if so wrapped.
1125 absl::string_view resource_name;
1126 if (type_url == "envoy.service.discovery.v3.Resource") {
1127 const auto* resource_wrapper = envoy_service_discovery_v3_Resource_parse(
1128 serialized_resource.data(), serialized_resource.size(),
1129 context->arena.ptr());
1130 if (resource_wrapper == nullptr) {
1131 context->errors.emplace_back(absl::StrCat(
1132 "resource index ", i, ": Can't decode Resource proto wrapper"));
1133 ++context->num_invalid_resources;
1134 continue;
1135 }
1136 const auto* resource =
1137 envoy_service_discovery_v3_Resource_resource(resource_wrapper);
1138 if (resource == nullptr) {
1139 context->errors.emplace_back(
1140 absl::StrCat("resource index ", i,
1141 ": No resource present in Resource proto wrappe"));
1142 ++context->num_invalid_resources;
1143 continue;
1144 }
1145 type_url = absl::StripPrefix(
1146 UpbStringToAbsl(google_protobuf_Any_type_url(resource)),
1147 "type.googleapis.com/");
1148 serialized_resource =
1149 UpbStringToAbsl(google_protobuf_Any_value(resource));
1150 resource_name = UpbStringToAbsl(
1151 envoy_service_discovery_v3_Resource_name(resource_wrapper));
1152 }
1153 ParseResource(i, type_url, resource_name, serialized_resource, context);
1154 }
1155 return absl::OkStatus();
1156 }
1157
// Handles a message received on the ADS stream.
//
// Under the XdsClient mutex: decodes the response, records the nonce and
// any validation errors for the resource type, handles resources that
// were absent from the response (for types where
// AllResourcesRequiredInSotW() is true), updates the stored version,
// sends the ACK/NACK, and reports update metrics.  Messages arriving on a
// call that is no longer the channel's current call are dropped.  The
// work serializer queue is drained after the mutex is released.
void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
  // context.read_delay_handle needs to be destroyed after the mutex is
  // released.
  DecodeContext context;
  {
    MutexLock lock(&xds_client()->mu_);
    if (!IsCurrentCallOnChannel()) return;
    // Parse and validate the response.
    absl::Status status = DecodeAdsResponse(payload, &context);
    if (!status.ok()) {
      // Ignore unparsable response.
      LOG(ERROR) << "[xds_client " << xds_client() << "] xds server "
                 << xds_channel()->server_.server_uri()
                 << ": error parsing ADS response (" << status
                 << ") -- ignoring";
    } else {
      seen_response_ = true;
      xds_channel()->SetHealthyLocked();
      // Update nonce.
      auto& state = state_map_[context.type];
      state.nonce = context.nonce;
      // If we got an error, set state.status so that we'll NACK the update.
      if (!context.errors.empty()) {
        state.status = absl::UnavailableError(
            absl::StrCat("xDS response validation errors: [",
                         absl::StrJoin(context.errors, "; "), "]"));
        LOG(ERROR) << "[xds_client " << xds_client() << "] xds server "
                   << xds_channel()->server_.server_uri()
                   << ": ADS response invalid for resource type "
                   << context.type_url << " version " << context.version
                   << ", will NACK: nonce=" << state.nonce
                   << " status=" << state.status;
      }
      // Delete resources not seen in update if needed.
      if (context.type->AllResourcesRequiredInSotW()) {
        for (auto& a : xds_client()->authority_state_map_) {
          const std::string& authority = a.first;
          AuthorityState& authority_state = a.second;
          // Skip authorities that are not using this xDS channel.
          if (authority_state.xds_channels.back() != xds_channel()) {
            continue;
          }
          // May be end() if no resource for this authority was seen in
          // the response; the loop below handles that case.
          auto seen_authority_it = context.resources_seen.find(authority);
          // Find this resource type.
          auto type_it = authority_state.resource_map.find(context.type);
          if (type_it == authority_state.resource_map.end()) continue;
          // Iterate over resource ids.
          for (auto& r : type_it->second) {
            const XdsResourceKey& resource_key = r.first;
            ResourceState& resource_state = r.second;
            if (seen_authority_it == context.resources_seen.end() ||
                seen_authority_it->second.find(resource_key) ==
                    seen_authority_it->second.end()) {
              // If the resource was newly requested but has not yet been
              // received, we don't want to generate an error for the
              // watchers, because this ADS response may be in reaction to an
              // earlier request that did not yet request the new resource, so
              // its absence from the response does not necessarily indicate
              // that the resource does not exist.  For that case, we rely on
              // the request timeout instead.
              if (!resource_state.HasResource()) continue;
              if (xds_channel()->server_.IgnoreResourceDeletion()) {
                // Log only the first time the deletion is ignored for
                // this resource.
                if (!resource_state.ignored_deletion()) {
                  LOG(ERROR)
                      << "[xds_client " << xds_client() << "] xds server "
                      << xds_channel()->server_.server_uri()
                      << ": ignoring deletion for resource type "
                      << context.type_url << " name "
                      << XdsClient::ConstructFullXdsResourceName(
                             authority, context.type_url.c_str(), resource_key);
                  resource_state.set_ignored_deletion(true);
                }
              } else {
                resource_state.SetDoesNotExist();
                xds_client()->NotifyWatchersOnResourceChanged(
                    absl::NotFoundError("does not exist"),
                    resource_state.watchers(), context.read_delay_handle);
              }
            }
          }
        }
      }
      // If we had valid resources or the update was empty, update the version.
      if (context.num_valid_resources > 0 || context.errors.empty()) {
        xds_channel()->resource_type_version_map_[context.type] =
            std::move(context.version);
      }
      // Send ACK or NACK.
      SendMessageLocked(context.type);
    }
    // Update metrics.
    if (xds_client()->metrics_reporter_ != nullptr) {
      xds_client()->metrics_reporter_->ReportResourceUpdates(
          xds_channel()->server_.server_uri(), context.type_url,
          context.num_valid_resources, context.num_invalid_resources);
    }
  }
  xds_client()->work_serializer_.DrainQueue();
}
1257
OnStatusReceived(absl::Status status)1258 void XdsClient::XdsChannel::AdsCall::OnStatusReceived(absl::Status status) {
1259 {
1260 MutexLock lock(&xds_client()->mu_);
1261 GRPC_TRACE_LOG(xds_client, INFO)
1262 << "[xds_client " << xds_client() << "] xds server "
1263 << xds_channel()->server_.server_uri()
1264 << ": ADS call status received (xds_channel=" << xds_channel()
1265 << ", ads_call=" << this << ", streaming_call=" << streaming_call_.get()
1266 << "): " << status;
1267 // Cancel any does-not-exist timers that may be pending.
1268 for (const auto& p : state_map_) {
1269 for (const auto& q : p.second.subscribed_resources) {
1270 for (auto& r : q.second) {
1271 r.second->MaybeCancelTimer();
1272 }
1273 }
1274 }
1275 // Ignore status from a stale call.
1276 if (IsCurrentCallOnChannel()) {
1277 // Try to restart the call.
1278 retryable_call_->OnCallFinishedLocked();
1279 // If we didn't receive a response on the stream, report the
1280 // stream failure as a connectivity failure, which will report the
1281 // error to all watchers of resources on this channel.
1282 if (!seen_response_) {
1283 xds_channel()->SetChannelStatusLocked(absl::UnavailableError(
1284 absl::StrCat("xDS call failed with no responses received; status: ",
1285 status.ToString())));
1286 }
1287 }
1288 }
1289 xds_client()->work_serializer_.DrainQueue();
1290 }
1291
IsCurrentCallOnChannel() const1292 bool XdsClient::XdsChannel::AdsCall::IsCurrentCallOnChannel() const {
1293 // If the retryable ADS call is null (which only happens when the xds
1294 // channel is shutting down), all the ADS calls are stale.
1295 if (xds_channel()->ads_call_ == nullptr) return false;
1296 return this == xds_channel()->ads_call_->call();
1297 }
1298
1299 std::vector<std::string>
ResourceNamesForRequest(const XdsResourceType * type)1300 XdsClient::XdsChannel::AdsCall::ResourceNamesForRequest(
1301 const XdsResourceType* type) {
1302 std::vector<std::string> resource_names;
1303 auto it = state_map_.find(type);
1304 if (it != state_map_.end()) {
1305 for (auto& a : it->second.subscribed_resources) {
1306 const std::string& authority = a.first;
1307 for (auto& p : a.second) {
1308 const XdsResourceKey& resource_key = p.first;
1309 resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
1310 authority, type->type_url(), resource_key));
1311 OrphanablePtr<ResourceTimer>& resource_timer = p.second;
1312 resource_timer->MarkSubscriptionSendStarted();
1313 }
1314 }
1315 }
1316 return resource_names;
1317 }
1318
1319 //
1320 // XdsClient::ResourceState
1321 //
1322
SetAcked(std::shared_ptr<const XdsResourceType::ResourceData> resource,std::string serialized_proto,std::string version,Timestamp update_time)1323 void XdsClient::ResourceState::SetAcked(
1324 std::shared_ptr<const XdsResourceType::ResourceData> resource,
1325 std::string serialized_proto, std::string version, Timestamp update_time) {
1326 resource_ = std::move(resource);
1327 client_status_ = ClientResourceStatus::ACKED;
1328 serialized_proto_ = std::move(serialized_proto);
1329 update_time_ = update_time;
1330 version_ = std::move(version);
1331 failed_version_.clear();
1332 failed_details_.clear();
1333 }
1334
SetNacked(const std::string & version,const std::string & details,Timestamp update_time)1335 void XdsClient::ResourceState::SetNacked(const std::string& version,
1336 const std::string& details,
1337 Timestamp update_time) {
1338 client_status_ = ClientResourceStatus::NACKED;
1339 failed_version_ = version;
1340 failed_details_ = details;
1341 failed_update_time_ = update_time;
1342 }
1343
SetDoesNotExist()1344 void XdsClient::ResourceState::SetDoesNotExist() {
1345 resource_.reset();
1346 serialized_proto_.clear();
1347 client_status_ = ClientResourceStatus::DOES_NOT_EXIST;
1348 failed_version_.clear();
1349 }
1350
CacheStateString() const1351 absl::string_view XdsClient::ResourceState::CacheStateString() const {
1352 switch (client_status_) {
1353 case ClientResourceStatus::REQUESTED:
1354 return "requested";
1355 case ClientResourceStatus::DOES_NOT_EXIST:
1356 return "does_not_exist";
1357 case ClientResourceStatus::ACKED:
1358 return "acked";
1359 case ClientResourceStatus::NACKED:
1360 return resource_ != nullptr ? "nacked_but_cached" : "nacked";
1361 }
1362 Crash("unknown resource state");
1363 }
1364
1365 namespace {
1366
EncodeTimestamp(Timestamp value,upb_Arena * arena)1367 google_protobuf_Timestamp* EncodeTimestamp(Timestamp value, upb_Arena* arena) {
1368 google_protobuf_Timestamp* timestamp = google_protobuf_Timestamp_new(arena);
1369 gpr_timespec timespec = value.as_timespec(GPR_CLOCK_REALTIME);
1370 google_protobuf_Timestamp_set_seconds(timestamp, timespec.tv_sec);
1371 google_protobuf_Timestamp_set_nanos(timestamp, timespec.tv_nsec);
1372 return timestamp;
1373 }
1374
1375 } // namespace
1376
FillGenericXdsConfig(upb_StringView type_url,upb_StringView resource_name,upb_Arena * arena,envoy_service_status_v3_ClientConfig_GenericXdsConfig * entry) const1377 void XdsClient::ResourceState::FillGenericXdsConfig(
1378 upb_StringView type_url, upb_StringView resource_name, upb_Arena* arena,
1379 envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry) const {
1380 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_type_url(entry,
1381 type_url);
1382 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_name(entry,
1383 resource_name);
1384 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_client_status(
1385 entry, client_status_);
1386 if (!serialized_proto_.empty()) {
1387 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_version_info(
1388 entry, StdStringToUpbString(version_));
1389 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_last_updated(
1390 entry, EncodeTimestamp(update_time_, arena));
1391 auto* any_field =
1392 envoy_service_status_v3_ClientConfig_GenericXdsConfig_mutable_xds_config(
1393 entry, arena);
1394 google_protobuf_Any_set_type_url(any_field, type_url);
1395 google_protobuf_Any_set_value(any_field,
1396 StdStringToUpbString(serialized_proto_));
1397 }
1398 if (client_status_ == ClientResourceStatus::NACKED) {
1399 auto* update_failure_state = envoy_admin_v3_UpdateFailureState_new(arena);
1400 envoy_admin_v3_UpdateFailureState_set_details(
1401 update_failure_state, StdStringToUpbString(failed_details_));
1402 envoy_admin_v3_UpdateFailureState_set_version_info(
1403 update_failure_state, StdStringToUpbString(failed_version_));
1404 envoy_admin_v3_UpdateFailureState_set_last_update_attempt(
1405 update_failure_state, EncodeTimestamp(failed_update_time_, arena));
1406 envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_error_state(
1407 entry, update_failure_state);
1408 }
1409 }
1410
1411 //
1412 // XdsClient
1413 //
1414
// Out-of-line definition of the static constexpr data member (no
// initializer here; the value comes from the in-class declaration).
constexpr absl::string_view XdsClient::kOldStyleAuthority;
1416
XdsClient(std::shared_ptr<XdsBootstrap> bootstrap,RefCountedPtr<XdsTransportFactory> transport_factory,std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,std::unique_ptr<XdsMetricsReporter> metrics_reporter,std::string user_agent_name,std::string user_agent_version,Duration resource_request_timeout)1417 XdsClient::XdsClient(
1418 std::shared_ptr<XdsBootstrap> bootstrap,
1419 RefCountedPtr<XdsTransportFactory> transport_factory,
1420 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
1421 std::unique_ptr<XdsMetricsReporter> metrics_reporter,
1422 std::string user_agent_name, std::string user_agent_version,
1423 Duration resource_request_timeout)
1424 : DualRefCounted<XdsClient>(
1425 GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "XdsClient" : nullptr),
1426 bootstrap_(std::move(bootstrap)),
1427 user_agent_name_(std::move(user_agent_name)),
1428 user_agent_version_(std::move(user_agent_version)),
1429 transport_factory_(std::move(transport_factory)),
1430 request_timeout_(resource_request_timeout),
1431 xds_federation_enabled_(XdsFederationEnabled()),
1432 work_serializer_(engine),
1433 engine_(std::move(engine)),
1434 metrics_reporter_(std::move(metrics_reporter)) {
1435 GRPC_TRACE_LOG(xds_client, INFO)
1436 << "[xds_client " << this << "] creating xds client";
1437 CHECK(bootstrap_ != nullptr);
1438 if (bootstrap_->node() != nullptr) {
1439 GRPC_TRACE_LOG(xds_client, INFO)
1440 << "[xds_client " << this
1441 << "] xDS node ID: " << bootstrap_->node()->id();
1442 }
1443 }
1444
// Destructor.  The body only emits a trace log line; members are released
// by their own destructors.
XdsClient::~XdsClient() {
  GRPC_TRACE_LOG(xds_client, INFO)
      << "[xds_client " << this << "] destroying xds client";
}
1449
// Shuts the client down: under the mutex, sets the shutting-down flag and
// clears the authority state map and the set of invalid watchers.
void XdsClient::Orphaned() {
  GRPC_TRACE_LOG(xds_client, INFO)
      << "[xds_client " << this << "] shutting down xds client";
  MutexLock lock(&mu_);
  shutting_down_ = true;
  // Clear cache and any remaining watchers that may not have been cancelled.
  authority_state_map_.clear();
  invalid_watchers_.clear();
}
1459
GetOrCreateXdsChannelLocked(const XdsBootstrap::XdsServer & server,const char * reason)1460 RefCountedPtr<XdsClient::XdsChannel> XdsClient::GetOrCreateXdsChannelLocked(
1461 const XdsBootstrap::XdsServer& server, const char* reason) {
1462 std::string key = server.Key();
1463 auto it = xds_channel_map_.find(key);
1464 if (it != xds_channel_map_.end()) {
1465 return it->second->Ref(DEBUG_LOCATION, reason);
1466 }
1467 // Channel not found, so create a new one.
1468 auto xds_channel =
1469 MakeRefCounted<XdsChannel>(WeakRef(DEBUG_LOCATION, "XdsChannel"), server);
1470 xds_channel_map_[std::move(key)] = xds_channel.get();
1471 return xds_channel;
1472 }
1473
HasUncachedResources(const AuthorityState & authority_state)1474 bool XdsClient::HasUncachedResources(const AuthorityState& authority_state) {
1475 for (const auto& type_resource : authority_state.resource_map) {
1476 for (const auto& key_state : type_resource.second) {
1477 if (key_state.second.client_status() ==
1478 ResourceState::ClientResourceStatus::REQUESTED) {
1479 return true;
1480 }
1481 }
1482 }
1483 return false;
1484 }
1485
WatchResource(const XdsResourceType * type,absl::string_view name,RefCountedPtr<ResourceWatcherInterface> watcher)1486 void XdsClient::WatchResource(const XdsResourceType* type,
1487 absl::string_view name,
1488 RefCountedPtr<ResourceWatcherInterface> watcher) {
1489 // Lambda for handling failure cases.
1490 auto fail = [&](absl::Status status) mutable {
1491 {
1492 MutexLock lock(&mu_);
1493 MaybeRegisterResourceTypeLocked(type);
1494 invalid_watchers_.insert(watcher);
1495 }
1496 NotifyWatchersOnResourceChanged(std::move(status), {watcher},
1497 ReadDelayHandle::NoWait());
1498 work_serializer_.DrainQueue();
1499 };
1500 auto resource_name = ParseXdsResourceName(name, type);
1501 if (!resource_name.ok()) {
1502 fail(absl::InvalidArgumentError(
1503 absl::StrCat("Unable to parse resource name ", name)));
1504 return;
1505 }
1506 // Find server to use.
1507 std::vector<const XdsBootstrap::XdsServer*> xds_servers;
1508 if (resource_name->authority != kOldStyleAuthority) {
1509 auto* authority =
1510 bootstrap_->LookupAuthority(std::string(resource_name->authority));
1511 if (authority == nullptr) {
1512 fail(absl::FailedPreconditionError(
1513 absl::StrCat("authority \"", resource_name->authority,
1514 "\" not present in bootstrap config")));
1515 return;
1516 }
1517 xds_servers = authority->servers();
1518 }
1519 if (xds_servers.empty()) xds_servers = bootstrap_->servers();
1520 {
1521 MutexLock lock(&mu_);
1522 MaybeRegisterResourceTypeLocked(type);
1523 AuthorityState& authority_state =
1524 authority_state_map_[resource_name->authority];
1525 auto it_is_new = authority_state.resource_map[type].emplace(
1526 resource_name->key, ResourceState());
1527 bool first_watcher_for_resource = it_is_new.second;
1528 ResourceState& resource_state = it_is_new.first->second;
1529 resource_state.AddWatcher(watcher);
1530 bool notified_watcher = false;
1531 if (first_watcher_for_resource) {
1532 // We try to add new channels in 2 cases:
1533 // - This is the first resource for this authority (i.e., the list
1534 // of channels is empty).
1535 // - The last channel in the list is failing. That failure may not
1536 // have previously triggered fallback if there were no uncached
1537 // resources, but we've just added a new uncached resource,
1538 // so we need to trigger fallback now.
1539 //
1540 // Note that when we add a channel, it might already be failing
1541 // due to being used in a different authority. So we keep going
1542 // until either we add one that isn't failing or we've added them all.
1543 if (authority_state.xds_channels.empty() ||
1544 !authority_state.xds_channels.back()->status().ok()) {
1545 for (size_t i = authority_state.xds_channels.size();
1546 i < xds_servers.size(); ++i) {
1547 authority_state.xds_channels.emplace_back(
1548 GetOrCreateXdsChannelLocked(*xds_servers[i], "start watch"));
1549 if (authority_state.xds_channels.back()->status().ok()) {
1550 break;
1551 }
1552 }
1553 }
1554 for (const auto& channel : authority_state.xds_channels) {
1555 channel->SubscribeLocked(type, *resource_name);
1556 }
1557 } else {
1558 // If we already have a cached value for the resource, notify the new
1559 // watcher immediately.
1560 if (resource_state.HasResource()) {
1561 GRPC_TRACE_LOG(xds_client, INFO)
1562 << "[xds_client " << this << "] returning cached listener data for "
1563 << name;
1564 NotifyWatchersOnResourceChanged(resource_state.resource(), {watcher},
1565 ReadDelayHandle::NoWait());
1566 notified_watcher = true;
1567 } else if (resource_state.client_status() ==
1568 ResourceState::ClientResourceStatus::DOES_NOT_EXIST) {
1569 GRPC_TRACE_LOG(xds_client, INFO)
1570 << "[xds_client " << this
1571 << "] reporting cached does-not-exist for " << name;
1572 NotifyWatchersOnResourceChanged(absl::NotFoundError("does not exist"),
1573 {watcher}, ReadDelayHandle::NoWait());
1574 notified_watcher = true;
1575 } else if (resource_state.client_status() ==
1576 ResourceState::ClientResourceStatus::NACKED) {
1577 GRPC_TRACE_LOG(xds_client, INFO)
1578 << "[xds_client " << this
1579 << "] reporting cached validation failure for " << name << ": "
1580 << resource_state.failed_details();
1581 NotifyWatchersOnResourceChanged(
1582 absl::InvalidArgumentError(absl::StrCat(
1583 "invalid resource: ", resource_state.failed_details())),
1584 {watcher}, ReadDelayHandle::NoWait());
1585 notified_watcher = true;
1586 }
1587 }
1588 // If the channel is not connected, report an error to the watcher.
1589 absl::Status channel_status = authority_state.xds_channels.back()->status();
1590 if (!channel_status.ok()) {
1591 GRPC_TRACE_LOG(xds_client, INFO)
1592 << "[xds_client " << this << "] returning cached channel error for "
1593 << name << ": " << channel_status;
1594 if (notified_watcher) {
1595 NotifyWatchersOnAmbientError(std::move(channel_status), {watcher},
1596 ReadDelayHandle::NoWait());
1597 } else {
1598 NotifyWatchersOnResourceChanged(std::move(channel_status), {watcher},
1599 ReadDelayHandle::NoWait());
1600 }
1601 }
1602 }
1603 work_serializer_.DrainQueue();
1604 }
1605
CancelResourceWatch(const XdsResourceType * type,absl::string_view name,ResourceWatcherInterface * watcher,bool delay_unsubscription)1606 void XdsClient::CancelResourceWatch(const XdsResourceType* type,
1607 absl::string_view name,
1608 ResourceWatcherInterface* watcher,
1609 bool delay_unsubscription) {
1610 auto resource_name = ParseXdsResourceName(name, type);
1611 MutexLock lock(&mu_);
1612 // We cannot be sure whether the watcher is in invalid_watchers_ or in
1613 // authority_state_map_, so we check both, just to be safe.
1614 invalid_watchers_.erase(watcher);
1615 // Find authority.
1616 if (!resource_name.ok()) return;
1617 auto authority_it = authority_state_map_.find(resource_name->authority);
1618 if (authority_it == authority_state_map_.end()) return;
1619 AuthorityState& authority_state = authority_it->second;
1620 // Find type map.
1621 auto type_it = authority_state.resource_map.find(type);
1622 if (type_it == authority_state.resource_map.end()) return;
1623 auto& type_map = type_it->second;
1624 // Find resource key.
1625 auto resource_it = type_map.find(resource_name->key);
1626 if (resource_it == type_map.end()) return;
1627 ResourceState& resource_state = resource_it->second;
1628 // Remove watcher.
1629 resource_state.RemoveWatcher(watcher);
1630 // Clean up empty map entries, if any.
1631 if (!resource_state.HasWatchers()) {
1632 if (resource_state.ignored_deletion()) {
1633 LOG(INFO) << "[xds_client " << this
1634 << "] unsubscribing from a resource for which we "
1635 << "previously ignored a deletion: type " << type->type_url()
1636 << " name " << name;
1637 }
1638 for (const auto& xds_channel : authority_state.xds_channels) {
1639 xds_channel->UnsubscribeLocked(type, *resource_name,
1640 delay_unsubscription);
1641 }
1642 type_map.erase(resource_it);
1643 if (type_map.empty()) {
1644 authority_state.resource_map.erase(type_it);
1645 if (authority_state.resource_map.empty()) {
1646 authority_state.xds_channels.clear();
1647 }
1648 }
1649 }
1650 }
1651
MaybeRegisterResourceTypeLocked(const XdsResourceType * resource_type)1652 void XdsClient::MaybeRegisterResourceTypeLocked(
1653 const XdsResourceType* resource_type) {
1654 auto it = resource_types_.find(resource_type->type_url());
1655 if (it != resource_types_.end()) {
1656 CHECK(it->second == resource_type);
1657 return;
1658 }
1659 resource_types_.emplace(resource_type->type_url(), resource_type);
1660 resource_type->InitUpbSymtab(this, def_pool_.ptr());
1661 }
1662
GetResourceTypeLocked(absl::string_view resource_type)1663 const XdsResourceType* XdsClient::GetResourceTypeLocked(
1664 absl::string_view resource_type) {
1665 auto it = resource_types_.find(resource_type);
1666 if (it != resource_types_.end()) return it->second;
1667 return nullptr;
1668 }
1669
ParseXdsResourceName(absl::string_view name,const XdsResourceType * type)1670 absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
1671 absl::string_view name, const XdsResourceType* type) {
1672 // Old-style names use the empty string for authority.
1673 // authority is set to kOldStyleAuthority to indicate that it's an
1674 // old-style name.
1675 if (!xds_federation_enabled_ || !absl::StartsWith(name, "xdstp:")) {
1676 return XdsResourceName{std::string(kOldStyleAuthority),
1677 {std::string(name), {}}};
1678 }
1679 // New style name. Parse URI.
1680 auto uri = URI::Parse(name);
1681 if (!uri.ok()) return uri.status();
1682 // Split the resource type off of the path to get the id.
1683 std::pair<absl::string_view, absl::string_view> path_parts = absl::StrSplit(
1684 absl::StripPrefix(uri->path(), "/"), absl::MaxSplits('/', 1));
1685 if (type->type_url() != path_parts.first) {
1686 return absl::InvalidArgumentError(
1687 "xdstp URI path must indicate valid xDS resource type");
1688 }
1689 // Canonicalize order of query params.
1690 std::vector<URI::QueryParam> query_params;
1691 for (const auto& p : uri->query_parameter_map()) {
1692 query_params.emplace_back(
1693 URI::QueryParam{std::string(p.first), std::string(p.second)});
1694 }
1695 return XdsResourceName{
1696 uri->authority(),
1697 {std::string(path_parts.second), std::move(query_params)}};
1698 }
1699
ConstructFullXdsResourceName(absl::string_view authority,absl::string_view resource_type,const XdsResourceKey & key)1700 std::string XdsClient::ConstructFullXdsResourceName(
1701 absl::string_view authority, absl::string_view resource_type,
1702 const XdsResourceKey& key) {
1703 if (authority != kOldStyleAuthority) {
1704 auto uri = URI::Create("xdstp", std::string(authority),
1705 absl::StrCat("/", resource_type, "/", key.id),
1706 key.query_params, /*fragment=*/"");
1707 CHECK(uri.ok());
1708 return uri->ToString();
1709 }
1710 // Old-style name.
1711 return key.id;
1712 }
1713
ResetBackoff()1714 void XdsClient::ResetBackoff() {
1715 MutexLock lock(&mu_);
1716 for (auto& p : xds_channel_map_) {
1717 p.second->ResetBackoff();
1718 }
1719 }
1720
// Returns a status with the same code as `status` and with
// " (node ID:<id>)" appended to its message, where <id> is the ID of the
// bootstrap node. If the bootstrap has no node, `status` is returned
// unchanged.
//
// The result is rebuilt from the code and message only, so anything else
// attached to `status` is not carried over.
absl::Status XdsClient::AppendNodeToStatus(const absl::Status& status) const {
  const auto* node = bootstrap_->node();
  if (node == nullptr) return status;
  // Dereference the pointer that was just null-checked instead of asking
  // the bootstrap for the node a second time.
  return absl::Status(
      status.code(),
      absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
}
1728
NotifyWatchersOnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsResourceType::ResourceData>> resource,WatcherSet watchers,RefCountedPtr<ReadDelayHandle> read_delay_handle)1729 void XdsClient::NotifyWatchersOnResourceChanged(
1730 absl::StatusOr<std::shared_ptr<const XdsResourceType::ResourceData>>
1731 resource,
1732 WatcherSet watchers, RefCountedPtr<ReadDelayHandle> read_delay_handle) {
1733 if (!resource.ok()) resource = AppendNodeToStatus(resource.status());
1734 work_serializer_.Schedule(
1735 [watchers = std::move(watchers), resource = std::move(resource),
1736 read_delay_handle = std::move(read_delay_handle)]()
1737 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1738 for (const auto& p : watchers) {
1739 p->OnGenericResourceChanged(resource, read_delay_handle);
1740 }
1741 },
1742 DEBUG_LOCATION);
1743 }
1744
NotifyWatchersOnAmbientError(absl::Status status,WatcherSet watchers,RefCountedPtr<ReadDelayHandle> read_delay_handle)1745 void XdsClient::NotifyWatchersOnAmbientError(
1746 absl::Status status, WatcherSet watchers,
1747 RefCountedPtr<ReadDelayHandle> read_delay_handle) {
1748 if (!status.ok()) status = AppendNodeToStatus(status);
1749 work_serializer_.Schedule(
1750 [watchers = std::move(watchers), status = std::move(status),
1751 read_delay_handle = std::move(read_delay_handle)]()
1752 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1753 for (const auto& p : watchers) {
1754 p->OnAmbientError(status, read_delay_handle);
1755 }
1756 },
1757 DEBUG_LOCATION);
1758 }
1759
DumpClientConfig(std::set<std::string> * string_pool,upb_Arena * arena,envoy_service_status_v3_ClientConfig * client_config)1760 void XdsClient::DumpClientConfig(
1761 std::set<std::string>* string_pool, upb_Arena* arena,
1762 envoy_service_status_v3_ClientConfig* client_config) {
1763 // Assemble config dump messages
1764 // Fill-in the node information
1765 auto* node =
1766 envoy_service_status_v3_ClientConfig_mutable_node(client_config, arena);
1767 PopulateXdsNode(bootstrap_->node(), user_agent_name_, user_agent_version_,
1768 node, arena);
1769 // Dump each resource.
1770 for (const auto& a : authority_state_map_) { // authority
1771 const std::string& authority = a.first;
1772 for (const auto& t : a.second.resource_map) { // type
1773 const XdsResourceType* type = t.first;
1774 auto it =
1775 string_pool
1776 ->emplace(absl::StrCat("type.googleapis.com/", type->type_url()))
1777 .first;
1778 upb_StringView type_url = StdStringToUpbString(*it);
1779 for (const auto& r : t.second) { // resource id
1780 auto it2 = string_pool
1781 ->emplace(ConstructFullXdsResourceName(
1782 authority, type->type_url(), r.first))
1783 .first;
1784 upb_StringView resource_name = StdStringToUpbString(*it2);
1785 envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry =
1786 envoy_service_status_v3_ClientConfig_add_generic_xds_configs(
1787 client_config, arena);
1788 r.second.FillGenericXdsConfig(type_url, resource_name, arena, entry);
1789 }
1790 }
1791 }
1792 }
1793
ReportResourceCounts(absl::FunctionRef<void (const ResourceCountLabels &,uint64_t)> func)1794 void XdsClient::ReportResourceCounts(
1795 absl::FunctionRef<void(const ResourceCountLabels&, uint64_t)> func) {
1796 ResourceCountLabels labels;
1797 for (const auto& a : authority_state_map_) { // authority
1798 labels.xds_authority = a.first;
1799 for (const auto& t : a.second.resource_map) { // type
1800 labels.resource_type = t.first->type_url();
1801 // Count the number of entries in each state.
1802 std::map<absl::string_view, uint64_t> counts;
1803 for (const auto& r : t.second) { // resource id
1804 ++counts[r.second.CacheStateString()];
1805 }
1806 // Report the count for each state.
1807 for (const auto& c : counts) {
1808 labels.cache_state = c.first;
1809 func(labels, c.second);
1810 }
1811 }
1812 }
1813 }
1814
ReportServerConnections(absl::FunctionRef<void (absl::string_view,bool)> func)1815 void XdsClient::ReportServerConnections(
1816 absl::FunctionRef<void(absl::string_view, bool)> func) {
1817 for (const auto& p : xds_channel_map_) {
1818 func(p.second->server_uri(), p.second->status().ok());
1819 }
1820 }
1821
1822 } // namespace grpc_core
1823