• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 #include <limits.h>
21 #include <string.h>
22 
23 #include "absl/container/inlined_vector.h"
24 #include "absl/strings/str_format.h"
25 #include "absl/strings/str_join.h"
26 #include "absl/strings/string_view.h"
27 
28 #include <grpc/byte_buffer_reader.h>
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/time.h>
32 
33 #include "src/core/ext/filters/client_channel/client_channel.h"
34 #include "src/core/ext/filters/client_channel/service_config.h"
35 #include "src/core/ext/xds/xds_api.h"
36 #include "src/core/ext/xds/xds_bootstrap.h"
37 #include "src/core/ext/xds/xds_channel_args.h"
38 #include "src/core/ext/xds/xds_client.h"
39 #include "src/core/ext/xds/xds_client_stats.h"
40 #include "src/core/ext/xds/xds_http_filters.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/backoff/backoff.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/channel/channel_stack.h"
45 #include "src/core/lib/gpr/env.h"
46 #include "src/core/lib/gpr/string.h"
47 #include "src/core/lib/gprpp/memory.h"
48 #include "src/core/lib/gprpp/orphanable.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/sync.h"
51 #include "src/core/lib/iomgr/sockaddr.h"
52 #include "src/core/lib/iomgr/timer.h"
53 #include "src/core/lib/slice/slice_internal.h"
54 #include "src/core/lib/slice/slice_string_helpers.h"
55 #include "src/core/lib/surface/call.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/channel_init.h"
58 #include "src/core/lib/transport/static_metadata.h"
59 
// Retry/backoff parameters for the xds calls wrapped by RetryableCall. The
// *_SECONDS values are multiplied by 1000 where BackOff::Options is built,
// so BackOff itself receives milliseconds.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// Not used in the visible part of this file; by its name, presumably the
// minimum LRS load-reporting interval in ms -- TODO confirm at the use site.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
65 
66 namespace grpc_core {
67 
68 TraceFlag grpc_xds_client_trace(false, "xds_client");
69 TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
70 
71 namespace {
72 
73 Mutex* g_mu = nullptr;
74 const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
75 XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
76 char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;
77 
78 }  // namespace
79 
80 //
81 // Internal class declarations
82 //
83 
84 // An xds call wrapper that can restart a call upon failure. Holds a ref to
85 // the xds channel. The template parameter is the kind of wrapped xds call.
86 template <typename T>
87 class XdsClient::ChannelState::RetryableCall
88     : public InternallyRefCounted<RetryableCall<T>> {
89  public:
90   explicit RetryableCall(RefCountedPtr<ChannelState> chand);
91 
92   void Orphan() override;
93 
94   void OnCallFinishedLocked();
95 
calld() const96   T* calld() const { return calld_.get(); }
chand() const97   ChannelState* chand() const { return chand_.get(); }
98 
99   bool IsCurrentCallOnChannel() const;
100 
101  private:
102   void StartNewCallLocked();
103   void StartRetryTimerLocked();
104   static void OnRetryTimer(void* arg, grpc_error_handle error);
105   void OnRetryTimerLocked(grpc_error_handle error);
106 
107   // The wrapped xds call that talks to the xds server. It's instantiated
108   // every time we start a new call. It's null during call retry backoff.
109   OrphanablePtr<T> calld_;
110   // The owning xds channel.
111   RefCountedPtr<ChannelState> chand_;
112 
113   // Retry state.
114   BackOff backoff_;
115   grpc_timer retry_timer_;
116   grpc_closure on_retry_timer_;
117   bool retry_timer_callback_pending_ = false;
118 
119   bool shutting_down_ = false;
120 };
121 
122 // Contains an ADS call to the xds server.
123 class XdsClient::ChannelState::AdsCallState
124     : public InternallyRefCounted<AdsCallState> {
125  public:
126   // The ctor and dtor should not be used directly.
127   explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
128   ~AdsCallState() override;
129 
130   void Orphan() override;
131 
parent() const132   RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
chand() const133   ChannelState* chand() const { return parent_->chand(); }
xds_client() const134   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const135   bool seen_response() const { return seen_response_; }
136 
137   void SubscribeLocked(const std::string& type_url, const std::string& name)
138       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
139   void UnsubscribeLocked(const std::string& type_url, const std::string& name,
140                          bool delay_unsubscription)
141       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
142 
143   bool HasSubscribedResources() const;
144 
145  private:
146   class ResourceState : public InternallyRefCounted<ResourceState> {
147    public:
ResourceState(const std::string & type_url,const std::string & name,bool sent_initial_request)148     ResourceState(const std::string& type_url, const std::string& name,
149                   bool sent_initial_request)
150         : type_url_(type_url),
151           name_(name),
152           sent_initial_request_(sent_initial_request) {
153       GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
154                         grpc_schedule_on_exec_ctx);
155     }
156 
Orphan()157     void Orphan() override {
158       Finish();
159       Unref(DEBUG_LOCATION, "Orphan");
160     }
161 
Start(RefCountedPtr<AdsCallState> ads_calld)162     void Start(RefCountedPtr<AdsCallState> ads_calld) {
163       if (sent_initial_request_) return;
164       sent_initial_request_ = true;
165       ads_calld_ = std::move(ads_calld);
166       Ref(DEBUG_LOCATION, "timer").release();
167       timer_pending_ = true;
168       grpc_timer_init(
169           &timer_,
170           ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
171           &timer_callback_);
172     }
173 
Finish()174     void Finish() {
175       if (timer_pending_) {
176         grpc_timer_cancel(&timer_);
177         timer_pending_ = false;
178       }
179     }
180 
181    private:
OnTimer(void * arg,grpc_error_handle error)182     static void OnTimer(void* arg, grpc_error_handle error) {
183       ResourceState* self = static_cast<ResourceState*>(arg);
184       {
185         MutexLock lock(&self->ads_calld_->xds_client()->mu_);
186         self->OnTimerLocked(GRPC_ERROR_REF(error));
187       }
188       self->ads_calld_.reset();
189       self->Unref(DEBUG_LOCATION, "timer");
190     }
191 
OnTimerLocked(grpc_error_handle error)192     void OnTimerLocked(grpc_error_handle error)
193         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
194       if (error == GRPC_ERROR_NONE && timer_pending_) {
195         timer_pending_ = false;
196         grpc_error_handle watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
197             absl::StrFormat(
198                 "timeout obtaining resource {type=%s name=%s} from xds server",
199                 type_url_, name_)
200                 .c_str());
201         watcher_error = grpc_error_set_int(
202             watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
203         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
204           gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
205                   grpc_error_std_string(watcher_error).c_str());
206         }
207         if (type_url_ == XdsApi::kLdsTypeUrl) {
208           ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
209           state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
210           for (const auto& p : state.watchers) {
211             p.first->OnError(GRPC_ERROR_REF(watcher_error));
212           }
213         } else if (type_url_ == XdsApi::kRdsTypeUrl) {
214           RouteConfigState& state =
215               ads_calld_->xds_client()->route_config_map_[name_];
216           state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
217           for (const auto& p : state.watchers) {
218             p.first->OnError(GRPC_ERROR_REF(watcher_error));
219           }
220         } else if (type_url_ == XdsApi::kCdsTypeUrl) {
221           ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
222           state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
223           for (const auto& p : state.watchers) {
224             p.first->OnError(GRPC_ERROR_REF(watcher_error));
225           }
226         } else if (type_url_ == XdsApi::kEdsTypeUrl) {
227           EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
228           state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
229           for (const auto& p : state.watchers) {
230             p.first->OnError(GRPC_ERROR_REF(watcher_error));
231           }
232         } else {
233           GPR_UNREACHABLE_CODE(return );
234         }
235         GRPC_ERROR_UNREF(watcher_error);
236       }
237       GRPC_ERROR_UNREF(error);
238     }
239 
240     const std::string type_url_;
241     const std::string name_;
242 
243     RefCountedPtr<AdsCallState> ads_calld_;
244     bool sent_initial_request_;
245     bool timer_pending_ = false;
246     grpc_timer timer_;
247     grpc_closure timer_callback_;
248   };
249 
250   struct ResourceTypeState {
~ResourceTypeStategrpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState251     ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
252 
253     // Nonce and error for this resource type.
254     std::string nonce;
255     grpc_error_handle error = GRPC_ERROR_NONE;
256 
257     // Subscribed resources of this type.
258     std::map<std::string /* name */, OrphanablePtr<ResourceState>>
259         subscribed_resources;
260   };
261 
262   void SendMessageLocked(const std::string& type_url)
263       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
264 
265   void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time,
266                              XdsApi::LdsUpdateMap lds_update_map)
267       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
268   void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time,
269                              XdsApi::RdsUpdateMap rds_update_map)
270       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
271   void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time,
272                              XdsApi::CdsUpdateMap cds_update_map)
273       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
274   void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time,
275                              XdsApi::EdsUpdateMap eds_update_map)
276       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
277 
278   static void OnRequestSent(void* arg, grpc_error_handle error);
279   void OnRequestSentLocked(grpc_error_handle error)
280       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
281   static void OnResponseReceived(void* arg, grpc_error_handle error);
282   bool OnResponseReceivedLocked()
283       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
284   static void OnStatusReceived(void* arg, grpc_error_handle error);
285   void OnStatusReceivedLocked(grpc_error_handle error)
286       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
287 
288   bool IsCurrentCallOnChannel() const;
289 
290   std::set<absl::string_view> ResourceNamesForRequest(
291       const std::string& type_url);
292 
293   // The owning RetryableCall<>.
294   RefCountedPtr<RetryableCall<AdsCallState>> parent_;
295 
296   bool sent_initial_message_ = false;
297   bool seen_response_ = false;
298 
299   // Always non-NULL.
300   grpc_call* call_;
301 
302   // recv_initial_metadata
303   grpc_metadata_array initial_metadata_recv_;
304 
305   // send_message
306   grpc_byte_buffer* send_message_payload_ = nullptr;
307   grpc_closure on_request_sent_;
308 
309   // recv_message
310   grpc_byte_buffer* recv_message_payload_ = nullptr;
311   grpc_closure on_response_received_;
312 
313   // recv_trailing_metadata
314   grpc_metadata_array trailing_metadata_recv_;
315   grpc_status_code status_code_;
316   grpc_slice status_details_;
317   grpc_closure on_status_received_;
318 
319   // Resource types for which requests need to be sent.
320   std::set<std::string /*type_url*/> buffered_requests_;
321 
322   // State for each resource type.
323   std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
324 };
325 
326 // Contains an LRS call to the xds server.
327 class XdsClient::ChannelState::LrsCallState
328     : public InternallyRefCounted<LrsCallState> {
329  public:
330   // The ctor and dtor should not be used directly.
331   explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
332   ~LrsCallState() override;
333 
334   void Orphan() override;
335 
336   void MaybeStartReportingLocked();
337 
parent()338   RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const339   ChannelState* chand() const { return parent_->chand(); }
xds_client() const340   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const341   bool seen_response() const { return seen_response_; }
342 
343  private:
344   // Reports client-side load stats according to a fixed interval.
345   class Reporter : public InternallyRefCounted<Reporter> {
346    public:
Reporter(RefCountedPtr<LrsCallState> parent,grpc_millis report_interval)347     Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
348         : parent_(std::move(parent)), report_interval_(report_interval) {
349       GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
350                         grpc_schedule_on_exec_ctx);
351       GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
352                         grpc_schedule_on_exec_ctx);
353       ScheduleNextReportLocked();
354     }
355 
356     void Orphan() override;
357 
358    private:
359     void ScheduleNextReportLocked()
360         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
361     static void OnNextReportTimer(void* arg, grpc_error_handle error);
362     bool OnNextReportTimerLocked(grpc_error_handle error)
363         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
364     bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
365     static void OnReportDone(void* arg, grpc_error_handle error);
366     bool OnReportDoneLocked(grpc_error_handle error)
367         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
368 
IsCurrentReporterOnCall() const369     bool IsCurrentReporterOnCall() const {
370       return this == parent_->reporter_.get();
371     }
xds_client() const372     XdsClient* xds_client() const { return parent_->xds_client(); }
373 
374     // The owning LRS call.
375     RefCountedPtr<LrsCallState> parent_;
376 
377     // The load reporting state.
378     const grpc_millis report_interval_;
379     bool last_report_counters_were_zero_ = false;
380     bool next_report_timer_callback_pending_ = false;
381     grpc_timer next_report_timer_;
382     grpc_closure on_next_report_timer_;
383     grpc_closure on_report_done_;
384   };
385 
386   static void OnInitialRequestSent(void* arg, grpc_error_handle error);
387   void OnInitialRequestSentLocked()
388       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
389   static void OnResponseReceived(void* arg, grpc_error_handle error);
390   bool OnResponseReceivedLocked()
391       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
392   static void OnStatusReceived(void* arg, grpc_error_handle error);
393   void OnStatusReceivedLocked(grpc_error_handle error)
394       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
395 
396   bool IsCurrentCallOnChannel() const;
397 
398   // The owning RetryableCall<>.
399   RefCountedPtr<RetryableCall<LrsCallState>> parent_;
400   bool seen_response_ = false;
401 
402   // Always non-NULL.
403   grpc_call* call_;
404 
405   // recv_initial_metadata
406   grpc_metadata_array initial_metadata_recv_;
407 
408   // send_message
409   grpc_byte_buffer* send_message_payload_ = nullptr;
410   grpc_closure on_initial_request_sent_;
411 
412   // recv_message
413   grpc_byte_buffer* recv_message_payload_ = nullptr;
414   grpc_closure on_response_received_;
415 
416   // recv_trailing_metadata
417   grpc_metadata_array trailing_metadata_recv_;
418   grpc_status_code status_code_;
419   grpc_slice status_details_;
420   grpc_closure on_status_received_;
421 
422   // Load reporting state.
423   bool send_all_clusters_ = false;
424   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
425   grpc_millis load_reporting_interval_ = 0;
426   OrphanablePtr<Reporter> reporter_;
427 };
428 
429 //
430 // XdsClient::ChannelState::StateWatcher
431 //
432 
433 class XdsClient::ChannelState::StateWatcher
434     : public AsyncConnectivityStateWatcherInterface {
435  public:
StateWatcher(RefCountedPtr<ChannelState> parent)436   explicit StateWatcher(RefCountedPtr<ChannelState> parent)
437       : parent_(std::move(parent)) {}
438 
439  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)440   void OnConnectivityStateChange(grpc_connectivity_state new_state,
441                                  const absl::Status& status) override {
442     MutexLock lock(&parent_->xds_client_->mu_);
443     if (!parent_->shutting_down_ &&
444         new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
445       // In TRANSIENT_FAILURE.  Notify all watchers of error.
446       gpr_log(GPR_INFO,
447               "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
448               "status_message:(%s)",
449               parent_->xds_client(), status.ToString().c_str());
450       parent_->xds_client_->NotifyOnErrorLocked(
451           GRPC_ERROR_CREATE_FROM_STATIC_STRING(
452               "xds channel in TRANSIENT_FAILURE"));
453     }
454   }
455 
456   RefCountedPtr<ChannelState> parent_;
457 };
458 
459 //
460 // XdsClient::ChannelState
461 //
462 
463 namespace {
464 
CreateXdsChannel(grpc_channel_args * args,const XdsBootstrap::XdsServer & server)465 grpc_channel* CreateXdsChannel(grpc_channel_args* args,
466                                const XdsBootstrap::XdsServer& server) {
467   RefCountedPtr<grpc_channel_credentials> channel_creds =
468       XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
469                                                 server.channel_creds_config);
470   return grpc_secure_channel_create(channel_creds.get(),
471                                     server.server_uri.c_str(), args, nullptr);
472 }
473 
474 }  // namespace
475 
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)476 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
477                                       const XdsBootstrap::XdsServer& server)
478     : InternallyRefCounted<ChannelState>(
479           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
480               ? "ChannelState"
481               : nullptr),
482       xds_client_(std::move(xds_client)),
483       server_(server) {
484   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
485     gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
486             xds_client_.get(), server.server_uri.c_str());
487   }
488   channel_ = CreateXdsChannel(xds_client_->args_, server);
489   GPR_ASSERT(channel_ != nullptr);
490   StartConnectivityWatchLocked();
491 }
492 
~ChannelState()493 XdsClient::ChannelState::~ChannelState() {
494   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
495     gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
496             this);
497   }
498   grpc_channel_destroy(channel_);
499   xds_client_.reset(DEBUG_LOCATION, "ChannelState");
500 }
501 
Orphan()502 void XdsClient::ChannelState::Orphan() {
503   shutting_down_ = true;
504   CancelConnectivityWatchLocked();
505   ads_calld_.reset();
506   lrs_calld_.reset();
507   Unref(DEBUG_LOCATION, "ChannelState+orphaned");
508 }
509 
ads_calld() const510 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
511     const {
512   return ads_calld_->calld();
513 }
514 
lrs_calld() const515 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
516     const {
517   return lrs_calld_->calld();
518 }
519 
HasActiveAdsCall() const520 bool XdsClient::ChannelState::HasActiveAdsCall() const {
521   return ads_calld_ != nullptr && ads_calld_->calld() != nullptr;
522 }
523 
MaybeStartLrsCall()524 void XdsClient::ChannelState::MaybeStartLrsCall() {
525   if (lrs_calld_ != nullptr) return;
526   lrs_calld_.reset(
527       new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
528 }
529 
StopLrsCall()530 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
531 
StartConnectivityWatchLocked()532 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
533   ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
534   GPR_ASSERT(client_channel != nullptr);
535   watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
536   client_channel->AddConnectivityWatcher(
537       GRPC_CHANNEL_IDLE,
538       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
539 }
540 
CancelConnectivityWatchLocked()541 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
542   ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
543   GPR_ASSERT(client_channel != nullptr);
544   client_channel->RemoveConnectivityWatcher(watcher_);
545 }
546 
SubscribeLocked(const std::string & type_url,const std::string & name)547 void XdsClient::ChannelState::SubscribeLocked(const std::string& type_url,
548                                               const std::string& name) {
549   if (ads_calld_ == nullptr) {
550     // Start the ADS call if this is the first request.
551     ads_calld_.reset(new RetryableCall<AdsCallState>(
552         Ref(DEBUG_LOCATION, "ChannelState+ads")));
553     // Note: AdsCallState's ctor will automatically subscribe to all
554     // resources that the XdsClient already has watchers for, so we can
555     // return here.
556     return;
557   }
558   // If the ADS call is in backoff state, we don't need to do anything now
559   // because when the call is restarted it will resend all necessary requests.
560   if (ads_calld() == nullptr) return;
561   // Subscribe to this resource if the ADS call is active.
562   ads_calld()->SubscribeLocked(type_url, name);
563 }
564 
UnsubscribeLocked(const std::string & type_url,const std::string & name,bool delay_unsubscription)565 void XdsClient::ChannelState::UnsubscribeLocked(const std::string& type_url,
566                                                 const std::string& name,
567                                                 bool delay_unsubscription) {
568   if (ads_calld_ != nullptr) {
569     auto* calld = ads_calld_->calld();
570     if (calld != nullptr) {
571       calld->UnsubscribeLocked(type_url, name, delay_unsubscription);
572       if (!calld->HasSubscribedResources()) ads_calld_.reset();
573     }
574   }
575 }
576 
577 //
578 // XdsClient::ChannelState::RetryableCall<>
579 //
580 
581 template <typename T>
RetryableCall(RefCountedPtr<ChannelState> chand)582 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
583     RefCountedPtr<ChannelState> chand)
584     : chand_(std::move(chand)),
585       backoff_(
586           BackOff::Options()
587               .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
588                                    1000)
589               .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
590               .set_jitter(GRPC_XDS_RECONNECT_JITTER)
591               .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
592   // Closure Initialization
593   GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
594                     grpc_schedule_on_exec_ctx);
595   StartNewCallLocked();
596 }
597 
598 template <typename T>
Orphan()599 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
600   shutting_down_ = true;
601   calld_.reset();
602   if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
603   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
604 }
605 
606 template <typename T>
OnCallFinishedLocked()607 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
608   const bool seen_response = calld_->seen_response();
609   calld_.reset();
610   if (seen_response) {
611     // If we lost connection to the xds server, reset backoff and restart the
612     // call immediately.
613     backoff_.Reset();
614     StartNewCallLocked();
615   } else {
616     // If we failed to connect to the xds server, retry later.
617     StartRetryTimerLocked();
618   }
619 }
620 
621 template <typename T>
StartNewCallLocked()622 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
623   if (shutting_down_) return;
624   GPR_ASSERT(chand_->channel_ != nullptr);
625   GPR_ASSERT(calld_ == nullptr);
626   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
627     gpr_log(GPR_INFO,
628             "[xds_client %p] Start new call from retryable call (chand: %p, "
629             "retryable call: %p)",
630             chand()->xds_client(), chand(), this);
631   }
632   calld_ = MakeOrphanable<T>(
633       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
634 }
635 
636 template <typename T>
StartRetryTimerLocked()637 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
638   if (shutting_down_) return;
639   const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
640   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
641     grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
642     gpr_log(GPR_INFO,
643             "[xds_client %p] Failed to connect to xds server (chand: %p) "
644             "retry timer will fire in %" PRId64 "ms.",
645             chand()->xds_client(), chand(), timeout);
646   }
647   this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
648   grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
649   retry_timer_callback_pending_ = true;
650 }
651 
652 template <typename T>
OnRetryTimer(void * arg,grpc_error_handle error)653 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
654     void* arg, grpc_error_handle error) {
655   RetryableCall* calld = static_cast<RetryableCall*>(arg);
656   {
657     MutexLock lock(&calld->chand_->xds_client()->mu_);
658     calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
659   }
660   calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
661 }
662 
663 template <typename T>
OnRetryTimerLocked(grpc_error_handle error)664 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
665     grpc_error_handle error) {
666   retry_timer_callback_pending_ = false;
667   if (!shutting_down_ && error == GRPC_ERROR_NONE) {
668     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
669       gpr_log(
670           GPR_INFO,
671           "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
672           chand()->xds_client(), chand(), this);
673     }
674     StartNewCallLocked();
675   }
676   GRPC_ERROR_UNREF(error);
677 }
678 
679 //
680 // XdsClient::ChannelState::AdsCallState
681 //
682 
AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent)683 XdsClient::ChannelState::AdsCallState::AdsCallState(
684     RefCountedPtr<RetryableCall<AdsCallState>> parent)
685     : InternallyRefCounted<AdsCallState>(
686           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
687               ? "AdsCallState"
688               : nullptr),
689       parent_(std::move(parent)) {
690   // Init the ADS call. Note that the call will progress every time there's
691   // activity in xds_client()->interested_parties_, which is comprised of
692   // the polling entities from client_channel.
693   GPR_ASSERT(xds_client() != nullptr);
694   // Create a call with the specified method name.
695   const auto& method =
696       chand()->server_.ShouldUseV3()
697           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
698           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
699   call_ = grpc_channel_create_pollset_set_call(
700       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
701       xds_client()->interested_parties_, method, nullptr,
702       GRPC_MILLIS_INF_FUTURE, nullptr);
703   GPR_ASSERT(call_ != nullptr);
704   // Init data associated with the call.
705   grpc_metadata_array_init(&initial_metadata_recv_);
706   grpc_metadata_array_init(&trailing_metadata_recv_);
707   // Start the call.
708   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
709     gpr_log(GPR_INFO,
710             "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
711             "call: %p)",
712             xds_client(), chand(), this, call_);
713   }
714   // Create the ops.
715   grpc_call_error call_error;
716   grpc_op ops[3];
717   memset(ops, 0, sizeof(ops));
718   // Op: send initial metadata.
719   grpc_op* op = ops;
720   op->op = GRPC_OP_SEND_INITIAL_METADATA;
721   op->data.send_initial_metadata.count = 0;
722   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
723               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
724   op->reserved = nullptr;
725   op++;
726   call_error = grpc_call_start_batch_and_execute(
727       call_, ops, static_cast<size_t>(op - ops), nullptr);
728   GPR_ASSERT(GRPC_CALL_OK == call_error);
729   // Op: send request message.
730   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
731                     grpc_schedule_on_exec_ctx);
732   for (const auto& p : xds_client()->listener_map_) {
733     SubscribeLocked(XdsApi::kLdsTypeUrl, std::string(p.first));
734   }
735   for (const auto& p : xds_client()->route_config_map_) {
736     SubscribeLocked(XdsApi::kRdsTypeUrl, std::string(p.first));
737   }
738   for (const auto& p : xds_client()->cluster_map_) {
739     SubscribeLocked(XdsApi::kCdsTypeUrl, std::string(p.first));
740   }
741   for (const auto& p : xds_client()->endpoint_map_) {
742     SubscribeLocked(XdsApi::kEdsTypeUrl, std::string(p.first));
743   }
744   // Op: recv initial metadata.
745   op = ops;
746   op->op = GRPC_OP_RECV_INITIAL_METADATA;
747   op->data.recv_initial_metadata.recv_initial_metadata =
748       &initial_metadata_recv_;
749   op->flags = 0;
750   op->reserved = nullptr;
751   op++;
752   // Op: recv response.
753   op->op = GRPC_OP_RECV_MESSAGE;
754   op->data.recv_message.recv_message = &recv_message_payload_;
755   op->flags = 0;
756   op->reserved = nullptr;
757   op++;
758   Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
759   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
760                     grpc_schedule_on_exec_ctx);
761   call_error = grpc_call_start_batch_and_execute(
762       call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
763   GPR_ASSERT(GRPC_CALL_OK == call_error);
764   // Op: recv server status.
765   op = ops;
766   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
767   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
768   op->data.recv_status_on_client.status = &status_code_;
769   op->data.recv_status_on_client.status_details = &status_details_;
770   op->flags = 0;
771   op->reserved = nullptr;
772   op++;
773   // This callback signals the end of the call, so it relies on the initial
774   // ref instead of a new ref. When it's invoked, it's the initial ref that is
775   // unreffed.
776   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
777                     grpc_schedule_on_exec_ctx);
778   call_error = grpc_call_start_batch_and_execute(
779       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
780   GPR_ASSERT(GRPC_CALL_OK == call_error);
781 }
782 
~AdsCallState()783 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
784   grpc_metadata_array_destroy(&initial_metadata_recv_);
785   grpc_metadata_array_destroy(&trailing_metadata_recv_);
786   grpc_byte_buffer_destroy(send_message_payload_);
787   grpc_byte_buffer_destroy(recv_message_payload_);
788   grpc_slice_unref_internal(status_details_);
789   GPR_ASSERT(call_ != nullptr);
790   grpc_call_unref(call_);
791 }
792 
Orphan()793 void XdsClient::ChannelState::AdsCallState::Orphan() {
794   GPR_ASSERT(call_ != nullptr);
795   // If we are here because xds_client wants to cancel the call,
796   // on_status_received_ will complete the cancellation and clean up. Otherwise,
797   // we are here because xds_client has to orphan a failed call, then the
798   // following cancellation will be a no-op.
799   grpc_call_cancel_internal(call_);
800   state_map_.clear();
801   // Note that the initial ref is hold by on_status_received_. So the
802   // corresponding unref happens in on_status_received_ instead of here.
803 }
804 
SendMessageLocked(const std::string & type_url)805 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
806     const std::string& type_url)
807     ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
808   // Buffer message sending if an existing message is in flight.
809   if (send_message_payload_ != nullptr) {
810     buffered_requests_.insert(type_url);
811     return;
812   }
813   auto& state = state_map_[type_url];
814   grpc_slice request_payload_slice;
815   std::set<absl::string_view> resource_names =
816       ResourceNamesForRequest(type_url);
817   request_payload_slice = xds_client()->api_.CreateAdsRequest(
818       chand()->server_, type_url, resource_names,
819       xds_client()->resource_version_map_[type_url], state.nonce,
820       GRPC_ERROR_REF(state.error), !sent_initial_message_);
821   if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
822       type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
823     state_map_.erase(type_url);
824   }
825   sent_initial_message_ = true;
826   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
827     gpr_log(GPR_INFO,
828             "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
829             "error=%s resources=%s",
830             xds_client(), type_url.c_str(),
831             xds_client()->resource_version_map_[type_url].c_str(),
832             state.nonce.c_str(), grpc_error_std_string(state.error).c_str(),
833             absl::StrJoin(resource_names, " ").c_str());
834   }
835   GRPC_ERROR_UNREF(state.error);
836   state.error = GRPC_ERROR_NONE;
837   // Create message payload.
838   send_message_payload_ =
839       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
840   grpc_slice_unref_internal(request_payload_slice);
841   // Send the message.
842   grpc_op op;
843   memset(&op, 0, sizeof(op));
844   op.op = GRPC_OP_SEND_MESSAGE;
845   op.data.send_message.send_message = send_message_payload_;
846   Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
847   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
848                     grpc_schedule_on_exec_ctx);
849   grpc_call_error call_error =
850       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
851   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
852     gpr_log(GPR_ERROR,
853             "[xds_client %p] calld=%p call_error=%d sending ADS message",
854             xds_client(), this, call_error);
855     GPR_ASSERT(GRPC_CALL_OK == call_error);
856   }
857 }
858 
SubscribeLocked(const std::string & type_url,const std::string & name)859 void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
860     const std::string& type_url, const std::string& name) {
861   auto& state = state_map_[type_url].subscribed_resources[name];
862   if (state == nullptr) {
863     state = MakeOrphanable<ResourceState>(
864         type_url, name, !xds_client()->resource_version_map_[type_url].empty());
865     SendMessageLocked(type_url);
866   }
867 }
868 
UnsubscribeLocked(const std::string & type_url,const std::string & name,bool delay_unsubscription)869 void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
870     const std::string& type_url, const std::string& name,
871     bool delay_unsubscription) {
872   state_map_[type_url].subscribed_resources.erase(name);
873   if (!delay_unsubscription) SendMessageLocked(type_url);
874 }
875 
HasSubscribedResources() const876 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
877   for (const auto& p : state_map_) {
878     if (!p.second.subscribed_resources.empty()) return true;
879   }
880   return false;
881 }
882 
883 namespace {
884 
885 // Build a resource metadata struct for ADS result accepting methods and CSDS.
CreateResourceMetadataAcked(std::string serialized_proto,std::string version,grpc_millis update_time)886 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
887     std::string serialized_proto, std::string version,
888     grpc_millis update_time) {
889   XdsApi::ResourceMetadata resource_metadata;
890   resource_metadata.serialized_proto = std::move(serialized_proto);
891   resource_metadata.update_time = update_time;
892   resource_metadata.version = std::move(version);
893   resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
894   return resource_metadata;
895 }
896 
897 }  // namespace
898 
AcceptLdsUpdateLocked(std::string version,grpc_millis update_time,XdsApi::LdsUpdateMap lds_update_map)899 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
900     std::string version, grpc_millis update_time,
901     XdsApi::LdsUpdateMap lds_update_map) {
902   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
903     gpr_log(GPR_INFO,
904             "[xds_client %p] LDS update received containing %" PRIuPTR
905             " resources",
906             xds_client(), lds_update_map.size());
907   }
908   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
909   std::set<std::string> rds_resource_names_seen;
910   for (auto& p : lds_update_map) {
911     const std::string& listener_name = p.first;
912     XdsApi::LdsUpdate& lds_update = p.second.resource;
913     auto& state = lds_state.subscribed_resources[listener_name];
914     if (state != nullptr) state->Finish();
915     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
916       gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
917               listener_name.c_str(), lds_update.ToString().c_str());
918     }
919     // Record the RDS resource names seen.
920     if (!lds_update.http_connection_manager.route_config_name.empty()) {
921       rds_resource_names_seen.insert(
922           lds_update.http_connection_manager.route_config_name);
923     }
924     // Ignore identical update.
925     ListenerState& listener_state = xds_client()->listener_map_[listener_name];
926     if (listener_state.update.has_value() &&
927         *listener_state.update == lds_update) {
928       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
929         gpr_log(GPR_INFO,
930                 "[xds_client %p] LDS update for %s identical to current, "
931                 "ignoring.",
932                 xds_client(), listener_name.c_str());
933       }
934       continue;
935     }
936     // Update the listener state.
937     listener_state.update = std::move(lds_update);
938     listener_state.meta = CreateResourceMetadataAcked(
939         std::move(p.second.serialized_proto), version, update_time);
940     // Notify watchers.
941     for (const auto& p : listener_state.watchers) {
942       p.first->OnListenerChanged(*listener_state.update);
943     }
944   }
945   // For any subscribed resource that is not present in the update,
946   // remove it from the cache and notify watchers that it does not exist.
947   for (const auto& p : lds_state.subscribed_resources) {
948     const std::string& listener_name = p.first;
949     if (lds_update_map.find(listener_name) == lds_update_map.end()) {
950       ListenerState& listener_state =
951           xds_client()->listener_map_[listener_name];
952       // If the resource was newly requested but has not yet been received,
953       // we don't want to generate an error for the watchers, because this LDS
954       // response may be in reaction to an earlier request that did not yet
955       // request the new resource, so its absence from the response does not
956       // necessarily indicate that the resource does not exist.
957       // For that case, we rely on the request timeout instead.
958       if (!listener_state.update.has_value()) continue;
959       listener_state.update.reset();
960       for (const auto& p : listener_state.watchers) {
961         p.first->OnResourceDoesNotExist();
962       }
963     }
964   }
965   // For any RDS resource that is no longer referred to by any LDS
966   // resources, remove it from the cache and notify watchers that it
967   // does not exist.
968   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
969   for (const auto& p : rds_state.subscribed_resources) {
970     const std::string& rds_resource_name = p.first;
971     if (rds_resource_names_seen.find(rds_resource_name) ==
972         rds_resource_names_seen.end()) {
973       RouteConfigState& route_config_state =
974           xds_client()->route_config_map_[rds_resource_name];
975       route_config_state.update.reset();
976       for (const auto& p : route_config_state.watchers) {
977         p.first->OnResourceDoesNotExist();
978       }
979     }
980   }
981 }
982 
AcceptRdsUpdateLocked(std::string version,grpc_millis update_time,XdsApi::RdsUpdateMap rds_update_map)983 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(
984     std::string version, grpc_millis update_time,
985     XdsApi::RdsUpdateMap rds_update_map) {
986   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
987     gpr_log(GPR_INFO,
988             "[xds_client %p] RDS update received containing %" PRIuPTR
989             " resources",
990             xds_client(), rds_update_map.size());
991   }
992   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
993   for (auto& p : rds_update_map) {
994     const std::string& route_config_name = p.first;
995     XdsApi::RdsUpdate& rds_update = p.second.resource;
996     auto& state = rds_state.subscribed_resources[route_config_name];
997     if (state != nullptr) state->Finish();
998     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
999       gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
1000               rds_update.ToString().c_str());
1001     }
1002     RouteConfigState& route_config_state =
1003         xds_client()->route_config_map_[route_config_name];
1004     // Ignore identical update.
1005     if (route_config_state.update.has_value() &&
1006         *route_config_state.update == rds_update) {
1007       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1008         gpr_log(GPR_INFO,
1009                 "[xds_client %p] RDS resource identical to current, ignoring",
1010                 xds_client());
1011       }
1012       continue;
1013     }
1014     // Update the cache.
1015     route_config_state.update = std::move(rds_update);
1016     route_config_state.meta = CreateResourceMetadataAcked(
1017         std::move(p.second.serialized_proto), version, update_time);
1018     // Notify all watchers.
1019     for (const auto& p : route_config_state.watchers) {
1020       p.first->OnRouteConfigChanged(*route_config_state.update);
1021     }
1022   }
1023 }
1024 
AcceptCdsUpdateLocked(std::string version,grpc_millis update_time,XdsApi::CdsUpdateMap cds_update_map)1025 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
1026     std::string version, grpc_millis update_time,
1027     XdsApi::CdsUpdateMap cds_update_map) {
1028   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1029     gpr_log(GPR_INFO,
1030             "[xds_client %p] CDS update received containing %" PRIuPTR
1031             " resources",
1032             xds_client(), cds_update_map.size());
1033   }
1034   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
1035   std::set<std::string> eds_resource_names_seen;
1036   for (auto& p : cds_update_map) {
1037     const char* cluster_name = p.first.c_str();
1038     XdsApi::CdsUpdate& cds_update = p.second.resource;
1039     auto& state = cds_state.subscribed_resources[cluster_name];
1040     if (state != nullptr) state->Finish();
1041     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1042       gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
1043               cluster_name, cds_update.ToString().c_str());
1044     }
1045     // Record the EDS resource names seen.
1046     eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1047                                        ? cluster_name
1048                                        : cds_update.eds_service_name);
1049     // Ignore identical update.
1050     ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1051     if (cluster_state.update.has_value() &&
1052         *cluster_state.update == cds_update) {
1053       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1054         gpr_log(GPR_INFO,
1055                 "[xds_client %p] CDS update identical to current, ignoring.",
1056                 xds_client());
1057       }
1058       continue;
1059     }
1060     // Update the cluster state.
1061     cluster_state.update = std::move(cds_update);
1062     cluster_state.meta = CreateResourceMetadataAcked(
1063         std::move(p.second.serialized_proto), version, update_time);
1064     // Notify all watchers.
1065     for (const auto& p : cluster_state.watchers) {
1066       p.first->OnClusterChanged(cluster_state.update.value());
1067     }
1068   }
1069   // For any subscribed resource that is not present in the update,
1070   // remove it from the cache and notify watchers that it does not exist.
1071   for (const auto& p : cds_state.subscribed_resources) {
1072     const std::string& cluster_name = p.first;
1073     if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1074       ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1075       // If the resource was newly requested but has not yet been received,
1076       // we don't want to generate an error for the watchers, because this CDS
1077       // response may be in reaction to an earlier request that did not yet
1078       // request the new resource, so its absence from the response does not
1079       // necessarily indicate that the resource does not exist.
1080       // For that case, we rely on the request timeout instead.
1081       if (!cluster_state.update.has_value()) continue;
1082       cluster_state.update.reset();
1083       for (const auto& p : cluster_state.watchers) {
1084         p.first->OnResourceDoesNotExist();
1085       }
1086     }
1087   }
1088   // For any EDS resource that is no longer referred to by any CDS
1089   // resources, remove it from the cache and notify watchers that it
1090   // does not exist.
1091   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1092   for (const auto& p : eds_state.subscribed_resources) {
1093     const std::string& eds_resource_name = p.first;
1094     if (eds_resource_names_seen.find(eds_resource_name) ==
1095         eds_resource_names_seen.end()) {
1096       EndpointState& endpoint_state =
1097           xds_client()->endpoint_map_[eds_resource_name];
1098       endpoint_state.update.reset();
1099       for (const auto& p : endpoint_state.watchers) {
1100         p.first->OnResourceDoesNotExist();
1101       }
1102     }
1103   }
1104 }
1105 
AcceptEdsUpdateLocked(std::string version,grpc_millis update_time,XdsApi::EdsUpdateMap eds_update_map)1106 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked(
1107     std::string version, grpc_millis update_time,
1108     XdsApi::EdsUpdateMap eds_update_map) {
1109   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1110     gpr_log(GPR_INFO,
1111             "[xds_client %p] EDS update received containing %" PRIuPTR
1112             " resources",
1113             xds_client(), eds_update_map.size());
1114   }
1115   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1116   for (auto& p : eds_update_map) {
1117     const char* eds_service_name = p.first.c_str();
1118     XdsApi::EdsUpdate& eds_update = p.second.resource;
1119     auto& state = eds_state.subscribed_resources[eds_service_name];
1120     if (state != nullptr) state->Finish();
1121     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1122       gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1123               eds_service_name, eds_update.ToString().c_str());
1124     }
1125     EndpointState& endpoint_state =
1126         xds_client()->endpoint_map_[eds_service_name];
1127     // Ignore identical update.
1128     if (endpoint_state.update.has_value() &&
1129         *endpoint_state.update == eds_update) {
1130       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1131         gpr_log(GPR_INFO,
1132                 "[xds_client %p] EDS update identical to current, ignoring.",
1133                 xds_client());
1134       }
1135       continue;
1136     }
1137     // Update the cluster state.
1138     endpoint_state.update = std::move(eds_update);
1139     endpoint_state.meta = CreateResourceMetadataAcked(
1140         std::move(p.second.serialized_proto), version, update_time);
1141     // Notify all watchers.
1142     for (const auto& p : endpoint_state.watchers) {
1143       p.first->OnEndpointChanged(endpoint_state.update.value());
1144     }
1145   }
1146 }
1147 
OnRequestSent(void * arg,grpc_error_handle error)1148 void XdsClient::ChannelState::AdsCallState::OnRequestSent(
1149     void* arg, grpc_error_handle error) {
1150   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1151   {
1152     MutexLock lock(&ads_calld->xds_client()->mu_);
1153     ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1154   }
1155   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1156 }
1157 
OnRequestSentLocked(grpc_error_handle error)1158 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1159     grpc_error_handle error) {
1160   if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1161     // Clean up the sent message.
1162     grpc_byte_buffer_destroy(send_message_payload_);
1163     send_message_payload_ = nullptr;
1164     // Continue to send another pending message if any.
1165     // TODO(roth): The current code to handle buffered messages has the
1166     // advantage of sending only the most recent list of resource names for
1167     // each resource type (no matter how many times that resource type has
1168     // been requested to send while the current message sending is still
1169     // pending). But its disadvantage is that we send the requests in fixed
1170     // order of resource types. We need to fix this if we are seeing some
1171     // resource type(s) starved due to frequent requests of other resource
1172     // type(s).
1173     auto it = buffered_requests_.begin();
1174     if (it != buffered_requests_.end()) {
1175       SendMessageLocked(*it);
1176       buffered_requests_.erase(it);
1177     }
1178   }
1179   GRPC_ERROR_UNREF(error);
1180 }
1181 
OnResponseReceived(void * arg,grpc_error_handle)1182 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1183     void* arg, grpc_error_handle /* error */) {
1184   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1185   bool done;
1186   {
1187     MutexLock lock(&ads_calld->xds_client()->mu_);
1188     done = ads_calld->OnResponseReceivedLocked();
1189   }
1190   if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1191 }
1192 
OnResponseReceivedLocked()1193 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1194   // Empty payload means the call was cancelled.
1195   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1196     return true;
1197   }
1198   // Read the response.
1199   grpc_byte_buffer_reader bbr;
1200   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1201   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1202   grpc_byte_buffer_reader_destroy(&bbr);
1203   grpc_byte_buffer_destroy(recv_message_payload_);
1204   recv_message_payload_ = nullptr;
1205   // Parse and validate the response.
1206   XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1207       chand()->server_, response_slice,
1208       ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1209       ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1210       ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1211       ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1212   grpc_slice_unref_internal(response_slice);
1213   if (result.type_url.empty()) {
1214     // Ignore unparsable response.
1215     gpr_log(GPR_ERROR,
1216             "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1217             xds_client(), grpc_error_std_string(result.parse_error).c_str());
1218     GRPC_ERROR_UNREF(result.parse_error);
1219   } else {
1220     grpc_millis update_time = grpc_core::ExecCtx::Get()->Now();
1221     // Update nonce.
1222     auto& state = state_map_[result.type_url];
1223     state.nonce = std::move(result.nonce);
1224     // NACK or ACK the response.
1225     if (result.parse_error != GRPC_ERROR_NONE) {
1226       xds_client()->UpdateResourceMetadataWithFailedParseResultLocked(
1227           update_time, result);
1228       GRPC_ERROR_UNREF(state.error);
1229       state.error = result.parse_error;
1230       // NACK unacceptable update.
1231       gpr_log(GPR_ERROR,
1232               "[xds_client %p] ADS response invalid for resource type %s "
1233               "version %s, will NACK: nonce=%s error=%s",
1234               xds_client(), result.type_url.c_str(), result.version.c_str(),
1235               state.nonce.c_str(),
1236               grpc_error_std_string(result.parse_error).c_str());
1237       SendMessageLocked(result.type_url);
1238     } else {
1239       seen_response_ = true;
1240       // Accept the ADS response according to the type_url.
1241       if (result.type_url == XdsApi::kLdsTypeUrl) {
1242         AcceptLdsUpdateLocked(result.version, update_time,
1243                               std::move(result.lds_update_map));
1244       } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1245         AcceptRdsUpdateLocked(result.version, update_time,
1246                               std::move(result.rds_update_map));
1247       } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1248         AcceptCdsUpdateLocked(result.version, update_time,
1249                               std::move(result.cds_update_map));
1250       } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1251         AcceptEdsUpdateLocked(result.version, update_time,
1252                               std::move(result.eds_update_map));
1253       }
1254       xds_client()->resource_version_map_[result.type_url] =
1255           std::move(result.version);
1256       // ACK the update.
1257       SendMessageLocked(result.type_url);
1258       // Start load reporting if needed.
1259       auto& lrs_call = chand()->lrs_calld_;
1260       if (lrs_call != nullptr) {
1261         LrsCallState* lrs_calld = lrs_call->calld();
1262         if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1263       }
1264     }
1265   }
1266   if (xds_client()->shutting_down_) return true;
1267   // Keep listening for updates.
1268   grpc_op op;
1269   memset(&op, 0, sizeof(op));
1270   op.op = GRPC_OP_RECV_MESSAGE;
1271   op.data.recv_message.recv_message = &recv_message_payload_;
1272   op.flags = 0;
1273   op.reserved = nullptr;
1274   GPR_ASSERT(call_ != nullptr);
1275   // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1276   const grpc_call_error call_error =
1277       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1278   GPR_ASSERT(GRPC_CALL_OK == call_error);
1279   return false;
1280 }
1281 
OnStatusReceived(void * arg,grpc_error_handle error)1282 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1283     void* arg, grpc_error_handle error) {
1284   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1285   {
1286     MutexLock lock(&ads_calld->xds_client()->mu_);
1287     ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1288   }
1289   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1290 }
1291 
OnStatusReceivedLocked(grpc_error_handle error)1292 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1293     grpc_error_handle error) {
1294   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1295     char* status_details = grpc_slice_to_c_string(status_details_);
1296     gpr_log(GPR_INFO,
1297             "[xds_client %p] ADS call status received. Status = %d, details "
1298             "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1299             xds_client(), status_code_, status_details, chand(), this, call_,
1300             grpc_error_std_string(error).c_str());
1301     gpr_free(status_details);
1302   }
1303   // Ignore status from a stale call.
1304   if (IsCurrentCallOnChannel()) {
1305     // Try to restart the call.
1306     parent_->OnCallFinishedLocked();
1307     // Send error to all watchers.
1308     xds_client()->NotifyOnErrorLocked(
1309         GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1310   }
1311   GRPC_ERROR_UNREF(error);
1312 }
1313 
IsCurrentCallOnChannel() const1314 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1315   // If the retryable ADS call is null (which only happens when the xds channel
1316   // is shutting down), all the ADS calls are stale.
1317   if (chand()->ads_calld_ == nullptr) return false;
1318   return this == chand()->ads_calld_->calld();
1319 }
1320 
1321 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1322 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1323     const std::string& type_url) {
1324   std::set<absl::string_view> resource_names;
1325   auto it = state_map_.find(type_url);
1326   if (it != state_map_.end()) {
1327     for (auto& p : it->second.subscribed_resources) {
1328       resource_names.insert(p.first);
1329       OrphanablePtr<ResourceState>& state = p.second;
1330       state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1331     }
1332   }
1333   return resource_names;
1334 }
1335 
1336 //
1337 // XdsClient::ChannelState::LrsCallState::Reporter
1338 //
1339 
Orphan()1340 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1341   if (next_report_timer_callback_pending_) {
1342     grpc_timer_cancel(&next_report_timer_);
1343   }
1344 }
1345 
1346 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1347     ScheduleNextReportLocked() {
1348   const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1349   grpc_timer_init(&next_report_timer_, next_report_time,
1350                   &on_next_report_timer_);
1351   next_report_timer_callback_pending_ = true;
1352 }
1353 
OnNextReportTimer(void * arg,grpc_error_handle error)1354 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1355     void* arg, grpc_error_handle error) {
1356   Reporter* self = static_cast<Reporter*>(arg);
1357   bool done;
1358   {
1359     MutexLock lock(&self->xds_client()->mu_);
1360     done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1361   }
1362   if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1363 }
1364 
OnNextReportTimerLocked(grpc_error_handle error)1365 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1366     grpc_error_handle error) {
1367   next_report_timer_callback_pending_ = false;
1368   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1369     GRPC_ERROR_UNREF(error);
1370     return true;
1371   }
1372   return SendReportLocked();
1373 }
1374 
1375 namespace {
1376 
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1377 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1378   for (const auto& p : snapshot) {
1379     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1380     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1381     for (const auto& q : cluster_snapshot.locality_stats) {
1382       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1383       if (!locality_snapshot.IsZero()) return false;
1384     }
1385   }
1386   return true;
1387 }
1388 
1389 }  // namespace
1390 
SendReportLocked()1391 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1392   // Construct snapshot from all reported stats.
1393   XdsApi::ClusterLoadReportMap snapshot =
1394       xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1395                                                   parent_->cluster_names_);
1396   // Skip client load report if the counters were all zero in the last
1397   // report and they are still zero in this one.
1398   const bool old_val = last_report_counters_were_zero_;
1399   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1400   if (old_val && last_report_counters_were_zero_) {
1401     if (xds_client()->load_report_map_.empty()) {
1402       parent_->chand()->StopLrsCall();
1403       return true;
1404     }
1405     ScheduleNextReportLocked();
1406     return false;
1407   }
1408   // Create a request that contains the snapshot.
1409   grpc_slice request_payload_slice =
1410       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1411   parent_->send_message_payload_ =
1412       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1413   grpc_slice_unref_internal(request_payload_slice);
1414   // Send the report.
1415   grpc_op op;
1416   memset(&op, 0, sizeof(op));
1417   op.op = GRPC_OP_SEND_MESSAGE;
1418   op.data.send_message.send_message = parent_->send_message_payload_;
1419   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1420       parent_->call_, &op, 1, &on_report_done_);
1421   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1422     gpr_log(GPR_ERROR,
1423             "[xds_client %p] calld=%p call_error=%d sending client load report",
1424             xds_client(), this, call_error);
1425     GPR_ASSERT(GRPC_CALL_OK == call_error);
1426   }
1427   return false;
1428 }
1429 
OnReportDone(void * arg,grpc_error_handle error)1430 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1431     void* arg, grpc_error_handle error) {
1432   Reporter* self = static_cast<Reporter*>(arg);
1433   bool done;
1434   {
1435     MutexLock lock(&self->xds_client()->mu_);
1436     done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1437   }
1438   if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1439 }
1440 
OnReportDoneLocked(grpc_error_handle error)1441 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1442     grpc_error_handle error) {
1443   grpc_byte_buffer_destroy(parent_->send_message_payload_);
1444   parent_->send_message_payload_ = nullptr;
1445   // If there are no more registered stats to report, cancel the call.
1446   if (xds_client()->load_report_map_.empty()) {
1447     parent_->chand()->StopLrsCall();
1448     GRPC_ERROR_UNREF(error);
1449     return true;
1450   }
1451   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1452     GRPC_ERROR_UNREF(error);
1453     // If this reporter is no longer the current one on the call, the reason
1454     // might be that it was orphaned for a new one due to config update.
1455     if (!IsCurrentReporterOnCall()) {
1456       parent_->MaybeStartReportingLocked();
1457     }
1458     return true;
1459   }
1460   ScheduleNextReportLocked();
1461   return false;
1462 }
1463 
1464 //
1465 // XdsClient::ChannelState::LrsCallState
1466 //
1467 
// Creates the LRS (load reporting) call on the xds channel and starts three
// batches on it:
//   1. send initial metadata + the initial LRS request. Completes in
//      OnInitialRequestSent(), which is covered by its own ref taken below.
//   2. recv initial metadata + the first LRS response. Completes in
//      OnResponseReceived(), which is covered by its own ref taken below.
//   3. recv status on client. Completes in OnStatusReceived(), which releases
//      the initial ref of this object.
// Failing to create the call or to start any batch is fatal (GPR_ASSERT).
XdsClient::ChannelState::LrsCallState::LrsCallState(
    RefCountedPtr<RetryableCall<LrsCallState>> parent)
    : InternallyRefCounted<LrsCallState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
              ? "LrsCallState"
              : nullptr),
      parent_(std::move(parent)) {
  // Init the LRS call. Note that the call will progress every time there's
  // activity in xds_client()->interested_parties_, which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(xds_client() != nullptr);
  // Choose the v3 or v2 StreamLoadStats method path according to
  // server_.ShouldUseV3().
  const auto& method =
      chand()->server_.ShouldUseV3()
          ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
          : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
  // The call is created with an infinite deadline (GRPC_MILLIS_INF_FUTURE).
  call_ = grpc_channel_create_pollset_set_call(
      chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      xds_client()->interested_parties_, method, nullptr,
      GRPC_MILLIS_INF_FUTURE, nullptr);
  GPR_ASSERT(call_ != nullptr);
  // Init the request payload.
  // The byte buffer takes its own ref on the slice, so ours is dropped here.
  grpc_slice request_payload_slice =
      xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  // Init other data associated with the LRS call.
  grpc_metadata_array_init(&initial_metadata_recv_);
  grpc_metadata_array_init(&trailing_metadata_recv_);
  // Start the call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
            "call: %p)",
            xds_client(), chand(), this, call_);
  }
  // Create the ops. The same `ops` array is refilled for each of the three
  // batches below.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  // The flags request wait-for-ready semantics and mark that choice as
  // explicitly set.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  // Op: send request message.
  GPR_ASSERT(send_message_payload_ != nullptr);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = send_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Ref held for OnInitialRequestSent(); released there.
  Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
  GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Ref held for OnResponseReceived(). OnResponseReceivedLocked() reuses it
  // for each subsequent recv_message op and returns true when it should be
  // released.
  Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &status_code_;
  op->data.recv_status_on_client.status_details = &status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
