1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/parse_address.h"
37 #include "src/core/ext/filters/client_channel/server_address.h"
38 #include "src/core/ext/filters/client_channel/service_config.h"
39 #include "src/core/ext/filters/client_channel/xds/xds_api.h"
40 #include "src/core/ext/filters/client_channel/xds/xds_channel.h"
41 #include "src/core/ext/filters/client_channel/xds/xds_channel_args.h"
42 #include "src/core/ext/filters/client_channel/xds/xds_client.h"
43 #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
44 #include "src/core/lib/backoff/backoff.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/channel_stack.h"
47 #include "src/core/lib/gpr/string.h"
48 #include "src/core/lib/gprpp/map.h"
49 #include "src/core/lib/gprpp/memory.h"
50 #include "src/core/lib/gprpp/orphanable.h"
51 #include "src/core/lib/gprpp/ref_counted_ptr.h"
52 #include "src/core/lib/gprpp/sync.h"
53 #include "src/core/lib/iomgr/sockaddr.h"
54 #include "src/core/lib/iomgr/sockaddr_utils.h"
55 #include "src/core/lib/iomgr/timer.h"
56 #include "src/core/lib/iomgr/work_serializer.h"
57 #include "src/core/lib/slice/slice_hash_table.h"
58 #include "src/core/lib/slice/slice_internal.h"
59 #include "src/core/lib/slice/slice_string_helpers.h"
60 #include "src/core/lib/surface/call.h"
61 #include "src/core/lib/surface/channel.h"
62 #include "src/core/lib/surface/channel_init.h"
63 #include "src/core/lib/transport/static_metadata.h"
64
// Backoff parameters used by RetryableCall<> when a call to the xds server
// has to be retried (see the RetryableCall constructor).
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// NOTE(review): not referenced in this section; presumably a lower bound on
// the LRS load reporting interval, in milliseconds -- confirm at its use site.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
70
71 namespace grpc_core {
72
// Trace flag "xds_client" (off by default). When enabled, the
// GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) checks in this file emit
// verbose logging.
TraceFlag grpc_xds_client_trace(false, "xds_client");
74
75 //
76 // Internal class declarations
77 //
78
79 // An xds call wrapper that can restart a call upon failure. Holds a ref to
80 // the xds channel. The template parameter is the kind of wrapped xds call.
template <typename T>
class XdsClient::ChannelState::RetryableCall
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  // Starts the first call attempt immediately.
  explicit RetryableCall(RefCountedPtr<ChannelState> chand);

  // Marks this object as shutting down, drops the current call (if any),
  // cancels a pending retry timer (if any), and releases its own ref.
  void Orphan() override;

  // Invoked when the current call has ended. If that call had seen a
  // response, backoff is reset and a new call starts immediately; otherwise
  // a retry is scheduled after the next backoff interval.
  void OnCallFinishedLocked();

  // The current call, or null while waiting in retry backoff.
  T* calld() const { return calld_.get(); }
  ChannelState* chand() const { return chand_.get(); }

  bool IsCurrentCallOnChannel() const;

 private:
  void StartNewCallLocked();
  void StartRetryTimerLocked();
  static void OnRetryTimer(void* arg, grpc_error* error);
  void OnRetryTimerLocked(grpc_error* error);

  // The wrapped xds call that talks to the xds server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> calld_;
  // The owning xds channel.
  RefCountedPtr<ChannelState> chand_;

  // Retry state.
  BackOff backoff_;
  grpc_timer retry_timer_;
  grpc_closure on_retry_timer_;
  // True between StartRetryTimerLocked() and OnRetryTimerLocked(); lets
  // Orphan() know whether the timer needs cancelling.
  bool retry_timer_callback_pending_ = false;

  // Set by Orphan(); suppresses starting new calls or retry timers.
  bool shutting_down_ = false;
};
116
117 // Contains an ADS call to the xds server.
118 class XdsClient::ChannelState::AdsCallState
119 : public InternallyRefCounted<AdsCallState> {
120 public:
121 // The ctor and dtor should not be used directly.
122 explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
123 ~AdsCallState() override;
124
125 void Orphan() override;
126
parent() const127 RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
chand() const128 ChannelState* chand() const { return parent_->chand(); }
xds_client() const129 XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const130 bool seen_response() const { return seen_response_; }
131
132 void Subscribe(const std::string& type_url, const std::string& name);
133 void Unsubscribe(const std::string& type_url, const std::string& name,
134 bool delay_unsubscription);
135
136 bool HasSubscribedResources() const;
137
138 private:
139 class ResourceState : public InternallyRefCounted<ResourceState> {
140 public:
ResourceState(const std::string & type_url,const std::string & name)141 ResourceState(const std::string& type_url, const std::string& name)
142 : type_url_(type_url), name_(name) {
143 GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
144 grpc_schedule_on_exec_ctx);
145 }
146
Orphan()147 void Orphan() override {
148 Finish();
149 Unref();
150 }
151
Start(RefCountedPtr<AdsCallState> ads_calld)152 void Start(RefCountedPtr<AdsCallState> ads_calld) {
153 if (sent_) return;
154 sent_ = true;
155 ads_calld_ = std::move(ads_calld);
156 Ref().release();
157 timer_pending_ = true;
158 grpc_timer_init(
159 &timer_,
160 ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
161 &timer_callback_);
162 }
163
Finish()164 void Finish() {
165 if (timer_pending_) {
166 grpc_timer_cancel(&timer_);
167 timer_pending_ = false;
168 }
169 }
170
171 private:
OnTimer(void * arg,grpc_error * error)172 static void OnTimer(void* arg, grpc_error* error) {
173 ResourceState* self = static_cast<ResourceState*>(arg);
174 GRPC_ERROR_REF(error); // ref owned by lambda
175 self->ads_calld_->xds_client()->work_serializer_->Run(
176 [self, error]() { self->OnTimerLocked(error); }, DEBUG_LOCATION);
177 }
178
OnTimerLocked(grpc_error * error)179 void OnTimerLocked(grpc_error* error) {
180 if (error == GRPC_ERROR_NONE && timer_pending_) {
181 timer_pending_ = false;
182 grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
183 absl::StrFormat(
184 "timeout obtaining resource {type=%s name=%s} from xds server",
185 type_url_, name_)
186 .c_str());
187 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
188 gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
189 grpc_error_string(watcher_error));
190 }
191 if (type_url_ == XdsApi::kLdsTypeUrl ||
192 type_url_ == XdsApi::kRdsTypeUrl) {
193 ads_calld_->xds_client()->service_config_watcher_->OnError(
194 watcher_error);
195 } else if (type_url_ == XdsApi::kCdsTypeUrl) {
196 ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
197 for (const auto& p : state.watchers) {
198 p.first->OnError(GRPC_ERROR_REF(watcher_error));
199 }
200 GRPC_ERROR_UNREF(watcher_error);
201 } else if (type_url_ == XdsApi::kEdsTypeUrl) {
202 EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
203 for (const auto& p : state.watchers) {
204 p.first->OnError(GRPC_ERROR_REF(watcher_error));
205 }
206 GRPC_ERROR_UNREF(watcher_error);
207 } else {
208 GPR_UNREACHABLE_CODE(return );
209 }
210 }
211 ads_calld_.reset();
212 Unref();
213 GRPC_ERROR_UNREF(error);
214 }
215
216 const std::string type_url_;
217 const std::string name_;
218
219 RefCountedPtr<AdsCallState> ads_calld_;
220 bool sent_ = false;
221 bool timer_pending_ = false;
222 grpc_timer timer_;
223 grpc_closure timer_callback_;
224 };
225
226 struct ResourceTypeState {
~ResourceTypeStategrpc_core::XdsClient::ChannelState::AdsCallState::ResourceTypeState227 ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
228
229 // Version, nonce, and error for this resource type.
230 std::string version;
231 std::string nonce;
232 grpc_error* error = GRPC_ERROR_NONE;
233
234 // Subscribed resources of this type.
235 std::map<std::string /* name */, OrphanablePtr<ResourceState>>
236 subscribed_resources;
237 };
238
239 void SendMessageLocked(const std::string& type_url);
240
241 void AcceptLdsUpdate(absl::optional<XdsApi::LdsUpdate> lds_update);
242 void AcceptRdsUpdate(absl::optional<XdsApi::RdsUpdate> rds_update);
243 void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
244 void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);
245
246 static void OnRequestSent(void* arg, grpc_error* error);
247 void OnRequestSentLocked(grpc_error* error);
248 static void OnResponseReceived(void* arg, grpc_error* error);
249 void OnResponseReceivedLocked();
250 static void OnStatusReceived(void* arg, grpc_error* error);
251 void OnStatusReceivedLocked(grpc_error* error);
252
253 bool IsCurrentCallOnChannel() const;
254
255 std::set<absl::string_view> ResourceNamesForRequest(
256 const std::string& type_url);
257
258 // The owning RetryableCall<>.
259 RefCountedPtr<RetryableCall<AdsCallState>> parent_;
260
261 bool sent_initial_message_ = false;
262 bool seen_response_ = false;
263
264 // Always non-NULL.
265 grpc_call* call_;
266
267 // recv_initial_metadata
268 grpc_metadata_array initial_metadata_recv_;
269
270 // send_message
271 grpc_byte_buffer* send_message_payload_ = nullptr;
272 grpc_closure on_request_sent_;
273
274 // recv_message
275 grpc_byte_buffer* recv_message_payload_ = nullptr;
276 grpc_closure on_response_received_;
277
278 // recv_trailing_metadata
279 grpc_metadata_array trailing_metadata_recv_;
280 grpc_status_code status_code_;
281 grpc_slice status_details_;
282 grpc_closure on_status_received_;
283
284 // Resource types for which requests need to be sent.
285 std::set<std::string /*type_url*/> buffered_requests_;
286
287 // State for each resource type.
288 std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
289 };
290
291 // Contains an LRS call to the xds server.
292 class XdsClient::ChannelState::LrsCallState
293 : public InternallyRefCounted<LrsCallState> {
294 public:
295 // The ctor and dtor should not be used directly.
296 explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
297 ~LrsCallState() override;
298
299 void Orphan() override;
300
301 void MaybeStartReportingLocked();
302
parent()303 RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const304 ChannelState* chand() const { return parent_->chand(); }
xds_client() const305 XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const306 bool seen_response() const { return seen_response_; }
307
308 private:
309 // Reports client-side load stats according to a fixed interval.
310 class Reporter : public InternallyRefCounted<Reporter> {
311 public:
Reporter(RefCountedPtr<LrsCallState> parent,grpc_millis report_interval)312 Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
313 : parent_(std::move(parent)), report_interval_(report_interval) {
314 GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
315 grpc_schedule_on_exec_ctx);
316 GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
317 grpc_schedule_on_exec_ctx);
318 ScheduleNextReportLocked();
319 }
320
321 void Orphan() override;
322
323 private:
324 void ScheduleNextReportLocked();
325 static void OnNextReportTimer(void* arg, grpc_error* error);
326 void OnNextReportTimerLocked(grpc_error* error);
327 void SendReportLocked();
328 static void OnReportDone(void* arg, grpc_error* error);
329 void OnReportDoneLocked(grpc_error* error);
330
IsCurrentReporterOnCall() const331 bool IsCurrentReporterOnCall() const {
332 return this == parent_->reporter_.get();
333 }
xds_client() const334 XdsClient* xds_client() const { return parent_->xds_client(); }
335
336 // The owning LRS call.
337 RefCountedPtr<LrsCallState> parent_;
338
339 // The load reporting state.
340 const grpc_millis report_interval_;
341 bool last_report_counters_were_zero_ = false;
342 bool next_report_timer_callback_pending_ = false;
343 grpc_timer next_report_timer_;
344 grpc_closure on_next_report_timer_;
345 grpc_closure on_report_done_;
346 };
347
348 static void OnInitialRequestSent(void* arg, grpc_error* error);
349 void OnInitialRequestSentLocked();
350 static void OnResponseReceived(void* arg, grpc_error* error);
351 void OnResponseReceivedLocked();
352 static void OnStatusReceived(void* arg, grpc_error* error);
353 void OnStatusReceivedLocked(grpc_error* error);
354
355 bool IsCurrentCallOnChannel() const;
356
357 // The owning RetryableCall<>.
358 RefCountedPtr<RetryableCall<LrsCallState>> parent_;
359 bool seen_response_ = false;
360
361 // Always non-NULL.
362 grpc_call* call_;
363
364 // recv_initial_metadata
365 grpc_metadata_array initial_metadata_recv_;
366
367 // send_message
368 grpc_byte_buffer* send_message_payload_ = nullptr;
369 grpc_closure on_initial_request_sent_;
370
371 // recv_message
372 grpc_byte_buffer* recv_message_payload_ = nullptr;
373 grpc_closure on_response_received_;
374
375 // recv_trailing_metadata
376 grpc_metadata_array trailing_metadata_recv_;
377 grpc_status_code status_code_;
378 grpc_slice status_details_;
379 grpc_closure on_status_received_;
380
381 // Load reporting state.
382 bool send_all_clusters_ = false;
383 std::set<std::string> cluster_names_; // Asked for by the LRS server.
384 grpc_millis load_reporting_interval_ = 0;
385 OrphanablePtr<Reporter> reporter_;
386 };
387
388 //
389 // XdsClient::ChannelState::StateWatcher
390 //
391
392 class XdsClient::ChannelState::StateWatcher
393 : public AsyncConnectivityStateWatcherInterface {
394 public:
StateWatcher(RefCountedPtr<ChannelState> parent)395 explicit StateWatcher(RefCountedPtr<ChannelState> parent)
396 : AsyncConnectivityStateWatcherInterface(
397 parent->xds_client()->work_serializer_),
398 parent_(std::move(parent)) {}
399
400 private:
OnConnectivityStateChange(grpc_connectivity_state new_state)401 void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
402 if (!parent_->shutting_down_ &&
403 new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
404 // In TRANSIENT_FAILURE. Notify all watchers of error.
405 gpr_log(GPR_INFO,
406 "[xds_client %p] xds channel in state TRANSIENT_FAILURE",
407 parent_->xds_client());
408 parent_->xds_client()->NotifyOnError(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
409 "xds channel in TRANSIENT_FAILURE"));
410 }
411 }
412
413 RefCountedPtr<ChannelState> parent_;
414 };
415
416 //
417 // XdsClient::ChannelState
418 //
419
420 namespace {
421
422 // Returns the channel args for the xds channel.
// Derives the channel args for the xds channel from the parent channel's
// args: strips args that must not be inherited, adds xds-specific ones, and
// finally applies ModifyXdsChannelArgs(). Caller owns the returned args.
grpc_channel_args* BuildXdsChannelArgs(const grpc_channel_args& args) {
  // Parent-channel args that must NOT be propagated to the xds channel.
  static const char* args_to_remove[] = {
      // Use the default LB policy (pick_first) rather than the parent's.
      GRPC_ARG_LB_POLICY_NAME,
      // The parent's service config carries the LB config; inheriting it
      // would make the xds channel recursively use xds.
      GRPC_ARG_SERVER_URI == nullptr ? GRPC_ARG_SERVICE_CONFIG
                                     : GRPC_ARG_SERVICE_CONFIG,
      // The server URI differs for the xds channel; the client channel
      // factory re-adds it with the right value.
      GRPC_ARG_SERVER_URI,
      // The authority comes from the target authority table (see
      // ModifyXdsChannelArgs), not from the parent channel.
      GRPC_ARG_DEFAULT_AUTHORITY,
      // The xds channel is a stand-alone channel, so it must not inherit the
      // parent's SSL target name override either.
      GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
      // The xds channel gets its own channelz node.
      GRPC_ARG_CHANNELZ_CHANNEL_NODE,
      // Replaced by the explicit keepalive value added below.
      GRPC_ARG_KEEPALIVE_TIME_MS,
  };
  absl::InlinedVector<grpc_arg, 3> args_to_add;
  // (1) Keepalive time of 5000 ms.
  args_to_add.emplace_back(grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), 5000));
  // (2) Mark the target as an xds server.
  // TODO(roth): Once we figure out our fallback and credentials story, decide
  // whether this is actually needed. Note that it's currently used by the
  // fake security connector as well.
  args_to_add.emplace_back(grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_SERVER), 1));
  // (3) Link to the parent channel's channelz node, when there is one.
  const grpc_arg* channelz_arg =
      grpc_channel_args_find(&args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  const bool has_parent_node = channelz_arg != nullptr &&
                               channelz_arg->type == GRPC_ARG_POINTER &&
                               channelz_arg->value.pointer.p != nullptr;
  if (has_parent_node) {
    auto* parent_node =
        static_cast<channelz::ChannelNode*>(channelz_arg->value.pointer.p);
    args_to_add.emplace_back(channelz::MakeParentUuidArg(parent_node->uuid()));
  }
  grpc_channel_args* xds_args = grpc_channel_args_copy_and_add_and_remove(
      &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
      args_to_add.size());
  // Apply any security-related adjustments before handing the args back.
  return ModifyXdsChannelArgs(xds_args);
}
477
478 } // namespace
479
ChannelState(RefCountedPtr<XdsClient> xds_client,grpc_channel * channel)480 XdsClient::ChannelState::ChannelState(RefCountedPtr<XdsClient> xds_client,
481 grpc_channel* channel)
482 : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
483 xds_client_(std::move(xds_client)),
484 channel_(channel) {
485 GPR_ASSERT(channel_ != nullptr);
486 StartConnectivityWatchLocked();
487 }
488
~ChannelState()489 XdsClient::ChannelState::~ChannelState() {
490 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
491 gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
492 this);
493 }
494 grpc_channel_destroy(channel_);
495 }
496
// Shuts the channel state down. shutting_down_ is set first so the
// connectivity watcher stops reporting errors. The watch is then cancelled,
// the ADS and LRS call wrappers are dropped, and this object's own ref is
// released.
void XdsClient::ChannelState::Orphan() {
  shutting_down_ = true;
  CancelConnectivityWatchLocked();
  ads_calld_.reset();
  lrs_calld_.reset();
  Unref(DEBUG_LOCATION, "ChannelState+orphaned");
}
504
ads_calld() const505 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
506 const {
507 return ads_calld_->calld();
508 }
509
lrs_calld() const510 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
511 const {
512 return lrs_calld_->calld();
513 }
514
HasActiveAdsCall() const515 bool XdsClient::ChannelState::HasActiveAdsCall() const {
516 return ads_calld_->calld() != nullptr;
517 }
518
MaybeStartLrsCall()519 void XdsClient::ChannelState::MaybeStartLrsCall() {
520 if (lrs_calld_ != nullptr) return;
521 lrs_calld_.reset(
522 new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
523 }
524
StopLrsCall()525 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
526
StartConnectivityWatchLocked()527 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
528 grpc_channel_element* client_channel_elem =
529 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
530 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
531 watcher_ = new StateWatcher(Ref());
532 grpc_client_channel_start_connectivity_watch(
533 client_channel_elem, GRPC_CHANNEL_IDLE,
534 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
535 }
536
CancelConnectivityWatchLocked()537 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
538 grpc_channel_element* client_channel_elem =
539 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
540 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
541 grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
542 }
543
Subscribe(const std::string & type_url,const std::string & name)544 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
545 const std::string& name) {
546 if (ads_calld_ == nullptr) {
547 // Start the ADS call if this is the first request.
548 ads_calld_.reset(new RetryableCall<AdsCallState>(
549 Ref(DEBUG_LOCATION, "ChannelState+ads")));
550 // Note: AdsCallState's ctor will automatically subscribe to all
551 // resources that the XdsClient already has watchers for, so we can
552 // return here.
553 return;
554 }
555 // If the ADS call is in backoff state, we don't need to do anything now
556 // because when the call is restarted it will resend all necessary requests.
557 if (ads_calld() == nullptr) return;
558 // Subscribe to this resource if the ADS call is active.
559 ads_calld()->Subscribe(type_url, name);
560 }
561
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)562 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
563 const std::string& name,
564 bool delay_unsubscription) {
565 if (ads_calld_ != nullptr) {
566 ads_calld_->calld()->Unsubscribe(type_url, name, delay_unsubscription);
567 if (!ads_calld_->calld()->HasSubscribedResources()) ads_calld_.reset();
568 }
569 }
570
571 //
572 // XdsClient::ChannelState::RetryableCall<>
573 //
574
575 template <typename T>
RetryableCall(RefCountedPtr<ChannelState> chand)576 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
577 RefCountedPtr<ChannelState> chand)
578 : chand_(std::move(chand)),
579 backoff_(
580 BackOff::Options()
581 .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
582 1000)
583 .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
584 .set_jitter(GRPC_XDS_RECONNECT_JITTER)
585 .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
586 // Closure Initialization
587 GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
588 grpc_schedule_on_exec_ctx);
589 StartNewCallLocked();
590 }
591
// Shuts down the retryable call. shutting_down_ is set first so that no new
// call or retry timer is started afterwards. The current call is then
// dropped, a pending retry timer is cancelled, and this object's own ref is
// released.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
  shutting_down_ = true;
  calld_.reset();
  if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
  this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
