1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/channel_stack.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/memory.h"
46 #include "src/core/lib/gprpp/orphanable.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/gprpp/sync.h"
49 #include "src/core/lib/iomgr/sockaddr.h"
50 #include "src/core/lib/iomgr/sockaddr_utils.h"
51 #include "src/core/lib/iomgr/timer.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/slice/slice_string_helpers.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/channel.h"
56 #include "src/core/lib/surface/channel_init.h"
57 #include "src/core/lib/transport/static_metadata.h"
58
// Backoff parameters used when restarting a failed call to the xds server.
// They are fed to BackOff::Options in the RetryableCall<> constructor, which
// converts the two *_SECONDS values to milliseconds.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): presumably a lower bound applied to the load reporting
// interval; its use is not in this part of the file -- confirm.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
64
65 namespace grpc_core {
66
// Trace flag "xds_client": gates the informational gpr_log() output emitted
// throughout this file.
TraceFlag grpc_xds_client_trace(false, "xds_client");
// Trace flag "xds_client_refcount": when enabled, the ChannelState and
// AdsCallState constructors pass a trace name to their ref-counted base.
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");

namespace {

// File-local global state.
// NOTE(review): presumably g_mu guards the globals below; the code that
// reads and writes them is not in this part of the file -- confirm.
Mutex* g_mu = nullptr;
// Base channel args for the xds channel; CreateXdsChannel() copies them and
// appends its own args.
const grpc_channel_args* g_channel_args = nullptr;
XdsClient* g_xds_client = nullptr;
char* g_fallback_bootstrap_config = nullptr;

}  // namespace
78
79 //
80 // Internal class declarations
81 //
82
83 // An xds call wrapper that can restart a call upon failure. Holds a ref to
84 // the xds channel. The template parameter is the kind of wrapped xds call.
85 template <typename T>
86 class XdsClient::ChannelState::RetryableCall
87 : public InternallyRefCounted<RetryableCall<T>> {
88 public:
89 explicit RetryableCall(RefCountedPtr<ChannelState> chand);
90
91 void Orphan() override;
92
93 void OnCallFinishedLocked();
94
calld() const95 T* calld() const { return calld_.get(); }
chand() const96 ChannelState* chand() const { return chand_.get(); }
97
98 bool IsCurrentCallOnChannel() const;
99
100 private:
101 void StartNewCallLocked();
102 void StartRetryTimerLocked();
103 static void OnRetryTimer(void* arg, grpc_error* error);
104 void OnRetryTimerLocked(grpc_error* error);
105
106 // The wrapped xds call that talks to the xds server. It's instantiated
107 // every time we start a new call. It's null during call retry backoff.
108 OrphanablePtr<T> calld_;
109 // The owning xds channel.
110 RefCountedPtr<ChannelState> chand_;
111
112 // Retry state.
113 BackOff backoff_;
114 grpc_timer retry_timer_;
115 grpc_closure on_retry_timer_;
116 bool retry_timer_callback_pending_ = false;
117
118 bool shutting_down_ = false;
119 };
120
121 // Contains an ADS call to the xds server.
122 class XdsClient::ChannelState::AdsCallState
123 : public InternallyRefCounted<AdsCallState> {
124 public:
125 // The ctor and dtor should not be used directly.
126 explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
127 ~AdsCallState() override;
128
129 void Orphan() override;
130
parent() const131 RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
chand() const132 ChannelState* chand() const { return parent_->chand(); }
xds_client() const133 XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const134 bool seen_response() const { return seen_response_; }
135
136 void Subscribe(const std::string& type_url, const std::string& name);
137 void Unsubscribe(const std::string& type_url, const std::string& name,
138 bool delay_unsubscription);
139
140 bool HasSubscribedResources() const;
141
142 private:
143 class ResourceState : public InternallyRefCounted<ResourceState> {
144 public:
ResourceState(const std::string & type_url,const std::string & name,bool sent_initial_request)145 ResourceState(const std::string& type_url, const std::string& name,
146 bool sent_initial_request)
147 : type_url_(type_url),
148 name_(name),
149 sent_initial_request_(sent_initial_request) {
150 GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
151 grpc_schedule_on_exec_ctx);
152 }
153
Orphan()154 void Orphan() override {
155 Finish();
156 Unref(DEBUG_LOCATION, "Orphan");
157 }
158
Start(RefCountedPtr<AdsCallState> ads_calld)159 void Start(RefCountedPtr<AdsCallState> ads_calld) {
160 if (sent_initial_request_) return;
161 sent_initial_request_ = true;
162 ads_calld_ = std::move(ads_calld);
163 Ref(DEBUG_LOCATION, "timer").release();
164 timer_pending_ = true;
165 grpc_timer_init(
166 &timer_,
167 ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
168 &timer_callback_);
169 }
170
Finish()171 void Finish() {
172 if (timer_pending_) {
173 grpc_timer_cancel(&timer_);
174 timer_pending_ = false;
175 }
176 }
177
178 private:
OnTimer(void * arg,grpc_error * error)179 static void OnTimer(void* arg, grpc_error* error) {
180 ResourceState* self = static_cast<ResourceState*>(arg);
181 {
182 MutexLock lock(&self->ads_calld_->xds_client()->mu_);
183 self->OnTimerLocked(GRPC_ERROR_REF(error));
184 }
185 self->ads_calld_.reset();
186 self->Unref(DEBUG_LOCATION, "timer");
187 }
188
OnTimerLocked(grpc_error * error)189 void OnTimerLocked(grpc_error* error) {
190 if (error == GRPC_ERROR_NONE && timer_pending_) {
191 timer_pending_ = false;
192 grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
193 absl::StrFormat(
194 "timeout obtaining resource {type=%s name=%s} from xds server",
195 type_url_, name_)
196 .c_str());
197 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
198 gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
199 grpc_error_string(watcher_error));
200 }
201 if (type_url_ == XdsApi::kLdsTypeUrl) {
202 ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
203 for (const auto& p : state.watchers) {
204 p.first->OnError(GRPC_ERROR_REF(watcher_error));
205 }
206 } else if (type_url_ == XdsApi::kRdsTypeUrl) {
207 RouteConfigState& state =
208 ads_calld_->xds_client()->route_config_map_[name_];
209 for (const auto& p : state.watchers) {
210 p.first->OnError(GRPC_ERROR_REF(watcher_error));
211 }
212 } else if (type_url_ == XdsApi::kCdsTypeUrl) {
213 ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
214 for (const auto& p : state.watchers) {
215 p.first->OnError(GRPC_ERROR_REF(watcher_error));
216 }
217 } else if (type_url_ == XdsApi::kEdsTypeUrl) {
218 EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
219 for (const auto& p : state.watchers) {
220 p.first->OnError(GRPC_ERROR_REF(watcher_error));
221 }
222 } else {
223 GPR_UNREACHABLE_CODE(return );
224 }
225 GRPC_ERROR_UNREF(watcher_error);
226 }
227 GRPC_ERROR_UNREF(error);
228 }
229
230 const std::string type_url_;
231 const std::string name_;
232
233 RefCountedPtr<AdsCallState> ads_calld_;
234 bool sent_initial_request_;
235 bool timer_pending_ = false;
236 grpc_timer timer_;
237 grpc_closure timer_callback_;
238 };
239
240 struct ResourceTypeState {
~ResourceTypeStategrpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState241 ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
242
243 // Nonce and error for this resource type.
244 std::string nonce;
245 grpc_error* error = GRPC_ERROR_NONE;
246
247 // Subscribed resources of this type.
248 std::map<std::string /* name */, OrphanablePtr<ResourceState>>
249 subscribed_resources;
250 };
251
252 void SendMessageLocked(const std::string& type_url);
253
254 void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
255 void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
256 void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
257 void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
258
259 static void OnRequestSent(void* arg, grpc_error* error);
260 void OnRequestSentLocked(grpc_error* error);
261 static void OnResponseReceived(void* arg, grpc_error* error);
262 bool OnResponseReceivedLocked();
263 static void OnStatusReceived(void* arg, grpc_error* error);
264 void OnStatusReceivedLocked(grpc_error* error);
265
266 bool IsCurrentCallOnChannel() const;
267
268 std::set<absl::string_view> ResourceNamesForRequest(
269 const std::string& type_url);
270
271 // The owning RetryableCall<>.
272 RefCountedPtr<RetryableCall<AdsCallState>> parent_;
273
274 bool sent_initial_message_ = false;
275 bool seen_response_ = false;
276
277 // Always non-NULL.
278 grpc_call* call_;
279
280 // recv_initial_metadata
281 grpc_metadata_array initial_metadata_recv_;
282
283 // send_message
284 grpc_byte_buffer* send_message_payload_ = nullptr;
285 grpc_closure on_request_sent_;
286
287 // recv_message
288 grpc_byte_buffer* recv_message_payload_ = nullptr;
289 grpc_closure on_response_received_;
290
291 // recv_trailing_metadata
292 grpc_metadata_array trailing_metadata_recv_;
293 grpc_status_code status_code_;
294 grpc_slice status_details_;
295 grpc_closure on_status_received_;
296
297 // Resource types for which requests need to be sent.
298 std::set<std::string /*type_url*/> buffered_requests_;
299
300 // State for each resource type.
301 std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
302 };
303
304 // Contains an LRS call to the xds server.
305 class XdsClient::ChannelState::LrsCallState
306 : public InternallyRefCounted<LrsCallState> {
307 public:
308 // The ctor and dtor should not be used directly.
309 explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
310 ~LrsCallState() override;
311
312 void Orphan() override;
313
314 void MaybeStartReportingLocked();
315
parent()316 RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const317 ChannelState* chand() const { return parent_->chand(); }
xds_client() const318 XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const319 bool seen_response() const { return seen_response_; }
320
321 private:
322 // Reports client-side load stats according to a fixed interval.
323 class Reporter : public InternallyRefCounted<Reporter> {
324 public:
Reporter(RefCountedPtr<LrsCallState> parent,grpc_millis report_interval)325 Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
326 : parent_(std::move(parent)), report_interval_(report_interval) {
327 GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
328 grpc_schedule_on_exec_ctx);
329 GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
330 grpc_schedule_on_exec_ctx);
331 ScheduleNextReportLocked();
332 }
333
334 void Orphan() override;
335
336 private:
337 void ScheduleNextReportLocked();
338 static void OnNextReportTimer(void* arg, grpc_error* error);
339 bool OnNextReportTimerLocked(grpc_error* error);
340 bool SendReportLocked();
341 static void OnReportDone(void* arg, grpc_error* error);
342 bool OnReportDoneLocked(grpc_error* error);
343
IsCurrentReporterOnCall() const344 bool IsCurrentReporterOnCall() const {
345 return this == parent_->reporter_.get();
346 }
xds_client() const347 XdsClient* xds_client() const { return parent_->xds_client(); }
348
349 // The owning LRS call.
350 RefCountedPtr<LrsCallState> parent_;
351
352 // The load reporting state.
353 const grpc_millis report_interval_;
354 bool last_report_counters_were_zero_ = false;
355 bool next_report_timer_callback_pending_ = false;
356 grpc_timer next_report_timer_;
357 grpc_closure on_next_report_timer_;
358 grpc_closure on_report_done_;
359 };
360
361 static void OnInitialRequestSent(void* arg, grpc_error* error);
362 void OnInitialRequestSentLocked();
363 static void OnResponseReceived(void* arg, grpc_error* error);
364 bool OnResponseReceivedLocked();
365 static void OnStatusReceived(void* arg, grpc_error* error);
366 void OnStatusReceivedLocked(grpc_error* error);
367
368 bool IsCurrentCallOnChannel() const;
369
370 // The owning RetryableCall<>.
371 RefCountedPtr<RetryableCall<LrsCallState>> parent_;
372 bool seen_response_ = false;
373
374 // Always non-NULL.
375 grpc_call* call_;
376
377 // recv_initial_metadata
378 grpc_metadata_array initial_metadata_recv_;
379
380 // send_message
381 grpc_byte_buffer* send_message_payload_ = nullptr;
382 grpc_closure on_initial_request_sent_;
383
384 // recv_message
385 grpc_byte_buffer* recv_message_payload_ = nullptr;
386 grpc_closure on_response_received_;
387
388 // recv_trailing_metadata
389 grpc_metadata_array trailing_metadata_recv_;
390 grpc_status_code status_code_;
391 grpc_slice status_details_;
392 grpc_closure on_status_received_;
393
394 // Load reporting state.
395 bool send_all_clusters_ = false;
396 std::set<std::string> cluster_names_; // Asked for by the LRS server.
397 grpc_millis load_reporting_interval_ = 0;
398 OrphanablePtr<Reporter> reporter_;
399 };
400
401 //
402 // XdsClient::ChannelState::StateWatcher
403 //
404
405 class XdsClient::ChannelState::StateWatcher
406 : public AsyncConnectivityStateWatcherInterface {
407 public:
StateWatcher(RefCountedPtr<ChannelState> parent)408 explicit StateWatcher(RefCountedPtr<ChannelState> parent)
409 : parent_(std::move(parent)) {}
410
411 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)412 void OnConnectivityStateChange(grpc_connectivity_state new_state,
413 const absl::Status& status) override {
414 MutexLock lock(&parent_->xds_client_->mu_);
415 if (!parent_->shutting_down_ &&
416 new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
417 // In TRANSIENT_FAILURE. Notify all watchers of error.
418 gpr_log(GPR_INFO,
419 "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
420 "status_message:(%s)",
421 parent_->xds_client(), status.ToString().c_str());
422 parent_->xds_client()->NotifyOnErrorLocked(
423 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
424 "xds channel in TRANSIENT_FAILURE"));
425 }
426 }
427
428 RefCountedPtr<ChannelState> parent_;
429 };
430
431 //
432 // XdsClient::ChannelState
433 //
434
435 namespace {
436
CreateXdsChannel(const XdsBootstrap::XdsServer & server)437 grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
438 // Build channel args.
439 absl::InlinedVector<grpc_arg, 2> args_to_add = {
440 grpc_channel_arg_integer_create(
441 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
442 5 * 60 * GPR_MS_PER_SEC),
443 grpc_channel_arg_integer_create(
444 const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
445 };
446 grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
447 g_channel_args, args_to_add.data(), args_to_add.size());
448 // Create channel creds.
449 RefCountedPtr<grpc_channel_credentials> channel_creds =
450 XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
451 server.channel_creds_config);
452 // Create channel.
453 grpc_channel* channel = grpc_secure_channel_create(
454 channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
455 grpc_channel_args_destroy(new_args);
456 return channel;
457 }
458
459 } // namespace
460
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)461 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
462 const XdsBootstrap::XdsServer& server)
463 : InternallyRefCounted<ChannelState>(
464 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
465 ? "ChannelState"
466 : nullptr),
467 xds_client_(std::move(xds_client)),
468 server_(server) {
469 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
470 gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
471 xds_client_.get(), server.server_uri.c_str());
472 }
473 channel_ = CreateXdsChannel(server);
474 GPR_ASSERT(channel_ != nullptr);
475 StartConnectivityWatchLocked();
476 }
477
~ChannelState()478 XdsClient::ChannelState::~ChannelState() {
479 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
480 gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
481 this);
482 }
483 grpc_channel_destroy(channel_);
484 xds_client_.reset(DEBUG_LOCATION, "ChannelState");
485 }
486
// Shuts the channel state down: marks it as shutting down, stops the
// connectivity watch, drops the ADS and LRS calls, and finally releases the
// "ChannelState+orphaned" ref.
void XdsClient::ChannelState::Orphan() {
  // Set first so that StateWatcher ignores any late connectivity updates.
  shutting_down_ = true;
  CancelConnectivityWatchLocked();
  ads_calld_.reset();
  lrs_calld_.reset();
  Unref(DEBUG_LOCATION, "ChannelState+orphaned");
}
494
ads_calld() const495 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
496 const {
497 return ads_calld_->calld();
498 }
499
lrs_calld() const500 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
501 const {
502 return lrs_calld_->calld();
503 }
504
HasActiveAdsCall() const505 bool XdsClient::ChannelState::HasActiveAdsCall() const {
506 return ads_calld_->calld() != nullptr;
507 }
508
MaybeStartLrsCall()509 void XdsClient::ChannelState::MaybeStartLrsCall() {
510 if (lrs_calld_ != nullptr) return;
511 lrs_calld_.reset(
512 new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
513 }
514
StopLrsCall()515 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
516
StartConnectivityWatchLocked()517 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
518 grpc_channel_element* client_channel_elem =
519 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
520 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
521 watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
522 grpc_client_channel_start_connectivity_watch(
523 client_channel_elem, GRPC_CHANNEL_IDLE,
524 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
525 }
526
CancelConnectivityWatchLocked()527 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
528 grpc_channel_element* client_channel_elem =
529 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
530 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
531 grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
532 }
533
Subscribe(const std::string & type_url,const std::string & name)534 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
535 const std::string& name) {
536 if (ads_calld_ == nullptr) {
537 // Start the ADS call if this is the first request.
538 ads_calld_.reset(new RetryableCall<AdsCallState>(
539 Ref(DEBUG_LOCATION, "ChannelState+ads")));
540 // Note: AdsCallState's ctor will automatically subscribe to all
541 // resources that the XdsClient already has watchers for, so we can
542 // return here.
543 return;
544 }
545 // If the ADS call is in backoff state, we don't need to do anything now
546 // because when the call is restarted it will resend all necessary requests.
547 if (ads_calld() == nullptr) return;
548 // Subscribe to this resource if the ADS call is active.
549 ads_calld()->Subscribe(type_url, name);
550 }
551
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)552 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
553 const std::string& name,
554 bool delay_unsubscription) {
555 if (ads_calld_ != nullptr) {
556 auto* calld = ads_calld_->calld();
557 if (calld != nullptr) {
558 calld->Unsubscribe(type_url, name, delay_unsubscription);
559 if (!calld->HasSubscribedResources()) ads_calld_.reset();
560 }
561 }
562 }
563
564 //
565 // XdsClient::ChannelState::RetryableCall<>
566 //
567
568 template <typename T>
RetryableCall(RefCountedPtr<ChannelState> chand)569 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
570 RefCountedPtr<ChannelState> chand)
571 : chand_(std::move(chand)),
572 backoff_(
573 BackOff::Options()
574 .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
575 1000)
576 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
577 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
578 .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
579 // Closure Initialization
580 GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
581 grpc_schedule_on_exec_ctx);
582 StartNewCallLocked();
583 }
584
// Shuts down the retryable call: drops the wrapped call, cancels the retry
// timer if its callback is still pending, and releases the
// "RetryableCall+orphaned" ref.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
  // Set first so that no new call or retry timer gets started from here on.
  shutting_down_ = true;
  calld_.reset();
  if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
  this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
