1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <limits.h>
21 #include <string.h>
22
23 #include "absl/container/inlined_vector.h"
24 #include "absl/strings/str_format.h"
25 #include "absl/strings/str_join.h"
26 #include "absl/strings/string_view.h"
27
28 #include <grpc/byte_buffer_reader.h>
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/time.h>
32
33 #include "src/core/ext/filters/client_channel/client_channel.h"
34 #include "src/core/ext/filters/client_channel/service_config.h"
35 #include "src/core/ext/xds/xds_api.h"
36 #include "src/core/ext/xds/xds_bootstrap.h"
37 #include "src/core/ext/xds/xds_channel_args.h"
38 #include "src/core/ext/xds/xds_client.h"
39 #include "src/core/ext/xds/xds_client_stats.h"
40 #include "src/core/ext/xds/xds_http_filters.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/backoff/backoff.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/channel/channel_stack.h"
45 #include "src/core/lib/gpr/env.h"
46 #include "src/core/lib/gpr/string.h"
47 #include "src/core/lib/gprpp/memory.h"
48 #include "src/core/lib/gprpp/orphanable.h"
49 #include "src/core/lib/gprpp/ref_counted_ptr.h"
50 #include "src/core/lib/gprpp/sync.h"
51 #include "src/core/lib/iomgr/sockaddr.h"
52 #include "src/core/lib/iomgr/timer.h"
53 #include "src/core/lib/slice/slice_internal.h"
54 #include "src/core/lib/slice/slice_string_helpers.h"
55 #include "src/core/lib/surface/call.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/channel_init.h"
58 #include "src/core/lib/transport/static_metadata.h"
59
// Backoff parameters used by RetryableCall<> when a call to the xds server
// has to be retried.
// Initial backoff, in seconds (converted to milliseconds at the use site).
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
// Factor applied to the backoff between consecutive attempts.
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
// Upper bound on the backoff, in seconds.
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
// Jitter passed to BackOff::Options::set_jitter().
#define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): presumably the lower bound (in ms) for the client load
// reporting interval; it is not referenced in this part of the file -- confirm
// against the LRS code.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
65
66 namespace grpc_core {
67
// Trace flag "xds_client": when enabled, the code in this file emits
// additional GPR_INFO log lines. Disabled by default.
TraceFlag grpc_xds_client_trace(false, "xds_client");
// Trace flag "xds_client_refcount": when enabled, ref-counted objects in this
// file are constructed with a name string instead of nullptr. Disabled by
// default.
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
70
namespace {

// File-local global state. Everything below g_mu is annotated as guarded by
// *g_mu.
// NOTE(review): the code that allocates g_mu and reads/writes these globals is
// not in this part of the file; confirm initialization order there.
Mutex* g_mu = nullptr;
const grpc_channel_args* g_channel_args ABSL_GUARDED_BY(*g_mu) = nullptr;
XdsClient* g_xds_client ABSL_GUARDED_BY(*g_mu) = nullptr;
char* g_fallback_bootstrap_config ABSL_GUARDED_BY(*g_mu) = nullptr;

}  // namespace
79
80 //
81 // Internal class declarations
82 //
83
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
//
// T must be constructible from a RefCountedPtr<RetryableCall<T>> and must
// expose seen_response(); both are used by the member definitions below.
template <typename T>
class XdsClient::ChannelState::RetryableCall
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  // Takes ownership of the channel ref and immediately starts the first call.
  explicit RetryableCall(RefCountedPtr<ChannelState> chand);

  // Marks the wrapper as shutting down, drops the wrapped call, cancels a
  // pending retry timer and releases the "orphaned" ref.
  void Orphan() override;

  // Drops the finished call and either restarts it right away (if the call
  // had seen a response) or schedules a retry with backoff.
  void OnCallFinishedLocked();

  // The currently wrapped call; null while waiting for the retry timer.
  T* calld() const { return calld_.get(); }
  // The owning xds channel (non-owning pointer).
  ChannelState* chand() const { return chand_.get(); }

  // Defined outside this section of the file.
  bool IsCurrentCallOnChannel() const;

 private:
  void StartNewCallLocked();
  void StartRetryTimerLocked();
  // Timer callback; takes the XdsClient mutex and forwards to
  // OnRetryTimerLocked().
  static void OnRetryTimer(void* arg, grpc_error_handle error);
  void OnRetryTimerLocked(grpc_error_handle error);

  // The wrapped xds call that talks to the xds server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> calld_;
  // The owning xds channel.
  RefCountedPtr<ChannelState> chand_;

  // Retry state.
  BackOff backoff_;
  grpc_timer retry_timer_;
  grpc_closure on_retry_timer_;
  // True from the moment the retry timer is armed until its callback runs.
  bool retry_timer_callback_pending_ = false;

  // Set by Orphan(); once true no new call or retry timer is started.
  bool shutting_down_ = false;
};
121
122 // Contains an ADS call to the xds server.
123 class XdsClient::ChannelState::AdsCallState
124 : public InternallyRefCounted<AdsCallState> {
125 public:
126 // The ctor and dtor should not be used directly.
127 explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
128 ~AdsCallState() override;
129
130 void Orphan() override;
131
parent() const132 RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
chand() const133 ChannelState* chand() const { return parent_->chand(); }
xds_client() const134 XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const135 bool seen_response() const { return seen_response_; }
136
137 void SubscribeLocked(const std::string& type_url, const std::string& name)
138 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
139 void UnsubscribeLocked(const std::string& type_url, const std::string& name,
140 bool delay_unsubscription)
141 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
142
143 bool HasSubscribedResources() const;
144
145 private:
146 class ResourceState : public InternallyRefCounted<ResourceState> {
147 public:
ResourceState(const std::string & type_url,const std::string & name,bool sent_initial_request)148 ResourceState(const std::string& type_url, const std::string& name,
149 bool sent_initial_request)
150 : type_url_(type_url),
151 name_(name),
152 sent_initial_request_(sent_initial_request) {
153 GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
154 grpc_schedule_on_exec_ctx);
155 }
156
Orphan()157 void Orphan() override {
158 Finish();
159 Unref(DEBUG_LOCATION, "Orphan");
160 }
161
Start(RefCountedPtr<AdsCallState> ads_calld)162 void Start(RefCountedPtr<AdsCallState> ads_calld) {
163 if (sent_initial_request_) return;
164 sent_initial_request_ = true;
165 ads_calld_ = std::move(ads_calld);
166 Ref(DEBUG_LOCATION, "timer").release();
167 timer_pending_ = true;
168 grpc_timer_init(
169 &timer_,
170 ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
171 &timer_callback_);
172 }
173
Finish()174 void Finish() {
175 if (timer_pending_) {
176 grpc_timer_cancel(&timer_);
177 timer_pending_ = false;
178 }
179 }
180
181 private:
OnTimer(void * arg,grpc_error_handle error)182 static void OnTimer(void* arg, grpc_error_handle error) {
183 ResourceState* self = static_cast<ResourceState*>(arg);
184 {
185 MutexLock lock(&self->ads_calld_->xds_client()->mu_);
186 self->OnTimerLocked(GRPC_ERROR_REF(error));
187 }
188 self->ads_calld_.reset();
189 self->Unref(DEBUG_LOCATION, "timer");
190 }
191
OnTimerLocked(grpc_error_handle error)192 void OnTimerLocked(grpc_error_handle error)
193 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
194 if (error == GRPC_ERROR_NONE && timer_pending_) {
195 timer_pending_ = false;
196 grpc_error_handle watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
197 absl::StrFormat(
198 "timeout obtaining resource {type=%s name=%s} from xds server",
199 type_url_, name_)
200 .c_str());
201 watcher_error = grpc_error_set_int(
202 watcher_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
203 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
204 gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
205 grpc_error_std_string(watcher_error).c_str());
206 }
207 if (type_url_ == XdsApi::kLdsTypeUrl) {
208 ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
209 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
210 for (const auto& p : state.watchers) {
211 p.first->OnError(GRPC_ERROR_REF(watcher_error));
212 }
213 } else if (type_url_ == XdsApi::kRdsTypeUrl) {
214 RouteConfigState& state =
215 ads_calld_->xds_client()->route_config_map_[name_];
216 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
217 for (const auto& p : state.watchers) {
218 p.first->OnError(GRPC_ERROR_REF(watcher_error));
219 }
220 } else if (type_url_ == XdsApi::kCdsTypeUrl) {
221 ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
222 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
223 for (const auto& p : state.watchers) {
224 p.first->OnError(GRPC_ERROR_REF(watcher_error));
225 }
226 } else if (type_url_ == XdsApi::kEdsTypeUrl) {
227 EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
228 state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
229 for (const auto& p : state.watchers) {
230 p.first->OnError(GRPC_ERROR_REF(watcher_error));
231 }
232 } else {
233 GPR_UNREACHABLE_CODE(return );
234 }
235 GRPC_ERROR_UNREF(watcher_error);
236 }
237 GRPC_ERROR_UNREF(error);
238 }
239
240 const std::string type_url_;
241 const std::string name_;
242
243 RefCountedPtr<AdsCallState> ads_calld_;
244 bool sent_initial_request_;
245 bool timer_pending_ = false;
246 grpc_timer timer_;
247 grpc_closure timer_callback_;
248 };
249
250 struct ResourceTypeState {
~ResourceTypeStategrpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState251 ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
252
253 // Nonce and error for this resource type.
254 std::string nonce;
255 grpc_error_handle error = GRPC_ERROR_NONE;
256
257 // Subscribed resources of this type.
258 std::map<std::string /* name */, OrphanablePtr<ResourceState>>
259 subscribed_resources;
260 };
261
262 void SendMessageLocked(const std::string& type_url)
263 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
264
265 void AcceptLdsUpdateLocked(std::string version, grpc_millis update_time,
266 XdsApi::LdsUpdateMap lds_update_map)
267 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
268 void AcceptRdsUpdateLocked(std::string version, grpc_millis update_time,
269 XdsApi::RdsUpdateMap rds_update_map)
270 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
271 void AcceptCdsUpdateLocked(std::string version, grpc_millis update_time,
272 XdsApi::CdsUpdateMap cds_update_map)
273 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
274 void AcceptEdsUpdateLocked(std::string version, grpc_millis update_time,
275 XdsApi::EdsUpdateMap eds_update_map)
276 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
277
278 static void OnRequestSent(void* arg, grpc_error_handle error);
279 void OnRequestSentLocked(grpc_error_handle error)
280 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
281 static void OnResponseReceived(void* arg, grpc_error_handle error);
282 bool OnResponseReceivedLocked()
283 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
284 static void OnStatusReceived(void* arg, grpc_error_handle error);
285 void OnStatusReceivedLocked(grpc_error_handle error)
286 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
287
288 bool IsCurrentCallOnChannel() const;
289
290 std::set<absl::string_view> ResourceNamesForRequest(
291 const std::string& type_url);
292
293 // The owning RetryableCall<>.
294 RefCountedPtr<RetryableCall<AdsCallState>> parent_;
295
296 bool sent_initial_message_ = false;
297 bool seen_response_ = false;
298
299 // Always non-NULL.
300 grpc_call* call_;
301
302 // recv_initial_metadata
303 grpc_metadata_array initial_metadata_recv_;
304
305 // send_message
306 grpc_byte_buffer* send_message_payload_ = nullptr;
307 grpc_closure on_request_sent_;
308
309 // recv_message
310 grpc_byte_buffer* recv_message_payload_ = nullptr;
311 grpc_closure on_response_received_;
312
313 // recv_trailing_metadata
314 grpc_metadata_array trailing_metadata_recv_;
315 grpc_status_code status_code_;
316 grpc_slice status_details_;
317 grpc_closure on_status_received_;
318
319 // Resource types for which requests need to be sent.
320 std::set<std::string /*type_url*/> buffered_requests_;
321
322 // State for each resource type.
323 std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
324 };
325
326 // Contains an LRS call to the xds server.
327 class XdsClient::ChannelState::LrsCallState
328 : public InternallyRefCounted<LrsCallState> {
329 public:
330 // The ctor and dtor should not be used directly.
331 explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
332 ~LrsCallState() override;
333
334 void Orphan() override;
335
336 void MaybeStartReportingLocked();
337
parent()338 RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const339 ChannelState* chand() const { return parent_->chand(); }
xds_client() const340 XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const341 bool seen_response() const { return seen_response_; }
342
343 private:
344 // Reports client-side load stats according to a fixed interval.
345 class Reporter : public InternallyRefCounted<Reporter> {
346 public:
Reporter(RefCountedPtr<LrsCallState> parent,grpc_millis report_interval)347 Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
348 : parent_(std::move(parent)), report_interval_(report_interval) {
349 GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
350 grpc_schedule_on_exec_ctx);
351 GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
352 grpc_schedule_on_exec_ctx);
353 ScheduleNextReportLocked();
354 }
355
356 void Orphan() override;
357
358 private:
359 void ScheduleNextReportLocked()
360 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
361 static void OnNextReportTimer(void* arg, grpc_error_handle error);
362 bool OnNextReportTimerLocked(grpc_error_handle error)
363 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
364 bool SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
365 static void OnReportDone(void* arg, grpc_error_handle error);
366 bool OnReportDoneLocked(grpc_error_handle error)
367 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
368
IsCurrentReporterOnCall() const369 bool IsCurrentReporterOnCall() const {
370 return this == parent_->reporter_.get();
371 }
xds_client() const372 XdsClient* xds_client() const { return parent_->xds_client(); }
373
374 // The owning LRS call.
375 RefCountedPtr<LrsCallState> parent_;
376
377 // The load reporting state.
378 const grpc_millis report_interval_;
379 bool last_report_counters_were_zero_ = false;
380 bool next_report_timer_callback_pending_ = false;
381 grpc_timer next_report_timer_;
382 grpc_closure on_next_report_timer_;
383 grpc_closure on_report_done_;
384 };
385
386 static void OnInitialRequestSent(void* arg, grpc_error_handle error);
387 void OnInitialRequestSentLocked()
388 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
389 static void OnResponseReceived(void* arg, grpc_error_handle error);
390 bool OnResponseReceivedLocked()
391 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
392 static void OnStatusReceived(void* arg, grpc_error_handle error);
393 void OnStatusReceivedLocked(grpc_error_handle error)
394 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
395
396 bool IsCurrentCallOnChannel() const;
397
398 // The owning RetryableCall<>.
399 RefCountedPtr<RetryableCall<LrsCallState>> parent_;
400 bool seen_response_ = false;
401
402 // Always non-NULL.
403 grpc_call* call_;
404
405 // recv_initial_metadata
406 grpc_metadata_array initial_metadata_recv_;
407
408 // send_message
409 grpc_byte_buffer* send_message_payload_ = nullptr;
410 grpc_closure on_initial_request_sent_;
411
412 // recv_message
413 grpc_byte_buffer* recv_message_payload_ = nullptr;
414 grpc_closure on_response_received_;
415
416 // recv_trailing_metadata
417 grpc_metadata_array trailing_metadata_recv_;
418 grpc_status_code status_code_;
419 grpc_slice status_details_;
420 grpc_closure on_status_received_;
421
422 // Load reporting state.
423 bool send_all_clusters_ = false;
424 std::set<std::string> cluster_names_; // Asked for by the LRS server.
425 grpc_millis load_reporting_interval_ = 0;
426 OrphanablePtr<Reporter> reporter_;
427 };
428
429 //
430 // XdsClient::ChannelState::StateWatcher
431 //
432
433 class XdsClient::ChannelState::StateWatcher
434 : public AsyncConnectivityStateWatcherInterface {
435 public:
StateWatcher(RefCountedPtr<ChannelState> parent)436 explicit StateWatcher(RefCountedPtr<ChannelState> parent)
437 : parent_(std::move(parent)) {}
438
439 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)440 void OnConnectivityStateChange(grpc_connectivity_state new_state,
441 const absl::Status& status) override {
442 MutexLock lock(&parent_->xds_client_->mu_);
443 if (!parent_->shutting_down_ &&
444 new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
445 // In TRANSIENT_FAILURE. Notify all watchers of error.
446 gpr_log(GPR_INFO,
447 "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
448 "status_message:(%s)",
449 parent_->xds_client(), status.ToString().c_str());
450 parent_->xds_client_->NotifyOnErrorLocked(
451 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
452 "xds channel in TRANSIENT_FAILURE"));
453 }
454 }
455
456 RefCountedPtr<ChannelState> parent_;
457 };
458
459 //
460 // XdsClient::ChannelState
461 //
462
463 namespace {
464
CreateXdsChannel(grpc_channel_args * args,const XdsBootstrap::XdsServer & server)465 grpc_channel* CreateXdsChannel(grpc_channel_args* args,
466 const XdsBootstrap::XdsServer& server) {
467 RefCountedPtr<grpc_channel_credentials> channel_creds =
468 XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
469 server.channel_creds_config);
470 return grpc_secure_channel_create(channel_creds.get(),
471 server.server_uri.c_str(), args, nullptr);
472 }
473
474 } // namespace
475
// Creates the channel to |server| and starts watching its connectivity state.
// The ref-count base class is given a name only when the refcount trace flag
// is enabled.
XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
                                      const XdsBootstrap::XdsServer& server)
    : InternallyRefCounted<ChannelState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
              ? "ChannelState"
              : nullptr),
      xds_client_(std::move(xds_client)),
      server_(server) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
            xds_client_.get(), server.server_uri.c_str());
  }
  channel_ = CreateXdsChannel(xds_client_->args_, server);
  // Channel creation failure is treated as fatal.
  GPR_ASSERT(channel_ != nullptr);
  // Must come after channel_ is set: the watch is registered on channel_.
  StartConnectivityWatchLocked();
}
492
// Destroys the channel and then drops the weak ref on the XdsClient.
XdsClient::ChannelState::~ChannelState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
            this);
  }
  grpc_channel_destroy(channel_);
  xds_client_.reset(DEBUG_LOCATION, "ChannelState");
}
501
// Shuts the channel state down: flags shutdown, stops the connectivity watch,
// drops both retryable calls and releases the "orphaned" ref.
void XdsClient::ChannelState::Orphan() {
  // Set first so that StateWatcher ignores any later state change.
  shutting_down_ = true;
  CancelConnectivityWatchLocked();
  ads_calld_.reset();
  lrs_calld_.reset();
  Unref(DEBUG_LOCATION, "ChannelState+orphaned");
}
509
// Returns the ADS call currently wrapped by the retryable ADS call (null
// while that wrapper has no call). Dereferences ads_calld_ unconditionally,
// so the caller must ensure ads_calld_ is non-null.
XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
    const {
  return ads_calld_->calld();
}
514
// Returns the LRS call currently wrapped by the retryable LRS call (null
// while that wrapper has no call). Dereferences lrs_calld_ unconditionally,
// so the caller must ensure lrs_calld_ is non-null.
XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
    const {
  return lrs_calld_->calld();
}
519
HasActiveAdsCall() const520 bool XdsClient::ChannelState::HasActiveAdsCall() const {
521 return ads_calld_ != nullptr && ads_calld_->calld() != nullptr;
522 }
523
MaybeStartLrsCall()524 void XdsClient::ChannelState::MaybeStartLrsCall() {
525 if (lrs_calld_ != nullptr) return;
526 lrs_calld_.reset(
527 new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
528 }
529
// Drops the retryable LRS call, if any.
void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
531
StartConnectivityWatchLocked()532 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
533 ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
534 GPR_ASSERT(client_channel != nullptr);
535 watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
536 client_channel->AddConnectivityWatcher(
537 GRPC_CHANNEL_IDLE,
538 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
539 }
540
CancelConnectivityWatchLocked()541 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
542 ClientChannel* client_channel = ClientChannel::GetFromChannel(channel_);
543 GPR_ASSERT(client_channel != nullptr);
544 client_channel->RemoveConnectivityWatcher(watcher_);
545 }
546
SubscribeLocked(const std::string & type_url,const std::string & name)547 void XdsClient::ChannelState::SubscribeLocked(const std::string& type_url,
548 const std::string& name) {
549 if (ads_calld_ == nullptr) {
550 // Start the ADS call if this is the first request.
551 ads_calld_.reset(new RetryableCall<AdsCallState>(
552 Ref(DEBUG_LOCATION, "ChannelState+ads")));
553 // Note: AdsCallState's ctor will automatically subscribe to all
554 // resources that the XdsClient already has watchers for, so we can
555 // return here.
556 return;
557 }
558 // If the ADS call is in backoff state, we don't need to do anything now
559 // because when the call is restarted it will resend all necessary requests.
560 if (ads_calld() == nullptr) return;
561 // Subscribe to this resource if the ADS call is active.
562 ads_calld()->SubscribeLocked(type_url, name);
563 }
564
UnsubscribeLocked(const std::string & type_url,const std::string & name,bool delay_unsubscription)565 void XdsClient::ChannelState::UnsubscribeLocked(const std::string& type_url,
566 const std::string& name,
567 bool delay_unsubscription) {
568 if (ads_calld_ != nullptr) {
569 auto* calld = ads_calld_->calld();
570 if (calld != nullptr) {
571 calld->UnsubscribeLocked(type_url, name, delay_unsubscription);
572 if (!calld->HasSubscribedResources()) ads_calld_.reset();
573 }
574 }
575 }
576
577 //
578 // XdsClient::ChannelState::RetryableCall<>
579 //
580
// Configures the retry backoff from the GRPC_XDS_* macros (seconds are
// converted to milliseconds), initializes the retry-timer closure and starts
// the first call.
template <typename T>
XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
    RefCountedPtr<ChannelState> chand)
    : chand_(std::move(chand)),
      backoff_(
          BackOff::Options()
              .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
                                   1000)
              .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_XDS_RECONNECT_JITTER)
              .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
  // Closure Initialization
  GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartNewCallLocked();
}
597
// Shuts the retryable call down: flags shutdown so nothing is restarted,
// drops the wrapped call, cancels a pending retry timer and releases the
// "orphaned" ref.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
  shutting_down_ = true;
  calld_.reset();
  if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
  this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
