1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <string.h>
20
21 #include "absl/strings/str_cat.h"
22
23 #include "src/core/ext/filters/client_channel/lb_policy.h"
24 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
25 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
26 #include "src/core/ext/filters/client_channel/service_config.h"
27 #include "src/core/ext/xds/xds_certificate_provider.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gprpp/memory.h"
31 #include "src/core/lib/gprpp/orphanable.h"
32 #include "src/core/lib/gprpp/ref_counted_ptr.h"
33 #include "src/core/lib/iomgr/closure.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
36 #include "src/core/lib/transport/error_utils.h"
37
38 namespace grpc_core {
39
40 TraceFlag grpc_cds_lb_trace(false, "cds_lb");
41
42 namespace {
43
44 constexpr char kCds[] = "cds_experimental";
45
46 // Config for this LB policy.
47 class CdsLbConfig : public LoadBalancingPolicy::Config {
48 public:
CdsLbConfig(std::string cluster)49 explicit CdsLbConfig(std::string cluster) : cluster_(std::move(cluster)) {}
cluster() const50 const std::string& cluster() const { return cluster_; }
name() const51 const char* name() const override { return kCds; }
52
53 private:
54 std::string cluster_;
55 };
56
57 // CDS LB policy.
58 class CdsLb : public LoadBalancingPolicy {
59 public:
60 CdsLb(RefCountedPtr<XdsClient> xds_client, Args args);
61
name() const62 const char* name() const override { return kCds; }
63
64 void UpdateLocked(UpdateArgs args) override;
65 void ResetBackoffLocked() override;
66 void ExitIdleLocked() override;
67
68 private:
69 // Watcher for getting cluster data from XdsClient.
70 class ClusterWatcher : public XdsClient::ClusterWatcherInterface {
71 public:
ClusterWatcher(RefCountedPtr<CdsLb> parent,std::string name)72 ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
73 : parent_(std::move(parent)), name_(std::move(name)) {}
74
OnClusterChanged(XdsApi::CdsUpdate cluster_data)75 void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override {
76 new Notifier(parent_, name_, std::move(cluster_data));
77 }
OnError(grpc_error * error)78 void OnError(grpc_error* error) override {
79 new Notifier(parent_, name_, error);
80 }
OnResourceDoesNotExist()81 void OnResourceDoesNotExist() override { new Notifier(parent_, name_); }
82
83 private:
84 class Notifier {
85 public:
86 Notifier(RefCountedPtr<CdsLb> parent, std::string name,
87 XdsApi::CdsUpdate update);
88 Notifier(RefCountedPtr<CdsLb> parent, std::string name,
89 grpc_error* error);
90 explicit Notifier(RefCountedPtr<CdsLb> parent, std::string name);
91
92 private:
93 enum Type { kUpdate, kError, kDoesNotExist };
94
95 static void RunInExecCtx(void* arg, grpc_error* error);
96 void RunInWorkSerializer(grpc_error* error);
97
98 RefCountedPtr<CdsLb> parent_;
99 std::string name_;
100 grpc_closure closure_;
101 XdsApi::CdsUpdate update_;
102 Type type_;
103 };
104
105 RefCountedPtr<CdsLb> parent_;
106 std::string name_;
107 };
108
109 struct WatcherState {
110 // Pointer to watcher, to be used when cancelling.
111 // Not owned, so do not dereference.
112 ClusterWatcher* watcher = nullptr;
113 // Most recent update obtained from this watcher.
114 absl::optional<XdsApi::CdsUpdate> update;
115 };
116
117 // Delegating helper to be passed to child policy.
118 class Helper : public ChannelControlHelper {
119 public:
Helper(RefCountedPtr<CdsLb> parent)120 explicit Helper(RefCountedPtr<CdsLb> parent) : parent_(std::move(parent)) {}
121 RefCountedPtr<SubchannelInterface> CreateSubchannel(
122 ServerAddress address, const grpc_channel_args& args) override;
123 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
124 std::unique_ptr<SubchannelPicker> picker) override;
125 void RequestReresolution() override;
126 void AddTraceEvent(TraceSeverity severity,
127 absl::string_view message) override;
128
129 private:
130 RefCountedPtr<CdsLb> parent_;
131 };
132
133 ~CdsLb() override;
134
135 void ShutdownLocked() override;
136
137 bool GenerateDiscoveryMechanismForCluster(
138 const std::string& name, Json::Array* discovery_mechanisms,
139 std::set<std::string>* clusters_needed);
140 void OnClusterChanged(const std::string& name,
141 XdsApi::CdsUpdate cluster_data);
142 void OnError(const std::string& name, grpc_error* error);
143 void OnResourceDoesNotExist(const std::string& name);
144
145 grpc_error* UpdateXdsCertificateProvider(
146 const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data);
147
148 void CancelClusterDataWatch(absl::string_view cluster_name,
149 XdsClient::ClusterWatcherInterface* watcher,
150 bool delay_unsubscription = false);
151
152 void MaybeDestroyChildPolicyLocked();
153
154 RefCountedPtr<CdsLbConfig> config_;
155
156 // Current channel args from the resolver.
157 const grpc_channel_args* args_ = nullptr;
158
159 // The xds client.
160 RefCountedPtr<XdsClient> xds_client_;
161
162 // Maps from cluster name to the state for that cluster.
163 // The root of the tree is config_->cluster().
164 std::map<std::string, WatcherState> watchers_;
165
166 RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
167 RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
168 RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
169
170 // Child LB policy.
171 OrphanablePtr<LoadBalancingPolicy> child_policy_;
172
173 // Internal state.
174 bool shutting_down_ = false;
175 };
176
177 //
178 // CdsLb::ClusterWatcher::Notifier
179 //
180
Notifier(RefCountedPtr<CdsLb> parent,std::string name,XdsApi::CdsUpdate update)181 CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
182 std::string name,
183 XdsApi::CdsUpdate update)
184 : parent_(std::move(parent)),
185 name_(std::move(name)),
186 update_(std::move(update)),
187 type_(kUpdate) {
188 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
189 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
190 }
191
Notifier(RefCountedPtr<CdsLb> parent,std::string name,grpc_error * error)192 CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
193 std::string name, grpc_error* error)
194 : parent_(std::move(parent)), name_(std::move(name)), type_(kError) {
195 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
196 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
197 }
198
Notifier(RefCountedPtr<CdsLb> parent,std::string name)199 CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
200 std::string name)
201 : parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) {
202 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
203 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
204 }
205
RunInExecCtx(void * arg,grpc_error * error)206 void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg,
207 grpc_error* error) {
208 Notifier* self = static_cast<Notifier*>(arg);
209 GRPC_ERROR_REF(error);
210 self->parent_->work_serializer()->Run(
211 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
212 }
213
RunInWorkSerializer(grpc_error * error)214 void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(grpc_error* error) {
215 switch (type_) {
216 case kUpdate:
217 parent_->OnClusterChanged(name_, std::move(update_));
218 break;
219 case kError:
220 parent_->OnError(name_, error);
221 break;
222 case kDoesNotExist:
223 parent_->OnResourceDoesNotExist(name_);
224 break;
225 };
226 delete this;
227 }
228
229 //
230 // CdsLb::Helper
231 //
232
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)233 RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel(
234 ServerAddress address, const grpc_channel_args& args) {
235 if (parent_->shutting_down_) return nullptr;
236 return parent_->channel_control_helper()->CreateSubchannel(std::move(address),
237 args);
238 }
239
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)240 void CdsLb::Helper::UpdateState(grpc_connectivity_state state,
241 const absl::Status& status,
242 std::unique_ptr<SubchannelPicker> picker) {
243 if (parent_->shutting_down_ || parent_->child_policy_ == nullptr) return;
244 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
245 gpr_log(GPR_INFO,
246 "[cdslb %p] state updated by child: %s message_state: (%s)", this,
247 ConnectivityStateName(state), status.ToString().c_str());
248 }
249 parent_->channel_control_helper()->UpdateState(state, status,
250 std::move(picker));
251 }
252
RequestReresolution()253 void CdsLb::Helper::RequestReresolution() {
254 if (parent_->shutting_down_) return;
255 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
256 gpr_log(GPR_INFO, "[cdslb %p] Re-resolution requested from child policy.",
257 parent_.get());
258 }
259 parent_->channel_control_helper()->RequestReresolution();
260 }
261
AddTraceEvent(TraceSeverity severity,absl::string_view message)262 void CdsLb::Helper::AddTraceEvent(TraceSeverity severity,
263 absl::string_view message) {
264 if (parent_->shutting_down_) return;
265 parent_->channel_control_helper()->AddTraceEvent(severity, message);
266 }
267
268 //
269 // CdsLb
270 //
271
CdsLb(RefCountedPtr<XdsClient> xds_client,Args args)272 CdsLb::CdsLb(RefCountedPtr<XdsClient> xds_client, Args args)
273 : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
274 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
275 gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p", this,
276 xds_client_.get());
277 }
278 }
279
~CdsLb()280 CdsLb::~CdsLb() {
281 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
282 gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this);
283 }
284 }
285
ShutdownLocked()286 void CdsLb::ShutdownLocked() {
287 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
288 gpr_log(GPR_INFO, "[cdslb %p] shutting down", this);
289 }
290 shutting_down_ = true;
291 MaybeDestroyChildPolicyLocked();
292 if (xds_client_ != nullptr) {
293 for (auto& watcher : watchers_) {
294 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
295 gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
296 watcher.first.c_str());
297 }
298 CancelClusterDataWatch(watcher.first, watcher.second.watcher,
299 /*delay_unsubscription=*/false);
300 }
301 watchers_.clear();
302 xds_client_.reset(DEBUG_LOCATION, "CdsLb");
303 }
304 grpc_channel_args_destroy(args_);
305 args_ = nullptr;
306 }
307
MaybeDestroyChildPolicyLocked()308 void CdsLb::MaybeDestroyChildPolicyLocked() {
309 if (child_policy_ != nullptr) {
310 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
311 interested_parties());
312 child_policy_.reset();
313 }
314 }
315
ResetBackoffLocked()316 void CdsLb::ResetBackoffLocked() {
317 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
318 }
319
ExitIdleLocked()320 void CdsLb::ExitIdleLocked() {
321 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
322 }
323
UpdateLocked(UpdateArgs args)324 void CdsLb::UpdateLocked(UpdateArgs args) {
325 // Update config.
326 auto old_config = std::move(config_);
327 config_ = std::move(args.config);
328 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
329 gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s", this,
330 config_->cluster().c_str());
331 }
332 // Update args.
333 grpc_channel_args_destroy(args_);
334 args_ = args.args;
335 args.args = nullptr;
336 // If cluster name changed, cancel watcher and restart.
337 if (old_config == nullptr || old_config->cluster() != config_->cluster()) {
338 if (old_config != nullptr) {
339 for (auto& watcher : watchers_) {
340 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
341 gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
342 watcher.first.c_str());
343 }
344 CancelClusterDataWatch(watcher.first, watcher.second.watcher,
345 /*delay_unsubscription=*/true);
346 }
347 watchers_.clear();
348 }
349 auto watcher = absl::make_unique<ClusterWatcher>(Ref(), config_->cluster());
350 watchers_[config_->cluster()].watcher = watcher.get();
351 xds_client_->WatchClusterData(config_->cluster(), std::move(watcher));
352 }
353 }
354
355 // This method will attempt to generate one or multiple entries of discovery
356 // mechanism recursively:
357 // For cluster types EDS or LOGICAL_DNS, one discovery mechanism entry may be
358 // generated cluster name, type and other data from the CdsUpdate inserted into
359 // the entry and the entry appended to the array of entries.
360 // Note, discovery mechanism entry can be generated if an CdsUpdate is
361 // available; otherwise, just return false. For cluster type AGGREGATE,
362 // recursively call the method for each child cluster.
GenerateDiscoveryMechanismForCluster(const std::string & name,Json::Array * discovery_mechanisms,std::set<std::string> * clusters_needed)363 bool CdsLb::GenerateDiscoveryMechanismForCluster(
364 const std::string& name, Json::Array* discovery_mechanisms,
365 std::set<std::string>* clusters_needed) {
366 clusters_needed->insert(name);
367 auto& state = watchers_[name];
368 // Create a new watcher if needed.
369 if (state.watcher == nullptr) {
370 auto watcher = absl::make_unique<ClusterWatcher>(Ref(), name);
371 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
372 gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
373 name.c_str());
374 }
375 state.watcher = watcher.get();
376 xds_client_->WatchClusterData(name, std::move(watcher));
377 return false;
378 }
379 // Don't have the update we need yet.
380 if (!state.update.has_value()) return false;
381 // For AGGREGATE clusters, recursively expand to child clusters.
382 if (state.update->cluster_type == XdsApi::CdsUpdate::ClusterType::AGGREGATE) {
383 bool missing_cluster = false;
384 for (const std::string& child_name :
385 state.update->prioritized_cluster_names) {
386 if (!GenerateDiscoveryMechanismForCluster(
387 child_name, discovery_mechanisms, clusters_needed)) {
388 missing_cluster = true;
389 }
390 }
391 return !missing_cluster;
392 }
393 std::string type;
394 switch (state.update->cluster_type) {
395 case XdsApi::CdsUpdate::ClusterType::EDS:
396 type = "EDS";
397 break;
398 case XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS:
399 type = "LOGICAL_DNS";
400 break;
401 default:
402 GPR_ASSERT(0);
403 break;
404 }
405 Json::Object mechanism = {
406 {"clusterName", name},
407 {"max_concurrent_requests", state.update->max_concurrent_requests},
408 {"type", std::move(type)},
409 };
410 if (!state.update->eds_service_name.empty()) {
411 mechanism["edsServiceName"] = state.update->eds_service_name;
412 }
413 if (state.update->lrs_load_reporting_server_name.has_value()) {
414 mechanism["lrsLoadReportingServerName"] =
415 state.update->lrs_load_reporting_server_name.value();
416 }
417 discovery_mechanisms->emplace_back(std::move(mechanism));
418 return true;
419 }
420
OnClusterChanged(const std::string & name,XdsApi::CdsUpdate cluster_data)421 void CdsLb::OnClusterChanged(const std::string& name,
422 XdsApi::CdsUpdate cluster_data) {
423 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
424 gpr_log(
425 GPR_INFO,
426 "[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
427 this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str());
428 }
429 // Store the update in the map if we are still interested in watching this
430 // cluster (i.e., it is not cancelled already).
431 // If we've already deleted this entry, then this is an update notification
432 // that was scheduled before the deletion, so we can just ignore it.
433 auto it = watchers_.find(name);
434 if (it == watchers_.end()) return;
435 it->second.update = cluster_data;
436 // Take care of integration with new certificate code.
437 grpc_error* error = GRPC_ERROR_NONE;
438 error = UpdateXdsCertificateProvider(name, it->second.update.value());
439 if (error != GRPC_ERROR_NONE) {
440 return OnError(name, error);
441 }
442 // Scan the map starting from the root cluster to generate the list of
443 // discovery mechanisms. If we don't have some of the data we need (i.e., we
444 // just started up and not all watchers have returned data yet), then don't
445 // update the child policy at all.
446 Json::Array discovery_mechanisms;
447 std::set<std::string> clusters_needed;
448 if (GenerateDiscoveryMechanismForCluster(
449 config_->cluster(), &discovery_mechanisms, &clusters_needed)) {
450 // Construct config for child policy.
451 Json::Object xds_lb_policy;
452 if (cluster_data.lb_policy == "RING_HASH") {
453 std::string hash_function;
454 switch (cluster_data.hash_function) {
455 case XdsApi::CdsUpdate::HashFunction::XX_HASH:
456 hash_function = "XX_HASH";
457 break;
458 case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
459 hash_function = "MURMUR_HASH_2";
460 break;
461 default:
462 GPR_ASSERT(0);
463 break;
464 }
465 xds_lb_policy["RING_HASH"] = Json::Object{
466 {"min_ring_size", cluster_data.min_ring_size},
467 {"max_ring_size", cluster_data.max_ring_size},
468 {"hash_function", hash_function},
469 };
470 } else {
471 xds_lb_policy["ROUND_ROBIN"] = Json::Object();
472 }
473 Json::Object child_config = {
474 {"xdsLbPolicy",
475 Json::Array{
476 xds_lb_policy,
477 }},
478 {"discoveryMechanisms", std::move(discovery_mechanisms)},
479 };
480 Json json = Json::Array{
481 Json::Object{
482 {"xds_cluster_resolver_experimental", std::move(child_config)},
483 },
484 };
485 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
486 std::string json_str = json.Dump(/*indent=*/1);
487 gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s",
488 this, json_str.c_str());
489 }
490 RefCountedPtr<LoadBalancingPolicy::Config> config =
491 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
492 if (error != GRPC_ERROR_NONE) {
493 OnError(name, error);
494 return;
495 }
496 // Create child policy if not already present.
497 if (child_policy_ == nullptr) {
498 LoadBalancingPolicy::Args args;
499 args.work_serializer = work_serializer();
500 args.args = args_;
501 args.channel_control_helper = absl::make_unique<Helper>(Ref());
502 child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
503 config->name(), std::move(args));
504 if (child_policy_ == nullptr) {
505 OnError(name, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
506 "failed to create child policy"));
507 return;
508 }
509 grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
510 interested_parties());
511 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
512 gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
513 config->name(), child_policy_.get());
514 }
515 }
516 // Update child policy.
517 UpdateArgs args;
518 args.config = std::move(config);
519 if (xds_certificate_provider_ != nullptr) {
520 grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
521 args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
522 } else {
523 args.args = grpc_channel_args_copy(args_);
524 }
525 child_policy_->UpdateLocked(std::move(args));
526 }
527 // Remove entries in watchers_ for any clusters not in clusters_needed
528 for (auto it = watchers_.begin(); it != watchers_.end();) {
529 const std::string& cluster_name = it->first;
530 if (clusters_needed.find(cluster_name) != clusters_needed.end()) {
531 ++it;
532 continue;
533 }
534 if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
535 gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
536 cluster_name.c_str());
537 }
538 CancelClusterDataWatch(cluster_name, it->second.watcher,
539 /*delay_unsubscription=*/false);
540 it = watchers_.erase(it);
541 }
542 }
543
OnError(const std::string & name,grpc_error * error)544 void CdsLb::OnError(const std::string& name, grpc_error* error) {
545 gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s",
546 this, name.c_str(), grpc_error_string(error));
547 // Go into TRANSIENT_FAILURE if we have not yet created the child
548 // policy (i.e., we have not yet received data from xds). Otherwise,
549 // we keep running with the data we had previously.
550 if (child_policy_ == nullptr) {
551 channel_control_helper()->UpdateState(
552 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
553 absl::make_unique<TransientFailurePicker>(error));
554 } else {
555 GRPC_ERROR_UNREF(error);
556 }
557 }
558
OnResourceDoesNotExist(const std::string & name)559 void CdsLb::OnResourceDoesNotExist(const std::string& name) {
560 gpr_log(GPR_ERROR,
561 "[cdslb %p] CDS resource for %s does not exist -- reporting "
562 "TRANSIENT_FAILURE",
563 this, name.c_str());
564 grpc_error* error =
565 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
566 absl::StrCat("CDS resource \"", config_->cluster(),
567 "\" does not exist")
568 .c_str()),
569 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
570 channel_control_helper()->UpdateState(
571 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
572 absl::make_unique<TransientFailurePicker>(error));
573 MaybeDestroyChildPolicyLocked();
574 }
575
UpdateXdsCertificateProvider(const std::string & cluster_name,const XdsApi::CdsUpdate & cluster_data)576 grpc_error* CdsLb::UpdateXdsCertificateProvider(
577 const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data) {
578 // Early out if channel is not configured to use xds security.
579 grpc_channel_credentials* channel_credentials =
580 grpc_channel_credentials_find_in_args(args_);
581 if (channel_credentials == nullptr ||
582 channel_credentials->type() != kCredentialsTypeXds) {
583 xds_certificate_provider_ = nullptr;
584 return GRPC_ERROR_NONE;
585 }
586 if (xds_certificate_provider_ == nullptr) {
587 xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>();
588 }
589 // Configure root cert.
590 absl::string_view root_provider_instance_name =
591 cluster_data.common_tls_context.combined_validation_context
592 .validation_context_certificate_provider_instance.instance_name;
593 absl::string_view root_provider_cert_name =
594 cluster_data.common_tls_context.combined_validation_context
595 .validation_context_certificate_provider_instance.certificate_name;
596 RefCountedPtr<XdsCertificateProvider> new_root_provider;
597 if (!root_provider_instance_name.empty()) {
598 new_root_provider =
599 xds_client_->certificate_provider_store()
600 .CreateOrGetCertificateProvider(root_provider_instance_name);
601 if (new_root_provider == nullptr) {
602 return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
603 absl::StrCat("Certificate provider instance name: \"",
604 root_provider_instance_name, "\" not recognized.")
605 .c_str());
606 }
607 }
608 if (root_certificate_provider_ != new_root_provider) {
609 if (root_certificate_provider_ != nullptr &&
610 root_certificate_provider_->interested_parties() != nullptr) {
611 grpc_pollset_set_del_pollset_set(
612 interested_parties(),
613 root_certificate_provider_->interested_parties());
614 }
615 if (new_root_provider != nullptr &&
616 new_root_provider->interested_parties() != nullptr) {
617 grpc_pollset_set_add_pollset_set(interested_parties(),
618 new_root_provider->interested_parties());
619 }
620 root_certificate_provider_ = std::move(new_root_provider);
621 }
622 xds_certificate_provider_->UpdateRootCertNameAndDistributor(
623 cluster_name, root_provider_cert_name,
624 root_certificate_provider_ == nullptr
625 ? nullptr
626 : root_certificate_provider_->distributor());
627 // Configure identity cert.
628 absl::string_view identity_provider_instance_name =
629 cluster_data.common_tls_context
630 .tls_certificate_certificate_provider_instance.instance_name;
631 absl::string_view identity_provider_cert_name =
632 cluster_data.common_tls_context
633 .tls_certificate_certificate_provider_instance.certificate_name;
634 RefCountedPtr<XdsCertificateProvider> new_identity_provider;
635 if (!identity_provider_instance_name.empty()) {
636 new_identity_provider =
637 xds_client_->certificate_provider_store()
638 .CreateOrGetCertificateProvider(identity_provider_instance_name);
639 if (new_identity_provider == nullptr) {
640 return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
641 absl::StrCat("Certificate provider instance name: \"",
642 identity_provider_instance_name, "\" not recognized.")
643 .c_str());
644 }
645 }
646 if (identity_certificate_provider_ != new_identity_provider) {
647 if (identity_certificate_provider_ != nullptr &&
648 identity_certificate_provider_->interested_parties() != nullptr) {
649 grpc_pollset_set_del_pollset_set(
650 interested_parties(),
651 identity_certificate_provider_->interested_parties());
652 }
653 if (new_identity_provider != nullptr &&
654 new_identity_provider->interested_parties() != nullptr) {
655 grpc_pollset_set_add_pollset_set(
656 interested_parties(), new_identity_provider->interested_parties());
657 }
658 identity_certificate_provider_ = std::move(new_identity_provider);
659 }
660 xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
661 cluster_name, identity_provider_cert_name,
662 identity_certificate_provider_ == nullptr
663 ? nullptr
664 : identity_certificate_provider_->distributor());
665 // Configure SAN matchers.
666 const std::vector<StringMatcher>& match_subject_alt_names =
667 cluster_data.common_tls_context.combined_validation_context
668 .default_validation_context.match_subject_alt_names;
669 xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
670 cluster_name, match_subject_alt_names);
671 return GRPC_ERROR_NONE;
672 }
673
CancelClusterDataWatch(absl::string_view cluster_name,XdsClient::ClusterWatcherInterface * watcher,bool delay_unsubscription)674 void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
675 XdsClient::ClusterWatcherInterface* watcher,
676 bool delay_unsubscription) {
677 if (xds_certificate_provider_ != nullptr) {
678 std::string name(cluster_name);
679 xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "",
680 nullptr);
681 xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "",
682 nullptr);
683 xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {});
684 }
685 xds_client_->CancelClusterDataWatch(cluster_name, watcher,
686 delay_unsubscription);
687 }
688 //
689 // factory
690 //
691
692 class CdsLbFactory : public LoadBalancingPolicyFactory {
693 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const694 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
695 LoadBalancingPolicy::Args args) const override {
696 grpc_error* error = GRPC_ERROR_NONE;
697 RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
698 if (error != GRPC_ERROR_NONE) {
699 gpr_log(GPR_ERROR,
700 "cannot get XdsClient to instantiate cds LB policy: %s",
701 grpc_error_string(error));
702 GRPC_ERROR_UNREF(error);
703 return nullptr;
704 }
705 return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args));
706 }
707
name() const708 const char* name() const override { return kCds; }
709
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const710 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
711 const Json& json, grpc_error** error) const override {
712 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
713 if (json.type() == Json::Type::JSON_NULL) {
714 // xds was mentioned as a policy in the deprecated loadBalancingPolicy
715 // field or in the client API.
716 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
717 "field:loadBalancingPolicy error:cds policy requires configuration. "
718 "Please use loadBalancingConfig field of service config instead.");
719 return nullptr;
720 }
721 std::vector<grpc_error*> error_list;
722 // cluster name.
723 std::string cluster;
724 auto it = json.object_value().find("cluster");
725 if (it == json.object_value().end()) {
726 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
727 "required field 'cluster' not present"));
728 } else if (it->second.type() != Json::Type::STRING) {
729 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
730 "field:cluster error:type should be string"));
731 } else {
732 cluster = it->second.string_value();
733 }
734 if (!error_list.empty()) {
735 *error = GRPC_ERROR_CREATE_FROM_VECTOR("Cds Parser", &error_list);
736 return nullptr;
737 }
738 return MakeRefCounted<CdsLbConfig>(std::move(cluster));
739 }
740 };
741
742 } // namespace
743
744 } // namespace grpc_core
745
746 //
747 // Plugin registration
748 //
749
grpc_lb_policy_cds_init()750 void grpc_lb_policy_cds_init() {
751 grpc_core::LoadBalancingPolicyRegistry::Builder::
752 RegisterLoadBalancingPolicyFactory(
753 absl::make_unique<grpc_core::CdsLbFactory>());
754 }
755
grpc_lb_policy_cds_shutdown()756 void grpc_lb_policy_cds_shutdown() {}
757