• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <string.h>
20 
21 #include "absl/strings/str_cat.h"
22 
23 #include "src/core/ext/filters/client_channel/lb_policy.h"
24 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
25 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
26 #include "src/core/ext/filters/client_channel/service_config.h"
27 #include "src/core/ext/xds/xds_certificate_provider.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gprpp/memory.h"
31 #include "src/core/lib/gprpp/orphanable.h"
32 #include "src/core/lib/gprpp/ref_counted_ptr.h"
33 #include "src/core/lib/iomgr/closure.h"
34 #include "src/core/lib/iomgr/exec_ctx.h"
35 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
36 #include "src/core/lib/transport/error_utils.h"
37 
38 namespace grpc_core {
39 
// Trace flag consulted via GRPC_TRACE_FLAG_ENABLED() before the verbose
// "[cdslb %p] ..." log statements in this file. Constructed with `false` and
// the name "cds_lb".
40 TraceFlag grpc_cds_lb_trace(false, "cds_lb");
41 
42 namespace {
43 
// Name of this LB policy; returned by name() on the config, the policy and
// the factory defined in this file.
44 constexpr char kCds[] = "cds_experimental";
45 
46 // Config for this LB policy.
47 class CdsLbConfig : public LoadBalancingPolicy::Config {
48  public:
CdsLbConfig(std::string cluster)49   explicit CdsLbConfig(std::string cluster) : cluster_(std::move(cluster)) {}
cluster() const50   const std::string& cluster() const { return cluster_; }
name() const51   const char* name() const override { return kCds; }
52 
53  private:
54   std::string cluster_;
55 };
56 
57 // CDS LB policy.
58 class CdsLb : public LoadBalancingPolicy {
59  public:
60   CdsLb(RefCountedPtr<XdsClient> xds_client, Args args);
61 
name() const62   const char* name() const override { return kCds; }
63 
64   void UpdateLocked(UpdateArgs args) override;
65   void ResetBackoffLocked() override;
66   void ExitIdleLocked() override;
67 
68  private:
69   // Watcher for getting cluster data from XdsClient.
70   class ClusterWatcher : public XdsClient::ClusterWatcherInterface {
71    public:
ClusterWatcher(RefCountedPtr<CdsLb> parent,std::string name)72     ClusterWatcher(RefCountedPtr<CdsLb> parent, std::string name)
73         : parent_(std::move(parent)), name_(std::move(name)) {}
74 
OnClusterChanged(XdsApi::CdsUpdate cluster_data)75     void OnClusterChanged(XdsApi::CdsUpdate cluster_data) override {
76       new Notifier(parent_, name_, std::move(cluster_data));
77     }
OnError(grpc_error * error)78     void OnError(grpc_error* error) override {
79       new Notifier(parent_, name_, error);
80     }
OnResourceDoesNotExist()81     void OnResourceDoesNotExist() override { new Notifier(parent_, name_); }
82 
83    private:
84     class Notifier {
85      public:
86       Notifier(RefCountedPtr<CdsLb> parent, std::string name,
87                XdsApi::CdsUpdate update);
88       Notifier(RefCountedPtr<CdsLb> parent, std::string name,
89                grpc_error* error);
90       explicit Notifier(RefCountedPtr<CdsLb> parent, std::string name);
91 
92      private:
93       enum Type { kUpdate, kError, kDoesNotExist };
94 
95       static void RunInExecCtx(void* arg, grpc_error* error);
96       void RunInWorkSerializer(grpc_error* error);
97 
98       RefCountedPtr<CdsLb> parent_;
99       std::string name_;
100       grpc_closure closure_;
101       XdsApi::CdsUpdate update_;
102       Type type_;
103     };
104 
105     RefCountedPtr<CdsLb> parent_;
106     std::string name_;
107   };
108 
109   struct WatcherState {
110     // Pointer to watcher, to be used when cancelling.
111     // Not owned, so do not dereference.
112     ClusterWatcher* watcher = nullptr;
113     // Most recent update obtained from this watcher.
114     absl::optional<XdsApi::CdsUpdate> update;
115   };
116 
117   // Delegating helper to be passed to child policy.
118   class Helper : public ChannelControlHelper {
119    public:
Helper(RefCountedPtr<CdsLb> parent)120     explicit Helper(RefCountedPtr<CdsLb> parent) : parent_(std::move(parent)) {}
121     RefCountedPtr<SubchannelInterface> CreateSubchannel(
122         ServerAddress address, const grpc_channel_args& args) override;
123     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
124                      std::unique_ptr<SubchannelPicker> picker) override;
125     void RequestReresolution() override;
126     void AddTraceEvent(TraceSeverity severity,
127                        absl::string_view message) override;
128 
129    private:
130     RefCountedPtr<CdsLb> parent_;
131   };
132 
133   ~CdsLb() override;
134 
135   void ShutdownLocked() override;
136 
137   bool GenerateDiscoveryMechanismForCluster(
138       const std::string& name, Json::Array* discovery_mechanisms,
139       std::set<std::string>* clusters_needed);
140   void OnClusterChanged(const std::string& name,
141                         XdsApi::CdsUpdate cluster_data);
142   void OnError(const std::string& name, grpc_error* error);
143   void OnResourceDoesNotExist(const std::string& name);
144 
145   grpc_error* UpdateXdsCertificateProvider(
146       const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data);
147 
148   void CancelClusterDataWatch(absl::string_view cluster_name,
149                               XdsClient::ClusterWatcherInterface* watcher,
150                               bool delay_unsubscription = false);
151 
152   void MaybeDestroyChildPolicyLocked();
153 
154   RefCountedPtr<CdsLbConfig> config_;
155 
156   // Current channel args from the resolver.
157   const grpc_channel_args* args_ = nullptr;
158 
159   // The xds client.
160   RefCountedPtr<XdsClient> xds_client_;
161 
162   // Maps from cluster name to the state for that cluster.
163   // The root of the tree is config_->cluster().
164   std::map<std::string, WatcherState> watchers_;
165 
166   RefCountedPtr<grpc_tls_certificate_provider> root_certificate_provider_;
167   RefCountedPtr<grpc_tls_certificate_provider> identity_certificate_provider_;
168   RefCountedPtr<XdsCertificateProvider> xds_certificate_provider_;
169 
170   // Child LB policy.
171   OrphanablePtr<LoadBalancingPolicy> child_policy_;
172 
173   // Internal state.
174   bool shutting_down_ = false;
175 };
176 
177 //
178 // CdsLb::ClusterWatcher::Notifier
179 //
180 
Notifier(RefCountedPtr<CdsLb> parent,std::string name,XdsApi::CdsUpdate update)181 CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
182                                           std::string name,
183                                           XdsApi::CdsUpdate update)
184     : parent_(std::move(parent)),
185       name_(std::move(name)),
186       update_(std::move(update)),
187       type_(kUpdate) {
188   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
189   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
190 }
191 
Notifier(RefCountedPtr<CdsLb> parent,std::string name,grpc_error * error)192 CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
193                                           std::string name, grpc_error* error)
194     : parent_(std::move(parent)), name_(std::move(name)), type_(kError) {
195   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
196   ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
197 }
198 
Notifier(RefCountedPtr<CdsLb> parent,std::string name)199 CdsLb::ClusterWatcher::Notifier::Notifier(RefCountedPtr<CdsLb> parent,
200                                           std::string name)
201     : parent_(std::move(parent)), name_(std::move(name)), type_(kDoesNotExist) {
202   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
203   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
204 }
205 
RunInExecCtx(void * arg,grpc_error * error)206 void CdsLb::ClusterWatcher::Notifier::RunInExecCtx(void* arg,
207                                                    grpc_error* error) {
208   Notifier* self = static_cast<Notifier*>(arg);
209   GRPC_ERROR_REF(error);
210   self->parent_->work_serializer()->Run(
211       [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
212 }
213 
RunInWorkSerializer(grpc_error * error)214 void CdsLb::ClusterWatcher::Notifier::RunInWorkSerializer(grpc_error* error) {
215   switch (type_) {
216     case kUpdate:
217       parent_->OnClusterChanged(name_, std::move(update_));
218       break;
219     case kError:
220       parent_->OnError(name_, error);
221       break;
222     case kDoesNotExist:
223       parent_->OnResourceDoesNotExist(name_);
224       break;
225   };
226   delete this;
227 }
228 
229 //
230 // CdsLb::Helper
231 //
232 
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)233 RefCountedPtr<SubchannelInterface> CdsLb::Helper::CreateSubchannel(
234     ServerAddress address, const grpc_channel_args& args) {
235   if (parent_->shutting_down_) return nullptr;
236   return parent_->channel_control_helper()->CreateSubchannel(std::move(address),
237                                                              args);
238 }
239 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)240 void CdsLb::Helper::UpdateState(grpc_connectivity_state state,
241                                 const absl::Status& status,
242                                 std::unique_ptr<SubchannelPicker> picker) {
243   if (parent_->shutting_down_ || parent_->child_policy_ == nullptr) return;
244   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
245     gpr_log(GPR_INFO,
246             "[cdslb %p] state updated by child: %s message_state: (%s)", this,
247             ConnectivityStateName(state), status.ToString().c_str());
248   }
249   parent_->channel_control_helper()->UpdateState(state, status,
250                                                  std::move(picker));
251 }
252 
RequestReresolution()253 void CdsLb::Helper::RequestReresolution() {
254   if (parent_->shutting_down_) return;
255   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
256     gpr_log(GPR_INFO, "[cdslb %p] Re-resolution requested from child policy.",
257             parent_.get());
258   }
259   parent_->channel_control_helper()->RequestReresolution();
260 }
261 
AddTraceEvent(TraceSeverity severity,absl::string_view message)262 void CdsLb::Helper::AddTraceEvent(TraceSeverity severity,
263                                   absl::string_view message) {
264   if (parent_->shutting_down_) return;
265   parent_->channel_control_helper()->AddTraceEvent(severity, message);
266 }
267 
268 //
269 // CdsLb
270 //
271 
CdsLb(RefCountedPtr<XdsClient> xds_client,Args args)272 CdsLb::CdsLb(RefCountedPtr<XdsClient> xds_client, Args args)
273     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
274   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
275     gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p", this,
276             xds_client_.get());
277   }
278 }
279 
~CdsLb()280 CdsLb::~CdsLb() {
281   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
282     gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this);
283   }
284 }
285 
ShutdownLocked()286 void CdsLb::ShutdownLocked() {
287   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
288     gpr_log(GPR_INFO, "[cdslb %p] shutting down", this);
289   }
290   shutting_down_ = true;
291   MaybeDestroyChildPolicyLocked();
292   if (xds_client_ != nullptr) {
293     for (auto& watcher : watchers_) {
294       if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
295         gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
296                 watcher.first.c_str());
297       }
298       CancelClusterDataWatch(watcher.first, watcher.second.watcher,
299                              /*delay_unsubscription=*/false);
300     }
301     watchers_.clear();
302     xds_client_.reset(DEBUG_LOCATION, "CdsLb");
303   }
304   grpc_channel_args_destroy(args_);
305   args_ = nullptr;
306 }
307 
MaybeDestroyChildPolicyLocked()308 void CdsLb::MaybeDestroyChildPolicyLocked() {
309   if (child_policy_ != nullptr) {
310     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
311                                      interested_parties());
312     child_policy_.reset();
313   }
314 }
315 
ResetBackoffLocked()316 void CdsLb::ResetBackoffLocked() {
317   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
318 }
319 
ExitIdleLocked()320 void CdsLb::ExitIdleLocked() {
321   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
322 }
323 
UpdateLocked(UpdateArgs args)324 void CdsLb::UpdateLocked(UpdateArgs args) {
325   // Update config.
326   auto old_config = std::move(config_);
327   config_ = std::move(args.config);
328   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
329     gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s", this,
330             config_->cluster().c_str());
331   }
332   // Update args.
333   grpc_channel_args_destroy(args_);
334   args_ = args.args;
335   args.args = nullptr;
336   // If cluster name changed, cancel watcher and restart.
337   if (old_config == nullptr || old_config->cluster() != config_->cluster()) {
338     if (old_config != nullptr) {
339       for (auto& watcher : watchers_) {
340         if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
341           gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
342                   watcher.first.c_str());
343         }
344         CancelClusterDataWatch(watcher.first, watcher.second.watcher,
345                                /*delay_unsubscription=*/true);
346       }
347       watchers_.clear();
348     }
349     auto watcher = absl::make_unique<ClusterWatcher>(Ref(), config_->cluster());
350     watchers_[config_->cluster()].watcher = watcher.get();
351     xds_client_->WatchClusterData(config_->cluster(), std::move(watcher));
352   }
353 }
354 
355 // This method will attempt to generate one or multiple entries of discovery
356 // mechanism recursively:
357 // For cluster types EDS or LOGICAL_DNS, one discovery mechanism entry may be
358 // generated cluster name, type and other data from the CdsUpdate inserted into
359 // the entry and the entry appended to the array of entries.
360 // Note, discovery mechanism entry can be generated if an CdsUpdate is
361 // available; otherwise, just return false. For cluster type AGGREGATE,
362 // recursively call the method for each child cluster.
GenerateDiscoveryMechanismForCluster(const std::string & name,Json::Array * discovery_mechanisms,std::set<std::string> * clusters_needed)363 bool CdsLb::GenerateDiscoveryMechanismForCluster(
364     const std::string& name, Json::Array* discovery_mechanisms,
365     std::set<std::string>* clusters_needed) {
366   clusters_needed->insert(name);
367   auto& state = watchers_[name];
368   // Create a new watcher if needed.
369   if (state.watcher == nullptr) {
370     auto watcher = absl::make_unique<ClusterWatcher>(Ref(), name);
371     if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
372       gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this,
373               name.c_str());
374     }
375     state.watcher = watcher.get();
376     xds_client_->WatchClusterData(name, std::move(watcher));
377     return false;
378   }
379   // Don't have the update we need yet.
380   if (!state.update.has_value()) return false;
381   // For AGGREGATE clusters, recursively expand to child clusters.
382   if (state.update->cluster_type == XdsApi::CdsUpdate::ClusterType::AGGREGATE) {
383     bool missing_cluster = false;
384     for (const std::string& child_name :
385          state.update->prioritized_cluster_names) {
386       if (!GenerateDiscoveryMechanismForCluster(
387               child_name, discovery_mechanisms, clusters_needed)) {
388         missing_cluster = true;
389       }
390     }
391     return !missing_cluster;
392   }
393   std::string type;
394   switch (state.update->cluster_type) {
395     case XdsApi::CdsUpdate::ClusterType::EDS:
396       type = "EDS";
397       break;
398     case XdsApi::CdsUpdate::ClusterType::LOGICAL_DNS:
399       type = "LOGICAL_DNS";
400       break;
401     default:
402       GPR_ASSERT(0);
403       break;
404   }
405   Json::Object mechanism = {
406       {"clusterName", name},
407       {"max_concurrent_requests", state.update->max_concurrent_requests},
408       {"type", std::move(type)},
409   };
410   if (!state.update->eds_service_name.empty()) {
411     mechanism["edsServiceName"] = state.update->eds_service_name;
412   }
413   if (state.update->lrs_load_reporting_server_name.has_value()) {
414     mechanism["lrsLoadReportingServerName"] =
415         state.update->lrs_load_reporting_server_name.value();
416   }
417   discovery_mechanisms->emplace_back(std::move(mechanism));
418   return true;
419 }
420 
OnClusterChanged(const std::string & name,XdsApi::CdsUpdate cluster_data)421 void CdsLb::OnClusterChanged(const std::string& name,
422                              XdsApi::CdsUpdate cluster_data) {
423   if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
424     gpr_log(
425         GPR_INFO,
426         "[cdslb %p] received CDS update for cluster %s from xds client %p: %s",
427         this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str());
428   }
429   // Store the update in the map if we are still interested in watching this
430   // cluster (i.e., it is not cancelled already).
431   // If we've already deleted this entry, then this is an update notification
432   // that was scheduled before the deletion, so we can just ignore it.
433   auto it = watchers_.find(name);
434   if (it == watchers_.end()) return;
435   it->second.update = cluster_data;
436   // Take care of integration with new certificate code.
437   grpc_error* error = GRPC_ERROR_NONE;
438   error = UpdateXdsCertificateProvider(name, it->second.update.value());
439   if (error != GRPC_ERROR_NONE) {
440     return OnError(name, error);
441   }
442   // Scan the map starting from the root cluster to generate the list of
443   // discovery mechanisms. If we don't have some of the data we need (i.e., we
444   // just started up and not all watchers have returned data yet), then don't
445   // update the child policy at all.
446   Json::Array discovery_mechanisms;
447   std::set<std::string> clusters_needed;
448   if (GenerateDiscoveryMechanismForCluster(
449           config_->cluster(), &discovery_mechanisms, &clusters_needed)) {
450     // Construct config for child policy.
451     Json::Object xds_lb_policy;
452     if (cluster_data.lb_policy == "RING_HASH") {
453       std::string hash_function;
454       switch (cluster_data.hash_function) {
455         case XdsApi::CdsUpdate::HashFunction::XX_HASH:
456           hash_function = "XX_HASH";
457           break;
458         case XdsApi::CdsUpdate::HashFunction::MURMUR_HASH_2:
459           hash_function = "MURMUR_HASH_2";
460           break;
461         default:
462           GPR_ASSERT(0);
463           break;
464       }
465       xds_lb_policy["RING_HASH"] = Json::Object{
466           {"min_ring_size", cluster_data.min_ring_size},
467           {"max_ring_size", cluster_data.max_ring_size},
468           {"hash_function", hash_function},
469       };
470     } else {
471       xds_lb_policy["ROUND_ROBIN"] = Json::Object();
472     }
473     Json::Object child_config = {
474         {"xdsLbPolicy",
475          Json::Array{
476              xds_lb_policy,
477          }},
478         {"discoveryMechanisms", std::move(discovery_mechanisms)},
479     };
480     Json json = Json::Array{
481         Json::Object{
482             {"xds_cluster_resolver_experimental", std::move(child_config)},
483         },
484     };
485     if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
486       std::string json_str = json.Dump(/*indent=*/1);
487       gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s",
488               this, json_str.c_str());
489     }
490     RefCountedPtr<LoadBalancingPolicy::Config> config =
491         LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
492     if (error != GRPC_ERROR_NONE) {
493       OnError(name, error);
494       return;
495     }
496     // Create child policy if not already present.
497     if (child_policy_ == nullptr) {
498       LoadBalancingPolicy::Args args;
499       args.work_serializer = work_serializer();
500       args.args = args_;
501       args.channel_control_helper = absl::make_unique<Helper>(Ref());
502       child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
503           config->name(), std::move(args));
504       if (child_policy_ == nullptr) {
505         OnError(name, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
506                           "failed to create child policy"));
507         return;
508       }
509       grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
510                                        interested_parties());
511       if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
512         gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this,
513                 config->name(), child_policy_.get());
514       }
515     }
516     // Update child policy.
517     UpdateArgs args;
518     args.config = std::move(config);
519     if (xds_certificate_provider_ != nullptr) {
520       grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg();
521       args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1);
522     } else {
523       args.args = grpc_channel_args_copy(args_);
524     }
525     child_policy_->UpdateLocked(std::move(args));
526   }
527   // Remove entries in watchers_ for any clusters not in clusters_needed
528   for (auto it = watchers_.begin(); it != watchers_.end();) {
529     const std::string& cluster_name = it->first;
530     if (clusters_needed.find(cluster_name) != clusters_needed.end()) {
531       ++it;
532       continue;
533     }
534     if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) {
535       gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this,
536               cluster_name.c_str());
537     }
538     CancelClusterDataWatch(cluster_name, it->second.watcher,
539                            /*delay_unsubscription=*/false);
540     it = watchers_.erase(it);
541   }
542 }
543 
OnError(const std::string & name,grpc_error * error)544 void CdsLb::OnError(const std::string& name, grpc_error* error) {
545   gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s",
546           this, name.c_str(), grpc_error_string(error));
547   // Go into TRANSIENT_FAILURE if we have not yet created the child
548   // policy (i.e., we have not yet received data from xds).  Otherwise,
549   // we keep running with the data we had previously.
550   if (child_policy_ == nullptr) {
551     channel_control_helper()->UpdateState(
552         GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
553         absl::make_unique<TransientFailurePicker>(error));
554   } else {
555     GRPC_ERROR_UNREF(error);
556   }
557 }
558 
OnResourceDoesNotExist(const std::string & name)559 void CdsLb::OnResourceDoesNotExist(const std::string& name) {
560   gpr_log(GPR_ERROR,
561           "[cdslb %p] CDS resource for %s does not exist -- reporting "
562           "TRANSIENT_FAILURE",
563           this, name.c_str());
564   grpc_error* error =
565       grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
566                              absl::StrCat("CDS resource \"", config_->cluster(),
567                                           "\" does not exist")
568                                  .c_str()),
569                          GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
570   channel_control_helper()->UpdateState(
571       GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
572       absl::make_unique<TransientFailurePicker>(error));
573   MaybeDestroyChildPolicyLocked();
574 }
575 
UpdateXdsCertificateProvider(const std::string & cluster_name,const XdsApi::CdsUpdate & cluster_data)576 grpc_error* CdsLb::UpdateXdsCertificateProvider(
577     const std::string& cluster_name, const XdsApi::CdsUpdate& cluster_data) {
578   // Early out if channel is not configured to use xds security.
579   grpc_channel_credentials* channel_credentials =
580       grpc_channel_credentials_find_in_args(args_);
581   if (channel_credentials == nullptr ||
582       channel_credentials->type() != kCredentialsTypeXds) {
583     xds_certificate_provider_ = nullptr;
584     return GRPC_ERROR_NONE;
585   }
586   if (xds_certificate_provider_ == nullptr) {
587     xds_certificate_provider_ = MakeRefCounted<XdsCertificateProvider>();
588   }
589   // Configure root cert.
590   absl::string_view root_provider_instance_name =
591       cluster_data.common_tls_context.combined_validation_context
592           .validation_context_certificate_provider_instance.instance_name;
593   absl::string_view root_provider_cert_name =
594       cluster_data.common_tls_context.combined_validation_context
595           .validation_context_certificate_provider_instance.certificate_name;
596   RefCountedPtr<XdsCertificateProvider> new_root_provider;
597   if (!root_provider_instance_name.empty()) {
598     new_root_provider =
599         xds_client_->certificate_provider_store()
600             .CreateOrGetCertificateProvider(root_provider_instance_name);
601     if (new_root_provider == nullptr) {
602       return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
603           absl::StrCat("Certificate provider instance name: \"",
604                        root_provider_instance_name, "\" not recognized.")
605               .c_str());
606     }
607   }
608   if (root_certificate_provider_ != new_root_provider) {
609     if (root_certificate_provider_ != nullptr &&
610         root_certificate_provider_->interested_parties() != nullptr) {
611       grpc_pollset_set_del_pollset_set(
612           interested_parties(),
613           root_certificate_provider_->interested_parties());
614     }
615     if (new_root_provider != nullptr &&
616         new_root_provider->interested_parties() != nullptr) {
617       grpc_pollset_set_add_pollset_set(interested_parties(),
618                                        new_root_provider->interested_parties());
619     }
620     root_certificate_provider_ = std::move(new_root_provider);
621   }
622   xds_certificate_provider_->UpdateRootCertNameAndDistributor(
623       cluster_name, root_provider_cert_name,
624       root_certificate_provider_ == nullptr
625           ? nullptr
626           : root_certificate_provider_->distributor());
627   // Configure identity cert.
628   absl::string_view identity_provider_instance_name =
629       cluster_data.common_tls_context
630           .tls_certificate_certificate_provider_instance.instance_name;
631   absl::string_view identity_provider_cert_name =
632       cluster_data.common_tls_context
633           .tls_certificate_certificate_provider_instance.certificate_name;
634   RefCountedPtr<XdsCertificateProvider> new_identity_provider;
635   if (!identity_provider_instance_name.empty()) {
636     new_identity_provider =
637         xds_client_->certificate_provider_store()
638             .CreateOrGetCertificateProvider(identity_provider_instance_name);
639     if (new_identity_provider == nullptr) {
640       return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
641           absl::StrCat("Certificate provider instance name: \"",
642                        identity_provider_instance_name, "\" not recognized.")
643               .c_str());
644     }
645   }
646   if (identity_certificate_provider_ != new_identity_provider) {
647     if (identity_certificate_provider_ != nullptr &&
648         identity_certificate_provider_->interested_parties() != nullptr) {
649       grpc_pollset_set_del_pollset_set(
650           interested_parties(),
651           identity_certificate_provider_->interested_parties());
652     }
653     if (new_identity_provider != nullptr &&
654         new_identity_provider->interested_parties() != nullptr) {
655       grpc_pollset_set_add_pollset_set(
656           interested_parties(), new_identity_provider->interested_parties());
657     }
658     identity_certificate_provider_ = std::move(new_identity_provider);
659   }
660   xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(
661       cluster_name, identity_provider_cert_name,
662       identity_certificate_provider_ == nullptr
663           ? nullptr
664           : identity_certificate_provider_->distributor());
665   // Configure SAN matchers.
666   const std::vector<StringMatcher>& match_subject_alt_names =
667       cluster_data.common_tls_context.combined_validation_context
668           .default_validation_context.match_subject_alt_names;
669   xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(
670       cluster_name, match_subject_alt_names);
671   return GRPC_ERROR_NONE;
672 }
673 
CancelClusterDataWatch(absl::string_view cluster_name,XdsClient::ClusterWatcherInterface * watcher,bool delay_unsubscription)674 void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name,
675                                    XdsClient::ClusterWatcherInterface* watcher,
676                                    bool delay_unsubscription) {
677   if (xds_certificate_provider_ != nullptr) {
678     std::string name(cluster_name);
679     xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "",
680                                                                 nullptr);
681     xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "",
682                                                                     nullptr);
683     xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {});
684   }
685   xds_client_->CancelClusterDataWatch(cluster_name, watcher,
686                                       delay_unsubscription);
687 }
688 //
689 // factory
690 //
691 
692 class CdsLbFactory : public LoadBalancingPolicyFactory {
693  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const694   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
695       LoadBalancingPolicy::Args args) const override {
696     grpc_error* error = GRPC_ERROR_NONE;
697     RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
698     if (error != GRPC_ERROR_NONE) {
699       gpr_log(GPR_ERROR,
700               "cannot get XdsClient to instantiate cds LB policy: %s",
701               grpc_error_string(error));
702       GRPC_ERROR_UNREF(error);
703       return nullptr;
704     }
705     return MakeOrphanable<CdsLb>(std::move(xds_client), std::move(args));
706   }
707 
name() const708   const char* name() const override { return kCds; }
709 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const710   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
711       const Json& json, grpc_error** error) const override {
712     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
713     if (json.type() == Json::Type::JSON_NULL) {
714       // xds was mentioned as a policy in the deprecated loadBalancingPolicy
715       // field or in the client API.
716       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
717           "field:loadBalancingPolicy error:cds policy requires configuration. "
718           "Please use loadBalancingConfig field of service config instead.");
719       return nullptr;
720     }
721     std::vector<grpc_error*> error_list;
722     // cluster name.
723     std::string cluster;
724     auto it = json.object_value().find("cluster");
725     if (it == json.object_value().end()) {
726       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
727           "required field 'cluster' not present"));
728     } else if (it->second.type() != Json::Type::STRING) {
729       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
730           "field:cluster error:type should be string"));
731     } else {
732       cluster = it->second.string_value();
733     }
734     if (!error_list.empty()) {
735       *error = GRPC_ERROR_CREATE_FROM_VECTOR("Cds Parser", &error_list);
736       return nullptr;
737     }
738     return MakeRefCounted<CdsLbConfig>(std::move(cluster));
739   }
740 };
741 
742 }  // namespace
743 
744 }  // namespace grpc_core
745 
746 //
747 // Plugin registration
748 //
749 
grpc_lb_policy_cds_init()750 void grpc_lb_policy_cds_init() {
751   grpc_core::LoadBalancingPolicyRegistry::Builder::
752       RegisterLoadBalancingPolicyFactory(
753           absl::make_unique<grpc_core::CdsLbFactory>());
754 }
755 
// Plugin shutdown hook: nothing to do for this policy.
void grpc_lb_policy_cds_shutdown() {
}
757