1567 
~LrsCallState()1568 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1569   grpc_metadata_array_destroy(&initial_metadata_recv_);
1570   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1571   grpc_byte_buffer_destroy(send_message_payload_);
1572   grpc_byte_buffer_destroy(recv_message_payload_);
1573   grpc_slice_unref_internal(status_details_);
1574   GPR_ASSERT(call_ != nullptr);
1575   grpc_call_unref(call_);
1576 }
1577 
Orphan()1578 void XdsClient::ChannelState::LrsCallState::Orphan() {
1579   reporter_.reset();
1580   GPR_ASSERT(call_ != nullptr);
1581   // If we are here because xds_client wants to cancel the call,
1582   // on_status_received_ will complete the cancellation and clean up. Otherwise,
1583   // we are here because xds_client has to orphan a failed call, then the
1584   // following cancellation will be a no-op.
1585   grpc_call_cancel_internal(call_);
1586   // Note that the initial ref is hold by on_status_received_. So the
1587   // corresponding unref happens in on_status_received_ instead of here.
1588 }
1589 
MaybeStartReportingLocked()1590 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1591   // Don't start again if already started.
1592   if (reporter_ != nullptr) return;
1593   // Don't start if the previous send_message op (of the initial request or the
1594   // last report of the previous reporter) hasn't completed.
1595   if (send_message_payload_ != nullptr) return;
1596   // Don't start if no LRS response has arrived.
1597   if (!seen_response()) return;
1598   // Don't start if the ADS call hasn't received any valid response. Note that
1599   // this must be the first channel because it is the current channel but its
1600   // ADS call hasn't seen any response.
1601   if (chand()->ads_calld_ == nullptr ||
1602       chand()->ads_calld_->calld() == nullptr ||
1603       !chand()->ads_calld_->calld()->seen_response()) {
1604     return;
1605   }
1606   // Start reporting.
1607   reporter_ = MakeOrphanable<Reporter>(
1608       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1609 }
1610 
OnInitialRequestSent(void * arg,grpc_error_handle)1611 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1612     void* arg, grpc_error_handle /*error*/) {
1613   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1614   {
1615     MutexLock lock(&lrs_calld->xds_client()->mu_);
1616     lrs_calld->OnInitialRequestSentLocked();
1617   }
1618   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1619 }
1620 
OnInitialRequestSentLocked()1621 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1622   // Clear the send_message_payload_.
1623   grpc_byte_buffer_destroy(send_message_payload_);
1624   send_message_payload_ = nullptr;
1625   MaybeStartReportingLocked();
1626 }
1627 
OnResponseReceived(void * arg,grpc_error_handle)1628 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1629     void* arg, grpc_error_handle /*error*/) {
1630   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1631   bool done;
1632   {
1633     MutexLock lock(&lrs_calld->xds_client()->mu_);
1634     done = lrs_calld->OnResponseReceivedLocked();
1635   }
1636   if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1637 }
1638 
// Handles one LRS response, or the end of the response stream.
//
// When the response parses and differs from the current config, it adopts the
// new send_all_clusters / cluster_names / load_reporting_interval. The
// interval is raised to at least GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS.
// The current reporter is then restarted.
//
// Returns true when the caller should drop the "LRS+OnResponseReceivedLocked"
// ref. That happens when this is not the channel's current call, when the
// payload is null, or when the XdsClient is shutting down. Otherwise a new
// recv_message op is started that reuses that ref, and false is returned.
bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
  // Empty payload means the call was cancelled.
  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
    return true;
  }
  // Read the response.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  // This anonymous lambda is a hack to avoid the usage of goto.
  // Each early `return` inside it only skips the rest of the response
  // handling. Execution still continues with the slice cleanup and the re-arm
  // of the recv_message op below.
  [&]() {
    // Parse the response.
    bool send_all_clusters = false;
    std::set<std::string> new_cluster_names;
    grpc_millis new_load_reporting_interval;
    grpc_error_handle parse_error = xds_client()->api_.ParseLrsResponse(
        response_slice, &send_all_clusters, &new_cluster_names,
        &new_load_reporting_interval);
    if (parse_error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR,
              "[xds_client %p] LRS response parsing failed. error=%s",
              xds_client(), grpc_error_std_string(parse_error).c_str());
      GRPC_ERROR_UNREF(parse_error);
      return;
    }
    // Only a successfully parsed response counts as "seen". This is one of
    // MaybeStartReportingLocked()'s preconditions.
    seen_response_ = true;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(
          GPR_INFO,
          "[xds_client %p] LRS response received, %" PRIuPTR
          " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
          "ms",
          xds_client(), new_cluster_names.size(), send_all_clusters,
          new_load_reporting_interval);
      size_t i = 0;
      for (const auto& name : new_cluster_names) {
        gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
                xds_client(), i++, name.c_str());
      }
    }
    // Enforce the lower bound on the reporting interval.
    if (new_load_reporting_interval <
        GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
      new_load_reporting_interval =
          GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] Increased load_report_interval to minimum "
                "value %dms",
                xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
      }
    }
    // Ignore identical update.
    if (send_all_clusters == send_all_clusters_ &&
        cluster_names_ == new_cluster_names &&
        load_reporting_interval_ == new_load_reporting_interval) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] Incoming LRS response identical to current, "
                "ignoring.",
                xds_client());
      }
      return;
    }
    // Stop current load reporting (if any) to adopt the new config.
    reporter_.reset();
    // Record the new config.
    send_all_clusters_ = send_all_clusters;
    cluster_names_ = std::move(new_cluster_names);
    load_reporting_interval_ = new_load_reporting_interval;
    // Try starting sending load report.
    MaybeStartReportingLocked();
  }();
  grpc_slice_unref_internal(response_slice);
  if (xds_client()->shutting_down_) return true;
  // Keep listening for LRS config updates.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_RECV_MESSAGE;
  op.data.recv_message.recv_message = &recv_message_payload_;
  op.flags = 0;
  op.reserved = nullptr;
  GPR_ASSERT(call_ != nullptr);
  // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
  const grpc_call_error call_error =
      grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  return false;
}
1730 
OnStatusReceived(void * arg,grpc_error_handle error)1731 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1732     void* arg, grpc_error_handle error) {
1733   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1734   {
1735     MutexLock lock(&lrs_calld->xds_client()->mu_);
1736     lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1737   }
1738   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1739 }
1740 
OnStatusReceivedLocked(grpc_error_handle error)1741 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1742     grpc_error_handle error) {
1743   GPR_ASSERT(call_ != nullptr);
1744   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1745     char* status_details = grpc_slice_to_c_string(status_details_);
1746     gpr_log(GPR_INFO,
1747             "[xds_client %p] LRS call status received. Status = %d, details "
1748             "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1749             xds_client(), status_code_, status_details, chand(), this, call_,
1750             grpc_error_std_string(error).c_str());
1751     gpr_free(status_details);
1752   }
1753   // Ignore status from a stale call.
1754   if (IsCurrentCallOnChannel()) {
1755     GPR_ASSERT(!xds_client()->shutting_down_);
1756     // Try to restart the call.
1757     parent_->OnCallFinishedLocked();
1758   }
1759   GRPC_ERROR_UNREF(error);
1760 }
1761 
IsCurrentCallOnChannel() const1762 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1763   // If the retryable LRS call is null (which only happens when the xds channel
1764   // is shutting down), all the LRS calls are stale.
1765   if (chand()->lrs_calld_ == nullptr) return false;
1766   return this == chand()->lrs_calld_->calld();
1767 }
1768 
1769 //
1770 // XdsClient
1771 //
1772 
1773 namespace {
1774 
GetRequestTimeout(const grpc_channel_args * args)1775 grpc_millis GetRequestTimeout(const grpc_channel_args* args) {
1776   return grpc_channel_args_find_integer(
1777       args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1778       {15000, 0, INT_MAX});
1779 }
1780 
// Returns a new copy of `args` with two additions for the xds channel:
//   - a keepalive time of 5 minutes, and
//   - the channelz flag that marks the channel as internal.
// The caller owns the result and must release it with
// grpc_channel_args_destroy().
grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) {
  constexpr int kKeepaliveTimeMs = 5 * 60 * GPR_MS_PER_SEC;
  grpc_arg extra_args[] = {
      grpc_channel_arg_integer_create(
          const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), kKeepaliveTimeMs),
      grpc_channel_arg_integer_create(
          const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
  };
  return grpc_channel_args_copy_and_add(
      args, extra_args, sizeof(extra_args) / sizeof(extra_args[0]));
}
1792 
1793 }  // namespace
1794 
XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,const grpc_channel_args * args)1795 XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
1796                      const grpc_channel_args* args)
1797     : DualRefCounted<XdsClient>(
1798           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1799                                                                   : nullptr),
1800       bootstrap_(std::move(bootstrap)),
1801       args_(ModifyChannelArgs(args)),
1802       request_timeout_(GetRequestTimeout(args)),
1803       interested_parties_(grpc_pollset_set_create()),
1804       certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
1805           bootstrap_->certificate_providers())),
1806       api_(this, &grpc_xds_client_trace, bootstrap_->node()) {
1807   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1808     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1809   }
1810   // Create ChannelState object.
1811   chand_ = MakeOrphanable<ChannelState>(
1812       WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1813 }
1814 
~XdsClient()1815 XdsClient::~XdsClient() {
1816   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1817     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1818   }
1819   grpc_channel_args_destroy(args_);
1820   grpc_pollset_set_destroy(interested_parties_);
1821 }
1822 
AddChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1823 void XdsClient::AddChannelzLinkage(
1824     channelz::ChannelNode* parent_channelz_node) {
1825   MutexLock lock(&mu_);
1826   channelz::ChannelNode* xds_channelz_node =
1827       grpc_channel_get_channelz_node(chand_->channel());
1828   if (xds_channelz_node != nullptr) {
1829     parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1830   }
1831 }
1832 
RemoveChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1833 void XdsClient::RemoveChannelzLinkage(
1834     channelz::ChannelNode* parent_channelz_node) {
1835   MutexLock lock(&mu_);
1836   channelz::ChannelNode* xds_channelz_node =
1837       grpc_channel_get_channelz_node(chand_->channel());
1838   if (xds_channelz_node != nullptr) {
1839     parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1840   }
1841 }
1842 
Orphan()1843 void XdsClient::Orphan() {
1844   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1845     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1846   }
1847   {
1848     MutexLock lock(g_mu);
1849     if (g_xds_client == this) g_xds_client = nullptr;
1850   }
1851   {
1852     MutexLock lock(&mu_);
1853     shutting_down_ = true;
1854     // Orphan ChannelState object.
1855     chand_.reset();
1856     // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1857     // created by the XdsResolver because the maps contain refs for watchers
1858     // which in turn hold refs to the loadbalancing policies. At this point, it
1859     // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1860     // policies before those calls are done would lead to issues such as
1861     // https://github.com/grpc/grpc/issues/20928.
1862     if (!listener_map_.empty()) {
1863       cluster_map_.clear();
1864       endpoint_map_.clear();
1865     }
1866   }
1867 }
1868 
WatchListenerData(absl::string_view listener_name,std::unique_ptr<ListenerWatcherInterface> watcher)1869 void XdsClient::WatchListenerData(
1870     absl::string_view listener_name,
1871     std::unique_ptr<ListenerWatcherInterface> watcher) {
1872   std::string listener_name_str = std::string(listener_name);
1873   MutexLock lock(&mu_);
1874   ListenerState& listener_state = listener_map_[listener_name_str];
1875   ListenerWatcherInterface* w = watcher.get();
1876   listener_state.watchers[w] = std::move(watcher);
1877   // If we've already received an LDS update, notify the new watcher
1878   // immediately.
1879   if (listener_state.update.has_value()) {
1880     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1881       gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1882               this, listener_name_str.c_str());
1883     }
1884     w->OnListenerChanged(*listener_state.update);
1885   }
1886   chand_->SubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str);
1887 }
1888 
CancelListenerDataWatch(absl::string_view listener_name,ListenerWatcherInterface * watcher,bool delay_unsubscription)1889 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1890                                         ListenerWatcherInterface* watcher,
1891                                         bool delay_unsubscription) {
1892   MutexLock lock(&mu_);
1893   if (shutting_down_) return;
1894   std::string listener_name_str = std::string(listener_name);
1895   ListenerState& listener_state = listener_map_[listener_name_str];
1896   auto it = listener_state.watchers.find(watcher);
1897   if (it != listener_state.watchers.end()) {
1898     listener_state.watchers.erase(it);
1899     if (listener_state.watchers.empty()) {
1900       listener_map_.erase(listener_name_str);
1901       chand_->UnsubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str,
1902                                 delay_unsubscription);
1903     }
1904   }
1905 }
1906 
WatchRouteConfigData(absl::string_view route_config_name,std::unique_ptr<RouteConfigWatcherInterface> watcher)1907 void XdsClient::WatchRouteConfigData(
1908     absl::string_view route_config_name,
1909     std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1910   std::string route_config_name_str = std::string(route_config_name);
1911   MutexLock lock(&mu_);
1912   RouteConfigState& route_config_state =
1913       route_config_map_[route_config_name_str];
1914   RouteConfigWatcherInterface* w = watcher.get();
1915   route_config_state.watchers[w] = std::move(watcher);
1916   // If we've already received an RDS update, notify the new watcher
1917   // immediately.
1918   if (route_config_state.update.has_value()) {
1919     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1920       gpr_log(GPR_INFO,
1921               "[xds_client %p] returning cached route config data for %s", this,
1922               route_config_name_str.c_str());
1923     }
1924     w->OnRouteConfigChanged(*route_config_state.update);
1925   }
1926   chand_->SubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str);
1927 }
1928 
CancelRouteConfigDataWatch(absl::string_view route_config_name,RouteConfigWatcherInterface * watcher,bool delay_unsubscription)1929 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1930                                            RouteConfigWatcherInterface* watcher,
1931                                            bool delay_unsubscription) {
1932   MutexLock lock(&mu_);
1933   if (shutting_down_) return;
1934   std::string route_config_name_str = std::string(route_config_name);
1935   RouteConfigState& route_config_state =
1936       route_config_map_[route_config_name_str];
1937   auto it = route_config_state.watchers.find(watcher);
1938   if (it != route_config_state.watchers.end()) {
1939     route_config_state.watchers.erase(it);
1940     if (route_config_state.watchers.empty()) {
1941       route_config_map_.erase(route_config_name_str);
1942       chand_->UnsubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str,
1943                                 delay_unsubscription);
1944     }
1945   }
1946 }
1947 
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1948 void XdsClient::WatchClusterData(
1949     absl::string_view cluster_name,
1950     std::unique_ptr<ClusterWatcherInterface> watcher) {
1951   std::string cluster_name_str = std::string(cluster_name);
1952   MutexLock lock(&mu_);
1953   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1954   ClusterWatcherInterface* w = watcher.get();
1955   cluster_state.watchers[w] = std::move(watcher);
1956   // If we've already received a CDS update, notify the new watcher
1957   // immediately.
1958   if (cluster_state.update.has_value()) {
1959     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1960       gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1961               this, cluster_name_str.c_str());
1962     }
1963     w->OnClusterChanged(cluster_state.update.value());
1964   }
1965   chand_->SubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str);
1966 }
1967 
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1968 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1969                                        ClusterWatcherInterface* watcher,
1970                                        bool delay_unsubscription) {
1971   MutexLock lock(&mu_);
1972   if (shutting_down_) return;
1973   std::string cluster_name_str = std::string(cluster_name);
1974   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1975   auto it = cluster_state.watchers.find(watcher);
1976   if (it != cluster_state.watchers.end()) {
1977     cluster_state.watchers.erase(it);
1978     if (cluster_state.watchers.empty()) {
1979       cluster_map_.erase(cluster_name_str);
1980       chand_->UnsubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str,
1981                                 delay_unsubscription);
1982     }
1983   }
1984 }
1985 
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1986 void XdsClient::WatchEndpointData(
1987     absl::string_view eds_service_name,
1988     std::unique_ptr<EndpointWatcherInterface> watcher) {
1989   std::string eds_service_name_str = std::string(eds_service_name);
1990   MutexLock lock(&mu_);
1991   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1992   EndpointWatcherInterface* w = watcher.get();
1993   endpoint_state.watchers[w] = std::move(watcher);
1994   // If we've already received an EDS update, notify the new watcher
1995   // immediately.
1996   if (endpoint_state.update.has_value()) {
1997     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1998       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1999               this, eds_service_name_str.c_str());
2000     }
2001     w->OnEndpointChanged(endpoint_state.update.value());
2002   }
2003   chand_->SubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str);
2004 }
2005 
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)2006 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
2007                                         EndpointWatcherInterface* watcher,
2008                                         bool delay_unsubscription) {
2009   MutexLock lock(&mu_);
2010   if (shutting_down_) return;
2011   std::string eds_service_name_str = std::string(eds_service_name);
2012   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
2013   auto it = endpoint_state.watchers.find(watcher);
2014   if (it != endpoint_state.watchers.end()) {
2015     endpoint_state.watchers.erase(it);
2016     if (endpoint_state.watchers.empty()) {
2017       endpoint_map_.erase(eds_service_name_str);
2018       chand_->UnsubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str,
2019                                 delay_unsubscription);
2020     }
2021   }
2022 }
2023 
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)2024 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
2025     absl::string_view lrs_server, absl::string_view cluster_name,
2026     absl::string_view eds_service_name) {
2027   // TODO(roth): When we add support for direct federation, use the
2028   // server name specified in lrs_server.
2029   auto key =
2030       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2031   MutexLock lock(&mu_);
2032   // We jump through some hoops here to make sure that the absl::string_views
2033   // stored in the XdsClusterDropStats object point to the strings
2034   // in the load_report_map_ key, so that they have the same lifetime.
2035   auto it = load_report_map_
2036                 .emplace(std::make_pair(std::move(key), LoadReportState()))
2037                 .first;
2038   LoadReportState& load_report_state = it->second;
2039   RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
2040   if (load_report_state.drop_stats != nullptr) {
2041     cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
2042   }
2043   if (cluster_drop_stats == nullptr) {
2044     if (load_report_state.drop_stats != nullptr) {
2045       load_report_state.deleted_drop_stats +=
2046           load_report_state.drop_stats->GetSnapshotAndReset();
2047     }
2048     cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
2049         Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
2050         it->first.first /*cluster_name*/,
2051         it->first.second /*eds_service_name*/);
2052     load_report_state.drop_stats = cluster_drop_stats.get();
2053   }
2054   chand_->MaybeStartLrsCall();
2055   return cluster_drop_stats;
2056 }
2057 
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)2058 void XdsClient::RemoveClusterDropStats(
2059     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2060     absl::string_view eds_service_name,
2061     XdsClusterDropStats* cluster_drop_stats) {
2062   MutexLock lock(&mu_);
2063   // TODO(roth): When we add support for direct federation, use the
2064   // server name specified in lrs_server.
2065   auto it = load_report_map_.find(
2066       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2067   if (it == load_report_map_.end()) return;
2068   LoadReportState& load_report_state = it->second;
2069   if (load_report_state.drop_stats == cluster_drop_stats) {
2070     // Record final snapshot in deleted_drop_stats, which will be
2071     // added to the next load report.
2072     load_report_state.deleted_drop_stats +=
2073         load_report_state.drop_stats->GetSnapshotAndReset();
2074     load_report_state.drop_stats = nullptr;
2075   }
2076 }
2077 
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)2078 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2079     absl::string_view lrs_server, absl::string_view cluster_name,
2080     absl::string_view eds_service_name,
2081     RefCountedPtr<XdsLocalityName> locality) {
2082   // TODO(roth): When we add support for direct federation, use the
2083   // server name specified in lrs_server.
2084   auto key =
2085       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2086   MutexLock lock(&mu_);
2087   // We jump through some hoops here to make sure that the absl::string_views
2088   // stored in the XdsClusterLocalityStats object point to the strings
2089   // in the load_report_map_ key, so that they have the same lifetime.
2090   auto it = load_report_map_
2091                 .emplace(std::make_pair(std::move(key), LoadReportState()))
2092                 .first;
2093   LoadReportState& load_report_state = it->second;
2094   LoadReportState::LocalityState& locality_state =
2095       load_report_state.locality_stats[locality];
2096   RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2097   if (locality_state.locality_stats != nullptr) {
2098     cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2099   }
2100   if (cluster_locality_stats == nullptr) {
2101     if (locality_state.locality_stats != nullptr) {
2102       locality_state.deleted_locality_stats +=
2103           locality_state.locality_stats->GetSnapshotAndReset();
2104     }
2105     cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2106         Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2107         it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2108         std::move(locality));
2109     locality_state.locality_stats = cluster_locality_stats.get();
2110   }
2111   chand_->MaybeStartLrsCall();
2112   return cluster_locality_stats;
2113 }
2114 
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2115 void XdsClient::RemoveClusterLocalityStats(
2116     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2117     absl::string_view eds_service_name,
2118     const RefCountedPtr<XdsLocalityName>& locality,
2119     XdsClusterLocalityStats* cluster_locality_stats) {
2120   MutexLock lock(&mu_);
2121   // TODO(roth): When we add support for direct federation, use the
2122   // server name specified in lrs_server.
2123   auto it = load_report_map_.find(
2124       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2125   if (it == load_report_map_.end()) return;
2126   LoadReportState& load_report_state = it->second;
2127   auto locality_it = load_report_state.locality_stats.find(locality);
2128   if (locality_it == load_report_state.locality_stats.end()) return;
2129   LoadReportState::LocalityState& locality_state = locality_it->second;
2130   if (locality_state.locality_stats == cluster_locality_stats) {
2131     // Record final snapshot in deleted_locality_stats, which will be
2132     // added to the next load report.
2133     locality_state.deleted_locality_stats +=
2134         locality_state.locality_stats->GetSnapshotAndReset();
2135     locality_state.locality_stats = nullptr;
2136   }
2137 }
2138 
ResetBackoff()2139 void XdsClient::ResetBackoff() {
2140   MutexLock lock(&mu_);
2141   if (chand_ != nullptr) {
2142     grpc_channel_reset_connect_backoff(chand_->channel());
2143   }
2144 }
2145 
NotifyOnErrorLocked(grpc_error_handle error)2146 void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) {
2147   for (const auto& p : listener_map_) {
2148     const ListenerState& listener_state = p.second;
2149     for (const auto& p : listener_state.watchers) {
2150       p.first->OnError(GRPC_ERROR_REF(error));
2151     }
2152   }
2153   for (const auto& p : route_config_map_) {
2154     const RouteConfigState& route_config_state = p.second;
2155     for (const auto& p : route_config_state.watchers) {
2156       p.first->OnError(GRPC_ERROR_REF(error));
2157     }
2158   }
2159   for (const auto& p : cluster_map_) {
2160     const ClusterState& cluster_state = p.second;
2161     for (const auto& p : cluster_state.watchers) {
2162       p.first->OnError(GRPC_ERROR_REF(error));
2163     }
2164   }
2165   for (const auto& p : endpoint_map_) {
2166     const EndpointState& endpoint_state = p.second;
2167     for (const auto& p : endpoint_state.watchers) {
2168       p.first->OnError(GRPC_ERROR_REF(error));
2169     }
2170   }
2171   GRPC_ERROR_UNREF(error);
2172 }
2173 
BuildLoadReportSnapshotLocked(bool send_all_clusters,const std::set<std::string> & clusters)2174 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2175     bool send_all_clusters, const std::set<std::string>& clusters) {
2176   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2177     gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2178   }
2179   XdsApi::ClusterLoadReportMap snapshot_map;
2180   for (auto load_report_it = load_report_map_.begin();
2181        load_report_it != load_report_map_.end();) {
2182     // Cluster key is cluster and EDS service name.
2183     const auto& cluster_key = load_report_it->first;
2184     LoadReportState& load_report = load_report_it->second;
2185     // If the CDS response for a cluster indicates to use LRS but the
2186     // LRS server does not say that it wants reports for this cluster,
2187     // then we'll have stats objects here whose data we're not going to
2188     // include in the load report.  However, we still need to clear out
2189     // the data from the stats objects, so that if the LRS server starts
2190     // asking for the data in the future, we don't incorrectly include
2191     // data from previous reporting intervals in that future report.
2192     const bool record_stats =
2193         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2194     XdsApi::ClusterLoadReport snapshot;
2195     // Aggregate drop stats.
2196     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2197     if (load_report.drop_stats != nullptr) {
2198       snapshot.dropped_requests +=
2199           load_report.drop_stats->GetSnapshotAndReset();
2200       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2201         gpr_log(GPR_INFO,
2202                 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2203                 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2204                 load_report.drop_stats);
2205       }
2206     }
2207     // Aggregate locality stats.
2208     for (auto it = load_report.locality_stats.begin();
2209          it != load_report.locality_stats.end();) {
2210       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2211       auto& locality_state = it->second;
2212       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2213           snapshot.locality_stats[locality_name];
2214       locality_snapshot = std::move(locality_state.deleted_locality_stats);
2215       if (locality_state.locality_stats != nullptr) {
2216         locality_snapshot +=
2217             locality_state.locality_stats->GetSnapshotAndReset();
2218         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2219           gpr_log(GPR_INFO,
2220                   "[xds_client %p] cluster=%s eds_service_name=%s "
2221                   "locality=%s locality_stats=%p",
2222                   this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2223                   locality_name->AsHumanReadableString().c_str(),
2224                   locality_state.locality_stats);
2225         }
2226       }
2227       // If the only thing left in this entry was final snapshots from
2228       // deleted locality stats objects, remove the entry.
2229       if (locality_state.locality_stats == nullptr) {
2230         it = load_report.locality_stats.erase(it);
2231       } else {
2232         ++it;
2233       }
2234     }
2235     // Compute load report interval.
2236     const grpc_millis now = ExecCtx::Get()->Now();
2237     snapshot.load_report_interval = now - load_report.last_report_time;
2238     load_report.last_report_time = now;
2239     // Record snapshot.
2240     if (record_stats) {
2241       snapshot_map[cluster_key] = std::move(snapshot);
2242     }
2243     // If the only thing left in this entry was final snapshots from
2244     // deleted stats objects, remove the entry.
2245     if (load_report.locality_stats.empty() &&
2246         load_report.drop_stats == nullptr) {
2247       load_report_it = load_report_map_.erase(load_report_it);
2248     } else {
2249       ++load_report_it;
2250     }
2251   }
2252   return snapshot_map;
2253 }
2254 
UpdateResourceMetadataWithFailedParseResultLocked(grpc_millis update_time,const XdsApi::AdsParseResult & result)2255 void XdsClient::UpdateResourceMetadataWithFailedParseResultLocked(
2256     grpc_millis update_time, const XdsApi::AdsParseResult& result) {
2257   // ADS update is rejected and the resource names in the failed update is
2258   // available.
2259   std::string details = grpc_error_std_string(result.parse_error);
2260   for (auto& name : result.resource_names_failed) {
2261     XdsApi::ResourceMetadata* resource_metadata = nullptr;
2262     if (result.type_url == XdsApi::kLdsTypeUrl) {
2263       auto it = listener_map_.find(name);
2264       if (it != listener_map_.end()) {
2265         resource_metadata = &it->second.meta;
2266       }
2267     } else if (result.type_url == XdsApi::kRdsTypeUrl) {
2268       auto it = route_config_map_.find(name);
2269       if (route_config_map_.find(name) != route_config_map_.end()) {
2270         resource_metadata = &it->second.meta;
2271       }
2272     } else if (result.type_url == XdsApi::kCdsTypeUrl) {
2273       auto it = cluster_map_.find(name);
2274       if (cluster_map_.find(name) != cluster_map_.end()) {
2275         resource_metadata = &it->second.meta;
2276       }
2277     } else if (result.type_url == XdsApi::kEdsTypeUrl) {
2278       auto it = endpoint_map_.find(name);
2279       if (endpoint_map_.find(name) != endpoint_map_.end()) {
2280         resource_metadata = &it->second.meta;
2281       }
2282     }
2283     if (resource_metadata == nullptr) {
2284       return;
2285     }
2286     resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
2287     resource_metadata->failed_version = result.version;
2288     resource_metadata->failed_details = details;
2289     resource_metadata->failed_update_time = update_time;
2290   }
2291 }
2292 
DumpClientConfigBinary()2293 std::string XdsClient::DumpClientConfigBinary() {
2294   MutexLock lock(&mu_);
2295   XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;
2296   // Update per-xds-type version if available, this version corresponding to the
2297   // last successful ADS update version.
2298   for (auto& p : resource_version_map_) {
2299     resource_type_metadata_map[p.first].version = p.second;
2300   }
2301   // Collect resource metadata from listeners
2302   auto& lds_map =
2303       resource_type_metadata_map[XdsApi::kLdsTypeUrl].resource_metadata_map;
2304   for (auto& p : listener_map_) {
2305     lds_map[p.first] = &p.second.meta;
2306   }
2307   // Collect resource metadata from route configs
2308   auto& rds_map =
2309       resource_type_metadata_map[XdsApi::kRdsTypeUrl].resource_metadata_map;
2310   for (auto& p : route_config_map_) {
2311     rds_map[p.first] = &p.second.meta;
2312   }
2313   // Collect resource metadata from clusters
2314   auto& cds_map =
2315       resource_type_metadata_map[XdsApi::kCdsTypeUrl].resource_metadata_map;
2316   for (auto& p : cluster_map_) {
2317     cds_map[p.first] = &p.second.meta;
2318   }
2319   // Collect resource metadata from endpoints
2320   auto& eds_map =
2321       resource_type_metadata_map[XdsApi::kEdsTypeUrl].resource_metadata_map;
2322   for (auto& p : endpoint_map_) {
2323     eds_map[p.first] = &p.second.meta;
2324   }
2325   // Assemble config dump messages
2326   return api_.AssembleClientConfig(resource_type_metadata_map);
2327 }
2328 
2329 //
2330 // accessors for global state
2331 //
2332 
XdsClientGlobalInit()2333 void XdsClientGlobalInit() {
2334   g_mu = new Mutex;
2335   XdsHttpFilterRegistry::Init();
2336 }
2337 
2338 // TODO(roth): Find a better way to clear the fallback config that does
2339 // not require using ABSL_NO_THREAD_SAFETY_ANALYSIS.
XdsClientGlobalShutdown()2340 void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS {
2341   gpr_free(g_fallback_bootstrap_config);
2342   g_fallback_bootstrap_config = nullptr;
2343   delete g_mu;
2344   g_mu = nullptr;
2345   XdsHttpFilterRegistry::Shutdown();
2346 }
2347 
2348 namespace {
2349 
GetBootstrapContents(const char * fallback_config,grpc_error_handle * error)2350 std::string GetBootstrapContents(const char* fallback_config,
2351                                  grpc_error_handle* error) {
2352   // First, try GRPC_XDS_BOOTSTRAP env var.
2353   grpc_core::UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
2354   if (path != nullptr) {
2355     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2356       gpr_log(GPR_INFO,
2357               "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
2358               "environment variable: %s",
2359               path.get());
2360     }
2361     grpc_slice contents;
2362     *error =
2363         grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
2364     if (*error != GRPC_ERROR_NONE) return "";
2365     std::string contents_str(StringViewFromSlice(contents));
2366     grpc_slice_unref_internal(contents);
2367     return contents_str;
2368   }
2369   // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
2370   grpc_core::UniquePtr<char> env_config(
2371       gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG"));
2372   if (env_config != nullptr) {
2373     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2374       gpr_log(GPR_INFO,
2375               "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
2376               "environment variable");
2377     }
2378     return env_config.get();
2379   }
2380   // Finally, try fallback config.
2381   if (fallback_config != nullptr) {
2382     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2383       gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
2384     }
2385     return fallback_config;
2386   }
2387   // No bootstrap config found.
2388   *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2389       "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
2390       "not defined");
2391   return "";
2392 }
2393 
2394 }  // namespace
2395 
GetOrCreate(const grpc_channel_args * args,grpc_error_handle * error)2396 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(const grpc_channel_args* args,
2397                                                 grpc_error_handle* error) {
2398   RefCountedPtr<XdsClient> xds_client;
2399   // If getting bootstrap from channel args, create a local XdsClient
2400   // instance for the channel or server instead of using the global instance.
2401   const char* bootstrap_config = grpc_channel_args_find_string(
2402       args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
2403   if (bootstrap_config != nullptr) {
2404     std::unique_ptr<XdsBootstrap> bootstrap =
2405         XdsBootstrap::Create(bootstrap_config, error);
2406     if (*error == GRPC_ERROR_NONE) {
2407       grpc_channel_args* xds_channel_args =
2408           grpc_channel_args_find_pointer<grpc_channel_args>(
2409               args,
2410               GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
2411       return MakeRefCounted<XdsClient>(std::move(bootstrap), xds_channel_args);
2412     }
2413     return nullptr;
2414   }
2415   // Otherwise, use the global instance.
2416   {
2417     MutexLock lock(g_mu);
2418     if (g_xds_client != nullptr) {
2419       auto xds_client = g_xds_client->RefIfNonZero();
2420       if (xds_client != nullptr) return xds_client;
2421     }
2422     // Find bootstrap contents.
2423     std::string bootstrap_contents =
2424         GetBootstrapContents(g_fallback_bootstrap_config, error);
2425     if (*error != GRPC_ERROR_NONE) return nullptr;
2426     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2427       gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
2428               bootstrap_contents.c_str());
2429     }
2430     // Parse bootstrap.
2431     std::unique_ptr<XdsBootstrap> bootstrap =
2432         XdsBootstrap::Create(bootstrap_contents, error);
2433     if (*error != GRPC_ERROR_NONE) return nullptr;
2434     // Instantiate XdsClient.
2435     xds_client =
2436         MakeRefCounted<XdsClient>(std::move(bootstrap), g_channel_args);
2437     g_xds_client = xds_client.get();
2438   }
2439   return xds_client;
2440 }
2441 
2442 namespace internal {
2443 
SetXdsChannelArgsForTest(grpc_channel_args * args)2444 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2445   MutexLock lock(g_mu);
2446   g_channel_args = args;
2447 }
2448 
UnsetGlobalXdsClientForTest()2449 void UnsetGlobalXdsClientForTest() {
2450   MutexLock lock(g_mu);
2451   g_xds_client = nullptr;
2452 }
2453 
SetXdsFallbackBootstrapConfig(const char * config)2454 void SetXdsFallbackBootstrapConfig(const char* config) {
2455   MutexLock lock(g_mu);
2456   gpr_free(g_fallback_bootstrap_config);
2457   g_fallback_bootstrap_config = gpr_strdup(config);
2458 }
2459 
2460 }  // namespace internal
2461 
2462 //
2463 // embedding XdsClient in channel args
2464 //
2465 
2466 #define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client"
2467 
2468 namespace {
2469 
XdsClientArgCopy(void * p)2470 void* XdsClientArgCopy(void* p) {
2471   XdsClient* xds_client = static_cast<XdsClient*>(p);
2472   xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
2473   return p;
2474 }
2475 
XdsClientArgDestroy(void * p)2476 void XdsClientArgDestroy(void* p) {
2477   XdsClient* xds_client = static_cast<XdsClient*>(p);
2478   xds_client->Unref(DEBUG_LOCATION, "channel arg");
2479 }
2480 
XdsClientArgCmp(void * p,void * q)2481 int XdsClientArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
2482 
2483 const grpc_arg_pointer_vtable kXdsClientArgVtable = {
2484     XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp};
2485 
2486 }  // namespace
2487 
MakeChannelArg() const2488 grpc_arg XdsClient::MakeChannelArg() const {
2489   return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
2490                                          const_cast<XdsClient*>(this),
2491                                          &kXdsClientArgVtable);
2492 }
2493 
GetFromChannelArgs(const grpc_channel_args & args)2494 RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
2495     const grpc_channel_args& args) {
2496   XdsClient* xds_client =
2497       grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
2498   if (xds_client == nullptr) return nullptr;
2499   return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
2500 }
2501 
2502 }  // namespace grpc_core
2503 
2504 // The returned bytes may contain NULL(0), so we can't use c-string.
grpc_dump_xds_configs()2505 grpc_slice grpc_dump_xds_configs() {
2506   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2507   grpc_core::ExecCtx exec_ctx;
2508   grpc_error_handle error = GRPC_ERROR_NONE;
2509   auto xds_client = grpc_core::XdsClient::GetOrCreate(nullptr, &error);
2510   if (error != GRPC_ERROR_NONE) {
2511     // If we isn't using xDS, just return an empty string.
2512     GRPC_ERROR_UNREF(error);
2513     return grpc_empty_slice();
2514   }
2515   return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary());
2516 }
2517