• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24 
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29 
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34 
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/channel_stack.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/memory.h"
46 #include "src/core/lib/gprpp/orphanable.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/gprpp/sync.h"
49 #include "src/core/lib/iomgr/sockaddr.h"
50 #include "src/core/lib/iomgr/sockaddr_utils.h"
51 #include "src/core/lib/iomgr/timer.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/slice/slice_string_helpers.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/channel.h"
56 #include "src/core/lib/surface/channel_init.h"
57 #include "src/core/lib/transport/static_metadata.h"
58 
// Backoff parameters used by RetryableCall when an xds call fails before
// seeing any response. The *_SECONDS values are converted to milliseconds
// (multiplied by 1000) where the BackOff options are built.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_JITTER 0.2

// Load-reporting interval floor, in milliseconds. NOTE(review): presumably
// clamps the interval requested by the LRS server; its use is outside this view.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
64 
65 namespace grpc_core {
66 
67 TraceFlag grpc_xds_client_trace(false, "xds_client");
68 TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
69 
70 namespace {
71 
72 Mutex* g_mu = nullptr;
73 const grpc_channel_args* g_channel_args = nullptr;
74 XdsClient* g_xds_client = nullptr;
75 char* g_fallback_bootstrap_config = nullptr;
76 
77 }  // namespace
78 
79 //
80 // Internal class declarations
81 //
82 
83 // An xds call wrapper that can restart a call upon failure. Holds a ref to
84 // the xds channel. The template parameter is the kind of wrapped xds call.
85 template <typename T>
86 class XdsClient::ChannelState::RetryableCall
87     : public InternallyRefCounted<RetryableCall<T>> {
88  public:
89   explicit RetryableCall(RefCountedPtr<ChannelState> chand);
90 
91   void Orphan() override;
92 
93   void OnCallFinishedLocked();
94 
calld() const95   T* calld() const { return calld_.get(); }
chand() const96   ChannelState* chand() const { return chand_.get(); }
97 
98   bool IsCurrentCallOnChannel() const;
99 
100  private:
101   void StartNewCallLocked();
102   void StartRetryTimerLocked();
103   static void OnRetryTimer(void* arg, grpc_error* error);
104   void OnRetryTimerLocked(grpc_error* error);
105 
106   // The wrapped xds call that talks to the xds server. It's instantiated
107   // every time we start a new call. It's null during call retry backoff.
108   OrphanablePtr<T> calld_;
109   // The owning xds channel.
110   RefCountedPtr<ChannelState> chand_;
111 
112   // Retry state.
113   BackOff backoff_;
114   grpc_timer retry_timer_;
115   grpc_closure on_retry_timer_;
116   bool retry_timer_callback_pending_ = false;
117 
118   bool shutting_down_ = false;
119 };
120 
121 // Contains an ADS call to the xds server.
122 class XdsClient::ChannelState::AdsCallState
123     : public InternallyRefCounted<AdsCallState> {
124  public:
125   // The ctor and dtor should not be used directly.
126   explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
127   ~AdsCallState() override;
128 
129   void Orphan() override;
130 
parent() const131   RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
chand() const132   ChannelState* chand() const { return parent_->chand(); }
xds_client() const133   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const134   bool seen_response() const { return seen_response_; }
135 
136   void Subscribe(const std::string& type_url, const std::string& name);
137   void Unsubscribe(const std::string& type_url, const std::string& name,
138                    bool delay_unsubscription);
139 
140   bool HasSubscribedResources() const;
141 
142  private:
143   class ResourceState : public InternallyRefCounted<ResourceState> {
144    public:
ResourceState(const std::string & type_url,const std::string & name,bool sent_initial_request)145     ResourceState(const std::string& type_url, const std::string& name,
146                   bool sent_initial_request)
147         : type_url_(type_url),
148           name_(name),
149           sent_initial_request_(sent_initial_request) {
150       GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
151                         grpc_schedule_on_exec_ctx);
152     }
153 
Orphan()154     void Orphan() override {
155       Finish();
156       Unref(DEBUG_LOCATION, "Orphan");
157     }
158 
Start(RefCountedPtr<AdsCallState> ads_calld)159     void Start(RefCountedPtr<AdsCallState> ads_calld) {
160       if (sent_initial_request_) return;
161       sent_initial_request_ = true;
162       ads_calld_ = std::move(ads_calld);
163       Ref(DEBUG_LOCATION, "timer").release();
164       timer_pending_ = true;
165       grpc_timer_init(
166           &timer_,
167           ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
168           &timer_callback_);
169     }
170 
Finish()171     void Finish() {
172       if (timer_pending_) {
173         grpc_timer_cancel(&timer_);
174         timer_pending_ = false;
175       }
176     }
177 
178    private:
OnTimer(void * arg,grpc_error * error)179     static void OnTimer(void* arg, grpc_error* error) {
180       ResourceState* self = static_cast<ResourceState*>(arg);
181       {
182         MutexLock lock(&self->ads_calld_->xds_client()->mu_);
183         self->OnTimerLocked(GRPC_ERROR_REF(error));
184       }
185       self->ads_calld_.reset();
186       self->Unref(DEBUG_LOCATION, "timer");
187     }
188 
OnTimerLocked(grpc_error * error)189     void OnTimerLocked(grpc_error* error) {
190       if (error == GRPC_ERROR_NONE && timer_pending_) {
191         timer_pending_ = false;
192         grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
193             absl::StrFormat(
194                 "timeout obtaining resource {type=%s name=%s} from xds server",
195                 type_url_, name_)
196                 .c_str());
197         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
198           gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
199                   grpc_error_string(watcher_error));
200         }
201         if (type_url_ == XdsApi::kLdsTypeUrl) {
202           ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
203           for (const auto& p : state.watchers) {
204             p.first->OnError(GRPC_ERROR_REF(watcher_error));
205           }
206         } else if (type_url_ == XdsApi::kRdsTypeUrl) {
207           RouteConfigState& state =
208               ads_calld_->xds_client()->route_config_map_[name_];
209           for (const auto& p : state.watchers) {
210             p.first->OnError(GRPC_ERROR_REF(watcher_error));
211           }
212         } else if (type_url_ == XdsApi::kCdsTypeUrl) {
213           ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
214           for (const auto& p : state.watchers) {
215             p.first->OnError(GRPC_ERROR_REF(watcher_error));
216           }
217         } else if (type_url_ == XdsApi::kEdsTypeUrl) {
218           EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
219           for (const auto& p : state.watchers) {
220             p.first->OnError(GRPC_ERROR_REF(watcher_error));
221           }
222         } else {
223           GPR_UNREACHABLE_CODE(return );
224         }
225         GRPC_ERROR_UNREF(watcher_error);
226       }
227       GRPC_ERROR_UNREF(error);
228     }
229 
230     const std::string type_url_;
231     const std::string name_;
232 
233     RefCountedPtr<AdsCallState> ads_calld_;
234     bool sent_initial_request_;
235     bool timer_pending_ = false;
236     grpc_timer timer_;
237     grpc_closure timer_callback_;
238   };
239 
240   struct ResourceTypeState {
~ResourceTypeStategrpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState241     ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
242 
243     // Nonce and error for this resource type.
244     std::string nonce;
245     grpc_error* error = GRPC_ERROR_NONE;
246 
247     // Subscribed resources of this type.
248     std::map<std::string /* name */, OrphanablePtr<ResourceState>>
249         subscribed_resources;
250   };
251 
252   void SendMessageLocked(const std::string& type_url);
253 
254   void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
255   void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
256   void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
257   void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
258 
259   static void OnRequestSent(void* arg, grpc_error* error);
260   void OnRequestSentLocked(grpc_error* error);
261   static void OnResponseReceived(void* arg, grpc_error* error);
262   bool OnResponseReceivedLocked();
263   static void OnStatusReceived(void* arg, grpc_error* error);
264   void OnStatusReceivedLocked(grpc_error* error);
265 
266   bool IsCurrentCallOnChannel() const;
267 
268   std::set<absl::string_view> ResourceNamesForRequest(
269       const std::string& type_url);
270 
271   // The owning RetryableCall<>.
272   RefCountedPtr<RetryableCall<AdsCallState>> parent_;
273 
274   bool sent_initial_message_ = false;
275   bool seen_response_ = false;
276 
277   // Always non-NULL.
278   grpc_call* call_;
279 
280   // recv_initial_metadata
281   grpc_metadata_array initial_metadata_recv_;
282 
283   // send_message
284   grpc_byte_buffer* send_message_payload_ = nullptr;
285   grpc_closure on_request_sent_;
286 
287   // recv_message
288   grpc_byte_buffer* recv_message_payload_ = nullptr;
289   grpc_closure on_response_received_;
290 
291   // recv_trailing_metadata
292   grpc_metadata_array trailing_metadata_recv_;
293   grpc_status_code status_code_;
294   grpc_slice status_details_;
295   grpc_closure on_status_received_;
296 
297   // Resource types for which requests need to be sent.
298   std::set<std::string /*type_url*/> buffered_requests_;
299 
300   // State for each resource type.
301   std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
302 };
303 
304 // Contains an LRS call to the xds server.
305 class XdsClient::ChannelState::LrsCallState
306     : public InternallyRefCounted<LrsCallState> {
307  public:
308   // The ctor and dtor should not be used directly.
309   explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
310   ~LrsCallState() override;
311 
312   void Orphan() override;
313 
314   void MaybeStartReportingLocked();
315 
parent()316   RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const317   ChannelState* chand() const { return parent_->chand(); }
xds_client() const318   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const319   bool seen_response() const { return seen_response_; }
320 
321  private:
322   // Reports client-side load stats according to a fixed interval.
323   class Reporter : public InternallyRefCounted<Reporter> {
324    public:
Reporter(RefCountedPtr<LrsCallState> parent,grpc_millis report_interval)325     Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
326         : parent_(std::move(parent)), report_interval_(report_interval) {
327       GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
328                         grpc_schedule_on_exec_ctx);
329       GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
330                         grpc_schedule_on_exec_ctx);
331       ScheduleNextReportLocked();
332     }
333 
334     void Orphan() override;
335 
336    private:
337     void ScheduleNextReportLocked();
338     static void OnNextReportTimer(void* arg, grpc_error* error);
339     bool OnNextReportTimerLocked(grpc_error* error);
340     bool SendReportLocked();
341     static void OnReportDone(void* arg, grpc_error* error);
342     bool OnReportDoneLocked(grpc_error* error);
343 
IsCurrentReporterOnCall() const344     bool IsCurrentReporterOnCall() const {
345       return this == parent_->reporter_.get();
346     }
xds_client() const347     XdsClient* xds_client() const { return parent_->xds_client(); }
348 
349     // The owning LRS call.
350     RefCountedPtr<LrsCallState> parent_;
351 
352     // The load reporting state.
353     const grpc_millis report_interval_;
354     bool last_report_counters_were_zero_ = false;
355     bool next_report_timer_callback_pending_ = false;
356     grpc_timer next_report_timer_;
357     grpc_closure on_next_report_timer_;
358     grpc_closure on_report_done_;
359   };
360 
361   static void OnInitialRequestSent(void* arg, grpc_error* error);
362   void OnInitialRequestSentLocked();
363   static void OnResponseReceived(void* arg, grpc_error* error);
364   bool OnResponseReceivedLocked();
365   static void OnStatusReceived(void* arg, grpc_error* error);
366   void OnStatusReceivedLocked(grpc_error* error);
367 
368   bool IsCurrentCallOnChannel() const;
369 
370   // The owning RetryableCall<>.
371   RefCountedPtr<RetryableCall<LrsCallState>> parent_;
372   bool seen_response_ = false;
373 
374   // Always non-NULL.
375   grpc_call* call_;
376 
377   // recv_initial_metadata
378   grpc_metadata_array initial_metadata_recv_;
379 
380   // send_message
381   grpc_byte_buffer* send_message_payload_ = nullptr;
382   grpc_closure on_initial_request_sent_;
383 
384   // recv_message
385   grpc_byte_buffer* recv_message_payload_ = nullptr;
386   grpc_closure on_response_received_;
387 
388   // recv_trailing_metadata
389   grpc_metadata_array trailing_metadata_recv_;
390   grpc_status_code status_code_;
391   grpc_slice status_details_;
392   grpc_closure on_status_received_;
393 
394   // Load reporting state.
395   bool send_all_clusters_ = false;
396   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
397   grpc_millis load_reporting_interval_ = 0;
398   OrphanablePtr<Reporter> reporter_;
399 };
400 
401 //
402 // XdsClient::ChannelState::StateWatcher
403 //
404 
405 class XdsClient::ChannelState::StateWatcher
406     : public AsyncConnectivityStateWatcherInterface {
407  public:
StateWatcher(RefCountedPtr<ChannelState> parent)408   explicit StateWatcher(RefCountedPtr<ChannelState> parent)
409       : parent_(std::move(parent)) {}
410 
411  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)412   void OnConnectivityStateChange(grpc_connectivity_state new_state,
413                                  const absl::Status& status) override {
414     MutexLock lock(&parent_->xds_client_->mu_);
415     if (!parent_->shutting_down_ &&
416         new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
417       // In TRANSIENT_FAILURE.  Notify all watchers of error.
418       gpr_log(GPR_INFO,
419               "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
420               "status_message:(%s)",
421               parent_->xds_client(), status.ToString().c_str());
422       parent_->xds_client()->NotifyOnErrorLocked(
423           GRPC_ERROR_CREATE_FROM_STATIC_STRING(
424               "xds channel in TRANSIENT_FAILURE"));
425     }
426   }
427 
428   RefCountedPtr<ChannelState> parent_;
429 };
430 
431 //
432 // XdsClient::ChannelState
433 //
434 
435 namespace {
436 
CreateXdsChannel(const XdsBootstrap::XdsServer & server)437 grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
438   // Build channel args.
439   absl::InlinedVector<grpc_arg, 2> args_to_add = {
440       grpc_channel_arg_integer_create(
441           const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
442           5 * 60 * GPR_MS_PER_SEC),
443       grpc_channel_arg_integer_create(
444           const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
445   };
446   grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
447       g_channel_args, args_to_add.data(), args_to_add.size());
448   // Create channel creds.
449   RefCountedPtr<grpc_channel_credentials> channel_creds =
450       XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
451                                                 server.channel_creds_config);
452   // Create channel.
453   grpc_channel* channel = grpc_secure_channel_create(
454       channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
455   grpc_channel_args_destroy(new_args);
456   return channel;
457 }
458 
459 }  // namespace
460 
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)461 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
462                                       const XdsBootstrap::XdsServer& server)
463     : InternallyRefCounted<ChannelState>(
464           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
465               ? "ChannelState"
466               : nullptr),
467       xds_client_(std::move(xds_client)),
468       server_(server) {
469   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
470     gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
471             xds_client_.get(), server.server_uri.c_str());
472   }
473   channel_ = CreateXdsChannel(server);
474   GPR_ASSERT(channel_ != nullptr);
475   StartConnectivityWatchLocked();
476 }
477 
~ChannelState()478 XdsClient::ChannelState::~ChannelState() {
479   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
480     gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
481             this);
482   }
483   grpc_channel_destroy(channel_);
484   xds_client_.reset(DEBUG_LOCATION, "ChannelState");
485 }
486 
Orphan()487 void XdsClient::ChannelState::Orphan() {
488   shutting_down_ = true;
489   CancelConnectivityWatchLocked();
490   ads_calld_.reset();
491   lrs_calld_.reset();
492   Unref(DEBUG_LOCATION, "ChannelState+orphaned");
493 }
494 
ads_calld() const495 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
496     const {
497   return ads_calld_->calld();
498 }
499 
lrs_calld() const500 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
501     const {
502   return lrs_calld_->calld();
503 }
504 
HasActiveAdsCall() const505 bool XdsClient::ChannelState::HasActiveAdsCall() const {
506   return ads_calld_->calld() != nullptr;
507 }
508 
MaybeStartLrsCall()509 void XdsClient::ChannelState::MaybeStartLrsCall() {
510   if (lrs_calld_ != nullptr) return;
511   lrs_calld_.reset(
512       new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
513 }
514 
StopLrsCall()515 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
516 
StartConnectivityWatchLocked()517 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
518   grpc_channel_element* client_channel_elem =
519       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
520   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
521   watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
522   grpc_client_channel_start_connectivity_watch(
523       client_channel_elem, GRPC_CHANNEL_IDLE,
524       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
525 }
526 
CancelConnectivityWatchLocked()527 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
528   grpc_channel_element* client_channel_elem =
529       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
530   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
531   grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
532 }
533 
Subscribe(const std::string & type_url,const std::string & name)534 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
535                                         const std::string& name) {
536   if (ads_calld_ == nullptr) {
537     // Start the ADS call if this is the first request.
538     ads_calld_.reset(new RetryableCall<AdsCallState>(
539         Ref(DEBUG_LOCATION, "ChannelState+ads")));
540     // Note: AdsCallState's ctor will automatically subscribe to all
541     // resources that the XdsClient already has watchers for, so we can
542     // return here.
543     return;
544   }
545   // If the ADS call is in backoff state, we don't need to do anything now
546   // because when the call is restarted it will resend all necessary requests.
547   if (ads_calld() == nullptr) return;
548   // Subscribe to this resource if the ADS call is active.
549   ads_calld()->Subscribe(type_url, name);
550 }
551 
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)552 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
553                                           const std::string& name,
554                                           bool delay_unsubscription) {
555   if (ads_calld_ != nullptr) {
556     auto* calld = ads_calld_->calld();
557     if (calld != nullptr) {
558       calld->Unsubscribe(type_url, name, delay_unsubscription);
559       if (!calld->HasSubscribedResources()) ads_calld_.reset();
560     }
561   }
562 }
563 
564 //
565 // XdsClient::ChannelState::RetryableCall<>
566 //
567 
568 template <typename T>
RetryableCall(RefCountedPtr<ChannelState> chand)569 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
570     RefCountedPtr<ChannelState> chand)
571     : chand_(std::move(chand)),
572       backoff_(
573           BackOff::Options()
574               .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
575                                    1000)
576               .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
577               .set_jitter(GRPC_XDS_RECONNECT_JITTER)
578               .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
579   // Closure Initialization
580   GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
581                     grpc_schedule_on_exec_ctx);
582   StartNewCallLocked();
583 }
584 
585 template <typename T>
Orphan()586 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
587   shutting_down_ = true;
588   calld_.reset();
589   if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
590   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
591 }
592 
593 template <typename T>
OnCallFinishedLocked()594 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
595   const bool seen_response = calld_->seen_response();
596   calld_.reset();
597   if (seen_response) {
598     // If we lost connection to the xds server, reset backoff and restart the
599     // call immediately.
600     backoff_.Reset();
601     StartNewCallLocked();
602   } else {
603     // If we failed to connect to the xds server, retry later.
604     StartRetryTimerLocked();
605   }
606 }
607 
608 template <typename T>
StartNewCallLocked()609 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
610   if (shutting_down_) return;
611   GPR_ASSERT(chand_->channel_ != nullptr);
612   GPR_ASSERT(calld_ == nullptr);
613   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
614     gpr_log(GPR_INFO,
615             "[xds_client %p] Start new call from retryable call (chand: %p, "
616             "retryable call: %p)",
617             chand()->xds_client(), chand(), this);
618   }
619   calld_ = MakeOrphanable<T>(
620       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
621 }
622 
623 template <typename T>
StartRetryTimerLocked()624 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
625   if (shutting_down_) return;
626   const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
627   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
628     grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
629     gpr_log(GPR_INFO,
630             "[xds_client %p] Failed to connect to xds server (chand: %p) "
631             "retry timer will fire in %" PRId64 "ms.",
632             chand()->xds_client(), chand(), timeout);
633   }
634   this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
635   grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
636   retry_timer_callback_pending_ = true;
637 }
638 
639 template <typename T>
OnRetryTimer(void * arg,grpc_error * error)640 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
641     void* arg, grpc_error* error) {
642   RetryableCall* calld = static_cast<RetryableCall*>(arg);
643   {
644     MutexLock lock(&calld->chand_->xds_client()->mu_);
645     calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
646   }
647   calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
648 }
649 
650 template <typename T>
OnRetryTimerLocked(grpc_error * error)651 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
652     grpc_error* error) {
653   retry_timer_callback_pending_ = false;
654   if (!shutting_down_ && error == GRPC_ERROR_NONE) {
655     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
656       gpr_log(
657           GPR_INFO,
658           "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
659           chand()->xds_client(), chand(), this);
660     }
661     StartNewCallLocked();
662   }
663   GRPC_ERROR_UNREF(error);
664 }
665 
666 //
667 // XdsClient::ChannelState::AdsCallState
668 //
669 
AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent)670 XdsClient::ChannelState::AdsCallState::AdsCallState(
671     RefCountedPtr<RetryableCall<AdsCallState>> parent)
672     : InternallyRefCounted<AdsCallState>(
673           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
674               ? "AdsCallState"
675               : nullptr),
676       parent_(std::move(parent)) {
677   // Init the ADS call. Note that the call will progress every time there's
678   // activity in xds_client()->interested_parties_, which is comprised of
679   // the polling entities from client_channel.
680   GPR_ASSERT(xds_client() != nullptr);
681   // Create a call with the specified method name.
682   const auto& method =
683       chand()->server_.ShouldUseV3()
684           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
685           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
686   call_ = grpc_channel_create_pollset_set_call(
687       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
688       xds_client()->interested_parties_, method, nullptr,
689       GRPC_MILLIS_INF_FUTURE, nullptr);
690   GPR_ASSERT(call_ != nullptr);
691   // Init data associated with the call.
692   grpc_metadata_array_init(&initial_metadata_recv_);
693   grpc_metadata_array_init(&trailing_metadata_recv_);
694   // Start the call.
695   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
696     gpr_log(GPR_INFO,
697             "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
698             "call: %p)",
699             xds_client(), chand(), this, call_);
700   }
701   // Create the ops.
702   grpc_call_error call_error;
703   grpc_op ops[3];
704   memset(ops, 0, sizeof(ops));
705   // Op: send initial metadata.
706   grpc_op* op = ops;
707   op->op = GRPC_OP_SEND_INITIAL_METADATA;
708   op->data.send_initial_metadata.count = 0;
709   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
710               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
711   op->reserved = nullptr;
712   op++;
713   call_error = grpc_call_start_batch_and_execute(
714       call_, ops, static_cast<size_t>(op - ops), nullptr);
715   GPR_ASSERT(GRPC_CALL_OK == call_error);
716   // Op: send request message.
717   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
718                     grpc_schedule_on_exec_ctx);
719   for (const auto& p : xds_client()->listener_map_) {
720     Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
721   }
722   for (const auto& p : xds_client()->route_config_map_) {
723     Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
724   }
725   for (const auto& p : xds_client()->cluster_map_) {
726     Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
727   }
728   for (const auto& p : xds_client()->endpoint_map_) {
729     Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
730   }
731   // Op: recv initial metadata.
732   op = ops;
733   op->op = GRPC_OP_RECV_INITIAL_METADATA;
734   op->data.recv_initial_metadata.recv_initial_metadata =
735       &initial_metadata_recv_;
736   op->flags = 0;
737   op->reserved = nullptr;
738   op++;
739   // Op: recv response.
740   op->op = GRPC_OP_RECV_MESSAGE;
741   op->data.recv_message.recv_message = &recv_message_payload_;
742   op->flags = 0;
743   op->reserved = nullptr;
744   op++;
745   Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
746   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
747                     grpc_schedule_on_exec_ctx);
748   call_error = grpc_call_start_batch_and_execute(
749       call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
750   GPR_ASSERT(GRPC_CALL_OK == call_error);
751   // Op: recv server status.
752   op = ops;
753   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
754   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
755   op->data.recv_status_on_client.status = &status_code_;
756   op->data.recv_status_on_client.status_details = &status_details_;
757   op->flags = 0;
758   op->reserved = nullptr;
759   op++;
760   // This callback signals the end of the call, so it relies on the initial
761   // ref instead of a new ref. When it's invoked, it's the initial ref that is
762   // unreffed.
763   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
764                     grpc_schedule_on_exec_ctx);
765   call_error = grpc_call_start_batch_and_execute(
766       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
767   GPR_ASSERT(GRPC_CALL_OK == call_error);
768 }
769 
~AdsCallState()770 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
771   grpc_metadata_array_destroy(&initial_metadata_recv_);
772   grpc_metadata_array_destroy(&trailing_metadata_recv_);
773   grpc_byte_buffer_destroy(send_message_payload_);
774   grpc_byte_buffer_destroy(recv_message_payload_);
775   grpc_slice_unref_internal(status_details_);
776   GPR_ASSERT(call_ != nullptr);
777   grpc_call_unref(call_);
778 }
779 
Orphan()780 void XdsClient::ChannelState::AdsCallState::Orphan() {
781   GPR_ASSERT(call_ != nullptr);
782   // If we are here because xds_client wants to cancel the call,
783   // on_status_received_ will complete the cancellation and clean up. Otherwise,
784   // we are here because xds_client has to orphan a failed call, then the
785   // following cancellation will be a no-op.
786   grpc_call_cancel_internal(call_);
787   state_map_.clear();
788   // Note that the initial ref is hold by on_status_received_. So the
789   // corresponding unref happens in on_status_received_ instead of here.
790 }
791 
SendMessageLocked(const std::string & type_url)792 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
793     const std::string& type_url) {
794   // Buffer message sending if an existing message is in flight.
795   if (send_message_payload_ != nullptr) {
796     buffered_requests_.insert(type_url);
797     return;
798   }
799   auto& state = state_map_[type_url];
800   grpc_slice request_payload_slice;
801   std::set<absl::string_view> resource_names =
802       ResourceNamesForRequest(type_url);
803   request_payload_slice = xds_client()->api_.CreateAdsRequest(
804       chand()->server_, type_url, resource_names,
805       xds_client()->resource_version_map_[type_url], state.nonce,
806       GRPC_ERROR_REF(state.error), !sent_initial_message_);
807   if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
808       type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
809     state_map_.erase(type_url);
810   }
811   sent_initial_message_ = true;
812   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
813     gpr_log(GPR_INFO,
814             "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
815             "error=%s resources=%s",
816             xds_client(), type_url.c_str(),
817             xds_client()->resource_version_map_[type_url].c_str(),
818             state.nonce.c_str(), grpc_error_string(state.error),
819             absl::StrJoin(resource_names, " ").c_str());
820   }
821   GRPC_ERROR_UNREF(state.error);
822   state.error = GRPC_ERROR_NONE;
823   // Create message payload.
824   send_message_payload_ =
825       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
826   grpc_slice_unref_internal(request_payload_slice);
827   // Send the message.
828   grpc_op op;
829   memset(&op, 0, sizeof(op));
830   op.op = GRPC_OP_SEND_MESSAGE;
831   op.data.send_message.send_message = send_message_payload_;
832   Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
833   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
834                     grpc_schedule_on_exec_ctx);
835   grpc_call_error call_error =
836       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
837   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
838     gpr_log(GPR_ERROR,
839             "[xds_client %p] calld=%p call_error=%d sending ADS message",
840             xds_client(), this, call_error);
841     GPR_ASSERT(GRPC_CALL_OK == call_error);
842   }
843 }
844 
Subscribe(const std::string & type_url,const std::string & name)845 void XdsClient::ChannelState::AdsCallState::Subscribe(
846     const std::string& type_url, const std::string& name) {
847   auto& state = state_map_[type_url].subscribed_resources[name];
848   if (state == nullptr) {
849     state = MakeOrphanable<ResourceState>(
850         type_url, name, !xds_client()->resource_version_map_[type_url].empty());
851     SendMessageLocked(type_url);
852   }
853 }
854 
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)855 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
856     const std::string& type_url, const std::string& name,
857     bool delay_unsubscription) {
858   state_map_[type_url].subscribed_resources.erase(name);
859   if (!delay_unsubscription) SendMessageLocked(type_url);
860 }
861 
HasSubscribedResources() const862 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
863   for (const auto& p : state_map_) {
864     if (!p.second.subscribed_resources.empty()) return true;
865   }
866   return false;
867 }
868 
AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map)869 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
870     XdsApi::LdsUpdateMap lds_update_map) {
871   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
872     gpr_log(GPR_INFO,
873             "[xds_client %p] LDS update received containing %" PRIuPTR
874             " resources",
875             xds_client(), lds_update_map.size());
876   }
877   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
878   std::set<std::string> rds_resource_names_seen;
879   for (auto& p : lds_update_map) {
880     const std::string& listener_name = p.first;
881     XdsApi::LdsUpdate& lds_update = p.second;
882     auto& state = lds_state.subscribed_resources[listener_name];
883     if (state != nullptr) state->Finish();
884     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
885       gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
886               listener_name.c_str(), lds_update.ToString().c_str());
887     }
888     // Record the RDS resource names seen.
889     if (!lds_update.route_config_name.empty()) {
890       rds_resource_names_seen.insert(lds_update.route_config_name);
891     }
892     // Ignore identical update.
893     ListenerState& listener_state = xds_client()->listener_map_[listener_name];
894     if (listener_state.update.has_value() &&
895         *listener_state.update == lds_update) {
896       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
897         gpr_log(GPR_INFO,
898                 "[xds_client %p] LDS update for %s identical to current, "
899                 "ignoring.",
900                 xds_client(), listener_name.c_str());
901       }
902       continue;
903     }
904     // Update the listener state.
905     listener_state.update = std::move(lds_update);
906     // Notify watchers.
907     for (const auto& p : listener_state.watchers) {
908       p.first->OnListenerChanged(*listener_state.update);
909     }
910   }
911   // For any subscribed resource that is not present in the update,
912   // remove it from the cache and notify watchers that it does not exist.
913   for (const auto& p : lds_state.subscribed_resources) {
914     const std::string& listener_name = p.first;
915     if (lds_update_map.find(listener_name) == lds_update_map.end()) {
916       ListenerState& listener_state =
917           xds_client()->listener_map_[listener_name];
918       // If the resource was newly requested but has not yet been received,
919       // we don't want to generate an error for the watchers, because this LDS
920       // response may be in reaction to an earlier request that did not yet
921       // request the new resource, so its absence from the response does not
922       // necessarily indicate that the resource does not exist.
923       // For that case, we rely on the request timeout instead.
924       if (!listener_state.update.has_value()) continue;
925       listener_state.update.reset();
926       for (const auto& p : listener_state.watchers) {
927         p.first->OnResourceDoesNotExist();
928       }
929     }
930   }
931   // For any RDS resource that is no longer referred to by any LDS
932   // resources, remove it from the cache and notify watchers that it
933   // does not exist.
934   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
935   for (const auto& p : rds_state.subscribed_resources) {
936     const std::string& rds_resource_name = p.first;
937     if (rds_resource_names_seen.find(rds_resource_name) ==
938         rds_resource_names_seen.end()) {
939       RouteConfigState& route_config_state =
940           xds_client()->route_config_map_[rds_resource_name];
941       route_config_state.update.reset();
942       for (const auto& p : route_config_state.watchers) {
943         p.first->OnResourceDoesNotExist();
944       }
945     }
946   }
947 }
948 
AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map)949 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
950     XdsApi::RdsUpdateMap rds_update_map) {
951   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
952     gpr_log(GPR_INFO,
953             "[xds_client %p] RDS update received containing %" PRIuPTR
954             " resources",
955             xds_client(), rds_update_map.size());
956   }
957   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
958   for (auto& p : rds_update_map) {
959     const std::string& route_config_name = p.first;
960     XdsApi::RdsUpdate& rds_update = p.second;
961     auto& state = rds_state.subscribed_resources[route_config_name];
962     if (state != nullptr) state->Finish();
963     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
964       gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
965               rds_update.ToString().c_str());
966     }
967     RouteConfigState& route_config_state =
968         xds_client()->route_config_map_[route_config_name];
969     // Ignore identical update.
970     if (route_config_state.update.has_value() &&
971         *route_config_state.update == rds_update) {
972       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
973         gpr_log(GPR_INFO,
974                 "[xds_client %p] RDS resource identical to current, ignoring",
975                 xds_client());
976       }
977       continue;
978     }
979     // Update the cache.
980     route_config_state.update = std::move(rds_update);
981     // Notify all watchers.
982     for (const auto& p : route_config_state.watchers) {
983       p.first->OnRouteConfigChanged(*route_config_state.update);
984     }
985   }
986 }
987 
AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map)988 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
989     XdsApi::CdsUpdateMap cds_update_map) {
990   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
991     gpr_log(GPR_INFO,
992             "[xds_client %p] CDS update received containing %" PRIuPTR
993             " resources",
994             xds_client(), cds_update_map.size());
995   }
996   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
997   std::set<std::string> eds_resource_names_seen;
998   for (auto& p : cds_update_map) {
999     const char* cluster_name = p.first.c_str();
1000     XdsApi::CdsUpdate& cds_update = p.second;
1001     auto& state = cds_state.subscribed_resources[cluster_name];
1002     if (state != nullptr) state->Finish();
1003     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1004       gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
1005               cluster_name, cds_update.ToString().c_str());
1006     }
1007     // Record the EDS resource names seen.
1008     eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1009                                        ? cluster_name
1010                                        : cds_update.eds_service_name);
1011     // Ignore identical update.
1012     ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1013     if (cluster_state.update.has_value() &&
1014         *cluster_state.update == cds_update) {
1015       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1016         gpr_log(GPR_INFO,
1017                 "[xds_client %p] CDS update identical to current, ignoring.",
1018                 xds_client());
1019       }
1020       continue;
1021     }
1022     // Update the cluster state.
1023     cluster_state.update = std::move(cds_update);
1024     // Notify all watchers.
1025     for (const auto& p : cluster_state.watchers) {
1026       p.first->OnClusterChanged(cluster_state.update.value());
1027     }
1028   }
1029   // For any subscribed resource that is not present in the update,
1030   // remove it from the cache and notify watchers that it does not exist.
1031   for (const auto& p : cds_state.subscribed_resources) {
1032     const std::string& cluster_name = p.first;
1033     if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1034       ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1035       // If the resource was newly requested but has not yet been received,
1036       // we don't want to generate an error for the watchers, because this CDS
1037       // response may be in reaction to an earlier request that did not yet
1038       // request the new resource, so its absence from the response does not
1039       // necessarily indicate that the resource does not exist.
1040       // For that case, we rely on the request timeout instead.
1041       if (!cluster_state.update.has_value()) continue;
1042       cluster_state.update.reset();
1043       for (const auto& p : cluster_state.watchers) {
1044         p.first->OnResourceDoesNotExist();
1045       }
1046     }
1047   }
1048   // For any EDS resource that is no longer referred to by any CDS
1049   // resources, remove it from the cache and notify watchers that it
1050   // does not exist.
1051   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1052   for (const auto& p : eds_state.subscribed_resources) {
1053     const std::string& eds_resource_name = p.first;
1054     if (eds_resource_names_seen.find(eds_resource_name) ==
1055         eds_resource_names_seen.end()) {
1056       EndpointState& endpoint_state =
1057           xds_client()->endpoint_map_[eds_resource_name];
1058       endpoint_state.update.reset();
1059       for (const auto& p : endpoint_state.watchers) {
1060         p.first->OnResourceDoesNotExist();
1061       }
1062     }
1063   }
1064 }
1065 
AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map)1066 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1067     XdsApi::EdsUpdateMap eds_update_map) {
1068   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1069     gpr_log(GPR_INFO,
1070             "[xds_client %p] EDS update received containing %" PRIuPTR
1071             " resources",
1072             xds_client(), eds_update_map.size());
1073   }
1074   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1075   for (auto& p : eds_update_map) {
1076     const char* eds_service_name = p.first.c_str();
1077     XdsApi::EdsUpdate& eds_update = p.second;
1078     auto& state = eds_state.subscribed_resources[eds_service_name];
1079     if (state != nullptr) state->Finish();
1080     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1081       gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1082               eds_service_name, eds_update.ToString().c_str());
1083     }
1084     EndpointState& endpoint_state =
1085         xds_client()->endpoint_map_[eds_service_name];
1086     // Ignore identical update.
1087     if (endpoint_state.update.has_value() &&
1088         *endpoint_state.update == eds_update) {
1089       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1090         gpr_log(GPR_INFO,
1091                 "[xds_client %p] EDS update identical to current, ignoring.",
1092                 xds_client());
1093       }
1094       continue;
1095     }
1096     // Update the cluster state.
1097     endpoint_state.update = std::move(eds_update);
1098     // Notify all watchers.
1099     for (const auto& p : endpoint_state.watchers) {
1100       p.first->OnEndpointChanged(endpoint_state.update.value());
1101     }
1102   }
1103 }
1104 
OnRequestSent(void * arg,grpc_error * error)1105 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1106                                                           grpc_error* error) {
1107   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1108   {
1109     MutexLock lock(&ads_calld->xds_client()->mu_);
1110     ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1111   }
1112   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1113 }
1114 
OnRequestSentLocked(grpc_error * error)1115 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1116     grpc_error* error) {
1117   if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1118     // Clean up the sent message.
1119     grpc_byte_buffer_destroy(send_message_payload_);
1120     send_message_payload_ = nullptr;
1121     // Continue to send another pending message if any.
1122     // TODO(roth): The current code to handle buffered messages has the
1123     // advantage of sending only the most recent list of resource names for
1124     // each resource type (no matter how many times that resource type has
1125     // been requested to send while the current message sending is still
1126     // pending). But its disadvantage is that we send the requests in fixed
1127     // order of resource types. We need to fix this if we are seeing some
1128     // resource type(s) starved due to frequent requests of other resource
1129     // type(s).
1130     auto it = buffered_requests_.begin();
1131     if (it != buffered_requests_.end()) {
1132       SendMessageLocked(*it);
1133       buffered_requests_.erase(it);
1134     }
1135   }
1136   GRPC_ERROR_UNREF(error);
1137 }
1138 
OnResponseReceived(void * arg,grpc_error *)1139 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1140     void* arg, grpc_error* /* error */) {
1141   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1142   bool done;
1143   {
1144     MutexLock lock(&ads_calld->xds_client()->mu_);
1145     done = ads_calld->OnResponseReceivedLocked();
1146   }
1147   if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1148 }
1149 
// Processes one received ADS response.
//
// Returns true when the caller should release the
// "ADS+OnResponseReceivedLocked" ref, i.e. no new recv op was started: the
// call is stale, the payload is empty, or the client is shutting down.
// Returns false after starting another GRPC_OP_RECV_MESSAGE that reuses that
// ref.
bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
  // Empty payload means the call was cancelled.
  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
    return true;
  }
  // Read the response.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  // Parse and validate the response.  The currently subscribed names of each
  // type are passed to the parser.  Note that ResourceNamesForRequest() also
  // calls Start() on every subscribed ResourceState.
  XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
      response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
      ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
      ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
      ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
  grpc_slice_unref_internal(response_slice);
  if (result.type_url.empty()) {
    // Ignore unparsable response.
    gpr_log(GPR_ERROR,
            "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
            xds_client(), grpc_error_string(result.parse_error));
    GRPC_ERROR_UNREF(result.parse_error);
  } else {
    // Update nonce.
    auto& state = state_map_[result.type_url];
    state.nonce = std::move(result.nonce);
    // NACK or ACK the response.
    if (result.parse_error != GRPC_ERROR_NONE) {
      // Ownership of parse_error moves into state.error.  SendMessageLocked()
      // puts it in the NACK request and then clears it.
      GRPC_ERROR_UNREF(state.error);
      state.error = result.parse_error;
      // NACK unacceptable update.
      gpr_log(GPR_ERROR,
              "[xds_client %p] ADS response invalid for resource type %s "
              "version %s, will NACK: nonce=%s error=%s",
              xds_client(), result.type_url.c_str(), result.version.c_str(),
              state.nonce.c_str(), grpc_error_string(result.parse_error));
      SendMessageLocked(result.type_url);
    } else {
      seen_response_ = true;
      // Accept the ADS response according to the type_url.
      if (result.type_url == XdsApi::kLdsTypeUrl) {
        AcceptLdsUpdate(std::move(result.lds_update_map));
      } else if (result.type_url == XdsApi::kRdsTypeUrl) {
        AcceptRdsUpdate(std::move(result.rds_update_map));
      } else if (result.type_url == XdsApi::kCdsTypeUrl) {
        AcceptCdsUpdate(std::move(result.cds_update_map));
      } else if (result.type_url == XdsApi::kEdsTypeUrl) {
        AcceptEdsUpdate(std::move(result.eds_update_map));
      }
      // Record the accepted version so the ACK below carries it.
      xds_client()->resource_version_map_[result.type_url] =
          std::move(result.version);
      // ACK the update.
      SendMessageLocked(result.type_url);
      // Start load reporting if needed.
      auto& lrs_call = chand()->lrs_calld_;
      if (lrs_call != nullptr) {
        LrsCallState* lrs_calld = lrs_call->calld();
        if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
      }
    }
  }
  if (xds_client()->shutting_down_) return true;
  // Keep listening for updates.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_RECV_MESSAGE;
  op.data.recv_message.recv_message = &recv_message_payload_;
  op.flags = 0;
  op.reserved = nullptr;
  GPR_ASSERT(call_ != nullptr);
  // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
  const grpc_call_error call_error =
      grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  return false;
}
1229 
OnStatusReceived(void * arg,grpc_error * error)1230 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1231     void* arg, grpc_error* error) {
1232   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1233   {
1234     MutexLock lock(&ads_calld->xds_client()->mu_);
1235     ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1236   }
1237   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1238 }
1239 
OnStatusReceivedLocked(grpc_error * error)1240 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1241     grpc_error* error) {
1242   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1243     char* status_details = grpc_slice_to_c_string(status_details_);
1244     gpr_log(GPR_INFO,
1245             "[xds_client %p] ADS call status received. Status = %d, details "
1246             "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1247             xds_client(), status_code_, status_details, chand(), this, call_,
1248             grpc_error_string(error));
1249     gpr_free(status_details);
1250   }
1251   // Ignore status from a stale call.
1252   if (IsCurrentCallOnChannel()) {
1253     // Try to restart the call.
1254     parent_->OnCallFinishedLocked();
1255     // Send error to all watchers.
1256     xds_client()->NotifyOnErrorLocked(
1257         GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1258   }
1259   GRPC_ERROR_UNREF(error);
1260 }
1261 
IsCurrentCallOnChannel() const1262 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1263   // If the retryable ADS call is null (which only happens when the xds channel
1264   // is shutting down), all the ADS calls are stale.
1265   if (chand()->ads_calld_ == nullptr) return false;
1266   return this == chand()->ads_calld_->calld();
1267 }
1268 
1269 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1270 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1271     const std::string& type_url) {
1272   std::set<absl::string_view> resource_names;
1273   auto it = state_map_.find(type_url);
1274   if (it != state_map_.end()) {
1275     for (auto& p : it->second.subscribed_resources) {
1276       resource_names.insert(p.first);
1277       OrphanablePtr<ResourceState>& state = p.second;
1278       state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1279     }
1280   }
1281   return resource_names;
1282 }
1283 
1284 //
1285 // XdsClient::ChannelState::LrsCallState::Reporter
1286 //
1287 
Orphan()1288 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1289   if (next_report_timer_callback_pending_) {
1290     grpc_timer_cancel(&next_report_timer_);
1291   }
1292 }
1293 
1294 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1295     ScheduleNextReportLocked() {
1296   const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1297   grpc_timer_init(&next_report_timer_, next_report_time,
1298                   &on_next_report_timer_);
1299   next_report_timer_callback_pending_ = true;
1300 }
1301 
OnNextReportTimer(void * arg,grpc_error * error)1302 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1303     void* arg, grpc_error* error) {
1304   Reporter* self = static_cast<Reporter*>(arg);
1305   bool done;
1306   {
1307     MutexLock lock(&self->xds_client()->mu_);
1308     done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1309   }
1310   if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1311 }
1312 
OnNextReportTimerLocked(grpc_error * error)1313 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1314     grpc_error* error) {
1315   next_report_timer_callback_pending_ = false;
1316   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1317     GRPC_ERROR_UNREF(error);
1318     return true;
1319   }
1320   return SendReportLocked();
1321 }
1322 
1323 namespace {
1324 
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1325 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1326   for (const auto& p : snapshot) {
1327     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1328     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1329     for (const auto& q : cluster_snapshot.locality_stats) {
1330       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1331       if (!locality_snapshot.IsZero()) return false;
1332     }
1333   }
1334   return true;
1335 }
1336 
1337 }  // namespace
1338 
SendReportLocked()1339 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1340   // Construct snapshot from all reported stats.
1341   XdsApi::ClusterLoadReportMap snapshot =
1342       xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1343                                                   parent_->cluster_names_);
1344   // Skip client load report if the counters were all zero in the last
1345   // report and they are still zero in this one.
1346   const bool old_val = last_report_counters_were_zero_;
1347   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1348   if (old_val && last_report_counters_were_zero_) {
1349     if (xds_client()->load_report_map_.empty()) {
1350       parent_->chand()->StopLrsCall();
1351       return true;
1352     }
1353     ScheduleNextReportLocked();
1354     return false;
1355   }
1356   // Create a request that contains the snapshot.
1357   grpc_slice request_payload_slice =
1358       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1359   parent_->send_message_payload_ =
1360       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1361   grpc_slice_unref_internal(request_payload_slice);
1362   // Send the report.
1363   grpc_op op;
1364   memset(&op, 0, sizeof(op));
1365   op.op = GRPC_OP_SEND_MESSAGE;
1366   op.data.send_message.send_message = parent_->send_message_payload_;
1367   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1368       parent_->call_, &op, 1, &on_report_done_);
1369   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1370     gpr_log(GPR_ERROR,
1371             "[xds_client %p] calld=%p call_error=%d sending client load report",
1372             xds_client(), this, call_error);
1373     GPR_ASSERT(GRPC_CALL_OK == call_error);
1374   }
1375   return false;
1376 }
1377 
OnReportDone(void * arg,grpc_error * error)1378 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1379     void* arg, grpc_error* error) {
1380   Reporter* self = static_cast<Reporter*>(arg);
1381   bool done;
1382   {
1383     MutexLock lock(&self->xds_client()->mu_);
1384     done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1385   }
1386   if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1387 }
1388 
OnReportDoneLocked(grpc_error * error)1389 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1390     grpc_error* error) {
1391   grpc_byte_buffer_destroy(parent_->send_message_payload_);
1392   parent_->send_message_payload_ = nullptr;
1393   // If there are no more registered stats to report, cancel the call.
1394   if (xds_client()->load_report_map_.empty()) {
1395     parent_->chand()->StopLrsCall();
1396     GRPC_ERROR_UNREF(error);
1397     return true;
1398   }
1399   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1400     GRPC_ERROR_UNREF(error);
1401     // If this reporter is no longer the current one on the call, the reason
1402     // might be that it was orphaned for a new one due to config update.
1403     if (!IsCurrentReporterOnCall()) {
1404       parent_->MaybeStartReportingLocked();
1405     }
1406     return true;
1407   }
1408   ScheduleNextReportLocked();
1409   return false;
1410 }
1411 
1412 //
1413 // XdsClient::ChannelState::LrsCallState
1414 //
1415 
LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent)1416 XdsClient::ChannelState::LrsCallState::LrsCallState(
1417     RefCountedPtr<RetryableCall<LrsCallState>> parent)
1418     : InternallyRefCounted<LrsCallState>(
1419           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
1420               ? "LrsCallState"
1421               : nullptr),
1422       parent_(std::move(parent)) {
1423   // Init the LRS call. Note that the call will progress every time there's
1424   // activity in xds_client()->interested_parties_, which is comprised of
1425   // the polling entities from client_channel.
1426   GPR_ASSERT(xds_client() != nullptr);
1427   const auto& method =
1428       chand()->server_.ShouldUseV3()
1429           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1430           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1431   call_ = grpc_channel_create_pollset_set_call(
1432       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1433       xds_client()->interested_parties_, method, nullptr,
1434       GRPC_MILLIS_INF_FUTURE, nullptr);
1435   GPR_ASSERT(call_ != nullptr);
1436   // Init the request payload.
1437   grpc_slice request_payload_slice =
1438       xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
1439   send_message_payload_ =
1440       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1441   grpc_slice_unref_internal(request_payload_slice);
1442   // Init other data associated with the LRS call.
1443   grpc_metadata_array_init(&initial_metadata_recv_);
1444   grpc_metadata_array_init(&trailing_metadata_recv_);
1445   // Start the call.
1446   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1447     gpr_log(GPR_INFO,
1448             "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1449             "call: %p)",
1450             xds_client(), chand(), this, call_);
1451   }
1452   // Create the ops.
1453   grpc_call_error call_error;
1454   grpc_op ops[3];
1455   memset(ops, 0, sizeof(ops));
1456   // Op: send initial metadata.
1457   grpc_op* op = ops;
1458   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1459   op->data.send_initial_metadata.count = 0;
1460   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1461               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1462   op->reserved = nullptr;
1463   op++;
1464   // Op: send request message.
1465   GPR_ASSERT(send_message_payload_ != nullptr);
1466   op->op = GRPC_OP_SEND_MESSAGE;
1467   op->data.send_message.send_message = send_message_payload_;
1468   op->flags = 0;
1469   op->reserved = nullptr;
1470   op++;
1471   Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1472   GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1473                     grpc_schedule_on_exec_ctx);
1474   call_error = grpc_call_start_batch_and_execute(
1475       call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
1476   GPR_ASSERT(GRPC_CALL_OK == call_error);
1477   // Op: recv initial metadata.
1478   op = ops;
1479   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1480   op->data.recv_initial_metadata.recv_initial_metadata =
1481       &initial_metadata_recv_;
1482   op->flags = 0;
1483   op->reserved = nullptr;
1484   op++;
1485   // Op: recv response.
1486   op->op = GRPC_OP_RECV_MESSAGE;
1487   op->data.recv_message.recv_message = &recv_message_payload_;
1488   op->flags = 0;
1489   op->reserved = nullptr;
1490   op++;
1491   Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1492   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1493                     grpc_schedule_on_exec_ctx);
1494   call_error = grpc_call_start_batch_and_execute(
1495       call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
1496   GPR_ASSERT(GRPC_CALL_OK == call_error);
1497   // Op: recv server status.
1498   op = ops;
1499   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1500   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1501   op->data.recv_status_on_client.status = &status_code_;
1502   op->data.recv_status_on_client.status_details = &status_details_;
1503   op->flags = 0;
1504   op->reserved = nullptr;
1505   op++;
1506   // This callback signals the end of the call, so it relies on the initial
1507   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1508   // unreffed.
1509   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1510                     grpc_schedule_on_exec_ctx);
1511   call_error = grpc_call_start_batch_and_execute(
1512       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
1513   GPR_ASSERT(GRPC_CALL_OK == call_error);
1514 }
1515 
~LrsCallState()1516 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1517   grpc_metadata_array_destroy(&initial_metadata_recv_);
1518   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1519   grpc_byte_buffer_destroy(send_message_payload_);
1520   grpc_byte_buffer_destroy(recv_message_payload_);
1521   grpc_slice_unref_internal(status_details_);
1522   GPR_ASSERT(call_ != nullptr);
1523   grpc_call_unref(call_);
1524 }
1525 
Orphan()1526 void XdsClient::ChannelState::LrsCallState::Orphan() {
1527   reporter_.reset();
1528   GPR_ASSERT(call_ != nullptr);
1529   // If we are here because xds_client wants to cancel the call,
1530   // on_status_received_ will complete the cancellation and clean up. Otherwise,
1531   // we are here because xds_client has to orphan a failed call, then the
1532   // following cancellation will be a no-op.
1533   grpc_call_cancel_internal(call_);
1534   // Note that the initial ref is hold by on_status_received_. So the
1535   // corresponding unref happens in on_status_received_ instead of here.
1536 }
1537 
MaybeStartReportingLocked()1538 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1539   // Don't start again if already started.
1540   if (reporter_ != nullptr) return;
1541   // Don't start if the previous send_message op (of the initial request or the
1542   // last report of the previous reporter) hasn't completed.
1543   if (send_message_payload_ != nullptr) return;
1544   // Don't start if no LRS response has arrived.
1545   if (!seen_response()) return;
1546   // Don't start if the ADS call hasn't received any valid response. Note that
1547   // this must be the first channel because it is the current channel but its
1548   // ADS call hasn't seen any response.
1549   if (chand()->ads_calld_ == nullptr ||
1550       chand()->ads_calld_->calld() == nullptr ||
1551       !chand()->ads_calld_->calld()->seen_response()) {
1552     return;
1553   }
1554   // Start reporting.
1555   reporter_ = MakeOrphanable<Reporter>(
1556       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1557 }
1558 
OnInitialRequestSent(void * arg,grpc_error *)1559 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1560     void* arg, grpc_error* /*error*/) {
1561   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1562   {
1563     MutexLock lock(&lrs_calld->xds_client()->mu_);
1564     lrs_calld->OnInitialRequestSentLocked();
1565   }
1566   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1567 }
1568 
OnInitialRequestSentLocked()1569 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1570   // Clear the send_message_payload_.
1571   grpc_byte_buffer_destroy(send_message_payload_);
1572   send_message_payload_ = nullptr;
1573   MaybeStartReportingLocked();
1574 }
1575 
OnResponseReceived(void * arg,grpc_error *)1576 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1577     void* arg, grpc_error* /*error*/) {
1578   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1579   bool done;
1580   {
1581     MutexLock lock(&lrs_calld->xds_client()->mu_);
1582     done = lrs_calld->OnResponseReceivedLocked();
1583   }
1584   if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1585 }
1586 
OnResponseReceivedLocked()1587 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1588   // Empty payload means the call was cancelled.
1589   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1590     return true;
1591   }
1592   // Read the response.
1593   grpc_byte_buffer_reader bbr;
1594   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1595   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1596   grpc_byte_buffer_reader_destroy(&bbr);
1597   grpc_byte_buffer_destroy(recv_message_payload_);
1598   recv_message_payload_ = nullptr;
1599   // This anonymous lambda is a hack to avoid the usage of goto.
1600   [&]() {
1601     // Parse the response.
1602     bool send_all_clusters = false;
1603     std::set<std::string> new_cluster_names;
1604     grpc_millis new_load_reporting_interval;
1605     grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
1606         response_slice, &send_all_clusters, &new_cluster_names,
1607         &new_load_reporting_interval);
1608     if (parse_error != GRPC_ERROR_NONE) {
1609       gpr_log(GPR_ERROR,
1610               "[xds_client %p] LRS response parsing failed. error=%s",
1611               xds_client(), grpc_error_string(parse_error));
1612       GRPC_ERROR_UNREF(parse_error);
1613       return;
1614     }
1615     seen_response_ = true;
1616     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1617       gpr_log(
1618           GPR_INFO,
1619           "[xds_client %p] LRS response received, %" PRIuPTR
1620           " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1621           "ms",
1622           xds_client(), new_cluster_names.size(), send_all_clusters,
1623           new_load_reporting_interval);
1624       size_t i = 0;
1625       for (const auto& name : new_cluster_names) {
1626         gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1627                 xds_client(), i++, name.c_str());
1628       }
1629     }
1630     if (new_load_reporting_interval <
1631         GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1632       new_load_reporting_interval =
1633           GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1634       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1635         gpr_log(GPR_INFO,
1636                 "[xds_client %p] Increased load_report_interval to minimum "
1637                 "value %dms",
1638                 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1639       }
1640     }
1641     // Ignore identical update.
1642     if (send_all_clusters == send_all_clusters_ &&
1643         cluster_names_ == new_cluster_names &&
1644         load_reporting_interval_ == new_load_reporting_interval) {
1645       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1646         gpr_log(GPR_INFO,
1647                 "[xds_client %p] Incoming LRS response identical to current, "
1648                 "ignoring.",
1649                 xds_client());
1650       }
1651       return;
1652     }
1653     // Stop current load reporting (if any) to adopt the new config.
1654     reporter_.reset();
1655     // Record the new config.
1656     send_all_clusters_ = send_all_clusters;
1657     cluster_names_ = std::move(new_cluster_names);
1658     load_reporting_interval_ = new_load_reporting_interval;
1659     // Try starting sending load report.
1660     MaybeStartReportingLocked();
1661   }();
1662   grpc_slice_unref_internal(response_slice);
1663   if (xds_client()->shutting_down_) return true;
1664   // Keep listening for LRS config updates.
1665   grpc_op op;
1666   memset(&op, 0, sizeof(op));
1667   op.op = GRPC_OP_RECV_MESSAGE;
1668   op.data.recv_message.recv_message = &recv_message_payload_;
1669   op.flags = 0;
1670   op.reserved = nullptr;
1671   GPR_ASSERT(call_ != nullptr);
1672   // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1673   const grpc_call_error call_error =
1674       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1675   GPR_ASSERT(GRPC_CALL_OK == call_error);
1676   return false;
1677 }
1678 
OnStatusReceived(void * arg,grpc_error * error)1679 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1680     void* arg, grpc_error* error) {
1681   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1682   {
1683     MutexLock lock(&lrs_calld->xds_client()->mu_);
1684     lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1685   }
1686   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1687 }
1688 
OnStatusReceivedLocked(grpc_error * error)1689 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1690     grpc_error* error) {
1691   GPR_ASSERT(call_ != nullptr);
1692   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1693     char* status_details = grpc_slice_to_c_string(status_details_);
1694     gpr_log(GPR_INFO,
1695             "[xds_client %p] LRS call status received. Status = %d, details "
1696             "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1697             xds_client(), status_code_, status_details, chand(), this, call_,
1698             grpc_error_string(error));
1699     gpr_free(status_details);
1700   }
1701   // Ignore status from a stale call.
1702   if (IsCurrentCallOnChannel()) {
1703     GPR_ASSERT(!xds_client()->shutting_down_);
1704     // Try to restart the call.
1705     parent_->OnCallFinishedLocked();
1706   }
1707   GRPC_ERROR_UNREF(error);
1708 }
1709 
IsCurrentCallOnChannel() const1710 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1711   // If the retryable LRS call is null (which only happens when the xds channel
1712   // is shutting down), all the LRS calls are stale.
1713   if (chand()->lrs_calld_ == nullptr) return false;
1714   return this == chand()->lrs_calld_->calld();
1715 }
1716 
1717 //
1718 // XdsClient
1719 //
1720 
1721 namespace {
1722 
GetRequestTimeout()1723 grpc_millis GetRequestTimeout() {
1724   return grpc_channel_args_find_integer(
1725       g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1726       {15000, 0, INT_MAX});
1727 }
1728 
1729 }  // namespace
1730 
XdsClient(grpc_error ** error)1731 XdsClient::XdsClient(grpc_error** error)
1732     : DualRefCounted<XdsClient>(
1733           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1734                                                                   : nullptr),
1735       request_timeout_(GetRequestTimeout()),
1736       interested_parties_(grpc_pollset_set_create()),
1737       bootstrap_(XdsBootstrap::Create(this, &grpc_xds_client_trace,
1738                                       g_fallback_bootstrap_config, error)),
1739       certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
1740           bootstrap_ == nullptr
1741               ? CertificateProviderStore::PluginDefinitionMap()
1742               : bootstrap_->certificate_providers())),
1743       api_(this, &grpc_xds_client_trace,
1744            bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
1745   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1746     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1747   }
1748   if (*error != GRPC_ERROR_NONE) {
1749     gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1750             this, grpc_error_string(*error));
1751     return;
1752   }
1753   // Create ChannelState object.
1754   chand_ = MakeOrphanable<ChannelState>(
1755       WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1756 }
1757 
~XdsClient()1758 XdsClient::~XdsClient() {
1759   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1760     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1761   }
1762   grpc_pollset_set_destroy(interested_parties_);
1763 }
1764 
AddChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1765 void XdsClient::AddChannelzLinkage(
1766     channelz::ChannelNode* parent_channelz_node) {
1767   channelz::ChannelNode* xds_channelz_node =
1768       grpc_channel_get_channelz_node(chand_->channel());
1769   if (xds_channelz_node != nullptr) {
1770     parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1771   }
1772 }
1773 
RemoveChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1774 void XdsClient::RemoveChannelzLinkage(
1775     channelz::ChannelNode* parent_channelz_node) {
1776   channelz::ChannelNode* xds_channelz_node =
1777       grpc_channel_get_channelz_node(chand_->channel());
1778   if (xds_channelz_node != nullptr) {
1779     parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1780   }
1781 }
1782 
Orphan()1783 void XdsClient::Orphan() {
1784   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1785     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1786   }
1787   {
1788     MutexLock lock(g_mu);
1789     if (g_xds_client == this) g_xds_client = nullptr;
1790   }
1791   {
1792     MutexLock lock(&mu_);
1793     shutting_down_ = true;
1794     // Orphan ChannelState object.
1795     chand_.reset();
1796     // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1797     // created by the XdsResolver because the maps contain refs for watchers
1798     // which in turn hold refs to the loadbalancing policies. At this point, it
1799     // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1800     // policies before those calls are done would lead to issues such as
1801     // https://github.com/grpc/grpc/issues/20928.
1802     if (!listener_map_.empty()) {
1803       cluster_map_.clear();
1804       endpoint_map_.clear();
1805     }
1806   }
1807 }
1808 
WatchListenerData(absl::string_view listener_name,std::unique_ptr<ListenerWatcherInterface> watcher)1809 void XdsClient::WatchListenerData(
1810     absl::string_view listener_name,
1811     std::unique_ptr<ListenerWatcherInterface> watcher) {
1812   std::string listener_name_str = std::string(listener_name);
1813   MutexLock lock(&mu_);
1814   ListenerState& listener_state = listener_map_[listener_name_str];
1815   ListenerWatcherInterface* w = watcher.get();
1816   listener_state.watchers[w] = std::move(watcher);
1817   // If we've already received an LDS update, notify the new watcher
1818   // immediately.
1819   if (listener_state.update.has_value()) {
1820     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1821       gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1822               this, listener_name_str.c_str());
1823     }
1824     w->OnListenerChanged(*listener_state.update);
1825   }
1826   chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
1827 }
1828 
CancelListenerDataWatch(absl::string_view listener_name,ListenerWatcherInterface * watcher,bool delay_unsubscription)1829 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1830                                         ListenerWatcherInterface* watcher,
1831                                         bool delay_unsubscription) {
1832   MutexLock lock(&mu_);
1833   if (shutting_down_) return;
1834   std::string listener_name_str = std::string(listener_name);
1835   ListenerState& listener_state = listener_map_[listener_name_str];
1836   auto it = listener_state.watchers.find(watcher);
1837   if (it != listener_state.watchers.end()) {
1838     listener_state.watchers.erase(it);
1839     if (listener_state.watchers.empty()) {
1840       listener_map_.erase(listener_name_str);
1841       chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
1842                           delay_unsubscription);
1843     }
1844   }
1845 }
1846 
WatchRouteConfigData(absl::string_view route_config_name,std::unique_ptr<RouteConfigWatcherInterface> watcher)1847 void XdsClient::WatchRouteConfigData(
1848     absl::string_view route_config_name,
1849     std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1850   std::string route_config_name_str = std::string(route_config_name);
1851   MutexLock lock(&mu_);
1852   RouteConfigState& route_config_state =
1853       route_config_map_[route_config_name_str];
1854   RouteConfigWatcherInterface* w = watcher.get();
1855   route_config_state.watchers[w] = std::move(watcher);
1856   // If we've already received an RDS update, notify the new watcher
1857   // immediately.
1858   if (route_config_state.update.has_value()) {
1859     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1860       gpr_log(GPR_INFO,
1861               "[xds_client %p] returning cached route config data for %s", this,
1862               route_config_name_str.c_str());
1863     }
1864     w->OnRouteConfigChanged(*route_config_state.update);
1865   }
1866   chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
1867 }
1868 
CancelRouteConfigDataWatch(absl::string_view route_config_name,RouteConfigWatcherInterface * watcher,bool delay_unsubscription)1869 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1870                                            RouteConfigWatcherInterface* watcher,
1871                                            bool delay_unsubscription) {
1872   MutexLock lock(&mu_);
1873   if (shutting_down_) return;
1874   std::string route_config_name_str = std::string(route_config_name);
1875   RouteConfigState& route_config_state =
1876       route_config_map_[route_config_name_str];
1877   auto it = route_config_state.watchers.find(watcher);
1878   if (it != route_config_state.watchers.end()) {
1879     route_config_state.watchers.erase(it);
1880     if (route_config_state.watchers.empty()) {
1881       route_config_map_.erase(route_config_name_str);
1882       chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
1883                           delay_unsubscription);
1884     }
1885   }
1886 }
1887 
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1888 void XdsClient::WatchClusterData(
1889     absl::string_view cluster_name,
1890     std::unique_ptr<ClusterWatcherInterface> watcher) {
1891   std::string cluster_name_str = std::string(cluster_name);
1892   MutexLock lock(&mu_);
1893   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1894   ClusterWatcherInterface* w = watcher.get();
1895   cluster_state.watchers[w] = std::move(watcher);
1896   // If we've already received a CDS update, notify the new watcher
1897   // immediately.
1898   if (cluster_state.update.has_value()) {
1899     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1900       gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1901               this, cluster_name_str.c_str());
1902     }
1903     w->OnClusterChanged(cluster_state.update.value());
1904   }
1905   chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1906 }
1907 
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1908 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1909                                        ClusterWatcherInterface* watcher,
1910                                        bool delay_unsubscription) {
1911   MutexLock lock(&mu_);
1912   if (shutting_down_) return;
1913   std::string cluster_name_str = std::string(cluster_name);
1914   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1915   auto it = cluster_state.watchers.find(watcher);
1916   if (it != cluster_state.watchers.end()) {
1917     cluster_state.watchers.erase(it);
1918     if (cluster_state.watchers.empty()) {
1919       cluster_map_.erase(cluster_name_str);
1920       chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1921                           delay_unsubscription);
1922     }
1923   }
1924 }
1925 
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1926 void XdsClient::WatchEndpointData(
1927     absl::string_view eds_service_name,
1928     std::unique_ptr<EndpointWatcherInterface> watcher) {
1929   std::string eds_service_name_str = std::string(eds_service_name);
1930   MutexLock lock(&mu_);
1931   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1932   EndpointWatcherInterface* w = watcher.get();
1933   endpoint_state.watchers[w] = std::move(watcher);
1934   // If we've already received an EDS update, notify the new watcher
1935   // immediately.
1936   if (endpoint_state.update.has_value()) {
1937     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1938       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1939               this, eds_service_name_str.c_str());
1940     }
1941     w->OnEndpointChanged(endpoint_state.update.value());
1942   }
1943   chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1944 }
1945 
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)1946 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1947                                         EndpointWatcherInterface* watcher,
1948                                         bool delay_unsubscription) {
1949   MutexLock lock(&mu_);
1950   if (shutting_down_) return;
1951   std::string eds_service_name_str = std::string(eds_service_name);
1952   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1953   auto it = endpoint_state.watchers.find(watcher);
1954   if (it != endpoint_state.watchers.end()) {
1955     endpoint_state.watchers.erase(it);
1956     if (endpoint_state.watchers.empty()) {
1957       endpoint_map_.erase(eds_service_name_str);
1958       chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1959                           delay_unsubscription);
1960     }
1961   }
1962 }
1963 
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)1964 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1965     absl::string_view lrs_server, absl::string_view cluster_name,
1966     absl::string_view eds_service_name) {
1967   // TODO(roth): When we add support for direct federation, use the
1968   // server name specified in lrs_server.
1969   auto key =
1970       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1971   MutexLock lock(&mu_);
1972   // We jump through some hoops here to make sure that the absl::string_views
1973   // stored in the XdsClusterDropStats object point to the strings
1974   // in the load_report_map_ key, so that they have the same lifetime.
1975   auto it = load_report_map_
1976                 .emplace(std::make_pair(std::move(key), LoadReportState()))
1977                 .first;
1978   LoadReportState& load_report_state = it->second;
1979   RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1980   if (load_report_state.drop_stats != nullptr) {
1981     cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1982   }
1983   if (cluster_drop_stats == nullptr) {
1984     if (load_report_state.drop_stats != nullptr) {
1985       load_report_state.deleted_drop_stats +=
1986           load_report_state.drop_stats->GetSnapshotAndReset();
1987     }
1988     cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1989         Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1990         it->first.first /*cluster_name*/,
1991         it->first.second /*eds_service_name*/);
1992     load_report_state.drop_stats = cluster_drop_stats.get();
1993   }
1994   chand_->MaybeStartLrsCall();
1995   return cluster_drop_stats;
1996 }
1997 
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)1998 void XdsClient::RemoveClusterDropStats(
1999     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2000     absl::string_view eds_service_name,
2001     XdsClusterDropStats* cluster_drop_stats) {
2002   MutexLock lock(&mu_);
2003   // TODO(roth): When we add support for direct federation, use the
2004   // server name specified in lrs_server.
2005   auto it = load_report_map_.find(
2006       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2007   if (it == load_report_map_.end()) return;
2008   LoadReportState& load_report_state = it->second;
2009   if (load_report_state.drop_stats == cluster_drop_stats) {
2010     // Record final snapshot in deleted_drop_stats, which will be
2011     // added to the next load report.
2012     load_report_state.deleted_drop_stats +=
2013         load_report_state.drop_stats->GetSnapshotAndReset();
2014     load_report_state.drop_stats = nullptr;
2015   }
2016 }
2017 
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)2018 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2019     absl::string_view lrs_server, absl::string_view cluster_name,
2020     absl::string_view eds_service_name,
2021     RefCountedPtr<XdsLocalityName> locality) {
2022   // TODO(roth): When we add support for direct federation, use the
2023   // server name specified in lrs_server.
2024   auto key =
2025       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2026   MutexLock lock(&mu_);
2027   // We jump through some hoops here to make sure that the absl::string_views
2028   // stored in the XdsClusterLocalityStats object point to the strings
2029   // in the load_report_map_ key, so that they have the same lifetime.
2030   auto it = load_report_map_
2031                 .emplace(std::make_pair(std::move(key), LoadReportState()))
2032                 .first;
2033   LoadReportState& load_report_state = it->second;
2034   LoadReportState::LocalityState& locality_state =
2035       load_report_state.locality_stats[locality];
2036   RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2037   if (locality_state.locality_stats != nullptr) {
2038     cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2039   }
2040   if (cluster_locality_stats == nullptr) {
2041     if (locality_state.locality_stats != nullptr) {
2042       locality_state.deleted_locality_stats +=
2043           locality_state.locality_stats->GetSnapshotAndReset();
2044     }
2045     cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2046         Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2047         it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2048         std::move(locality));
2049     locality_state.locality_stats = cluster_locality_stats.get();
2050   }
2051   chand_->MaybeStartLrsCall();
2052   return cluster_locality_stats;
2053 }
2054 
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2055 void XdsClient::RemoveClusterLocalityStats(
2056     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2057     absl::string_view eds_service_name,
2058     const RefCountedPtr<XdsLocalityName>& locality,
2059     XdsClusterLocalityStats* cluster_locality_stats) {
2060   MutexLock lock(&mu_);
2061   // TODO(roth): When we add support for direct federation, use the
2062   // server name specified in lrs_server.
2063   auto it = load_report_map_.find(
2064       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2065   if (it == load_report_map_.end()) return;
2066   LoadReportState& load_report_state = it->second;
2067   auto locality_it = load_report_state.locality_stats.find(locality);
2068   if (locality_it == load_report_state.locality_stats.end()) return;
2069   LoadReportState::LocalityState& locality_state = locality_it->second;
2070   if (locality_state.locality_stats == cluster_locality_stats) {
2071     // Record final snapshot in deleted_locality_stats, which will be
2072     // added to the next load report.
2073     locality_state.deleted_locality_stats +=
2074         locality_state.locality_stats->GetSnapshotAndReset();
2075     locality_state.locality_stats = nullptr;
2076   }
2077 }
2078 
ResetBackoff()2079 void XdsClient::ResetBackoff() {
2080   MutexLock lock(&mu_);
2081   if (chand_ != nullptr) {
2082     grpc_channel_reset_connect_backoff(chand_->channel());
2083   }
2084 }
2085 
NotifyOnErrorLocked(grpc_error * error)2086 void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
2087   for (const auto& p : listener_map_) {
2088     const ListenerState& listener_state = p.second;
2089     for (const auto& p : listener_state.watchers) {
2090       p.first->OnError(GRPC_ERROR_REF(error));
2091     }
2092   }
2093   for (const auto& p : route_config_map_) {
2094     const RouteConfigState& route_config_state = p.second;
2095     for (const auto& p : route_config_state.watchers) {
2096       p.first->OnError(GRPC_ERROR_REF(error));
2097     }
2098   }
2099   for (const auto& p : cluster_map_) {
2100     const ClusterState& cluster_state = p.second;
2101     for (const auto& p : cluster_state.watchers) {
2102       p.first->OnError(GRPC_ERROR_REF(error));
2103     }
2104   }
2105   for (const auto& p : endpoint_map_) {
2106     const EndpointState& endpoint_state = p.second;
2107     for (const auto& p : endpoint_state.watchers) {
2108       p.first->OnError(GRPC_ERROR_REF(error));
2109     }
2110   }
2111   GRPC_ERROR_UNREF(error);
2112 }
2113 
BuildLoadReportSnapshotLocked(bool send_all_clusters,const std::set<std::string> & clusters)2114 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2115     bool send_all_clusters, const std::set<std::string>& clusters) {
2116   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2117     gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2118   }
2119   XdsApi::ClusterLoadReportMap snapshot_map;
2120   for (auto load_report_it = load_report_map_.begin();
2121        load_report_it != load_report_map_.end();) {
2122     // Cluster key is cluster and EDS service name.
2123     const auto& cluster_key = load_report_it->first;
2124     LoadReportState& load_report = load_report_it->second;
2125     // If the CDS response for a cluster indicates to use LRS but the
2126     // LRS server does not say that it wants reports for this cluster,
2127     // then we'll have stats objects here whose data we're not going to
2128     // include in the load report.  However, we still need to clear out
2129     // the data from the stats objects, so that if the LRS server starts
2130     // asking for the data in the future, we don't incorrectly include
2131     // data from previous reporting intervals in that future report.
2132     const bool record_stats =
2133         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2134     XdsApi::ClusterLoadReport snapshot;
2135     // Aggregate drop stats.
2136     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2137     if (load_report.drop_stats != nullptr) {
2138       snapshot.dropped_requests +=
2139           load_report.drop_stats->GetSnapshotAndReset();
2140       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2141         gpr_log(GPR_INFO,
2142                 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2143                 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2144                 load_report.drop_stats);
2145       }
2146     }
2147     // Aggregate locality stats.
2148     for (auto it = load_report.locality_stats.begin();
2149          it != load_report.locality_stats.end();) {
2150       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2151       auto& locality_state = it->second;
2152       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2153           snapshot.locality_stats[locality_name];
2154       locality_snapshot = std::move(locality_state.deleted_locality_stats);
2155       if (locality_state.locality_stats != nullptr) {
2156         locality_snapshot +=
2157             locality_state.locality_stats->GetSnapshotAndReset();
2158         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2159           gpr_log(GPR_INFO,
2160                   "[xds_client %p] cluster=%s eds_service_name=%s "
2161                   "locality=%s locality_stats=%p",
2162                   this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2163                   locality_name->AsHumanReadableString().c_str(),
2164                   locality_state.locality_stats);
2165         }
2166       }
2167       // If the only thing left in this entry was final snapshots from
2168       // deleted locality stats objects, remove the entry.
2169       if (locality_state.locality_stats == nullptr) {
2170         it = load_report.locality_stats.erase(it);
2171       } else {
2172         ++it;
2173       }
2174     }
2175     // Compute load report interval.
2176     const grpc_millis now = ExecCtx::Get()->Now();
2177     snapshot.load_report_interval = now - load_report.last_report_time;
2178     load_report.last_report_time = now;
2179     // Record snapshot.
2180     if (record_stats) {
2181       snapshot_map[cluster_key] = std::move(snapshot);
2182     }
2183     // If the only thing left in this entry was final snapshots from
2184     // deleted stats objects, remove the entry.
2185     if (load_report.locality_stats.empty() &&
2186         load_report.drop_stats == nullptr) {
2187       load_report_it = load_report_map_.erase(load_report_it);
2188     } else {
2189       ++load_report_it;
2190     }
2191   }
2192   return snapshot_map;
2193 }
2194 
2195 //
2196 // accessors for global state
2197 //
2198 
XdsClientGlobalInit()2199 void XdsClientGlobalInit() { g_mu = new Mutex; }
2200 
XdsClientGlobalShutdown()2201 void XdsClientGlobalShutdown() {
2202   delete g_mu;
2203   g_mu = nullptr;
2204   gpr_free(g_fallback_bootstrap_config);
2205   g_fallback_bootstrap_config = nullptr;
2206 }
2207 
GetOrCreate(grpc_error ** error)2208 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
2209   MutexLock lock(g_mu);
2210   if (g_xds_client != nullptr) {
2211     auto xds_client = g_xds_client->RefIfNonZero();
2212     if (xds_client != nullptr) return xds_client;
2213   }
2214   auto xds_client = MakeRefCounted<XdsClient>(error);
2215   g_xds_client = xds_client.get();
2216   return xds_client;
2217 }
2218 
2219 namespace internal {
2220 
SetXdsChannelArgsForTest(grpc_channel_args * args)2221 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2222   MutexLock lock(g_mu);
2223   g_channel_args = args;
2224 }
2225 
UnsetGlobalXdsClientForTest()2226 void UnsetGlobalXdsClientForTest() {
2227   MutexLock lock(g_mu);
2228   g_xds_client = nullptr;
2229 }
2230 
SetXdsFallbackBootstrapConfig(const char * config)2231 void SetXdsFallbackBootstrapConfig(const char* config) {
2232   MutexLock lock(g_mu);
2233   gpr_free(g_fallback_bootstrap_config);
2234   g_fallback_bootstrap_config = gpr_strdup(config);
2235 }
2236 
2237 }  // namespace internal
2238 
2239 }  // namespace grpc_core
2240