592
593 template <typename T>
OnCallFinishedLocked()594 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
595 const bool seen_response = calld_->seen_response();
596 calld_.reset();
597 if (seen_response) {
598 // If we lost connection to the xds server, reset backoff and restart the
599 // call immediately.
600 backoff_.Reset();
601 StartNewCallLocked();
602 } else {
603 // If we failed to connect to the xds server, retry later.
604 StartRetryTimerLocked();
605 }
606 }
607
608 template <typename T>
StartNewCallLocked()609 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
610 if (shutting_down_) return;
611 GPR_ASSERT(chand_->channel_ != nullptr);
612 GPR_ASSERT(calld_ == nullptr);
613 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
614 gpr_log(GPR_INFO,
615 "[xds_client %p] Start new call from retryable call (chand: %p, "
616 "retryable call: %p)",
617 chand()->xds_client(), chand(), this);
618 }
619 calld_ = MakeOrphanable<T>(
620 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
621 }
622
623 template <typename T>
StartRetryTimerLocked()624 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
625 if (shutting_down_) return;
626 const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
627 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
628 grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
629 gpr_log(GPR_INFO,
630 "[xds_client %p] Failed to connect to xds server (chand: %p) "
631 "retry timer will fire in %" PRId64 "ms.",
632 chand()->xds_client(), chand(), timeout);
633 }
634 this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
635 grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
636 retry_timer_callback_pending_ = true;
637 }
638
639 template <typename T>
OnRetryTimer(void * arg,grpc_error * error)640 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
641 void* arg, grpc_error* error) {
642 RetryableCall* calld = static_cast<RetryableCall*>(arg);
643 {
644 MutexLock lock(&calld->chand_->xds_client()->mu_);
645 calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
646 }
647 calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
648 }
649
650 template <typename T>
OnRetryTimerLocked(grpc_error * error)651 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
652 grpc_error* error) {
653 retry_timer_callback_pending_ = false;
654 if (!shutting_down_ && error == GRPC_ERROR_NONE) {
655 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
656 gpr_log(
657 GPR_INFO,
658 "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
659 chand()->xds_client(), chand(), this);
660 }
661 StartNewCallLocked();
662 }
663 GRPC_ERROR_UNREF(error);
664 }
665
666 //
667 // XdsClient::ChannelState::AdsCallState
668 //
669
// Creates the ADS call on the xds channel and starts it. In order, this
// constructor:
//   1. creates the call for the v3 or v2 ADS method, depending on the server;
//   2. sends initial metadata (batch with no completion closure);
//   3. subscribes to every resource named in the XdsClient's listener,
//      route config, cluster and endpoint maps;
//   4. starts a batch receiving initial metadata and the first response,
//      completed via on_response_received_;
//   5. starts a batch receiving the call status, completed via
//      on_status_received_.
XdsClient::ChannelState::AdsCallState::AdsCallState(
    RefCountedPtr<RetryableCall<AdsCallState>> parent)
    : InternallyRefCounted<AdsCallState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
              ? "AdsCallState"
              : nullptr),
      parent_(std::move(parent)) {
  // Init the ADS call. Note that the call will progress every time there's
  // activity in xds_client()->interested_parties_, which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(xds_client() != nullptr);
  // Create a call with the specified method name.
  const auto& method =
      chand()->server_.ShouldUseV3()
          ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
          : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
  call_ = grpc_channel_create_pollset_set_call(
      chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      xds_client()->interested_parties_, method, nullptr,
      GRPC_MILLIS_INF_FUTURE, nullptr);
  GPR_ASSERT(call_ != nullptr);
  // Init data associated with the call.
  grpc_metadata_array_init(&initial_metadata_recv_);
  grpc_metadata_array_init(&trailing_metadata_recv_);
  // Start the call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
            "call: %p)",
            xds_client(), chand(), this, call_);
  }
  // Create the ops. The ops array is reused for each of the batches below.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  // No completion closure is passed for this batch.
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), nullptr);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: send request message.
  // Each Subscribe() call below that adds a new resource goes through
  // SendMessageLocked() for the resource's type.
  GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  for (const auto& p : xds_client()->listener_map_) {
    Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->route_config_map_) {
    Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->cluster_map_) {
    Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->endpoint_map_) {
    Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
  }
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This ref is handed to the response callback.
  Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &status_code_;
  op->data.recv_status_on_client.status_details = &status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
