• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24 
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29 
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34 
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/parse_address.h"
37 #include "src/core/ext/filters/client_channel/server_address.h"
38 #include "src/core/ext/filters/client_channel/service_config.h"
39 #include "src/core/ext/filters/client_channel/xds/xds_api.h"
40 #include "src/core/ext/filters/client_channel/xds/xds_channel.h"
41 #include "src/core/ext/filters/client_channel/xds/xds_channel_args.h"
42 #include "src/core/ext/filters/client_channel/xds/xds_client.h"
43 #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
44 #include "src/core/lib/backoff/backoff.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/channel_stack.h"
47 #include "src/core/lib/gpr/string.h"
48 #include "src/core/lib/gprpp/map.h"
49 #include "src/core/lib/gprpp/memory.h"
50 #include "src/core/lib/gprpp/orphanable.h"
51 #include "src/core/lib/gprpp/ref_counted_ptr.h"
52 #include "src/core/lib/gprpp/sync.h"
53 #include "src/core/lib/iomgr/sockaddr.h"
54 #include "src/core/lib/iomgr/sockaddr_utils.h"
55 #include "src/core/lib/iomgr/timer.h"
56 #include "src/core/lib/iomgr/work_serializer.h"
57 #include "src/core/lib/slice/slice_hash_table.h"
58 #include "src/core/lib/slice/slice_internal.h"
59 #include "src/core/lib/slice/slice_string_helpers.h"
60 #include "src/core/lib/surface/call.h"
61 #include "src/core/lib/surface/channel.h"
62 #include "src/core/lib/surface/channel_init.h"
63 #include "src/core/lib/transport/static_metadata.h"
64 
// Backoff parameters used when a call to the xds server has to be retried:
// initial delay and cap are in seconds (converted to milliseconds where the
// backoff is configured), multiplier and jitter are unitless.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// Named as the minimum client load reporting interval, in milliseconds.
// NOTE(review): its use is outside this part of the file -- confirm there.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
70 
71 namespace grpc_core {
72 
73 TraceFlag grpc_xds_client_trace(false, "xds_client");
74 
75 //
76 // Internal class declarations
77 //
78 
79 // An xds call wrapper that can restart a call upon failure. Holds a ref to
80 // the xds channel. The template parameter is the kind of wrapped xds call.
81 template <typename T>
82 class XdsClient::ChannelState::RetryableCall
83     : public InternallyRefCounted<RetryableCall<T>> {
84  public:
85   explicit RetryableCall(RefCountedPtr<ChannelState> chand);
86 
87   void Orphan() override;
88 
89   void OnCallFinishedLocked();
90 
calld() const91   T* calld() const { return calld_.get(); }
chand() const92   ChannelState* chand() const { return chand_.get(); }
93 
94   bool IsCurrentCallOnChannel() const;
95 
96  private:
97   void StartNewCallLocked();
98   void StartRetryTimerLocked();
99   static void OnRetryTimer(void* arg, grpc_error* error);
100   void OnRetryTimerLocked(grpc_error* error);
101 
102   // The wrapped xds call that talks to the xds server. It's instantiated
103   // every time we start a new call. It's null during call retry backoff.
104   OrphanablePtr<T> calld_;
105   // The owning xds channel.
106   RefCountedPtr<ChannelState> chand_;
107 
108   // Retry state.
109   BackOff backoff_;
110   grpc_timer retry_timer_;
111   grpc_closure on_retry_timer_;
112   bool retry_timer_callback_pending_ = false;
113 
114   bool shutting_down_ = false;
115 };
116 
117 // Contains an ADS call to the xds server.
118 class XdsClient::ChannelState::AdsCallState
119     : public InternallyRefCounted<AdsCallState> {
120  public:
121   // The ctor and dtor should not be used directly.
122   explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
123   ~AdsCallState() override;
124 
125   void Orphan() override;
126 
parent() const127   RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
chand() const128   ChannelState* chand() const { return parent_->chand(); }
xds_client() const129   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const130   bool seen_response() const { return seen_response_; }
131 
132   void Subscribe(const std::string& type_url, const std::string& name);
133   void Unsubscribe(const std::string& type_url, const std::string& name,
134                    bool delay_unsubscription);
135 
136   bool HasSubscribedResources() const;
137 
138  private:
139   class ResourceState : public InternallyRefCounted<ResourceState> {
140    public:
ResourceState(const std::string & type_url,const std::string & name)141     ResourceState(const std::string& type_url, const std::string& name)
142         : type_url_(type_url), name_(name) {
143       GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
144                         grpc_schedule_on_exec_ctx);
145     }
146 
Orphan()147     void Orphan() override {
148       Finish();
149       Unref();
150     }
151 
Start(RefCountedPtr<AdsCallState> ads_calld)152     void Start(RefCountedPtr<AdsCallState> ads_calld) {
153       if (sent_) return;
154       sent_ = true;
155       ads_calld_ = std::move(ads_calld);
156       Ref().release();
157       timer_pending_ = true;
158       grpc_timer_init(
159           &timer_,
160           ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
161           &timer_callback_);
162     }
163 
Finish()164     void Finish() {
165       if (timer_pending_) {
166         grpc_timer_cancel(&timer_);
167         timer_pending_ = false;
168       }
169     }
170 
171    private:
OnTimer(void * arg,grpc_error * error)172     static void OnTimer(void* arg, grpc_error* error) {
173       ResourceState* self = static_cast<ResourceState*>(arg);
174       GRPC_ERROR_REF(error);  // ref owned by lambda
175       self->ads_calld_->xds_client()->work_serializer_->Run(
176           [self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
177     }
178 
OnTimerLocked(grpc_error * error)179     void OnTimerLocked(grpc_error* error) {
180       if (error == GRPC_ERROR_NONE && timer_pending_) {
181         timer_pending_ = false;
182         grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
183             absl::StrFormat(
184                 "timeout obtaining resource {type=%s name=%s} from xds server",
185                 type_url_, name_)
186                 .c_str());
187         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
188           gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
189                   grpc_error_string(watcher_error));
190         }
191         if (type_url_ == XdsApi::kLdsTypeUrl ||
192             type_url_ == XdsApi::kRdsTypeUrl) {
193           ads_calld_->xds_client()->service_config_watcher_->OnError(
194               watcher_error);
195         } else if (type_url_ == XdsApi::kCdsTypeUrl) {
196           ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
197           for (const auto& p : state.watchers) {
198             p.first->OnError(GRPC_ERROR_REF(watcher_error));
199           }
200           GRPC_ERROR_UNREF(watcher_error);
201         } else if (type_url_ == XdsApi::kEdsTypeUrl) {
202           EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
203           for (const auto& p : state.watchers) {
204             p.first->OnError(GRPC_ERROR_REF(watcher_error));
205           }
206           GRPC_ERROR_UNREF(watcher_error);
207         } else {
208           GPR_UNREACHABLE_CODE(return );
209         }
210       }
211       ads_calld_.reset();
212       Unref();
213       GRPC_ERROR_UNREF(error);
214     }
215 
216     const std::string type_url_;
217     const std::string name_;
218 
219     RefCountedPtr<AdsCallState> ads_calld_;
220     bool sent_ = false;
221     bool timer_pending_ = false;
222     grpc_timer timer_;
223     grpc_closure timer_callback_;
224   };
225 
226   struct ResourceTypeState {
~ResourceTypeStategrpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState227     ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
228 
229     // Version, nonce, and error for this resource type.
230     std::string version;
231     std::string nonce;
232     grpc_error* error = GRPC_ERROR_NONE;
233 
234     // Subscribed resources of this type.
235     std::map<std::string /* name */, OrphanablePtr<ResourceState>>
236         subscribed_resources;
237   };
238 
239   void SendMessageLocked(const std::string& type_url);
240 
241   void AcceptLdsUpdate(absl::optional<XdsApi::LdsUpdate> lds_update);
242   void AcceptRdsUpdate(absl::optional<XdsApi::RdsUpdate> rds_update);
243   void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
244   void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
245 
246   static void OnRequestSent(void* arg, grpc_error* error);
247   void OnRequestSentLocked(grpc_error* error);
248   static void OnResponseReceived(void* arg, grpc_error* error);
249   void OnResponseReceivedLocked();
250   static void OnStatusReceived(void* arg, grpc_error* error);
251   void OnStatusReceivedLocked(grpc_error* error);
252 
253   bool IsCurrentCallOnChannel() const;
254 
255   std::set<absl::string_view> ResourceNamesForRequest(
256       const std::string& type_url);
257 
258   // The owning RetryableCall<>.
259   RefCountedPtr<RetryableCall<AdsCallState>> parent_;
260 
261   bool sent_initial_message_ = false;
262   bool seen_response_ = false;
263 
264   // Always non-NULL.
265   grpc_call* call_;
266 
267   // recv_initial_metadata
268   grpc_metadata_array initial_metadata_recv_;
269 
270   // send_message
271   grpc_byte_buffer* send_message_payload_ = nullptr;
272   grpc_closure on_request_sent_;
273 
274   // recv_message
275   grpc_byte_buffer* recv_message_payload_ = nullptr;
276   grpc_closure on_response_received_;
277 
278   // recv_trailing_metadata
279   grpc_metadata_array trailing_metadata_recv_;
280   grpc_status_code status_code_;
281   grpc_slice status_details_;
282   grpc_closure on_status_received_;
283 
284   // Resource types for which requests need to be sent.
285   std::set<std::string /*type_url*/> buffered_requests_;
286 
287   // State for each resource type.
288   std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
289 };
290 
291 // Contains an LRS call to the xds server.
292 class XdsClient::ChannelState::LrsCallState
293     : public InternallyRefCounted<LrsCallState> {
294  public:
295   // The ctor and dtor should not be used directly.
296   explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
297   ~LrsCallState() override;
298 
299   void Orphan() override;
300 
301   void MaybeStartReportingLocked();
302 
parent()303   RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const304   ChannelState* chand() const { return parent_->chand(); }
xds_client() const305   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const306   bool seen_response() const { return seen_response_; }
307 
308  private:
309   // Reports client-side load stats according to a fixed interval.
310   class Reporter : public InternallyRefCounted<Reporter> {
311    public:
Reporter(RefCountedPtr<LrsCallState> parent,grpc_millis report_interval)312     Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
313         : parent_(std::move(parent)), report_interval_(report_interval) {
314       GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
315                         grpc_schedule_on_exec_ctx);
316       GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
317                         grpc_schedule_on_exec_ctx);
318       ScheduleNextReportLocked();
319     }
320 
321     void Orphan() override;
322 
323    private:
324     void ScheduleNextReportLocked();
325     static void OnNextReportTimer(void* arg, grpc_error* error);
326     void OnNextReportTimerLocked(grpc_error* error);
327     void SendReportLocked();
328     static void OnReportDone(void* arg, grpc_error* error);
329     void OnReportDoneLocked(grpc_error* error);
330 
IsCurrentReporterOnCall() const331     bool IsCurrentReporterOnCall() const {
332       return this == parent_->reporter_.get();
333     }
xds_client() const334     XdsClient* xds_client() const { return parent_->xds_client(); }
335 
336     // The owning LRS call.
337     RefCountedPtr<LrsCallState> parent_;
338 
339     // The load reporting state.
340     const grpc_millis report_interval_;
341     bool last_report_counters_were_zero_ = false;
342     bool next_report_timer_callback_pending_ = false;
343     grpc_timer next_report_timer_;
344     grpc_closure on_next_report_timer_;
345     grpc_closure on_report_done_;
346   };
347 
348   static void OnInitialRequestSent(void* arg, grpc_error* error);
349   void OnInitialRequestSentLocked();
350   static void OnResponseReceived(void* arg, grpc_error* error);
351   void OnResponseReceivedLocked();
352   static void OnStatusReceived(void* arg, grpc_error* error);
353   void OnStatusReceivedLocked(grpc_error* error);
354 
355   bool IsCurrentCallOnChannel() const;
356 
357   // The owning RetryableCall<>.
358   RefCountedPtr<RetryableCall<LrsCallState>> parent_;
359   bool seen_response_ = false;
360 
361   // Always non-NULL.
362   grpc_call* call_;
363 
364   // recv_initial_metadata
365   grpc_metadata_array initial_metadata_recv_;
366 
367   // send_message
368   grpc_byte_buffer* send_message_payload_ = nullptr;
369   grpc_closure on_initial_request_sent_;
370 
371   // recv_message
372   grpc_byte_buffer* recv_message_payload_ = nullptr;
373   grpc_closure on_response_received_;
374 
375   // recv_trailing_metadata
376   grpc_metadata_array trailing_metadata_recv_;
377   grpc_status_code status_code_;
378   grpc_slice status_details_;
379   grpc_closure on_status_received_;
380 
381   // Load reporting state.
382   bool send_all_clusters_ = false;
383   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
384   grpc_millis load_reporting_interval_ = 0;
385   OrphanablePtr<Reporter> reporter_;
386 };
387 
388 //
389 // XdsClient::ChannelState::StateWatcher
390 //
391 
392 class XdsClient::ChannelState::StateWatcher
393     : public AsyncConnectivityStateWatcherInterface {
394  public:
StateWatcher(RefCountedPtr<ChannelState> parent)395   explicit StateWatcher(RefCountedPtr<ChannelState> parent)
396       : AsyncConnectivityStateWatcherInterface(
397             parent->xds_client()->work_serializer_),
398         parent_(std::move(parent)) {}
399 
400  private:
OnConnectivityStateChange(grpc_connectivity_state new_state)401   void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
402     if (!parent_->shutting_down_ &&
403         new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
404       // In TRANSIENT_FAILURE.  Notify all watchers of error.
405       gpr_log(GPR_INFO,
406               "[xds_client %p] xds channel in state TRANSIENT_FAILURE",
407               parent_->xds_client());
408       parent_->xds_client()->NotifyOnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
409           "xds channel in TRANSIENT_FAILURE"));
410     }
411   }
412 
413   RefCountedPtr<ChannelState> parent_;
414 };
415 
416 //
417 // XdsClient::ChannelState
418 //
419 
420 namespace {
421 
422 // Returns the channel args for the xds channel.
BuildXdsChannelArgs(const grpc_channel_args & args)423 grpc_channel_args* BuildXdsChannelArgs(const grpc_channel_args& args) {
424   static const char* args_to_remove[] = {
425       // LB policy name, since we want to use the default (pick_first) in
426       // the LB channel.
427       GRPC_ARG_LB_POLICY_NAME,
428       // The service config that contains the LB config. We don't want to
429       // recursively use xds in the LB channel.
430       GRPC_ARG_SERVICE_CONFIG,
431       // The channel arg for the server URI, since that will be different for
432       // the xds channel than for the parent channel.  The client channel
433       // factory will re-add this arg with the right value.
434       GRPC_ARG_SERVER_URI,
435       // The xds channel should use the authority indicated by the target
436       // authority table (see \a ModifyXdsChannelArgs),
437       // as opposed to the authority from the parent channel.
438       GRPC_ARG_DEFAULT_AUTHORITY,
439       // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the xds channel should be
440       // treated as a stand-alone channel and not inherit this argument from the
441       // args of the parent channel.
442       GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
443       // Don't want to pass down channelz node from parent; the balancer
444       // channel will get its own.
445       GRPC_ARG_CHANNELZ_CHANNEL_NODE,
446       // Keepalive interval.  We are explicitly setting our own value below.
447       GRPC_ARG_KEEPALIVE_TIME_MS,
448   };
449   // Channel args to add.
450   absl::InlinedVector<grpc_arg, 3> args_to_add;
451   // Keepalive interval.
452   args_to_add.emplace_back(grpc_channel_arg_integer_create(
453       const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 5000));
454   // A channel arg indicating that the target is an xds server.
455   // TODO(roth): Once we figure out our fallback and credentials story, decide
456   // whether this is actually needed.  Note that it's currently used by the
457   // fake security connector as well.
458   args_to_add.emplace_back(grpc_channel_arg_integer_create(
459       const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_SERVER), 1));
460   // The parent channel's channelz uuid.
461   channelz::ChannelNode* channelz_node = nullptr;
462   const grpc_arg* arg =
463       grpc_channel_args_find(&args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
464   if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
465       arg->value.pointer.p != nullptr) {
466     channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
467     args_to_add.emplace_back(
468         channelz::MakeParentUuidArg(channelz_node->uuid()));
469   }
470   // Construct channel args.
471   grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
472       &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
473       args_to_add.size());
474   // Make any necessary modifications for security.
475   return ModifyXdsChannelArgs(new_args);
476 }
477 
478 }  // namespace
479 
ChannelState(RefCountedPtr<XdsClient> xds_client,grpc_channel * channel)480 XdsClient::ChannelState::ChannelState(RefCountedPtr<XdsClient> xds_client,
481                                       grpc_channel* channel)
482     : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
483       xds_client_(std::move(xds_client)),
484       channel_(channel) {
485   GPR_ASSERT(channel_ != nullptr);
486   StartConnectivityWatchLocked();
487 }
488 
~ChannelState()489 XdsClient::ChannelState::~ChannelState() {
490   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
491     gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
492             this);
493   }
494   grpc_channel_destroy(channel_);
495 }
496 
Orphan()497 void XdsClient::ChannelState::Orphan() {
498   shutting_down_ = true;
499   CancelConnectivityWatchLocked();
500   ads_calld_.reset();
501   lrs_calld_.reset();
502   Unref(DEBUG_LOCATION, "ChannelState+orphaned");
503 }
504 
ads_calld() const505 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
506     const {
507   return ads_calld_->calld();
508 }
509 
lrs_calld() const510 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
511     const {
512   return lrs_calld_->calld();
513 }
514 
HasActiveAdsCall() const515 bool XdsClient::ChannelState::HasActiveAdsCall() const {
516   return ads_calld_->calld() != nullptr;
517 }
518 
MaybeStartLrsCall()519 void XdsClient::ChannelState::MaybeStartLrsCall() {
520   if (lrs_calld_ != nullptr) return;
521   lrs_calld_.reset(
522       new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
523 }
524 
StopLrsCall()525 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
526 
StartConnectivityWatchLocked()527 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
528   grpc_channel_element* client_channel_elem =
529       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
530   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
531   watcher_ = new StateWatcher(Ref());
532   grpc_client_channel_start_connectivity_watch(
533       client_channel_elem, GRPC_CHANNEL_IDLE,
534       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
535 }
536 
CancelConnectivityWatchLocked()537 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
538   grpc_channel_element* client_channel_elem =
539       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
540   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
541   grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
542 }
543 
Subscribe(const std::string & type_url,const std::string & name)544 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
545                                         const std::string& name) {
546   if (ads_calld_ == nullptr) {
547     // Start the ADS call if this is the first request.
548     ads_calld_.reset(new RetryableCall<AdsCallState>(
549         Ref(DEBUG_LOCATION, "ChannelState+ads")));
550     // Note: AdsCallState's ctor will automatically subscribe to all
551     // resources that the XdsClient already has watchers for, so we can
552     // return here.
553     return;
554   }
555   // If the ADS call is in backoff state, we don't need to do anything now
556   // because when the call is restarted it will resend all necessary requests.
557   if (ads_calld() == nullptr) return;
558   // Subscribe to this resource if the ADS call is active.
559   ads_calld()->Subscribe(type_url, name);
560 }
561 
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)562 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
563                                           const std::string& name,
564                                           bool delay_unsubscription) {
565   if (ads_calld_ != nullptr) {
566     ads_calld_->calld()->Unsubscribe(type_url, name, delay_unsubscription);
567     if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset();
568   }
569 }
570 
571 //
572 // XdsClient::ChannelState::RetryableCall<>
573 //
574 
575 template <typename T>
RetryableCall(RefCountedPtr<ChannelState> chand)576 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
577     RefCountedPtr<ChannelState> chand)
578     : chand_(std::move(chand)),
579       backoff_(
580           BackOff::Options()
581               .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
582                                    1000)
583               .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
584               .set_jitter(GRPC_XDS_RECONNECT_JITTER)
585               .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
586   // Closure Initialization
587   GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
588                     grpc_schedule_on_exec_ctx);
589   StartNewCallLocked();
590 }
591 
592 template <typename T>
Orphan()593 void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
594   shutting_down_ = true;
595   calld_.reset();
596   if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
597   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
598 }
599 
600 template <typename T>
OnCallFinishedLocked()601 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
602   const bool seen_response = calld_->seen_response();
603   calld_.reset();
604   if (seen_response) {
605     // If we lost connection to the xds server, reset backoff and restart the
606     // call immediately.
607     backoff_.Reset();
608     StartNewCallLocked();
609   } else {
610     // If we failed to connect to the xds server, retry later.
611     StartRetryTimerLocked();
612   }
613 }
614 
615 template <typename T>
StartNewCallLocked()616 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
617   if (shutting_down_) return;
618   GPR_ASSERT(chand_->channel_ != nullptr);
619   GPR_ASSERT(calld_ == nullptr);
620   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
621     gpr_log(GPR_INFO,
622             "[xds_client %p] Start new call from retryable call (chand: %p, "
623             "retryable call: %p)",
624             chand()->xds_client(), chand(), this);
625   }
626   calld_ = MakeOrphanable<T>(
627       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
628 }
629 
630 template <typename T>
StartRetryTimerLocked()631 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
632   if (shutting_down_) return;
633   const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
634   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
635     grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
636     gpr_log(GPR_INFO,
637             "[xds_client %p] Failed to connect to xds server (chand: %p) "
638             "retry timer will fire in %" PRId64 "ms.",
639             chand()->xds_client(), chand(), timeout);
640   }
641   this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
642   grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
643   retry_timer_callback_pending_ = true;
644 }
645 
646 template <typename T>
OnRetryTimer(void * arg,grpc_error * error)647 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
648     void* arg, grpc_error* error) {
649   RetryableCall* calld = static_cast<RetryableCall*>(arg);
650   GRPC_ERROR_REF(error);  // ref owned by lambda
651   calld->chand_->xds_client()->work_serializer_->Run(
652       [calld, error]() { calld->OnRetryTimerLocked(error); }, DEBUG_LOCATION);
653 }
654 
655 template <typename T>
OnRetryTimerLocked(grpc_error * error)656 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
657     grpc_error* error) {
658   retry_timer_callback_pending_ = false;
659   if (!shutting_down_ && error == GRPC_ERROR_NONE) {
660     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
661       gpr_log(
662           GPR_INFO,
663           "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
664           chand()->xds_client(), chand(), this);
665     }
666     StartNewCallLocked();
667   }
668   this->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
669   GRPC_ERROR_UNREF(error);
670 }
671 
672 //
673 // XdsClient::ChannelState::AdsCallState
674 //
675 
AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent)676 XdsClient::ChannelState::AdsCallState::AdsCallState(
677     RefCountedPtr<RetryableCall<AdsCallState>> parent)
678     : InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace),
679       parent_(std::move(parent)) {
680   // Init the ADS call. Note that the call will progress every time there's
681   // activity in xds_client()->interested_parties_, which is comprised of
682   // the polling entities from client_channel.
683   GPR_ASSERT(xds_client() != nullptr);
684   GPR_ASSERT(!xds_client()->server_name_.empty());
685   // Create a call with the specified method name.
686   call_ = grpc_channel_create_pollset_set_call(
687       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
688       xds_client()->interested_parties_,
689       GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES,
690       nullptr, GRPC_MILLIS_INF_FUTURE, nullptr);
691   GPR_ASSERT(call_ != nullptr);
692   // Init data associated with the call.
693   grpc_metadata_array_init(&initial_metadata_recv_);
694   grpc_metadata_array_init(&trailing_metadata_recv_);
695   // Start the call.
696   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
697     gpr_log(GPR_INFO,
698             "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
699             "call: %p)",
700             xds_client(), chand(), this, call_);
701   }
702   // Create the ops.
703   grpc_call_error call_error;
704   grpc_op ops[3];
705   memset(ops, 0, sizeof(ops));
706   // Op: send initial metadata.
707   grpc_op* op = ops;
708   op->op = GRPC_OP_SEND_INITIAL_METADATA;
709   op->data.send_initial_metadata.count = 0;
710   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
711               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
712   op->reserved = nullptr;
713   op++;
714   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
715                                                  nullptr);
716   GPR_ASSERT(GRPC_CALL_OK == call_error);
717   // Op: send request message.
718   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
719                     grpc_schedule_on_exec_ctx);
720   if (xds_client()->service_config_watcher_ != nullptr) {
721     Subscribe(XdsApi::kLdsTypeUrl, xds_client()->server_name_);
722     if (xds_client()->lds_result_.has_value() &&
723         !xds_client()->lds_result_->route_config_name.empty()) {
724       Subscribe(XdsApi::kRdsTypeUrl,
725                 xds_client()->lds_result_->route_config_name);
726     }
727   }
728   for (const auto& p : xds_client()->cluster_map_) {
729     Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
730   }
731   for (const auto& p : xds_client()->endpoint_map_) {
732     Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
733   }
734   // Op: recv initial metadata.
735   op = ops;
736   op->op = GRPC_OP_RECV_INITIAL_METADATA;
737   op->data.recv_initial_metadata.recv_initial_metadata =
738       &initial_metadata_recv_;
739   op->flags = 0;
740   op->reserved = nullptr;
741   op++;
742   // Op: recv response.
743   op->op = GRPC_OP_RECV_MESSAGE;
744   op->data.recv_message.recv_message = &recv_message_payload_;
745   op->flags = 0;
746   op->reserved = nullptr;
747   op++;
748   Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
749   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
750                     grpc_schedule_on_exec_ctx);
751   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
752                                                  &on_response_received_);
753   GPR_ASSERT(GRPC_CALL_OK == call_error);
754   // Op: recv server status.
755   op = ops;
756   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
757   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
758   op->data.recv_status_on_client.status = &status_code_;
759   op->data.recv_status_on_client.status_details = &status_details_;
760   op->flags = 0;
761   op->reserved = nullptr;
762   op++;
763   // This callback signals the end of the call, so it relies on the initial
764   // ref instead of a new ref. When it's invoked, it's the initial ref that is
765   // unreffed.
766   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
767                     grpc_schedule_on_exec_ctx);
768   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
769                                                  &on_status_received_);
770   GPR_ASSERT(GRPC_CALL_OK == call_error);
771 }
772 
~AdsCallState()773 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
774   grpc_metadata_array_destroy(&initial_metadata_recv_);
775   grpc_metadata_array_destroy(&trailing_metadata_recv_);
776   grpc_byte_buffer_destroy(send_message_payload_);
777   grpc_byte_buffer_destroy(recv_message_payload_);
778   grpc_slice_unref_internal(status_details_);
779   GPR_ASSERT(call_ != nullptr);
780   grpc_call_unref(call_);
781 }
782 
Orphan()783 void XdsClient::ChannelState::AdsCallState::Orphan() {
784   GPR_ASSERT(call_ != nullptr);
785   // If we are here because xds_client wants to cancel the call,
786   // on_status_received_ will complete the cancellation and clean up. Otherwise,
787   // we are here because xds_client has to orphan a failed call, then the
788   // following cancellation will be a no-op.
789   grpc_call_cancel(call_, nullptr);
790   state_map_.clear();
791   // Note that the initial ref is hold by on_status_received_. So the
792   // corresponding unref happens in on_status_received_ instead of here.
793 }
794 
SendMessageLocked(const std::string & type_url)795 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
796     const std::string& type_url) {
797   // Buffer message sending if an existing message is in flight.
798   if (send_message_payload_ != nullptr) {
799     buffered_requests_.insert(type_url);
800     return;
801   }
802   auto& state = state_map_[type_url];
803   grpc_slice request_payload_slice;
804   std::set<absl::string_view> resource_names =
805       ResourceNamesForRequest(type_url);
806   request_payload_slice = xds_client()->api_.CreateAdsRequest(
807       type_url, resource_names, state.version, state.nonce,
808       GRPC_ERROR_REF(state.error), !sent_initial_message_);
809   if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
810       type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
811     state_map_.erase(type_url);
812   }
813   sent_initial_message_ = true;
814   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
815     gpr_log(GPR_INFO,
816             "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
817             "error=%s resources=%s",
818             xds_client(), type_url.c_str(), state.version.c_str(),
819             state.nonce.c_str(), grpc_error_string(state.error),
820             absl::StrJoin(resource_names, " ").c_str());
821   }
822   GRPC_ERROR_UNREF(state.error);
823   state.error = GRPC_ERROR_NONE;
824   // Create message payload.
825   send_message_payload_ =
826       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
827   grpc_slice_unref_internal(request_payload_slice);
828   // Send the message.
829   grpc_op op;
830   memset(&op, 0, sizeof(op));
831   op.op = GRPC_OP_SEND_MESSAGE;
832   op.data.send_message.send_message = send_message_payload_;
833   Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
834   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
835                     grpc_schedule_on_exec_ctx);
836   grpc_call_error call_error =
837       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
838   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
839     gpr_log(GPR_ERROR,
840             "[xds_client %p] calld=%p call_error=%d sending ADS message",
841             xds_client(), this, call_error);
842     GPR_ASSERT(GRPC_CALL_OK == call_error);
843   }
844 }
845 
Subscribe(const std::string & type_url,const std::string & name)846 void XdsClient::ChannelState::AdsCallState::Subscribe(
847     const std::string& type_url, const std::string& name) {
848   auto& state = state_map_[type_url].subscribed_resources[name];
849   if (state == nullptr) {
850     state = MakeOrphanable<ResourceState>(type_url, name);
851     SendMessageLocked(type_url);
852   }
853 }
854 
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)855 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
856     const std::string& type_url, const std::string& name,
857     bool delay_unsubscription) {
858   state_map_[type_url].subscribed_resources.erase(name);
859   if (!delay_unsubscription) SendMessageLocked(type_url);
860 }
861 
HasSubscribedResources() const862 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
863   for (const auto& p : state_map_) {
864     if (!p.second.subscribed_resources.empty()) return true;
865   }
866   return false;
867 }
868 
AcceptLdsUpdate(absl::optional<XdsApi::LdsUpdate> lds_update)869 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
870     absl::optional<XdsApi::LdsUpdate> lds_update) {
871   if (!lds_update.has_value()) {
872     gpr_log(GPR_INFO,
873             "[xds_client %p] LDS update does not include requested resource",
874             xds_client());
875     if (xds_client()->lds_result_.has_value() &&
876         !xds_client()->lds_result_->route_config_name.empty()) {
877       Unsubscribe(XdsApi::kRdsTypeUrl,
878                   xds_client()->lds_result_->route_config_name,
879                   /*delay_unsubscription=*/false);
880       xds_client()->rds_result_.reset();
881     }
882     xds_client()->lds_result_.reset();
883     xds_client()->service_config_watcher_->OnResourceDoesNotExist();
884     return;
885   }
886   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
887     gpr_log(GPR_INFO,
888             "[xds_client %p] LDS update received: route_config_name=%s",
889             xds_client(),
890             (!lds_update->route_config_name.empty()
891                  ? lds_update->route_config_name.c_str()
892                  : "<inlined>"));
893     if (lds_update->rds_update.has_value()) {
894       gpr_log(GPR_INFO, "RouteConfiguration contains %" PRIuPTR " routes",
895               lds_update->rds_update.value().routes.size());
896       for (size_t i = 0; i < lds_update->rds_update.value().routes.size();
897            ++i) {
898         gpr_log(GPR_INFO, "Route %" PRIuPTR ":\n%s", i,
899                 lds_update->rds_update.value().routes[i].ToString().c_str());
900       }
901     }
902   }
903   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
904   auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
905   if (state != nullptr) state->Finish();
906   // Ignore identical update.
907   if (xds_client()->lds_result_ == lds_update) {
908     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
909       gpr_log(GPR_INFO,
910               "[xds_client %p] LDS update identical to current, ignoring.",
911               xds_client());
912     }
913     return;
914   }
915   if (xds_client()->lds_result_.has_value() &&
916       !xds_client()->lds_result_->route_config_name.empty()) {
917     Unsubscribe(
918         XdsApi::kRdsTypeUrl, xds_client()->lds_result_->route_config_name,
919         /*delay_unsubscription=*/!lds_update->route_config_name.empty());
920     xds_client()->rds_result_.reset();
921   }
922   xds_client()->lds_result_ = std::move(lds_update);
923   if (xds_client()->lds_result_->rds_update.has_value()) {
924     // If the RouteConfiguration was found inlined in LDS response, notify
925     // the watcher immediately.
926     RefCountedPtr<ServiceConfig> service_config;
927     grpc_error* error = xds_client()->CreateServiceConfig(
928         xds_client()->lds_result_->rds_update.value(), &service_config);
929     if (error == GRPC_ERROR_NONE) {
930       xds_client()->service_config_watcher_->OnServiceConfigChanged(
931           std::move(service_config));
932     } else {
933       xds_client()->service_config_watcher_->OnError(error);
934     }
935   } else {
936     // Send RDS request for dynamic resolution.
937     Subscribe(XdsApi::kRdsTypeUrl,
938               xds_client()->lds_result_->route_config_name);
939   }
940 }
941 
AcceptRdsUpdate(absl::optional<XdsApi::RdsUpdate> rds_update)942 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
943     absl::optional<XdsApi::RdsUpdate> rds_update) {
944   if (!rds_update.has_value()) {
945     gpr_log(GPR_INFO,
946             "[xds_client %p] RDS update does not include requested resource",
947             xds_client());
948     xds_client()->rds_result_.reset();
949     xds_client()->service_config_watcher_->OnResourceDoesNotExist();
950     return;
951   }
952   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
953     gpr_log(GPR_INFO,
954             "[xds_client %p] RDS update received;  RouteConfiguration contains "
955             "%" PRIuPTR " routes",
956             this, rds_update.value().routes.size());
957     for (size_t i = 0; i < rds_update.value().routes.size(); ++i) {
958       gpr_log(GPR_INFO, "Route %" PRIuPTR ":\n%s", i,
959               rds_update.value().routes[i].ToString().c_str());
960     }
961   }
962   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
963   auto& state =
964       rds_state
965           .subscribed_resources[xds_client()->lds_result_->route_config_name];
966   if (state != nullptr) state->Finish();
967   // Ignore identical update.
968   if (xds_client()->rds_result_ == rds_update) {
969     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
970       gpr_log(GPR_INFO,
971               "[xds_client %p] RDS update identical to current, ignoring.",
972               xds_client());
973     }
974     return;
975   }
976   xds_client()->rds_result_ = std::move(rds_update);
977   // Notify the watcher.
978   RefCountedPtr<ServiceConfig> service_config;
979   grpc_error* error = xds_client()->CreateServiceConfig(
980       xds_client()->rds_result_.value(), &service_config);
981   if (error == GRPC_ERROR_NONE) {
982     xds_client()->service_config_watcher_->OnServiceConfigChanged(
983         std::move(service_config));
984   } else {
985     xds_client()->service_config_watcher_->OnError(error);
986   }
987 }
988 
AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map)989 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
990     XdsApi::CdsUpdateMap cds_update_map) {
991   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
992   std::set<std::string> eds_resource_names_seen;
993   for (auto& p : cds_update_map) {
994     const char* cluster_name = p.first.c_str();
995     XdsApi::CdsUpdate& cds_update = p.second;
996     auto& state = cds_state.subscribed_resources[cluster_name];
997     if (state != nullptr) state->Finish();
998     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
999       gpr_log(GPR_INFO,
1000               "[xds_client %p] CDS update (cluster=%s) received: "
1001               "eds_service_name=%s, lrs_load_reporting_server_name=%s",
1002               xds_client(), cluster_name, cds_update.eds_service_name.c_str(),
1003               cds_update.lrs_load_reporting_server_name.has_value()
1004                   ? cds_update.lrs_load_reporting_server_name.value().c_str()
1005                   : "(N/A)");
1006     }
1007     // Record the EDS resource names seen.
1008     eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1009                                        ? cluster_name
1010                                        : cds_update.eds_service_name);
1011     // Ignore identical update.
1012     ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1013     if (cluster_state.update.has_value() &&
1014         cds_update.eds_service_name == cluster_state.update->eds_service_name &&
1015         cds_update.lrs_load_reporting_server_name ==
1016             cluster_state.update->lrs_load_reporting_server_name) {
1017       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1018         gpr_log(GPR_INFO,
1019                 "[xds_client %p] CDS update identical to current, ignoring.",
1020                 xds_client());
1021       }
1022       continue;
1023     }
1024     // Update the cluster state.
1025     cluster_state.update = std::move(cds_update);
1026     // Notify all watchers.
1027     for (const auto& p : cluster_state.watchers) {
1028       p.first->OnClusterChanged(cluster_state.update.value());
1029     }
1030   }
1031   // For any subscribed resource that is not present in the update,
1032   // remove it from the cache and notify watchers that it does not exist.
1033   for (const auto& p : cds_state.subscribed_resources) {
1034     const std::string& cluster_name = p.first;
1035     if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1036       ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1037       cluster_state.update.reset();
1038       for (const auto& p : cluster_state.watchers) {
1039         p.first->OnResourceDoesNotExist();
1040       }
1041     }
1042   }
1043   // For any EDS resource that is no longer referred to by any CDS
1044   // resources, remove it from the cache and notify watchers that it
1045   // does not exist.
1046   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1047   for (const auto& p : eds_state.subscribed_resources) {
1048     const std::string& eds_resource_name = p.first;
1049     if (eds_resource_names_seen.find(eds_resource_name) ==
1050         eds_resource_names_seen.end()) {
1051       EndpointState& endpoint_state =
1052           xds_client()->endpoint_map_[eds_resource_name];
1053       endpoint_state.update.reset();
1054       for (const auto& p : endpoint_state.watchers) {
1055         p.first->OnResourceDoesNotExist();
1056       }
1057     }
1058   }
1059 }
1060 
AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map)1061 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1062     XdsApi::EdsUpdateMap eds_update_map) {
1063   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1064   for (auto& p : eds_update_map) {
1065     const char* eds_service_name = p.first.c_str();
1066     XdsApi::EdsUpdate& eds_update = p.second;
1067     auto& state = eds_state.subscribed_resources[eds_service_name];
1068     if (state != nullptr) state->Finish();
1069     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1070       gpr_log(GPR_INFO,
1071               "[xds_client %p] EDS response with %" PRIuPTR
1072               " priorities and %" PRIuPTR
1073               " drop categories received (drop_all=%d)",
1074               xds_client(), eds_update.priority_list_update.size(),
1075               eds_update.drop_config->drop_category_list().size(),
1076               eds_update.drop_config->drop_all());
1077       for (size_t priority = 0;
1078            priority < eds_update.priority_list_update.size(); ++priority) {
1079         const auto* locality_map_update = eds_update.priority_list_update.Find(
1080             static_cast<uint32_t>(priority));
1081         gpr_log(GPR_INFO,
1082                 "[xds_client %p] Priority %" PRIuPTR " contains %" PRIuPTR
1083                 " localities",
1084                 xds_client(), priority, locality_map_update->size());
1085         size_t locality_count = 0;
1086         for (const auto& p : locality_map_update->localities) {
1087           const auto& locality = p.second;
1088           gpr_log(GPR_INFO,
1089                   "[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR
1090                   " %s has weight %d, contains %" PRIuPTR " server addresses",
1091                   xds_client(), priority, locality_count,
1092                   locality.name->AsHumanReadableString().c_str(),
1093                   locality.lb_weight, locality.serverlist.size());
1094           for (size_t i = 0; i < locality.serverlist.size(); ++i) {
1095             std::string ipport = grpc_sockaddr_to_string(
1096                 &locality.serverlist[i].address(), false);
1097             gpr_log(GPR_INFO,
1098                     "[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR
1099                     " %s, server address %" PRIuPTR ": %s",
1100                     xds_client(), priority, locality_count,
1101                     locality.name->AsHumanReadableString().c_str(), i,
1102                     ipport.c_str());
1103           }
1104           ++locality_count;
1105         }
1106       }
1107       for (size_t i = 0;
1108            i < eds_update.drop_config->drop_category_list().size(); ++i) {
1109         const XdsApi::DropConfig::DropCategory& drop_category =
1110             eds_update.drop_config->drop_category_list()[i];
1111         gpr_log(GPR_INFO,
1112                 "[xds_client %p] Drop category %s has drop rate %d per million",
1113                 xds_client(), drop_category.name.c_str(),
1114                 drop_category.parts_per_million);
1115       }
1116     }
1117     EndpointState& endpoint_state =
1118         xds_client()->endpoint_map_[eds_service_name];
1119     // Ignore identical update.
1120     if (endpoint_state.update.has_value()) {
1121       const XdsApi::EdsUpdate& prev_update = endpoint_state.update.value();
1122       const bool priority_list_changed =
1123           prev_update.priority_list_update != eds_update.priority_list_update;
1124       const bool drop_config_changed =
1125           prev_update.drop_config == nullptr ||
1126           *prev_update.drop_config != *eds_update.drop_config;
1127       if (!priority_list_changed && !drop_config_changed) {
1128         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1129           gpr_log(GPR_INFO,
1130                   "[xds_client %p] EDS update identical to current, ignoring.",
1131                   xds_client());
1132         }
1133         continue;
1134       }
1135     }
1136     // Update the cluster state.
1137     endpoint_state.update = std::move(eds_update);
1138     // Notify all watchers.
1139     for (const auto& p : endpoint_state.watchers) {
1140       p.first->OnEndpointChanged(endpoint_state.update.value());
1141     }
1142   }
1143 }
1144 
OnRequestSent(void * arg,grpc_error * error)1145 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1146                                                           grpc_error* error) {
1147   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1148   GRPC_ERROR_REF(error);  // ref owned by lambda
1149   ads_calld->xds_client()->work_serializer_->Run(
1150       [ads_calld, error]() { ads_calld->OnRequestSentLocked(error); },
1151       DEBUG_LOCATION);
1152 }
1153 
OnRequestSentLocked(grpc_error * error)1154 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1155     grpc_error* error) {
1156   if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1157     // Clean up the sent message.
1158     grpc_byte_buffer_destroy(send_message_payload_);
1159     send_message_payload_ = nullptr;
1160     // Continue to send another pending message if any.
1161     // TODO(roth): The current code to handle buffered messages has the
1162     // advantage of sending only the most recent list of resource names for
1163     // each resource type (no matter how many times that resource type has
1164     // been requested to send while the current message sending is still
1165     // pending). But its disadvantage is that we send the requests in fixed
1166     // order of resource types. We need to fix this if we are seeing some
1167     // resource type(s) starved due to frequent requests of other resource
1168     // type(s).
1169     auto it = buffered_requests_.begin();
1170     if (it != buffered_requests_.end()) {
1171       SendMessageLocked(*it);
1172       buffered_requests_.erase(it);
1173     }
1174   }
1175   Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1176   GRPC_ERROR_UNREF(error);
1177 }
1178 
OnResponseReceived(void * arg,grpc_error *)1179 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1180     void* arg, grpc_error* /* error */) {
1181   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1182   ads_calld->xds_client()->work_serializer_->Run(
1183       [ads_calld]() { ads_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION);
1184 }
1185 
OnResponseReceivedLocked()1186 void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1187   // Empty payload means the call was cancelled.
1188   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1189     Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1190     return;
1191   }
1192   // Read the response.
1193   grpc_byte_buffer_reader bbr;
1194   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1195   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1196   grpc_byte_buffer_reader_destroy(&bbr);
1197   grpc_byte_buffer_destroy(recv_message_payload_);
1198   recv_message_payload_ = nullptr;
1199   // TODO(juanlishen): When we convert this to use the xds protocol, the
1200   // balancer will send us a fallback timeout such that we should go into
1201   // fallback mode if we have lost contact with the balancer after a certain
1202   // period of time. We will need to save the timeout value here, and then
1203   // when the balancer call ends, we will need to start a timer for the
1204   // specified period of time, and if the timer fires, we go into fallback
1205   // mode. We will also need to cancel the timer when we receive a serverlist
1206   // from the balancer.
1207   // Parse the response.
1208   absl::optional<XdsApi::LdsUpdate> lds_update;
1209   absl::optional<XdsApi::RdsUpdate> rds_update;
1210   XdsApi::CdsUpdateMap cds_update_map;
1211   XdsApi::EdsUpdateMap eds_update_map;
1212   std::string version;
1213   std::string nonce;
1214   std::string type_url;
1215   // Note that ParseAdsResponse() also validates the response.
1216   grpc_error* parse_error = xds_client()->api_.ParseAdsResponse(
1217       response_slice, xds_client()->server_name_,
1218       ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1219       ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1220       ResourceNamesForRequest(XdsApi::kEdsTypeUrl), &lds_update, &rds_update,
1221       &cds_update_map, &eds_update_map, &version, &nonce, &type_url);
1222   grpc_slice_unref_internal(response_slice);
1223   if (type_url.empty()) {
1224     // Ignore unparsable response.
1225     gpr_log(GPR_ERROR,
1226             "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1227             xds_client(), grpc_error_string(parse_error));
1228     GRPC_ERROR_UNREF(parse_error);
1229   } else {
1230     // Update nonce.
1231     auto& state = state_map_[type_url];
1232     state.nonce = std::move(nonce);
1233     // NACK or ACK the response.
1234     if (parse_error != GRPC_ERROR_NONE) {
1235       GRPC_ERROR_UNREF(state.error);
1236       state.error = parse_error;
1237       // NACK unacceptable update.
1238       gpr_log(GPR_ERROR,
1239               "[xds_client %p] ADS response invalid for resource type %s "
1240               "version %s, will NACK: nonce=%s error=%s",
1241               xds_client(), type_url.c_str(), version.c_str(),
1242               state.nonce.c_str(), grpc_error_string(parse_error));
1243       SendMessageLocked(type_url);
1244     } else {
1245       seen_response_ = true;
1246       // Accept the ADS response according to the type_url.
1247       if (type_url == XdsApi::kLdsTypeUrl) {
1248         AcceptLdsUpdate(std::move(lds_update));
1249       } else if (type_url == XdsApi::kRdsTypeUrl) {
1250         AcceptRdsUpdate(std::move(rds_update));
1251       } else if (type_url == XdsApi::kCdsTypeUrl) {
1252         AcceptCdsUpdate(std::move(cds_update_map));
1253       } else if (type_url == XdsApi::kEdsTypeUrl) {
1254         AcceptEdsUpdate(std::move(eds_update_map));
1255       }
1256       state.version = std::move(version);
1257       // ACK the update.
1258       SendMessageLocked(type_url);
1259       // Start load reporting if needed.
1260       auto& lrs_call = chand()->lrs_calld_;
1261       if (lrs_call != nullptr) {
1262         LrsCallState* lrs_calld = lrs_call->calld();
1263         if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1264       }
1265     }
1266   }
1267   if (xds_client()->shutting_down_) {
1268     Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked+xds_shutdown");
1269     return;
1270   }
1271   // Keep listening for updates.
1272   grpc_op op;
1273   memset(&op, 0, sizeof(op));
1274   op.op = GRPC_OP_RECV_MESSAGE;
1275   op.data.recv_message.recv_message = &recv_message_payload_;
1276   op.flags = 0;
1277   op.reserved = nullptr;
1278   GPR_ASSERT(call_ != nullptr);
1279   // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1280   const grpc_call_error call_error =
1281       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1282   GPR_ASSERT(GRPC_CALL_OK == call_error);
1283 }
1284 
OnStatusReceived(void * arg,grpc_error * error)1285 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1286     void* arg, grpc_error* error) {
1287   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1288   GRPC_ERROR_REF(error);  // ref owned by lambda
1289   ads_calld->xds_client()->work_serializer_->Run(
1290       [ads_calld, error]() { ads_calld->OnStatusReceivedLocked(error); },
1291       DEBUG_LOCATION);
1292 }
1293 
OnStatusReceivedLocked(grpc_error * error)1294 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1295     grpc_error* error) {
1296   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1297     char* status_details = grpc_slice_to_c_string(status_details_);
1298     gpr_log(GPR_INFO,
1299             "[xds_client %p] ADS call status received. Status = %d, details "
1300             "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1301             xds_client(), status_code_, status_details, chand(), this, call_,
1302             grpc_error_string(error));
1303     gpr_free(status_details);
1304   }
1305   // Ignore status from a stale call.
1306   if (IsCurrentCallOnChannel()) {
1307     // Try to restart the call.
1308     parent_->OnCallFinishedLocked();
1309     // Send error to all watchers.
1310     xds_client()->NotifyOnError(
1311         GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1312   }
1313   Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1314   GRPC_ERROR_UNREF(error);
1315 }
1316 
IsCurrentCallOnChannel() const1317 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1318   // If the retryable ADS call is null (which only happens when the xds channel
1319   // is shutting down), all the ADS calls are stale.
1320   if (chand()->ads_calld_ == nullptr) return false;
1321   return this == chand()->ads_calld_->calld();
1322 }
1323 
1324 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1325 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1326     const std::string& type_url) {
1327   std::set<absl::string_view> resource_names;
1328   auto it = state_map_.find(type_url);
1329   if (it != state_map_.end()) {
1330     for (auto& p : it->second.subscribed_resources) {
1331       resource_names.insert(p.first);
1332       OrphanablePtr<ResourceState>& state = p.second;
1333       state->Start(Ref());
1334     }
1335   }
1336   return resource_names;
1337 }
1338 
1339 //
1340 // XdsClient::ChannelState::LrsCallState::Reporter
1341 //
1342 
Orphan()1343 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1344   if (next_report_timer_callback_pending_) {
1345     grpc_timer_cancel(&next_report_timer_);
1346   }
1347 }
1348 
1349 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1350     ScheduleNextReportLocked() {
1351   const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1352   grpc_timer_init(&next_report_timer_, next_report_time,
1353                   &on_next_report_timer_);
1354   next_report_timer_callback_pending_ = true;
1355 }
1356 
OnNextReportTimer(void * arg,grpc_error * error)1357 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1358     void* arg, grpc_error* error) {
1359   Reporter* self = static_cast<Reporter*>(arg);
1360   GRPC_ERROR_REF(error);  // ref owned by lambda
1361   self->xds_client()->work_serializer_->Run(
1362       [self, error]() { self->OnNextReportTimerLocked(error); },
1363       DEBUG_LOCATION);
1364 }
1365 
OnNextReportTimerLocked(grpc_error * error)1366 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1367     grpc_error* error) {
1368   next_report_timer_callback_pending_ = false;
1369   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1370     Unref(DEBUG_LOCATION, "Reporter+timer");
1371   } else {
1372     SendReportLocked();
1373   }
1374   GRPC_ERROR_UNREF(error);
1375 }
1376 
1377 namespace {
1378 
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1379 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1380   for (const auto& p : snapshot) {
1381     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1382     for (const auto& q : cluster_snapshot.dropped_requests) {
1383       if (q.second > 0) return false;
1384     }
1385     for (const auto& q : cluster_snapshot.locality_stats) {
1386       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1387       if (!locality_snapshot.IsZero()) return false;
1388     }
1389   }
1390   return true;
1391 }
1392 
1393 }  // namespace
1394 
SendReportLocked()1395 void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1396   // Construct snapshot from all reported stats.
1397   XdsApi::ClusterLoadReportMap snapshot = xds_client()->BuildLoadReportSnapshot(
1398       parent_->send_all_clusters_, parent_->cluster_names_);
1399   // Skip client load report if the counters were all zero in the last
1400   // report and they are still zero in this one.
1401   const bool old_val = last_report_counters_were_zero_;
1402   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1403   if (old_val && last_report_counters_were_zero_) {
1404     ScheduleNextReportLocked();
1405     return;
1406   }
1407   // Create a request that contains the snapshot.
1408   grpc_slice request_payload_slice =
1409       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1410   parent_->send_message_payload_ =
1411       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1412   grpc_slice_unref_internal(request_payload_slice);
1413   // Send the report.
1414   grpc_op op;
1415   memset(&op, 0, sizeof(op));
1416   op.op = GRPC_OP_SEND_MESSAGE;
1417   op.data.send_message.send_message = parent_->send_message_payload_;
1418   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1419       parent_->call_, &op, 1, &on_report_done_);
1420   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1421     gpr_log(GPR_ERROR,
1422             "[xds_client %p] calld=%p call_error=%d sending client load report",
1423             xds_client(), this, call_error);
1424     GPR_ASSERT(GRPC_CALL_OK == call_error);
1425   }
1426 }
1427 
OnReportDone(void * arg,grpc_error * error)1428 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1429     void* arg, grpc_error* error) {
1430   Reporter* self = static_cast<Reporter*>(arg);
1431   GRPC_ERROR_REF(error);  // ref owned by lambda
1432   self->xds_client()->work_serializer_->Run(
1433       [self, error]() { self->OnReportDoneLocked(error); }, DEBUG_LOCATION);
1434 }
1435 
OnReportDoneLocked(grpc_error * error)1436 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1437     grpc_error* error) {
1438   grpc_byte_buffer_destroy(parent_->send_message_payload_);
1439   parent_->send_message_payload_ = nullptr;
1440   // If there are no more registered stats to report, cancel the call.
1441   if (xds_client()->load_report_map_.empty()) {
1442     parent_->chand()->StopLrsCall();
1443     Unref(DEBUG_LOCATION, "Reporter+report_done+no_more_reporters");
1444     return;
1445   }
1446   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1447     // If this reporter is no longer the current one on the call, the reason
1448     // might be that it was orphaned for a new one due to config update.
1449     if (!IsCurrentReporterOnCall()) {
1450       parent_->MaybeStartReportingLocked();
1451     }
1452     Unref(DEBUG_LOCATION, "Reporter+report_done");
1453   } else {
1454     ScheduleNextReportLocked();
1455   }
1456   GRPC_ERROR_UNREF(error);
1457 }
1458 
1459 //
1460 // XdsClient::ChannelState::LrsCallState
1461 //
1462 
LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent)1463 XdsClient::ChannelState::LrsCallState::LrsCallState(
1464     RefCountedPtr<RetryableCall<LrsCallState>> parent)
1465     : InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace),
1466       parent_(std::move(parent)) {
1467   // Init the LRS call. Note that the call will progress every time there's
1468   // activity in xds_client()->interested_parties_, which is comprised of
1469   // the polling entities from client_channel.
1470   GPR_ASSERT(xds_client() != nullptr);
1471   GPR_ASSERT(!xds_client()->server_name_.empty());
1472   call_ = grpc_channel_create_pollset_set_call(
1473       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1474       xds_client()->interested_parties_,
1475       GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS,
1476       nullptr, GRPC_MILLIS_INF_FUTURE, nullptr);
1477   GPR_ASSERT(call_ != nullptr);
1478   // Init the request payload.
1479   grpc_slice request_payload_slice =
1480       xds_client()->api_.CreateLrsInitialRequest(xds_client()->server_name_);
1481   send_message_payload_ =
1482       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1483   grpc_slice_unref_internal(request_payload_slice);
1484   // Init other data associated with the LRS call.
1485   grpc_metadata_array_init(&initial_metadata_recv_);
1486   grpc_metadata_array_init(&trailing_metadata_recv_);
1487   // Start the call.
1488   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1489     gpr_log(GPR_INFO,
1490             "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1491             "call: %p)",
1492             xds_client(), chand(), this, call_);
1493   }
1494   // Create the ops.
1495   grpc_call_error call_error;
1496   grpc_op ops[3];
1497   memset(ops, 0, sizeof(ops));
1498   // Op: send initial metadata.
1499   grpc_op* op = ops;
1500   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1501   op->data.send_initial_metadata.count = 0;
1502   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1503               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1504   op->reserved = nullptr;
1505   op++;
1506   // Op: send request message.
1507   GPR_ASSERT(send_message_payload_ != nullptr);
1508   op->op = GRPC_OP_SEND_MESSAGE;
1509   op->data.send_message.send_message = send_message_payload_;
1510   op->flags = 0;
1511   op->reserved = nullptr;
1512   op++;
1513   Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1514   GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1515                     grpc_schedule_on_exec_ctx);
1516   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1517                                                  &on_initial_request_sent_);
1518   GPR_ASSERT(GRPC_CALL_OK == call_error);
1519   // Op: recv initial metadata.
1520   op = ops;
1521   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1522   op->data.recv_initial_metadata.recv_initial_metadata =
1523       &initial_metadata_recv_;
1524   op->flags = 0;
1525   op->reserved = nullptr;
1526   op++;
1527   // Op: recv response.
1528   op->op = GRPC_OP_RECV_MESSAGE;
1529   op->data.recv_message.recv_message = &recv_message_payload_;
1530   op->flags = 0;
1531   op->reserved = nullptr;
1532   op++;
1533   Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1534   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1535                     grpc_schedule_on_exec_ctx);
1536   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1537                                                  &on_response_received_);
1538   GPR_ASSERT(GRPC_CALL_OK == call_error);
1539   // Op: recv server status.
1540   op = ops;
1541   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1542   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1543   op->data.recv_status_on_client.status = &status_code_;
1544   op->data.recv_status_on_client.status_details = &status_details_;
1545   op->flags = 0;
1546   op->reserved = nullptr;
1547   op++;
1548   // This callback signals the end of the call, so it relies on the initial
1549   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1550   // unreffed.
1551   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1552                     grpc_schedule_on_exec_ctx);
1553   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1554                                                  &on_status_received_);
1555   GPR_ASSERT(GRPC_CALL_OK == call_error);
1556 }
1557 
~LrsCallState()1558 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1559   grpc_metadata_array_destroy(&initial_metadata_recv_);
1560   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1561   grpc_byte_buffer_destroy(send_message_payload_);
1562   grpc_byte_buffer_destroy(recv_message_payload_);
1563   grpc_slice_unref_internal(status_details_);
1564   GPR_ASSERT(call_ != nullptr);
1565   grpc_call_unref(call_);
1566 }
1567 
Orphan()1568 void XdsClient::ChannelState::LrsCallState::Orphan() {
1569   reporter_.reset();
1570   GPR_ASSERT(call_ != nullptr);
1571   // If we are here because xds_client wants to cancel the call,
1572   // on_status_received_ will complete the cancellation and clean up. Otherwise,
1573   // we are here because xds_client has to orphan a failed call, then the
1574   // following cancellation will be a no-op.
1575   grpc_call_cancel(call_, nullptr);
1576   // Note that the initial ref is hold by on_status_received_. So the
1577   // corresponding unref happens in on_status_received_ instead of here.
1578 }
1579 
MaybeStartReportingLocked()1580 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1581   // Don't start again if already started.
1582   if (reporter_ != nullptr) return;
1583   // Don't start if the previous send_message op (of the initial request or the
1584   // last report of the previous reporter) hasn't completed.
1585   if (send_message_payload_ != nullptr) return;
1586   // Don't start if no LRS response has arrived.
1587   if (!seen_response()) return;
1588   // Don't start if the ADS call hasn't received any valid response. Note that
1589   // this must be the first channel because it is the current channel but its
1590   // ADS call hasn't seen any response.
1591   if (chand()->ads_calld_ == nullptr ||
1592       chand()->ads_calld_->calld() == nullptr ||
1593       !chand()->ads_calld_->calld()->seen_response()) {
1594     return;
1595   }
1596   // Start reporting.
1597   reporter_ = MakeOrphanable<Reporter>(
1598       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1599 }
1600 
OnInitialRequestSent(void * arg,grpc_error *)1601 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1602     void* arg, grpc_error* /*error*/) {
1603   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1604   lrs_calld->xds_client()->work_serializer_->Run(
1605       [lrs_calld]() { lrs_calld->OnInitialRequestSentLocked(); },
1606       DEBUG_LOCATION);
1607 }
1608 
OnInitialRequestSentLocked()1609 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1610   // Clear the send_message_payload_.
1611   grpc_byte_buffer_destroy(send_message_payload_);
1612   send_message_payload_ = nullptr;
1613   MaybeStartReportingLocked();
1614   Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1615 }
1616 
OnResponseReceived(void * arg,grpc_error *)1617 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1618     void* arg, grpc_error* /*error*/) {
1619   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1620   lrs_calld->xds_client()->work_serializer_->Run(
1621       [lrs_calld]() { lrs_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION);
1622 }
1623 
OnResponseReceivedLocked()1624 void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1625   // Empty payload means the call was cancelled.
1626   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1627     Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1628     return;
1629   }
1630   // Read the response.
1631   grpc_byte_buffer_reader bbr;
1632   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1633   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1634   grpc_byte_buffer_reader_destroy(&bbr);
1635   grpc_byte_buffer_destroy(recv_message_payload_);
1636   recv_message_payload_ = nullptr;
1637   // This anonymous lambda is a hack to avoid the usage of goto.
1638   [&]() {
1639     // Parse the response.
1640     bool send_all_clusters = false;
1641     std::set<std::string> new_cluster_names;
1642     grpc_millis new_load_reporting_interval;
1643     grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
1644         response_slice, &send_all_clusters, &new_cluster_names,
1645         &new_load_reporting_interval);
1646     if (parse_error != GRPC_ERROR_NONE) {
1647       gpr_log(GPR_ERROR,
1648               "[xds_client %p] LRS response parsing failed. error=%s",
1649               xds_client(), grpc_error_string(parse_error));
1650       GRPC_ERROR_UNREF(parse_error);
1651       return;
1652     }
1653     seen_response_ = true;
1654     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1655       gpr_log(
1656           GPR_INFO,
1657           "[xds_client %p] LRS response received, %" PRIuPTR
1658           " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1659           "ms",
1660           xds_client(), new_cluster_names.size(), send_all_clusters,
1661           new_load_reporting_interval);
1662       size_t i = 0;
1663       for (const auto& name : new_cluster_names) {
1664         gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1665                 xds_client(), i++, name.c_str());
1666       }
1667     }
1668     if (new_load_reporting_interval <
1669         GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1670       new_load_reporting_interval =
1671           GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1672       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1673         gpr_log(GPR_INFO,
1674                 "[xds_client %p] Increased load_report_interval to minimum "
1675                 "value %dms",
1676                 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1677       }
1678     }
1679     // Ignore identical update.
1680     if (send_all_clusters == send_all_clusters_ &&
1681         cluster_names_ == new_cluster_names &&
1682         load_reporting_interval_ == new_load_reporting_interval) {
1683       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1684         gpr_log(GPR_INFO,
1685                 "[xds_client %p] Incoming LRS response identical to current, "
1686                 "ignoring.",
1687                 xds_client());
1688       }
1689       return;
1690     }
1691     // Stop current load reporting (if any) to adopt the new config.
1692     reporter_.reset();
1693     // Record the new config.
1694     send_all_clusters_ = send_all_clusters;
1695     cluster_names_ = std::move(new_cluster_names);
1696     load_reporting_interval_ = new_load_reporting_interval;
1697     // Try starting sending load report.
1698     MaybeStartReportingLocked();
1699   }();
1700   grpc_slice_unref_internal(response_slice);
1701   if (xds_client()->shutting_down_) {
1702     Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked+xds_shutdown");
1703     return;
1704   }
1705   // Keep listening for LRS config updates.
1706   grpc_op op;
1707   memset(&op, 0, sizeof(op));
1708   op.op = GRPC_OP_RECV_MESSAGE;
1709   op.data.recv_message.recv_message = &recv_message_payload_;
1710   op.flags = 0;
1711   op.reserved = nullptr;
1712   GPR_ASSERT(call_ != nullptr);
1713   // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1714   const grpc_call_error call_error =
1715       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1716   GPR_ASSERT(GRPC_CALL_OK == call_error);
1717 }
1718 
OnStatusReceived(void * arg,grpc_error * error)1719 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1720     void* arg, grpc_error* error) {
1721   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1722   GRPC_ERROR_REF(error);  // ref owned by lambda
1723   lrs_calld->xds_client()->work_serializer_->Run(
1724       [lrs_calld, error]() { lrs_calld->OnStatusReceivedLocked(error); },
1725       DEBUG_LOCATION);
1726 }
1727 
OnStatusReceivedLocked(grpc_error * error)1728 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1729     grpc_error* error) {
1730   GPR_ASSERT(call_ != nullptr);
1731   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1732     char* status_details = grpc_slice_to_c_string(status_details_);
1733     gpr_log(GPR_INFO,
1734             "[xds_client %p] LRS call status received. Status = %d, details "
1735             "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1736             xds_client(), status_code_, status_details, chand(), this, call_,
1737             grpc_error_string(error));
1738     gpr_free(status_details);
1739   }
1740   // Ignore status from a stale call.
1741   if (IsCurrentCallOnChannel()) {
1742     GPR_ASSERT(!xds_client()->shutting_down_);
1743     // Try to restart the call.
1744     parent_->OnCallFinishedLocked();
1745   }
1746   Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1747   GRPC_ERROR_UNREF(error);
1748 }
1749 
IsCurrentCallOnChannel() const1750 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1751   // If the retryable LRS call is null (which only happens when the xds channel
1752   // is shutting down), all the LRS calls are stale.
1753   if (chand()->lrs_calld_ == nullptr) return false;
1754   return this == chand()->lrs_calld_->calld();
1755 }
1756 
1757 //
1758 // XdsClient
1759 //
1760 
1761 namespace {
1762 
GetRequestTimeout(const grpc_channel_args & args)1763 grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
1764   return grpc_channel_args_find_integer(
1765       &args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1766       {15000, 0, INT_MAX});
1767 }
1768 
1769 }  // namespace
1770 
XdsClient(std::shared_ptr<WorkSerializer> work_serializer,grpc_pollset_set * interested_parties,absl::string_view server_name,std::unique_ptr<ServiceConfigWatcherInterface> watcher,const grpc_channel_args & channel_args,grpc_error ** error)1771 XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
1772                      grpc_pollset_set* interested_parties,
1773                      absl::string_view server_name,
1774                      std::unique_ptr<ServiceConfigWatcherInterface> watcher,
1775                      const grpc_channel_args& channel_args, grpc_error** error)
1776     : InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
1777       request_timeout_(GetRequestTimeout(channel_args)),
1778       work_serializer_(std::move(work_serializer)),
1779       interested_parties_(interested_parties),
1780       bootstrap_(
1781           XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
1782       api_(this, &grpc_xds_client_trace,
1783            bootstrap_ == nullptr ? nullptr : bootstrap_->node()),
1784       server_name_(server_name),
1785       service_config_watcher_(std::move(watcher)) {
1786   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1787     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1788   }
1789   if (*error != GRPC_ERROR_NONE) {
1790     gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1791             this, grpc_error_string(*error));
1792     return;
1793   }
1794   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1795     gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this,
1796             bootstrap_->server().server_uri.c_str());
1797   }
1798   grpc_channel_args* new_args = BuildXdsChannelArgs(channel_args);
1799   grpc_channel* channel = CreateXdsChannel(*bootstrap_, *new_args, error);
1800   grpc_channel_args_destroy(new_args);
1801   if (*error != GRPC_ERROR_NONE) {
1802     gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this,
1803             grpc_error_string(*error));
1804     return;
1805   }
1806   chand_ = MakeOrphanable<ChannelState>(
1807       Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
1808   if (service_config_watcher_ != nullptr) {
1809     chand_->Subscribe(XdsApi::kLdsTypeUrl, std::string(server_name));
1810   }
1811 }
1812 
~XdsClient()1813 XdsClient::~XdsClient() {
1814   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1815     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1816   }
1817 }
1818 
Orphan()1819 void XdsClient::Orphan() {
1820   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1821     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1822   }
1823   shutting_down_ = true;
1824   chand_.reset();
1825   // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1826   // created by the XdsResolver because the maps contain refs for watchers which
1827   // in turn hold refs to the loadbalancing policies. At this point, it is
1828   // possible for ADS calls to be in progress. Unreffing the loadbalancing
1829   // policies before those calls are done would lead to issues such as
1830   // https://github.com/grpc/grpc/issues/20928.
1831   if (service_config_watcher_ != nullptr) {
1832     cluster_map_.clear();
1833     endpoint_map_.clear();
1834   }
1835   Unref(DEBUG_LOCATION, "XdsClient::Orphan()");
1836 }
1837 
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1838 void XdsClient::WatchClusterData(
1839     absl::string_view cluster_name,
1840     std::unique_ptr<ClusterWatcherInterface> watcher) {
1841   std::string cluster_name_str = std::string(cluster_name);
1842   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1843   ClusterWatcherInterface* w = watcher.get();
1844   cluster_state.watchers[w] = std::move(watcher);
1845   // If we've already received an CDS update, notify the new watcher
1846   // immediately.
1847   if (cluster_state.update.has_value()) {
1848     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1849       gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1850               this, cluster_name_str.c_str());
1851     }
1852     w->OnClusterChanged(cluster_state.update.value());
1853   }
1854   chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1855 }
1856 
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1857 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1858                                        ClusterWatcherInterface* watcher,
1859                                        bool delay_unsubscription) {
1860   if (shutting_down_) return;
1861   std::string cluster_name_str = std::string(cluster_name);
1862   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1863   auto it = cluster_state.watchers.find(watcher);
1864   if (it != cluster_state.watchers.end()) {
1865     cluster_state.watchers.erase(it);
1866     if (cluster_state.watchers.empty()) {
1867       cluster_map_.erase(cluster_name_str);
1868       chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1869                           delay_unsubscription);
1870     }
1871   }
1872 }
1873 
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1874 void XdsClient::WatchEndpointData(
1875     absl::string_view eds_service_name,
1876     std::unique_ptr<EndpointWatcherInterface> watcher) {
1877   std::string eds_service_name_str = std::string(eds_service_name);
1878   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1879   EndpointWatcherInterface* w = watcher.get();
1880   endpoint_state.watchers[w] = std::move(watcher);
1881   // If we've already received an EDS update, notify the new watcher
1882   // immediately.
1883   if (endpoint_state.update.has_value()) {
1884     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1885       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1886               this, eds_service_name_str.c_str());
1887     }
1888     w->OnEndpointChanged(endpoint_state.update.value());
1889   }
1890   chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1891 }
1892 
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)1893 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1894                                         EndpointWatcherInterface* watcher,
1895                                         bool delay_unsubscription) {
1896   if (shutting_down_) return;
1897   std::string eds_service_name_str = std::string(eds_service_name);
1898   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1899   auto it = endpoint_state.watchers.find(watcher);
1900   if (it != endpoint_state.watchers.end()) {
1901     endpoint_state.watchers.erase(it);
1902     if (endpoint_state.watchers.empty()) {
1903       endpoint_map_.erase(eds_service_name_str);
1904       chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1905                           delay_unsubscription);
1906     }
1907   }
1908 }
1909 
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)1910 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1911     absl::string_view lrs_server, absl::string_view cluster_name,
1912     absl::string_view eds_service_name) {
1913   // TODO(roth): When we add support for direct federation, use the
1914   // server name specified in lrs_server.
1915   auto key =
1916       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1917   // We jump through some hoops here to make sure that the absl::string_views
1918   // stored in the XdsClusterDropStats object point to the strings
1919   // in the load_report_map_ key, so that they have the same lifetime.
1920   auto it = load_report_map_
1921                 .emplace(std::make_pair(std::move(key), LoadReportState()))
1922                 .first;
1923   auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1924       Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1925       it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
1926   it->second.drop_stats.insert(cluster_drop_stats.get());
1927   chand_->MaybeStartLrsCall();
1928   return cluster_drop_stats;
1929 }
1930 
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)1931 void XdsClient::RemoveClusterDropStats(
1932     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
1933     absl::string_view eds_service_name,
1934     XdsClusterDropStats* cluster_drop_stats) {
1935   auto load_report_it = load_report_map_.find(
1936       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1937   if (load_report_it == load_report_map_.end()) return;
1938   LoadReportState& load_report_state = load_report_it->second;
1939   // TODO(roth): When we add support for direct federation, use the
1940   // server name specified in lrs_server.
1941   auto it = load_report_state.drop_stats.find(cluster_drop_stats);
1942   if (it != load_report_state.drop_stats.end()) {
1943     // Record final drop stats in deleted_drop_stats, which will be
1944     // added to the next load report.
1945     for (const auto& p : cluster_drop_stats->GetSnapshotAndReset()) {
1946       load_report_state.deleted_drop_stats[p.first] += p.second;
1947     }
1948     load_report_state.drop_stats.erase(it);
1949   }
1950 }
1951 
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)1952 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
1953     absl::string_view lrs_server, absl::string_view cluster_name,
1954     absl::string_view eds_service_name,
1955     RefCountedPtr<XdsLocalityName> locality) {
1956   // TODO(roth): When we add support for direct federation, use the
1957   // server name specified in lrs_server.
1958   auto key =
1959       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1960   // We jump through some hoops here to make sure that the absl::string_views
1961   // stored in the XdsClusterLocalityStats object point to the strings
1962   // in the load_report_map_ key, so that they have the same lifetime.
1963   auto it = load_report_map_
1964                 .emplace(std::make_pair(std::move(key), LoadReportState()))
1965                 .first;
1966   auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
1967       Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
1968       it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
1969       locality);
1970   it->second.locality_stats[std::move(locality)].locality_stats.insert(
1971       cluster_locality_stats.get());
1972   chand_->MaybeStartLrsCall();
1973   return cluster_locality_stats;
1974 }
1975 
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)1976 void XdsClient::RemoveClusterLocalityStats(
1977     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
1978     absl::string_view eds_service_name,
1979     const RefCountedPtr<XdsLocalityName>& locality,
1980     XdsClusterLocalityStats* cluster_locality_stats) {
1981   auto load_report_it = load_report_map_.find(
1982       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1983   if (load_report_it == load_report_map_.end()) return;
1984   LoadReportState& load_report_state = load_report_it->second;
1985   // TODO(roth): When we add support for direct federation, use the
1986   // server name specified in lrs_server.
1987   auto locality_it = load_report_state.locality_stats.find(locality);
1988   if (locality_it == load_report_state.locality_stats.end()) return;
1989   auto& locality_set = locality_it->second.locality_stats;
1990   auto it = locality_set.find(cluster_locality_stats);
1991   if (it != locality_set.end()) {
1992     // Record final snapshot in deleted_locality_stats, which will be
1993     // added to the next load report.
1994     locality_it->second.deleted_locality_stats.emplace_back(
1995         cluster_locality_stats->GetSnapshotAndReset());
1996     locality_set.erase(it);
1997   }
1998 }
1999 
ResetBackoff()2000 void XdsClient::ResetBackoff() {
2001   if (chand_ != nullptr) {
2002     grpc_channel_reset_connect_backoff(chand_->channel());
2003   }
2004 }
2005 
2006 namespace {
CreateServiceConfigActionCluster(const std::string & cluster_name)2007 std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
2008   return absl::StrFormat(
2009       "      \"cds:%s\":{\n"
2010       "        \"childPolicy\":[ {\n"
2011       "          \"cds_experimental\":{\n"
2012       "            \"cluster\": \"%s\"\n"
2013       "          }\n"
2014       "        } ]\n"
2015       "       }",
2016       cluster_name, cluster_name);
2017 }
2018 
CreateServiceConfigRoute(const std::string & action_name,const XdsApi::RdsUpdate::RdsRoute & route)2019 std::string CreateServiceConfigRoute(const std::string& action_name,
2020                                      const XdsApi::RdsUpdate::RdsRoute& route) {
2021   std::vector<std::string> headers;
2022   for (const auto& header : route.matchers.header_matchers) {
2023     std::string header_matcher;
2024     switch (header.type) {
2025       case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2026           HeaderMatcherType::EXACT:
2027         header_matcher = absl::StrFormat("             \"exact_match\": \"%s\"",
2028                                          header.string_matcher);
2029         break;
2030       case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2031           HeaderMatcherType::REGEX:
2032         header_matcher = absl::StrFormat("             \"regex_match\": \"%s\"",
2033                                          header.regex_match->pattern());
2034         break;
2035       case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2036           HeaderMatcherType::RANGE:
2037         header_matcher = absl::StrFormat(
2038             "             \"range_match\":{\n"
2039             "              \"start\":%d,\n"
2040             "              \"end\":%d\n"
2041             "             }",
2042             header.range_start, header.range_end);
2043         break;
2044       case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2045           HeaderMatcherType::PRESENT:
2046         header_matcher =
2047             absl::StrFormat("             \"present_match\": %s",
2048                             header.present_match ? "true" : "false");
2049         break;
2050       case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2051           HeaderMatcherType::PREFIX:
2052         header_matcher = absl::StrFormat(
2053             "             \"prefix_match\": \"%s\"", header.string_matcher);
2054         break;
2055       case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2056           HeaderMatcherType::SUFFIX:
2057         header_matcher = absl::StrFormat(
2058             "             \"suffix_match\": \"%s\"", header.string_matcher);
2059         break;
2060       default:
2061         break;
2062     }
2063     std::vector<std::string> header_parts;
2064     header_parts.push_back(
2065         absl::StrFormat("           { \n"
2066                         "             \"name\": \"%s\",\n",
2067                         header.name));
2068     header_parts.push_back(header_matcher);
2069     if (header.invert_match) {
2070       header_parts.push_back(
2071           absl::StrFormat(",\n"
2072                           "             \"invert_match\": true"));
2073     }
2074     header_parts.push_back(
2075         absl::StrFormat("\n"
2076                         "           }"));
2077     headers.push_back(absl::StrJoin(header_parts, ""));
2078   }
2079   std::vector<std::string> headers_service_config;
2080   if (!headers.empty()) {
2081     headers_service_config.push_back("\"headers\":[\n");
2082     headers_service_config.push_back(absl::StrJoin(headers, ","));
2083     headers_service_config.push_back("           ],\n");
2084   }
2085   std::string path_match_str;
2086   switch (route.matchers.path_matcher.type) {
2087     case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
2088         PREFIX:
2089       path_match_str = absl::StrFormat(
2090           "\"prefix\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
2091       break;
2092     case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
2093         PATH:
2094       path_match_str = absl::StrFormat(
2095           "\"path\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
2096       break;
2097     case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
2098         REGEX:
2099       path_match_str =
2100           absl::StrFormat("\"regex\": \"%s\",\n",
2101                           route.matchers.path_matcher.regex_matcher->pattern());
2102       break;
2103   }
2104   return absl::StrFormat(
2105       "      { \n"
2106       "           %s"
2107       "           %s"
2108       "           %s"
2109       "           \"action\": \"%s\"\n"
2110       "      }",
2111       path_match_str, absl::StrJoin(headers_service_config, ""),
2112       route.matchers.fraction_per_million.has_value()
2113           ? absl::StrFormat("\"match_fraction\":%d,\n",
2114                             route.matchers.fraction_per_million.value())
2115           : "",
2116       action_name);
2117 }
2118 
2119 // Create the service config for one weighted cluster.
CreateServiceConfigActionWeightedCluster(const std::string & name,const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight> & clusters)2120 std::string CreateServiceConfigActionWeightedCluster(
2121     const std::string& name,
2122     const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>& clusters) {
2123   std::vector<std::string> config_parts;
2124   config_parts.push_back(
2125       absl::StrFormat("      \"weighted:%s\":{\n"
2126                       "        \"childPolicy\":[ {\n"
2127                       "          \"weighted_target_experimental\":{\n"
2128                       "            \"targets\":{\n",
2129                       name));
2130   std::vector<std::string> weighted_targets;
2131   weighted_targets.reserve(clusters.size());
2132   for (const auto& cluster_weight : clusters) {
2133     weighted_targets.push_back(absl::StrFormat(
2134         "              \"%s\":{\n"
2135         "                \"weight\":%d,\n"
2136         "                \"childPolicy\":[ {\n"
2137         "                  \"cds_experimental\":{\n"
2138         "                    \"cluster\": \"%s\"\n"
2139         "                  }\n"
2140         "                } ]\n"
2141         "               }",
2142         cluster_weight.name, cluster_weight.weight, cluster_weight.name));
2143   }
2144   config_parts.push_back(absl::StrJoin(weighted_targets, ",\n"));
2145   config_parts.push_back(
2146       "            }\n"
2147       "          }\n"
2148       "        } ]\n"
2149       "       }");
2150   return absl::StrJoin(config_parts, "");
2151 }
2152 
// Pair of lookup keys derived from one weighted-cluster action.
// Member order matters: the struct is brace-initialised as {names, weights}.
struct WeightedClustersKeys {
  // Key built from the cluster names only.
  std::string cluster_names_key;
  // Key built from the cluster names together with their weights.
  std::string cluster_weights_key;
};
2157 
2158 // Returns the cluster names and weights key or the cluster names only key.
GetWeightedClustersKey(const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight> & weighted_clusters)2159 WeightedClustersKeys GetWeightedClustersKey(
2160     const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
2161         weighted_clusters) {
2162   std::set<std::string> cluster_names;
2163   std::set<std::string> cluster_weights;
2164   for (const auto& cluster_weight : weighted_clusters) {
2165     cluster_names.emplace(absl::StrFormat("%s", cluster_weight.name));
2166     cluster_weights.emplace(
2167         absl::StrFormat("%s_%d", cluster_weight.name, cluster_weight.weight));
2168   }
2169   return {absl::StrJoin(cluster_names, "_"),
2170           absl::StrJoin(cluster_weights, "_")};
2171 }
2172 
2173 }  // namespace
2174 
WeightedClustersActionName(const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight> & weighted_clusters)2175 std::string XdsClient::WeightedClustersActionName(
2176     const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
2177         weighted_clusters) {
2178   WeightedClustersKeys keys = GetWeightedClustersKey(weighted_clusters);
2179   auto cluster_names_map_it =
2180       weighted_cluster_index_map_.find(keys.cluster_names_key);
2181   GPR_ASSERT(cluster_names_map_it != weighted_cluster_index_map_.end());
2182   const auto& cluster_weights_map =
2183       cluster_names_map_it->second.cluster_weights_map;
2184   auto cluster_weights_map_it =
2185       cluster_weights_map.find(keys.cluster_weights_key);
2186   GPR_ASSERT(cluster_weights_map_it != cluster_weights_map.end());
2187   return absl::StrFormat("%s_%d", keys.cluster_names_key,
2188                          cluster_weights_map_it->second);
2189 }
2190 
UpdateWeightedClusterIndexMap(const XdsApi::RdsUpdate & rds_update)2191 void XdsClient::UpdateWeightedClusterIndexMap(
2192     const XdsApi::RdsUpdate& rds_update) {
2193   // Construct a list of unique WeightedCluster
2194   // actions which we need to process: to find action names
2195   std::map<std::string /* cluster_weights_key */,
2196            std::string /* cluster_names_key */>
2197       actions_to_process;
2198   for (const auto& route : rds_update.routes) {
2199     if (!route.weighted_clusters.empty()) {
2200       WeightedClustersKeys keys =
2201           GetWeightedClustersKey(route.weighted_clusters);
2202       auto action_it = actions_to_process.find(keys.cluster_weights_key);
2203       if (action_it == actions_to_process.end()) {
2204         actions_to_process[std::move(keys.cluster_weights_key)] =
2205             std::move(keys.cluster_names_key);
2206       }
2207     }
2208   }
2209   // First pass of all unique WeightedCluster actions: if the exact same
2210   // weighted target policy (same clusters and weights) appears in the old map,
2211   // then that old action name is taken again and should be moved to the new
2212   // map; any other action names from the old set of actions are candidates for
2213   // reuse.
2214   XdsClient::WeightedClusterIndexMap new_weighted_cluster_index_map;
2215   for (auto action_it = actions_to_process.begin();
2216        action_it != actions_to_process.end();) {
2217     const std::string& cluster_names_key = action_it->second;
2218     const std::string& cluster_weights_key = action_it->first;
2219     auto old_cluster_names_map_it =
2220         weighted_cluster_index_map_.find(cluster_names_key);
2221     if (old_cluster_names_map_it != weighted_cluster_index_map_.end()) {
2222       // Add cluster_names_key to the new map and copy next_index.
2223       auto& new_cluster_names_info =
2224           new_weighted_cluster_index_map[cluster_names_key];
2225       new_cluster_names_info.next_index =
2226           old_cluster_names_map_it->second.next_index;
2227       // Lookup cluster_weights_key in old map.
2228       auto& old_cluster_weights_map =
2229           old_cluster_names_map_it->second.cluster_weights_map;
2230       auto old_cluster_weights_map_it =
2231           old_cluster_weights_map.find(cluster_weights_key);
2232       if (old_cluster_weights_map_it != old_cluster_weights_map.end()) {
2233         // same policy found, move from old map to new map.
2234         new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
2235             old_cluster_weights_map_it->second;
2236         old_cluster_weights_map.erase(old_cluster_weights_map_it);
2237         // This action has been added to new map, so no need to process it
2238         // again.
2239         action_it = actions_to_process.erase(action_it);
2240         continue;
2241       }
2242     }
2243     ++action_it;
2244   }
2245   // Second pass of all remaining unique WeightedCluster actions: if clusters
2246   // for a new action are the same as an old unused action, reuse the name.  If
2247   // clusters differ, use a brand new name.
2248   for (const auto& action : actions_to_process) {
2249     const std::string& cluster_names_key = action.second;
2250     const std::string& cluster_weights_key = action.first;
2251     auto& new_cluster_names_info =
2252         new_weighted_cluster_index_map[cluster_names_key];
2253     auto& old_cluster_weights_map =
2254         weighted_cluster_index_map_[cluster_names_key].cluster_weights_map;
2255     auto old_cluster_weights_it = old_cluster_weights_map.begin();
2256     if (old_cluster_weights_it != old_cluster_weights_map.end()) {
2257       // There is something to reuse: this action uses the same set
2258       // of clusters as a previous action and that action name is not
2259       // already taken.
2260       new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
2261           old_cluster_weights_it->second;
2262       // Remove the name from being able to reuse again.
2263       old_cluster_weights_map.erase(old_cluster_weights_it);
2264     } else {
2265       // There is nothing to reuse, take the next index to use and
2266       // increment.
2267       new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
2268           new_cluster_names_info.next_index++;
2269     }
2270   }
2271   weighted_cluster_index_map_ = std::move(new_weighted_cluster_index_map);
2272 }
2273 
CreateServiceConfig(const XdsApi::RdsUpdate & rds_update,RefCountedPtr<ServiceConfig> * service_config)2274 grpc_error* XdsClient::CreateServiceConfig(
2275     const XdsApi::RdsUpdate& rds_update,
2276     RefCountedPtr<ServiceConfig>* service_config) {
2277   UpdateWeightedClusterIndexMap(rds_update);
2278   std::vector<std::string> actions_vector;
2279   std::vector<std::string> route_table;
2280   std::set<std::string> actions_set;
2281   for (const auto& route : rds_update.routes) {
2282     const std::string action_name =
2283         route.weighted_clusters.empty()
2284             ? route.cluster_name
2285             : WeightedClustersActionName(route.weighted_clusters);
2286     if (actions_set.find(action_name) == actions_set.end()) {
2287       actions_set.emplace(action_name);
2288       actions_vector.push_back(
2289           route.weighted_clusters.empty()
2290               ? CreateServiceConfigActionCluster(action_name)
2291               : CreateServiceConfigActionWeightedCluster(
2292                     action_name, route.weighted_clusters));
2293     }
2294     route_table.push_back(CreateServiceConfigRoute(
2295         absl::StrFormat("%s:%s",
2296                         route.weighted_clusters.empty() ? "cds" : "weighted",
2297                         action_name),
2298         route));
2299   }
2300   std::vector<std::string> config_parts;
2301   config_parts.push_back(
2302       "{\n"
2303       "  \"loadBalancingConfig\":[\n"
2304       "    { \"xds_routing_experimental\":{\n"
2305       "      \"actions\":{\n");
2306   config_parts.push_back(absl::StrJoin(actions_vector, ",\n"));
2307   config_parts.push_back(
2308       "    },\n"
2309       "      \"routes\":[\n");
2310   config_parts.push_back(absl::StrJoin(route_table, ",\n"));
2311   config_parts.push_back(
2312       "    ]\n"
2313       "    } }\n"
2314       "  ]\n"
2315       "}");
2316   std::string json = absl::StrJoin(config_parts, "");
2317   grpc_error* error = GRPC_ERROR_NONE;
2318   *service_config = ServiceConfig::Create(json.c_str(), &error);
2319   return error;
2320 }
2321 
BuildLoadReportSnapshot(bool send_all_clusters,const std::set<std::string> & clusters)2322 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot(
2323     bool send_all_clusters, const std::set<std::string>& clusters) {
2324   XdsApi::ClusterLoadReportMap snapshot_map;
2325   for (auto load_report_it = load_report_map_.begin();
2326        load_report_it != load_report_map_.end();) {
2327     // Cluster key is cluster and EDS service name.
2328     const auto& cluster_key = load_report_it->first;
2329     LoadReportState& load_report = load_report_it->second;
2330     // If the CDS response for a cluster indicates to use LRS but the
2331     // LRS server does not say that it wants reports for this cluster,
2332     // then we'll have stats objects here whose data we're not going to
2333     // include in the load report.  However, we still need to clear out
2334     // the data from the stats objects, so that if the LRS server starts
2335     // asking for the data in the future, we don't incorrectly include
2336     // data from previous reporting intervals in that future report.
2337     const bool record_stats =
2338         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2339     XdsApi::ClusterLoadReport snapshot;
2340     // Aggregate drop stats.
2341     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2342     for (auto& drop_stats : load_report.drop_stats) {
2343       for (const auto& p : drop_stats->GetSnapshotAndReset()) {
2344         snapshot.dropped_requests[p.first] += p.second;
2345       }
2346     }
2347     // Aggregate locality stats.
2348     for (auto it = load_report.locality_stats.begin();
2349          it != load_report.locality_stats.end();) {
2350       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2351       auto& locality_state = it->second;
2352       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2353           snapshot.locality_stats[locality_name];
2354       for (auto& locality_stats : locality_state.locality_stats) {
2355         locality_snapshot += locality_stats->GetSnapshotAndReset();
2356       }
2357       // Add final snapshots from recently deleted locality stats objects.
2358       for (auto& deleted_locality_stats :
2359            locality_state.deleted_locality_stats) {
2360         locality_snapshot += deleted_locality_stats;
2361       }
2362       locality_state.deleted_locality_stats.clear();
2363       // If the only thing left in this entry was final snapshots from
2364       // deleted locality stats objects, remove the entry.
2365       if (locality_state.locality_stats.empty()) {
2366         it = load_report.locality_stats.erase(it);
2367       } else {
2368         ++it;
2369       }
2370     }
2371     if (record_stats) {
2372       // Compute load report interval.
2373       const grpc_millis now = ExecCtx::Get()->Now();
2374       snapshot.load_report_interval = now - load_report.last_report_time;
2375       load_report.last_report_time = now;
2376       // Record snapshot.
2377       snapshot_map[cluster_key] = std::move(snapshot);
2378     }
2379     // If the only thing left in this entry was final snapshots from
2380     // deleted stats objects, remove the entry.
2381     if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
2382       load_report_it = load_report_map_.erase(load_report_it);
2383     } else {
2384       ++load_report_it;
2385     }
2386   }
2387   return snapshot_map;
2388 }
2389 
NotifyOnError(grpc_error * error)2390 void XdsClient::NotifyOnError(grpc_error* error) {
2391   if (service_config_watcher_ != nullptr) {
2392     service_config_watcher_->OnError(GRPC_ERROR_REF(error));
2393   }
2394   for (const auto& p : cluster_map_) {
2395     const ClusterState& cluster_state = p.second;
2396     for (const auto& p : cluster_state.watchers) {
2397       p.first->OnError(GRPC_ERROR_REF(error));
2398     }
2399   }
2400   for (const auto& p : endpoint_map_) {
2401     const EndpointState& endpoint_state = p.second;
2402     for (const auto& p : endpoint_state.watchers) {
2403       p.first->OnError(GRPC_ERROR_REF(error));
2404     }
2405   }
2406   GRPC_ERROR_UNREF(error);
2407 }
2408 
ChannelArgCopy(void * p)2409 void* XdsClient::ChannelArgCopy(void* p) {
2410   XdsClient* xds_client = static_cast<XdsClient*>(p);
2411   xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
2412   return p;
2413 }
2414 
ChannelArgDestroy(void * p)2415 void XdsClient::ChannelArgDestroy(void* p) {
2416   XdsClient* xds_client = static_cast<XdsClient*>(p);
2417   xds_client->Unref(DEBUG_LOCATION, "channel arg");
2418 }
2419 
ChannelArgCmp(void * p,void * q)2420 int XdsClient::ChannelArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
2421 
2422 const grpc_arg_pointer_vtable XdsClient::kXdsClientVtable = {
2423     XdsClient::ChannelArgCopy, XdsClient::ChannelArgDestroy,
2424     XdsClient::ChannelArgCmp};
2425 
MakeChannelArg() const2426 grpc_arg XdsClient::MakeChannelArg() const {
2427   return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
2428                                          const_cast<XdsClient*>(this),
2429                                          &XdsClient::kXdsClientVtable);
2430 }
2431 
GetFromChannelArgs(const grpc_channel_args & args)2432 RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
2433     const grpc_channel_args& args) {
2434   XdsClient* xds_client =
2435       grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
2436   if (xds_client != nullptr) return xds_client->Ref();
2437   return nullptr;
2438 }
2439 
// Returns the result of grpc_channel_args_copy_and_remove() on `args` with
// the single name GRPC_ARG_XDS_CLIENT as the arg to remove.
grpc_channel_args* XdsClient::RemoveFromChannelArgs(
    const grpc_channel_args& args) {
  const char* names_to_remove[] = {GRPC_ARG_XDS_CLIENT};
  return grpc_channel_args_copy_and_remove(&args, names_to_remove, 1);
}
2445 
2446 }  // namespace grpc_core
2447