599
600 template <typename T>
OnCallFinishedLocked()601 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
602 const bool seen_response = calld_->seen_response();
603 calld_.reset();
604 if (seen_response) {
605 // If we lost connection to the xds server, reset backoff and restart the
606 // call immediately.
607 backoff_.Reset();
608 StartNewCallLocked();
609 } else {
610 // If we failed to connect to the xds server, retry later.
611 StartRetryTimerLocked();
612 }
613 }
614
615 template <typename T>
StartNewCallLocked()616 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
617 if (shutting_down_) return;
618 GPR_ASSERT(chand_->channel_ != nullptr);
619 GPR_ASSERT(calld_ == nullptr);
620 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
621 gpr_log(GPR_INFO,
622 "[xds_client %p] Start new call from retryable call (chand: %p, "
623 "retryable call: %p)",
624 chand()->xds_client(), chand(), this);
625 }
626 calld_ = MakeOrphanable<T>(
627 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
628 }
629
630 template <typename T>
StartRetryTimerLocked()631 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
632 if (shutting_down_) return;
633 const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
634 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
635 grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
636 gpr_log(GPR_INFO,
637 "[xds_client %p] Failed to connect to xds server (chand: %p) "
638 "retry timer will fire in %" PRId64 "ms.",
639 chand()->xds_client(), chand(), timeout);
640 }
641 this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
642 grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
643 retry_timer_callback_pending_ = true;
644 }
645
646 template <typename T>
OnRetryTimer(void * arg,grpc_error * error)647 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
648 void* arg, grpc_error* error) {
649 RetryableCall* calld = static_cast<RetryableCall*>(arg);
650 GRPC_ERROR_REF(error); // ref owned by lambda
651 calld->chand_->xds_client()->work_serializer_->Run(
652 [calld, error]() { calld->OnRetryTimerLocked(error); }, DEBUG_LOCATION);
653 }
654
655 template <typename T>
OnRetryTimerLocked(grpc_error * error)656 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
657 grpc_error* error) {
658 retry_timer_callback_pending_ = false;
659 if (!shutting_down_ && error == GRPC_ERROR_NONE) {
660 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
661 gpr_log(
662 GPR_INFO,
663 "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
664 chand()->xds_client(), chand(), this);
665 }
666 StartNewCallLocked();
667 }
668 this->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
669 GRPC_ERROR_UNREF(error);
670 }
671
672 //
673 // XdsClient::ChannelState::AdsCallState
674 //
675
// Creates the ADS streaming call on the xds channel and starts it. The
// constructor sends initial metadata, subscribes to every resource the
// XdsClient currently has watchers for (LDS/RDS, CDS, EDS), and starts
// receiving initial metadata plus the first response. It also starts waiting
// for the final status.
XdsClient::ChannelState::AdsCallState::AdsCallState(
    RefCountedPtr<RetryableCall<AdsCallState>> parent)
    : InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace),
      parent_(std::move(parent)) {
  // Init the ADS call. Note that the call will progress every time there's
  // activity in xds_client()->interested_parties_, which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(xds_client() != nullptr);
  GPR_ASSERT(!xds_client()->server_name_.empty());
  // Create a call with the specified method name.
  call_ = grpc_channel_create_pollset_set_call(
      chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      xds_client()->interested_parties_,
      GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES,
      nullptr, GRPC_MILLIS_INF_FUTURE, nullptr);
  GPR_ASSERT(call_ != nullptr);
  // Init data associated with the call.
  grpc_metadata_array_init(&initial_metadata_recv_);
  grpc_metadata_array_init(&trailing_metadata_recv_);
  // Start the call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
            "call: %p)",
            xds_client(), chand(), this, call_);
  }
  // Create the ops. At most two ops are used per batch below; the array is
  // reused for each batch.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  // Wait for the xds server to become reachable instead of failing fast.
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 nullptr);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: send request message.
  GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  // Subscribe to everything the XdsClient already has watchers for; each
  // new subscription triggers SendMessageLocked() for its type.
  if (xds_client()->service_config_watcher_ != nullptr) {
    Subscribe(XdsApi::kLdsTypeUrl, xds_client()->server_name_);
    if (xds_client()->lds_result_.has_value() &&
        !xds_client()->lds_result_->route_config_name.empty()) {
      Subscribe(XdsApi::kRdsTypeUrl,
                xds_client()->lds_result_->route_config_name);
    }
  }
  for (const auto& p : xds_client()->cluster_map_) {
    Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->endpoint_map_) {
    Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
  }
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This ref is owned by the pending recv-message callback.
  Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &status_code_;
  op->data.recv_status_on_client.status_details = &status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 &on_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