605
606 template <typename T>
OnCallFinishedLocked()607 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
608 const bool seen_response = calld_->seen_response();
609 calld_.reset();
610 if (seen_response) {
611 // If we lost connection to the xds server, reset backoff and restart the
612 // call immediately.
613 backoff_.Reset();
614 StartNewCallLocked();
615 } else {
616 // If we failed to connect to the xds server, retry later.
617 StartRetryTimerLocked();
618 }
619 }
620
// Creates a fresh wrapped call unless shutting down. Requires that the
// channel exists and that no call is currently wrapped. The new call is
// handed its own ref on this RetryableCall.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(chand_->channel_ != nullptr);
  GPR_ASSERT(calld_ == nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Start new call from retryable call (chand: %p, "
            "retryable call: %p)",
            chand()->xds_client(), chand(), this);
  }
  calld_ = MakeOrphanable<T>(
      this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
}
635
636 template <typename T>
StartRetryTimerLocked()637 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
638 if (shutting_down_) return;
639 const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
640 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
641 grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
642 gpr_log(GPR_INFO,
643 "[xds_client %p] Failed to connect to xds server (chand: %p) "
644 "retry timer will fire in %" PRId64 "ms.",
645 chand()->xds_client(), chand(), timeout);
646 }
647 this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
648 grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
649 retry_timer_callback_pending_ = true;
650 }
651
652 template <typename T>
OnRetryTimer(void * arg,grpc_error_handle error)653 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
654 void* arg, grpc_error_handle error) {
655 RetryableCall* calld = static_cast<RetryableCall*>(arg);
656 {
657 MutexLock lock(&calld->chand_->xds_client()->mu_);
658 calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
659 }
660 calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
661 }
662
663 template <typename T>
OnRetryTimerLocked(grpc_error_handle error)664 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
665 grpc_error_handle error) {
666 retry_timer_callback_pending_ = false;
667 if (!shutting_down_ && error == GRPC_ERROR_NONE) {
668 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
669 gpr_log(
670 GPR_INFO,
671 "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
672 chand()->xds_client(), chand(), this);
673 }
674 StartNewCallLocked();
675 }
676 GRPC_ERROR_UNREF(error);
677 }
678
679 //
680 // XdsClient::ChannelState::AdsCallState
681 //
682
// Creates the ADS call on the xds channel and starts it in three batches:
//   1. send initial metadata (no completion callback);
//   2. recv initial metadata + recv message, completing via
//      on_response_received_ (holds an extra ref on this object);
//   3. recv status, completing via on_status_received_ (uses the initial ref).
// Between batches 1 and 2 it subscribes to every resource name already
// present in the XdsClient's listener, route config, cluster and endpoint
// maps. Any batch-start failure is treated as fatal.
XdsClient::ChannelState::AdsCallState::AdsCallState(
    RefCountedPtr<RetryableCall<AdsCallState>> parent)
    : InternallyRefCounted<AdsCallState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
              ? "AdsCallState"
              : nullptr),
      parent_(std::move(parent)) {
  // Init the ADS call. Note that the call will progress every time there's
  // activity in xds_client()->interested_parties_, which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(xds_client() != nullptr);
  // Create a call with the specified method name.
  // The v3 or v2 method path is chosen from the server's bootstrap entry.
  const auto& method =
      chand()->server_.ShouldUseV3()
          ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
          : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
  call_ = grpc_channel_create_pollset_set_call(
      chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      xds_client()->interested_parties_, method, nullptr,
      GRPC_MILLIS_INF_FUTURE, nullptr);
  GPR_ASSERT(call_ != nullptr);
  // Init data associated with the call.
  grpc_metadata_array_init(&initial_metadata_recv_);
  grpc_metadata_array_init(&trailing_metadata_recv_);
  // Start the call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
            "call: %p)",
            xds_client(), chand(), this, call_);
  }
  // Create the ops.
  // The ops array is reused for each batch; `op` is rewound to its start
  // before each one.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), nullptr);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: send request message.
  // The request messages themselves are sent through SubscribeLocked(), once
  // per resource name already known to the XdsClient.
  GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  for (const auto& p : xds_client()->listener_map_) {
    SubscribeLocked(XdsApi::kLdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->route_config_map_) {
    SubscribeLocked(XdsApi::kRdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->cluster_map_) {
    SubscribeLocked(XdsApi::kCdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->endpoint_map_) {
    SubscribeLocked(XdsApi::kEdsTypeUrl, std::string(p.first));
  }
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Extra ref owned by the pending on_response_received_ callback.
  Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &status_code_;
  op->data.recv_status_on_client.status_details = &status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
782
// Releases everything the call owns: both metadata arrays, the send and
// receive payload buffers, the status-details slice and finally the call
// itself.
XdsClient::ChannelState::AdsCallState::~AdsCallState() {
  grpc_metadata_array_destroy(&initial_metadata_recv_);
  grpc_metadata_array_destroy(&trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  grpc_slice_unref_internal(status_details_);
  GPR_ASSERT(call_ != nullptr);
  grpc_call_unref(call_);
}
792
// Cancels the call and drops all per-type subscription state. Does not unref
// this object; see the note at the end of the body.
void XdsClient::ChannelState::AdsCallState::Orphan() {
  GPR_ASSERT(call_ != nullptr);
  // If we are here because xds_client wants to cancel the call,
  // on_status_received_ will complete the cancellation and clean up. Otherwise,
  // we are here because xds_client has to orphan a failed call, then the
  // following cancellation will be a no-op.
  grpc_call_cancel_internal(call_);
  state_map_.clear();
  // Note that the initial ref is held by on_status_received_. So the
  // corresponding unref happens in on_status_received_ instead of here.
}
804
SendMessageLocked(const std::string & type_url)805 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
806 const std::string& type_url)
807 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_) {
808 // Buffer message sending if an existing message is in flight.
809 if (send_message_payload_ != nullptr) {
810 buffered_requests_.insert(type_url);
811 return;
812 }
813 auto& state = state_map_[type_url];
814 grpc_slice request_payload_slice;
815 std::set<absl::string_view> resource_names =
816 ResourceNamesForRequest(type_url);
817 request_payload_slice = xds_client()->api_.CreateAdsRequest(
818 chand()->server_, type_url, resource_names,
819 xds_client()->resource_version_map_[type_url], state.nonce,
820 GRPC_ERROR_REF(state.error), !sent_initial_message_);
821 if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
822 type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
823 state_map_.erase(type_url);
824 }
825 sent_initial_message_ = true;
826 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
827 gpr_log(GPR_INFO,
828 "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
829 "error=%s resources=%s",
830 xds_client(), type_url.c_str(),
831 xds_client()->resource_version_map_[type_url].c_str(),
832 state.nonce.c_str(), grpc_error_std_string(state.error).c_str(),
833 absl::StrJoin(resource_names, " ").c_str());
834 }
835 GRPC_ERROR_UNREF(state.error);
836 state.error = GRPC_ERROR_NONE;
837 // Create message payload.
838 send_message_payload_ =
839 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
840 grpc_slice_unref_internal(request_payload_slice);
841 // Send the message.
842 grpc_op op;
843 memset(&op, 0, sizeof(op));
844 op.op = GRPC_OP_SEND_MESSAGE;
845 op.data.send_message.send_message = send_message_payload_;
846 Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
847 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
848 grpc_schedule_on_exec_ctx);
849 grpc_call_error call_error =
850 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
851 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
852 gpr_log(GPR_ERROR,
853 "[xds_client %p] calld=%p call_error=%d sending ADS message",
854 xds_client(), this, call_error);
855 GPR_ASSERT(GRPC_CALL_OK == call_error);
856 }
857 }
858
SubscribeLocked(const std::string & type_url,const std::string & name)859 void XdsClient::ChannelState::AdsCallState::SubscribeLocked(
860 const std::string& type_url, const std::string& name) {
861 auto& state = state_map_[type_url].subscribed_resources[name];
862 if (state == nullptr) {
863 state = MakeOrphanable<ResourceState>(
864 type_url, name, !xds_client()->resource_version_map_[type_url].empty());
865 SendMessageLocked(type_url);
866 }
867 }
868
UnsubscribeLocked(const std::string & type_url,const std::string & name,bool delay_unsubscription)869 void XdsClient::ChannelState::AdsCallState::UnsubscribeLocked(
870 const std::string& type_url, const std::string& name,
871 bool delay_unsubscription) {
872 state_map_[type_url].subscribed_resources.erase(name);
873 if (!delay_unsubscription) SendMessageLocked(type_url);
874 }
875
HasSubscribedResources() const876 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
877 for (const auto& p : state_map_) {
878 if (!p.second.subscribed_resources.empty()) return true;
879 }
880 return false;
881 }
882
883 namespace {
884
885 // Build a resource metadata struct for ADS result accepting methods and CSDS.
CreateResourceMetadataAcked(std::string serialized_proto,std::string version,grpc_millis update_time)886 XdsApi::ResourceMetadata CreateResourceMetadataAcked(
887 std::string serialized_proto, std::string version,
888 grpc_millis update_time) {
889 XdsApi::ResourceMetadata resource_metadata;
890 resource_metadata.serialized_proto = std::move(serialized_proto);
891 resource_metadata.update_time = update_time;
892 resource_metadata.version = std::move(version);
893 resource_metadata.client_status = XdsApi::ResourceMetadata::ACKED;
894 return resource_metadata;
895 }
896
897 } // namespace
898
AcceptLdsUpdateLocked(std::string version,grpc_millis update_time,XdsApi::LdsUpdateMap lds_update_map)899 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdateLocked(
900 std::string version, grpc_millis update_time,
901 XdsApi::LdsUpdateMap lds_update_map) {
902 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
903 gpr_log(GPR_INFO,
904 "[xds_client %p] LDS update received containing %" PRIuPTR
905 " resources",
906 xds_client(), lds_update_map.size());
907 }
908 auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
909 std::set<std::string> rds_resource_names_seen;
910 for (auto& p : lds_update_map) {
911 const std::string& listener_name = p.first;
912 XdsApi::LdsUpdate& lds_update = p.second.resource;
913 auto& state = lds_state.subscribed_resources[listener_name];
914 if (state != nullptr) state->Finish();
915 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
916 gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
917 listener_name.c_str(), lds_update.ToString().c_str());
918 }
919 // Record the RDS resource names seen.
920 if (!lds_update.http_connection_manager.route_config_name.empty()) {
921 rds_resource_names_seen.insert(
922 lds_update.http_connection_manager.route_config_name);
923 }
924 // Ignore identical update.
925 ListenerState& listener_state = xds_client()->listener_map_[listener_name];
926 if (listener_state.update.has_value() &&
927 *listener_state.update == lds_update) {
928 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
929 gpr_log(GPR_INFO,
930 "[xds_client %p] LDS update for %s identical to current, "
931 "ignoring.",
932 xds_client(), listener_name.c_str());
933 }
934 continue;
935 }
936 // Update the listener state.
937 listener_state.update = std::move(lds_update);
938 listener_state.meta = CreateResourceMetadataAcked(
939 std::move(p.second.serialized_proto), version, update_time);
940 // Notify watchers.
941 for (const auto& p : listener_state.watchers) {
942 p.first->OnListenerChanged(*listener_state.update);
943 }
944 }
945 // For any subscribed resource that is not present in the update,
946 // remove it from the cache and notify watchers that it does not exist.
947 for (const auto& p : lds_state.subscribed_resources) {
948 const std::string& listener_name = p.first;
949 if (lds_update_map.find(listener_name) == lds_update_map.end()) {
950 ListenerState& listener_state =
951 xds_client()->listener_map_[listener_name];
952 // If the resource was newly requested but has not yet been received,
953 // we don't want to generate an error for the watchers, because this LDS
954 // response may be in reaction to an earlier request that did not yet
955 // request the new resource, so its absence from the response does not
956 // necessarily indicate that the resource does not exist.
957 // For that case, we rely on the request timeout instead.
958 if (!listener_state.update.has_value()) continue;
959 listener_state.update.reset();
960 for (const auto& p : listener_state.watchers) {
961 p.first->OnResourceDoesNotExist();
962 }
963 }
964 }
965 // For any RDS resource that is no longer referred to by any LDS
966 // resources, remove it from the cache and notify watchers that it
967 // does not exist.
968 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
969 for (const auto& p : rds_state.subscribed_resources) {
970 const std::string& rds_resource_name = p.first;
971 if (rds_resource_names_seen.find(rds_resource_name) ==
972 rds_resource_names_seen.end()) {
973 RouteConfigState& route_config_state =
974 xds_client()->route_config_map_[rds_resource_name];
975 route_config_state.update.reset();
976 for (const auto& p : route_config_state.watchers) {
977 p.first->OnResourceDoesNotExist();
978 }
979 }
980 }
981 }
982
AcceptRdsUpdateLocked(std::string version,grpc_millis update_time,XdsApi::RdsUpdateMap rds_update_map)983 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdateLocked(
984 std::string version, grpc_millis update_time,
985 XdsApi::RdsUpdateMap rds_update_map) {
986 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
987 gpr_log(GPR_INFO,
988 "[xds_client %p] RDS update received containing %" PRIuPTR
989 " resources",
990 xds_client(), rds_update_map.size());
991 }
992 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
993 for (auto& p : rds_update_map) {
994 const std::string& route_config_name = p.first;
995 XdsApi::RdsUpdate& rds_update = p.second.resource;
996 auto& state = rds_state.subscribed_resources[route_config_name];
997 if (state != nullptr) state->Finish();
998 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
999 gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
1000 rds_update.ToString().c_str());
1001 }
1002 RouteConfigState& route_config_state =
1003 xds_client()->route_config_map_[route_config_name];
1004 // Ignore identical update.
1005 if (route_config_state.update.has_value() &&
1006 *route_config_state.update == rds_update) {
1007 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1008 gpr_log(GPR_INFO,
1009 "[xds_client %p] RDS resource identical to current, ignoring",
1010 xds_client());
1011 }
1012 continue;
1013 }
1014 // Update the cache.
1015 route_config_state.update = std::move(rds_update);
1016 route_config_state.meta = CreateResourceMetadataAcked(
1017 std::move(p.second.serialized_proto), version, update_time);
1018 // Notify all watchers.
1019 for (const auto& p : route_config_state.watchers) {
1020 p.first->OnRouteConfigChanged(*route_config_state.update);
1021 }
1022 }
1023 }
1024
AcceptCdsUpdateLocked(std::string version,grpc_millis update_time,XdsApi::CdsUpdateMap cds_update_map)1025 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdateLocked(
1026 std::string version, grpc_millis update_time,
1027 XdsApi::CdsUpdateMap cds_update_map) {
1028 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1029 gpr_log(GPR_INFO,
1030 "[xds_client %p] CDS update received containing %" PRIuPTR
1031 " resources",
1032 xds_client(), cds_update_map.size());
1033 }
1034 auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
1035 std::set<std::string> eds_resource_names_seen;
1036 for (auto& p : cds_update_map) {
1037 const char* cluster_name = p.first.c_str();
1038 XdsApi::CdsUpdate& cds_update = p.second.resource;
1039 auto& state = cds_state.subscribed_resources[cluster_name];
1040 if (state != nullptr) state->Finish();
1041 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1042 gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
1043 cluster_name, cds_update.ToString().c_str());
1044 }
1045 // Record the EDS resource names seen.
1046 eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1047 ? cluster_name
1048 : cds_update.eds_service_name);
1049 // Ignore identical update.
1050 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1051 if (cluster_state.update.has_value() &&
1052 *cluster_state.update == cds_update) {
1053 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1054 gpr_log(GPR_INFO,
1055 "[xds_client %p] CDS update identical to current, ignoring.",
1056 xds_client());
1057 }
1058 continue;
1059 }
1060 // Update the cluster state.
1061 cluster_state.update = std::move(cds_update);
1062 cluster_state.meta = CreateResourceMetadataAcked(
1063 std::move(p.second.serialized_proto), version, update_time);
1064 // Notify all watchers.
1065 for (const auto& p : cluster_state.watchers) {
1066 p.first->OnClusterChanged(cluster_state.update.value());
1067 }
1068 }
1069 // For any subscribed resource that is not present in the update,
1070 // remove it from the cache and notify watchers that it does not exist.
1071 for (const auto& p : cds_state.subscribed_resources) {
1072 const std::string& cluster_name = p.first;
1073 if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1074 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1075 // If the resource was newly requested but has not yet been received,
1076 // we don't want to generate an error for the watchers, because this CDS
1077 // response may be in reaction to an earlier request that did not yet
1078 // request the new resource, so its absence from the response does not
1079 // necessarily indicate that the resource does not exist.
1080 // For that case, we rely on the request timeout instead.
1081 if (!cluster_state.update.has_value()) continue;
1082 cluster_state.update.reset();
1083 for (const auto& p : cluster_state.watchers) {
1084 p.first->OnResourceDoesNotExist();
1085 }
1086 }
1087 }
1088 // For any EDS resource that is no longer referred to by any CDS
1089 // resources, remove it from the cache and notify watchers that it
1090 // does not exist.
1091 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1092 for (const auto& p : eds_state.subscribed_resources) {
1093 const std::string& eds_resource_name = p.first;
1094 if (eds_resource_names_seen.find(eds_resource_name) ==
1095 eds_resource_names_seen.end()) {
1096 EndpointState& endpoint_state =
1097 xds_client()->endpoint_map_[eds_resource_name];
1098 endpoint_state.update.reset();
1099 for (const auto& p : endpoint_state.watchers) {
1100 p.first->OnResourceDoesNotExist();
1101 }
1102 }
1103 }
1104 }
1105
AcceptEdsUpdateLocked(std::string version,grpc_millis update_time,XdsApi::EdsUpdateMap eds_update_map)1106 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdateLocked(
1107 std::string version, grpc_millis update_time,
1108 XdsApi::EdsUpdateMap eds_update_map) {
1109 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1110 gpr_log(GPR_INFO,
1111 "[xds_client %p] EDS update received containing %" PRIuPTR
1112 " resources",
1113 xds_client(), eds_update_map.size());
1114 }
1115 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1116 for (auto& p : eds_update_map) {
1117 const char* eds_service_name = p.first.c_str();
1118 XdsApi::EdsUpdate& eds_update = p.second.resource;
1119 auto& state = eds_state.subscribed_resources[eds_service_name];
1120 if (state != nullptr) state->Finish();
1121 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1122 gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1123 eds_service_name, eds_update.ToString().c_str());
1124 }
1125 EndpointState& endpoint_state =
1126 xds_client()->endpoint_map_[eds_service_name];
1127 // Ignore identical update.
1128 if (endpoint_state.update.has_value() &&
1129 *endpoint_state.update == eds_update) {
1130 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1131 gpr_log(GPR_INFO,
1132 "[xds_client %p] EDS update identical to current, ignoring.",
1133 xds_client());
1134 }
1135 continue;
1136 }
1137 // Update the cluster state.
1138 endpoint_state.update = std::move(eds_update);
1139 endpoint_state.meta = CreateResourceMetadataAcked(
1140 std::move(p.second.serialized_proto), version, update_time);
1141 // Notify all watchers.
1142 for (const auto& p : endpoint_state.watchers) {
1143 p.first->OnEndpointChanged(endpoint_state.update.value());
1144 }
1145 }
1146 }
1147
OnRequestSent(void * arg,grpc_error_handle error)1148 void XdsClient::ChannelState::AdsCallState::OnRequestSent(
1149 void* arg, grpc_error_handle error) {
1150 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1151 {
1152 MutexLock lock(&ads_calld->xds_client()->mu_);
1153 ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1154 }
1155 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1156 }
1157
OnRequestSentLocked(grpc_error_handle error)1158 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1159 grpc_error_handle error) {
1160 if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1161 // Clean up the sent message.
1162 grpc_byte_buffer_destroy(send_message_payload_);
1163 send_message_payload_ = nullptr;
1164 // Continue to send another pending message if any.
1165 // TODO(roth): The current code to handle buffered messages has the
1166 // advantage of sending only the most recent list of resource names for
1167 // each resource type (no matter how many times that resource type has
1168 // been requested to send while the current message sending is still
1169 // pending). But its disadvantage is that we send the requests in fixed
1170 // order of resource types. We need to fix this if we are seeing some
1171 // resource type(s) starved due to frequent requests of other resource
1172 // type(s).
1173 auto it = buffered_requests_.begin();
1174 if (it != buffered_requests_.end()) {
1175 SendMessageLocked(*it);
1176 buffered_requests_.erase(it);
1177 }
1178 }
1179 GRPC_ERROR_UNREF(error);
1180 }
1181
OnResponseReceived(void * arg,grpc_error_handle)1182 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1183 void* arg, grpc_error_handle /* error */) {
1184 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1185 bool done;
1186 {
1187 MutexLock lock(&ads_calld->xds_client()->mu_);
1188 done = ads_calld->OnResponseReceivedLocked();
1189 }
1190 if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1191 }
1192
OnResponseReceivedLocked()1193 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1194 // Empty payload means the call was cancelled.
1195 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1196 return true;
1197 }
1198 // Read the response.
1199 grpc_byte_buffer_reader bbr;
1200 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1201 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1202 grpc_byte_buffer_reader_destroy(&bbr);
1203 grpc_byte_buffer_destroy(recv_message_payload_);
1204 recv_message_payload_ = nullptr;
1205 // Parse and validate the response.
1206 XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1207 chand()->server_, response_slice,
1208 ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1209 ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1210 ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1211 ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1212 grpc_slice_unref_internal(response_slice);
1213 if (result.type_url.empty()) {
1214 // Ignore unparsable response.
1215 gpr_log(GPR_ERROR,
1216 "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1217 xds_client(), grpc_error_std_string(result.parse_error).c_str());
1218 GRPC_ERROR_UNREF(result.parse_error);
1219 } else {
1220 grpc_millis update_time = grpc_core::ExecCtx::Get()->Now();
1221 // Update nonce.
1222 auto& state = state_map_[result.type_url];
1223 state.nonce = std::move(result.nonce);
1224 // NACK or ACK the response.
1225 if (result.parse_error != GRPC_ERROR_NONE) {
1226 xds_client()->UpdateResourceMetadataWithFailedParseResultLocked(
1227 update_time, result);
1228 GRPC_ERROR_UNREF(state.error);
1229 state.error = result.parse_error;
1230 // NACK unacceptable update.
1231 gpr_log(GPR_ERROR,
1232 "[xds_client %p] ADS response invalid for resource type %s "
1233 "version %s, will NACK: nonce=%s error=%s",
1234 xds_client(), result.type_url.c_str(), result.version.c_str(),
1235 state.nonce.c_str(),
1236 grpc_error_std_string(result.parse_error).c_str());
1237 SendMessageLocked(result.type_url);
1238 } else {
1239 seen_response_ = true;
1240 // Accept the ADS response according to the type_url.
1241 if (result.type_url == XdsApi::kLdsTypeUrl) {
1242 AcceptLdsUpdateLocked(result.version, update_time,
1243 std::move(result.lds_update_map));
1244 } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1245 AcceptRdsUpdateLocked(result.version, update_time,
1246 std::move(result.rds_update_map));
1247 } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1248 AcceptCdsUpdateLocked(result.version, update_time,
1249 std::move(result.cds_update_map));
1250 } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1251 AcceptEdsUpdateLocked(result.version, update_time,
1252 std::move(result.eds_update_map));
1253 }
1254 xds_client()->resource_version_map_[result.type_url] =
1255 std::move(result.version);
1256 // ACK the update.
1257 SendMessageLocked(result.type_url);
1258 // Start load reporting if needed.
1259 auto& lrs_call = chand()->lrs_calld_;
1260 if (lrs_call != nullptr) {
1261 LrsCallState* lrs_calld = lrs_call->calld();
1262 if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1263 }
1264 }
1265 }
1266 if (xds_client()->shutting_down_) return true;
1267 // Keep listening for updates.
1268 grpc_op op;
1269 memset(&op, 0, sizeof(op));
1270 op.op = GRPC_OP_RECV_MESSAGE;
1271 op.data.recv_message.recv_message = &recv_message_payload_;
1272 op.flags = 0;
1273 op.reserved = nullptr;
1274 GPR_ASSERT(call_ != nullptr);
1275 // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1276 const grpc_call_error call_error =
1277 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1278 GPR_ASSERT(GRPC_CALL_OK == call_error);
1279 return false;
1280 }
1281
OnStatusReceived(void * arg,grpc_error_handle error)1282 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1283 void* arg, grpc_error_handle error) {
1284 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1285 {
1286 MutexLock lock(&ads_calld->xds_client()->mu_);
1287 ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1288 }
1289 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1290 }
1291
OnStatusReceivedLocked(grpc_error_handle error)1292 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1293 grpc_error_handle error) {
1294 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1295 char* status_details = grpc_slice_to_c_string(status_details_);
1296 gpr_log(GPR_INFO,
1297 "[xds_client %p] ADS call status received. Status = %d, details "
1298 "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1299 xds_client(), status_code_, status_details, chand(), this, call_,
1300 grpc_error_std_string(error).c_str());
1301 gpr_free(status_details);
1302 }
1303 // Ignore status from a stale call.
1304 if (IsCurrentCallOnChannel()) {
1305 // Try to restart the call.
1306 parent_->OnCallFinishedLocked();
1307 // Send error to all watchers.
1308 xds_client()->NotifyOnErrorLocked(
1309 GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1310 }
1311 GRPC_ERROR_UNREF(error);
1312 }
1313
IsCurrentCallOnChannel() const1314 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1315 // If the retryable ADS call is null (which only happens when the xds channel
1316 // is shutting down), all the ADS calls are stale.
1317 if (chand()->ads_calld_ == nullptr) return false;
1318 return this == chand()->ads_calld_->calld();
1319 }
1320
1321 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1322 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1323 const std::string& type_url) {
1324 std::set<absl::string_view> resource_names;
1325 auto it = state_map_.find(type_url);
1326 if (it != state_map_.end()) {
1327 for (auto& p : it->second.subscribed_resources) {
1328 resource_names.insert(p.first);
1329 OrphanablePtr<ResourceState>& state = p.second;
1330 state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1331 }
1332 }
1333 return resource_names;
1334 }
1335
1336 //
1337 // XdsClient::ChannelState::LrsCallState::Reporter
1338 //
1339
Orphan()1340 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1341 if (next_report_timer_callback_pending_) {
1342 grpc_timer_cancel(&next_report_timer_);
1343 }
1344 }
1345
1346 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1347 ScheduleNextReportLocked() {
1348 const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1349 grpc_timer_init(&next_report_timer_, next_report_time,
1350 &on_next_report_timer_);
1351 next_report_timer_callback_pending_ = true;
1352 }
1353
OnNextReportTimer(void * arg,grpc_error_handle error)1354 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1355 void* arg, grpc_error_handle error) {
1356 Reporter* self = static_cast<Reporter*>(arg);
1357 bool done;
1358 {
1359 MutexLock lock(&self->xds_client()->mu_);
1360 done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1361 }
1362 if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1363 }
1364
OnNextReportTimerLocked(grpc_error_handle error)1365 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1366 grpc_error_handle error) {
1367 next_report_timer_callback_pending_ = false;
1368 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1369 GRPC_ERROR_UNREF(error);
1370 return true;
1371 }
1372 return SendReportLocked();
1373 }
1374
1375 namespace {
1376
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1377 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1378 for (const auto& p : snapshot) {
1379 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1380 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1381 for (const auto& q : cluster_snapshot.locality_stats) {
1382 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1383 if (!locality_snapshot.IsZero()) return false;
1384 }
1385 }
1386 return true;
1387 }
1388
1389 } // namespace
1390
SendReportLocked()1391 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1392 // Construct snapshot from all reported stats.
1393 XdsApi::ClusterLoadReportMap snapshot =
1394 xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1395 parent_->cluster_names_);
1396 // Skip client load report if the counters were all zero in the last
1397 // report and they are still zero in this one.
1398 const bool old_val = last_report_counters_were_zero_;
1399 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1400 if (old_val && last_report_counters_were_zero_) {
1401 if (xds_client()->load_report_map_.empty()) {
1402 parent_->chand()->StopLrsCall();
1403 return true;
1404 }
1405 ScheduleNextReportLocked();
1406 return false;
1407 }
1408 // Create a request that contains the snapshot.
1409 grpc_slice request_payload_slice =
1410 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1411 parent_->send_message_payload_ =
1412 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1413 grpc_slice_unref_internal(request_payload_slice);
1414 // Send the report.
1415 grpc_op op;
1416 memset(&op, 0, sizeof(op));
1417 op.op = GRPC_OP_SEND_MESSAGE;
1418 op.data.send_message.send_message = parent_->send_message_payload_;
1419 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1420 parent_->call_, &op, 1, &on_report_done_);
1421 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1422 gpr_log(GPR_ERROR,
1423 "[xds_client %p] calld=%p call_error=%d sending client load report",
1424 xds_client(), this, call_error);
1425 GPR_ASSERT(GRPC_CALL_OK == call_error);
1426 }
1427 return false;
1428 }
1429
OnReportDone(void * arg,grpc_error_handle error)1430 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1431 void* arg, grpc_error_handle error) {
1432 Reporter* self = static_cast<Reporter*>(arg);
1433 bool done;
1434 {
1435 MutexLock lock(&self->xds_client()->mu_);
1436 done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1437 }
1438 if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1439 }
1440
OnReportDoneLocked(grpc_error_handle error)1441 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1442 grpc_error_handle error) {
1443 grpc_byte_buffer_destroy(parent_->send_message_payload_);
1444 parent_->send_message_payload_ = nullptr;
1445 // If there are no more registered stats to report, cancel the call.
1446 if (xds_client()->load_report_map_.empty()) {
1447 parent_->chand()->StopLrsCall();
1448 GRPC_ERROR_UNREF(error);
1449 return true;
1450 }
1451 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1452 GRPC_ERROR_UNREF(error);
1453 // If this reporter is no longer the current one on the call, the reason
1454 // might be that it was orphaned for a new one due to config update.
1455 if (!IsCurrentReporterOnCall()) {
1456 parent_->MaybeStartReportingLocked();
1457 }
1458 return true;
1459 }
1460 ScheduleNextReportLocked();
1461 return false;
1462 }
1463
1464 //
1465 // XdsClient::ChannelState::LrsCallState
1466 //
1467
LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent)1468 XdsClient::ChannelState::LrsCallState::LrsCallState(
1469 RefCountedPtr<RetryableCall<LrsCallState>> parent)
1470 : InternallyRefCounted<LrsCallState>(
1471 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
1472 ? "LrsCallState"
1473 : nullptr),
1474 parent_(std::move(parent)) {
1475 // Init the LRS call. Note that the call will progress every time there's
1476 // activity in xds_client()->interested_parties_, which is comprised of
1477 // the polling entities from client_channel.
1478 GPR_ASSERT(xds_client() != nullptr);
1479 const auto& method =
1480 chand()->server_.ShouldUseV3()
1481 ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1482 : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1483 call_ = grpc_channel_create_pollset_set_call(
1484 chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1485 xds_client()->interested_parties_, method, nullptr,
1486 GRPC_MILLIS_INF_FUTURE, nullptr);
1487 GPR_ASSERT(call_ != nullptr);
1488 // Init the request payload.
1489 grpc_slice request_payload_slice =
1490 xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
1491 send_message_payload_ =
1492 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1493 grpc_slice_unref_internal(request_payload_slice);
1494 // Init other data associated with the LRS call.
1495 grpc_metadata_array_init(&initial_metadata_recv_);
1496 grpc_metadata_array_init(&trailing_metadata_recv_);
1497 // Start the call.
1498 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1499 gpr_log(GPR_INFO,
1500 "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1501 "call: %p)",
1502 xds_client(), chand(), this, call_);
1503 }
1504 // Create the ops.
1505 grpc_call_error call_error;
1506 grpc_op ops[3];
1507 memset(ops, 0, sizeof(ops));
1508 // Op: send initial metadata.
1509 grpc_op* op = ops;
1510 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1511 op->data.send_initial_metadata.count = 0;
1512 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1513 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1514 op->reserved = nullptr;
1515 op++;
1516 // Op: send request message.
1517 GPR_ASSERT(send_message_payload_ != nullptr);
1518 op->op = GRPC_OP_SEND_MESSAGE;
1519 op->data.send_message.send_message = send_message_payload_;
1520 op->flags = 0;
1521 op->reserved = nullptr;
1522 op++;
1523 Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1524 GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1525 grpc_schedule_on_exec_ctx);
1526 call_error = grpc_call_start_batch_and_execute(
1527 call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
1528 GPR_ASSERT(GRPC_CALL_OK == call_error);
1529 // Op: recv initial metadata.
1530 op = ops;
1531 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1532 op->data.recv_initial_metadata.recv_initial_metadata =
1533 &initial_metadata_recv_;
1534 op->flags = 0;
1535 op->reserved = nullptr;
1536 op++;
1537 // Op: recv response.
1538 op->op = GRPC_OP_RECV_MESSAGE;
1539 op->data.recv_message.recv_message = &recv_message_payload_;
1540 op->flags = 0;
1541 op->reserved = nullptr;
1542 op++;
1543 Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1544 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1545 grpc_schedule_on_exec_ctx);
1546 call_error = grpc_call_start_batch_and_execute(
1547 call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
1548 GPR_ASSERT(GRPC_CALL_OK == call_error);
1549 // Op: recv server status.
1550 op = ops;
1551 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1552 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1553 op->data.recv_status_on_client.status = &status_code_;
1554 op->data.recv_status_on_client.status_details = &status_details_;
1555 op->flags = 0;
1556 op->reserved = nullptr;
1557 op++;
1558 // This callback signals the end of the call, so it relies on the initial
1559 // ref instead of a new ref. When it's invoked, it's the initial ref that is
1560 // unreffed.
1561 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1562 grpc_schedule_on_exec_ctx);
1563 call_error = grpc_call_start_batch_and_execute(
1564 call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
1565 GPR_ASSERT(GRPC_CALL_OK == call_error);
1566 }
1567
~LrsCallState()1568 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1569 grpc_metadata_array_destroy(&initial_metadata_recv_);
1570 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1571 grpc_byte_buffer_destroy(send_message_payload_);
1572 grpc_byte_buffer_destroy(recv_message_payload_);
1573 grpc_slice_unref_internal(status_details_);
1574 GPR_ASSERT(call_ != nullptr);
1575 grpc_call_unref(call_);
1576 }
1577
Orphan()1578 void XdsClient::ChannelState::LrsCallState::Orphan() {
1579 reporter_.reset();
1580 GPR_ASSERT(call_ != nullptr);
1581 // If we are here because xds_client wants to cancel the call,
1582 // on_status_received_ will complete the cancellation and clean up. Otherwise,
1583 // we are here because xds_client has to orphan a failed call, then the
1584 // following cancellation will be a no-op.
1585 grpc_call_cancel_internal(call_);
1586 // Note that the initial ref is hold by on_status_received_. So the
1587 // corresponding unref happens in on_status_received_ instead of here.
1588 }
1589
MaybeStartReportingLocked()1590 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1591 // Don't start again if already started.
1592 if (reporter_ != nullptr) return;
1593 // Don't start if the previous send_message op (of the initial request or the
1594 // last report of the previous reporter) hasn't completed.
1595 if (send_message_payload_ != nullptr) return;
1596 // Don't start if no LRS response has arrived.
1597 if (!seen_response()) return;
1598 // Don't start if the ADS call hasn't received any valid response. Note that
1599 // this must be the first channel because it is the current channel but its
1600 // ADS call hasn't seen any response.
1601 if (chand()->ads_calld_ == nullptr ||
1602 chand()->ads_calld_->calld() == nullptr ||
1603 !chand()->ads_calld_->calld()->seen_response()) {
1604 return;
1605 }
1606 // Start reporting.
1607 reporter_ = MakeOrphanable<Reporter>(
1608 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1609 }
1610
OnInitialRequestSent(void * arg,grpc_error_handle)1611 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1612 void* arg, grpc_error_handle /*error*/) {
1613 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1614 {
1615 MutexLock lock(&lrs_calld->xds_client()->mu_);
1616 lrs_calld->OnInitialRequestSentLocked();
1617 }
1618 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1619 }
1620
OnInitialRequestSentLocked()1621 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1622 // Clear the send_message_payload_.
1623 grpc_byte_buffer_destroy(send_message_payload_);
1624 send_message_payload_ = nullptr;
1625 MaybeStartReportingLocked();
1626 }
1627
OnResponseReceived(void * arg,grpc_error_handle)1628 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1629 void* arg, grpc_error_handle /*error*/) {
1630 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1631 bool done;
1632 {
1633 MutexLock lock(&lrs_calld->xds_client()->mu_);
1634 done = lrs_calld->OnResponseReceivedLocked();
1635 }
1636 if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1637 }
1638
OnResponseReceivedLocked()1639 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1640 // Empty payload means the call was cancelled.
1641 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1642 return true;
1643 }
1644 // Read the response.
1645 grpc_byte_buffer_reader bbr;
1646 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1647 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1648 grpc_byte_buffer_reader_destroy(&bbr);
1649 grpc_byte_buffer_destroy(recv_message_payload_);
1650 recv_message_payload_ = nullptr;
1651 // This anonymous lambda is a hack to avoid the usage of goto.
1652 [&]() {
1653 // Parse the response.
1654 bool send_all_clusters = false;
1655 std::set<std::string> new_cluster_names;
1656 grpc_millis new_load_reporting_interval;
1657 grpc_error_handle parse_error = xds_client()->api_.ParseLrsResponse(
1658 response_slice, &send_all_clusters, &new_cluster_names,
1659 &new_load_reporting_interval);
1660 if (parse_error != GRPC_ERROR_NONE) {
1661 gpr_log(GPR_ERROR,
1662 "[xds_client %p] LRS response parsing failed. error=%s",
1663 xds_client(), grpc_error_std_string(parse_error).c_str());
1664 GRPC_ERROR_UNREF(parse_error);
1665 return;
1666 }
1667 seen_response_ = true;
1668 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1669 gpr_log(
1670 GPR_INFO,
1671 "[xds_client %p] LRS response received, %" PRIuPTR
1672 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1673 "ms",
1674 xds_client(), new_cluster_names.size(), send_all_clusters,
1675 new_load_reporting_interval);
1676 size_t i = 0;
1677 for (const auto& name : new_cluster_names) {
1678 gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1679 xds_client(), i++, name.c_str());
1680 }
1681 }
1682 if (new_load_reporting_interval <
1683 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1684 new_load_reporting_interval =
1685 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1686 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1687 gpr_log(GPR_INFO,
1688 "[xds_client %p] Increased load_report_interval to minimum "
1689 "value %dms",
1690 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1691 }
1692 }
1693 // Ignore identical update.
1694 if (send_all_clusters == send_all_clusters_ &&
1695 cluster_names_ == new_cluster_names &&
1696 load_reporting_interval_ == new_load_reporting_interval) {
1697 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1698 gpr_log(GPR_INFO,
1699 "[xds_client %p] Incoming LRS response identical to current, "
1700 "ignoring.",
1701 xds_client());
1702 }
1703 return;
1704 }
1705 // Stop current load reporting (if any) to adopt the new config.
1706 reporter_.reset();
1707 // Record the new config.
1708 send_all_clusters_ = send_all_clusters;
1709 cluster_names_ = std::move(new_cluster_names);
1710 load_reporting_interval_ = new_load_reporting_interval;
1711 // Try starting sending load report.
1712 MaybeStartReportingLocked();
1713 }();
1714 grpc_slice_unref_internal(response_slice);
1715 if (xds_client()->shutting_down_) return true;
1716 // Keep listening for LRS config updates.
1717 grpc_op op;
1718 memset(&op, 0, sizeof(op));
1719 op.op = GRPC_OP_RECV_MESSAGE;
1720 op.data.recv_message.recv_message = &recv_message_payload_;
1721 op.flags = 0;
1722 op.reserved = nullptr;
1723 GPR_ASSERT(call_ != nullptr);
1724 // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1725 const grpc_call_error call_error =
1726 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1727 GPR_ASSERT(GRPC_CALL_OK == call_error);
1728 return false;
1729 }
1730
OnStatusReceived(void * arg,grpc_error_handle error)1731 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1732 void* arg, grpc_error_handle error) {
1733 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1734 {
1735 MutexLock lock(&lrs_calld->xds_client()->mu_);
1736 lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1737 }
1738 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1739 }
1740
OnStatusReceivedLocked(grpc_error_handle error)1741 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1742 grpc_error_handle error) {
1743 GPR_ASSERT(call_ != nullptr);
1744 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1745 char* status_details = grpc_slice_to_c_string(status_details_);
1746 gpr_log(GPR_INFO,
1747 "[xds_client %p] LRS call status received. Status = %d, details "
1748 "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1749 xds_client(), status_code_, status_details, chand(), this, call_,
1750 grpc_error_std_string(error).c_str());
1751 gpr_free(status_details);
1752 }
1753 // Ignore status from a stale call.
1754 if (IsCurrentCallOnChannel()) {
1755 GPR_ASSERT(!xds_client()->shutting_down_);
1756 // Try to restart the call.
1757 parent_->OnCallFinishedLocked();
1758 }
1759 GRPC_ERROR_UNREF(error);
1760 }
1761
IsCurrentCallOnChannel() const1762 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1763 // If the retryable LRS call is null (which only happens when the xds channel
1764 // is shutting down), all the LRS calls are stale.
1765 if (chand()->lrs_calld_ == nullptr) return false;
1766 return this == chand()->lrs_calld_->calld();
1767 }
1768
1769 //
1770 // XdsClient
1771 //
1772
1773 namespace {
1774
GetRequestTimeout(const grpc_channel_args * args)1775 grpc_millis GetRequestTimeout(const grpc_channel_args* args) {
1776 return grpc_channel_args_find_integer(
1777 args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1778 {15000, 0, INT_MAX});
1779 }
1780
// Returns a copy of `args` with two extra integer args appended: a
// keepalive time of five minutes and the "channelz internal channel" flag.
// The returned args are a new object (XdsClient's destructor destroys the
// copy it stores).
grpc_channel_args* ModifyChannelArgs(const grpc_channel_args* args) {
  absl::InlinedVector<grpc_arg, 2> extra_args;
  // Keepalive time: 5 minutes, expressed in milliseconds.
  extra_args.push_back(grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 5 * 60 * GPR_MS_PER_SEC));
  // Flag the xds channel as an internal channel for channelz.
  extra_args.push_back(grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1));
  return grpc_channel_args_copy_and_add(args, extra_args.data(),
                                        extra_args.size());
}
1792
1793 } // namespace
1794
XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,const grpc_channel_args * args)1795 XdsClient::XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
1796 const grpc_channel_args* args)
1797 : DualRefCounted<XdsClient>(
1798 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1799 : nullptr),
1800 bootstrap_(std::move(bootstrap)),
1801 args_(ModifyChannelArgs(args)),
1802 request_timeout_(GetRequestTimeout(args)),
1803 interested_parties_(grpc_pollset_set_create()),
1804 certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
1805 bootstrap_->certificate_providers())),
1806 api_(this, &grpc_xds_client_trace, bootstrap_->node()) {
1807 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1808 gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1809 }
1810 // Create ChannelState object.
1811 chand_ = MakeOrphanable<ChannelState>(
1812 WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1813 }
1814
~XdsClient()1815 XdsClient::~XdsClient() {
1816 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1817 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1818 }
1819 grpc_channel_args_destroy(args_);
1820 grpc_pollset_set_destroy(interested_parties_);
1821 }
1822
AddChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1823 void XdsClient::AddChannelzLinkage(
1824 channelz::ChannelNode* parent_channelz_node) {
1825 MutexLock lock(&mu_);
1826 channelz::ChannelNode* xds_channelz_node =
1827 grpc_channel_get_channelz_node(chand_->channel());
1828 if (xds_channelz_node != nullptr) {
1829 parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1830 }
1831 }
1832
RemoveChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1833 void XdsClient::RemoveChannelzLinkage(
1834 channelz::ChannelNode* parent_channelz_node) {
1835 MutexLock lock(&mu_);
1836 channelz::ChannelNode* xds_channelz_node =
1837 grpc_channel_get_channelz_node(chand_->channel());
1838 if (xds_channelz_node != nullptr) {
1839 parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1840 }
1841 }
1842
Orphan()1843 void XdsClient::Orphan() {
1844 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1845 gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1846 }
1847 {
1848 MutexLock lock(g_mu);
1849 if (g_xds_client == this) g_xds_client = nullptr;
1850 }
1851 {
1852 MutexLock lock(&mu_);
1853 shutting_down_ = true;
1854 // Orphan ChannelState object.
1855 chand_.reset();
1856 // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1857 // created by the XdsResolver because the maps contain refs for watchers
1858 // which in turn hold refs to the loadbalancing policies. At this point, it
1859 // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1860 // policies before those calls are done would lead to issues such as
1861 // https://github.com/grpc/grpc/issues/20928.
1862 if (!listener_map_.empty()) {
1863 cluster_map_.clear();
1864 endpoint_map_.clear();
1865 }
1866 }
1867 }
1868
WatchListenerData(absl::string_view listener_name,std::unique_ptr<ListenerWatcherInterface> watcher)1869 void XdsClient::WatchListenerData(
1870 absl::string_view listener_name,
1871 std::unique_ptr<ListenerWatcherInterface> watcher) {
1872 std::string listener_name_str = std::string(listener_name);
1873 MutexLock lock(&mu_);
1874 ListenerState& listener_state = listener_map_[listener_name_str];
1875 ListenerWatcherInterface* w = watcher.get();
1876 listener_state.watchers[w] = std::move(watcher);
1877 // If we've already received an LDS update, notify the new watcher
1878 // immediately.
1879 if (listener_state.update.has_value()) {
1880 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1881 gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1882 this, listener_name_str.c_str());
1883 }
1884 w->OnListenerChanged(*listener_state.update);
1885 }
1886 chand_->SubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str);
1887 }
1888
CancelListenerDataWatch(absl::string_view listener_name,ListenerWatcherInterface * watcher,bool delay_unsubscription)1889 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1890 ListenerWatcherInterface* watcher,
1891 bool delay_unsubscription) {
1892 MutexLock lock(&mu_);
1893 if (shutting_down_) return;
1894 std::string listener_name_str = std::string(listener_name);
1895 ListenerState& listener_state = listener_map_[listener_name_str];
1896 auto it = listener_state.watchers.find(watcher);
1897 if (it != listener_state.watchers.end()) {
1898 listener_state.watchers.erase(it);
1899 if (listener_state.watchers.empty()) {
1900 listener_map_.erase(listener_name_str);
1901 chand_->UnsubscribeLocked(XdsApi::kLdsTypeUrl, listener_name_str,
1902 delay_unsubscription);
1903 }
1904 }
1905 }
1906
WatchRouteConfigData(absl::string_view route_config_name,std::unique_ptr<RouteConfigWatcherInterface> watcher)1907 void XdsClient::WatchRouteConfigData(
1908 absl::string_view route_config_name,
1909 std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1910 std::string route_config_name_str = std::string(route_config_name);
1911 MutexLock lock(&mu_);
1912 RouteConfigState& route_config_state =
1913 route_config_map_[route_config_name_str];
1914 RouteConfigWatcherInterface* w = watcher.get();
1915 route_config_state.watchers[w] = std::move(watcher);
1916 // If we've already received an RDS update, notify the new watcher
1917 // immediately.
1918 if (route_config_state.update.has_value()) {
1919 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1920 gpr_log(GPR_INFO,
1921 "[xds_client %p] returning cached route config data for %s", this,
1922 route_config_name_str.c_str());
1923 }
1924 w->OnRouteConfigChanged(*route_config_state.update);
1925 }
1926 chand_->SubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str);
1927 }
1928
CancelRouteConfigDataWatch(absl::string_view route_config_name,RouteConfigWatcherInterface * watcher,bool delay_unsubscription)1929 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1930 RouteConfigWatcherInterface* watcher,
1931 bool delay_unsubscription) {
1932 MutexLock lock(&mu_);
1933 if (shutting_down_) return;
1934 std::string route_config_name_str = std::string(route_config_name);
1935 RouteConfigState& route_config_state =
1936 route_config_map_[route_config_name_str];
1937 auto it = route_config_state.watchers.find(watcher);
1938 if (it != route_config_state.watchers.end()) {
1939 route_config_state.watchers.erase(it);
1940 if (route_config_state.watchers.empty()) {
1941 route_config_map_.erase(route_config_name_str);
1942 chand_->UnsubscribeLocked(XdsApi::kRdsTypeUrl, route_config_name_str,
1943 delay_unsubscription);
1944 }
1945 }
1946 }
1947
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1948 void XdsClient::WatchClusterData(
1949 absl::string_view cluster_name,
1950 std::unique_ptr<ClusterWatcherInterface> watcher) {
1951 std::string cluster_name_str = std::string(cluster_name);
1952 MutexLock lock(&mu_);
1953 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1954 ClusterWatcherInterface* w = watcher.get();
1955 cluster_state.watchers[w] = std::move(watcher);
1956 // If we've already received a CDS update, notify the new watcher
1957 // immediately.
1958 if (cluster_state.update.has_value()) {
1959 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1960 gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1961 this, cluster_name_str.c_str());
1962 }
1963 w->OnClusterChanged(cluster_state.update.value());
1964 }
1965 chand_->SubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str);
1966 }
1967
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1968 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1969 ClusterWatcherInterface* watcher,
1970 bool delay_unsubscription) {
1971 MutexLock lock(&mu_);
1972 if (shutting_down_) return;
1973 std::string cluster_name_str = std::string(cluster_name);
1974 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1975 auto it = cluster_state.watchers.find(watcher);
1976 if (it != cluster_state.watchers.end()) {
1977 cluster_state.watchers.erase(it);
1978 if (cluster_state.watchers.empty()) {
1979 cluster_map_.erase(cluster_name_str);
1980 chand_->UnsubscribeLocked(XdsApi::kCdsTypeUrl, cluster_name_str,
1981 delay_unsubscription);
1982 }
1983 }
1984 }
1985
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1986 void XdsClient::WatchEndpointData(
1987 absl::string_view eds_service_name,
1988 std::unique_ptr<EndpointWatcherInterface> watcher) {
1989 std::string eds_service_name_str = std::string(eds_service_name);
1990 MutexLock lock(&mu_);
1991 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1992 EndpointWatcherInterface* w = watcher.get();
1993 endpoint_state.watchers[w] = std::move(watcher);
1994 // If we've already received an EDS update, notify the new watcher
1995 // immediately.
1996 if (endpoint_state.update.has_value()) {
1997 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1998 gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1999 this, eds_service_name_str.c_str());
2000 }
2001 w->OnEndpointChanged(endpoint_state.update.value());
2002 }
2003 chand_->SubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str);
2004 }
2005
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)2006 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
2007 EndpointWatcherInterface* watcher,
2008 bool delay_unsubscription) {
2009 MutexLock lock(&mu_);
2010 if (shutting_down_) return;
2011 std::string eds_service_name_str = std::string(eds_service_name);
2012 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
2013 auto it = endpoint_state.watchers.find(watcher);
2014 if (it != endpoint_state.watchers.end()) {
2015 endpoint_state.watchers.erase(it);
2016 if (endpoint_state.watchers.empty()) {
2017 endpoint_map_.erase(eds_service_name_str);
2018 chand_->UnsubscribeLocked(XdsApi::kEdsTypeUrl, eds_service_name_str,
2019 delay_unsubscription);
2020 }
2021 }
2022 }
2023
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)2024 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
2025 absl::string_view lrs_server, absl::string_view cluster_name,
2026 absl::string_view eds_service_name) {
2027 // TODO(roth): When we add support for direct federation, use the
2028 // server name specified in lrs_server.
2029 auto key =
2030 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2031 MutexLock lock(&mu_);
2032 // We jump through some hoops here to make sure that the absl::string_views
2033 // stored in the XdsClusterDropStats object point to the strings
2034 // in the load_report_map_ key, so that they have the same lifetime.
2035 auto it = load_report_map_
2036 .emplace(std::make_pair(std::move(key), LoadReportState()))
2037 .first;
2038 LoadReportState& load_report_state = it->second;
2039 RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
2040 if (load_report_state.drop_stats != nullptr) {
2041 cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
2042 }
2043 if (cluster_drop_stats == nullptr) {
2044 if (load_report_state.drop_stats != nullptr) {
2045 load_report_state.deleted_drop_stats +=
2046 load_report_state.drop_stats->GetSnapshotAndReset();
2047 }
2048 cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
2049 Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
2050 it->first.first /*cluster_name*/,
2051 it->first.second /*eds_service_name*/);
2052 load_report_state.drop_stats = cluster_drop_stats.get();
2053 }
2054 chand_->MaybeStartLrsCall();
2055 return cluster_drop_stats;
2056 }
2057
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)2058 void XdsClient::RemoveClusterDropStats(
2059 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2060 absl::string_view eds_service_name,
2061 XdsClusterDropStats* cluster_drop_stats) {
2062 MutexLock lock(&mu_);
2063 // TODO(roth): When we add support for direct federation, use the
2064 // server name specified in lrs_server.
2065 auto it = load_report_map_.find(
2066 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2067 if (it == load_report_map_.end()) return;
2068 LoadReportState& load_report_state = it->second;
2069 if (load_report_state.drop_stats == cluster_drop_stats) {
2070 // Record final snapshot in deleted_drop_stats, which will be
2071 // added to the next load report.
2072 load_report_state.deleted_drop_stats +=
2073 load_report_state.drop_stats->GetSnapshotAndReset();
2074 load_report_state.drop_stats = nullptr;
2075 }
2076 }
2077
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)2078 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2079 absl::string_view lrs_server, absl::string_view cluster_name,
2080 absl::string_view eds_service_name,
2081 RefCountedPtr<XdsLocalityName> locality) {
2082 // TODO(roth): When we add support for direct federation, use the
2083 // server name specified in lrs_server.
2084 auto key =
2085 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2086 MutexLock lock(&mu_);
2087 // We jump through some hoops here to make sure that the absl::string_views
2088 // stored in the XdsClusterLocalityStats object point to the strings
2089 // in the load_report_map_ key, so that they have the same lifetime.
2090 auto it = load_report_map_
2091 .emplace(std::make_pair(std::move(key), LoadReportState()))
2092 .first;
2093 LoadReportState& load_report_state = it->second;
2094 LoadReportState::LocalityState& locality_state =
2095 load_report_state.locality_stats[locality];
2096 RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2097 if (locality_state.locality_stats != nullptr) {
2098 cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2099 }
2100 if (cluster_locality_stats == nullptr) {
2101 if (locality_state.locality_stats != nullptr) {
2102 locality_state.deleted_locality_stats +=
2103 locality_state.locality_stats->GetSnapshotAndReset();
2104 }
2105 cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2106 Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2107 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2108 std::move(locality));
2109 locality_state.locality_stats = cluster_locality_stats.get();
2110 }
2111 chand_->MaybeStartLrsCall();
2112 return cluster_locality_stats;
2113 }
2114
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2115 void XdsClient::RemoveClusterLocalityStats(
2116 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2117 absl::string_view eds_service_name,
2118 const RefCountedPtr<XdsLocalityName>& locality,
2119 XdsClusterLocalityStats* cluster_locality_stats) {
2120 MutexLock lock(&mu_);
2121 // TODO(roth): When we add support for direct federation, use the
2122 // server name specified in lrs_server.
2123 auto it = load_report_map_.find(
2124 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2125 if (it == load_report_map_.end()) return;
2126 LoadReportState& load_report_state = it->second;
2127 auto locality_it = load_report_state.locality_stats.find(locality);
2128 if (locality_it == load_report_state.locality_stats.end()) return;
2129 LoadReportState::LocalityState& locality_state = locality_it->second;
2130 if (locality_state.locality_stats == cluster_locality_stats) {
2131 // Record final snapshot in deleted_locality_stats, which will be
2132 // added to the next load report.
2133 locality_state.deleted_locality_stats +=
2134 locality_state.locality_stats->GetSnapshotAndReset();
2135 locality_state.locality_stats = nullptr;
2136 }
2137 }
2138
ResetBackoff()2139 void XdsClient::ResetBackoff() {
2140 MutexLock lock(&mu_);
2141 if (chand_ != nullptr) {
2142 grpc_channel_reset_connect_backoff(chand_->channel());
2143 }
2144 }
2145
NotifyOnErrorLocked(grpc_error_handle error)2146 void XdsClient::NotifyOnErrorLocked(grpc_error_handle error) {
2147 for (const auto& p : listener_map_) {
2148 const ListenerState& listener_state = p.second;
2149 for (const auto& p : listener_state.watchers) {
2150 p.first->OnError(GRPC_ERROR_REF(error));
2151 }
2152 }
2153 for (const auto& p : route_config_map_) {
2154 const RouteConfigState& route_config_state = p.second;
2155 for (const auto& p : route_config_state.watchers) {
2156 p.first->OnError(GRPC_ERROR_REF(error));
2157 }
2158 }
2159 for (const auto& p : cluster_map_) {
2160 const ClusterState& cluster_state = p.second;
2161 for (const auto& p : cluster_state.watchers) {
2162 p.first->OnError(GRPC_ERROR_REF(error));
2163 }
2164 }
2165 for (const auto& p : endpoint_map_) {
2166 const EndpointState& endpoint_state = p.second;
2167 for (const auto& p : endpoint_state.watchers) {
2168 p.first->OnError(GRPC_ERROR_REF(error));
2169 }
2170 }
2171 GRPC_ERROR_UNREF(error);
2172 }
2173
BuildLoadReportSnapshotLocked(bool send_all_clusters,const std::set<std::string> & clusters)2174 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2175 bool send_all_clusters, const std::set<std::string>& clusters) {
2176 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2177 gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2178 }
2179 XdsApi::ClusterLoadReportMap snapshot_map;
2180 for (auto load_report_it = load_report_map_.begin();
2181 load_report_it != load_report_map_.end();) {
2182 // Cluster key is cluster and EDS service name.
2183 const auto& cluster_key = load_report_it->first;
2184 LoadReportState& load_report = load_report_it->second;
2185 // If the CDS response for a cluster indicates to use LRS but the
2186 // LRS server does not say that it wants reports for this cluster,
2187 // then we'll have stats objects here whose data we're not going to
2188 // include in the load report. However, we still need to clear out
2189 // the data from the stats objects, so that if the LRS server starts
2190 // asking for the data in the future, we don't incorrectly include
2191 // data from previous reporting intervals in that future report.
2192 const bool record_stats =
2193 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2194 XdsApi::ClusterLoadReport snapshot;
2195 // Aggregate drop stats.
2196 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2197 if (load_report.drop_stats != nullptr) {
2198 snapshot.dropped_requests +=
2199 load_report.drop_stats->GetSnapshotAndReset();
2200 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2201 gpr_log(GPR_INFO,
2202 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2203 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2204 load_report.drop_stats);
2205 }
2206 }
2207 // Aggregate locality stats.
2208 for (auto it = load_report.locality_stats.begin();
2209 it != load_report.locality_stats.end();) {
2210 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2211 auto& locality_state = it->second;
2212 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2213 snapshot.locality_stats[locality_name];
2214 locality_snapshot = std::move(locality_state.deleted_locality_stats);
2215 if (locality_state.locality_stats != nullptr) {
2216 locality_snapshot +=
2217 locality_state.locality_stats->GetSnapshotAndReset();
2218 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2219 gpr_log(GPR_INFO,
2220 "[xds_client %p] cluster=%s eds_service_name=%s "
2221 "locality=%s locality_stats=%p",
2222 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2223 locality_name->AsHumanReadableString().c_str(),
2224 locality_state.locality_stats);
2225 }
2226 }
2227 // If the only thing left in this entry was final snapshots from
2228 // deleted locality stats objects, remove the entry.
2229 if (locality_state.locality_stats == nullptr) {
2230 it = load_report.locality_stats.erase(it);
2231 } else {
2232 ++it;
2233 }
2234 }
2235 // Compute load report interval.
2236 const grpc_millis now = ExecCtx::Get()->Now();
2237 snapshot.load_report_interval = now - load_report.last_report_time;
2238 load_report.last_report_time = now;
2239 // Record snapshot.
2240 if (record_stats) {
2241 snapshot_map[cluster_key] = std::move(snapshot);
2242 }
2243 // If the only thing left in this entry was final snapshots from
2244 // deleted stats objects, remove the entry.
2245 if (load_report.locality_stats.empty() &&
2246 load_report.drop_stats == nullptr) {
2247 load_report_it = load_report_map_.erase(load_report_it);
2248 } else {
2249 ++load_report_it;
2250 }
2251 }
2252 return snapshot_map;
2253 }
2254
UpdateResourceMetadataWithFailedParseResultLocked(grpc_millis update_time,const XdsApi::AdsParseResult & result)2255 void XdsClient::UpdateResourceMetadataWithFailedParseResultLocked(
2256 grpc_millis update_time, const XdsApi::AdsParseResult& result) {
2257 // ADS update is rejected and the resource names in the failed update is
2258 // available.
2259 std::string details = grpc_error_std_string(result.parse_error);
2260 for (auto& name : result.resource_names_failed) {
2261 XdsApi::ResourceMetadata* resource_metadata = nullptr;
2262 if (result.type_url == XdsApi::kLdsTypeUrl) {
2263 auto it = listener_map_.find(name);
2264 if (it != listener_map_.end()) {
2265 resource_metadata = &it->second.meta;
2266 }
2267 } else if (result.type_url == XdsApi::kRdsTypeUrl) {
2268 auto it = route_config_map_.find(name);
2269 if (route_config_map_.find(name) != route_config_map_.end()) {
2270 resource_metadata = &it->second.meta;
2271 }
2272 } else if (result.type_url == XdsApi::kCdsTypeUrl) {
2273 auto it = cluster_map_.find(name);
2274 if (cluster_map_.find(name) != cluster_map_.end()) {
2275 resource_metadata = &it->second.meta;
2276 }
2277 } else if (result.type_url == XdsApi::kEdsTypeUrl) {
2278 auto it = endpoint_map_.find(name);
2279 if (endpoint_map_.find(name) != endpoint_map_.end()) {
2280 resource_metadata = &it->second.meta;
2281 }
2282 }
2283 if (resource_metadata == nullptr) {
2284 return;
2285 }
2286 resource_metadata->client_status = XdsApi::ResourceMetadata::NACKED;
2287 resource_metadata->failed_version = result.version;
2288 resource_metadata->failed_details = details;
2289 resource_metadata->failed_update_time = update_time;
2290 }
2291 }
2292
DumpClientConfigBinary()2293 std::string XdsClient::DumpClientConfigBinary() {
2294 MutexLock lock(&mu_);
2295 XdsApi::ResourceTypeMetadataMap resource_type_metadata_map;
2296 // Update per-xds-type version if available, this version corresponding to the
2297 // last successful ADS update version.
2298 for (auto& p : resource_version_map_) {
2299 resource_type_metadata_map[p.first].version = p.second;
2300 }
2301 // Collect resource metadata from listeners
2302 auto& lds_map =
2303 resource_type_metadata_map[XdsApi::kLdsTypeUrl].resource_metadata_map;
2304 for (auto& p : listener_map_) {
2305 lds_map[p.first] = &p.second.meta;
2306 }
2307 // Collect resource metadata from route configs
2308 auto& rds_map =
2309 resource_type_metadata_map[XdsApi::kRdsTypeUrl].resource_metadata_map;
2310 for (auto& p : route_config_map_) {
2311 rds_map[p.first] = &p.second.meta;
2312 }
2313 // Collect resource metadata from clusters
2314 auto& cds_map =
2315 resource_type_metadata_map[XdsApi::kCdsTypeUrl].resource_metadata_map;
2316 for (auto& p : cluster_map_) {
2317 cds_map[p.first] = &p.second.meta;
2318 }
2319 // Collect resource metadata from endpoints
2320 auto& eds_map =
2321 resource_type_metadata_map[XdsApi::kEdsTypeUrl].resource_metadata_map;
2322 for (auto& p : endpoint_map_) {
2323 eds_map[p.first] = &p.second.meta;
2324 }
2325 // Assemble config dump messages
2326 return api_.AssembleClientConfig(resource_type_metadata_map);
2327 }
2328
2329 //
2330 // accessors for global state
2331 //
2332
XdsClientGlobalInit()2333 void XdsClientGlobalInit() {
2334 g_mu = new Mutex;
2335 XdsHttpFilterRegistry::Init();
2336 }
2337
2338 // TODO(roth): Find a better way to clear the fallback config that does
2339 // not require using ABSL_NO_THREAD_SAFETY_ANALYSIS.
XdsClientGlobalShutdown()2340 void XdsClientGlobalShutdown() ABSL_NO_THREAD_SAFETY_ANALYSIS {
2341 gpr_free(g_fallback_bootstrap_config);
2342 g_fallback_bootstrap_config = nullptr;
2343 delete g_mu;
2344 g_mu = nullptr;
2345 XdsHttpFilterRegistry::Shutdown();
2346 }
2347
2348 namespace {
2349
GetBootstrapContents(const char * fallback_config,grpc_error_handle * error)2350 std::string GetBootstrapContents(const char* fallback_config,
2351 grpc_error_handle* error) {
2352 // First, try GRPC_XDS_BOOTSTRAP env var.
2353 grpc_core::UniquePtr<char> path(gpr_getenv("GRPC_XDS_BOOTSTRAP"));
2354 if (path != nullptr) {
2355 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2356 gpr_log(GPR_INFO,
2357 "Got bootstrap file location from GRPC_XDS_BOOTSTRAP "
2358 "environment variable: %s",
2359 path.get());
2360 }
2361 grpc_slice contents;
2362 *error =
2363 grpc_load_file(path.get(), /*add_null_terminator=*/true, &contents);
2364 if (*error != GRPC_ERROR_NONE) return "";
2365 std::string contents_str(StringViewFromSlice(contents));
2366 grpc_slice_unref_internal(contents);
2367 return contents_str;
2368 }
2369 // Next, try GRPC_XDS_BOOTSTRAP_CONFIG env var.
2370 grpc_core::UniquePtr<char> env_config(
2371 gpr_getenv("GRPC_XDS_BOOTSTRAP_CONFIG"));
2372 if (env_config != nullptr) {
2373 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2374 gpr_log(GPR_INFO,
2375 "Got bootstrap contents from GRPC_XDS_BOOTSTRAP_CONFIG "
2376 "environment variable");
2377 }
2378 return env_config.get();
2379 }
2380 // Finally, try fallback config.
2381 if (fallback_config != nullptr) {
2382 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2383 gpr_log(GPR_INFO, "Got bootstrap contents from fallback config");
2384 }
2385 return fallback_config;
2386 }
2387 // No bootstrap config found.
2388 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2389 "Environment variables GRPC_XDS_BOOTSTRAP or GRPC_XDS_BOOTSTRAP_CONFIG "
2390 "not defined");
2391 return "";
2392 }
2393
2394 } // namespace
2395
GetOrCreate(const grpc_channel_args * args,grpc_error_handle * error)2396 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(const grpc_channel_args* args,
2397 grpc_error_handle* error) {
2398 RefCountedPtr<XdsClient> xds_client;
2399 // If getting bootstrap from channel args, create a local XdsClient
2400 // instance for the channel or server instead of using the global instance.
2401 const char* bootstrap_config = grpc_channel_args_find_string(
2402 args, GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_BOOTSTRAP_CONFIG);
2403 if (bootstrap_config != nullptr) {
2404 std::unique_ptr<XdsBootstrap> bootstrap =
2405 XdsBootstrap::Create(bootstrap_config, error);
2406 if (*error == GRPC_ERROR_NONE) {
2407 grpc_channel_args* xds_channel_args =
2408 grpc_channel_args_find_pointer<grpc_channel_args>(
2409 args,
2410 GRPC_ARG_TEST_ONLY_DO_NOT_USE_IN_PROD_XDS_CLIENT_CHANNEL_ARGS);
2411 return MakeRefCounted<XdsClient>(std::move(bootstrap), xds_channel_args);
2412 }
2413 return nullptr;
2414 }
2415 // Otherwise, use the global instance.
2416 {
2417 MutexLock lock(g_mu);
2418 if (g_xds_client != nullptr) {
2419 auto xds_client = g_xds_client->RefIfNonZero();
2420 if (xds_client != nullptr) return xds_client;
2421 }
2422 // Find bootstrap contents.
2423 std::string bootstrap_contents =
2424 GetBootstrapContents(g_fallback_bootstrap_config, error);
2425 if (*error != GRPC_ERROR_NONE) return nullptr;
2426 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2427 gpr_log(GPR_INFO, "xDS bootstrap contents: %s",
2428 bootstrap_contents.c_str());
2429 }
2430 // Parse bootstrap.
2431 std::unique_ptr<XdsBootstrap> bootstrap =
2432 XdsBootstrap::Create(bootstrap_contents, error);
2433 if (*error != GRPC_ERROR_NONE) return nullptr;
2434 // Instantiate XdsClient.
2435 xds_client =
2436 MakeRefCounted<XdsClient>(std::move(bootstrap), g_channel_args);
2437 g_xds_client = xds_client.get();
2438 }
2439 return xds_client;
2440 }
2441
2442 namespace internal {
2443
SetXdsChannelArgsForTest(grpc_channel_args * args)2444 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2445 MutexLock lock(g_mu);
2446 g_channel_args = args;
2447 }
2448
UnsetGlobalXdsClientForTest()2449 void UnsetGlobalXdsClientForTest() {
2450 MutexLock lock(g_mu);
2451 g_xds_client = nullptr;
2452 }
2453
SetXdsFallbackBootstrapConfig(const char * config)2454 void SetXdsFallbackBootstrapConfig(const char* config) {
2455 MutexLock lock(g_mu);
2456 gpr_free(g_fallback_bootstrap_config);
2457 g_fallback_bootstrap_config = gpr_strdup(config);
2458 }
2459
2460 } // namespace internal
2461
2462 //
2463 // embedding XdsClient in channel args
2464 //
2465
2466 #define GRPC_ARG_XDS_CLIENT "grpc.internal.xds_client"
2467
2468 namespace {
2469
XdsClientArgCopy(void * p)2470 void* XdsClientArgCopy(void* p) {
2471 XdsClient* xds_client = static_cast<XdsClient*>(p);
2472 xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
2473 return p;
2474 }
2475
XdsClientArgDestroy(void * p)2476 void XdsClientArgDestroy(void* p) {
2477 XdsClient* xds_client = static_cast<XdsClient*>(p);
2478 xds_client->Unref(DEBUG_LOCATION, "channel arg");
2479 }
2480
XdsClientArgCmp(void * p,void * q)2481 int XdsClientArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
2482
2483 const grpc_arg_pointer_vtable kXdsClientArgVtable = {
2484 XdsClientArgCopy, XdsClientArgDestroy, XdsClientArgCmp};
2485
2486 } // namespace
2487
MakeChannelArg() const2488 grpc_arg XdsClient::MakeChannelArg() const {
2489 return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
2490 const_cast<XdsClient*>(this),
2491 &kXdsClientArgVtable);
2492 }
2493
GetFromChannelArgs(const grpc_channel_args & args)2494 RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
2495 const grpc_channel_args& args) {
2496 XdsClient* xds_client =
2497 grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
2498 if (xds_client == nullptr) return nullptr;
2499 return xds_client->Ref(DEBUG_LOCATION, "GetFromChannelArgs");
2500 }
2501
2502 } // namespace grpc_core
2503
2504 // The returned bytes may contain NULL(0), so we can't use c-string.
grpc_dump_xds_configs()2505 grpc_slice grpc_dump_xds_configs() {
2506 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2507 grpc_core::ExecCtx exec_ctx;
2508 grpc_error_handle error = GRPC_ERROR_NONE;
2509 auto xds_client = grpc_core::XdsClient::GetOrCreate(nullptr, &error);
2510 if (error != GRPC_ERROR_NONE) {
2511 // If we isn't using xDS, just return an empty string.
2512 GRPC_ERROR_UNREF(error);
2513 return grpc_empty_slice();
2514 }
2515 return grpc_slice_from_cpp_string(xds_client->DumpClientConfigBinary());
2516 }
2517