769
// Releases everything the call owns: both metadata arrays, the send and
// receive payload buffers, the status details slice, and finally the call
// itself.
XdsClient::ChannelState::AdsCallState::~AdsCallState() {
  grpc_metadata_array_destroy(&initial_metadata_recv_);
  grpc_metadata_array_destroy(&trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  grpc_slice_unref_internal(status_details_);
  GPR_ASSERT(call_ != nullptr);
  grpc_call_unref(call_);
}
779
// Cancels the underlying call and drops all per-resource-type state. Does not
// release a ref itself; see the note at the end of the body.
void XdsClient::ChannelState::AdsCallState::Orphan() {
  GPR_ASSERT(call_ != nullptr);
  // If we are here because xds_client wants to cancel the call,
  // on_status_received_ will complete the cancellation and clean up. Otherwise,
  // we are here because xds_client has to orphan a failed call, then the
  // following cancellation will be a no-op.
  grpc_call_cancel_internal(call_);
  state_map_.clear();
  // Note that the initial ref is held by on_status_received_. So the
  // corresponding unref happens in on_status_received_ instead of here.
}
791
SendMessageLocked(const std::string & type_url)792 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
793 const std::string& type_url) {
794 // Buffer message sending if an existing message is in flight.
795 if (send_message_payload_ != nullptr) {
796 buffered_requests_.insert(type_url);
797 return;
798 }
799 auto& state = state_map_[type_url];
800 grpc_slice request_payload_slice;
801 std::set<absl::string_view> resource_names =
802 ResourceNamesForRequest(type_url);
803 request_payload_slice = xds_client()->api_.CreateAdsRequest(
804 chand()->server_, type_url, resource_names,
805 xds_client()->resource_version_map_[type_url], state.nonce,
806 GRPC_ERROR_REF(state.error), !sent_initial_message_);
807 if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
808 type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
809 state_map_.erase(type_url);
810 }
811 sent_initial_message_ = true;
812 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
813 gpr_log(GPR_INFO,
814 "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
815 "error=%s resources=%s",
816 xds_client(), type_url.c_str(),
817 xds_client()->resource_version_map_[type_url].c_str(),
818 state.nonce.c_str(), grpc_error_string(state.error),
819 absl::StrJoin(resource_names, " ").c_str());
820 }
821 GRPC_ERROR_UNREF(state.error);
822 state.error = GRPC_ERROR_NONE;
823 // Create message payload.
824 send_message_payload_ =
825 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
826 grpc_slice_unref_internal(request_payload_slice);
827 // Send the message.
828 grpc_op op;
829 memset(&op, 0, sizeof(op));
830 op.op = GRPC_OP_SEND_MESSAGE;
831 op.data.send_message.send_message = send_message_payload_;
832 Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
833 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
834 grpc_schedule_on_exec_ctx);
835 grpc_call_error call_error =
836 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
837 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
838 gpr_log(GPR_ERROR,
839 "[xds_client %p] calld=%p call_error=%d sending ADS message",
840 xds_client(), this, call_error);
841 GPR_ASSERT(GRPC_CALL_OK == call_error);
842 }
843 }
844
Subscribe(const std::string & type_url,const std::string & name)845 void XdsClient::ChannelState::AdsCallState::Subscribe(
846 const std::string& type_url, const std::string& name) {
847 auto& state = state_map_[type_url].subscribed_resources[name];
848 if (state == nullptr) {
849 state = MakeOrphanable<ResourceState>(
850 type_url, name, !xds_client()->resource_version_map_[type_url].empty());
851 SendMessageLocked(type_url);
852 }
853 }
854
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)855 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
856 const std::string& type_url, const std::string& name,
857 bool delay_unsubscription) {
858 state_map_[type_url].subscribed_resources.erase(name);
859 if (!delay_unsubscription) SendMessageLocked(type_url);
860 }
861
HasSubscribedResources() const862 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
863 for (const auto& p : state_map_) {
864 if (!p.second.subscribed_resources.empty()) return true;
865 }
866 return false;
867 }
868
AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map)869 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
870 XdsApi::LdsUpdateMap lds_update_map) {
871 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
872 gpr_log(GPR_INFO,
873 "[xds_client %p] LDS update received containing %" PRIuPTR
874 " resources",
875 xds_client(), lds_update_map.size());
876 }
877 auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
878 std::set<std::string> rds_resource_names_seen;
879 for (auto& p : lds_update_map) {
880 const std::string& listener_name = p.first;
881 XdsApi::LdsUpdate& lds_update = p.second;
882 auto& state = lds_state.subscribed_resources[listener_name];
883 if (state != nullptr) state->Finish();
884 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
885 gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: %s", xds_client(),
886 listener_name.c_str(), lds_update.ToString().c_str());
887 }
888 // Record the RDS resource names seen.
889 if (!lds_update.route_config_name.empty()) {
890 rds_resource_names_seen.insert(lds_update.route_config_name);
891 }
892 // Ignore identical update.
893 ListenerState& listener_state = xds_client()->listener_map_[listener_name];
894 if (listener_state.update.has_value() &&
895 *listener_state.update == lds_update) {
896 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
897 gpr_log(GPR_INFO,
898 "[xds_client %p] LDS update for %s identical to current, "
899 "ignoring.",
900 xds_client(), listener_name.c_str());
901 }
902 continue;
903 }
904 // Update the listener state.
905 listener_state.update = std::move(lds_update);
906 // Notify watchers.
907 for (const auto& p : listener_state.watchers) {
908 p.first->OnListenerChanged(*listener_state.update);
909 }
910 }
911 // For any subscribed resource that is not present in the update,
912 // remove it from the cache and notify watchers that it does not exist.
913 for (const auto& p : lds_state.subscribed_resources) {
914 const std::string& listener_name = p.first;
915 if (lds_update_map.find(listener_name) == lds_update_map.end()) {
916 ListenerState& listener_state =
917 xds_client()->listener_map_[listener_name];
918 // If the resource was newly requested but has not yet been received,
919 // we don't want to generate an error for the watchers, because this LDS
920 // response may be in reaction to an earlier request that did not yet
921 // request the new resource, so its absence from the response does not
922 // necessarily indicate that the resource does not exist.
923 // For that case, we rely on the request timeout instead.
924 if (!listener_state.update.has_value()) continue;
925 listener_state.update.reset();
926 for (const auto& p : listener_state.watchers) {
927 p.first->OnResourceDoesNotExist();
928 }
929 }
930 }
931 // For any RDS resource that is no longer referred to by any LDS
932 // resources, remove it from the cache and notify watchers that it
933 // does not exist.
934 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
935 for (const auto& p : rds_state.subscribed_resources) {
936 const std::string& rds_resource_name = p.first;
937 if (rds_resource_names_seen.find(rds_resource_name) ==
938 rds_resource_names_seen.end()) {
939 RouteConfigState& route_config_state =
940 xds_client()->route_config_map_[rds_resource_name];
941 route_config_state.update.reset();
942 for (const auto& p : route_config_state.watchers) {
943 p.first->OnResourceDoesNotExist();
944 }
945 }
946 }
947 }
948
AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map)949 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
950 XdsApi::RdsUpdateMap rds_update_map) {
951 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
952 gpr_log(GPR_INFO,
953 "[xds_client %p] RDS update received containing %" PRIuPTR
954 " resources",
955 xds_client(), rds_update_map.size());
956 }
957 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
958 for (auto& p : rds_update_map) {
959 const std::string& route_config_name = p.first;
960 XdsApi::RdsUpdate& rds_update = p.second;
961 auto& state = rds_state.subscribed_resources[route_config_name];
962 if (state != nullptr) state->Finish();
963 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
964 gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
965 rds_update.ToString().c_str());
966 }
967 RouteConfigState& route_config_state =
968 xds_client()->route_config_map_[route_config_name];
969 // Ignore identical update.
970 if (route_config_state.update.has_value() &&
971 *route_config_state.update == rds_update) {
972 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
973 gpr_log(GPR_INFO,
974 "[xds_client %p] RDS resource identical to current, ignoring",
975 xds_client());
976 }
977 continue;
978 }
979 // Update the cache.
980 route_config_state.update = std::move(rds_update);
981 // Notify all watchers.
982 for (const auto& p : route_config_state.watchers) {
983 p.first->OnRouteConfigChanged(*route_config_state.update);
984 }
985 }
986 }
987
AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map)988 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
989 XdsApi::CdsUpdateMap cds_update_map) {
990 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
991 gpr_log(GPR_INFO,
992 "[xds_client %p] CDS update received containing %" PRIuPTR
993 " resources",
994 xds_client(), cds_update_map.size());
995 }
996 auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
997 std::set<std::string> eds_resource_names_seen;
998 for (auto& p : cds_update_map) {
999 const char* cluster_name = p.first.c_str();
1000 XdsApi::CdsUpdate& cds_update = p.second;
1001 auto& state = cds_state.subscribed_resources[cluster_name];
1002 if (state != nullptr) state->Finish();
1003 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1004 gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
1005 cluster_name, cds_update.ToString().c_str());
1006 }
1007 // Record the EDS resource names seen.
1008 eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1009 ? cluster_name
1010 : cds_update.eds_service_name);
1011 // Ignore identical update.
1012 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1013 if (cluster_state.update.has_value() &&
1014 *cluster_state.update == cds_update) {
1015 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1016 gpr_log(GPR_INFO,
1017 "[xds_client %p] CDS update identical to current, ignoring.",
1018 xds_client());
1019 }
1020 continue;
1021 }
1022 // Update the cluster state.
1023 cluster_state.update = std::move(cds_update);
1024 // Notify all watchers.
1025 for (const auto& p : cluster_state.watchers) {
1026 p.first->OnClusterChanged(cluster_state.update.value());
1027 }
1028 }
1029 // For any subscribed resource that is not present in the update,
1030 // remove it from the cache and notify watchers that it does not exist.
1031 for (const auto& p : cds_state.subscribed_resources) {
1032 const std::string& cluster_name = p.first;
1033 if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1034 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1035 // If the resource was newly requested but has not yet been received,
1036 // we don't want to generate an error for the watchers, because this CDS
1037 // response may be in reaction to an earlier request that did not yet
1038 // request the new resource, so its absence from the response does not
1039 // necessarily indicate that the resource does not exist.
1040 // For that case, we rely on the request timeout instead.
1041 if (!cluster_state.update.has_value()) continue;
1042 cluster_state.update.reset();
1043 for (const auto& p : cluster_state.watchers) {
1044 p.first->OnResourceDoesNotExist();
1045 }
1046 }
1047 }
1048 // For any EDS resource that is no longer referred to by any CDS
1049 // resources, remove it from the cache and notify watchers that it
1050 // does not exist.
1051 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1052 for (const auto& p : eds_state.subscribed_resources) {
1053 const std::string& eds_resource_name = p.first;
1054 if (eds_resource_names_seen.find(eds_resource_name) ==
1055 eds_resource_names_seen.end()) {
1056 EndpointState& endpoint_state =
1057 xds_client()->endpoint_map_[eds_resource_name];
1058 endpoint_state.update.reset();
1059 for (const auto& p : endpoint_state.watchers) {
1060 p.first->OnResourceDoesNotExist();
1061 }
1062 }
1063 }
1064 }
1065
AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map)1066 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1067 XdsApi::EdsUpdateMap eds_update_map) {
1068 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1069 gpr_log(GPR_INFO,
1070 "[xds_client %p] EDS update received containing %" PRIuPTR
1071 " resources",
1072 xds_client(), eds_update_map.size());
1073 }
1074 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1075 for (auto& p : eds_update_map) {
1076 const char* eds_service_name = p.first.c_str();
1077 XdsApi::EdsUpdate& eds_update = p.second;
1078 auto& state = eds_state.subscribed_resources[eds_service_name];
1079 if (state != nullptr) state->Finish();
1080 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1081 gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1082 eds_service_name, eds_update.ToString().c_str());
1083 }
1084 EndpointState& endpoint_state =
1085 xds_client()->endpoint_map_[eds_service_name];
1086 // Ignore identical update.
1087 if (endpoint_state.update.has_value() &&
1088 *endpoint_state.update == eds_update) {
1089 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1090 gpr_log(GPR_INFO,
1091 "[xds_client %p] EDS update identical to current, ignoring.",
1092 xds_client());
1093 }
1094 continue;
1095 }
1096 // Update the cluster state.
1097 endpoint_state.update = std::move(eds_update);
1098 // Notify all watchers.
1099 for (const auto& p : endpoint_state.watchers) {
1100 p.first->OnEndpointChanged(endpoint_state.update.value());
1101 }
1102 }
1103 }
1104
OnRequestSent(void * arg,grpc_error * error)1105 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1106 grpc_error* error) {
1107 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1108 {
1109 MutexLock lock(&ads_calld->xds_client()->mu_);
1110 ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1111 }
1112 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1113 }
1114
OnRequestSentLocked(grpc_error * error)1115 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1116 grpc_error* error) {
1117 if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1118 // Clean up the sent message.
1119 grpc_byte_buffer_destroy(send_message_payload_);
1120 send_message_payload_ = nullptr;
1121 // Continue to send another pending message if any.
1122 // TODO(roth): The current code to handle buffered messages has the
1123 // advantage of sending only the most recent list of resource names for
1124 // each resource type (no matter how many times that resource type has
1125 // been requested to send while the current message sending is still
1126 // pending). But its disadvantage is that we send the requests in fixed
1127 // order of resource types. We need to fix this if we are seeing some
1128 // resource type(s) starved due to frequent requests of other resource
1129 // type(s).
1130 auto it = buffered_requests_.begin();
1131 if (it != buffered_requests_.end()) {
1132 SendMessageLocked(*it);
1133 buffered_requests_.erase(it);
1134 }
1135 }
1136 GRPC_ERROR_UNREF(error);
1137 }
1138
OnResponseReceived(void * arg,grpc_error *)1139 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1140 void* arg, grpc_error* /* error */) {
1141 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1142 bool done;
1143 {
1144 MutexLock lock(&ads_calld->xds_client()->mu_);
1145 done = ads_calld->OnResponseReceivedLocked();
1146 }
1147 if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1148 }
1149
OnResponseReceivedLocked()1150 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1151 // Empty payload means the call was cancelled.
1152 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1153 return true;
1154 }
1155 // Read the response.
1156 grpc_byte_buffer_reader bbr;
1157 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1158 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1159 grpc_byte_buffer_reader_destroy(&bbr);
1160 grpc_byte_buffer_destroy(recv_message_payload_);
1161 recv_message_payload_ = nullptr;
1162 // Parse and validate the response.
1163 XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1164 response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1165 ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1166 ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1167 ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1168 grpc_slice_unref_internal(response_slice);
1169 if (result.type_url.empty()) {
1170 // Ignore unparsable response.
1171 gpr_log(GPR_ERROR,
1172 "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1173 xds_client(), grpc_error_string(result.parse_error));
1174 GRPC_ERROR_UNREF(result.parse_error);
1175 } else {
1176 // Update nonce.
1177 auto& state = state_map_[result.type_url];
1178 state.nonce = std::move(result.nonce);
1179 // NACK or ACK the response.
1180 if (result.parse_error != GRPC_ERROR_NONE) {
1181 GRPC_ERROR_UNREF(state.error);
1182 state.error = result.parse_error;
1183 // NACK unacceptable update.
1184 gpr_log(GPR_ERROR,
1185 "[xds_client %p] ADS response invalid for resource type %s "
1186 "version %s, will NACK: nonce=%s error=%s",
1187 xds_client(), result.type_url.c_str(), result.version.c_str(),
1188 state.nonce.c_str(), grpc_error_string(result.parse_error));
1189 SendMessageLocked(result.type_url);
1190 } else {
1191 seen_response_ = true;
1192 // Accept the ADS response according to the type_url.
1193 if (result.type_url == XdsApi::kLdsTypeUrl) {
1194 AcceptLdsUpdate(std::move(result.lds_update_map));
1195 } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1196 AcceptRdsUpdate(std::move(result.rds_update_map));
1197 } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1198 AcceptCdsUpdate(std::move(result.cds_update_map));
1199 } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1200 AcceptEdsUpdate(std::move(result.eds_update_map));
1201 }
1202 xds_client()->resource_version_map_[result.type_url] =
1203 std::move(result.version);
1204 // ACK the update.
1205 SendMessageLocked(result.type_url);
1206 // Start load reporting if needed.
1207 auto& lrs_call = chand()->lrs_calld_;
1208 if (lrs_call != nullptr) {
1209 LrsCallState* lrs_calld = lrs_call->calld();
1210 if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1211 }
1212 }
1213 }
1214 if (xds_client()->shutting_down_) return true;
1215 // Keep listening for updates.
1216 grpc_op op;
1217 memset(&op, 0, sizeof(op));
1218 op.op = GRPC_OP_RECV_MESSAGE;
1219 op.data.recv_message.recv_message = &recv_message_payload_;
1220 op.flags = 0;
1221 op.reserved = nullptr;
1222 GPR_ASSERT(call_ != nullptr);
1223 // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1224 const grpc_call_error call_error =
1225 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1226 GPR_ASSERT(GRPC_CALL_OK == call_error);
1227 return false;
1228 }
1229
OnStatusReceived(void * arg,grpc_error * error)1230 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1231 void* arg, grpc_error* error) {
1232 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1233 {
1234 MutexLock lock(&ads_calld->xds_client()->mu_);
1235 ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1236 }
1237 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1238 }
1239
OnStatusReceivedLocked(grpc_error * error)1240 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1241 grpc_error* error) {
1242 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1243 char* status_details = grpc_slice_to_c_string(status_details_);
1244 gpr_log(GPR_INFO,
1245 "[xds_client %p] ADS call status received. Status = %d, details "
1246 "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1247 xds_client(), status_code_, status_details, chand(), this, call_,
1248 grpc_error_string(error));
1249 gpr_free(status_details);
1250 }
1251 // Ignore status from a stale call.
1252 if (IsCurrentCallOnChannel()) {
1253 // Try to restart the call.
1254 parent_->OnCallFinishedLocked();
1255 // Send error to all watchers.
1256 xds_client()->NotifyOnErrorLocked(
1257 GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1258 }
1259 GRPC_ERROR_UNREF(error);
1260 }
1261
IsCurrentCallOnChannel() const1262 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1263 // If the retryable ADS call is null (which only happens when the xds channel
1264 // is shutting down), all the ADS calls are stale.
1265 if (chand()->ads_calld_ == nullptr) return false;
1266 return this == chand()->ads_calld_->calld();
1267 }
1268
1269 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1270 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1271 const std::string& type_url) {
1272 std::set<absl::string_view> resource_names;
1273 auto it = state_map_.find(type_url);
1274 if (it != state_map_.end()) {
1275 for (auto& p : it->second.subscribed_resources) {
1276 resource_names.insert(p.first);
1277 OrphanablePtr<ResourceState>& state = p.second;
1278 state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1279 }
1280 }
1281 return resource_names;
1282 }
1283
1284 //
1285 // XdsClient::ChannelState::LrsCallState::Reporter
1286 //
1287
Orphan()1288 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1289 if (next_report_timer_callback_pending_) {
1290 grpc_timer_cancel(&next_report_timer_);
1291 }
1292 }
1293
1294 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1295 ScheduleNextReportLocked() {
1296 const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1297 grpc_timer_init(&next_report_timer_, next_report_time,
1298 &on_next_report_timer_);
1299 next_report_timer_callback_pending_ = true;
1300 }
1301
OnNextReportTimer(void * arg,grpc_error * error)1302 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1303 void* arg, grpc_error* error) {
1304 Reporter* self = static_cast<Reporter*>(arg);
1305 bool done;
1306 {
1307 MutexLock lock(&self->xds_client()->mu_);
1308 done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1309 }
1310 if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1311 }
1312
OnNextReportTimerLocked(grpc_error * error)1313 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1314 grpc_error* error) {
1315 next_report_timer_callback_pending_ = false;
1316 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1317 GRPC_ERROR_UNREF(error);
1318 return true;
1319 }
1320 return SendReportLocked();
1321 }
1322
1323 namespace {
1324
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1325 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1326 for (const auto& p : snapshot) {
1327 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1328 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1329 for (const auto& q : cluster_snapshot.locality_stats) {
1330 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1331 if (!locality_snapshot.IsZero()) return false;
1332 }
1333 }
1334 return true;
1335 }
1336
1337 } // namespace
1338
SendReportLocked()1339 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1340 // Construct snapshot from all reported stats.
1341 XdsApi::ClusterLoadReportMap snapshot =
1342 xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1343 parent_->cluster_names_);
1344 // Skip client load report if the counters were all zero in the last
1345 // report and they are still zero in this one.
1346 const bool old_val = last_report_counters_were_zero_;
1347 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1348 if (old_val && last_report_counters_were_zero_) {
1349 if (xds_client()->load_report_map_.empty()) {
1350 parent_->chand()->StopLrsCall();
1351 return true;
1352 }
1353 ScheduleNextReportLocked();
1354 return false;
1355 }
1356 // Create a request that contains the snapshot.
1357 grpc_slice request_payload_slice =
1358 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1359 parent_->send_message_payload_ =
1360 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1361 grpc_slice_unref_internal(request_payload_slice);
1362 // Send the report.
1363 grpc_op op;
1364 memset(&op, 0, sizeof(op));
1365 op.op = GRPC_OP_SEND_MESSAGE;
1366 op.data.send_message.send_message = parent_->send_message_payload_;
1367 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1368 parent_->call_, &op, 1, &on_report_done_);
1369 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1370 gpr_log(GPR_ERROR,
1371 "[xds_client %p] calld=%p call_error=%d sending client load report",
1372 xds_client(), this, call_error);
1373 GPR_ASSERT(GRPC_CALL_OK == call_error);
1374 }
1375 return false;
1376 }
1377
OnReportDone(void * arg,grpc_error * error)1378 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1379 void* arg, grpc_error* error) {
1380 Reporter* self = static_cast<Reporter*>(arg);
1381 bool done;
1382 {
1383 MutexLock lock(&self->xds_client()->mu_);
1384 done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1385 }
1386 if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1387 }
1388
OnReportDoneLocked(grpc_error * error)1389 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1390 grpc_error* error) {
1391 grpc_byte_buffer_destroy(parent_->send_message_payload_);
1392 parent_->send_message_payload_ = nullptr;
1393 // If there are no more registered stats to report, cancel the call.
1394 if (xds_client()->load_report_map_.empty()) {
1395 parent_->chand()->StopLrsCall();
1396 GRPC_ERROR_UNREF(error);
1397 return true;
1398 }
1399 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1400 GRPC_ERROR_UNREF(error);
1401 // If this reporter is no longer the current one on the call, the reason
1402 // might be that it was orphaned for a new one due to config update.
1403 if (!IsCurrentReporterOnCall()) {
1404 parent_->MaybeStartReportingLocked();
1405 }
1406 return true;
1407 }
1408 ScheduleNextReportLocked();
1409 return false;
1410 }
1411
1412 //
1413 // XdsClient::ChannelState::LrsCallState
1414 //
1415
LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent)1416 XdsClient::ChannelState::LrsCallState::LrsCallState(
1417 RefCountedPtr<RetryableCall<LrsCallState>> parent)
1418 : InternallyRefCounted<LrsCallState>(
1419 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
1420 ? "LrsCallState"
1421 : nullptr),
1422 parent_(std::move(parent)) {
1423 // Init the LRS call. Note that the call will progress every time there's
1424 // activity in xds_client()->interested_parties_, which is comprised of
1425 // the polling entities from client_channel.
1426 GPR_ASSERT(xds_client() != nullptr);
1427 const auto& method =
1428 chand()->server_.ShouldUseV3()
1429 ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1430 : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1431 call_ = grpc_channel_create_pollset_set_call(
1432 chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1433 xds_client()->interested_parties_, method, nullptr,
1434 GRPC_MILLIS_INF_FUTURE, nullptr);
1435 GPR_ASSERT(call_ != nullptr);
1436 // Init the request payload.
1437 grpc_slice request_payload_slice =
1438 xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
1439 send_message_payload_ =
1440 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1441 grpc_slice_unref_internal(request_payload_slice);
1442 // Init other data associated with the LRS call.
1443 grpc_metadata_array_init(&initial_metadata_recv_);
1444 grpc_metadata_array_init(&trailing_metadata_recv_);
1445 // Start the call.
1446 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1447 gpr_log(GPR_INFO,
1448 "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1449 "call: %p)",
1450 xds_client(), chand(), this, call_);
1451 }
1452 // Create the ops.
1453 grpc_call_error call_error;
1454 grpc_op ops[3];
1455 memset(ops, 0, sizeof(ops));
1456 // Op: send initial metadata.
1457 grpc_op* op = ops;
1458 op->op = GRPC_OP_SEND_INITIAL_METADATA;
1459 op->data.send_initial_metadata.count = 0;
1460 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1461 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1462 op->reserved = nullptr;
1463 op++;
1464 // Op: send request message.
1465 GPR_ASSERT(send_message_payload_ != nullptr);
1466 op->op = GRPC_OP_SEND_MESSAGE;
1467 op->data.send_message.send_message = send_message_payload_;
1468 op->flags = 0;
1469 op->reserved = nullptr;
1470 op++;
1471 Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1472 GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1473 grpc_schedule_on_exec_ctx);
1474 call_error = grpc_call_start_batch_and_execute(
1475 call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
1476 GPR_ASSERT(GRPC_CALL_OK == call_error);
1477 // Op: recv initial metadata.
1478 op = ops;
1479 op->op = GRPC_OP_RECV_INITIAL_METADATA;
1480 op->data.recv_initial_metadata.recv_initial_metadata =
1481 &initial_metadata_recv_;
1482 op->flags = 0;
1483 op->reserved = nullptr;
1484 op++;
1485 // Op: recv response.
1486 op->op = GRPC_OP_RECV_MESSAGE;
1487 op->data.recv_message.recv_message = &recv_message_payload_;
1488 op->flags = 0;
1489 op->reserved = nullptr;
1490 op++;
1491 Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1492 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1493 grpc_schedule_on_exec_ctx);
1494 call_error = grpc_call_start_batch_and_execute(
1495 call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
1496 GPR_ASSERT(GRPC_CALL_OK == call_error);
1497 // Op: recv server status.
1498 op = ops;
1499 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1500 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1501 op->data.recv_status_on_client.status = &status_code_;
1502 op->data.recv_status_on_client.status_details = &status_details_;
1503 op->flags = 0;
1504 op->reserved = nullptr;
1505 op++;
1506 // This callback signals the end of the call, so it relies on the initial
1507 // ref instead of a new ref. When it's invoked, it's the initial ref that is
1508 // unreffed.
1509 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1510 grpc_schedule_on_exec_ctx);
1511 call_error = grpc_call_start_batch_and_execute(
1512 call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
1513 GPR_ASSERT(GRPC_CALL_OK == call_error);
1514 }
1515
~LrsCallState()1516 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1517 grpc_metadata_array_destroy(&initial_metadata_recv_);
1518 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1519 grpc_byte_buffer_destroy(send_message_payload_);
1520 grpc_byte_buffer_destroy(recv_message_payload_);
1521 grpc_slice_unref_internal(status_details_);
1522 GPR_ASSERT(call_ != nullptr);
1523 grpc_call_unref(call_);
1524 }
1525
Orphan()1526 void XdsClient::ChannelState::LrsCallState::Orphan() {
1527 reporter_.reset();
1528 GPR_ASSERT(call_ != nullptr);
1529 // If we are here because xds_client wants to cancel the call,
1530 // on_status_received_ will complete the cancellation and clean up. Otherwise,
1531 // we are here because xds_client has to orphan a failed call, then the
1532 // following cancellation will be a no-op.
1533 grpc_call_cancel_internal(call_);
1534 // Note that the initial ref is hold by on_status_received_. So the
1535 // corresponding unref happens in on_status_received_ instead of here.
1536 }
1537
MaybeStartReportingLocked()1538 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1539 // Don't start again if already started.
1540 if (reporter_ != nullptr) return;
1541 // Don't start if the previous send_message op (of the initial request or the
1542 // last report of the previous reporter) hasn't completed.
1543 if (send_message_payload_ != nullptr) return;
1544 // Don't start if no LRS response has arrived.
1545 if (!seen_response()) return;
1546 // Don't start if the ADS call hasn't received any valid response. Note that
1547 // this must be the first channel because it is the current channel but its
1548 // ADS call hasn't seen any response.
1549 if (chand()->ads_calld_ == nullptr ||
1550 chand()->ads_calld_->calld() == nullptr ||
1551 !chand()->ads_calld_->calld()->seen_response()) {
1552 return;
1553 }
1554 // Start reporting.
1555 reporter_ = MakeOrphanable<Reporter>(
1556 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1557 }
1558
OnInitialRequestSent(void * arg,grpc_error *)1559 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1560 void* arg, grpc_error* /*error*/) {
1561 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1562 {
1563 MutexLock lock(&lrs_calld->xds_client()->mu_);
1564 lrs_calld->OnInitialRequestSentLocked();
1565 }
1566 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1567 }
1568
OnInitialRequestSentLocked()1569 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1570 // Clear the send_message_payload_.
1571 grpc_byte_buffer_destroy(send_message_payload_);
1572 send_message_payload_ = nullptr;
1573 MaybeStartReportingLocked();
1574 }
1575
OnResponseReceived(void * arg,grpc_error *)1576 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1577 void* arg, grpc_error* /*error*/) {
1578 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1579 bool done;
1580 {
1581 MutexLock lock(&lrs_calld->xds_client()->mu_);
1582 done = lrs_calld->OnResponseReceivedLocked();
1583 }
1584 if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1585 }
1586
OnResponseReceivedLocked()1587 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1588 // Empty payload means the call was cancelled.
1589 if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1590 return true;
1591 }
1592 // Read the response.
1593 grpc_byte_buffer_reader bbr;
1594 grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1595 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1596 grpc_byte_buffer_reader_destroy(&bbr);
1597 grpc_byte_buffer_destroy(recv_message_payload_);
1598 recv_message_payload_ = nullptr;
1599 // This anonymous lambda is a hack to avoid the usage of goto.
1600 [&]() {
1601 // Parse the response.
1602 bool send_all_clusters = false;
1603 std::set<std::string> new_cluster_names;
1604 grpc_millis new_load_reporting_interval;
1605 grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
1606 response_slice, &send_all_clusters, &new_cluster_names,
1607 &new_load_reporting_interval);
1608 if (parse_error != GRPC_ERROR_NONE) {
1609 gpr_log(GPR_ERROR,
1610 "[xds_client %p] LRS response parsing failed. error=%s",
1611 xds_client(), grpc_error_string(parse_error));
1612 GRPC_ERROR_UNREF(parse_error);
1613 return;
1614 }
1615 seen_response_ = true;
1616 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1617 gpr_log(
1618 GPR_INFO,
1619 "[xds_client %p] LRS response received, %" PRIuPTR
1620 " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1621 "ms",
1622 xds_client(), new_cluster_names.size(), send_all_clusters,
1623 new_load_reporting_interval);
1624 size_t i = 0;
1625 for (const auto& name : new_cluster_names) {
1626 gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1627 xds_client(), i++, name.c_str());
1628 }
1629 }
1630 if (new_load_reporting_interval <
1631 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1632 new_load_reporting_interval =
1633 GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1634 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1635 gpr_log(GPR_INFO,
1636 "[xds_client %p] Increased load_report_interval to minimum "
1637 "value %dms",
1638 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1639 }
1640 }
1641 // Ignore identical update.
1642 if (send_all_clusters == send_all_clusters_ &&
1643 cluster_names_ == new_cluster_names &&
1644 load_reporting_interval_ == new_load_reporting_interval) {
1645 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1646 gpr_log(GPR_INFO,
1647 "[xds_client %p] Incoming LRS response identical to current, "
1648 "ignoring.",
1649 xds_client());
1650 }
1651 return;
1652 }
1653 // Stop current load reporting (if any) to adopt the new config.
1654 reporter_.reset();
1655 // Record the new config.
1656 send_all_clusters_ = send_all_clusters;
1657 cluster_names_ = std::move(new_cluster_names);
1658 load_reporting_interval_ = new_load_reporting_interval;
1659 // Try starting sending load report.
1660 MaybeStartReportingLocked();
1661 }();
1662 grpc_slice_unref_internal(response_slice);
1663 if (xds_client()->shutting_down_) return true;
1664 // Keep listening for LRS config updates.
1665 grpc_op op;
1666 memset(&op, 0, sizeof(op));
1667 op.op = GRPC_OP_RECV_MESSAGE;
1668 op.data.recv_message.recv_message = &recv_message_payload_;
1669 op.flags = 0;
1670 op.reserved = nullptr;
1671 GPR_ASSERT(call_ != nullptr);
1672 // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1673 const grpc_call_error call_error =
1674 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1675 GPR_ASSERT(GRPC_CALL_OK == call_error);
1676 return false;
1677 }
1678
OnStatusReceived(void * arg,grpc_error * error)1679 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1680 void* arg, grpc_error* error) {
1681 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1682 {
1683 MutexLock lock(&lrs_calld->xds_client()->mu_);
1684 lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1685 }
1686 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1687 }
1688
OnStatusReceivedLocked(grpc_error * error)1689 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1690 grpc_error* error) {
1691 GPR_ASSERT(call_ != nullptr);
1692 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1693 char* status_details = grpc_slice_to_c_string(status_details_);
1694 gpr_log(GPR_INFO,
1695 "[xds_client %p] LRS call status received. Status = %d, details "
1696 "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1697 xds_client(), status_code_, status_details, chand(), this, call_,
1698 grpc_error_string(error));
1699 gpr_free(status_details);
1700 }
1701 // Ignore status from a stale call.
1702 if (IsCurrentCallOnChannel()) {
1703 GPR_ASSERT(!xds_client()->shutting_down_);
1704 // Try to restart the call.
1705 parent_->OnCallFinishedLocked();
1706 }
1707 GRPC_ERROR_UNREF(error);
1708 }
1709
IsCurrentCallOnChannel() const1710 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1711 // If the retryable LRS call is null (which only happens when the xds channel
1712 // is shutting down), all the LRS calls are stale.
1713 if (chand()->lrs_calld_ == nullptr) return false;
1714 return this == chand()->lrs_calld_->calld();
1715 }
1716
1717 //
1718 // XdsClient
1719 //
1720
1721 namespace {
1722
GetRequestTimeout()1723 grpc_millis GetRequestTimeout() {
1724 return grpc_channel_args_find_integer(
1725 g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1726 {15000, 0, INT_MAX});
1727 }
1728
1729 } // namespace
1730
XdsClient(grpc_error ** error)1731 XdsClient::XdsClient(grpc_error** error)
1732 : DualRefCounted<XdsClient>(
1733 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1734 : nullptr),
1735 request_timeout_(GetRequestTimeout()),
1736 interested_parties_(grpc_pollset_set_create()),
1737 bootstrap_(XdsBootstrap::Create(this, &grpc_xds_client_trace,
1738 g_fallback_bootstrap_config, error)),
1739 certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
1740 bootstrap_ == nullptr
1741 ? CertificateProviderStore::PluginDefinitionMap()
1742 : bootstrap_->certificate_providers())),
1743 api_(this, &grpc_xds_client_trace,
1744 bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
1745 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1746 gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1747 }
1748 if (*error != GRPC_ERROR_NONE) {
1749 gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1750 this, grpc_error_string(*error));
1751 return;
1752 }
1753 // Create ChannelState object.
1754 chand_ = MakeOrphanable<ChannelState>(
1755 WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1756 }
1757
~XdsClient()1758 XdsClient::~XdsClient() {
1759 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1760 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1761 }
1762 grpc_pollset_set_destroy(interested_parties_);
1763 }
1764
AddChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1765 void XdsClient::AddChannelzLinkage(
1766 channelz::ChannelNode* parent_channelz_node) {
1767 channelz::ChannelNode* xds_channelz_node =
1768 grpc_channel_get_channelz_node(chand_->channel());
1769 if (xds_channelz_node != nullptr) {
1770 parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1771 }
1772 }
1773
RemoveChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1774 void XdsClient::RemoveChannelzLinkage(
1775 channelz::ChannelNode* parent_channelz_node) {
1776 channelz::ChannelNode* xds_channelz_node =
1777 grpc_channel_get_channelz_node(chand_->channel());
1778 if (xds_channelz_node != nullptr) {
1779 parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1780 }
1781 }
1782
Orphan()1783 void XdsClient::Orphan() {
1784 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1785 gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1786 }
1787 {
1788 MutexLock lock(g_mu);
1789 if (g_xds_client == this) g_xds_client = nullptr;
1790 }
1791 {
1792 MutexLock lock(&mu_);
1793 shutting_down_ = true;
1794 // Orphan ChannelState object.
1795 chand_.reset();
1796 // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1797 // created by the XdsResolver because the maps contain refs for watchers
1798 // which in turn hold refs to the loadbalancing policies. At this point, it
1799 // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1800 // policies before those calls are done would lead to issues such as
1801 // https://github.com/grpc/grpc/issues/20928.
1802 if (!listener_map_.empty()) {
1803 cluster_map_.clear();
1804 endpoint_map_.clear();
1805 }
1806 }
1807 }
1808
WatchListenerData(absl::string_view listener_name,std::unique_ptr<ListenerWatcherInterface> watcher)1809 void XdsClient::WatchListenerData(
1810 absl::string_view listener_name,
1811 std::unique_ptr<ListenerWatcherInterface> watcher) {
1812 std::string listener_name_str = std::string(listener_name);
1813 MutexLock lock(&mu_);
1814 ListenerState& listener_state = listener_map_[listener_name_str];
1815 ListenerWatcherInterface* w = watcher.get();
1816 listener_state.watchers[w] = std::move(watcher);
1817 // If we've already received an LDS update, notify the new watcher
1818 // immediately.
1819 if (listener_state.update.has_value()) {
1820 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1821 gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1822 this, listener_name_str.c_str());
1823 }
1824 w->OnListenerChanged(*listener_state.update);
1825 }
1826 chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
1827 }
1828
CancelListenerDataWatch(absl::string_view listener_name,ListenerWatcherInterface * watcher,bool delay_unsubscription)1829 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1830 ListenerWatcherInterface* watcher,
1831 bool delay_unsubscription) {
1832 MutexLock lock(&mu_);
1833 if (shutting_down_) return;
1834 std::string listener_name_str = std::string(listener_name);
1835 ListenerState& listener_state = listener_map_[listener_name_str];
1836 auto it = listener_state.watchers.find(watcher);
1837 if (it != listener_state.watchers.end()) {
1838 listener_state.watchers.erase(it);
1839 if (listener_state.watchers.empty()) {
1840 listener_map_.erase(listener_name_str);
1841 chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
1842 delay_unsubscription);
1843 }
1844 }
1845 }
1846
WatchRouteConfigData(absl::string_view route_config_name,std::unique_ptr<RouteConfigWatcherInterface> watcher)1847 void XdsClient::WatchRouteConfigData(
1848 absl::string_view route_config_name,
1849 std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1850 std::string route_config_name_str = std::string(route_config_name);
1851 MutexLock lock(&mu_);
1852 RouteConfigState& route_config_state =
1853 route_config_map_[route_config_name_str];
1854 RouteConfigWatcherInterface* w = watcher.get();
1855 route_config_state.watchers[w] = std::move(watcher);
1856 // If we've already received an RDS update, notify the new watcher
1857 // immediately.
1858 if (route_config_state.update.has_value()) {
1859 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1860 gpr_log(GPR_INFO,
1861 "[xds_client %p] returning cached route config data for %s", this,
1862 route_config_name_str.c_str());
1863 }
1864 w->OnRouteConfigChanged(*route_config_state.update);
1865 }
1866 chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
1867 }
1868
CancelRouteConfigDataWatch(absl::string_view route_config_name,RouteConfigWatcherInterface * watcher,bool delay_unsubscription)1869 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1870 RouteConfigWatcherInterface* watcher,
1871 bool delay_unsubscription) {
1872 MutexLock lock(&mu_);
1873 if (shutting_down_) return;
1874 std::string route_config_name_str = std::string(route_config_name);
1875 RouteConfigState& route_config_state =
1876 route_config_map_[route_config_name_str];
1877 auto it = route_config_state.watchers.find(watcher);
1878 if (it != route_config_state.watchers.end()) {
1879 route_config_state.watchers.erase(it);
1880 if (route_config_state.watchers.empty()) {
1881 route_config_map_.erase(route_config_name_str);
1882 chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
1883 delay_unsubscription);
1884 }
1885 }
1886 }
1887
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1888 void XdsClient::WatchClusterData(
1889 absl::string_view cluster_name,
1890 std::unique_ptr<ClusterWatcherInterface> watcher) {
1891 std::string cluster_name_str = std::string(cluster_name);
1892 MutexLock lock(&mu_);
1893 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1894 ClusterWatcherInterface* w = watcher.get();
1895 cluster_state.watchers[w] = std::move(watcher);
1896 // If we've already received a CDS update, notify the new watcher
1897 // immediately.
1898 if (cluster_state.update.has_value()) {
1899 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1900 gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1901 this, cluster_name_str.c_str());
1902 }
1903 w->OnClusterChanged(cluster_state.update.value());
1904 }
1905 chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1906 }
1907
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1908 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1909 ClusterWatcherInterface* watcher,
1910 bool delay_unsubscription) {
1911 MutexLock lock(&mu_);
1912 if (shutting_down_) return;
1913 std::string cluster_name_str = std::string(cluster_name);
1914 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1915 auto it = cluster_state.watchers.find(watcher);
1916 if (it != cluster_state.watchers.end()) {
1917 cluster_state.watchers.erase(it);
1918 if (cluster_state.watchers.empty()) {
1919 cluster_map_.erase(cluster_name_str);
1920 chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1921 delay_unsubscription);
1922 }
1923 }
1924 }
1925
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1926 void XdsClient::WatchEndpointData(
1927 absl::string_view eds_service_name,
1928 std::unique_ptr<EndpointWatcherInterface> watcher) {
1929 std::string eds_service_name_str = std::string(eds_service_name);
1930 MutexLock lock(&mu_);
1931 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1932 EndpointWatcherInterface* w = watcher.get();
1933 endpoint_state.watchers[w] = std::move(watcher);
1934 // If we've already received an EDS update, notify the new watcher
1935 // immediately.
1936 if (endpoint_state.update.has_value()) {
1937 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1938 gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1939 this, eds_service_name_str.c_str());
1940 }
1941 w->OnEndpointChanged(endpoint_state.update.value());
1942 }
1943 chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1944 }
1945
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)1946 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1947 EndpointWatcherInterface* watcher,
1948 bool delay_unsubscription) {
1949 MutexLock lock(&mu_);
1950 if (shutting_down_) return;
1951 std::string eds_service_name_str = std::string(eds_service_name);
1952 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1953 auto it = endpoint_state.watchers.find(watcher);
1954 if (it != endpoint_state.watchers.end()) {
1955 endpoint_state.watchers.erase(it);
1956 if (endpoint_state.watchers.empty()) {
1957 endpoint_map_.erase(eds_service_name_str);
1958 chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1959 delay_unsubscription);
1960 }
1961 }
1962 }
1963
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)1964 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1965 absl::string_view lrs_server, absl::string_view cluster_name,
1966 absl::string_view eds_service_name) {
1967 // TODO(roth): When we add support for direct federation, use the
1968 // server name specified in lrs_server.
1969 auto key =
1970 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1971 MutexLock lock(&mu_);
1972 // We jump through some hoops here to make sure that the absl::string_views
1973 // stored in the XdsClusterDropStats object point to the strings
1974 // in the load_report_map_ key, so that they have the same lifetime.
1975 auto it = load_report_map_
1976 .emplace(std::make_pair(std::move(key), LoadReportState()))
1977 .first;
1978 LoadReportState& load_report_state = it->second;
1979 RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1980 if (load_report_state.drop_stats != nullptr) {
1981 cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1982 }
1983 if (cluster_drop_stats == nullptr) {
1984 if (load_report_state.drop_stats != nullptr) {
1985 load_report_state.deleted_drop_stats +=
1986 load_report_state.drop_stats->GetSnapshotAndReset();
1987 }
1988 cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1989 Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1990 it->first.first /*cluster_name*/,
1991 it->first.second /*eds_service_name*/);
1992 load_report_state.drop_stats = cluster_drop_stats.get();
1993 }
1994 chand_->MaybeStartLrsCall();
1995 return cluster_drop_stats;
1996 }
1997
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)1998 void XdsClient::RemoveClusterDropStats(
1999 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2000 absl::string_view eds_service_name,
2001 XdsClusterDropStats* cluster_drop_stats) {
2002 MutexLock lock(&mu_);
2003 // TODO(roth): When we add support for direct federation, use the
2004 // server name specified in lrs_server.
2005 auto it = load_report_map_.find(
2006 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2007 if (it == load_report_map_.end()) return;
2008 LoadReportState& load_report_state = it->second;
2009 if (load_report_state.drop_stats == cluster_drop_stats) {
2010 // Record final snapshot in deleted_drop_stats, which will be
2011 // added to the next load report.
2012 load_report_state.deleted_drop_stats +=
2013 load_report_state.drop_stats->GetSnapshotAndReset();
2014 load_report_state.drop_stats = nullptr;
2015 }
2016 }
2017
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)2018 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2019 absl::string_view lrs_server, absl::string_view cluster_name,
2020 absl::string_view eds_service_name,
2021 RefCountedPtr<XdsLocalityName> locality) {
2022 // TODO(roth): When we add support for direct federation, use the
2023 // server name specified in lrs_server.
2024 auto key =
2025 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2026 MutexLock lock(&mu_);
2027 // We jump through some hoops here to make sure that the absl::string_views
2028 // stored in the XdsClusterLocalityStats object point to the strings
2029 // in the load_report_map_ key, so that they have the same lifetime.
2030 auto it = load_report_map_
2031 .emplace(std::make_pair(std::move(key), LoadReportState()))
2032 .first;
2033 LoadReportState& load_report_state = it->second;
2034 LoadReportState::LocalityState& locality_state =
2035 load_report_state.locality_stats[locality];
2036 RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2037 if (locality_state.locality_stats != nullptr) {
2038 cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2039 }
2040 if (cluster_locality_stats == nullptr) {
2041 if (locality_state.locality_stats != nullptr) {
2042 locality_state.deleted_locality_stats +=
2043 locality_state.locality_stats->GetSnapshotAndReset();
2044 }
2045 cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2046 Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2047 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2048 std::move(locality));
2049 locality_state.locality_stats = cluster_locality_stats.get();
2050 }
2051 chand_->MaybeStartLrsCall();
2052 return cluster_locality_stats;
2053 }
2054
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2055 void XdsClient::RemoveClusterLocalityStats(
2056 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2057 absl::string_view eds_service_name,
2058 const RefCountedPtr<XdsLocalityName>& locality,
2059 XdsClusterLocalityStats* cluster_locality_stats) {
2060 MutexLock lock(&mu_);
2061 // TODO(roth): When we add support for direct federation, use the
2062 // server name specified in lrs_server.
2063 auto it = load_report_map_.find(
2064 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2065 if (it == load_report_map_.end()) return;
2066 LoadReportState& load_report_state = it->second;
2067 auto locality_it = load_report_state.locality_stats.find(locality);
2068 if (locality_it == load_report_state.locality_stats.end()) return;
2069 LoadReportState::LocalityState& locality_state = locality_it->second;
2070 if (locality_state.locality_stats == cluster_locality_stats) {
2071 // Record final snapshot in deleted_locality_stats, which will be
2072 // added to the next load report.
2073 locality_state.deleted_locality_stats +=
2074 locality_state.locality_stats->GetSnapshotAndReset();
2075 locality_state.locality_stats = nullptr;
2076 }
2077 }
2078
ResetBackoff()2079 void XdsClient::ResetBackoff() {
2080 MutexLock lock(&mu_);
2081 if (chand_ != nullptr) {
2082 grpc_channel_reset_connect_backoff(chand_->channel());
2083 }
2084 }
2085
NotifyOnErrorLocked(grpc_error * error)2086 void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
2087 for (const auto& p : listener_map_) {
2088 const ListenerState& listener_state = p.second;
2089 for (const auto& p : listener_state.watchers) {
2090 p.first->OnError(GRPC_ERROR_REF(error));
2091 }
2092 }
2093 for (const auto& p : route_config_map_) {
2094 const RouteConfigState& route_config_state = p.second;
2095 for (const auto& p : route_config_state.watchers) {
2096 p.first->OnError(GRPC_ERROR_REF(error));
2097 }
2098 }
2099 for (const auto& p : cluster_map_) {
2100 const ClusterState& cluster_state = p.second;
2101 for (const auto& p : cluster_state.watchers) {
2102 p.first->OnError(GRPC_ERROR_REF(error));
2103 }
2104 }
2105 for (const auto& p : endpoint_map_) {
2106 const EndpointState& endpoint_state = p.second;
2107 for (const auto& p : endpoint_state.watchers) {
2108 p.first->OnError(GRPC_ERROR_REF(error));
2109 }
2110 }
2111 GRPC_ERROR_UNREF(error);
2112 }
2113
BuildLoadReportSnapshotLocked(bool send_all_clusters,const std::set<std::string> & clusters)2114 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2115 bool send_all_clusters, const std::set<std::string>& clusters) {
2116 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2117 gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2118 }
2119 XdsApi::ClusterLoadReportMap snapshot_map;
2120 for (auto load_report_it = load_report_map_.begin();
2121 load_report_it != load_report_map_.end();) {
2122 // Cluster key is cluster and EDS service name.
2123 const auto& cluster_key = load_report_it->first;
2124 LoadReportState& load_report = load_report_it->second;
2125 // If the CDS response for a cluster indicates to use LRS but the
2126 // LRS server does not say that it wants reports for this cluster,
2127 // then we'll have stats objects here whose data we're not going to
2128 // include in the load report. However, we still need to clear out
2129 // the data from the stats objects, so that if the LRS server starts
2130 // asking for the data in the future, we don't incorrectly include
2131 // data from previous reporting intervals in that future report.
2132 const bool record_stats =
2133 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2134 XdsApi::ClusterLoadReport snapshot;
2135 // Aggregate drop stats.
2136 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2137 if (load_report.drop_stats != nullptr) {
2138 snapshot.dropped_requests +=
2139 load_report.drop_stats->GetSnapshotAndReset();
2140 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2141 gpr_log(GPR_INFO,
2142 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2143 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2144 load_report.drop_stats);
2145 }
2146 }
2147 // Aggregate locality stats.
2148 for (auto it = load_report.locality_stats.begin();
2149 it != load_report.locality_stats.end();) {
2150 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2151 auto& locality_state = it->second;
2152 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2153 snapshot.locality_stats[locality_name];
2154 locality_snapshot = std::move(locality_state.deleted_locality_stats);
2155 if (locality_state.locality_stats != nullptr) {
2156 locality_snapshot +=
2157 locality_state.locality_stats->GetSnapshotAndReset();
2158 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2159 gpr_log(GPR_INFO,
2160 "[xds_client %p] cluster=%s eds_service_name=%s "
2161 "locality=%s locality_stats=%p",
2162 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2163 locality_name->AsHumanReadableString().c_str(),
2164 locality_state.locality_stats);
2165 }
2166 }
2167 // If the only thing left in this entry was final snapshots from
2168 // deleted locality stats objects, remove the entry.
2169 if (locality_state.locality_stats == nullptr) {
2170 it = load_report.locality_stats.erase(it);
2171 } else {
2172 ++it;
2173 }
2174 }
2175 // Compute load report interval.
2176 const grpc_millis now = ExecCtx::Get()->Now();
2177 snapshot.load_report_interval = now - load_report.last_report_time;
2178 load_report.last_report_time = now;
2179 // Record snapshot.
2180 if (record_stats) {
2181 snapshot_map[cluster_key] = std::move(snapshot);
2182 }
2183 // If the only thing left in this entry was final snapshots from
2184 // deleted stats objects, remove the entry.
2185 if (load_report.locality_stats.empty() &&
2186 load_report.drop_stats == nullptr) {
2187 load_report_it = load_report_map_.erase(load_report_it);
2188 } else {
2189 ++load_report_it;
2190 }
2191 }
2192 return snapshot_map;
2193 }
2194
2195 //
2196 // accessors for global state
2197 //
2198
XdsClientGlobalInit()2199 void XdsClientGlobalInit() { g_mu = new Mutex; }
2200
XdsClientGlobalShutdown()2201 void XdsClientGlobalShutdown() {
2202 delete g_mu;
2203 g_mu = nullptr;
2204 gpr_free(g_fallback_bootstrap_config);
2205 g_fallback_bootstrap_config = nullptr;
2206 }
2207
GetOrCreate(grpc_error ** error)2208 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
2209 MutexLock lock(g_mu);
2210 if (g_xds_client != nullptr) {
2211 auto xds_client = g_xds_client->RefIfNonZero();
2212 if (xds_client != nullptr) return xds_client;
2213 }
2214 auto xds_client = MakeRefCounted<XdsClient>(error);
2215 g_xds_client = xds_client.get();
2216 return xds_client;
2217 }
2218
2219 namespace internal {
2220
SetXdsChannelArgsForTest(grpc_channel_args * args)2221 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2222 MutexLock lock(g_mu);
2223 g_channel_args = args;
2224 }
2225
UnsetGlobalXdsClientForTest()2226 void UnsetGlobalXdsClientForTest() {
2227 MutexLock lock(g_mu);
2228 g_xds_client = nullptr;
2229 }
2230
SetXdsFallbackBootstrapConfig(const char * config)2231 void SetXdsFallbackBootstrapConfig(const char* config) {
2232 MutexLock lock(g_mu);
2233 gpr_free(g_fallback_bootstrap_config);
2234 g_fallback_bootstrap_config = gpr_strdup(config);
2235 }
2236
2237 } // namespace internal
2238
2239 } // namespace grpc_core
2240