772
~AdsCallState()773 XdsClient::ChannelState::AdsCallState::~AdsCallState() {
774 grpc_metadata_array_destroy(&initial_metadata_recv_);
775 grpc_metadata_array_destroy(&trailing_metadata_recv_);
776 grpc_byte_buffer_destroy(send_message_payload_);
777 grpc_byte_buffer_destroy(recv_message_payload_);
778 grpc_slice_unref_internal(status_details_);
779 GPR_ASSERT(call_ != nullptr);
780 grpc_call_unref(call_);
781 }
782
// Cancels the ADS call and drops all per-type resource state.
void XdsClient::ChannelState::AdsCallState::Orphan() {
  GPR_ASSERT(call_ != nullptr);
  // If we are here because xds_client wants to cancel the call,
  // on_status_received_ will complete the cancellation and clean up. Otherwise,
  // we are here because xds_client has to orphan a failed call, then the
  // following cancellation will be a no-op.
  grpc_call_cancel(call_, nullptr);
  // Destroying the ResourceState OrphanablePtrs orphans them, which cancels
  // any pending resource-timeout timers.
  state_map_.clear();
  // Note that the initial ref is held by on_status_received_. So the
  // corresponding unref happens in on_status_received_ instead of here.
}
794
SendMessageLocked(const std::string & type_url)795 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
796 const std::string& type_url) {
797 // Buffer message sending if an existing message is in flight.
798 if (send_message_payload_ != nullptr) {
799 buffered_requests_.insert(type_url);
800 return;
801 }
802 auto& state = state_map_[type_url];
803 grpc_slice request_payload_slice;
804 std::set<absl::string_view> resource_names =
805 ResourceNamesForRequest(type_url);
806 request_payload_slice = xds_client()->api_.CreateAdsRequest(
807 type_url, resource_names, state.version, state.nonce,
808 GRPC_ERROR_REF(state.error), !sent_initial_message_);
809 if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
810 type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
811 state_map_.erase(type_url);
812 }
813 sent_initial_message_ = true;
814 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
815 gpr_log(GPR_INFO,
816 "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
817 "error=%s resources=%s",
818 xds_client(), type_url.c_str(), state.version.c_str(),
819 state.nonce.c_str(), grpc_error_string(state.error),
820 absl::StrJoin(resource_names, " ").c_str());
821 }
822 GRPC_ERROR_UNREF(state.error);
823 state.error = GRPC_ERROR_NONE;
824 // Create message payload.
825 send_message_payload_ =
826 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
827 grpc_slice_unref_internal(request_payload_slice);
828 // Send the message.
829 grpc_op op;
830 memset(&op, 0, sizeof(op));
831 op.op = GRPC_OP_SEND_MESSAGE;
832 op.data.send_message.send_message = send_message_payload_;
833 Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
834 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
835 grpc_schedule_on_exec_ctx);
836 grpc_call_error call_error =
837 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
838 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
839 gpr_log(GPR_ERROR,
840 "[xds_client %p] calld=%p call_error=%d sending ADS message",
841 xds_client(), this, call_error);
842 GPR_ASSERT(GRPC_CALL_OK == call_error);
843 }
844 }
845
Subscribe(const std::string & type_url,const std::string & name)846 void XdsClient::ChannelState::AdsCallState::Subscribe(
847 const std::string& type_url, const std::string& name) {
848 auto& state = state_map_[type_url].subscribed_resources[name];
849 if (state == nullptr) {
850 state = MakeOrphanable<ResourceState>(type_url, name);
851 SendMessageLocked(type_url);
852 }
853 }
854
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)855 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
856 const std::string& type_url, const std::string& name,
857 bool delay_unsubscription) {
858 state_map_[type_url].subscribed_resources.erase(name);
859 if (!delay_unsubscription) SendMessageLocked(type_url);
860 }
861
HasSubscribedResources() const862 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
863 for (const auto& p : state_map_) {
864 if (!p.second.subscribed_resources.empty()) return true;
865 }
866 return false;
867 }
868
AcceptLdsUpdate(absl::optional<XdsApi::LdsUpdate> lds_update)869 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
870 absl::optional<XdsApi::LdsUpdate> lds_update) {
871 if (!lds_update.has_value()) {
872 gpr_log(GPR_INFO,
873 "[xds_client %p] LDS update does not include requested resource",
874 xds_client());
875 if (xds_client()->lds_result_.has_value() &&
876 !xds_client()->lds_result_->route_config_name.empty()) {
877 Unsubscribe(XdsApi::kRdsTypeUrl,
878 xds_client()->lds_result_->route_config_name,
879 /*delay_unsubscription=*/false);
880 xds_client()->rds_result_.reset();
881 }
882 xds_client()->lds_result_.reset();
883 xds_client()->service_config_watcher_->OnResourceDoesNotExist();
884 return;
885 }
886 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
887 gpr_log(GPR_INFO,
888 "[xds_client %p] LDS update received: route_config_name=%s",
889 xds_client(),
890 (!lds_update->route_config_name.empty()
891 ? lds_update->route_config_name.c_str()
892 : "<inlined>"));
893 if (lds_update->rds_update.has_value()) {
894 gpr_log(GPR_INFO, "RouteConfiguration contains %" PRIuPTR " routes",
895 lds_update->rds_update.value().routes.size());
896 for (size_t i = 0; i < lds_update->rds_update.value().routes.size();
897 ++i) {
898 gpr_log(GPR_INFO, "Route %" PRIuPTR ":\n%s", i,
899 lds_update->rds_update.value().routes[i].ToString().c_str());
900 }
901 }
902 }
903 auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
904 auto& state = lds_state.subscribed_resources[xds_client()->server_name_];
905 if (state != nullptr) state->Finish();
906 // Ignore identical update.
907 if (xds_client()->lds_result_ == lds_update) {
908 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
909 gpr_log(GPR_INFO,
910 "[xds_client %p] LDS update identical to current, ignoring.",
911 xds_client());
912 }
913 return;
914 }
915 if (xds_client()->lds_result_.has_value() &&
916 !xds_client()->lds_result_->route_config_name.empty()) {
917 Unsubscribe(
918 XdsApi::kRdsTypeUrl, xds_client()->lds_result_->route_config_name,
919 /*delay_unsubscription=*/!lds_update->route_config_name.empty());
920 xds_client()->rds_result_.reset();
921 }
922 xds_client()->lds_result_ = std::move(lds_update);
923 if (xds_client()->lds_result_->rds_update.has_value()) {
924 // If the RouteConfiguration was found inlined in LDS response, notify
925 // the watcher immediately.
926 RefCountedPtr<ServiceConfig> service_config;
927 grpc_error* error = xds_client()->CreateServiceConfig(
928 xds_client()->lds_result_->rds_update.value(), &service_config);
929 if (error == GRPC_ERROR_NONE) {
930 xds_client()->service_config_watcher_->OnServiceConfigChanged(
931 std::move(service_config));
932 } else {
933 xds_client()->service_config_watcher_->OnError(error);
934 }
935 } else {
936 // Send RDS request for dynamic resolution.
937 Subscribe(XdsApi::kRdsTypeUrl,
938 xds_client()->lds_result_->route_config_name);
939 }
940 }
941
AcceptRdsUpdate(absl::optional<XdsApi::RdsUpdate> rds_update)942 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
943 absl::optional<XdsApi::RdsUpdate> rds_update) {
944 if (!rds_update.has_value()) {
945 gpr_log(GPR_INFO,
946 "[xds_client %p] RDS update does not include requested resource",
947 xds_client());
948 xds_client()->rds_result_.reset();
949 xds_client()->service_config_watcher_->OnResourceDoesNotExist();
950 return;
951 }
952 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
953 gpr_log(GPR_INFO,
954 "[xds_client %p] RDS update received; RouteConfiguration contains "
955 "%" PRIuPTR " routes",
956 this, rds_update.value().routes.size());
957 for (size_t i = 0; i < rds_update.value().routes.size(); ++i) {
958 gpr_log(GPR_INFO, "Route %" PRIuPTR ":\n%s", i,
959 rds_update.value().routes[i].ToString().c_str());
960 }
961 }
962 auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
963 auto& state =
964 rds_state
965 .subscribed_resources[xds_client()->lds_result_->route_config_name];
966 if (state != nullptr) state->Finish();
967 // Ignore identical update.
968 if (xds_client()->rds_result_ == rds_update) {
969 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
970 gpr_log(GPR_INFO,
971 "[xds_client %p] RDS update identical to current, ignoring.",
972 xds_client());
973 }
974 return;
975 }
976 xds_client()->rds_result_ = std::move(rds_update);
977 // Notify the watcher.
978 RefCountedPtr<ServiceConfig> service_config;
979 grpc_error* error = xds_client()->CreateServiceConfig(
980 xds_client()->rds_result_.value(), &service_config);
981 if (error == GRPC_ERROR_NONE) {
982 xds_client()->service_config_watcher_->OnServiceConfigChanged(
983 std::move(service_config));
984 } else {
985 xds_client()->service_config_watcher_->OnError(error);
986 }
987 }
988
AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map)989 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
990 XdsApi::CdsUpdateMap cds_update_map) {
991 auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
992 std::set<std::string> eds_resource_names_seen;
993 for (auto& p : cds_update_map) {
994 const char* cluster_name = p.first.c_str();
995 XdsApi::CdsUpdate& cds_update = p.second;
996 auto& state = cds_state.subscribed_resources[cluster_name];
997 if (state != nullptr) state->Finish();
998 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
999 gpr_log(GPR_INFO,
1000 "[xds_client %p] CDS update (cluster=%s) received: "
1001 "eds_service_name=%s, lrs_load_reporting_server_name=%s",
1002 xds_client(), cluster_name, cds_update.eds_service_name.c_str(),
1003 cds_update.lrs_load_reporting_server_name.has_value()
1004 ? cds_update.lrs_load_reporting_server_name.value().c_str()
1005 : "(N/A)");
1006 }
1007 // Record the EDS resource names seen.
1008 eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1009 ? cluster_name
1010 : cds_update.eds_service_name);
1011 // Ignore identical update.
1012 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1013 if (cluster_state.update.has_value() &&
1014 cds_update.eds_service_name == cluster_state.update->eds_service_name &&
1015 cds_update.lrs_load_reporting_server_name ==
1016 cluster_state.update->lrs_load_reporting_server_name) {
1017 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1018 gpr_log(GPR_INFO,
1019 "[xds_client %p] CDS update identical to current, ignoring.",
1020 xds_client());
1021 }
1022 continue;
1023 }
1024 // Update the cluster state.
1025 cluster_state.update = std::move(cds_update);
1026 // Notify all watchers.
1027 for (const auto& p : cluster_state.watchers) {
1028 p.first->OnClusterChanged(cluster_state.update.value());
1029 }
1030 }
1031 // For any subscribed resource that is not present in the update,
1032 // remove it from the cache and notify watchers that it does not exist.
1033 for (const auto& p : cds_state.subscribed_resources) {
1034 const std::string& cluster_name = p.first;
1035 if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1036 ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1037 cluster_state.update.reset();
1038 for (const auto& p : cluster_state.watchers) {
1039 p.first->OnResourceDoesNotExist();
1040 }
1041 }
1042 }
1043 // For any EDS resource that is no longer referred to by any CDS
1044 // resources, remove it from the cache and notify watchers that it
1045 // does not exist.
1046 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1047 for (const auto& p : eds_state.subscribed_resources) {
1048 const std::string& eds_resource_name = p.first;
1049 if (eds_resource_names_seen.find(eds_resource_name) ==
1050 eds_resource_names_seen.end()) {
1051 EndpointState& endpoint_state =
1052 xds_client()->endpoint_map_[eds_resource_name];
1053 endpoint_state.update.reset();
1054 for (const auto& p : endpoint_state.watchers) {
1055 p.first->OnResourceDoesNotExist();
1056 }
1057 }
1058 }
1059 }
1060
AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map)1061 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1062 XdsApi::EdsUpdateMap eds_update_map) {
1063 auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1064 for (auto& p : eds_update_map) {
1065 const char* eds_service_name = p.first.c_str();
1066 XdsApi::EdsUpdate& eds_update = p.second;
1067 auto& state = eds_state.subscribed_resources[eds_service_name];
1068 if (state != nullptr) state->Finish();
1069 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1070 gpr_log(GPR_INFO,
1071 "[xds_client %p] EDS response with %" PRIuPTR
1072 " priorities and %" PRIuPTR
1073 " drop categories received (drop_all=%d)",
1074 xds_client(), eds_update.priority_list_update.size(),
1075 eds_update.drop_config->drop_category_list().size(),
1076 eds_update.drop_config->drop_all());
1077 for (size_t priority = 0;
1078 priority < eds_update.priority_list_update.size(); ++priority) {
1079 const auto* locality_map_update = eds_update.priority_list_update.Find(
1080 static_cast<uint32_t>(priority));
1081 gpr_log(GPR_INFO,
1082 "[xds_client %p] Priority %" PRIuPTR " contains %" PRIuPTR
1083 " localities",
1084 xds_client(), priority, locality_map_update->size());
1085 size_t locality_count = 0;
1086 for (const auto& p : locality_map_update->localities) {
1087 const auto& locality = p.second;
1088 gpr_log(GPR_INFO,
1089 "[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR
1090 " %s has weight %d, contains %" PRIuPTR " server addresses",
1091 xds_client(), priority, locality_count,
1092 locality.name->AsHumanReadableString().c_str(),
1093 locality.lb_weight, locality.serverlist.size());
1094 for (size_t i = 0; i < locality.serverlist.size(); ++i) {
1095 std::string ipport = grpc_sockaddr_to_string(
1096 &locality.serverlist[i].address(), false);
1097 gpr_log(GPR_INFO,
1098 "[xds_client %p] Priority %" PRIuPTR ", locality %" PRIuPTR
1099 " %s, server address %" PRIuPTR ": %s",
1100 xds_client(), priority, locality_count,
1101 locality.name->AsHumanReadableString().c_str(), i,
1102 ipport.c_str());
1103 }
1104 ++locality_count;
1105 }
1106 }
1107 for (size_t i = 0;
1108 i < eds_update.drop_config->drop_category_list().size(); ++i) {
1109 const XdsApi::DropConfig::DropCategory& drop_category =
1110 eds_update.drop_config->drop_category_list()[i];
1111 gpr_log(GPR_INFO,
1112 "[xds_client %p] Drop category %s has drop rate %d per million",
1113 xds_client(), drop_category.name.c_str(),
1114 drop_category.parts_per_million);
1115 }
1116 }
1117 EndpointState& endpoint_state =
1118 xds_client()->endpoint_map_[eds_service_name];
1119 // Ignore identical update.
1120 if (endpoint_state.update.has_value()) {
1121 const XdsApi::EdsUpdate& prev_update = endpoint_state.update.value();
1122 const bool priority_list_changed =
1123 prev_update.priority_list_update != eds_update.priority_list_update;
1124 const bool drop_config_changed =
1125 prev_update.drop_config == nullptr ||
1126 *prev_update.drop_config != *eds_update.drop_config;
1127 if (!priority_list_changed && !drop_config_changed) {
1128 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1129 gpr_log(GPR_INFO,
1130 "[xds_client %p] EDS update identical to current, ignoring.",
1131 xds_client());
1132 }
1133 continue;
1134 }
1135 }
1136 // Update the cluster state.
1137 endpoint_state.update = std::move(eds_update);
1138 // Notify all watchers.
1139 for (const auto& p : endpoint_state.watchers) {
1140 p.first->OnEndpointChanged(endpoint_state.update.value());
1141 }
1142 }
1143 }
1144
OnRequestSent(void * arg,grpc_error * error)1145 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1146 grpc_error* error) {
1147 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1148 GRPC_ERROR_REF(error); // ref owned by lambda
1149 ads_calld->xds_client()->work_serializer_->Run(
1150 [ads_calld, error]() { ads_calld->OnRequestSentLocked(error); },
1151 DEBUG_LOCATION);
1152 }
1153
OnRequestSentLocked(grpc_error * error)1154 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1155 grpc_error* error) {
1156 if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1157 // Clean up the sent message.
1158 grpc_byte_buffer_destroy(send_message_payload_);
1159 send_message_payload_ = nullptr;
1160 // Continue to send another pending message if any.
1161 // TODO(roth): The current code to handle buffered messages has the
1162 // advantage of sending only the most recent list of resource names for
1163 // each resource type (no matter how many times that resource type has
1164 // been requested to send while the current message sending is still
1165 // pending). But its disadvantage is that we send the requests in fixed
1166 // order of resource types. We need to fix this if we are seeing some
1167 // resource type(s) starved due to frequent requests of other resource
1168 // type(s).
1169 auto it = buffered_requests_.begin();
1170 if (it != buffered_requests_.end()) {
1171 SendMessageLocked(*it);
1172 buffered_requests_.erase(it);
1173 }
1174 }
1175 Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1176 GRPC_ERROR_UNREF(error);
1177 }
1178
OnResponseReceived(void * arg,grpc_error *)1179 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1180 void* arg, grpc_error* /* error */) {
1181 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1182 ads_calld->xds_client()->work_serializer_->Run(
1183 [ads_calld]() { ads_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION);
1184 }
1185
// Runs in the work serializer when the pending RECV_MESSAGE op on the ADS
// call completes. It:
// - parses the response and records the nonce for its resource type;
// - NACKs an invalid response, or accepts a valid one and ACKs it (both via
//   SendMessageLocked());
// - re-arms RECV_MESSAGE unless the xds client is shutting down.
void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
  // Empty payload means the call was cancelled.
  // Responses on a call that is no longer the channel's current ADS call are
  // also dropped. In both cases the ref held for the receive op is released.
  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
    Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
    return;
  }
  // Read the response.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  // TODO(juanlishen): When we convert this to use the xds protocol, the
  // balancer will send us a fallback timeout such that we should go into
  // fallback mode if we have lost contact with the balancer after a certain
  // period of time. We will need to save the timeout value here, and then
  // when the balancer call ends, we will need to start a timer for the
  // specified period of time, and if the timer fires, we go into fallback
  // mode. We will also need to cancel the timer when we receive a serverlist
  // from the balancer.
  // Parse the response.
  absl::optional<XdsApi::LdsUpdate> lds_update;
  absl::optional<XdsApi::RdsUpdate> rds_update;
  XdsApi::CdsUpdateMap cds_update_map;
  XdsApi::EdsUpdateMap eds_update_map;
  std::string version;
  std::string nonce;
  std::string type_url;
  // Note that ParseAdsResponse() also validates the response.
  // It receives server_name_ for LDS and the currently subscribed resource
  // names for RDS, CDS and EDS.
  grpc_error* parse_error = xds_client()->api_.ParseAdsResponse(
      response_slice, xds_client()->server_name_,
      ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
      ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
      ResourceNamesForRequest(XdsApi::kEdsTypeUrl), &lds_update, &rds_update,
      &cds_update_map, &eds_update_map, &version, &nonce, &type_url);
  grpc_slice_unref_internal(response_slice);
  if (type_url.empty()) {
    // Ignore unparsable response.
    // No resource type could be determined, so no ACK/NACK is sent.
    gpr_log(GPR_ERROR,
            "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
            xds_client(), grpc_error_string(parse_error));
    GRPC_ERROR_UNREF(parse_error);
  } else {
    // Update nonce.
    auto& state = state_map_[type_url];
    state.nonce = std::move(nonce);
    // NACK or ACK the response.
    if (parse_error != GRPC_ERROR_NONE) {
      // state.error takes ownership of parse_error. SendMessageLocked()
      // includes it in the NACK and then unrefs it.
      GRPC_ERROR_UNREF(state.error);
      state.error = parse_error;
      // NACK unacceptable update.
      gpr_log(GPR_ERROR,
              "[xds_client %p] ADS response invalid for resource type %s "
              "version %s, will NACK: nonce=%s error=%s",
              xds_client(), type_url.c_str(), version.c_str(),
              state.nonce.c_str(), grpc_error_string(parse_error));
      SendMessageLocked(type_url);
    } else {
      seen_response_ = true;
      // Accept the ADS response according to the type_url.
      if (type_url == XdsApi::kLdsTypeUrl) {
        AcceptLdsUpdate(std::move(lds_update));
      } else if (type_url == XdsApi::kRdsTypeUrl) {
        AcceptRdsUpdate(std::move(rds_update));
      } else if (type_url == XdsApi::kCdsTypeUrl) {
        AcceptCdsUpdate(std::move(cds_update_map));
      } else if (type_url == XdsApi::kEdsTypeUrl) {
        AcceptEdsUpdate(std::move(eds_update_map));
      }
      // The version is stored only for accepted responses. A later NACK
      // therefore still carries the last accepted version.
      state.version = std::move(version);
      // ACK the update.
      SendMessageLocked(type_url);
      // Start load reporting if needed.
      auto& lrs_call = chand()->lrs_calld_;
      if (lrs_call != nullptr) {
        LrsCallState* lrs_calld = lrs_call->calld();
        if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
      }
    }
  }
  if (xds_client()->shutting_down_) {
    Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked+xds_shutdown");
    return;
  }
  // Keep listening for updates.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_RECV_MESSAGE;
  op.data.recv_message.recv_message = &recv_message_payload_;
  op.flags = 0;
  op.reserved = nullptr;
  GPR_ASSERT(call_ != nullptr);
  // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
  const grpc_call_error call_error =
      grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
