1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/channel_stack.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/memory.h"
46 #include "src/core/lib/gprpp/orphanable.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/gprpp/sync.h"
49 #include "src/core/lib/iomgr/sockaddr.h"
50 #include "src/core/lib/iomgr/sockaddr_utils.h"
51 #include "src/core/lib/iomgr/timer.h"
52 #include "src/core/lib/slice/slice_internal.h"
53 #include "src/core/lib/slice/slice_string_helpers.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/channel.h"
56 #include "src/core/lib/surface/channel_init.h"
57 #include "src/core/lib/transport/static_metadata.h"
58
// Backoff parameters used when a call to the xds server has to be restarted.
// The *_SECONDS values are multiplied by 1000 at the point where they are
// handed to BackOff::Options in the RetryableCall constructor.
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): presumably the lower bound applied to the LRS reporting
// interval; its use is outside the part of the file reviewed here -- confirm.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
64
65 namespace grpc_core {
66
// Trace flag "xds_client": gates the GPR_INFO logging done throughout this
// file. Constructed with `false`.
TraceFlag grpc_xds_client_trace(false, "xds_client");
// Trace flag "xds_client_refcount": when enabled, ref-counted objects in this
// file are constructed with a trace name instead of nullptr. Constructed with
// `false`.
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
69
namespace {

// File-local globals; all of them start out null.
// NOTE(review): g_mu presumably guards the two pointers below -- the code that
// sets them is outside the part of the file reviewed here; confirm.
Mutex* g_mu = nullptr;
// Base channel args that CreateXdsChannel() copies and extends.
const grpc_channel_args* g_channel_args = nullptr;
XdsClient* g_xds_client = nullptr;

}  // namespace
77
78 //
79 // Internal class declarations
80 //
81
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
template <typename T>
class XdsClient::ChannelState::RetryableCall
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  // Takes ownership of the channel ref, configures the backoff policy and
  // immediately starts the first wrapped call.
  explicit RetryableCall(RefCountedPtr<ChannelState> chand);

  // Marks this object as shutting down, drops the wrapped call, cancels a
  // pending retry timer and releases one ref.
  void Orphan() override;

  // Called when the wrapped call has finished. Drops the call; if it had seen
  // a response, backoff is reset and a new call is started right away,
  // otherwise the retry timer is started.
  void OnCallFinishedLocked();

  // The wrapped call; null while no call exists (e.g. during retry backoff).
  T* calld() const { return calld_.get(); }
  // The owning xds channel (non-owning view of chand_).
  ChannelState* chand() const { return chand_.get(); }

  // NOTE(review): defined outside the part of the file reviewed here;
  // presumably reports whether this is the channel's current RetryableCall<T>.
  bool IsCurrentCallOnChannel() const;

 private:
  // Creates a fresh T holding a new ref to this object. No-op when shutting
  // down.
  void StartNewCallLocked();
  // Arms retry_timer_ for the next backoff attempt time, taking a ref that
  // OnRetryTimer() releases. No-op when shutting down.
  void StartRetryTimerLocked();
  // Timer closure entry point: takes the XdsClient mutex, runs
  // OnRetryTimerLocked() and releases the timer ref.
  static void OnRetryTimer(void* arg, grpc_error* error);
  // Starts a new call if the timer fired without error and we are not
  // shutting down. Consumes `error`.
  void OnRetryTimerLocked(grpc_error* error);

  // The wrapped xds call that talks to the xds server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> calld_;
  // The owning xds channel.
  RefCountedPtr<ChannelState> chand_;

  // Retry state.
  BackOff backoff_;
  grpc_timer retry_timer_;
  grpc_closure on_retry_timer_;
  // True from the moment the retry timer is armed until its callback has run.
  bool retry_timer_callback_pending_ = false;

  // Set by Orphan(); suppresses new calls and new retry timers.
  bool shutting_down_ = false;
};
119
// Contains an ADS call to the xds server.
class XdsClient::ChannelState::AdsCallState
    : public InternallyRefCounted<AdsCallState> {
 public:
  // The ctor and dtor should not be used directly.
  explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
  ~AdsCallState() override;

  // Cancels the underlying call and clears all per-type subscription state.
  void Orphan() override;

  // Accessors that walk up the ownership chain:
  // RetryableCall -> ChannelState -> XdsClient.
  RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
  ChannelState* chand() const { return parent_->chand(); }
  XdsClient* xds_client() const { return chand()->xds_client(); }
  // Read by RetryableCall::OnCallFinishedLocked() to choose between an
  // immediate restart and a backoff retry.
  bool seen_response() const { return seen_response_; }

  // Adds a ResourceState for (type_url, name) if there is none yet and sends
  // a request for that type.
  void Subscribe(const std::string& type_url, const std::string& name);
  // Removes the resource; sends an updated request for that type unless
  // delay_unsubscription is true.
  void Unsubscribe(const std::string& type_url, const std::string& name,
                   bool delay_unsubscription);

  // True if any resource type still has at least one subscribed resource.
  bool HasSubscribedResources() const;

 private:
  // Per-resource state. Owns a timer that, if it fires before Finish() is
  // called, reports a timeout error to every watcher of the resource.
  class ResourceState : public InternallyRefCounted<ResourceState> {
   public:
    // sent_initial_request == true makes Start() a no-op, so the timer is
    // never armed for this resource.
    ResourceState(const std::string& type_url, const std::string& name,
                  bool sent_initial_request)
        : type_url_(type_url),
          name_(name),
          sent_initial_request_(sent_initial_request) {
      GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
                        grpc_schedule_on_exec_ctx);
    }

    // Cancels a pending timer and releases one ref.
    void Orphan() override {
      Finish();
      Unref(DEBUG_LOCATION, "Orphan");
    }

    // Arms the timer the first time it is called; later calls do nothing.
    // The timer deadline is now + the XdsClient's request_timeout_.
    void Start(RefCountedPtr<AdsCallState> ads_calld) {
      if (sent_initial_request_) return;
      sent_initial_request_ = true;
      ads_calld_ = std::move(ads_calld);
      // This ref is released in OnTimer().
      Ref(DEBUG_LOCATION, "timer").release();
      timer_pending_ = true;
      grpc_timer_init(
          &timer_,
          ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
          &timer_callback_);
    }

    // Cancels the timer if it is still pending.
    void Finish() {
      if (timer_pending_) {
        grpc_timer_cancel(&timer_);
        timer_pending_ = false;
      }
    }

   private:
    // Timer closure entry point. Runs OnTimerLocked() under the XdsClient
    // mutex, then drops the ADS call ref and the ref taken in Start().
    static void OnTimer(void* arg, grpc_error* error) {
      ResourceState* self = static_cast<ResourceState*>(arg);
      {
        MutexLock lock(&self->ads_calld_->xds_client()->mu_);
        self->OnTimerLocked(GRPC_ERROR_REF(error));
      }
      self->ads_calld_.reset();
      self->Unref(DEBUG_LOCATION, "timer");
    }

    // If the timer fired without error while still pending, builds a timeout
    // error and passes a ref of it to OnError() of every watcher registered
    // for this resource in the map that matches type_url_. Consumes `error`.
    void OnTimerLocked(grpc_error* error) {
      if (error == GRPC_ERROR_NONE && timer_pending_) {
        timer_pending_ = false;
        grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
            absl::StrFormat(
                "timeout obtaining resource {type=%s name=%s} from xds server",
                type_url_, name_)
                .c_str());
        if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
          gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
                  grpc_error_string(watcher_error));
        }
        if (type_url_ == XdsApi::kLdsTypeUrl) {
          ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
          for (const auto& p : state.watchers) {
            p.first->OnError(GRPC_ERROR_REF(watcher_error));
          }
        } else if (type_url_ == XdsApi::kRdsTypeUrl) {
          RouteConfigState& state =
              ads_calld_->xds_client()->route_config_map_[name_];
          for (const auto& p : state.watchers) {
            p.first->OnError(GRPC_ERROR_REF(watcher_error));
          }
        } else if (type_url_ == XdsApi::kCdsTypeUrl) {
          ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
          for (const auto& p : state.watchers) {
            p.first->OnError(GRPC_ERROR_REF(watcher_error));
          }
        } else if (type_url_ == XdsApi::kEdsTypeUrl) {
          EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
          for (const auto& p : state.watchers) {
            p.first->OnError(GRPC_ERROR_REF(watcher_error));
          }
        } else {
          GPR_UNREACHABLE_CODE(return );
        }
        // Each watcher received its own ref; drop the local one.
        GRPC_ERROR_UNREF(watcher_error);
      }
      GRPC_ERROR_UNREF(error);
    }

    const std::string type_url_;
    const std::string name_;

    // Set in Start() and cleared in OnTimer().
    RefCountedPtr<AdsCallState> ads_calld_;
    bool sent_initial_request_;
    bool timer_pending_ = false;
    grpc_timer timer_;
    grpc_closure timer_callback_;
  };

  // State kept per resource type (keyed by type URL in state_map_).
  struct ResourceTypeState {
    ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }

    // Nonce and error for this resource type.
    std::string nonce;
    grpc_error* error = GRPC_ERROR_NONE;

    // Subscribed resources of this type.
    std::map<std::string /* name */, OrphanablePtr<ResourceState>>
        subscribed_resources;
  };

  // Sends a request for type_url, or records the type in buffered_requests_
  // if a send is already in flight.
  void SendMessageLocked(const std::string& type_url);

  void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
  void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
  void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
  void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);

  // Closure callbacks for the call batches and their *Locked counterparts.
  static void OnRequestSent(void* arg, grpc_error* error);
  void OnRequestSentLocked(grpc_error* error);
  static void OnResponseReceived(void* arg, grpc_error* error);
  bool OnResponseReceivedLocked();
  static void OnStatusReceived(void* arg, grpc_error* error);
  void OnStatusReceivedLocked(grpc_error* error);

  bool IsCurrentCallOnChannel() const;

  std::set<absl::string_view> ResourceNamesForRequest(
      const std::string& type_url);

  // The owning RetryableCall<>.
  RefCountedPtr<RetryableCall<AdsCallState>> parent_;

  // Set once the first request has been built in SendMessageLocked().
  bool sent_initial_message_ = false;
  bool seen_response_ = false;

  // Always non-NULL.
  grpc_call* call_;

  // recv_initial_metadata
  grpc_metadata_array initial_metadata_recv_;

  // send_message
  // Non-null while a send is in flight (checked by SendMessageLocked()).
  grpc_byte_buffer* send_message_payload_ = nullptr;
  grpc_closure on_request_sent_;

  // recv_message
  grpc_byte_buffer* recv_message_payload_ = nullptr;
  grpc_closure on_response_received_;

  // recv_trailing_metadata
  grpc_metadata_array trailing_metadata_recv_;
  grpc_status_code status_code_;
  grpc_slice status_details_;
  grpc_closure on_status_received_;

  // Resource types for which requests need to be sent.
  std::set<std::string /*type_url*/> buffered_requests_;

  // State for each resource type.
  std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
};
302
// Contains an LRS call to the xds server.
class XdsClient::ChannelState::LrsCallState
    : public InternallyRefCounted<LrsCallState> {
 public:
  // The ctor and dtor should not be used directly.
  explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
  ~LrsCallState() override;

  void Orphan() override;

  // NOTE(review): defined outside the part of the file reviewed here;
  // presumably creates the Reporter once the preconditions for reporting are
  // met -- confirm.
  void MaybeStartReportingLocked();

  // Accessors that walk up the ownership chain:
  // RetryableCall -> ChannelState -> XdsClient.
  RetryableCall<LrsCallState>* parent() { return parent_.get(); }
  ChannelState* chand() const { return parent_->chand(); }
  XdsClient* xds_client() const { return chand()->xds_client(); }
  // Read by RetryableCall::OnCallFinishedLocked() to choose between an
  // immediate restart and a backoff retry.
  bool seen_response() const { return seen_response_; }

 private:
  // Reports client-side load stats according to a fixed interval.
  class Reporter : public InternallyRefCounted<Reporter> {
   public:
    // Initializes both closures and immediately schedules the first report.
    Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
        : parent_(std::move(parent)), report_interval_(report_interval) {
      GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
                        grpc_schedule_on_exec_ctx);
      GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
                        grpc_schedule_on_exec_ctx);
      ScheduleNextReportLocked();
    }

    void Orphan() override;

   private:
    void ScheduleNextReportLocked();
    static void OnNextReportTimer(void* arg, grpc_error* error);
    bool OnNextReportTimerLocked(grpc_error* error);
    bool SendReportLocked();
    static void OnReportDone(void* arg, grpc_error* error);
    bool OnReportDoneLocked(grpc_error* error);

    // True while this reporter is the one currently installed on the LRS
    // call.
    bool IsCurrentReporterOnCall() const {
      return this == parent_->reporter_.get();
    }
    XdsClient* xds_client() const { return parent_->xds_client(); }

    // The owning LRS call.
    RefCountedPtr<LrsCallState> parent_;

    // The load reporting state.
    const grpc_millis report_interval_;
    bool last_report_counters_were_zero_ = false;
    bool next_report_timer_callback_pending_ = false;
    grpc_timer next_report_timer_;
    grpc_closure on_next_report_timer_;
    grpc_closure on_report_done_;
  };

  // Closure callbacks for the call batches and their *Locked counterparts.
  static void OnInitialRequestSent(void* arg, grpc_error* error);
  void OnInitialRequestSentLocked();
  static void OnResponseReceived(void* arg, grpc_error* error);
  bool OnResponseReceivedLocked();
  static void OnStatusReceived(void* arg, grpc_error* error);
  void OnStatusReceivedLocked(grpc_error* error);

  bool IsCurrentCallOnChannel() const;

  // The owning RetryableCall<>.
  RefCountedPtr<RetryableCall<LrsCallState>> parent_;
  bool seen_response_ = false;

  // Always non-NULL.
  grpc_call* call_;

  // recv_initial_metadata
  grpc_metadata_array initial_metadata_recv_;

  // send_message
  grpc_byte_buffer* send_message_payload_ = nullptr;
  grpc_closure on_initial_request_sent_;

  // recv_message
  grpc_byte_buffer* recv_message_payload_ = nullptr;
  grpc_closure on_response_received_;

  // recv_trailing_metadata
  grpc_metadata_array trailing_metadata_recv_;
  grpc_status_code status_code_;
  grpc_slice status_details_;
  grpc_closure on_status_received_;

  // Load reporting state.
  bool send_all_clusters_ = false;
  std::set<std::string> cluster_names_;  // Asked for by the LRS server.
  grpc_millis load_reporting_interval_ = 0;
  OrphanablePtr<Reporter> reporter_;
};
399
400 //
401 // XdsClient::ChannelState::StateWatcher
402 //
403
404 class XdsClient::ChannelState::StateWatcher
405 : public AsyncConnectivityStateWatcherInterface {
406 public:
StateWatcher(RefCountedPtr<ChannelState> parent)407 explicit StateWatcher(RefCountedPtr<ChannelState> parent)
408 : parent_(std::move(parent)) {}
409
410 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)411 void OnConnectivityStateChange(grpc_connectivity_state new_state,
412 const absl::Status& status) override {
413 MutexLock lock(&parent_->xds_client_->mu_);
414 if (!parent_->shutting_down_ &&
415 new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
416 // In TRANSIENT_FAILURE. Notify all watchers of error.
417 gpr_log(GPR_INFO,
418 "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
419 "status_message:(%s)",
420 parent_->xds_client(), status.ToString().c_str());
421 parent_->xds_client()->NotifyOnErrorLocked(
422 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
423 "xds channel in TRANSIENT_FAILURE"));
424 }
425 }
426
427 RefCountedPtr<ChannelState> parent_;
428 };
429
430 //
431 // XdsClient::ChannelState
432 //
433
434 namespace {
435
CreateXdsChannel(const XdsBootstrap::XdsServer & server)436 grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
437 // Build channel args.
438 absl::InlinedVector<grpc_arg, 2> args_to_add = {
439 grpc_channel_arg_integer_create(
440 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
441 5 * 60 * GPR_MS_PER_SEC),
442 grpc_channel_arg_integer_create(
443 const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
444 };
445 grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
446 g_channel_args, args_to_add.data(), args_to_add.size());
447 // Create channel creds.
448 RefCountedPtr<grpc_channel_credentials> channel_creds =
449 XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
450 server.channel_creds_config);
451 // Create channel.
452 grpc_channel* channel = grpc_secure_channel_create(
453 channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
454 grpc_channel_args_destroy(new_args);
455 return channel;
456 }
457
458 } // namespace
459
// Creates the channel to `server` and starts watching its connectivity state.
// The object only gets a ref-count trace name when the refcount trace flag is
// enabled. Channel creation is asserted to succeed.
XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
                                      const XdsBootstrap::XdsServer& server)
    : InternallyRefCounted<ChannelState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
              ? "ChannelState"
              : nullptr),
      xds_client_(std::move(xds_client)),
      server_(server) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
            xds_client_.get(), server.server_uri.c_str());
  }
  channel_ = CreateXdsChannel(server);
  GPR_ASSERT(channel_ != nullptr);
  // The watcher created here takes its own ref to this ChannelState.
  StartConnectivityWatchLocked();
}
476
// Destroys the underlying channel and then drops the weak ref to the
// XdsClient.
XdsClient::ChannelState::~ChannelState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
            this);
  }
  grpc_channel_destroy(channel_);
  xds_client_.reset(DEBUG_LOCATION, "ChannelState");
}
485
// Shuts the channel state down: sets shutting_down_ (which StateWatcher
// checks before reporting errors), stops the connectivity watch, drops the
// ADS and LRS retryable calls, and releases one ref.
void XdsClient::ChannelState::Orphan() {
  shutting_down_ = true;
  CancelConnectivityWatchLocked();
  ads_calld_.reset();
  lrs_calld_.reset();
  Unref(DEBUG_LOCATION, "ChannelState+orphaned");
}
493
ads_calld() const494 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
495 const {
496 return ads_calld_->calld();
497 }
498
lrs_calld() const499 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
500 const {
501 return lrs_calld_->calld();
502 }
503
HasActiveAdsCall() const504 bool XdsClient::ChannelState::HasActiveAdsCall() const {
505 return ads_calld_->calld() != nullptr;
506 }
507
MaybeStartLrsCall()508 void XdsClient::ChannelState::MaybeStartLrsCall() {
509 if (lrs_calld_ != nullptr) return;
510 lrs_calld_.reset(
511 new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
512 }
513
// Drops the LRS retryable call, if any.
void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
515
StartConnectivityWatchLocked()516 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
517 grpc_channel_element* client_channel_elem =
518 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
519 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
520 watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
521 grpc_client_channel_start_connectivity_watch(
522 client_channel_elem, GRPC_CHANNEL_IDLE,
523 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
524 }
525
CancelConnectivityWatchLocked()526 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
527 grpc_channel_element* client_channel_elem =
528 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
529 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
530 grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
531 }
532
Subscribe(const std::string & type_url,const std::string & name)533 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
534 const std::string& name) {
535 if (ads_calld_ == nullptr) {
536 // Start the ADS call if this is the first request.
537 ads_calld_.reset(new RetryableCall<AdsCallState>(
538 Ref(DEBUG_LOCATION, "ChannelState+ads")));
539 // Note: AdsCallState's ctor will automatically subscribe to all
540 // resources that the XdsClient already has watchers for, so we can
541 // return here.
542 return;
543 }
544 // If the ADS call is in backoff state, we don't need to do anything now
545 // because when the call is restarted it will resend all necessary requests.
546 if (ads_calld() == nullptr) return;
547 // Subscribe to this resource if the ADS call is active.
548 ads_calld()->Subscribe(type_url, name);
549 }
550
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)551 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
552 const std::string& name,
553 bool delay_unsubscription) {
554 if (ads_calld_ != nullptr) {
555 auto* calld = ads_calld_->calld();
556 if (calld != nullptr) {
557 calld->Unsubscribe(type_url, name, delay_unsubscription);
558 if (!calld->HasSubscribedResources()) ads_calld_.reset();
559 }
560 }
561 }
562
563 //
564 // XdsClient::ChannelState::RetryableCall<>
565 //
566
// Takes ownership of the channel ref, configures the reconnect backoff from
// the GRPC_XDS_* constants (seconds converted to milliseconds), prepares the
// retry-timer closure and starts the first call right away.
template <typename T>
XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
    RefCountedPtr<ChannelState> chand)
    : chand_(std::move(chand)),
      backoff_(
          BackOff::Options()
              .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
                                   1000)
              .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(GRPC_XDS_RECONNECT_JITTER)
              .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
  // Closure Initialization
  GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartNewCallLocked();
}
583
// Shuts this retryable call down. shutting_down_ is set first so that
// StartNewCallLocked() and StartRetryTimerLocked() become no-ops; then the
// wrapped call is dropped, a pending retry timer is cancelled and one ref is
// released.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
  shutting_down_ = true;
  calld_.reset();
  if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
  this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
