• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/xds/xds_client/xds_client.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/support/port_platform.h>
21 #include <inttypes.h>
22 #include <string.h>
23 
24 #include <algorithm>
25 #include <functional>
26 #include <memory>
27 #include <string>
28 #include <type_traits>
29 #include <vector>
30 
31 #include "absl/cleanup/cleanup.h"
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/strings/match.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/str_join.h"
37 #include "absl/strings/str_split.h"
38 #include "absl/strings/string_view.h"
39 #include "absl/strings/strip.h"
40 #include "absl/types/optional.h"
41 #include "envoy/config/core/v3/base.upb.h"
42 #include "envoy/service/discovery/v3/discovery.upb.h"
43 #include "envoy/service/discovery/v3/discovery.upbdefs.h"
44 #include "google/protobuf/any.upb.h"
45 #include "google/protobuf/timestamp.upb.h"
46 #include "google/rpc/status.upb.h"
47 #include "src/core/lib/iomgr/exec_ctx.h"
48 #include "src/core/util/backoff.h"
49 #include "src/core/util/debug_location.h"
50 #include "src/core/util/orphanable.h"
51 #include "src/core/util/ref_counted_ptr.h"
52 #include "src/core/util/sync.h"
53 #include "src/core/util/upb_utils.h"
54 #include "src/core/util/uri.h"
55 #include "src/core/xds/xds_client/xds_api.h"
56 #include "src/core/xds/xds_client/xds_bootstrap.h"
57 #include "src/core/xds/xds_client/xds_locality.h"
58 #include "upb/base/string_view.h"
59 #include "upb/mem/arena.h"
60 #include "upb/reflection/def.h"
61 #include "upb/text/encode.h"
62 
// Backoff parameters used when re-establishing a failed xds call.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_JITTER 0.2
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
// NOTE(review): not referenced in this part of the file; presumably a lower
// bound (in milliseconds) on the load reporting interval -- confirm at the
// point of use.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
68 
69 namespace grpc_core {
70 
71 using ::grpc_event_engine::experimental::EventEngine;
72 
73 //
74 // Internal class declarations
75 //
76 
77 // An xds call wrapper that can restart a call upon failure. Holds a ref to
78 // the xds channel. The template parameter is the kind of wrapped xds call.
79 // TODO(roth): This is basically the same code as in LrsClient, and
80 // probably very similar to many other places in the codebase.
81 // Consider refactoring this into a common utility library somehow.
82 template <typename T>
83 class XdsClient::XdsChannel::RetryableCall final
84     : public InternallyRefCounted<RetryableCall<T>> {
85  public:
86   explicit RetryableCall(WeakRefCountedPtr<XdsChannel> xds_channel);
87 
88   // Disable thread-safety analysis because this method is called via
89   // OrphanablePtr<>, but there's no way to pass the lock annotation
90   // through there.
91   void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
92 
93   void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
94 
call() const95   T* call() const { return call_.get(); }
xds_channel() const96   XdsChannel* xds_channel() const { return xds_channel_.get(); }
97 
98   bool IsCurrentCallOnChannel() const;
99 
100  private:
101   void StartNewCallLocked();
102   void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
103 
104   void OnRetryTimer();
105 
106   // The wrapped xds call that talks to the xds server. It's instantiated
107   // every time we start a new call. It's null during call retry backoff.
108   OrphanablePtr<T> call_;
109   // The owning xds channel.
110   WeakRefCountedPtr<XdsChannel> xds_channel_;
111 
112   // Retry state.
113   BackOff backoff_;
114   absl::optional<EventEngine::TaskHandle> timer_handle_
115       ABSL_GUARDED_BY(&XdsClient::mu_);
116 
117   bool shutting_down_ = false;
118 };
119 
120 // Contains an ADS call to the xds server.
121 class XdsClient::XdsChannel::AdsCall final
122     : public InternallyRefCounted<AdsCall> {
123  public:
124   // The ctor and dtor should not be used directly.
125   explicit AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call);
126 
127   void Orphan() override;
128 
retryable_call() const129   RetryableCall<AdsCall>* retryable_call() const {
130     return retryable_call_.get();
131   }
xds_channel() const132   XdsChannel* xds_channel() const { return retryable_call_->xds_channel(); }
xds_client() const133   XdsClient* xds_client() const { return xds_channel()->xds_client(); }
seen_response() const134   bool seen_response() const { return seen_response_; }
135 
136   void SubscribeLocked(const XdsResourceType* type, const XdsResourceName& name,
137                        bool delay_send)
138       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
139   void UnsubscribeLocked(const XdsResourceType* type,
140                          const XdsResourceName& name, bool delay_unsubscription)
141       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
142 
143   bool HasSubscribedResources() const;
144 
145  private:
146   class AdsReadDelayHandle;
147 
148   class ResourceTimer final : public InternallyRefCounted<ResourceTimer> {
149    public:
ResourceTimer(const XdsResourceType * type,const XdsResourceName & name)150     ResourceTimer(const XdsResourceType* type, const XdsResourceName& name)
151         : type_(type), name_(name) {}
152 
153     // Disable thread-safety analysis because this method is called via
154     // OrphanablePtr<>, but there's no way to pass the lock annotation
155     // through there.
Orphan()156     void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS {
157       MaybeCancelTimer();
158       Unref(DEBUG_LOCATION, "Orphan");
159     }
160 
MarkSubscriptionSendStarted()161     void MarkSubscriptionSendStarted()
162         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
163       subscription_sent_ = true;
164     }
165 
MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCall> ads_call)166     void MaybeMarkSubscriptionSendComplete(RefCountedPtr<AdsCall> ads_call)
167         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
168       if (subscription_sent_) MaybeStartTimer(std::move(ads_call));
169     }
170 
MarkSeen()171     void MarkSeen() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
172       resource_seen_ = true;
173       MaybeCancelTimer();
174     }
175 
MaybeCancelTimer()176     void MaybeCancelTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
177       if (timer_handle_.has_value() &&
178           ads_call_->xds_client()->engine()->Cancel(*timer_handle_)) {
179         timer_handle_.reset();
180         ads_call_.reset();
181       }
182     }
183 
184    private:
MaybeStartTimer(RefCountedPtr<AdsCall> ads_call)185     void MaybeStartTimer(RefCountedPtr<AdsCall> ads_call)
186         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
187       // Don't start timer if we've already either seen the resource or
188       // marked it as non-existing.
189       // Note: There are edge cases where we can have seen the resource
190       // before we have sent the initial subscription request, such as
191       // when we unsubscribe and then resubscribe to a given resource
192       // and then get a response containing that resource, all while a
193       // send_message op is in flight.
194       if (resource_seen_) return;
195       // Don't start timer if we haven't yet sent the initial subscription
196       // request for the resource.
197       if (!subscription_sent_) return;
198       // Don't start timer if it's already running.
199       if (timer_handle_.has_value()) return;
200       // Check if we already have a cached version of this resource
201       // (i.e., if this is the initial request for the resource after an
202       // ADS stream restart).  If so, we don't start the timer, because
203       // (a) we already have the resource and (b) the server may
204       // optimize by not resending the resource that we already have.
205       auto& authority_state =
206           ads_call->xds_client()->authority_state_map_[name_.authority];
207       ResourceState& state = authority_state.resource_map[type_][name_.key];
208       if (state.HasResource()) return;
209       // Start timer.
210       ads_call_ = std::move(ads_call);
211       timer_handle_ = ads_call_->xds_client()->engine()->RunAfter(
212           ads_call_->xds_client()->request_timeout_,
213           [self = Ref(DEBUG_LOCATION, "timer")]() {
214             ApplicationCallbackExecCtx callback_exec_ctx;
215             ExecCtx exec_ctx;
216             self->OnTimer();
217           });
218     }
219 
OnTimer()220     void OnTimer() {
221       {
222         MutexLock lock(&ads_call_->xds_client()->mu_);
223         timer_handle_.reset();
224         auto& authority_state =
225             ads_call_->xds_client()->authority_state_map_[name_.authority];
226         ResourceState& state = authority_state.resource_map[type_][name_.key];
227         // We might have received the resource after the timer fired but before
228         // the callback ran.
229         if (!state.HasResource()) {
230           GRPC_TRACE_LOG(xds_client, INFO)
231               << "[xds_client " << ads_call_->xds_client() << "] xds server "
232               << ads_call_->xds_channel()->server_.server_uri()
233               << ": timeout obtaining resource {type=" << type_->type_url()
234               << " name="
235               << XdsClient::ConstructFullXdsResourceName(
236                      name_.authority, type_->type_url(), name_.key)
237               << "} from xds server";
238           resource_seen_ = true;
239           state.SetDoesNotExist();
240           ads_call_->xds_client()->NotifyWatchersOnResourceChanged(
241               absl::NotFoundError("does not exist"), state.watchers(),
242               ReadDelayHandle::NoWait());
243         }
244       }
245       ads_call_->xds_client()->work_serializer_.DrainQueue();
246       ads_call_.reset();
247     }
248 
249     const XdsResourceType* type_;
250     const XdsResourceName name_;
251 
252     RefCountedPtr<AdsCall> ads_call_;
253     // True if we have sent the initial subscription request for this
254     // resource on this ADS stream.
255     bool subscription_sent_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
256     // True if we have either (a) seen the resource in a response on this
257     // stream or (b) declared the resource to not exist due to the timer
258     // firing.
259     bool resource_seen_ ABSL_GUARDED_BY(&XdsClient::mu_) = false;
260     absl::optional<EventEngine::TaskHandle> timer_handle_
261         ABSL_GUARDED_BY(&XdsClient::mu_);
262   };
263 
264   class StreamEventHandler final
265       : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
266    public:
StreamEventHandler(RefCountedPtr<AdsCall> ads_call)267     explicit StreamEventHandler(RefCountedPtr<AdsCall> ads_call)
268         : ads_call_(std::move(ads_call)) {}
269 
OnRequestSent(bool ok)270     void OnRequestSent(bool ok) override { ads_call_->OnRequestSent(ok); }
OnRecvMessage(absl::string_view payload)271     void OnRecvMessage(absl::string_view payload) override {
272       ads_call_->OnRecvMessage(payload);
273     }
OnStatusReceived(absl::Status status)274     void OnStatusReceived(absl::Status status) override {
275       ads_call_->OnStatusReceived(std::move(status));
276     }
277 
278    private:
279     RefCountedPtr<AdsCall> ads_call_;
280   };
281 
282   struct ResourceTypeState {
283     // Nonce and status for this resource type.
284     std::string nonce;
285     absl::Status status;
286 
287     // Subscribed resources of this type.
288     std::map<std::string /*authority*/,
289              std::map<XdsResourceKey, OrphanablePtr<ResourceTimer>>>
290         subscribed_resources;
291   };
292 
293   std::string CreateAdsRequest(absl::string_view type_url,
294                                absl::string_view version,
295                                absl::string_view nonce,
296                                const std::vector<std::string>& resource_names,
297                                absl::Status status) const
298       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
299 
300   void SendMessageLocked(const XdsResourceType* type)
301       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
302 
303   struct DecodeContext {
304     upb::Arena arena;
305     const XdsResourceType* type;
306     std::string type_url;
307     std::string version;
308     std::string nonce;
309     std::vector<std::string> errors;
310     std::map<std::string /*authority*/, std::set<XdsResourceKey>>
311         resources_seen;
312     uint64_t num_valid_resources = 0;
313     uint64_t num_invalid_resources = 0;
314     Timestamp update_time = Timestamp::Now();
315     RefCountedPtr<ReadDelayHandle> read_delay_handle;
316   };
317   void ParseResource(size_t idx, absl::string_view type_url,
318                      absl::string_view resource_name,
319                      absl::string_view serialized_resource,
320                      DecodeContext* context)
321       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
322   absl::Status DecodeAdsResponse(absl::string_view encoded_response,
323                                  DecodeContext* context)
324       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
325 
326   void OnRequestSent(bool ok);
327   void OnRecvMessage(absl::string_view payload);
328   void OnStatusReceived(absl::Status status);
329 
330   bool IsCurrentCallOnChannel() const;
331 
332   // Constructs a list of resource names of a given type for an ADS
333   // request.  Also starts the timer for each resource if needed.
334   std::vector<std::string> ResourceNamesForRequest(const XdsResourceType* type)
335       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
336 
337   // The owning RetryableCall<>.
338   RefCountedPtr<RetryableCall<AdsCall>> retryable_call_;
339 
340   OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
341       streaming_call_;
342 
343   bool sent_initial_message_ = false;
344   bool seen_response_ = false;
345 
346   const XdsResourceType* send_message_pending_
347       ABSL_GUARDED_BY(&XdsClient::mu_) = nullptr;
348 
349   // Resource types for which requests need to be sent.
350   std::set<const XdsResourceType*> buffered_requests_;
351 
352   // State for each resource type.
353   std::map<const XdsResourceType*, ResourceTypeState> state_map_;
354 };
355 
356 //
357 // XdsClient::XdsChannel::ConnectivityFailureWatcher
358 //
359 
360 class XdsClient::XdsChannel::ConnectivityFailureWatcher
361     : public XdsTransportFactory::XdsTransport::ConnectivityFailureWatcher {
362  public:
ConnectivityFailureWatcher(WeakRefCountedPtr<XdsChannel> xds_channel)363   explicit ConnectivityFailureWatcher(WeakRefCountedPtr<XdsChannel> xds_channel)
364       : xds_channel_(std::move(xds_channel)) {}
365 
OnConnectivityFailure(absl::Status status)366   void OnConnectivityFailure(absl::Status status) override {
367     xds_channel_->OnConnectivityFailure(std::move(status));
368   }
369 
370  private:
371   WeakRefCountedPtr<XdsChannel> xds_channel_;
372 };
373 
374 //
375 // XdsClient::XdsChannel
376 //
377 
XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)378 XdsClient::XdsChannel::XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,
379                                   const XdsBootstrap::XdsServer& server)
380     : DualRefCounted<XdsChannel>(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
381                                      ? "XdsChannel"
382                                      : nullptr),
383       xds_client_(std::move(xds_client)),
384       server_(server) {
385   GRPC_TRACE_LOG(xds_client, INFO)
386       << "[xds_client " << xds_client_.get() << "] creating channel " << this
387       << " for server " << server.server_uri();
388   absl::Status status;
389   transport_ = xds_client_->transport_factory_->GetTransport(server, &status);
390   CHECK(transport_ != nullptr);
391   if (!status.ok()) {
392     SetChannelStatusLocked(std::move(status));
393   } else {
394     failure_watcher_ = MakeRefCounted<ConnectivityFailureWatcher>(
395         WeakRef(DEBUG_LOCATION, "OnConnectivityFailure"));
396     transport_->StartConnectivityFailureWatch(failure_watcher_);
397   }
398 }
399 
~XdsChannel()400 XdsClient::XdsChannel::~XdsChannel() {
401   GRPC_TRACE_LOG(xds_client, INFO)
402       << "[xds_client " << xds_client() << "] destroying xds channel " << this
403       << " for server " << server_.server_uri();
404   xds_client_.reset(DEBUG_LOCATION, "XdsChannel");
405 }
406 
407 // This method should only ever be called when holding the lock, but we can't
408 // use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
409 // called from DualRefCounted::Unref, which cannot have a lock annotation for
410 // a lock in this subclass.
Orphaned()411 void XdsClient::XdsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
412   GRPC_TRACE_LOG(xds_client, INFO)
413       << "[xds_client " << xds_client() << "] orphaning xds channel " << this
414       << " for server " << server_.server_uri();
415   shutting_down_ = true;
416   if (failure_watcher_ != nullptr) {
417     transport_->StopConnectivityFailureWatch(failure_watcher_);
418     failure_watcher_.reset();
419   }
420   transport_.reset();
421   // At this time, all strong refs are removed, remove from channel map to
422   // prevent subsequent subscription from trying to use this XdsChannel as
423   // it is shutting down.
424   xds_client_->xds_channel_map_.erase(server_.Key());
425   ads_call_.reset();
426 }
427 
ResetBackoff()428 void XdsClient::XdsChannel::ResetBackoff() { transport_->ResetBackoff(); }
429 
ads_call() const430 XdsClient::XdsChannel::AdsCall* XdsClient::XdsChannel::ads_call() const {
431   return ads_call_->call();
432 }
433 
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name)434 void XdsClient::XdsChannel::SubscribeLocked(const XdsResourceType* type,
435                                             const XdsResourceName& name) {
436   if (ads_call_ == nullptr) {
437     // Start the ADS call if this is the first request.
438     ads_call_.reset(
439         new RetryableCall<AdsCall>(WeakRef(DEBUG_LOCATION, "XdsChannel+ads")));
440     // Note: AdsCall's ctor will automatically subscribe to all
441     // resources that the XdsClient already has watchers for, so we can
442     // return here.
443     return;
444   }
445   // If the ADS call is in backoff state, we don't need to do anything now
446   // because when the call is restarted it will resend all necessary requests.
447   if (ads_call() == nullptr) return;
448   // Subscribe to this resource if the ADS call is active.
449   ads_call()->SubscribeLocked(type, name, /*delay_send=*/false);
450 }
451 
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)452 void XdsClient::XdsChannel::UnsubscribeLocked(const XdsResourceType* type,
453                                               const XdsResourceName& name,
454                                               bool delay_unsubscription) {
455   if (ads_call_ != nullptr) {
456     auto* call = ads_call_->call();
457     if (call != nullptr) {
458       call->UnsubscribeLocked(type, name, delay_unsubscription);
459       if (!call->HasSubscribedResources()) {
460         ads_call_.reset();
461       }
462     }
463   }
464 }
465 
MaybeFallbackLocked(const std::string & authority,AuthorityState & authority_state)466 bool XdsClient::XdsChannel::MaybeFallbackLocked(
467     const std::string& authority, AuthorityState& authority_state) {
468   if (!xds_client_->HasUncachedResources(authority_state)) {
469     return false;
470   }
471   std::vector<const XdsBootstrap::XdsServer*> xds_servers;
472   if (authority != kOldStyleAuthority) {
473     xds_servers =
474         xds_client_->bootstrap().LookupAuthority(authority)->servers();
475   }
476   if (xds_servers.empty()) xds_servers = xds_client_->bootstrap().servers();
477   for (size_t i = authority_state.xds_channels.size(); i < xds_servers.size();
478        ++i) {
479     authority_state.xds_channels.emplace_back(
480         xds_client_->GetOrCreateXdsChannelLocked(*xds_servers[i], "fallback"));
481     for (const auto& type_resource : authority_state.resource_map) {
482       for (const auto& key_state : type_resource.second) {
483         authority_state.xds_channels.back()->SubscribeLocked(
484             type_resource.first, {authority, key_state.first});
485       }
486     }
487     GRPC_TRACE_LOG(xds_client, INFO)
488         << "[xds_client " << xds_client_.get() << "] authority " << authority
489         << ": added fallback server " << xds_servers[i]->server_uri() << " ("
490         << authority_state.xds_channels.back()->status().ToString() << ")";
491     if (authority_state.xds_channels.back()->status().ok()) return true;
492   }
493   GRPC_TRACE_LOG(xds_client, INFO)
494       << "[xds_client " << xds_client_.get() << "] authority " << authority
495       << ": No fallback server";
496   return false;
497 }
498 
SetHealthyLocked()499 void XdsClient::XdsChannel::SetHealthyLocked() {
500   status_ = absl::OkStatus();
501   // Make this channel active iff:
502   // 1. Channel is on the list of authority channels
503   // 2. Channel is not the last channel on the list (i.e. not the active
504   // channel)
505   for (auto& authority : xds_client_->authority_state_map_) {
506     auto& channels = authority.second.xds_channels;
507     // Skip if channel is active.
508     if (channels.back() == this) continue;
509     auto channel_it = std::find(channels.begin(), channels.end(), this);
510     // Skip if this is not on the list
511     if (channel_it != channels.end()) {
512       GRPC_TRACE_LOG(xds_client, INFO)
513           << "[xds_client " << xds_client_.get() << "] authority "
514           << authority.first << ": Falling forward to " << server_.server_uri();
515       // Lower priority channels are no longer needed, connection is back!
516       channels.erase(channel_it + 1, channels.end());
517     }
518   }
519 }
520 
OnConnectivityFailure(absl::Status status)521 void XdsClient::XdsChannel::OnConnectivityFailure(absl::Status status) {
522   {
523     MutexLock lock(&xds_client_->mu_);
524     SetChannelStatusLocked(std::move(status));
525   }
526   xds_client_->work_serializer_.DrainQueue();
527 }
528 
SetChannelStatusLocked(absl::Status status)529 void XdsClient::XdsChannel::SetChannelStatusLocked(absl::Status status) {
530   if (shutting_down_) return;
531   status = absl::Status(status.code(), absl::StrCat("xDS channel for server ",
532                                                     server_.server_uri(), ": ",
533                                                     status.message()));
534   LOG(INFO) << "[xds_client " << xds_client() << "] " << status;
535   // If status was previously OK, report that the channel has gone unhealthy.
536   if (status_.ok() && xds_client_->metrics_reporter_ != nullptr) {
537     xds_client_->metrics_reporter_->ReportServerFailure(server_.server_uri());
538   }
539   // Save status in channel, so that we can immediately generate an
540   // error for any new watchers that may be started.
541   status_ = status;
542   // Find all watchers for this channel.
543   WatcherSet watchers_cached;
544   WatcherSet watchers_uncached;
545   for (auto& a : xds_client_->authority_state_map_) {  // authority
546     if (a.second.xds_channels.empty() || a.second.xds_channels.back() != this ||
547         MaybeFallbackLocked(a.first, a.second)) {
548       continue;
549     }
550     for (const auto& t : a.second.resource_map) {  // type
551       for (const auto& r : t.second) {             // resource id
552         auto& watchers =
553             r.second.HasResource() ? watchers_cached : watchers_uncached;
554         for (const auto& w : r.second.watchers()) {  // watchers
555           watchers.insert(w);
556         }
557       }
558     }
559   }
560   // Enqueue notifications for the watchers.
561   if (!watchers_cached.empty()) {
562     xds_client_->NotifyWatchersOnAmbientError(
563         status, std::move(watchers_cached), ReadDelayHandle::NoWait());
564   }
565   if (!watchers_uncached.empty()) {
566     xds_client_->NotifyWatchersOnResourceChanged(
567         status, std::move(watchers_uncached), ReadDelayHandle::NoWait());
568   }
569 }
570 
571 //
572 // XdsClient::XdsChannel::RetryableCall<>
573 //
574 
575 template <typename T>
RetryableCall(WeakRefCountedPtr<XdsChannel> xds_channel)576 XdsClient::XdsChannel::RetryableCall<T>::RetryableCall(
577     WeakRefCountedPtr<XdsChannel> xds_channel)
578     : xds_channel_(std::move(xds_channel)),
579       backoff_(BackOff::Options()
580                    .set_initial_backoff(Duration::Seconds(
581                        GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
582                    .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
583                    .set_jitter(GRPC_XDS_RECONNECT_JITTER)
584                    .set_max_backoff(Duration::Seconds(
585                        GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
586   StartNewCallLocked();
587 }
588 
589 template <typename T>
Orphan()590 void XdsClient::XdsChannel::RetryableCall<T>::Orphan() {
591   shutting_down_ = true;
592   call_.reset();
593   if (timer_handle_.has_value()) {
594     xds_channel()->xds_client()->engine()->Cancel(*timer_handle_);
595     timer_handle_.reset();
596   }
597   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
598 }
599 
600 template <typename T>
OnCallFinishedLocked()601 void XdsClient::XdsChannel::RetryableCall<T>::OnCallFinishedLocked() {
602   // If we saw a response on the current stream, reset backoff.
603   if (call_->seen_response()) backoff_.Reset();
604   call_.reset();
605   // Start retry timer.
606   StartRetryTimerLocked();
607 }
608 
609 template <typename T>
StartNewCallLocked()610 void XdsClient::XdsChannel::RetryableCall<T>::StartNewCallLocked() {
611   if (shutting_down_) return;
612   CHECK(xds_channel_->transport_ != nullptr);
613   CHECK(call_ == nullptr);
614   GRPC_TRACE_LOG(xds_client, INFO)
615       << "[xds_client " << xds_channel()->xds_client() << "] xds server "
616       << xds_channel()->server_.server_uri()
617       << ": start new call from retryable call " << this;
618   call_ = MakeOrphanable<T>(
619       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
620 }
621 
622 template <typename T>
StartRetryTimerLocked()623 void XdsClient::XdsChannel::RetryableCall<T>::StartRetryTimerLocked() {
624   if (shutting_down_) return;
625   const Duration delay = backoff_.NextAttemptDelay();
626   GRPC_TRACE_LOG(xds_client, INFO)
627       << "[xds_client " << xds_channel()->xds_client() << "] xds server "
628       << xds_channel()->server_.server_uri()
629       << ": call attempt failed; retry timer will fire in " << delay.millis()
630       << "ms.";
631   timer_handle_ = xds_channel()->xds_client()->engine()->RunAfter(
632       delay,
633       [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
634         ApplicationCallbackExecCtx callback_exec_ctx;
635         ExecCtx exec_ctx;
636         self->OnRetryTimer();
637       });
638 }
639 
640 template <typename T>
OnRetryTimer()641 void XdsClient::XdsChannel::RetryableCall<T>::OnRetryTimer() {
642   MutexLock lock(&xds_channel_->xds_client()->mu_);
643   if (timer_handle_.has_value()) {
644     timer_handle_.reset();
645     if (shutting_down_) return;
646     GRPC_TRACE_LOG(xds_client, INFO)
647         << "[xds_client " << xds_channel()->xds_client() << "] xds server "
648         << xds_channel()->server_.server_uri()
649         << ": retry timer fired (retryable call: " << this << ")";
650     StartNewCallLocked();
651   }
652 }
653 
654 //
655 // XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle
656 //
657 
658 class XdsClient::XdsChannel::AdsCall::AdsReadDelayHandle final
659     : public XdsClient::ReadDelayHandle {
660  public:
AdsReadDelayHandle(RefCountedPtr<AdsCall> ads_call)661   explicit AdsReadDelayHandle(RefCountedPtr<AdsCall> ads_call)
662       : ads_call_(std::move(ads_call)) {}
663 
~AdsReadDelayHandle()664   ~AdsReadDelayHandle() override {
665     MutexLock lock(&ads_call_->xds_client()->mu_);
666     auto call = ads_call_->streaming_call_.get();
667     if (call != nullptr) call->StartRecvMessage();
668   }
669 
670  private:
671   RefCountedPtr<AdsCall> ads_call_;
672 };
673 
674 //
675 // XdsClient::XdsChannel::AdsCall
676 //
677 
AdsCall(RefCountedPtr<RetryableCall<AdsCall>> retryable_call)678 XdsClient::XdsChannel::AdsCall::AdsCall(
679     RefCountedPtr<RetryableCall<AdsCall>> retryable_call)
680     : InternallyRefCounted<AdsCall>(
681           GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "AdsCall" : nullptr),
682       retryable_call_(std::move(retryable_call)) {
683   CHECK_NE(xds_client(), nullptr);
684   // Init the ADS call.
685   const char* method =
686       "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
687       "StreamAggregatedResources";
688   streaming_call_ = xds_channel()->transport_->CreateStreamingCall(
689       method, std::make_unique<StreamEventHandler>(
690                   // Passing the initial ref here.  This ref will go away when
691                   // the StreamEventHandler is destroyed.
692                   RefCountedPtr<AdsCall>(this)));
693   CHECK(streaming_call_ != nullptr);
694   // Start the call.
695   GRPC_TRACE_LOG(xds_client, INFO)
696       << "[xds_client " << xds_client() << "] xds server "
697       << xds_channel()->server_.server_uri()
698       << ": starting ADS call (ads_call: " << this
699       << ", streaming_call: " << streaming_call_.get() << ")";
700   // If this is a reconnect, add any necessary subscriptions from what's
701   // already in the cache.
702   for (auto& a : xds_client()->authority_state_map_) {
703     const std::string& authority = a.first;
704     auto it = std::find(a.second.xds_channels.begin(),
705                         a.second.xds_channels.end(), xds_channel());
706     // Skip authorities that are not using this xDS channel. The channel can be
707     // anywhere in the list.
708     if (it == a.second.xds_channels.end()) continue;
709     for (const auto& t : a.second.resource_map) {
710       const XdsResourceType* type = t.first;
711       for (const auto& r : t.second) {
712         const XdsResourceKey& resource_key = r.first;
713         SubscribeLocked(type, {authority, resource_key}, /*delay_send=*/true);
714       }
715     }
716   }
717   // Send initial message if we added any subscriptions above.
718   for (const auto& p : state_map_) {
719     SendMessageLocked(p.first);
720   }
721   streaming_call_->StartRecvMessage();
722 }
723 
Orphan()724 void XdsClient::XdsChannel::AdsCall::Orphan() {
725   state_map_.clear();
726   // Note that the initial ref is held by the StreamEventHandler, which
727   // will be destroyed when streaming_call_ is destroyed, which may not happen
728   // here, since there may be other refs held to streaming_call_ by internal
729   // callbacks.
730   streaming_call_.reset();
731 }
732 
SubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_send)733 void XdsClient::XdsChannel::AdsCall::SubscribeLocked(
734     const XdsResourceType* type, const XdsResourceName& name, bool delay_send) {
735   auto& state = state_map_[type].subscribed_resources[name.authority][name.key];
736   if (state == nullptr) {
737     state = MakeOrphanable<ResourceTimer>(type, name);
738     if (!delay_send) SendMessageLocked(type);
739   }
740 }
741 
UnsubscribeLocked(const XdsResourceType * type,const XdsResourceName & name,bool delay_unsubscription)742 void XdsClient::XdsChannel::AdsCall::UnsubscribeLocked(
743     const XdsResourceType* type, const XdsResourceName& name,
744     bool delay_unsubscription) {
745   auto& type_state_map = state_map_[type];
746   auto& authority_map = type_state_map.subscribed_resources[name.authority];
747   authority_map.erase(name.key);
748   if (authority_map.empty()) {
749     type_state_map.subscribed_resources.erase(name.authority);
750     // Note: We intentionally do not remove the top-level map entry for
751     // the resource type even if the authority map for the type is empty,
752     // because we need to retain the nonce in case a new watch is
753     // started for a resource of this type while this stream is still open.
754   }
755   // Don't need to send unsubscription message if this was the last
756   // resource we were subscribed to, since we'll be closing the stream
757   // immediately in that case.
758   if (!delay_unsubscription && HasSubscribedResources()) {
759     SendMessageLocked(type);
760   }
761 }
762 
HasSubscribedResources() const763 bool XdsClient::XdsChannel::AdsCall::HasSubscribedResources() const {
764   for (const auto& p : state_map_) {
765     if (!p.second.subscribed_resources.empty()) return true;
766   }
767   return false;
768 }
769 
770 namespace {
771 
MaybeLogDiscoveryRequest(const XdsClient * client,upb_DefPool * def_pool,const envoy_service_discovery_v3_DiscoveryRequest * request)772 void MaybeLogDiscoveryRequest(
773     const XdsClient* client, upb_DefPool* def_pool,
774     const envoy_service_discovery_v3_DiscoveryRequest* request) {
775   if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
776     const upb_MessageDef* msg_type =
777         envoy_service_discovery_v3_DiscoveryRequest_getmsgdef(def_pool);
778     char buf[10240];
779     upb_TextEncode(reinterpret_cast<const upb_Message*>(request), msg_type,
780                    nullptr, 0, buf, sizeof(buf));
781     VLOG(2) << "[xds_client " << client << "] constructed ADS request: " << buf;
782   }
783 }
784 
SerializeDiscoveryRequest(upb_Arena * arena,envoy_service_discovery_v3_DiscoveryRequest * request)785 std::string SerializeDiscoveryRequest(
786     upb_Arena* arena, envoy_service_discovery_v3_DiscoveryRequest* request) {
787   size_t output_length;
788   char* output = envoy_service_discovery_v3_DiscoveryRequest_serialize(
789       request, arena, &output_length);
790   return std::string(output, output_length);
791 }
792 
793 }  // namespace
794 
CreateAdsRequest(absl::string_view type_url,absl::string_view version,absl::string_view nonce,const std::vector<std::string> & resource_names,absl::Status status) const795 std::string XdsClient::XdsChannel::AdsCall::CreateAdsRequest(
796     absl::string_view type_url, absl::string_view version,
797     absl::string_view nonce, const std::vector<std::string>& resource_names,
798     absl::Status status) const {
799   upb::Arena arena;
800   // Create a request.
801   envoy_service_discovery_v3_DiscoveryRequest* request =
802       envoy_service_discovery_v3_DiscoveryRequest_new(arena.ptr());
803   // Set type_url.
804   std::string type_url_str = absl::StrCat("type.googleapis.com/", type_url);
805   envoy_service_discovery_v3_DiscoveryRequest_set_type_url(
806       request, StdStringToUpbString(type_url_str));
807   // Set version_info.
808   if (!version.empty()) {
809     envoy_service_discovery_v3_DiscoveryRequest_set_version_info(
810         request, StdStringToUpbString(version));
811   }
812   // Set nonce.
813   if (!nonce.empty()) {
814     envoy_service_discovery_v3_DiscoveryRequest_set_response_nonce(
815         request, StdStringToUpbString(nonce));
816   }
817   // Set error_detail if it's a NACK.
818   std::string error_string_storage;
819   if (!status.ok()) {
820     google_rpc_Status* error_detail =
821         envoy_service_discovery_v3_DiscoveryRequest_mutable_error_detail(
822             request, arena.ptr());
823     // Hard-code INVALID_ARGUMENT as the status code.
824     // TODO(roth): If at some point we decide we care about this value,
825     // we could attach a status code to the individual errors where we
826     // generate them in the parsing code, and then use that here.
827     google_rpc_Status_set_code(error_detail, GRPC_STATUS_INVALID_ARGUMENT);
828     // Error description comes from the status that was passed in.
829     error_string_storage = std::string(status.message());
830     upb_StringView error_description =
831         StdStringToUpbString(error_string_storage);
832     google_rpc_Status_set_message(error_detail, error_description);
833   }
834   // Populate node.
835   if (!sent_initial_message_) {
836     envoy_config_core_v3_Node* node_msg =
837         envoy_service_discovery_v3_DiscoveryRequest_mutable_node(request,
838                                                                  arena.ptr());
839     PopulateXdsNode(xds_client()->bootstrap_->node(),
840                     xds_client()->user_agent_name_,
841                     xds_client()->user_agent_version_, node_msg, arena.ptr());
842     envoy_config_core_v3_Node_add_client_features(
843         node_msg, upb_StringView_FromString("xds.config.resource-in-sotw"),
844         arena.ptr());
845   }
846   // Add resource_names.
847   for (const std::string& resource_name : resource_names) {
848     envoy_service_discovery_v3_DiscoveryRequest_add_resource_names(
849         request, StdStringToUpbString(resource_name), arena.ptr());
850   }
851   MaybeLogDiscoveryRequest(xds_client(), xds_client()->def_pool_.ptr(),
852                            request);
853   return SerializeDiscoveryRequest(arena.ptr(), request);
854 }
855 
SendMessageLocked(const XdsResourceType * type)856 void XdsClient::XdsChannel::AdsCall::SendMessageLocked(
857     const XdsResourceType* type)
858     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
859   // Buffer message sending if an existing message is in flight.
860   if (send_message_pending_ != nullptr) {
861     buffered_requests_.insert(type);
862     return;
863   }
864   auto& state = state_map_[type];
865   std::string serialized_message = CreateAdsRequest(
866       type->type_url(), xds_channel()->resource_type_version_map_[type],
867       state.nonce, ResourceNamesForRequest(type), state.status);
868   sent_initial_message_ = true;
869   GRPC_TRACE_LOG(xds_client, INFO)
870       << "[xds_client " << xds_client() << "] xds server "
871       << xds_channel()->server_.server_uri()
872       << ": sending ADS request: type=" << type->type_url()
873       << " version=" << xds_channel()->resource_type_version_map_[type]
874       << " nonce=" << state.nonce << " error=" << state.status;
875   state.status = absl::OkStatus();
876   streaming_call_->SendMessage(std::move(serialized_message));
877   send_message_pending_ = type;
878 }
879 
OnRequestSent(bool ok)880 void XdsClient::XdsChannel::AdsCall::OnRequestSent(bool ok) {
881   MutexLock lock(&xds_client()->mu_);
882   // For each resource that was in the message we just sent, start the
883   // resource timer if needed.
884   if (ok) {
885     auto& resource_type_state = state_map_[send_message_pending_];
886     for (const auto& p : resource_type_state.subscribed_resources) {
887       for (auto& q : p.second) {
888         q.second->MaybeMarkSubscriptionSendComplete(
889             Ref(DEBUG_LOCATION, "ResourceTimer"));
890       }
891     }
892   }
893   send_message_pending_ = nullptr;
894   if (ok && IsCurrentCallOnChannel()) {
895     // Continue to send another pending message if any.
896     // TODO(roth): The current code to handle buffered messages has the
897     // advantage of sending only the most recent list of resource names for
898     // each resource type (no matter how many times that resource type has
899     // been requested to send while the current message sending is still
900     // pending). But its disadvantage is that we send the requests in fixed
901     // order of resource types. We need to fix this if we are seeing some
902     // resource type(s) starved due to frequent requests of other resource
903     // type(s).
904     auto it = buffered_requests_.begin();
905     if (it != buffered_requests_.end()) {
906       SendMessageLocked(*it);
907       buffered_requests_.erase(it);
908     }
909   }
910 }
911 
ParseResource(size_t idx,absl::string_view type_url,absl::string_view resource_name,absl::string_view serialized_resource,DecodeContext * context)912 void XdsClient::XdsChannel::AdsCall::ParseResource(
913     size_t idx, absl::string_view type_url, absl::string_view resource_name,
914     absl::string_view serialized_resource, DecodeContext* context) {
915   std::string error_prefix = absl::StrCat(
916       "resource index ", idx, ": ",
917       resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
918   // Check the type_url of the resource.
919   if (context->type_url != type_url) {
920     context->errors.emplace_back(
921         absl::StrCat(error_prefix, "incorrect resource type \"", type_url,
922                      "\" (should be \"", context->type_url, "\")"));
923     ++context->num_invalid_resources;
924     return;
925   }
926   // Parse the resource.
927   XdsResourceType::DecodeContext resource_type_context = {
928       xds_client(), xds_channel()->server_, &xds_client_trace,
929       xds_client()->def_pool_.ptr(), context->arena.ptr()};
930   XdsResourceType::DecodeResult decode_result =
931       context->type->Decode(resource_type_context, serialized_resource);
932   // If we didn't already have the resource name from the Resource
933   // wrapper, try to get it from the decoding result.
934   if (resource_name.empty()) {
935     if (decode_result.name.has_value()) {
936       resource_name = *decode_result.name;
937       error_prefix =
938           absl::StrCat("resource index ", idx, ": ", resource_name, ": ");
939     } else {
940       // We don't have any way of determining the resource name, so
941       // there's nothing more we can do here.
942       context->errors.emplace_back(absl::StrCat(
943           error_prefix, decode_result.resource.status().ToString()));
944       ++context->num_invalid_resources;
945       return;
946     }
947   }
948   // If decoding failed, make sure we include the error in the NACK.
949   const absl::Status& decode_status = decode_result.resource.status();
950   if (!decode_status.ok()) {
951     context->errors.emplace_back(
952         absl::StrCat(error_prefix, decode_status.ToString()));
953   }
954   // Check the resource name.
955   auto parsed_resource_name =
956       xds_client()->ParseXdsResourceName(resource_name, context->type);
957   if (!parsed_resource_name.ok()) {
958     context->errors.emplace_back(
959         absl::StrCat(error_prefix, "Cannot parse xDS resource name"));
960     ++context->num_invalid_resources;
961     return;
962   }
963   // Cancel resource-does-not-exist timer, if needed.
964   auto timer_it = state_map_.find(context->type);
965   if (timer_it != state_map_.end()) {
966     auto it = timer_it->second.subscribed_resources.find(
967         parsed_resource_name->authority);
968     if (it != timer_it->second.subscribed_resources.end()) {
969       auto res_it = it->second.find(parsed_resource_name->key);
970       if (res_it != it->second.end()) {
971         res_it->second->MarkSeen();
972       }
973     }
974   }
975   // Lookup the authority in the cache.
976   auto authority_it =
977       xds_client()->authority_state_map_.find(parsed_resource_name->authority);
978   if (authority_it == xds_client()->authority_state_map_.end()) {
979     return;  // Skip resource -- we don't have a subscription for it.
980   }
981   // Found authority, so look up type.
982   AuthorityState& authority_state = authority_it->second;
983   auto type_it = authority_state.resource_map.find(context->type);
984   if (type_it == authority_state.resource_map.end()) {
985     return;  // Skip resource -- we don't have a subscription for it.
986   }
987   auto& type_map = type_it->second;
988   // Found type, so look up resource key.
989   auto it = type_map.find(parsed_resource_name->key);
990   if (it == type_map.end()) {
991     return;  // Skip resource -- we don't have a subscription for it.
992   }
993   ResourceState& resource_state = it->second;
994   // If needed, record that we've seen this resource.
995   if (context->type->AllResourcesRequiredInSotW()) {
996     context->resources_seen[parsed_resource_name->authority].insert(
997         parsed_resource_name->key);
998   }
999   // If we previously ignored the resource's deletion, log that we're
1000   // now re-adding it.
1001   if (resource_state.ignored_deletion()) {
1002     LOG(INFO) << "[xds_client " << xds_client() << "] xds server "
1003               << xds_channel()->server_.server_uri()
1004               << ": server returned new version of resource for which we "
1005                  "previously ignored a deletion: type "
1006               << type_url << " name " << resource_name;
1007     resource_state.set_ignored_deletion(false);
1008   }
1009   // Update resource state based on whether the resource is valid.
1010   absl::Status status = absl::InvalidArgumentError(
1011       absl::StrCat("invalid resource: ", decode_status.ToString()));
1012   if (!decode_status.ok()) {
1013     if (!resource_state.HasResource()) {
1014       xds_client()->NotifyWatchersOnResourceChanged(std::move(status),
1015                                                     resource_state.watchers(),
1016                                                     context->read_delay_handle);
1017     } else {
1018       xds_client()->NotifyWatchersOnAmbientError(std::move(status),
1019                                                  resource_state.watchers(),
1020                                                  context->read_delay_handle);
1021     }
1022     resource_state.SetNacked(context->version, decode_status.ToString(),
1023                              context->update_time);
1024     ++context->num_invalid_resources;
1025     return;
1026   }
1027   // Resource is valid.
1028   ++context->num_valid_resources;
1029   // Check if the resource has changed.
1030   const bool resource_identical =
1031       resource_state.HasResource() &&
1032       context->type->ResourcesEqual(resource_state.resource().get(),
1033                                     decode_result.resource->get());
1034   // If not changed, keep using the current decoded resource object.
1035   // This should avoid wasting memory, since external watchers may be
1036   // holding refs to the current object.
1037   if (resource_identical) decode_result.resource = resource_state.resource();
1038   // Update the resource state.
1039   resource_state.SetAcked(std::move(*decode_result.resource),
1040                           std::string(serialized_resource), context->version,
1041                           context->update_time);
1042   // If the resource didn't change, inhibit watcher notifications.
1043   if (resource_identical) {
1044     GRPC_TRACE_LOG(xds_client, INFO)
1045         << "[xds_client " << xds_client() << "] " << context->type_url
1046         << " resource " << resource_name << " identical to current, ignoring.";
1047     // If we previously had connectivity problems, notify watchers that
1048     // the ambient error has been cleared.
1049     if (!xds_channel()->status().ok()) {
1050       xds_client()->NotifyWatchersOnAmbientError(absl::OkStatus(),
1051                                                  resource_state.watchers(),
1052                                                  context->read_delay_handle);
1053     }
1054     return;
1055   }
1056   // Notify watchers.
1057   xds_client()->NotifyWatchersOnResourceChanged(resource_state.resource(),
1058                                                 resource_state.watchers(),
1059                                                 context->read_delay_handle);
1060 }
1061 
1062 namespace {
1063 
MaybeLogDiscoveryResponse(const XdsClient * client,upb_DefPool * def_pool,const envoy_service_discovery_v3_DiscoveryResponse * response)1064 void MaybeLogDiscoveryResponse(
1065     const XdsClient* client, upb_DefPool* def_pool,
1066     const envoy_service_discovery_v3_DiscoveryResponse* response) {
1067   if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
1068     const upb_MessageDef* msg_type =
1069         envoy_service_discovery_v3_DiscoveryResponse_getmsgdef(def_pool);
1070     char buf[10240];
1071     upb_TextEncode(reinterpret_cast<const upb_Message*>(response), msg_type,
1072                    nullptr, 0, buf, sizeof(buf));
1073     VLOG(2) << "[xds_client " << client << "] received response: " << buf;
1074   }
1075 }
1076 
1077 }  // namespace
1078 
DecodeAdsResponse(absl::string_view encoded_response,DecodeContext * context)1079 absl::Status XdsClient::XdsChannel::AdsCall::DecodeAdsResponse(
1080     absl::string_view encoded_response, DecodeContext* context) {
1081   // Decode the response.
1082   const envoy_service_discovery_v3_DiscoveryResponse* response =
1083       envoy_service_discovery_v3_DiscoveryResponse_parse(
1084           encoded_response.data(), encoded_response.size(),
1085           context->arena.ptr());
1086   // If decoding fails, report a fatal error and return.
1087   if (response == nullptr) {
1088     return absl::InvalidArgumentError("Can't decode DiscoveryResponse.");
1089   }
1090   MaybeLogDiscoveryResponse(xds_client(), xds_client()->def_pool_.ptr(),
1091                             response);
1092   // Get the type_url, version, nonce, and number of resources.
1093   context->type_url = std::string(absl::StripPrefix(
1094       UpbStringToAbsl(
1095           envoy_service_discovery_v3_DiscoveryResponse_type_url(response)),
1096       "type.googleapis.com/"));
1097   context->version = UpbStringToStdString(
1098       envoy_service_discovery_v3_DiscoveryResponse_version_info(response));
1099   context->nonce = UpbStringToStdString(
1100       envoy_service_discovery_v3_DiscoveryResponse_nonce(response));
1101   size_t num_resources;
1102   const google_protobuf_Any* const* resources =
1103       envoy_service_discovery_v3_DiscoveryResponse_resources(response,
1104                                                              &num_resources);
1105   GRPC_TRACE_LOG(xds_client, INFO)
1106       << "[xds_client " << xds_client() << "] xds server "
1107       << xds_channel()->server_.server_uri()
1108       << ": received ADS response: type_url=" << context->type_url
1109       << ", version=" << context->version << ", nonce=" << context->nonce
1110       << ", num_resources=" << num_resources;
1111   context->type = xds_client()->GetResourceTypeLocked(context->type_url);
1112   if (context->type == nullptr) {
1113     return absl::InvalidArgumentError(
1114         absl::StrCat("unknown resource type ", context->type_url));
1115   }
1116   context->read_delay_handle = MakeRefCounted<AdsReadDelayHandle>(Ref());
1117   // Process each resource.
1118   for (size_t i = 0; i < num_resources; ++i) {
1119     absl::string_view type_url = absl::StripPrefix(
1120         UpbStringToAbsl(google_protobuf_Any_type_url(resources[i])),
1121         "type.googleapis.com/");
1122     absl::string_view serialized_resource =
1123         UpbStringToAbsl(google_protobuf_Any_value(resources[i]));
1124     // Unwrap Resource messages, if so wrapped.
1125     absl::string_view resource_name;
1126     if (type_url == "envoy.service.discovery.v3.Resource") {
1127       const auto* resource_wrapper = envoy_service_discovery_v3_Resource_parse(
1128           serialized_resource.data(), serialized_resource.size(),
1129           context->arena.ptr());
1130       if (resource_wrapper == nullptr) {
1131         context->errors.emplace_back(absl::StrCat(
1132             "resource index ", i, ": Can't decode Resource proto wrapper"));
1133         ++context->num_invalid_resources;
1134         continue;
1135       }
1136       const auto* resource =
1137           envoy_service_discovery_v3_Resource_resource(resource_wrapper);
1138       if (resource == nullptr) {
1139         context->errors.emplace_back(
1140             absl::StrCat("resource index ", i,
1141                          ": No resource present in Resource proto wrappe"));
1142         ++context->num_invalid_resources;
1143         continue;
1144       }
1145       type_url = absl::StripPrefix(
1146           UpbStringToAbsl(google_protobuf_Any_type_url(resource)),
1147           "type.googleapis.com/");
1148       serialized_resource =
1149           UpbStringToAbsl(google_protobuf_Any_value(resource));
1150       resource_name = UpbStringToAbsl(
1151           envoy_service_discovery_v3_Resource_name(resource_wrapper));
1152     }
1153     ParseResource(i, type_url, resource_name, serialized_resource, context);
1154   }
1155   return absl::OkStatus();
1156 }
1157 
OnRecvMessage(absl::string_view payload)1158 void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
1159   // context.read_delay_handle needs to be destroyed after the mutex is
1160   // released.
1161   DecodeContext context;
1162   {
1163     MutexLock lock(&xds_client()->mu_);
1164     if (!IsCurrentCallOnChannel()) return;
1165     // Parse and validate the response.
1166     absl::Status status = DecodeAdsResponse(payload, &context);
1167     if (!status.ok()) {
1168       // Ignore unparsable response.
1169       LOG(ERROR) << "[xds_client " << xds_client() << "] xds server "
1170                  << xds_channel()->server_.server_uri()
1171                  << ": error parsing ADS response (" << status
1172                  << ") -- ignoring";
1173     } else {
1174       seen_response_ = true;
1175       xds_channel()->SetHealthyLocked();
1176       // Update nonce.
1177       auto& state = state_map_[context.type];
1178       state.nonce = context.nonce;
1179       // If we got an error, set state.status so that we'll NACK the update.
1180       if (!context.errors.empty()) {
1181         state.status = absl::UnavailableError(
1182             absl::StrCat("xDS response validation errors: [",
1183                          absl::StrJoin(context.errors, "; "), "]"));
1184         LOG(ERROR) << "[xds_client " << xds_client() << "] xds server "
1185                    << xds_channel()->server_.server_uri()
1186                    << ": ADS response invalid for resource type "
1187                    << context.type_url << " version " << context.version
1188                    << ", will NACK: nonce=" << state.nonce
1189                    << " status=" << state.status;
1190       }
1191       // Delete resources not seen in update if needed.
1192       if (context.type->AllResourcesRequiredInSotW()) {
1193         for (auto& a : xds_client()->authority_state_map_) {
1194           const std::string& authority = a.first;
1195           AuthorityState& authority_state = a.second;
1196           // Skip authorities that are not using this xDS channel.
1197           if (authority_state.xds_channels.back() != xds_channel()) {
1198             continue;
1199           }
1200           auto seen_authority_it = context.resources_seen.find(authority);
1201           // Find this resource type.
1202           auto type_it = authority_state.resource_map.find(context.type);
1203           if (type_it == authority_state.resource_map.end()) continue;
1204           // Iterate over resource ids.
1205           for (auto& r : type_it->second) {
1206             const XdsResourceKey& resource_key = r.first;
1207             ResourceState& resource_state = r.second;
1208             if (seen_authority_it == context.resources_seen.end() ||
1209                 seen_authority_it->second.find(resource_key) ==
1210                     seen_authority_it->second.end()) {
1211               // If the resource was newly requested but has not yet been
1212               // received, we don't want to generate an error for the
1213               // watchers, because this ADS response may be in reaction to an
1214               // earlier request that did not yet request the new resource, so
1215               // its absence from the response does not necessarily indicate
1216               // that the resource does not exist.  For that case, we rely on
1217               // the request timeout instead.
1218               if (!resource_state.HasResource()) continue;
1219               if (xds_channel()->server_.IgnoreResourceDeletion()) {
1220                 if (!resource_state.ignored_deletion()) {
1221                   LOG(ERROR)
1222                       << "[xds_client " << xds_client() << "] xds server "
1223                       << xds_channel()->server_.server_uri()
1224                       << ": ignoring deletion for resource type "
1225                       << context.type_url << " name "
1226                       << XdsClient::ConstructFullXdsResourceName(
1227                              authority, context.type_url.c_str(), resource_key);
1228                   resource_state.set_ignored_deletion(true);
1229                 }
1230               } else {
1231                 resource_state.SetDoesNotExist();
1232                 xds_client()->NotifyWatchersOnResourceChanged(
1233                     absl::NotFoundError("does not exist"),
1234                     resource_state.watchers(), context.read_delay_handle);
1235               }
1236             }
1237           }
1238         }
1239       }
1240       // If we had valid resources or the update was empty, update the version.
1241       if (context.num_valid_resources > 0 || context.errors.empty()) {
1242         xds_channel()->resource_type_version_map_[context.type] =
1243             std::move(context.version);
1244       }
1245       // Send ACK or NACK.
1246       SendMessageLocked(context.type);
1247     }
1248     // Update metrics.
1249     if (xds_client()->metrics_reporter_ != nullptr) {
1250       xds_client()->metrics_reporter_->ReportResourceUpdates(
1251           xds_channel()->server_.server_uri(), context.type_url,
1252           context.num_valid_resources, context.num_invalid_resources);
1253     }
1254   }
1255   xds_client()->work_serializer_.DrainQueue();
1256 }
1257 
OnStatusReceived(absl::Status status)1258 void XdsClient::XdsChannel::AdsCall::OnStatusReceived(absl::Status status) {
1259   {
1260     MutexLock lock(&xds_client()->mu_);
1261     GRPC_TRACE_LOG(xds_client, INFO)
1262         << "[xds_client " << xds_client() << "] xds server "
1263         << xds_channel()->server_.server_uri()
1264         << ": ADS call status received (xds_channel=" << xds_channel()
1265         << ", ads_call=" << this << ", streaming_call=" << streaming_call_.get()
1266         << "): " << status;
1267     // Cancel any does-not-exist timers that may be pending.
1268     for (const auto& p : state_map_) {
1269       for (const auto& q : p.second.subscribed_resources) {
1270         for (auto& r : q.second) {
1271           r.second->MaybeCancelTimer();
1272         }
1273       }
1274     }
1275     // Ignore status from a stale call.
1276     if (IsCurrentCallOnChannel()) {
1277       // Try to restart the call.
1278       retryable_call_->OnCallFinishedLocked();
1279       // If we didn't receive a response on the stream, report the
1280       // stream failure as a connectivity failure, which will report the
1281       // error to all watchers of resources on this channel.
1282       if (!seen_response_) {
1283         xds_channel()->SetChannelStatusLocked(absl::UnavailableError(
1284             absl::StrCat("xDS call failed with no responses received; status: ",
1285                          status.ToString())));
1286       }
1287     }
1288   }
1289   xds_client()->work_serializer_.DrainQueue();
1290 }
1291 
IsCurrentCallOnChannel() const1292 bool XdsClient::XdsChannel::AdsCall::IsCurrentCallOnChannel() const {
1293   // If the retryable ADS call is null (which only happens when the xds
1294   // channel is shutting down), all the ADS calls are stale.
1295   if (xds_channel()->ads_call_ == nullptr) return false;
1296   return this == xds_channel()->ads_call_->call();
1297 }
1298 
1299 std::vector<std::string>
ResourceNamesForRequest(const XdsResourceType * type)1300 XdsClient::XdsChannel::AdsCall::ResourceNamesForRequest(
1301     const XdsResourceType* type) {
1302   std::vector<std::string> resource_names;
1303   auto it = state_map_.find(type);
1304   if (it != state_map_.end()) {
1305     for (auto& a : it->second.subscribed_resources) {
1306       const std::string& authority = a.first;
1307       for (auto& p : a.second) {
1308         const XdsResourceKey& resource_key = p.first;
1309         resource_names.emplace_back(XdsClient::ConstructFullXdsResourceName(
1310             authority, type->type_url(), resource_key));
1311         OrphanablePtr<ResourceTimer>& resource_timer = p.second;
1312         resource_timer->MarkSubscriptionSendStarted();
1313       }
1314     }
1315   }
1316   return resource_names;
1317 }
1318 
1319 //
1320 // XdsClient::ResourceState
1321 //
1322 
SetAcked(std::shared_ptr<const XdsResourceType::ResourceData> resource,std::string serialized_proto,std::string version,Timestamp update_time)1323 void XdsClient::ResourceState::SetAcked(
1324     std::shared_ptr<const XdsResourceType::ResourceData> resource,
1325     std::string serialized_proto, std::string version, Timestamp update_time) {
1326   resource_ = std::move(resource);
1327   client_status_ = ClientResourceStatus::ACKED;
1328   serialized_proto_ = std::move(serialized_proto);
1329   update_time_ = update_time;
1330   version_ = std::move(version);
1331   failed_version_.clear();
1332   failed_details_.clear();
1333 }
1334 
SetNacked(const std::string & version,const std::string & details,Timestamp update_time)1335 void XdsClient::ResourceState::SetNacked(const std::string& version,
1336                                          const std::string& details,
1337                                          Timestamp update_time) {
1338   client_status_ = ClientResourceStatus::NACKED;
1339   failed_version_ = version;
1340   failed_details_ = details;
1341   failed_update_time_ = update_time;
1342 }
1343 
SetDoesNotExist()1344 void XdsClient::ResourceState::SetDoesNotExist() {
1345   resource_.reset();
1346   serialized_proto_.clear();
1347   client_status_ = ClientResourceStatus::DOES_NOT_EXIST;
1348   failed_version_.clear();
1349 }
1350 
CacheStateString() const1351 absl::string_view XdsClient::ResourceState::CacheStateString() const {
1352   switch (client_status_) {
1353     case ClientResourceStatus::REQUESTED:
1354       return "requested";
1355     case ClientResourceStatus::DOES_NOT_EXIST:
1356       return "does_not_exist";
1357     case ClientResourceStatus::ACKED:
1358       return "acked";
1359     case ClientResourceStatus::NACKED:
1360       return resource_ != nullptr ? "nacked_but_cached" : "nacked";
1361   }
1362   Crash("unknown resource state");
1363 }
1364 
1365 namespace {
1366 
EncodeTimestamp(Timestamp value,upb_Arena * arena)1367 google_protobuf_Timestamp* EncodeTimestamp(Timestamp value, upb_Arena* arena) {
1368   google_protobuf_Timestamp* timestamp = google_protobuf_Timestamp_new(arena);
1369   gpr_timespec timespec = value.as_timespec(GPR_CLOCK_REALTIME);
1370   google_protobuf_Timestamp_set_seconds(timestamp, timespec.tv_sec);
1371   google_protobuf_Timestamp_set_nanos(timestamp, timespec.tv_nsec);
1372   return timestamp;
1373 }
1374 
1375 }  // namespace
1376 
FillGenericXdsConfig(upb_StringView type_url,upb_StringView resource_name,upb_Arena * arena,envoy_service_status_v3_ClientConfig_GenericXdsConfig * entry) const1377 void XdsClient::ResourceState::FillGenericXdsConfig(
1378     upb_StringView type_url, upb_StringView resource_name, upb_Arena* arena,
1379     envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry) const {
1380   envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_type_url(entry,
1381                                                                      type_url);
1382   envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_name(entry,
1383                                                                  resource_name);
1384   envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_client_status(
1385       entry, client_status_);
1386   if (!serialized_proto_.empty()) {
1387     envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_version_info(
1388         entry, StdStringToUpbString(version_));
1389     envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_last_updated(
1390         entry, EncodeTimestamp(update_time_, arena));
1391     auto* any_field =
1392         envoy_service_status_v3_ClientConfig_GenericXdsConfig_mutable_xds_config(
1393             entry, arena);
1394     google_protobuf_Any_set_type_url(any_field, type_url);
1395     google_protobuf_Any_set_value(any_field,
1396                                   StdStringToUpbString(serialized_proto_));
1397   }
1398   if (client_status_ == ClientResourceStatus::NACKED) {
1399     auto* update_failure_state = envoy_admin_v3_UpdateFailureState_new(arena);
1400     envoy_admin_v3_UpdateFailureState_set_details(
1401         update_failure_state, StdStringToUpbString(failed_details_));
1402     envoy_admin_v3_UpdateFailureState_set_version_info(
1403         update_failure_state, StdStringToUpbString(failed_version_));
1404     envoy_admin_v3_UpdateFailureState_set_last_update_attempt(
1405         update_failure_state, EncodeTimestamp(failed_update_time_, arena));
1406     envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_error_state(
1407         entry, update_failure_state);
1408   }
1409 }
1410 
1411 //
1412 // XdsClient
1413 //
1414 
// Namespace-scope definition of the static constexpr data member.  It has no
// initializer here, so the value comes from the in-class declaration; a
// definition like this is what pre-C++17 builds need when the member is
// odr-used.
constexpr absl::string_view XdsClient::kOldStyleAuthority;
1416 
XdsClient(std::shared_ptr<XdsBootstrap> bootstrap,RefCountedPtr<XdsTransportFactory> transport_factory,std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,std::unique_ptr<XdsMetricsReporter> metrics_reporter,std::string user_agent_name,std::string user_agent_version,Duration resource_request_timeout)1417 XdsClient::XdsClient(
1418     std::shared_ptr<XdsBootstrap> bootstrap,
1419     RefCountedPtr<XdsTransportFactory> transport_factory,
1420     std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
1421     std::unique_ptr<XdsMetricsReporter> metrics_reporter,
1422     std::string user_agent_name, std::string user_agent_version,
1423     Duration resource_request_timeout)
1424     : DualRefCounted<XdsClient>(
1425           GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "XdsClient" : nullptr),
1426       bootstrap_(std::move(bootstrap)),
1427       user_agent_name_(std::move(user_agent_name)),
1428       user_agent_version_(std::move(user_agent_version)),
1429       transport_factory_(std::move(transport_factory)),
1430       request_timeout_(resource_request_timeout),
1431       xds_federation_enabled_(XdsFederationEnabled()),
1432       work_serializer_(engine),
1433       engine_(std::move(engine)),
1434       metrics_reporter_(std::move(metrics_reporter)) {
1435   GRPC_TRACE_LOG(xds_client, INFO)
1436       << "[xds_client " << this << "] creating xds client";
1437   CHECK(bootstrap_ != nullptr);
1438   if (bootstrap_->node() != nullptr) {
1439     GRPC_TRACE_LOG(xds_client, INFO)
1440         << "[xds_client " << this
1441         << "] xDS node ID: " << bootstrap_->node()->id();
1442   }
1443 }
1444 
~XdsClient()1445 XdsClient::~XdsClient() {
1446   GRPC_TRACE_LOG(xds_client, INFO)
1447       << "[xds_client " << this << "] destroying xds client";
1448 }
1449 
Orphaned()1450 void XdsClient::Orphaned() {
1451   GRPC_TRACE_LOG(xds_client, INFO)
1452       << "[xds_client " << this << "] shutting down xds client";
1453   MutexLock lock(&mu_);
1454   shutting_down_ = true;
1455   // Clear cache and any remaining watchers that may not have been cancelled.
1456   authority_state_map_.clear();
1457   invalid_watchers_.clear();
1458 }
1459 
GetOrCreateXdsChannelLocked(const XdsBootstrap::XdsServer & server,const char * reason)1460 RefCountedPtr<XdsClient::XdsChannel> XdsClient::GetOrCreateXdsChannelLocked(
1461     const XdsBootstrap::XdsServer& server, const char* reason) {
1462   std::string key = server.Key();
1463   auto it = xds_channel_map_.find(key);
1464   if (it != xds_channel_map_.end()) {
1465     return it->second->Ref(DEBUG_LOCATION, reason);
1466   }
1467   // Channel not found, so create a new one.
1468   auto xds_channel =
1469       MakeRefCounted<XdsChannel>(WeakRef(DEBUG_LOCATION, "XdsChannel"), server);
1470   xds_channel_map_[std::move(key)] = xds_channel.get();
1471   return xds_channel;
1472 }
1473 
HasUncachedResources(const AuthorityState & authority_state)1474 bool XdsClient::HasUncachedResources(const AuthorityState& authority_state) {
1475   for (const auto& type_resource : authority_state.resource_map) {
1476     for (const auto& key_state : type_resource.second) {
1477       if (key_state.second.client_status() ==
1478           ResourceState::ClientResourceStatus::REQUESTED) {
1479         return true;
1480       }
1481     }
1482   }
1483   return false;
1484 }
1485 
WatchResource(const XdsResourceType * type,absl::string_view name,RefCountedPtr<ResourceWatcherInterface> watcher)1486 void XdsClient::WatchResource(const XdsResourceType* type,
1487                               absl::string_view name,
1488                               RefCountedPtr<ResourceWatcherInterface> watcher) {
1489   // Lambda for handling failure cases.
1490   auto fail = [&](absl::Status status) mutable {
1491     {
1492       MutexLock lock(&mu_);
1493       MaybeRegisterResourceTypeLocked(type);
1494       invalid_watchers_.insert(watcher);
1495     }
1496     NotifyWatchersOnResourceChanged(std::move(status), {watcher},
1497                                     ReadDelayHandle::NoWait());
1498     work_serializer_.DrainQueue();
1499   };
1500   auto resource_name = ParseXdsResourceName(name, type);
1501   if (!resource_name.ok()) {
1502     fail(absl::InvalidArgumentError(
1503         absl::StrCat("Unable to parse resource name ", name)));
1504     return;
1505   }
1506   // Find server to use.
1507   std::vector<const XdsBootstrap::XdsServer*> xds_servers;
1508   if (resource_name->authority != kOldStyleAuthority) {
1509     auto* authority =
1510         bootstrap_->LookupAuthority(std::string(resource_name->authority));
1511     if (authority == nullptr) {
1512       fail(absl::FailedPreconditionError(
1513           absl::StrCat("authority \"", resource_name->authority,
1514                        "\" not present in bootstrap config")));
1515       return;
1516     }
1517     xds_servers = authority->servers();
1518   }
1519   if (xds_servers.empty()) xds_servers = bootstrap_->servers();
1520   {
1521     MutexLock lock(&mu_);
1522     MaybeRegisterResourceTypeLocked(type);
1523     AuthorityState& authority_state =
1524         authority_state_map_[resource_name->authority];
1525     auto it_is_new = authority_state.resource_map[type].emplace(
1526         resource_name->key, ResourceState());
1527     bool first_watcher_for_resource = it_is_new.second;
1528     ResourceState& resource_state = it_is_new.first->second;
1529     resource_state.AddWatcher(watcher);
1530     bool notified_watcher = false;
1531     if (first_watcher_for_resource) {
1532       // We try to add new channels in 2 cases:
1533       // - This is the first resource for this authority (i.e., the list
1534       //   of channels is empty).
1535       // - The last channel in the list is failing.  That failure may not
1536       //   have previously triggered fallback if there were no uncached
1537       //   resources, but we've just added a new uncached resource,
1538       //   so we need to trigger fallback now.
1539       //
1540       // Note that when we add a channel, it might already be failing
1541       // due to being used in a different authority.  So we keep going
1542       // until either we add one that isn't failing or we've added them all.
1543       if (authority_state.xds_channels.empty() ||
1544           !authority_state.xds_channels.back()->status().ok()) {
1545         for (size_t i = authority_state.xds_channels.size();
1546              i < xds_servers.size(); ++i) {
1547           authority_state.xds_channels.emplace_back(
1548               GetOrCreateXdsChannelLocked(*xds_servers[i], "start watch"));
1549           if (authority_state.xds_channels.back()->status().ok()) {
1550             break;
1551           }
1552         }
1553       }
1554       for (const auto& channel : authority_state.xds_channels) {
1555         channel->SubscribeLocked(type, *resource_name);
1556       }
1557     } else {
1558       // If we already have a cached value for the resource, notify the new
1559       // watcher immediately.
1560       if (resource_state.HasResource()) {
1561         GRPC_TRACE_LOG(xds_client, INFO)
1562             << "[xds_client " << this << "] returning cached listener data for "
1563             << name;
1564         NotifyWatchersOnResourceChanged(resource_state.resource(), {watcher},
1565                                         ReadDelayHandle::NoWait());
1566         notified_watcher = true;
1567       } else if (resource_state.client_status() ==
1568                  ResourceState::ClientResourceStatus::DOES_NOT_EXIST) {
1569         GRPC_TRACE_LOG(xds_client, INFO)
1570             << "[xds_client " << this
1571             << "] reporting cached does-not-exist for " << name;
1572         NotifyWatchersOnResourceChanged(absl::NotFoundError("does not exist"),
1573                                         {watcher}, ReadDelayHandle::NoWait());
1574         notified_watcher = true;
1575       } else if (resource_state.client_status() ==
1576                  ResourceState::ClientResourceStatus::NACKED) {
1577         GRPC_TRACE_LOG(xds_client, INFO)
1578             << "[xds_client " << this
1579             << "] reporting cached validation failure for " << name << ": "
1580             << resource_state.failed_details();
1581         NotifyWatchersOnResourceChanged(
1582             absl::InvalidArgumentError(absl::StrCat(
1583                 "invalid resource: ", resource_state.failed_details())),
1584             {watcher}, ReadDelayHandle::NoWait());
1585         notified_watcher = true;
1586       }
1587     }
1588     // If the channel is not connected, report an error to the watcher.
1589     absl::Status channel_status = authority_state.xds_channels.back()->status();
1590     if (!channel_status.ok()) {
1591       GRPC_TRACE_LOG(xds_client, INFO)
1592           << "[xds_client " << this << "] returning cached channel error for "
1593           << name << ": " << channel_status;
1594       if (notified_watcher) {
1595         NotifyWatchersOnAmbientError(std::move(channel_status), {watcher},
1596                                      ReadDelayHandle::NoWait());
1597       } else {
1598         NotifyWatchersOnResourceChanged(std::move(channel_status), {watcher},
1599                                         ReadDelayHandle::NoWait());
1600       }
1601     }
1602   }
1603   work_serializer_.DrainQueue();
1604 }
1605 
CancelResourceWatch(const XdsResourceType * type,absl::string_view name,ResourceWatcherInterface * watcher,bool delay_unsubscription)1606 void XdsClient::CancelResourceWatch(const XdsResourceType* type,
1607                                     absl::string_view name,
1608                                     ResourceWatcherInterface* watcher,
1609                                     bool delay_unsubscription) {
1610   auto resource_name = ParseXdsResourceName(name, type);
1611   MutexLock lock(&mu_);
1612   // We cannot be sure whether the watcher is in invalid_watchers_ or in
1613   // authority_state_map_, so we check both, just to be safe.
1614   invalid_watchers_.erase(watcher);
1615   // Find authority.
1616   if (!resource_name.ok()) return;
1617   auto authority_it = authority_state_map_.find(resource_name->authority);
1618   if (authority_it == authority_state_map_.end()) return;
1619   AuthorityState& authority_state = authority_it->second;
1620   // Find type map.
1621   auto type_it = authority_state.resource_map.find(type);
1622   if (type_it == authority_state.resource_map.end()) return;
1623   auto& type_map = type_it->second;
1624   // Find resource key.
1625   auto resource_it = type_map.find(resource_name->key);
1626   if (resource_it == type_map.end()) return;
1627   ResourceState& resource_state = resource_it->second;
1628   // Remove watcher.
1629   resource_state.RemoveWatcher(watcher);
1630   // Clean up empty map entries, if any.
1631   if (!resource_state.HasWatchers()) {
1632     if (resource_state.ignored_deletion()) {
1633       LOG(INFO) << "[xds_client " << this
1634                 << "] unsubscribing from a resource for which we "
1635                 << "previously ignored a deletion: type " << type->type_url()
1636                 << " name " << name;
1637     }
1638     for (const auto& xds_channel : authority_state.xds_channels) {
1639       xds_channel->UnsubscribeLocked(type, *resource_name,
1640                                      delay_unsubscription);
1641     }
1642     type_map.erase(resource_it);
1643     if (type_map.empty()) {
1644       authority_state.resource_map.erase(type_it);
1645       if (authority_state.resource_map.empty()) {
1646         authority_state.xds_channels.clear();
1647       }
1648     }
1649   }
1650 }
1651 
MaybeRegisterResourceTypeLocked(const XdsResourceType * resource_type)1652 void XdsClient::MaybeRegisterResourceTypeLocked(
1653     const XdsResourceType* resource_type) {
1654   auto it = resource_types_.find(resource_type->type_url());
1655   if (it != resource_types_.end()) {
1656     CHECK(it->second == resource_type);
1657     return;
1658   }
1659   resource_types_.emplace(resource_type->type_url(), resource_type);
1660   resource_type->InitUpbSymtab(this, def_pool_.ptr());
1661 }
1662 
GetResourceTypeLocked(absl::string_view resource_type)1663 const XdsResourceType* XdsClient::GetResourceTypeLocked(
1664     absl::string_view resource_type) {
1665   auto it = resource_types_.find(resource_type);
1666   if (it != resource_types_.end()) return it->second;
1667   return nullptr;
1668 }
1669 
ParseXdsResourceName(absl::string_view name,const XdsResourceType * type)1670 absl::StatusOr<XdsClient::XdsResourceName> XdsClient::ParseXdsResourceName(
1671     absl::string_view name, const XdsResourceType* type) {
1672   // Old-style names use the empty string for authority.
1673   // authority is set to kOldStyleAuthority to indicate that it's an
1674   // old-style name.
1675   if (!xds_federation_enabled_ || !absl::StartsWith(name, "xdstp:")) {
1676     return XdsResourceName{std::string(kOldStyleAuthority),
1677                            {std::string(name), {}}};
1678   }
1679   // New style name.  Parse URI.
1680   auto uri = URI::Parse(name);
1681   if (!uri.ok()) return uri.status();
1682   // Split the resource type off of the path to get the id.
1683   std::pair<absl::string_view, absl::string_view> path_parts = absl::StrSplit(
1684       absl::StripPrefix(uri->path(), "/"), absl::MaxSplits('/', 1));
1685   if (type->type_url() != path_parts.first) {
1686     return absl::InvalidArgumentError(
1687         "xdstp URI path must indicate valid xDS resource type");
1688   }
1689   // Canonicalize order of query params.
1690   std::vector<URI::QueryParam> query_params;
1691   for (const auto& p : uri->query_parameter_map()) {
1692     query_params.emplace_back(
1693         URI::QueryParam{std::string(p.first), std::string(p.second)});
1694   }
1695   return XdsResourceName{
1696       uri->authority(),
1697       {std::string(path_parts.second), std::move(query_params)}};
1698 }
1699 
ConstructFullXdsResourceName(absl::string_view authority,absl::string_view resource_type,const XdsResourceKey & key)1700 std::string XdsClient::ConstructFullXdsResourceName(
1701     absl::string_view authority, absl::string_view resource_type,
1702     const XdsResourceKey& key) {
1703   if (authority != kOldStyleAuthority) {
1704     auto uri = URI::Create("xdstp", std::string(authority),
1705                            absl::StrCat("/", resource_type, "/", key.id),
1706                            key.query_params, /*fragment=*/"");
1707     CHECK(uri.ok());
1708     return uri->ToString();
1709   }
1710   // Old-style name.
1711   return key.id;
1712 }
1713 
ResetBackoff()1714 void XdsClient::ResetBackoff() {
1715   MutexLock lock(&mu_);
1716   for (auto& p : xds_channel_map_) {
1717     p.second->ResetBackoff();
1718   }
1719 }
1720 
// Returns a status with the same code as `status` and with
// " (node ID:<id>)" appended to its message.  When the bootstrap has no
// node, `status` is returned unchanged.
absl::Status XdsClient::AppendNodeToStatus(const absl::Status& status) const {
  const auto* node = bootstrap_->node();
  if (node != nullptr) {
    return absl::Status(
        status.code(),
        absl::StrCat(status.message(), " (node ID:", node->id(), ")"));
  }
  return status;
}
1728 
NotifyWatchersOnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsResourceType::ResourceData>> resource,WatcherSet watchers,RefCountedPtr<ReadDelayHandle> read_delay_handle)1729 void XdsClient::NotifyWatchersOnResourceChanged(
1730     absl::StatusOr<std::shared_ptr<const XdsResourceType::ResourceData>>
1731         resource,
1732     WatcherSet watchers, RefCountedPtr<ReadDelayHandle> read_delay_handle) {
1733   if (!resource.ok()) resource = AppendNodeToStatus(resource.status());
1734   work_serializer_.Schedule(
1735       [watchers = std::move(watchers), resource = std::move(resource),
1736        read_delay_handle = std::move(read_delay_handle)]()
1737           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1738             for (const auto& p : watchers) {
1739               p->OnGenericResourceChanged(resource, read_delay_handle);
1740             }
1741           },
1742       DEBUG_LOCATION);
1743 }
1744 
NotifyWatchersOnAmbientError(absl::Status status,WatcherSet watchers,RefCountedPtr<ReadDelayHandle> read_delay_handle)1745 void XdsClient::NotifyWatchersOnAmbientError(
1746     absl::Status status, WatcherSet watchers,
1747     RefCountedPtr<ReadDelayHandle> read_delay_handle) {
1748   if (!status.ok()) status = AppendNodeToStatus(status);
1749   work_serializer_.Schedule(
1750       [watchers = std::move(watchers), status = std::move(status),
1751        read_delay_handle = std::move(read_delay_handle)]()
1752           ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
1753             for (const auto& p : watchers) {
1754               p->OnAmbientError(status, read_delay_handle);
1755             }
1756           },
1757       DEBUG_LOCATION);
1758 }
1759 
DumpClientConfig(std::set<std::string> * string_pool,upb_Arena * arena,envoy_service_status_v3_ClientConfig * client_config)1760 void XdsClient::DumpClientConfig(
1761     std::set<std::string>* string_pool, upb_Arena* arena,
1762     envoy_service_status_v3_ClientConfig* client_config) {
1763   // Assemble config dump messages
1764   // Fill-in the node information
1765   auto* node =
1766       envoy_service_status_v3_ClientConfig_mutable_node(client_config, arena);
1767   PopulateXdsNode(bootstrap_->node(), user_agent_name_, user_agent_version_,
1768                   node, arena);
1769   // Dump each resource.
1770   for (const auto& a : authority_state_map_) {  // authority
1771     const std::string& authority = a.first;
1772     for (const auto& t : a.second.resource_map) {  // type
1773       const XdsResourceType* type = t.first;
1774       auto it =
1775           string_pool
1776               ->emplace(absl::StrCat("type.googleapis.com/", type->type_url()))
1777               .first;
1778       upb_StringView type_url = StdStringToUpbString(*it);
1779       for (const auto& r : t.second) {  // resource id
1780         auto it2 = string_pool
1781                        ->emplace(ConstructFullXdsResourceName(
1782                            authority, type->type_url(), r.first))
1783                        .first;
1784         upb_StringView resource_name = StdStringToUpbString(*it2);
1785         envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry =
1786             envoy_service_status_v3_ClientConfig_add_generic_xds_configs(
1787                 client_config, arena);
1788         r.second.FillGenericXdsConfig(type_url, resource_name, arena, entry);
1789       }
1790     }
1791   }
1792 }
1793 
ReportResourceCounts(absl::FunctionRef<void (const ResourceCountLabels &,uint64_t)> func)1794 void XdsClient::ReportResourceCounts(
1795     absl::FunctionRef<void(const ResourceCountLabels&, uint64_t)> func) {
1796   ResourceCountLabels labels;
1797   for (const auto& a : authority_state_map_) {  // authority
1798     labels.xds_authority = a.first;
1799     for (const auto& t : a.second.resource_map) {  // type
1800       labels.resource_type = t.first->type_url();
1801       // Count the number of entries in each state.
1802       std::map<absl::string_view, uint64_t> counts;
1803       for (const auto& r : t.second) {  // resource id
1804         ++counts[r.second.CacheStateString()];
1805       }
1806       // Report the count for each state.
1807       for (const auto& c : counts) {
1808         labels.cache_state = c.first;
1809         func(labels, c.second);
1810       }
1811     }
1812   }
1813 }
1814 
ReportServerConnections(absl::FunctionRef<void (absl::string_view,bool)> func)1815 void XdsClient::ReportServerConnections(
1816     absl::FunctionRef<void(absl::string_view, bool)> func) {
1817   for (const auto& p : xds_channel_map_) {
1818     func(p.second->server_uri(), p.second->status().ok());
1819   }
1820 }
1821 
1822 }  // namespace grpc_core
1823