1284
OnStatusReceived(void * arg,grpc_error * error)1285 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1286 void* arg, grpc_error* error) {
1287 AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1288 GRPC_ERROR_REF(error); // ref owned by lambda
1289 ads_calld->xds_client()->work_serializer_->Run(
1290 [ads_calld, error]() { ads_calld->OnStatusReceivedLocked(error); },
1291 DEBUG_LOCATION);
1292 }
1293
OnStatusReceivedLocked(grpc_error * error)1294 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1295 grpc_error* error) {
1296 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1297 char* status_details = grpc_slice_to_c_string(status_details_);
1298 gpr_log(GPR_INFO,
1299 "[xds_client %p] ADS call status received. Status = %d, details "
1300 "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1301 xds_client(), status_code_, status_details, chand(), this, call_,
1302 grpc_error_string(error));
1303 gpr_free(status_details);
1304 }
1305 // Ignore status from a stale call.
1306 if (IsCurrentCallOnChannel()) {
1307 // Try to restart the call.
1308 parent_->OnCallFinishedLocked();
1309 // Send error to all watchers.
1310 xds_client()->NotifyOnError(
1311 GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1312 }
1313 Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1314 GRPC_ERROR_UNREF(error);
1315 }
1316
IsCurrentCallOnChannel() const1317 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1318 // If the retryable ADS call is null (which only happens when the xds channel
1319 // is shutting down), all the ADS calls are stale.
1320 if (chand()->ads_calld_ == nullptr) return false;
1321 return this == chand()->ads_calld_->calld();
1322 }
1323
1324 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1325 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1326 const std::string& type_url) {
1327 std::set<absl::string_view> resource_names;
1328 auto it = state_map_.find(type_url);
1329 if (it != state_map_.end()) {
1330 for (auto& p : it->second.subscribed_resources) {
1331 resource_names.insert(p.first);
1332 OrphanablePtr<ResourceState>& state = p.second;
1333 state->Start(Ref());
1334 }
1335 }
1336 return resource_names;
1337 }
1338
1339 //
1340 // XdsClient::ChannelState::LrsCallState::Reporter
1341 //
1342
Orphan()1343 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1344 if (next_report_timer_callback_pending_) {
1345 grpc_timer_cancel(&next_report_timer_);
1346 }
1347 }
1348
1349 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1350 ScheduleNextReportLocked() {
1351 const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1352 grpc_timer_init(&next_report_timer_, next_report_time,
1353 &on_next_report_timer_);
1354 next_report_timer_callback_pending_ = true;
1355 }
1356
OnNextReportTimer(void * arg,grpc_error * error)1357 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1358 void* arg, grpc_error* error) {
1359 Reporter* self = static_cast<Reporter*>(arg);
1360 GRPC_ERROR_REF(error); // ref owned by lambda
1361 self->xds_client()->work_serializer_->Run(
1362 [self, error]() { self->OnNextReportTimerLocked(error); },
1363 DEBUG_LOCATION);
1364 }
1365
OnNextReportTimerLocked(grpc_error * error)1366 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1367 grpc_error* error) {
1368 next_report_timer_callback_pending_ = false;
1369 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1370 Unref(DEBUG_LOCATION, "Reporter+timer");
1371 } else {
1372 SendReportLocked();
1373 }
1374 GRPC_ERROR_UNREF(error);
1375 }
1376
1377 namespace {
1378
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1379 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1380 for (const auto& p : snapshot) {
1381 const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1382 for (const auto& q : cluster_snapshot.dropped_requests) {
1383 if (q.second > 0) return false;
1384 }
1385 for (const auto& q : cluster_snapshot.locality_stats) {
1386 const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1387 if (!locality_snapshot.IsZero()) return false;
1388 }
1389 }
1390 return true;
1391 }
1392
1393 } // namespace
1394
SendReportLocked()1395 void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1396 // Construct snapshot from all reported stats.
1397 XdsApi::ClusterLoadReportMap snapshot = xds_client()->BuildLoadReportSnapshot(
1398 parent_->send_all_clusters_, parent_->cluster_names_);
1399 // Skip client load report if the counters were all zero in the last
1400 // report and they are still zero in this one.
1401 const bool old_val = last_report_counters_were_zero_;
1402 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1403 if (old_val && last_report_counters_were_zero_) {
1404 ScheduleNextReportLocked();
1405 return;
1406 }
1407 // Create a request that contains the snapshot.
1408 grpc_slice request_payload_slice =
1409 xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1410 parent_->send_message_payload_ =
1411 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1412 grpc_slice_unref_internal(request_payload_slice);
1413 // Send the report.
1414 grpc_op op;
1415 memset(&op, 0, sizeof(op));
1416 op.op = GRPC_OP_SEND_MESSAGE;
1417 op.data.send_message.send_message = parent_->send_message_payload_;
1418 grpc_call_error call_error = grpc_call_start_batch_and_execute(
1419 parent_->call_, &op, 1, &on_report_done_);
1420 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1421 gpr_log(GPR_ERROR,
1422 "[xds_client %p] calld=%p call_error=%d sending client load report",
1423 xds_client(), this, call_error);
1424 GPR_ASSERT(GRPC_CALL_OK == call_error);
1425 }
1426 }
1427
OnReportDone(void * arg,grpc_error * error)1428 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1429 void* arg, grpc_error* error) {
1430 Reporter* self = static_cast<Reporter*>(arg);
1431 GRPC_ERROR_REF(error); // ref owned by lambda
1432 self->xds_client()->work_serializer_->Run(
1433 [self, error]() { self->OnReportDoneLocked(error); }, DEBUG_LOCATION);
1434 }
1435
OnReportDoneLocked(grpc_error * error)1436 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1437 grpc_error* error) {
1438 grpc_byte_buffer_destroy(parent_->send_message_payload_);
1439 parent_->send_message_payload_ = nullptr;
1440 // If there are no more registered stats to report, cancel the call.
1441 if (xds_client()->load_report_map_.empty()) {
1442 parent_->chand()->StopLrsCall();
1443 Unref(DEBUG_LOCATION, "Reporter+report_done+no_more_reporters");
1444 return;
1445 }
1446 if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1447 // If this reporter is no longer the current one on the call, the reason
1448 // might be that it was orphaned for a new one due to config update.
1449 if (!IsCurrentReporterOnCall()) {
1450 parent_->MaybeStartReportingLocked();
1451 }
1452 Unref(DEBUG_LOCATION, "Reporter+report_done");
1453 } else {
1454 ScheduleNextReportLocked();
1455 }
1456 GRPC_ERROR_UNREF(error);
1457 }
1458
1459 //
1460 // XdsClient::ChannelState::LrsCallState
1461 //
1462
// Creates the LRS StreamLoadStats call and starts three independent batches:
//   1. send initial metadata + the initial LRS request
//      (completes in on_initial_request_sent_);
//   2. receive initial metadata + the first response
//      (completes in on_response_received_);
//   3. receive the final status (completes in on_status_received_).
// Batches 1 and 2 each take their own ref. Batch 3 relies on the initial ref.
void XdsClient::ChannelState::LrsCallState::LrsCallState(
    RefCountedPtr<RetryableCall<LrsCallState>> parent)
    : InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace),
      parent_(std::move(parent)) {
  // Init the LRS call. Note that the call will progress every time there's
  // activity in xds_client()->interested_parties_, which is comprised of
  // the polling entities from client_channel.
  GPR_ASSERT(xds_client() != nullptr);
  GPR_ASSERT(!xds_client()->server_name_.empty());
  call_ = grpc_channel_create_pollset_set_call(
      chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      xds_client()->interested_parties_,
      GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS,
      nullptr, GRPC_MILLIS_INF_FUTURE, nullptr);
  GPR_ASSERT(call_ != nullptr);
  // Init the request payload.
  // The payload stays allocated until OnInitialRequestSentLocked() frees it.
  grpc_slice request_payload_slice =
      xds_client()->api_.CreateLrsInitialRequest(xds_client()->server_name_);
  send_message_payload_ =
      grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  grpc_slice_unref_internal(request_payload_slice);
  // Init other data associated with the LRS call.
  grpc_metadata_array_init(&initial_metadata_recv_);
  grpc_metadata_array_init(&trailing_metadata_recv_);
  // Start the call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
            "call: %p)",
            xds_client(), chand(), this, call_);
  }
  // Create the ops.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  // Marked wait-for-ready, so the call waits for the channel to become ready
  // instead of failing fast.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  // Op: send request message.
  GPR_ASSERT(send_message_payload_ != nullptr);
  op->op = GRPC_OP_SEND_MESSAGE;
  op->data.send_message.send_message = send_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Released in OnInitialRequestSentLocked().
  Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
  GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 &on_initial_request_sent_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv initial metadata.
  // The ops array is reused for the next batch.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &status_code_;
  op->data.recv_status_on_client.status_details = &status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 &on_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