591
592 template <typename T>
OnCallFinishedLocked()593 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
594 const bool seen_response = calld_->seen_response();
595 calld_.reset();
596 if (seen_response) {
597 // If we lost connection to the xds server, reset backoff and restart the
598 // call immediately.
599 backoff_.Reset();
600 StartNewCallLocked();
601 } else {
602 // If we failed to connect to the xds server, retry later.
603 StartRetryTimerLocked();
604 }
605 }
606
// Creates a new wrapped call of type T, handing it a fresh ref to this
// object. Does nothing when shutting down. Asserts that the channel exists
// and that no call is currently wrapped.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(chand_->channel_ != nullptr);
  GPR_ASSERT(calld_ == nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Start new call from retryable call (chand: %p, "
            "retryable call: %p)",
            chand()->xds_client(), chand(), this);
  }
  calld_ = MakeOrphanable<T>(
      this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
}
621
622 template <typename T>
StartRetryTimerLocked()623 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
624 if (shutting_down_) return;
625 const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
626 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
627 grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
628 gpr_log(GPR_INFO,
629 "[xds_client %p] Failed to connect to xds server (chand: %p) "
630 "retry timer will fire in %" PRId64 "ms.",
631 chand()->xds_client(), chand(), timeout);
632 }
633 this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
634 grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
635 retry_timer_callback_pending_ = true;
636 }
637
638 template <typename T>
OnRetryTimer(void * arg,grpc_error * error)639 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
640 void* arg, grpc_error* error) {
641 RetryableCall* calld = static_cast<RetryableCall*>(arg);
642 {
643 MutexLock lock(&calld->chand_->xds_client()->mu_);
644 calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
645 }
646 calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
647 }
648
649 template <typename T>
OnRetryTimerLocked(grpc_error * error)650 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
651 grpc_error* error) {
652 retry_timer_callback_pending_ = false;
653 if (!shutting_down_ && error == GRPC_ERROR_NONE) {
654 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
655 gpr_log(
656 GPR_INFO,
657 "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
658 chand()->xds_client(), chand(), this);
659 }
660 StartNewCallLocked();
661 }
662 GRPC_ERROR_UNREF(error);
663 }
664
665 //
666 // XdsClient::ChannelState::AdsCallState
667 //
668
// Creates the ADS streaming call on the xds channel and starts it.
//
// Three batches are started, in this order:
//   1. send initial metadata (no completion closure);
//   2. recv initial metadata + recv message, completing on
//      on_response_received_ (holds its own ref to this object);
//   3. recv status, completing on on_status_received_ (relies on the
//      initial ref).
// Between 1 and 2, every resource the XdsClient already tracks in its
// listener / route-config / cluster / endpoint maps is subscribed to, which
// is what triggers the request messages being sent.
XdsClient::ChannelState::AdsCallState::AdsCallState(
    RefCountedPtr<RetryableCall<AdsCallState>> parent)
    : InternallyRefCounted<AdsCallState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
              ? "AdsCallState"
              : nullptr),
      parent_(std::move(parent)) {
  // Init the ADS call. Note that the call will progress every time there's
  // activity in xds_client()->interested_parties_, which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(xds_client() != nullptr);
  // Create a call with the specified method name.
  // The v3 or v2 ADS method is chosen based on the server's configuration.
  const auto& method =
      chand()->server_.ShouldUseV3()
          ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
          : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
  call_ = grpc_channel_create_pollset_set_call(
      chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      xds_client()->interested_parties_, method, nullptr,
      GRPC_MILLIS_INF_FUTURE, nullptr);
  GPR_ASSERT(call_ != nullptr);
  // Init data associated with the call.
  grpc_metadata_array_init(&initial_metadata_recv_);
  grpc_metadata_array_init(&trailing_metadata_recv_);
  // Start the call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
            "call: %p)",
            xds_client(), chand(), this, call_);
  }
  // Create the ops.
  // The ops array is reused for each of the three batches below.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), nullptr);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: send request message.
  // The request messages themselves are sent as a side effect of the
  // Subscribe() calls below.
  GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  for (const auto& p : xds_client()->listener_map_) {
    Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->route_config_map_) {
    Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->cluster_map_) {
    Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->endpoint_map_) {
    Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
  }
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Ref held on behalf of the pending on_response_received_ callback.
  Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &status_code_;
  op->data.recv_status_on_client.status_details = &status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