1557
~LrsCallState()1558 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1559 grpc_metadata_array_destroy(&initial_metadata_recv_);
1560 grpc_metadata_array_destroy(&trailing_metadata_recv_);
1561 grpc_byte_buffer_destroy(send_message_payload_);
1562 grpc_byte_buffer_destroy(recv_message_payload_);
1563 grpc_slice_unref_internal(status_details_);
1564 GPR_ASSERT(call_ != nullptr);
1565 grpc_call_unref(call_);
1566 }
1567
// Invoked when the owner drops this LRS call state. Stops load reporting and
// cancels the call; final cleanup is left to the recv-status callback
// (on_status_received_), which releases the initial ref.
void XdsClient::ChannelState::LrsCallState::Orphan() {
  // Drop the Reporter (if any) before cancelling the call.
  reporter_.reset();
  GPR_ASSERT(call_ != nullptr);
  // If we are here because xds_client wants to cancel the call,
  // on_status_received_ will complete the cancellation and clean up. Otherwise,
  // we are here because xds_client has to orphan a failed call, then the
  // following cancellation will be a no-op.
  grpc_call_cancel(call_, nullptr);
  // Note that the initial ref is held by on_status_received_. So the
  // corresponding unref happens in on_status_received_ instead of here.
}
1579
MaybeStartReportingLocked()1580 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1581 // Don't start again if already started.
1582 if (reporter_ != nullptr) return;
1583 // Don't start if the previous send_message op (of the initial request or the
1584 // last report of the previous reporter) hasn't completed.
1585 if (send_message_payload_ != nullptr) return;
1586 // Don't start if no LRS response has arrived.
1587 if (!seen_response()) return;
1588 // Don't start if the ADS call hasn't received any valid response. Note that
1589 // this must be the first channel because it is the current channel but its
1590 // ADS call hasn't seen any response.
1591 if (chand()->ads_calld_ == nullptr ||
1592 chand()->ads_calld_->calld() == nullptr ||
1593 !chand()->ads_calld_->calld()->seen_response()) {
1594 return;
1595 }
1596 // Start reporting.
1597 reporter_ = MakeOrphanable<Reporter>(
1598 Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1599 }
1600
OnInitialRequestSent(void * arg,grpc_error *)1601 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1602 void* arg, grpc_error* /*error*/) {
1603 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1604 lrs_calld->xds_client()->work_serializer_->Run(
1605 [lrs_calld]() { lrs_calld->OnInitialRequestSentLocked(); },
1606 DEBUG_LOCATION);
1607 }
1608
OnInitialRequestSentLocked()1609 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1610 // Clear the send_message_payload_.
1611 grpc_byte_buffer_destroy(send_message_payload_);
1612 send_message_payload_ = nullptr;
1613 MaybeStartReportingLocked();
1614 Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1615 }
1616
OnResponseReceived(void * arg,grpc_error *)1617 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1618 void* arg, grpc_error* /*error*/) {
1619 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1620 lrs_calld->xds_client()->work_serializer_->Run(
1621 [lrs_calld]() { lrs_calld->OnResponseReceivedLocked(); }, DEBUG_LOCATION);
1622 }
1623
// Runs in the WorkSerializer after a recv_message op on the LRS call
// completes. Parses the LRS response and adopts any new reporting config
// (restarting the Reporter if the config changed). It then re-arms the
// recv_message op. Every exit path either re-arms the op, reusing the
// "OnResponseReceivedLocked" ref, or releases that ref via Unref().
void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
  // Empty payload means the call was cancelled.
  if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
    Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
    return;
  }
  // Read the response.
  grpc_byte_buffer_reader bbr;
  grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
  grpc_byte_buffer_reader_destroy(&bbr);
  grpc_byte_buffer_destroy(recv_message_payload_);
  recv_message_payload_ = nullptr;
  // This anonymous lambda is a hack to avoid the usage of goto.
  // Each "return" inside it only leaves the lambda, so the response_slice
  // cleanup and the re-arming of recv_message below always run.
  [&]() {
    // Parse the response.
    bool send_all_clusters = false;
    std::set<std::string> new_cluster_names;
    // Only read after a successful parse.
    grpc_millis new_load_reporting_interval;
    grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
        response_slice, &send_all_clusters, &new_cluster_names,
        &new_load_reporting_interval);
    if (parse_error != GRPC_ERROR_NONE) {
      gpr_log(GPR_ERROR,
              "[xds_client %p] LRS response parsing failed. error=%s",
              xds_client(), grpc_error_string(parse_error));
      GRPC_ERROR_UNREF(parse_error);
      return;
    }
    seen_response_ = true;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
      gpr_log(
          GPR_INFO,
          "[xds_client %p] LRS response received, %" PRIuPTR
          " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
          "ms",
          xds_client(), new_cluster_names.size(), send_all_clusters,
          new_load_reporting_interval);
      size_t i = 0;
      for (const auto& name : new_cluster_names) {
        gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
                xds_client(), i++, name.c_str());
      }
    }
    // Enforce the minimum reporting interval.
    if (new_load_reporting_interval <
        GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
      new_load_reporting_interval =
          GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] Increased load_report_interval to minimum "
                "value %dms",
                xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
      }
    }
    // Ignore identical update.
    if (send_all_clusters == send_all_clusters_ &&
        cluster_names_ == new_cluster_names &&
        load_reporting_interval_ == new_load_reporting_interval) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
        gpr_log(GPR_INFO,
                "[xds_client %p] Incoming LRS response identical to current, "
                "ignoring.",
                xds_client());
      }
      return;
    }
    // Stop current load reporting (if any) to adopt the new config.
    reporter_.reset();
    // Record the new config.
    send_all_clusters_ = send_all_clusters;
    cluster_names_ = std::move(new_cluster_names);
    load_reporting_interval_ = new_load_reporting_interval;
    // Try starting sending load report.
    MaybeStartReportingLocked();
  }();
  grpc_slice_unref_internal(response_slice);
  if (xds_client()->shutting_down_) {
    Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked+xds_shutdown");
    return;
  }
  // Keep listening for LRS config updates.
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_RECV_MESSAGE;
  op.data.recv_message.recv_message = &recv_message_payload_;
  op.flags = 0;
  op.reserved = nullptr;
  GPR_ASSERT(call_ != nullptr);
  // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
  const grpc_call_error call_error =
      grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