768
// Releases everything the call owns: both metadata arrays, the send and
// receive payload buffers, the status-details slice, and finally the call
// itself.
XdsClient::ChannelState::AdsCallState::~AdsCallState() {
  grpc_metadata_array_destroy(&initial_metadata_recv_);
  grpc_metadata_array_destroy(&trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  grpc_slice_unref_internal(status_details_);
  GPR_ASSERT(call_ != nullptr);
  grpc_call_unref(call_);
}
778
// Cancels the underlying call and drops all per-type subscription state.
// Does not release a ref itself (see the note at the end).
void XdsClient::ChannelState::AdsCallState::Orphan() {
  GPR_ASSERT(call_ != nullptr);
  // If we are here because xds_client wants to cancel the call,
  // on_status_received_ will complete the cancellation and clean up.
  // Otherwise, we are here because xds_client has to orphan a failed call, in
  // which case the following cancellation will be a no-op.
  grpc_call_cancel_internal(call_);
  state_map_.clear();
  // Note that the initial ref is held by on_status_received_, so the
  // corresponding unref happens in on_status_received_ instead of here.
}
790
SendMessageLocked(const std::string & type_url)791 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
792 const std::string& type_url) {
793 // Buffer message sending if an existing message is in flight.
794 if (send_message_payload_ != nullptr) {
795 buffered_requests_.insert(type_url);
796 return;
797 }
798 auto& state = state_map_[type_url];
799 grpc_slice request_payload_slice;
800 std::set<absl::string_view> resource_names =
801 ResourceNamesForRequest(type_url);
802 request_payload_slice = xds_client()->api_.CreateAdsRequest(
803 chand()->server_, type_url, resource_names,
804 xds_client()->resource_version_map_[type_url], state.nonce,
805 GRPC_ERROR_REF(state.error), !sent_initial_message_);
806 if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
807 type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
808 state_map_.erase(type_url);
809 }
810 sent_initial_message_ = true;
811 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
812 gpr_log(GPR_INFO,
813 "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
814 "error=%s resources=%s",
815 xds_client(), type_url.c_str(),
816 xds_client()->resource_version_map_[type_url].c_str(),
817 state.nonce.c_str(), grpc_error_string(state.error),
818 absl::StrJoin(resource_names, " ").c_str());
819 }
820 GRPC_ERROR_UNREF(state.error);
821 state.error = GRPC_ERROR_NONE;
822 // Create message payload.
823 send_message_payload_ =
824 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
825 grpc_slice_unref_internal(request_payload_slice);
826 // Send the message.
827 grpc_op op;
828 memset(&op, 0, sizeof(op));
829 op.op = GRPC_OP_SEND_MESSAGE;
830 op.data.send_message.send_message = send_message_payload_;
831 Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
832 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
833 grpc_schedule_on_exec_ctx);
834 grpc_call_error call_error =
835 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
836 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
837 gpr_log(GPR_ERROR,
838 "[xds_client %p] calld=%p call_error=%d sending ADS message",
839 xds_client(), this, call_error);
840 GPR_ASSERT(GRPC_CALL_OK == call_error);
841 }
842 }
843
Subscribe(const std::string & type_url,const std::string & name)844 void XdsClient::ChannelState::AdsCallState::Subscribe(
845 const std::string& type_url, const std::string& name) {
846 auto& state = state_map_[type_url].subscribed_resources[name];
847 if (state == nullptr) {
848 state = MakeOrphanable<ResourceState>(
849 type_url, name, !xds_client()->resource_version_map_[type_url].empty());
850 SendMessageLocked(type_url);
851 }
852 }
853
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)854 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
855 const std::string& type_url, const std::string& name,
856 bool delay_unsubscription) {
857 state_map_[type_url].subscribed_resources.erase(name);
858 if (!delay_unsubscription) SendMessageLocked(type_url);
859 }
860
HasSubscribedResources() const861 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
862 for (const auto& p : state_map_) {
863 if (!p.second.subscribed_resources.empty()) return true;
864 }
865 return false;
866 }
867
AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map)868 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
869 XdsApi::LdsUpdateMap lds_update_map) {
870 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
871 gpr_log(GPR_INFO,
872 "[xds_client %p] LDS update received containing %" PRIuPTR
873 " resources",
874 xds_client(), lds_update_map.size());
875 }
876 auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
877 std::set<std::string> rds_resource_names_seen;
878 for (auto& p : lds_update_map) {
879 const std::string& listener_name = p.first;
880 XdsApi::LdsUpdate& lds_update = p.second;
881 auto& state = lds_state.subscribed_resources[listener_name];
882 if (state != nullptr) state->Finish();
883 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
884 gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s",
885 xds_client(), listener_name.c_str(),
886 (!lds_update.route_config_name.empty()
887 ? lds_update.route_config_name.c_str()
888 : "<inlined>"));
889 if (lds_update.rds_update.has_value()) {
890 gpr_log(GPR_INFO, "RouteConfiguration: %s",
891 lds_update.rds_update->ToString().c_str());
892 }
893 }
894 // Record the RDS resource names seen.
895 if (!lds_update.route_config_name.empty()) {
896 rds_resource_names_seen.insert(lds_update.route_config_name);
897 }
898 // Ignore identical update.
899 ListenerState& listener_state = xds_client()->listener_map_[listener_name];
900 if (listener_state.update.has_value() &&
901 *listener_state.update == lds_update) {
902 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
903 gpr_log(GPR_INFO,
904 "[xds_client %p] LDS update for %s identical to current, "
905 "ignoring.",
906 xds_client(), listener_name.c_str());
907 }
908 continue;
909 }
910 // Update the listener state.
911 listener_state.update = std::move(lds_update);
912 // Notify watchers.
913 for (const auto& p : listener_state.watchers) {
914 p.first->OnListenerChanged(*listener_state.update);
915 }
916 }
917 // For any subscribed resource that is not present in the update,
918 // remove it from the cache and notify watchers that it does not exist.
919 for (const auto& p : lds_state.subscribed_resources) {
920 const std::string& listener_name = p.first;
921 if (lds_update_map.find(listener_name) == lds_update_map.end()) {
922 ListenerState& listener_state =
923 xds_client()->listener_map_[listener_name];
924 // If the resource was newly requested but has not yet been received,
925 // we don't want to generate an error for the watchers, because this LDS
926 // response may be in reaction to an earlier request that did not yet
927 // request the new resource, so its absence from the response does not
928 // necessarily indicate that the resource does not exist.
929 // For that case, we rely on the request timeout instead.
930 if (!listener_state.update.has_value()) continue;
931 listener_state.update.reset();
932 for (const auto& p : listener_state.watchers) {
933 p.first->OnResourceDoesNotExist();
934 }
935 }
936 }
937 // For any RDS resource that is no longer referred to by any LDS
938 // resources, remove it from the cache and notify watchers that it
939 // does not exist.
940 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
941 for (const auto& p : rds_state.subscribed_resources) {
942 const std::string& rds_resource_name = p.first;
943 if (rds_resource_names_seen.find(rds_resource_name) ==
944 rds_resource_names_seen.end()) {
945 RouteConfigState& route_config_state =
946 xds_client()->route_config_map_[rds_resource_name];
947 route_config_state.update.reset();
948 for (const auto& p : route_config_state.watchers) {
949 p.first->OnResourceDoesNotExist();
950 }
951 }
952 }
953 }
954
AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map)955 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
956 XdsApi::RdsUpdateMap rds_update_map) {
957 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
958 gpr_log(GPR_INFO,
959 "[xds_client %p] RDS update received containing %" PRIuPTR
960 " resources",
961 xds_client(), rds_update_map.size());
962 }
963 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
964 for (auto& p : rds_update_map) {
965 const std::string& route_config_name = p.first;
966 XdsApi::RdsUpdate& rds_update = p.second;
967 auto& state = rds_state.subscribed_resources[route_config_name];
968 if (state != nullptr) state->Finish();
969 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
970 gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
971 rds_update.ToString().c_str());
972 }
973 RouteConfigState& route_config_state =
974 xds_client()->route_config_map_[route_config_name];
975 // Ignore identical update.
976 if (route_config_state.update.has_value() &&
977 *route_config_state.update == rds_update) {
978 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
979 gpr_log(GPR_INFO,
980 "[xds_client %p] RDS resource identical to current, ignoring",
981 xds_client());
982 }
983 continue;
984 }
985 // Update the cache.
986 route_config_state.update = std::move(rds_update);
987 // Notify all watchers.
988 for (const auto& p : route_config_state.watchers) {
989 p.first->OnRouteConfigChanged(*route_config_state.update);
990 }
991 }
992 }
993
AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map)994 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
995 XdsApi::CdsUpdateMap cds_update_map) {
996 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
997 gpr_log(GPR_INFO,
998 "[xds_client %p] CDS update received containing %" PRIuPTR
999 " resources",
1000 xds_client(), cds_update_map.size());
1001 }
1002 auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
1003 std::set<std::string> eds_resource_names_seen;
1004 for (auto& p : cds_update_map) {
1005 const char* cluster_name = p.first.c_str();
1006 XdsApi::CdsUpdate& cds_update = p.second;
1007 auto& state = cds_state.subscribed_resources[cluster_name];
1008 if (state != nullptr) state->Finish();
1009 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1010 gpr_log(GPR_INFO, "[xds_client %p] cluster=%s: %s", xds_client(),
1011 cluster_name, cds_update.ToString().c_str());
1012 }
1013 // Record the EDS resource names seen.
1014 eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1015 ? cluster_name
1016 : cds_update.eds_service_name);
1017 // Ignore identical update.
1018 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1019 if (cluster_state.update.has_value() &&
1020 *cluster_state.update == cds_update) {
1021 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1022 gpr_log(GPR_INFO,
1023 "[xds_client %p] CDS update identical to current, ignoring.",
1024 xds_client());
1025 }
1026 continue;
1027 }
1028 // Update the cluster state.
1029 cluster_state.update = std::move(cds_update);
1030 // Notify all watchers.
1031 for (const auto& p : cluster_state.watchers) {
1032 p.first->OnClusterChanged(cluster_state.update.value());
1033 }
1034 }
1035 // For any subscribed resource that is not present in the update,
1036 // remove it from the cache and notify watchers that it does not exist.
1037 for (const auto& p : cds_state.subscribed_resources) {
1038 const std::string& cluster_name = p.first;
1039 if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1040 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1041 // If the resource was newly requested but has not yet been received,
1042 // we don't want to generate an error for the watchers, because this CDS
1043 // response may be in reaction to an earlier request that did not yet
1044 // request the new resource, so its absence from the response does not
1045 // necessarily indicate that the resource does not exist.
1046 // For that case, we rely on the request timeout instead.
1047 if (!cluster_state.update.has_value()) continue;
1048 cluster_state.update.reset();
1049 for (const auto& p : cluster_state.watchers) {
1050 p.first->OnResourceDoesNotExist();
1051 }
1052 }
1053 }
1054 // For any EDS resource that is no longer referred to by any CDS
1055 // resources, remove it from the cache and notify watchers that it
1056 // does not exist.
1057 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1058 for (const auto& p : eds_state.subscribed_resources) {
1059 const std::string& eds_resource_name = p.first;
1060 if (eds_resource_names_seen.find(eds_resource_name) ==
1061 eds_resource_names_seen.end()) {
1062 EndpointState& endpoint_state =
1063 xds_client()->endpoint_map_[eds_resource_name];
1064 endpoint_state.update.reset();
1065 for (const auto& p : endpoint_state.watchers) {
1066 p.first->OnResourceDoesNotExist();
1067 }
1068 }
1069 }
1070 }
1071
AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map)1072 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1073 XdsApi::EdsUpdateMap eds_update_map) {
1074 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1075 gpr_log(GPR_INFO,
1076 "[xds_client %p] EDS update received containing %" PRIuPTR
1077 " resources",
1078 xds_client(), eds_update_map.size());
1079 }
1080 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1081 for (auto& p : eds_update_map) {
1082 const char* eds_service_name = p.first.c_str();
1083 XdsApi::EdsUpdate& eds_update = p.second;
1084 auto& state = eds_state.subscribed_resources[eds_service_name];
1085 if (state != nullptr) state->Finish();
1086 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1087 gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1088 eds_service_name, eds_update.ToString().c_str());
1089 }
1090 EndpointState& endpoint_state =
1091 xds_client()->endpoint_map_[eds_service_name];
1092 // Ignore identical update.
1093 if (endpoint_state.update.has_value() &&
1094 *endpoint_state.update == eds_update) {
1095 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1096 gpr_log(GPR_INFO,
1097 "[xds_client %p] EDS update identical to current, ignoring.",
1098 xds_client());
1099 }
1100 continue;
1101 }
1102 // Update the cluster state.
1103 endpoint_state.update = std::move(eds_update);
1104 // Notify all watchers.
1105 for (const auto& p : endpoint_state.watchers) {
1106 p.first->OnEndpointChanged(endpoint_state.update.value());
1107 }
1108 }
1109 }
1110
OnRequestSent(void * arg,grpc_error * error)1111 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1112 grpc_error* error) {
1113 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1114 {
1115 MutexLock lock(&ads_calld->xds_client()->mu_);
1116 ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1117 }
1118 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1119 }
1120
OnRequestSentLocked(grpc_error * error)1121 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1122 grpc_error* error) {
1123 if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1124 // Clean up the sent message.
1125 grpc_byte_buffer_destroy(send_message_payload_);
1126 send_message_payload_ = nullptr;
1127 // Continue to send another pending message if any.
1128 // TODO(roth): The current code to handle buffered messages has the
1129 // advantage of sending only the most recent list of resource names for
1130 // each resource type (no matter how many times that resource type has
1131 // been requested to send while the current message sending is still
1132 // pending). But its disadvantage is that we send the requests in fixed
1133 // order of resource types. We need to fix this if we are seeing some
1134 // resource type(s) starved due to frequent requests of other resource
1135 // type(s).
1136 auto it = buffered_requests_.begin();
1137 if (it != buffered_requests_.end()) {
1138 SendMessageLocked(*it);
1139 buffered_requests_.erase(it);
1140 }
1141 }
1142 GRPC_ERROR_UNREF(error);
1143 }
1144
OnResponseReceived(void * arg,grpc_error *)1145 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1146 void* arg, grpc_error* /* error */) {
1147 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1148 bool done;
1149 {
1150 MutexLock lock(&ads_calld->xds_client()->mu_);
1151 done = ads_calld->OnResponseReceivedLocked();
1152 }
1153 if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1154 }
1155
// Processes one message received on the ADS stream and, unless the call is
// finished, starts the next receive.
//
// Returns true when no further receive op was started (stale or cancelled
// call, or the client is shutting down); the caller, OnResponseReceived(),
// then releases the "ADS+OnResponseReceivedLocked" ref. Returns false when a
// new receive op was started, in which case that ref is kept for the next
// invocation.
//
// For a parsable response the per-type nonce is always updated. If the
// response failed validation, the error is stored in the per-type state and
// a request for that type is sent (NACK). Otherwise the update is applied via
// the matching Accept*Update(), the recorded version for the type is updated,
// and a request for that type is sent (ACK).
bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
  // Empty payload means the call was cancelled.
  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
    return true;
  }
  // Read the response.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  // Parse and validate the response.
  // Note: each ResourceNamesForRequest() call also calls Start() on every
  // subscribed ResourceState of that type.
  XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
      response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
      ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
      ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
      ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
  grpc_slice_unref_internal(response_slice);
  if (result.type_url.empty()) {
    // Ignore unparsable response.
    gpr_log(GPR_ERROR,
            "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
            xds_client(), grpc_error_string(result.parse_error));
    GRPC_ERROR_UNREF(result.parse_error);
  } else {
    // Update nonce.
    auto& state = state_map_[result.type_url];
    state.nonce = std::move(result.nonce);
    // NACK or ACK the response.
    if (result.parse_error != GRPC_ERROR_NONE) {
      // Replace any previously stored error; the ref on result.parse_error is
      // handed over to state.error (it is not unreffed here).
      GRPC_ERROR_UNREF(state.error);
      state.error = result.parse_error;
      // NACK unacceptable update.
      gpr_log(GPR_ERROR,
              "[xds_client %p] ADS response invalid for resource type %s "
              "version %s, will NACK: nonce=%s error=%s",
              xds_client(), result.type_url.c_str(), result.version.c_str(),
              state.nonce.c_str(), grpc_error_string(result.parse_error));
      SendMessageLocked(result.type_url);
    } else {
      seen_response_ = true;
      // Accept the ADS response according to the type_url.
      if (result.type_url == XdsApi::kLdsTypeUrl) {
        AcceptLdsUpdate(std::move(result.lds_update_map));
      } else if (result.type_url == XdsApi::kRdsTypeUrl) {
        AcceptRdsUpdate(std::move(result.rds_update_map));
      } else if (result.type_url == XdsApi::kCdsTypeUrl) {
        AcceptCdsUpdate(std::move(result.cds_update_map));
      } else if (result.type_url == XdsApi::kEdsTypeUrl) {
        AcceptEdsUpdate(std::move(result.eds_update_map));
      }
      // Record the accepted version before sending, so that the request sent
      // below is built from the new version.
      xds_client()->resource_version_map_[result.type_url] =
          std::move(result.version);
      // ACK the update.
      SendMessageLocked(result.type_url);
      // Start load reporting if needed.
      auto& lrs_call = chand()->lrs_calld_;
      if (lrs_call != nullptr) {
        LrsCallState* lrs_calld = lrs_call->calld();
        if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
      }
    }
  }
  // Do not start another receive once the client is shutting down.
  if (xds_client()->shutting_down_) return true;
  // Keep listening for updates.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_RECV_MESSAGE;
  op.data.recv_message.recv_message = &recv_message_payload_;
  op.flags = 0;
  op.reserved = nullptr;
  GPR_ASSERT(call_ != nullptr);
  // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
  const grpc_call_error call_error =
      grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  return false;
}
1235
OnStatusReceived(void * arg,grpc_error * error)1236 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1237 void* arg, grpc_error* error) {
1238 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1239 {
1240 MutexLock lock(&ads_calld->xds_client()->mu_);
1241 ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1242 }
1243 ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1244 }
1245
OnStatusReceivedLocked(grpc_error * error)1246 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1247 grpc_error* error) {
1248 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1249 char* status_details = grpc_slice_to_c_string(status_details_);
1250 gpr_log(GPR_INFO,
1251 "[xds_client %p] ADS call status received. Status = %d, details "
1252 "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1253 xds_client(), status_code_, status_details, chand(), this, call_,
1254 grpc_error_string(error));
1255 gpr_free(status_details);
1256 }
1257 // Ignore status from a stale call.
1258 if (IsCurrentCallOnChannel()) {
1259 // Try to restart the call.
1260 parent_->OnCallFinishedLocked();
1261 // Send error to all watchers.
1262 xds_client()->NotifyOnErrorLocked(
1263 GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1264 }
1265 GRPC_ERROR_UNREF(error);
1266 }
1267
IsCurrentCallOnChannel() const1268 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1269 // If the retryable ADS call is null (which only happens when the xds channel
1270 // is shutting down), all the ADS calls are stale.
1271 if (chand()->ads_calld_ == nullptr) return false;
1272 return this == chand()->ads_calld_->calld();
1273 }
1274
1275 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1276 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1277 const std::string& type_url) {
1278 std::set<absl::string_view> resource_names;
1279 auto it = state_map_.find(type_url);
1280 if (it != state_map_.end()) {
1281 for (auto& p : it->second.subscribed_resources) {
1282 resource_names.insert(p.first);
1283 OrphanablePtr<ResourceState>& state = p.second;
1284 state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1285 }
1286 }
1287 return resource_names;
1288 }
1289
1290 //
1291 // XdsClient::ChannelState::LrsCallState::Reporter
1292 //
1293
Orphan()1294 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1295 if (next_report_timer_callback_pending_) {
1296 grpc_timer_cancel(&next_report_timer_);
1297 }
1298 }
1299
1300 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1301 ScheduleNextReportLocked() {
1302 const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1303 grpc_timer_init(&next_report_timer_, next_report_time,
1304 &on_next_report_timer_);
1305 next_report_timer_callback_pending_ = true;
1306 }
1307
OnNextReportTimer(void * arg,grpc_error * error)1308 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1309 void* arg, grpc_error* error) {
1310 Reporter* self = static_cast<Reporter*>(arg);
1311 bool done;
1312 {
1313 MutexLock lock(&self->xds_client()->mu_);
1314 done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1315 }
1316 if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1317 }
1318
OnNextReportTimerLocked(grpc_error * error)1319 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1320 grpc_error* error) {
1321 next_report_timer_callback_pending_ = false;
1322 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1323 GRPC_ERROR_UNREF(error);
1324 return true;
1325 }
1326 return SendReportLocked();
1327 }
1328
1329 namespace {
1330
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1331 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1332 for (const auto& p : snapshot) {
1333 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1334 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1335 for (const auto& q : cluster_snapshot.locality_stats) {
1336 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1337 if (!locality_snapshot.IsZero()) return false;
1338 }
1339 }
1340 return true;
1341 }
1342
1343 } // namespace
1344
SendReportLocked()1345 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1346 // Construct snapshot from all reported stats.
1347 XdsApi::ClusterLoadReportMap snapshot =
1348 xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1349 parent_->cluster_names_);
1350 // Skip client load report if the counters were all zero in the last
1351 // report and they are still zero in this one.
1352 const bool old_val = last_report_counters_were_zero_;
1353 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1354 if (old_val && last_report_counters_were_zero_) {
1355 if (xds_client()->load_report_map_.empty()) {
1356 parent_->chand()->StopLrsCall();
1357 return true;
1358 }
1359 ScheduleNextReportLocked();
1360 return false;
1361 }
1362 // Create a request that contains the snapshot.
1363 grpc_slice request_payload_slice =
1364 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1365 parent_->send_message_payload_ =
1366 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1367 grpc_slice_unref_internal(request_payload_slice);
1368 // Send the report.
1369 grpc_op op;
1370 memset(&op, 0, sizeof(op));
1371 op.op = GRPC_OP_SEND_MESSAGE;
1372 op.data.send_message.send_message = parent_->send_message_payload_;
1373 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1374 parent_->call_, &op, 1, &on_report_done_);
1375 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1376 gpr_log(GPR_ERROR,
1377 "[xds_client %p] calld=%p call_error=%d sending client load report",
1378 xds_client(), this, call_error);
1379 GPR_ASSERT(GRPC_CALL_OK == call_error);
1380 }
1381 return false;
1382 }
1383
OnReportDone(void * arg,grpc_error * error)1384 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1385 void* arg, grpc_error* error) {
1386 Reporter* self = static_cast<Reporter*>(arg);
1387 bool done;
1388 {
1389 MutexLock lock(&self->xds_client()->mu_);
1390 done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1391 }
1392 if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1393 }
1394
OnReportDoneLocked(grpc_error * error)1395 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1396 grpc_error* error) {
1397 grpc_byte_buffer_destroy(parent_->send_message_payload_);
1398 parent_->send_message_payload_ = nullptr;
1399 // If there are no more registered stats to report, cancel the call.
1400 if (xds_client()->load_report_map_.empty()) {
1401 parent_->chand()->StopLrsCall();
1402 GRPC_ERROR_UNREF(error);
1403 return true;
1404 }
1405 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1406 GRPC_ERROR_UNREF(error);
1407 // If this reporter is no longer the current one on the call, the reason
1408 // might be that it was orphaned for a new one due to config update.
1409 if (!IsCurrentReporterOnCall()) {
1410 parent_->MaybeStartReportingLocked();
1411 }
1412 return true;
1413 }
1414 ScheduleNextReportLocked();
1415 return false;
1416 }
1417
1418 //
1419 // XdsClient::ChannelState::LrsCallState
1420 //
1421
// Creates the LRS call on the channel and starts three batches on it:
//   1. send initial metadata + the initial LRS request
//      (completion: on_initial_request_sent_, holds a new
//      "LRS+OnInitialRequestSentLocked" ref);
//   2. receive initial metadata + the first response message
//      (completion: on_response_received_, holds a new
//      "LRS+OnResponseReceivedLocked" ref);
//   3. receive the call status
//      (completion: on_status_received_, relies on the initial ref).
// The method path is chosen by chand()->server_.ShouldUseV3().
// Asserts that xds_client() and the created call are non-null and that every
// batch starts successfully.
XdsClient::ChannelState::LrsCallState::LrsCallState(
    RefCountedPtr<RetryableCall<LrsCallState>> parent)
    : InternallyRefCounted<LrsCallState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace)
              ? "LrsCallState"
              : nullptr),
      parent_(std::move(parent)) {
  // Init the LRS call. Note that the call will progress every time there's
  // activity in xds_client()->interested_parties_, which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(xds_client() != nullptr);
  const auto& method =
      chand()->server_.ShouldUseV3()
          ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
          : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
  // NOTE(review): GRPC_MILLIS_INF_FUTURE is presumably the call deadline
  // (i.e. no deadline) -- confirm against
  // grpc_channel_create_pollset_set_call().
  call_ = grpc_channel_create_pollset_set_call(
      chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      xds_client()->interested_parties_, method, nullptr,
      GRPC_MILLIS_INF_FUTURE, nullptr);
  GPR_ASSERT(call_ != nullptr);
  // Init the request payload.
  grpc_slice request_payload_slice =
      xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  // Init other data associated with the LRS call.
  grpc_metadata_array_init(&initial_metadata_recv_);
  grpc_metadata_array_init(&trailing_metadata_recv_);
  // Start the call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
            "call: %p)",
            xds_client(), chand(), this, call_);
  }
  // Create the ops.
  // The `ops` array is reused for all three batches: before each batch `op`
  // is reset to `ops`, and the batch size is computed as `op - ops`.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  // Op: send request message.
  GPR_ASSERT(send_message_payload_ != nullptr);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = send_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This ref is released by OnInitialRequestSent() when the batch completes.
  Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
  GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_initial_request_sent_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This ref is released by OnResponseReceived() once
  // OnResponseReceivedLocked() returns true.
  Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &status_code_;
  op->data.recv_status_on_client.status_details = &status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(
      call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
1521
~LrsCallState()1522 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1523 grpc_metadata_array_destroy(&initial_metadata_recv_);
1524 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1525 grpc_byte_buffer_destroy(send_message_payload_);
1526 grpc_byte_buffer_destroy(recv_message_payload_);
1527 grpc_slice_unref_internal(status_details_);
1528 GPR_ASSERT(call_ != nullptr);
1529 grpc_call_unref(call_);
1530 }
1531
Orphan()1532 void XdsClient::ChannelState::LrsCallState::Orphan() {
1533 reporter_.reset();
1534 GPR_ASSERT(call_ != nullptr);
1535 // If we are here because xds_client wants to cancel the call,
1536 // on_status_received_ will complete the cancellation and clean up. Otherwise,
1537 // we are here because xds_client has to orphan a failed call, then the
1538 // following cancellation will be a no-op.
1539 grpc_call_cancel_internal(call_);
1540 // Note that the initial ref is hold by on_status_received_. So the
1541 // corresponding unref happens in on_status_received_ instead of here.
1542 }
1543
MaybeStartReportingLocked()1544 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1545 // Don't start again if already started.
1546 if (reporter_ != nullptr) return;
1547 // Don't start if the previous send_message op (of the initial request or the
1548 // last report of the previous reporter) hasn't completed.
1549 if (send_message_payload_ != nullptr) return;
1550 // Don't start if no LRS response has arrived.
1551 if (!seen_response()) return;
1552 // Don't start if the ADS call hasn't received any valid response. Note that
1553 // this must be the first channel because it is the current channel but its
1554 // ADS call hasn't seen any response.
1555 if (chand()->ads_calld_ == nullptr ||
1556 chand()->ads_calld_->calld() == nullptr ||
1557 !chand()->ads_calld_->calld()->seen_response()) {
1558 return;
1559 }
1560 // Start reporting.
1561 reporter_ = MakeOrphanable<Reporter>(
1562 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1563 }
1564
OnInitialRequestSent(void * arg,grpc_error *)1565 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1566 void* arg, grpc_error* /*error*/) {
1567 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1568 {
1569 MutexLock lock(&lrs_calld->xds_client()->mu_);
1570 lrs_calld->OnInitialRequestSentLocked();
1571 }
1572 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1573 }
1574
OnInitialRequestSentLocked()1575 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1576 // Clear the send_message_payload_.
1577 grpc_byte_buffer_destroy(send_message_payload_);
1578 send_message_payload_ = nullptr;
1579 MaybeStartReportingLocked();
1580 }
1581
OnResponseReceived(void * arg,grpc_error *)1582 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1583 void* arg, grpc_error* /*error*/) {
1584 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1585 bool done;
1586 {
1587 MutexLock lock(&lrs_calld->xds_client()->mu_);
1588 done = lrs_calld->OnResponseReceivedLocked();
1589 }
1590 if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1591 }
1592
// Handles one LRS response message.
//
// Returns true when no further receive operation was started (stale call,
// empty payload, or the client is shutting down); OnResponseReceived() then
// releases the "LRS+OnResponseReceivedLocked" ref. Returns false after a new
// GRPC_OP_RECV_MESSAGE batch has been started, in which case that ref is
// reused for the next response.
bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
  // Empty payload means the call was cancelled.
  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
    return true;
  }
  // Read the response.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  // The payload has been copied into response_slice; release the buffer and
  // clear the member so that it can receive the next message.
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  // This anonymous lambda is a hack to avoid the usage of goto.
  // Every `return` inside it falls through to the slice cleanup and the
  // re-arming of the receive operation below.
  [&]() {
    // Parse the response.
    bool send_all_clusters = false;
    std::set<std::string> new_cluster_names;
    // Written by ParseLrsResponse(); only read after parsing succeeded.
    grpc_millis new_load_reporting_interval;
    grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
        response_slice, &send_all_clusters, &new_cluster_names,
        &new_load_reporting_interval);
    if (parse_error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR,
              "[xds_client %p] LRS response parsing failed. error=%s",
              xds_client(), grpc_error_string(parse_error));
      GRPC_ERROR_UNREF(parse_error);
      return;
    }
    // Only a successfully parsed response counts as "seen".
    seen_response_ = true;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(
          GPR_INFO,
          "[xds_client %p] LRS response received, %" PRIuPTR
          " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
          "ms",
          xds_client(), new_cluster_names.size(), send_all_clusters,
          new_load_reporting_interval);
      size_t i = 0;
      for (const auto& name : new_cluster_names) {
        gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
                xds_client(), i++, name.c_str());
      }
    }
    // Clamp the requested interval to the client-side minimum.
    if (new_load_reporting_interval <
        GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
      new_load_reporting_interval =
          GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] Increased load_report_interval to minimum "
                "value %dms",
                xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
      }
    }
    // Ignore identical update.
    // The comparison uses the clamped interval, so a response that differs
    // only by a below-minimum interval is also treated as identical.
    if (send_all_clusters == send_all_clusters_ &&
        cluster_names_ == new_cluster_names &&
        load_reporting_interval_ == new_load_reporting_interval) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] Incoming LRS response identical to current, "
                "ignoring.",
                xds_client());
      }
      return;
    }
    // Stop current load reporting (if any) to adopt the new config.
    reporter_.reset();
    // Record the new config.
    send_all_clusters_ = send_all_clusters;
    cluster_names_ = std::move(new_cluster_names);
    load_reporting_interval_ = new_load_reporting_interval;
    // Try starting sending load report.
    MaybeStartReportingLocked();
  }();
  grpc_slice_unref_internal(response_slice);
  // Do not start another receive once the client is shutting down.
  if (xds_client()->shutting_down_) return true;
  // Keep listening for LRS config updates.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_RECV_MESSAGE;
  op.data.recv_message.recv_message = &recv_message_payload_;
  op.flags = 0;
  op.reserved = nullptr;
  GPR_ASSERT(call_ != nullptr);
  // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
  const grpc_call_error call_error =
      grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  return false;
}
1684
OnStatusReceived(void * arg,grpc_error * error)1685 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1686 void* arg, grpc_error* error) {
1687 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1688 {
1689 MutexLock lock(&lrs_calld->xds_client()->mu_);
1690 lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1691 }
1692 lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1693 }
1694
OnStatusReceivedLocked(grpc_error * error)1695 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1696 grpc_error* error) {
1697 GPR_ASSERT(call_ != nullptr);
1698 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1699 char* status_details = grpc_slice_to_c_string(status_details_);
1700 gpr_log(GPR_INFO,
1701 "[xds_client %p] LRS call status received. Status = %d, details "
1702 "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1703 xds_client(), status_code_, status_details, chand(), this, call_,
1704 grpc_error_string(error));
1705 gpr_free(status_details);
1706 }
1707 // Ignore status from a stale call.
1708 if (IsCurrentCallOnChannel()) {
1709 GPR_ASSERT(!xds_client()->shutting_down_);
1710 // Try to restart the call.
1711 parent_->OnCallFinishedLocked();
1712 }
1713 GRPC_ERROR_UNREF(error);
1714 }
1715
IsCurrentCallOnChannel() const1716 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1717 // If the retryable LRS call is null (which only happens when the xds channel
1718 // is shutting down), all the LRS calls are stale.
1719 if (chand()->lrs_calld_ == nullptr) return false;
1720 return this == chand()->lrs_calld_->calld();
1721 }
1722
1723 //
1724 // XdsClient
1725 //
1726
1727 namespace {
1728
GetRequestTimeout()1729 grpc_millis GetRequestTimeout() {
1730 return grpc_channel_args_find_integer(
1731 g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1732 {15000, 0, INT_MAX});
1733 }
1734
1735 } // namespace
1736
XdsClient(grpc_error ** error)1737 XdsClient::XdsClient(grpc_error** error)
1738 : DualRefCounted<XdsClient>(
1739 GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_refcount_trace) ? "XdsClient"
1740 : nullptr),
1741 request_timeout_(GetRequestTimeout()),
1742 interested_parties_(grpc_pollset_set_create()),
1743 bootstrap_(
1744 XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
1745 certificate_provider_store_(MakeOrphanable<CertificateProviderStore>(
1746 bootstrap_ == nullptr
1747 ? CertificateProviderStore::PluginDefinitionMap()
1748 : bootstrap_->certificate_providers())),
1749 api_(this, &grpc_xds_client_trace,
1750 bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
1751 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1752 gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1753 }
1754 if (*error != GRPC_ERROR_NONE) {
1755 gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1756 this, grpc_error_string(*error));
1757 return;
1758 }
1759 // Create ChannelState object.
1760 chand_ = MakeOrphanable<ChannelState>(
1761 WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1762 }
1763
~XdsClient()1764 XdsClient::~XdsClient() {
1765 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1766 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1767 }
1768 grpc_pollset_set_destroy(interested_parties_);
1769 }
1770
AddChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1771 void XdsClient::AddChannelzLinkage(
1772 channelz::ChannelNode* parent_channelz_node) {
1773 channelz::ChannelNode* xds_channelz_node =
1774 grpc_channel_get_channelz_node(chand_->channel());
1775 if (xds_channelz_node != nullptr) {
1776 parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1777 }
1778 }
1779
RemoveChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1780 void XdsClient::RemoveChannelzLinkage(
1781 channelz::ChannelNode* parent_channelz_node) {
1782 channelz::ChannelNode* xds_channelz_node =
1783 grpc_channel_get_channelz_node(chand_->channel());
1784 if (xds_channelz_node != nullptr) {
1785 parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1786 }
1787 }
1788
// Shuts the client down: unregisters it from the global g_xds_client slot
// (if it is the registered instance), marks it as shutting down, drops the
// ChannelState, and conditionally clears the cluster and endpoint watcher
// maps.
void XdsClient::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
  }
  {
    // The global pointer is guarded by g_mu, not by mu_; the two locks are
    // taken in separate, non-nested scopes.
    MutexLock lock(g_mu);
    if (g_xds_client == this) g_xds_client = nullptr;
  }
  {
    MutexLock lock(&mu_);
    shutting_down_ = true;
    // Orphan ChannelState object.
    chand_.reset();
    // We do not clear cluster_map_ and endpoint_map_ if the xds client was
    // created by the XdsResolver because the maps contain refs for watchers
    // which in turn hold refs to the loadbalancing policies. At this point, it
    // is possible for ADS calls to be in progress. Unreffing the loadbalancing
    // policies before those calls are done would lead to issues such as
    // https://github.com/grpc/grpc/issues/20928.
    // NOTE(review): the condition below clears the maps exactly when
    // listener_map_ is non-empty; confirm that this matches the intent
    // described in the comment above.
    if (!listener_map_.empty()) {
      cluster_map_.clear();
      endpoint_map_.clear();
    }
  }
}
1814
WatchListenerData(absl::string_view listener_name,std::unique_ptr<ListenerWatcherInterface> watcher)1815 void XdsClient::WatchListenerData(
1816 absl::string_view listener_name,
1817 std::unique_ptr<ListenerWatcherInterface> watcher) {
1818 std::string listener_name_str = std::string(listener_name);
1819 MutexLock lock(&mu_);
1820 ListenerState& listener_state = listener_map_[listener_name_str];
1821 ListenerWatcherInterface* w = watcher.get();
1822 listener_state.watchers[w] = std::move(watcher);
1823 // If we've already received an LDS update, notify the new watcher
1824 // immediately.
1825 if (listener_state.update.has_value()) {
1826 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1827 gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1828 this, listener_name_str.c_str());
1829 }
1830 w->OnListenerChanged(*listener_state.update);
1831 }
1832 chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
1833 }
1834
CancelListenerDataWatch(absl::string_view listener_name,ListenerWatcherInterface * watcher,bool delay_unsubscription)1835 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1836 ListenerWatcherInterface* watcher,
1837 bool delay_unsubscription) {
1838 MutexLock lock(&mu_);
1839 if (shutting_down_) return;
1840 std::string listener_name_str = std::string(listener_name);
1841 ListenerState& listener_state = listener_map_[listener_name_str];
1842 auto it = listener_state.watchers.find(watcher);
1843 if (it != listener_state.watchers.end()) {
1844 listener_state.watchers.erase(it);
1845 if (listener_state.watchers.empty()) {
1846 listener_map_.erase(listener_name_str);
1847 chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
1848 delay_unsubscription);
1849 }
1850 }
1851 }
1852
WatchRouteConfigData(absl::string_view route_config_name,std::unique_ptr<RouteConfigWatcherInterface> watcher)1853 void XdsClient::WatchRouteConfigData(
1854 absl::string_view route_config_name,
1855 std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1856 std::string route_config_name_str = std::string(route_config_name);
1857 MutexLock lock(&mu_);
1858 RouteConfigState& route_config_state =
1859 route_config_map_[route_config_name_str];
1860 RouteConfigWatcherInterface* w = watcher.get();
1861 route_config_state.watchers[w] = std::move(watcher);
1862 // If we've already received an RDS update, notify the new watcher
1863 // immediately.
1864 if (route_config_state.update.has_value()) {
1865 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1866 gpr_log(GPR_INFO,
1867 "[xds_client %p] returning cached route config data for %s", this,
1868 route_config_name_str.c_str());
1869 }
1870 w->OnRouteConfigChanged(*route_config_state.update);
1871 }
1872 chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
1873 }
1874
CancelRouteConfigDataWatch(absl::string_view route_config_name,RouteConfigWatcherInterface * watcher,bool delay_unsubscription)1875 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1876 RouteConfigWatcherInterface* watcher,
1877 bool delay_unsubscription) {
1878 MutexLock lock(&mu_);
1879 if (shutting_down_) return;
1880 std::string route_config_name_str = std::string(route_config_name);
1881 RouteConfigState& route_config_state =
1882 route_config_map_[route_config_name_str];
1883 auto it = route_config_state.watchers.find(watcher);
1884 if (it != route_config_state.watchers.end()) {
1885 route_config_state.watchers.erase(it);
1886 if (route_config_state.watchers.empty()) {
1887 route_config_map_.erase(route_config_name_str);
1888 chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
1889 delay_unsubscription);
1890 }
1891 }
1892 }
1893
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1894 void XdsClient::WatchClusterData(
1895 absl::string_view cluster_name,
1896 std::unique_ptr<ClusterWatcherInterface> watcher) {
1897 std::string cluster_name_str = std::string(cluster_name);
1898 MutexLock lock(&mu_);
1899 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1900 ClusterWatcherInterface* w = watcher.get();
1901 cluster_state.watchers[w] = std::move(watcher);
1902 // If we've already received a CDS update, notify the new watcher
1903 // immediately.
1904 if (cluster_state.update.has_value()) {
1905 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1906 gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1907 this, cluster_name_str.c_str());
1908 }
1909 w->OnClusterChanged(cluster_state.update.value());
1910 }
1911 chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1912 }
1913
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1914 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1915 ClusterWatcherInterface* watcher,
1916 bool delay_unsubscription) {
1917 MutexLock lock(&mu_);
1918 if (shutting_down_) return;
1919 std::string cluster_name_str = std::string(cluster_name);
1920 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1921 auto it = cluster_state.watchers.find(watcher);
1922 if (it != cluster_state.watchers.end()) {
1923 cluster_state.watchers.erase(it);
1924 if (cluster_state.watchers.empty()) {
1925 cluster_map_.erase(cluster_name_str);
1926 chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1927 delay_unsubscription);
1928 }
1929 }
1930 }
1931
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1932 void XdsClient::WatchEndpointData(
1933 absl::string_view eds_service_name,
1934 std::unique_ptr<EndpointWatcherInterface> watcher) {
1935 std::string eds_service_name_str = std::string(eds_service_name);
1936 MutexLock lock(&mu_);
1937 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1938 EndpointWatcherInterface* w = watcher.get();
1939 endpoint_state.watchers[w] = std::move(watcher);
1940 // If we've already received an EDS update, notify the new watcher
1941 // immediately.
1942 if (endpoint_state.update.has_value()) {
1943 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1944 gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1945 this, eds_service_name_str.c_str());
1946 }
1947 w->OnEndpointChanged(endpoint_state.update.value());
1948 }
1949 chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1950 }
1951
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)1952 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1953 EndpointWatcherInterface* watcher,
1954 bool delay_unsubscription) {
1955 MutexLock lock(&mu_);
1956 if (shutting_down_) return;
1957 std::string eds_service_name_str = std::string(eds_service_name);
1958 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1959 auto it = endpoint_state.watchers.find(watcher);
1960 if (it != endpoint_state.watchers.end()) {
1961 endpoint_state.watchers.erase(it);
1962 if (endpoint_state.watchers.empty()) {
1963 endpoint_map_.erase(eds_service_name_str);
1964 chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1965 delay_unsubscription);
1966 }
1967 }
1968 }
1969
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)1970 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1971 absl::string_view lrs_server, absl::string_view cluster_name,
1972 absl::string_view eds_service_name) {
1973 // TODO(roth): When we add support for direct federation, use the
1974 // server name specified in lrs_server.
1975 auto key =
1976 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1977 MutexLock lock(&mu_);
1978 // We jump through some hoops here to make sure that the absl::string_views
1979 // stored in the XdsClusterDropStats object point to the strings
1980 // in the load_report_map_ key, so that they have the same lifetime.
1981 auto it = load_report_map_
1982 .emplace(std::make_pair(std::move(key), LoadReportState()))
1983 .first;
1984 LoadReportState& load_report_state = it->second;
1985 RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1986 if (load_report_state.drop_stats != nullptr) {
1987 cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1988 }
1989 if (cluster_drop_stats == nullptr) {
1990 if (load_report_state.drop_stats != nullptr) {
1991 load_report_state.deleted_drop_stats +=
1992 load_report_state.drop_stats->GetSnapshotAndReset();
1993 }
1994 cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1995 Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1996 it->first.first /*cluster_name*/,
1997 it->first.second /*eds_service_name*/);
1998 load_report_state.drop_stats = cluster_drop_stats.get();
1999 }
2000 chand_->MaybeStartLrsCall();
2001 return cluster_drop_stats;
2002 }
2003
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)2004 void XdsClient::RemoveClusterDropStats(
2005 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2006 absl::string_view eds_service_name,
2007 XdsClusterDropStats* cluster_drop_stats) {
2008 MutexLock lock(&mu_);
2009 // TODO(roth): When we add support for direct federation, use the
2010 // server name specified in lrs_server.
2011 auto it = load_report_map_.find(
2012 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2013 if (it == load_report_map_.end()) return;
2014 LoadReportState& load_report_state = it->second;
2015 if (load_report_state.drop_stats == cluster_drop_stats) {
2016 // Record final snapshot in deleted_drop_stats, which will be
2017 // added to the next load report.
2018 load_report_state.deleted_drop_stats +=
2019 load_report_state.drop_stats->GetSnapshotAndReset();
2020 load_report_state.drop_stats = nullptr;
2021 }
2022 }
2023
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)2024 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2025 absl::string_view lrs_server, absl::string_view cluster_name,
2026 absl::string_view eds_service_name,
2027 RefCountedPtr<XdsLocalityName> locality) {
2028 // TODO(roth): When we add support for direct federation, use the
2029 // server name specified in lrs_server.
2030 auto key =
2031 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2032 MutexLock lock(&mu_);
2033 // We jump through some hoops here to make sure that the absl::string_views
2034 // stored in the XdsClusterLocalityStats object point to the strings
2035 // in the load_report_map_ key, so that they have the same lifetime.
2036 auto it = load_report_map_
2037 .emplace(std::make_pair(std::move(key), LoadReportState()))
2038 .first;
2039 LoadReportState& load_report_state = it->second;
2040 LoadReportState::LocalityState& locality_state =
2041 load_report_state.locality_stats[locality];
2042 RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2043 if (locality_state.locality_stats != nullptr) {
2044 cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2045 }
2046 if (cluster_locality_stats == nullptr) {
2047 if (locality_state.locality_stats != nullptr) {
2048 locality_state.deleted_locality_stats +=
2049 locality_state.locality_stats->GetSnapshotAndReset();
2050 }
2051 cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2052 Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2053 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2054 std::move(locality));
2055 locality_state.locality_stats = cluster_locality_stats.get();
2056 }
2057 chand_->MaybeStartLrsCall();
2058 return cluster_locality_stats;
2059 }
2060
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2061 void XdsClient::RemoveClusterLocalityStats(
2062 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2063 absl::string_view eds_service_name,
2064 const RefCountedPtr<XdsLocalityName>& locality,
2065 XdsClusterLocalityStats* cluster_locality_stats) {
2066 MutexLock lock(&mu_);
2067 // TODO(roth): When we add support for direct federation, use the
2068 // server name specified in lrs_server.
2069 auto it = load_report_map_.find(
2070 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2071 if (it == load_report_map_.end()) return;
2072 LoadReportState& load_report_state = it->second;
2073 auto locality_it = load_report_state.locality_stats.find(locality);
2074 if (locality_it == load_report_state.locality_stats.end()) return;
2075 LoadReportState::LocalityState& locality_state = locality_it->second;
2076 if (locality_state.locality_stats == cluster_locality_stats) {
2077 // Record final snapshot in deleted_locality_stats, which will be
2078 // added to the next load report.
2079 locality_state.deleted_locality_stats +=
2080 locality_state.locality_stats->GetSnapshotAndReset();
2081 locality_state.locality_stats = nullptr;
2082 }
2083 }
2084
ResetBackoff()2085 void XdsClient::ResetBackoff() {
2086 MutexLock lock(&mu_);
2087 if (chand_ != nullptr) {
2088 grpc_channel_reset_connect_backoff(chand_->channel());
2089 }
2090 }
2091
NotifyOnErrorLocked(grpc_error * error)2092 void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
2093 for (const auto& p : listener_map_) {
2094 const ListenerState& listener_state = p.second;
2095 for (const auto& p : listener_state.watchers) {
2096 p.first->OnError(GRPC_ERROR_REF(error));
2097 }
2098 }
2099 for (const auto& p : route_config_map_) {
2100 const RouteConfigState& route_config_state = p.second;
2101 for (const auto& p : route_config_state.watchers) {
2102 p.first->OnError(GRPC_ERROR_REF(error));
2103 }
2104 }
2105 for (const auto& p : cluster_map_) {
2106 const ClusterState& cluster_state = p.second;
2107 for (const auto& p : cluster_state.watchers) {
2108 p.first->OnError(GRPC_ERROR_REF(error));
2109 }
2110 }
2111 for (const auto& p : endpoint_map_) {
2112 const EndpointState& endpoint_state = p.second;
2113 for (const auto& p : endpoint_state.watchers) {
2114 p.first->OnError(GRPC_ERROR_REF(error));
2115 }
2116 }
2117 GRPC_ERROR_UNREF(error);
2118 }
2119
BuildLoadReportSnapshotLocked(bool send_all_clusters,const std::set<std::string> & clusters)2120 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2121 bool send_all_clusters, const std::set<std::string>& clusters) {
2122 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2123 gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2124 }
2125 XdsApi::ClusterLoadReportMap snapshot_map;
2126 for (auto load_report_it = load_report_map_.begin();
2127 load_report_it != load_report_map_.end();) {
2128 // Cluster key is cluster and EDS service name.
2129 const auto& cluster_key = load_report_it->first;
2130 LoadReportState& load_report = load_report_it->second;
2131 // If the CDS response for a cluster indicates to use LRS but the
2132 // LRS server does not say that it wants reports for this cluster,
2133 // then we'll have stats objects here whose data we're not going to
2134 // include in the load report. However, we still need to clear out
2135 // the data from the stats objects, so that if the LRS server starts
2136 // asking for the data in the future, we don't incorrectly include
2137 // data from previous reporting intervals in that future report.
2138 const bool record_stats =
2139 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2140 XdsApi::ClusterLoadReport snapshot;
2141 // Aggregate drop stats.
2142 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2143 if (load_report.drop_stats != nullptr) {
2144 snapshot.dropped_requests +=
2145 load_report.drop_stats->GetSnapshotAndReset();
2146 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2147 gpr_log(GPR_INFO,
2148 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2149 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2150 load_report.drop_stats);
2151 }
2152 }
2153 // Aggregate locality stats.
2154 for (auto it = load_report.locality_stats.begin();
2155 it != load_report.locality_stats.end();) {
2156 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2157 auto& locality_state = it->second;
2158 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2159 snapshot.locality_stats[locality_name];
2160 locality_snapshot = std::move(locality_state.deleted_locality_stats);
2161 if (locality_state.locality_stats != nullptr) {
2162 locality_snapshot +=
2163 locality_state.locality_stats->GetSnapshotAndReset();
2164 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2165 gpr_log(GPR_INFO,
2166 "[xds_client %p] cluster=%s eds_service_name=%s "
2167 "locality=%s locality_stats=%p",
2168 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2169 locality_name->AsHumanReadableString().c_str(),
2170 locality_state.locality_stats);
2171 }
2172 }
2173 // If the only thing left in this entry was final snapshots from
2174 // deleted locality stats objects, remove the entry.
2175 if (locality_state.locality_stats == nullptr) {
2176 it = load_report.locality_stats.erase(it);
2177 } else {
2178 ++it;
2179 }
2180 }
2181 // Compute load report interval.
2182 const grpc_millis now = ExecCtx::Get()->Now();
2183 snapshot.load_report_interval = now - load_report.last_report_time;
2184 load_report.last_report_time = now;
2185 // Record snapshot.
2186 if (record_stats) {
2187 snapshot_map[cluster_key] = std::move(snapshot);
2188 }
2189 // If the only thing left in this entry was final snapshots from
2190 // deleted stats objects, remove the entry.
2191 if (load_report.locality_stats.empty() &&
2192 load_report.drop_stats == nullptr) {
2193 load_report_it = load_report_map_.erase(load_report_it);
2194 } else {
2195 ++load_report_it;
2196 }
2197 }
2198 return snapshot_map;
2199 }
2200
2201 //
2202 // accessors for global state
2203 //
2204
XdsClientGlobalInit()2205 void XdsClientGlobalInit() { g_mu = new Mutex; }
2206
XdsClientGlobalShutdown()2207 void XdsClientGlobalShutdown() {
2208 delete g_mu;
2209 g_mu = nullptr;
2210 }
2211
GetOrCreate(grpc_error ** error)2212 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
2213 MutexLock lock(g_mu);
2214 if (g_xds_client != nullptr) {
2215 auto xds_client = g_xds_client->RefIfNonZero();
2216 if (xds_client != nullptr) return xds_client;
2217 }
2218 auto xds_client = MakeRefCounted<XdsClient>(error);
2219 g_xds_client = xds_client.get();
2220 return xds_client;
2221 }
2222
2223 namespace internal {
2224
SetXdsChannelArgsForTest(grpc_channel_args * args)2225 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2226 MutexLock lock(g_mu);
2227 g_channel_args = args;
2228 }
2229
UnsetGlobalXdsClientForTest()2230 void UnsetGlobalXdsClientForTest() {
2231 MutexLock lock(g_mu);
2232 g_xds_client = nullptr;
2233 }
2234
2235 } // namespace internal
2236
2237 } // namespace grpc_core
2238