1718
OnStatusReceived(void * arg,grpc_error * error)1719 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1720 void* arg, grpc_error* error) {
1721 LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1722 GRPC_ERROR_REF(error); // ref owned by lambda
1723 lrs_calld->xds_client()->work_serializer_->Run(
1724 [lrs_calld, error]() { lrs_calld->OnStatusReceivedLocked(error); },
1725 DEBUG_LOCATION);
1726 }
1727
// Runs in the WorkSerializer when the LRS call's final status arrives, i.e.
// when the call has ended. If this is still the channel's current LRS call,
// asks the parent retryable call to restart it. Consumes the ref to |error|
// taken in OnStatusReceived() and releases the initial ref on this object.
void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
    grpc_error* error) {
  GPR_ASSERT(call_ != nullptr);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    char* status_details = grpc_slice_to_c_string(status_details_);
    gpr_log(GPR_INFO,
            "[xds_client %p] LRS call status received. Status = %d, details "
            "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
            xds_client(), status_code_, status_details, chand(), this, call_,
            grpc_error_string(error));
    gpr_free(status_details);
  }
  // Ignore status from a stale call.
  if (IsCurrentCallOnChannel()) {
    // A still-current call is not expected to end while the client is
    // shutting down.
    GPR_ASSERT(!xds_client()->shutting_down_);
    // Try to restart the call.
    parent_->OnCallFinishedLocked();
  }
  // Drops the initial ref, which on_status_received_ was holding.
  Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
  GRPC_ERROR_UNREF(error);
}
1749
IsCurrentCallOnChannel() const1750 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1751 // If the retryable LRS call is null (which only happens when the xds channel
1752 // is shutting down), all the LRS calls are stale.
1753 if (chand()->lrs_calld_ == nullptr) return false;
1754 return this == chand()->lrs_calld_->calld();
1755 }
1756
1757 //
1758 // XdsClient
1759 //
1760
1761 namespace {
1762
GetRequestTimeout(const grpc_channel_args & args)1763 grpc_millis GetRequestTimeout(const grpc_channel_args& args) {
1764 return grpc_channel_args_find_integer(
1765 &args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1766 {15000, 0, INT_MAX});
1767 }
1768
1769 } // namespace
1770
// Creates the xDS client. It reads the bootstrap config and creates the xds
// channel. If a service config watcher is supplied, it also subscribes to LDS
// for |server_name|.
// On failure *error is set and the constructor returns early. In that case
// chand_ is left null, so presumably callers must check *error before using
// the client.
XdsClient::XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
                     grpc_pollset_set* interested_parties,
                     absl::string_view server_name,
                     std::unique_ptr<ServiceConfigWatcherInterface> watcher,
                     const grpc_channel_args& channel_args, grpc_error** error)
    : InternallyRefCounted<XdsClient>(&grpc_xds_client_trace),
      request_timeout_(GetRequestTimeout(channel_args)),
      work_serializer_(std::move(work_serializer)),
      interested_parties_(interested_parties),
      // Sets *error if the bootstrap file cannot be read.
      bootstrap_(
          XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
      // Reads bootstrap_, so bootstrap_ must be initialized before api_.
      api_(this, &grpc_xds_client_trace,
           bootstrap_ == nullptr ? nullptr : bootstrap_->node()),
      server_name_(server_name),
      service_config_watcher_(std::move(watcher)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
  }
  if (*error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
            this, grpc_error_string(*error));
    return;
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this,
            bootstrap_->server().server_uri.c_str());
  }
  grpc_channel_args* new_args = BuildXdsChannelArgs(channel_args);
  grpc_channel* channel = CreateXdsChannel(*bootstrap_, *new_args, error);
  grpc_channel_args_destroy(new_args);
  if (*error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this,
            grpc_error_string(*error));
    return;
  }
  // The ChannelState holds a ref to this client.
  chand_ = MakeOrphanable<ChannelState>(
      Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
  if (service_config_watcher_ != nullptr) {
    chand_->Subscribe(XdsApi::kLdsTypeUrl, std::string(server_name));
  }
}
1812
~XdsClient()1813 XdsClient::~XdsClient() {
1814 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1815 gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1816 }
1817 }
1818
Orphan()1819 void XdsClient::Orphan() {
1820 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1821 gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1822 }
1823 shutting_down_ = true;
1824 chand_.reset();
1825 // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1826 // created by the XdsResolver because the maps contain refs for watchers which
1827 // in turn hold refs to the loadbalancing policies. At this point, it is
1828 // possible for ADS calls to be in progress. Unreffing the loadbalancing
1829 // policies before those calls are done would lead to issues such as
1830 // https://github.com/grpc/grpc/issues/20928.
1831 if (service_config_watcher_ != nullptr) {
1832 cluster_map_.clear();
1833 endpoint_map_.clear();
1834 }
1835 Unref(DEBUG_LOCATION, "XdsClient::Orphan()");
1836 }
1837
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1838 void XdsClient::WatchClusterData(
1839 absl::string_view cluster_name,
1840 std::unique_ptr<ClusterWatcherInterface> watcher) {
1841 std::string cluster_name_str = std::string(cluster_name);
1842 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1843 ClusterWatcherInterface* w = watcher.get();
1844 cluster_state.watchers[w] = std::move(watcher);
1845 // If we've already received an CDS update, notify the new watcher
1846 // immediately.
1847 if (cluster_state.update.has_value()) {
1848 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1849 gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1850 this, cluster_name_str.c_str());
1851 }
1852 w->OnClusterChanged(cluster_state.update.value());
1853 }
1854 chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1855 }
1856
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1857 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1858 ClusterWatcherInterface* watcher,
1859 bool delay_unsubscription) {
1860 if (shutting_down_) return;
1861 std::string cluster_name_str = std::string(cluster_name);
1862 ClusterState& cluster_state = cluster_map_[cluster_name_str];
1863 auto it = cluster_state.watchers.find(watcher);
1864 if (it != cluster_state.watchers.end()) {
1865 cluster_state.watchers.erase(it);
1866 if (cluster_state.watchers.empty()) {
1867 cluster_map_.erase(cluster_name_str);
1868 chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1869 delay_unsubscription);
1870 }
1871 }
1872 }
1873
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1874 void XdsClient::WatchEndpointData(
1875 absl::string_view eds_service_name,
1876 std::unique_ptr<EndpointWatcherInterface> watcher) {
1877 std::string eds_service_name_str = std::string(eds_service_name);
1878 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1879 EndpointWatcherInterface* w = watcher.get();
1880 endpoint_state.watchers[w] = std::move(watcher);
1881 // If we've already received an EDS update, notify the new watcher
1882 // immediately.
1883 if (endpoint_state.update.has_value()) {
1884 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1885 gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1886 this, eds_service_name_str.c_str());
1887 }
1888 w->OnEndpointChanged(endpoint_state.update.value());
1889 }
1890 chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1891 }
1892
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)1893 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1894 EndpointWatcherInterface* watcher,
1895 bool delay_unsubscription) {
1896 if (shutting_down_) return;
1897 std::string eds_service_name_str = std::string(eds_service_name);
1898 EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1899 auto it = endpoint_state.watchers.find(watcher);
1900 if (it != endpoint_state.watchers.end()) {
1901 endpoint_state.watchers.erase(it);
1902 if (endpoint_state.watchers.empty()) {
1903 endpoint_map_.erase(eds_service_name_str);
1904 chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1905 delay_unsubscription);
1906 }
1907 }
1908 }
1909
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)1910 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1911 absl::string_view lrs_server, absl::string_view cluster_name,
1912 absl::string_view eds_service_name) {
1913 // TODO(roth): When we add support for direct federation, use the
1914 // server name specified in lrs_server.
1915 auto key =
1916 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1917 // We jump through some hoops here to make sure that the absl::string_views
1918 // stored in the XdsClusterDropStats object point to the strings
1919 // in the load_report_map_ key, so that they have the same lifetime.
1920 auto it = load_report_map_
1921 .emplace(std::make_pair(std::move(key), LoadReportState()))
1922 .first;
1923 auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1924 Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1925 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
1926 it->second.drop_stats.insert(cluster_drop_stats.get());
1927 chand_->MaybeStartLrsCall();
1928 return cluster_drop_stats;
1929 }
1930
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)1931 void XdsClient::RemoveClusterDropStats(
1932 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
1933 absl::string_view eds_service_name,
1934 XdsClusterDropStats* cluster_drop_stats) {
1935 auto load_report_it = load_report_map_.find(
1936 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1937 if (load_report_it == load_report_map_.end()) return;
1938 LoadReportState& load_report_state = load_report_it->second;
1939 // TODO(roth): When we add support for direct federation, use the
1940 // server name specified in lrs_server.
1941 auto it = load_report_state.drop_stats.find(cluster_drop_stats);
1942 if (it != load_report_state.drop_stats.end()) {
1943 // Record final drop stats in deleted_drop_stats, which will be
1944 // added to the next load report.
1945 for (const auto& p : cluster_drop_stats->GetSnapshotAndReset()) {
1946 load_report_state.deleted_drop_stats[p.first] += p.second;
1947 }
1948 load_report_state.drop_stats.erase(it);
1949 }
1950 }
1951
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)1952 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
1953 absl::string_view lrs_server, absl::string_view cluster_name,
1954 absl::string_view eds_service_name,
1955 RefCountedPtr<XdsLocalityName> locality) {
1956 // TODO(roth): When we add support for direct federation, use the
1957 // server name specified in lrs_server.
1958 auto key =
1959 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1960 // We jump through some hoops here to make sure that the absl::string_views
1961 // stored in the XdsClusterLocalityStats object point to the strings
1962 // in the load_report_map_ key, so that they have the same lifetime.
1963 auto it = load_report_map_
1964 .emplace(std::make_pair(std::move(key), LoadReportState()))
1965 .first;
1966 auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
1967 Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
1968 it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
1969 locality);
1970 it->second.locality_stats[std::move(locality)].locality_stats.insert(
1971 cluster_locality_stats.get());
1972 chand_->MaybeStartLrsCall();
1973 return cluster_locality_stats;
1974 }
1975
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)1976 void XdsClient::RemoveClusterLocalityStats(
1977 absl::string_view /*lrs_server*/, absl::string_view cluster_name,
1978 absl::string_view eds_service_name,
1979 const RefCountedPtr<XdsLocalityName>& locality,
1980 XdsClusterLocalityStats* cluster_locality_stats) {
1981 auto load_report_it = load_report_map_.find(
1982 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
1983 if (load_report_it == load_report_map_.end()) return;
1984 LoadReportState& load_report_state = load_report_it->second;
1985 // TODO(roth): When we add support for direct federation, use the
1986 // server name specified in lrs_server.
1987 auto locality_it = load_report_state.locality_stats.find(locality);
1988 if (locality_it == load_report_state.locality_stats.end()) return;
1989 auto& locality_set = locality_it->second.locality_stats;
1990 auto it = locality_set.find(cluster_locality_stats);
1991 if (it != locality_set.end()) {
1992 // Record final snapshot in deleted_locality_stats, which will be
1993 // added to the next load report.
1994 locality_it->second.deleted_locality_stats.emplace_back(
1995 cluster_locality_stats->GetSnapshotAndReset());
1996 locality_set.erase(it);
1997 }
1998 }
1999
ResetBackoff()2000 void XdsClient::ResetBackoff() {
2001 if (chand_ != nullptr) {
2002 grpc_channel_reset_connect_backoff(chand_->channel());
2003 }
2004 }
2005
2006 namespace {
CreateServiceConfigActionCluster(const std::string & cluster_name)2007 std::string CreateServiceConfigActionCluster(const std::string& cluster_name) {
2008 return absl::StrFormat(
2009 " \"cds:%s\":{\n"
2010 " \"childPolicy\":[ {\n"
2011 " \"cds_experimental\":{\n"
2012 " \"cluster\": \"%s\"\n"
2013 " }\n"
2014 " } ]\n"
2015 " }",
2016 cluster_name, cluster_name);
2017 }
2018
CreateServiceConfigRoute(const std::string & action_name,const XdsApi::RdsUpdate::RdsRoute & route)2019 std::string CreateServiceConfigRoute(const std::string& action_name,
2020 const XdsApi::RdsUpdate::RdsRoute& route) {
2021 std::vector<std::string> headers;
2022 for (const auto& header : route.matchers.header_matchers) {
2023 std::string header_matcher;
2024 switch (header.type) {
2025 case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2026 HeaderMatcherType::EXACT:
2027 header_matcher = absl::StrFormat(" \"exact_match\": \"%s\"",
2028 header.string_matcher);
2029 break;
2030 case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2031 HeaderMatcherType::REGEX:
2032 header_matcher = absl::StrFormat(" \"regex_match\": \"%s\"",
2033 header.regex_match->pattern());
2034 break;
2035 case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2036 HeaderMatcherType::RANGE:
2037 header_matcher = absl::StrFormat(
2038 " \"range_match\":{\n"
2039 " \"start\":%d,\n"
2040 " \"end\":%d\n"
2041 " }",
2042 header.range_start, header.range_end);
2043 break;
2044 case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2045 HeaderMatcherType::PRESENT:
2046 header_matcher =
2047 absl::StrFormat(" \"present_match\": %s",
2048 header.present_match ? "true" : "false");
2049 break;
2050 case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2051 HeaderMatcherType::PREFIX:
2052 header_matcher = absl::StrFormat(
2053 " \"prefix_match\": \"%s\"", header.string_matcher);
2054 break;
2055 case XdsApi::RdsUpdate::RdsRoute::Matchers::HeaderMatcher::
2056 HeaderMatcherType::SUFFIX:
2057 header_matcher = absl::StrFormat(
2058 " \"suffix_match\": \"%s\"", header.string_matcher);
2059 break;
2060 default:
2061 break;
2062 }
2063 std::vector<std::string> header_parts;
2064 header_parts.push_back(
2065 absl::StrFormat(" { \n"
2066 " \"name\": \"%s\",\n",
2067 header.name));
2068 header_parts.push_back(header_matcher);
2069 if (header.invert_match) {
2070 header_parts.push_back(
2071 absl::StrFormat(",\n"
2072 " \"invert_match\": true"));
2073 }
2074 header_parts.push_back(
2075 absl::StrFormat("\n"
2076 " }"));
2077 headers.push_back(absl::StrJoin(header_parts, ""));
2078 }
2079 std::vector<std::string> headers_service_config;
2080 if (!headers.empty()) {
2081 headers_service_config.push_back("\"headers\":[\n");
2082 headers_service_config.push_back(absl::StrJoin(headers, ","));
2083 headers_service_config.push_back(" ],\n");
2084 }
2085 std::string path_match_str;
2086 switch (route.matchers.path_matcher.type) {
2087 case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
2088 PREFIX:
2089 path_match_str = absl::StrFormat(
2090 "\"prefix\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
2091 break;
2092 case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
2093 PATH:
2094 path_match_str = absl::StrFormat(
2095 "\"path\": \"%s\",\n", route.matchers.path_matcher.string_matcher);
2096 break;
2097 case XdsApi::RdsUpdate::RdsRoute::Matchers::PathMatcher::PathMatcherType::
2098 REGEX:
2099 path_match_str =
2100 absl::StrFormat("\"regex\": \"%s\",\n",
2101 route.matchers.path_matcher.regex_matcher->pattern());
2102 break;
2103 }
2104 return absl::StrFormat(
2105 " { \n"
2106 " %s"
2107 " %s"
2108 " %s"
2109 " \"action\": \"%s\"\n"
2110 " }",
2111 path_match_str, absl::StrJoin(headers_service_config, ""),
2112 route.matchers.fraction_per_million.has_value()
2113 ? absl::StrFormat("\"match_fraction\":%d,\n",
2114 route.matchers.fraction_per_million.value())
2115 : "",
2116 action_name);
2117 }
2118
2119 // Create the service config for one weighted cluster.
CreateServiceConfigActionWeightedCluster(const std::string & name,const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight> & clusters)2120 std::string CreateServiceConfigActionWeightedCluster(
2121 const std::string& name,
2122 const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>& clusters) {
2123 std::vector<std::string> config_parts;
2124 config_parts.push_back(
2125 absl::StrFormat(" \"weighted:%s\":{\n"
2126 " \"childPolicy\":[ {\n"
2127 " \"weighted_target_experimental\":{\n"
2128 " \"targets\":{\n",
2129 name));
2130 std::vector<std::string> weighted_targets;
2131 weighted_targets.reserve(clusters.size());
2132 for (const auto& cluster_weight : clusters) {
2133 weighted_targets.push_back(absl::StrFormat(
2134 " \"%s\":{\n"
2135 " \"weight\":%d,\n"
2136 " \"childPolicy\":[ {\n"
2137 " \"cds_experimental\":{\n"
2138 " \"cluster\": \"%s\"\n"
2139 " }\n"
2140 " } ]\n"
2141 " }",
2142 cluster_weight.name, cluster_weight.weight, cluster_weight.name));
2143 }
2144 config_parts.push_back(absl::StrJoin(weighted_targets, ",\n"));
2145 config_parts.push_back(
2146 " }\n"
2147 " }\n"
2148 " } ]\n"
2149 " }");
2150 return absl::StrJoin(config_parts, "");
2151 }
2152
2153 struct WeightedClustersKeys {
2154 std::string cluster_names_key;
2155 std::string cluster_weights_key;
2156 };
2157
2158 // Returns the cluster names and weights key or the cluster names only key.
GetWeightedClustersKey(const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight> & weighted_clusters)2159 WeightedClustersKeys GetWeightedClustersKey(
2160 const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
2161 weighted_clusters) {
2162 std::set<std::string> cluster_names;
2163 std::set<std::string> cluster_weights;
2164 for (const auto& cluster_weight : weighted_clusters) {
2165 cluster_names.emplace(absl::StrFormat("%s", cluster_weight.name));
2166 cluster_weights.emplace(
2167 absl::StrFormat("%s_%d", cluster_weight.name, cluster_weight.weight));
2168 }
2169 return {absl::StrJoin(cluster_names, "_"),
2170 absl::StrJoin(cluster_weights, "_")};
2171 }
2172
2173 } // namespace
2174
WeightedClustersActionName(const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight> & weighted_clusters)2175 std::string XdsClient::WeightedClustersActionName(
2176 const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
2177 weighted_clusters) {
2178 WeightedClustersKeys keys = GetWeightedClustersKey(weighted_clusters);
2179 auto cluster_names_map_it =
2180 weighted_cluster_index_map_.find(keys.cluster_names_key);
2181 GPR_ASSERT(cluster_names_map_it != weighted_cluster_index_map_.end());
2182 const auto& cluster_weights_map =
2183 cluster_names_map_it->second.cluster_weights_map;
2184 auto cluster_weights_map_it =
2185 cluster_weights_map.find(keys.cluster_weights_key);
2186 GPR_ASSERT(cluster_weights_map_it != cluster_weights_map.end());
2187 return absl::StrFormat("%s_%d", keys.cluster_names_key,
2188 cluster_weights_map_it->second);
2189 }
2190
// Rebuilds weighted_cluster_index_map_ for the weighted-cluster actions in
// |rds_update|, so that action names (see WeightedClustersActionName()) stay
// stable across updates where possible:
//  - an action with the same clusters AND weights keeps its old index;
//  - an action whose set of clusters matches an old, now-unused action takes
//    over that action's index;
//  - any other action gets a fresh index from next_index.
// The old map is replaced wholesale at the end, so old entries that were not
// carried over are dropped.
void XdsClient::UpdateWeightedClusterIndexMap(
    const XdsApi::RdsUpdate& rds_update) {
  // Construct a list of unique WeightedCluster
  // actions which we need to process: to find action names
  std::map<std::string /* cluster_weights_key */,
           std::string /* cluster_names_key */>
      actions_to_process;
  for (const auto& route : rds_update.routes) {
    if (!route.weighted_clusters.empty()) {
      WeightedClustersKeys keys =
          GetWeightedClustersKey(route.weighted_clusters);
      auto action_it = actions_to_process.find(keys.cluster_weights_key);
      if (action_it == actions_to_process.end()) {
        actions_to_process[std::move(keys.cluster_weights_key)] =
            std::move(keys.cluster_names_key);
      }
    }
  }
  // First pass of all unique WeightedCluster actions: if the exact same
  // weighted target policy (same clusters and weights) appears in the old map,
  // then that old action name is taken again and should be moved to the new
  // map; any other action names from the old set of actions are candidates for
  // reuse.
  XdsClient::WeightedClusterIndexMap new_weighted_cluster_index_map;
  for (auto action_it = actions_to_process.begin();
       action_it != actions_to_process.end();) {
    const std::string& cluster_names_key = action_it->second;
    const std::string& cluster_weights_key = action_it->first;
    auto old_cluster_names_map_it =
        weighted_cluster_index_map_.find(cluster_names_key);
    if (old_cluster_names_map_it != weighted_cluster_index_map_.end()) {
      // Add cluster_names_key to the new map and copy next_index.
      auto& new_cluster_names_info =
          new_weighted_cluster_index_map[cluster_names_key];
      new_cluster_names_info.next_index =
          old_cluster_names_map_it->second.next_index;
      // Lookup cluster_weights_key in old map.
      auto& old_cluster_weights_map =
          old_cluster_names_map_it->second.cluster_weights_map;
      auto old_cluster_weights_map_it =
          old_cluster_weights_map.find(cluster_weights_key);
      if (old_cluster_weights_map_it != old_cluster_weights_map.end()) {
        // same policy found, move from old map to new map.
        new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
            old_cluster_weights_map_it->second;
        // Removing it from the old map means the second pass cannot hand
        // this index out again.
        old_cluster_weights_map.erase(old_cluster_weights_map_it);
        // This action has been added to new map, so no need to process it
        // again.
        action_it = actions_to_process.erase(action_it);
        continue;
      }
    }
    ++action_it;
  }
  // Second pass of all remaining unique WeightedCluster actions: if clusters
  // for a new action are the same as an old unused action, reuse the name. If
  // clusters differ, use a brand new name.
  for (const auto& action : actions_to_process) {
    const std::string& cluster_names_key = action.second;
    const std::string& cluster_weights_key = action.first;
    auto& new_cluster_names_info =
        new_weighted_cluster_index_map[cluster_names_key];
    // operator[] may insert an empty entry into the old map; that is harmless
    // because the old map is replaced at the end of this function.
    auto& old_cluster_weights_map =
        weighted_cluster_index_map_[cluster_names_key].cluster_weights_map;
    auto old_cluster_weights_it = old_cluster_weights_map.begin();
    if (old_cluster_weights_it != old_cluster_weights_map.end()) {
      // There is something to reuse: this action uses the same set
      // of clusters as a previous action and that action name is not
      // already taken.
      new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
          old_cluster_weights_it->second;
      // Remove the name from being able to reuse again.
      old_cluster_weights_map.erase(old_cluster_weights_it);
    } else {
      // There is nothing to reuse, take the next index to use and
      // increment.
      new_cluster_names_info.cluster_weights_map[cluster_weights_key] =
          new_cluster_names_info.next_index++;
    }
  }
  weighted_cluster_index_map_ = std::move(new_weighted_cluster_index_map);
}
2273
CreateServiceConfig(const XdsApi::RdsUpdate & rds_update,RefCountedPtr<ServiceConfig> * service_config)2274 grpc_error* XdsClient::CreateServiceConfig(
2275 const XdsApi::RdsUpdate& rds_update,
2276 RefCountedPtr<ServiceConfig>* service_config) {
2277 UpdateWeightedClusterIndexMap(rds_update);
2278 std::vector<std::string> actions_vector;
2279 std::vector<std::string> route_table;
2280 std::set<std::string> actions_set;
2281 for (const auto& route : rds_update.routes) {
2282 const std::string action_name =
2283 route.weighted_clusters.empty()
2284 ? route.cluster_name
2285 : WeightedClustersActionName(route.weighted_clusters);
2286 if (actions_set.find(action_name) == actions_set.end()) {
2287 actions_set.emplace(action_name);
2288 actions_vector.push_back(
2289 route.weighted_clusters.empty()
2290 ? CreateServiceConfigActionCluster(action_name)
2291 : CreateServiceConfigActionWeightedCluster(
2292 action_name, route.weighted_clusters));
2293 }
2294 route_table.push_back(CreateServiceConfigRoute(
2295 absl::StrFormat("%s:%s",
2296 route.weighted_clusters.empty() ? "cds" : "weighted",
2297 action_name),
2298 route));
2299 }
2300 std::vector<std::string> config_parts;
2301 config_parts.push_back(
2302 "{\n"
2303 " \"loadBalancingConfig\":[\n"
2304 " { \"xds_routing_experimental\":{\n"
2305 " \"actions\":{\n");
2306 config_parts.push_back(absl::StrJoin(actions_vector, ",\n"));
2307 config_parts.push_back(
2308 " },\n"
2309 " \"routes\":[\n");
2310 config_parts.push_back(absl::StrJoin(route_table, ",\n"));
2311 config_parts.push_back(
2312 " ]\n"
2313 " } }\n"
2314 " ]\n"
2315 "}");
2316 std::string json = absl::StrJoin(config_parts, "");
2317 grpc_error* error = GRPC_ERROR_NONE;
2318 *service_config = ServiceConfig::Create(json.c_str(), &error);
2319 return error;
2320 }
2321
BuildLoadReportSnapshot(bool send_all_clusters,const std::set<std::string> & clusters)2322 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshot(
2323 bool send_all_clusters, const std::set<std::string>& clusters) {
2324 XdsApi::ClusterLoadReportMap snapshot_map;
2325 for (auto load_report_it = load_report_map_.begin();
2326 load_report_it != load_report_map_.end();) {
2327 // Cluster key is cluster and EDS service name.
2328 const auto& cluster_key = load_report_it->first;
2329 LoadReportState& load_report = load_report_it->second;
2330 // If the CDS response for a cluster indicates to use LRS but the
2331 // LRS server does not say that it wants reports for this cluster,
2332 // then we'll have stats objects here whose data we're not going to
2333 // include in the load report. However, we still need to clear out
2334 // the data from the stats objects, so that if the LRS server starts
2335 // asking for the data in the future, we don't incorrectly include
2336 // data from previous reporting intervals in that future report.
2337 const bool record_stats =
2338 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2339 XdsApi::ClusterLoadReport snapshot;
2340 // Aggregate drop stats.
2341 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2342 for (auto& drop_stats : load_report.drop_stats) {
2343 for (const auto& p : drop_stats->GetSnapshotAndReset()) {
2344 snapshot.dropped_requests[p.first] += p.second;
2345 }
2346 }
2347 // Aggregate locality stats.
2348 for (auto it = load_report.locality_stats.begin();
2349 it != load_report.locality_stats.end();) {
2350 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2351 auto& locality_state = it->second;
2352 XdsClusterLocalityStats::Snapshot& locality_snapshot =
2353 snapshot.locality_stats[locality_name];
2354 for (auto& locality_stats : locality_state.locality_stats) {
2355 locality_snapshot += locality_stats->GetSnapshotAndReset();
2356 }
2357 // Add final snapshots from recently deleted locality stats objects.
2358 for (auto& deleted_locality_stats :
2359 locality_state.deleted_locality_stats) {
2360 locality_snapshot += deleted_locality_stats;
2361 }
2362 locality_state.deleted_locality_stats.clear();
2363 // If the only thing left in this entry was final snapshots from
2364 // deleted locality stats objects, remove the entry.
2365 if (locality_state.locality_stats.empty()) {
2366 it = load_report.locality_stats.erase(it);
2367 } else {
2368 ++it;
2369 }
2370 }
2371 if (record_stats) {
2372 // Compute load report interval.
2373 const grpc_millis now = ExecCtx::Get()->Now();
2374 snapshot.load_report_interval = now - load_report.last_report_time;
2375 load_report.last_report_time = now;
2376 // Record snapshot.
2377 snapshot_map[cluster_key] = std::move(snapshot);
2378 }
2379 // If the only thing left in this entry was final snapshots from
2380 // deleted stats objects, remove the entry.
2381 if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
2382 load_report_it = load_report_map_.erase(load_report_it);
2383 } else {
2384 ++load_report_it;
2385 }
2386 }
2387 return snapshot_map;
2388 }
2389
NotifyOnError(grpc_error * error)2390 void XdsClient::NotifyOnError(grpc_error* error) {
2391 if (service_config_watcher_ != nullptr) {
2392 service_config_watcher_->OnError(GRPC_ERROR_REF(error));
2393 }
2394 for (const auto& p : cluster_map_) {
2395 const ClusterState& cluster_state = p.second;
2396 for (const auto& p : cluster_state.watchers) {
2397 p.first->OnError(GRPC_ERROR_REF(error));
2398 }
2399 }
2400 for (const auto& p : endpoint_map_) {
2401 const EndpointState& endpoint_state = p.second;
2402 for (const auto& p : endpoint_state.watchers) {
2403 p.first->OnError(GRPC_ERROR_REF(error));
2404 }
2405 }
2406 GRPC_ERROR_UNREF(error);
2407 }
2408
ChannelArgCopy(void * p)2409 void* XdsClient::ChannelArgCopy(void* p) {
2410 XdsClient* xds_client = static_cast<XdsClient*>(p);
2411 xds_client->Ref(DEBUG_LOCATION, "channel arg").release();
2412 return p;
2413 }
2414
ChannelArgDestroy(void * p)2415 void XdsClient::ChannelArgDestroy(void* p) {
2416 XdsClient* xds_client = static_cast<XdsClient*>(p);
2417 xds_client->Unref(DEBUG_LOCATION, "channel arg");
2418 }
2419
ChannelArgCmp(void * p,void * q)2420 int XdsClient::ChannelArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
2421
2422 const grpc_arg_pointer_vtable XdsClient::kXdsClientVtable = {
2423 XdsClient::ChannelArgCopy, XdsClient::ChannelArgDestroy,
2424 XdsClient::ChannelArgCmp};
2425
MakeChannelArg() const2426 grpc_arg XdsClient::MakeChannelArg() const {
2427 return grpc_channel_arg_pointer_create(const_cast<char*>(GRPC_ARG_XDS_CLIENT),
2428 const_cast<XdsClient*>(this),
2429 &XdsClient::kXdsClientVtable);
2430 }
2431
GetFromChannelArgs(const grpc_channel_args & args)2432 RefCountedPtr<XdsClient> XdsClient::GetFromChannelArgs(
2433 const grpc_channel_args& args) {
2434 XdsClient* xds_client =
2435 grpc_channel_args_find_pointer<XdsClient>(&args, GRPC_ARG_XDS_CLIENT);
2436 if (xds_client != nullptr) return xds_client->Ref();
2437 return nullptr;
2438 }
2439
// Returns a newly allocated copy of `args` with the GRPC_ARG_XDS_CLIENT arg
// removed (via grpc_channel_args_copy_and_remove).
grpc_channel_args* XdsClient::RemoveFromChannelArgs(
    const grpc_channel_args& args) {
  const char* args_to_remove[] = {GRPC_ARG_XDS_CLIENT};
  return grpc_channel_args_copy_and_remove(&args, args_to_remove, 1);
}
2445
2446 } // namespace grpc_core
2447