• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
24 #include "re2/re2.h"
25 #define XXH_INLINE_ALL
26 #include "xxhash.h"
27 
28 #include "src/core/ext/filters/client_channel/config_selector.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
30 #include "src/core/ext/filters/client_channel/resolver_registry.h"
31 #include "src/core/ext/xds/xds_channel_args.h"
32 #include "src/core/ext/xds/xds_client.h"
33 #include "src/core/ext/xds/xds_http_filters.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/iomgr/closure.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/surface/lame_client.h"
38 #include "src/core/lib/transport/timeout_encoding.h"
39 
40 namespace grpc_core {
41 
42 TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
43 
44 const char* kXdsClusterAttribute = "xds_cluster_name";
45 
46 namespace {
47 
48 //
49 // XdsResolver
50 //
51 
52 class XdsResolver : public Resolver {
53  public:
XdsResolver(ResolverArgs args)54   explicit XdsResolver(ResolverArgs args)
55       : work_serializer_(std::move(args.work_serializer)),
56         result_handler_(std::move(args.result_handler)),
57         server_name_(absl::StripPrefix(args.uri.path(), "/")),
58         args_(grpc_channel_args_copy(args.args)),
59         interested_parties_(args.pollset_set) {
60     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
61       gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
62               server_name_.c_str());
63     }
64   }
65 
~XdsResolver()66   ~XdsResolver() override {
67     grpc_channel_args_destroy(args_);
68     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
69       gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
70     }
71   }
72 
73   void StartLocked() override;
74 
75   void ShutdownLocked() override;
76 
ResetBackoffLocked()77   void ResetBackoffLocked() override {
78     if (xds_client_ != nullptr) xds_client_->ResetBackoff();
79   }
80 
81  private:
82   class Notifier {
83    public:
84     Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
85     Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
86     Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error_handle error);
87     explicit Notifier(RefCountedPtr<XdsResolver> resolver);
88 
89    private:
90     enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
91 
92     static void RunInExecCtx(void* arg, grpc_error_handle error);
93     void RunInWorkSerializer(grpc_error_handle error);
94 
95     RefCountedPtr<XdsResolver> resolver_;
96     grpc_closure closure_;
97     XdsApi::LdsUpdate update_;
98     Type type_;
99   };
100 
101   class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
102    public:
ListenerWatcher(RefCountedPtr<XdsResolver> resolver)103     explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
104         : resolver_(std::move(resolver)) {}
OnListenerChanged(XdsApi::LdsUpdate listener)105     void OnListenerChanged(XdsApi::LdsUpdate listener) override {
106       new Notifier(resolver_, std::move(listener));
107     }
OnError(grpc_error_handle error)108     void OnError(grpc_error_handle error) override {
109       new Notifier(resolver_, error);
110     }
OnResourceDoesNotExist()111     void OnResourceDoesNotExist() override { new Notifier(resolver_); }
112 
113    private:
114     RefCountedPtr<XdsResolver> resolver_;
115   };
116 
117   class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
118    public:
RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)119     explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
120         : resolver_(std::move(resolver)) {}
OnRouteConfigChanged(XdsApi::RdsUpdate route_config)121     void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
122       new Notifier(resolver_, std::move(route_config));
123     }
OnError(grpc_error_handle error)124     void OnError(grpc_error_handle error) override {
125       new Notifier(resolver_, error);
126     }
OnResourceDoesNotExist()127     void OnResourceDoesNotExist() override { new Notifier(resolver_); }
128 
129    private:
130     RefCountedPtr<XdsResolver> resolver_;
131   };
132 
133   class ClusterState
134       : public RefCounted<ClusterState, PolymorphicRefCount, kUnrefNoDelete> {
135    public:
136     using ClusterStateMap =
137         std::map<std::string, std::unique_ptr<ClusterState>>;
138 
ClusterState(const std::string & cluster_name,ClusterStateMap * cluster_state_map)139     ClusterState(const std::string& cluster_name,
140                  ClusterStateMap* cluster_state_map)
141         : it_(cluster_state_map
142                   ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
143                   .first) {}
cluster() const144     const std::string& cluster() const { return it_->first; }
145 
146    private:
147     ClusterStateMap::iterator it_;
148   };
149 
150   class XdsConfigSelector : public ConfigSelector {
151    public:
152     XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
153                       grpc_error_handle* error);
154     ~XdsConfigSelector() override;
155 
name() const156     const char* name() const override { return "XdsConfigSelector"; }
157 
Equals(const ConfigSelector * other) const158     bool Equals(const ConfigSelector* other) const override {
159       const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
160       // Don't need to compare resolver_, since that will always be the same.
161       return route_table_ == other_xds->route_table_ &&
162              clusters_ == other_xds->clusters_;
163     }
164 
165     CallConfig GetCallConfig(GetCallConfigArgs args) override;
166 
GetFilters()167     std::vector<const grpc_channel_filter*> GetFilters() override {
168       return filters_;
169     }
170 
171     grpc_channel_args* ModifyChannelArgs(grpc_channel_args* args) override;
172 
173    private:
174     struct Route {
175       struct ClusterWeightState {
176         uint32_t range_end;
177         absl::string_view cluster;
178         RefCountedPtr<ServiceConfig> method_config;
179 
180         bool operator==(const ClusterWeightState& other) const;
181       };
182 
183       XdsApi::Route route;
184       RefCountedPtr<ServiceConfig> method_config;
185       absl::InlinedVector<ClusterWeightState, 2> weighted_cluster_state;
186 
187       bool operator==(const Route& other) const;
188     };
189     using RouteTable = std::vector<Route>;
190 
191     void MaybeAddCluster(const std::string& name);
192     grpc_error_handle CreateMethodConfig(
193         const XdsApi::Route& route,
194         const XdsApi::Route::ClusterWeight* cluster_weight,
195         RefCountedPtr<ServiceConfig>* method_config);
196 
197     RefCountedPtr<XdsResolver> resolver_;
198     RouteTable route_table_;
199     std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
200     std::vector<const grpc_channel_filter*> filters_;
201     grpc_error_handle filter_error_ = GRPC_ERROR_NONE;
202   };
203 
204   void OnListenerUpdate(XdsApi::LdsUpdate listener);
205   void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
206   void OnError(grpc_error_handle error);
207   void OnResourceDoesNotExist();
208 
209   grpc_error_handle CreateServiceConfig(
210       RefCountedPtr<ServiceConfig>* service_config);
211   void GenerateResult();
212   void MaybeRemoveUnusedClusters();
213 
214   std::shared_ptr<WorkSerializer> work_serializer_;
215   std::unique_ptr<ResultHandler> result_handler_;
216   std::string server_name_;
217   const grpc_channel_args* args_;
218   grpc_pollset_set* interested_parties_;
219 
220   RefCountedPtr<XdsClient> xds_client_;
221 
222   XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
223   // This will not contain the RouteConfiguration, even if it comes with the
224   // LDS response; instead, the relevant VirtualHost from the
225   // RouteConfiguration will be saved in current_virtual_host_.
226   XdsApi::LdsUpdate current_listener_;
227 
228   std::string route_config_name_;
229   XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
230   XdsApi::RdsUpdate::VirtualHost current_virtual_host_;
231 
232   ClusterState::ClusterStateMap cluster_state_map_;
233 };
234 
235 //
236 // XdsResolver::Notifier
237 //
238 
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::LdsUpdate update)239 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
240                                 XdsApi::LdsUpdate update)
241     : resolver_(std::move(resolver)),
242       update_(std::move(update)),
243       type_(kLdsUpdate) {
244   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
245   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
246 }
247 
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::RdsUpdate update)248 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
249                                 XdsApi::RdsUpdate update)
250     : resolver_(std::move(resolver)), type_(kRdsUpdate) {
251   update_.http_connection_manager.rds_update = std::move(update);
252   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
253   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
254 }
255 
Notifier(RefCountedPtr<XdsResolver> resolver,grpc_error_handle error)256 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
257                                 grpc_error_handle error)
258     : resolver_(std::move(resolver)), type_(kError) {
259   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
260   ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
261 }
262 
Notifier(RefCountedPtr<XdsResolver> resolver)263 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
264     : resolver_(std::move(resolver)), type_(kDoesNotExist) {
265   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
266   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
267 }
268 
RunInExecCtx(void * arg,grpc_error_handle error)269 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error_handle error) {
270   Notifier* self = static_cast<Notifier*>(arg);
271   GRPC_ERROR_REF(error);
272   self->resolver_->work_serializer_->Run(
273       [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
274 }
275 
RunInWorkSerializer(grpc_error_handle error)276 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error_handle error) {
277   if (resolver_->xds_client_ == nullptr) {
278     GRPC_ERROR_UNREF(error);
279     delete this;
280     return;
281   }
282   switch (type_) {
283     case kLdsUpdate:
284       resolver_->OnListenerUpdate(std::move(update_));
285       break;
286     case kRdsUpdate:
287       resolver_->OnRouteConfigUpdate(
288           std::move(*update_.http_connection_manager.rds_update));
289       break;
290     case kError:
291       resolver_->OnError(error);
292       break;
293     case kDoesNotExist:
294       resolver_->OnResourceDoesNotExist();
295       break;
296   };
297   delete this;
298 }
299 
300 //
301 // XdsResolver::XdsConfigSelector::Route
302 //
303 
MethodConfigsEqual(const ServiceConfig * sc1,const ServiceConfig * sc2)304 bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) {
305   if (sc1 == nullptr) return sc2 == nullptr;
306   if (sc2 == nullptr) return false;
307   return sc1->json_string() == sc2->json_string();
308 }
309 
operator ==(const ClusterWeightState & other) const310 bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==(
311     const ClusterWeightState& other) const {
312   return range_end == other.range_end && cluster == other.cluster &&
313          MethodConfigsEqual(method_config.get(), other.method_config.get());
314 }
315 
operator ==(const Route & other) const316 bool XdsResolver::XdsConfigSelector::Route::operator==(
317     const Route& other) const {
318   return route == other.route &&
319          weighted_cluster_state == other.weighted_cluster_state &&
320          MethodConfigsEqual(method_config.get(), other.method_config.get());
321 }
322 
323 //
324 // XdsResolver::XdsConfigSelector
325 //
326 
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,grpc_error_handle * error)327 XdsResolver::XdsConfigSelector::XdsConfigSelector(
328     RefCountedPtr<XdsResolver> resolver, grpc_error_handle* error)
329     : resolver_(std::move(resolver)) {
330   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
331     gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
332             resolver_.get(), this);
333   }
334   // 1. Construct the route table
335   // 2  Update resolver's cluster state map
336   // 3. Construct cluster list to hold on to entries in the cluster state
337   // map.
338   // Reserve the necessary entries up-front to avoid reallocation as we add
339   // elements. This is necessary because the string_view in the entry's
340   // weighted_cluster_state field points to the memory in the route field, so
341   // moving the entry in a reallocation will cause the string_view to point to
342   // invalid data.
343   route_table_.reserve(resolver_->current_virtual_host_.routes.size());
344   for (auto& route : resolver_->current_virtual_host_.routes) {
345     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
346       gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
347               resolver_.get(), this, route.ToString().c_str());
348     }
349     route_table_.emplace_back();
350     auto& route_entry = route_table_.back();
351     route_entry.route = route;
352     // If the route doesn't specify a timeout, set its timeout to the global
353     // one.
354     if (!route.max_stream_duration.has_value()) {
355       route_entry.route.max_stream_duration =
356           resolver_->current_listener_.http_connection_manager
357               .http_max_stream_duration;
358     }
359     if (route.weighted_clusters.empty()) {
360       *error = CreateMethodConfig(route_entry.route, nullptr,
361                                   &route_entry.method_config);
362       MaybeAddCluster(route.cluster_name);
363     } else {
364       uint32_t end = 0;
365       for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
366         Route::ClusterWeightState cluster_weight_state;
367         *error = CreateMethodConfig(route_entry.route, &weighted_cluster,
368                                     &cluster_weight_state.method_config);
369         if (*error != GRPC_ERROR_NONE) return;
370         end += weighted_cluster.weight;
371         cluster_weight_state.range_end = end;
372         cluster_weight_state.cluster = weighted_cluster.name;
373         route_entry.weighted_cluster_state.push_back(
374             std::move(cluster_weight_state));
375         MaybeAddCluster(weighted_cluster.name);
376       }
377     }
378   }
379   // Populate filter list.
380   bool found_router = false;
381   for (const auto& http_filter :
382        resolver_->current_listener_.http_connection_manager.http_filters) {
383     // Stop at the router filter.  It's a no-op for us, and we ignore
384     // anything that may come after it, for compatibility with Envoy.
385     if (http_filter.config.config_proto_type_name ==
386         kXdsHttpRouterFilterConfigName) {
387       found_router = true;
388       break;
389     }
390     // Find filter.  This is guaranteed to succeed, because it's checked
391     // at config validation time in the XdsApi code.
392     const XdsHttpFilterImpl* filter_impl =
393         XdsHttpFilterRegistry::GetFilterForType(
394             http_filter.config.config_proto_type_name);
395     GPR_ASSERT(filter_impl != nullptr);
396     // Add C-core filter to list.
397     filters_.push_back(filter_impl->channel_filter());
398   }
399   // For compatibility with Envoy, if the router filter is not
400   // configured, we fail all RPCs.
401   if (!found_router) {
402     filter_error_ =
403         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
404                                "no xDS HTTP router filter configured"),
405                            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
406     filters_.push_back(&grpc_lame_filter);
407   }
408 }
409 
~XdsConfigSelector()410 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
411   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
412     gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
413             resolver_.get(), this);
414   }
415   clusters_.clear();
416   resolver_->MaybeRemoveUnusedClusters();
417   GRPC_ERROR_UNREF(filter_error_);
418 }
419 
FindFilterConfigOverride(const std::string & instance_name,const XdsApi::RdsUpdate::VirtualHost & vhost,const XdsApi::Route & route,const XdsApi::Route::ClusterWeight * cluster_weight)420 const XdsHttpFilterImpl::FilterConfig* FindFilterConfigOverride(
421     const std::string& instance_name,
422     const XdsApi::RdsUpdate::VirtualHost& vhost, const XdsApi::Route& route,
423     const XdsApi::Route::ClusterWeight* cluster_weight) {
424   // Check ClusterWeight, if any.
425   if (cluster_weight != nullptr) {
426     auto it = cluster_weight->typed_per_filter_config.find(instance_name);
427     if (it != cluster_weight->typed_per_filter_config.end()) return &it->second;
428   }
429   // Check Route.
430   auto it = route.typed_per_filter_config.find(instance_name);
431   if (it != route.typed_per_filter_config.end()) return &it->second;
432   // Check VirtualHost.
433   it = vhost.typed_per_filter_config.find(instance_name);
434   if (it != vhost.typed_per_filter_config.end()) return &it->second;
435   // Not found.
436   return nullptr;
437 }
438 
CreateMethodConfig(const XdsApi::Route & route,const XdsApi::Route::ClusterWeight * cluster_weight,RefCountedPtr<ServiceConfig> * method_config)439 grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
440     const XdsApi::Route& route,
441     const XdsApi::Route::ClusterWeight* cluster_weight,
442     RefCountedPtr<ServiceConfig>* method_config) {
443   std::vector<std::string> fields;
444   // Set timeout.
445   if (route.max_stream_duration.has_value() &&
446       (route.max_stream_duration->seconds != 0 ||
447        route.max_stream_duration->nanos != 0)) {
448     fields.emplace_back(absl::StrFormat("    \"timeout\": \"%d.%09ds\"",
449                                         route.max_stream_duration->seconds,
450                                         route.max_stream_duration->nanos));
451   }
452   // Handle xDS HTTP filters.
453   std::map<std::string, std::vector<std::string>> per_filter_configs;
454   grpc_channel_args* args = grpc_channel_args_copy(resolver_->args_);
455   for (const auto& http_filter :
456        resolver_->current_listener_.http_connection_manager.http_filters) {
457     // Stop at the router filter.  It's a no-op for us, and we ignore
458     // anything that may come after it, for compatibility with Envoy.
459     if (http_filter.config.config_proto_type_name ==
460         kXdsHttpRouterFilterConfigName) {
461       break;
462     }
463     // Find filter.  This is guaranteed to succeed, because it's checked
464     // at config validation time in the XdsApi code.
465     const XdsHttpFilterImpl* filter_impl =
466         XdsHttpFilterRegistry::GetFilterForType(
467             http_filter.config.config_proto_type_name);
468     GPR_ASSERT(filter_impl != nullptr);
469     // Allow filter to add channel args that may affect service config
470     // parsing.
471     args = filter_impl->ModifyChannelArgs(args);
472     // Find config override, if any.
473     const XdsHttpFilterImpl::FilterConfig* config_override =
474         FindFilterConfigOverride(http_filter.name,
475                                  resolver_->current_virtual_host_, route,
476                                  cluster_weight);
477     // Generate service config for filter.
478     auto method_config_field =
479         filter_impl->GenerateServiceConfig(http_filter.config, config_override);
480     if (!method_config_field.ok()) {
481       return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
482           absl::StrCat("failed to generate method config for HTTP filter ",
483                        http_filter.name, ": ",
484                        method_config_field.status().ToString())
485               .c_str());
486     }
487     per_filter_configs[method_config_field->service_config_field_name]
488         .push_back(method_config_field->element);
489   }
490   for (const auto& p : per_filter_configs) {
491     fields.emplace_back(absl::StrCat("    \"", p.first, "\": [\n",
492                                      absl::StrJoin(p.second, ",\n"),
493                                      "\n    ]"));
494   }
495   // Construct service config.
496   grpc_error_handle error = GRPC_ERROR_NONE;
497   if (!fields.empty()) {
498     std::string json = absl::StrCat(
499         "{\n"
500         "  \"methodConfig\": [ {\n"
501         "    \"name\": [\n"
502         "      {}\n"
503         "    ],\n"
504         "    ",
505         absl::StrJoin(fields, ",\n"),
506         "\n  } ]\n"
507         "}");
508     *method_config = ServiceConfig::Create(args, json.c_str(), &error);
509   }
510   grpc_channel_args_destroy(args);
511   return error;
512 }
513 
ModifyChannelArgs(grpc_channel_args * args)514 grpc_channel_args* XdsResolver::XdsConfigSelector::ModifyChannelArgs(
515     grpc_channel_args* args) {
516   if (filter_error_ == GRPC_ERROR_NONE) return args;
517   grpc_arg error_arg = MakeLameClientErrorArg(filter_error_);
518   grpc_channel_args* new_args =
519       grpc_channel_args_copy_and_add(args, &error_arg, 1);
520   grpc_channel_args_destroy(args);
521   return new_args;
522 }
523 
MaybeAddCluster(const std::string & name)524 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
525   if (clusters_.find(name) == clusters_.end()) {
526     auto it = resolver_->cluster_state_map_.find(name);
527     if (it == resolver_->cluster_state_map_.end()) {
528       auto new_cluster_state =
529           MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
530       clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
531     } else {
532       clusters_[it->second->cluster()] = it->second->Ref();
533     }
534   }
535 }
536 
GetHeaderValue(grpc_metadata_batch * initial_metadata,absl::string_view header_name,std::string * concatenated_value)537 absl::optional<absl::string_view> GetHeaderValue(
538     grpc_metadata_batch* initial_metadata, absl::string_view header_name,
539     std::string* concatenated_value) {
540   // Note: If we ever allow binary headers here, we still need to
541   // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
542   // they are not visible to the LB policy in grpc-go.
543   if (absl::EndsWith(header_name, "-bin")) {
544     return absl::nullopt;
545   } else if (header_name == "content-type") {
546     return "application/grpc";
547   }
548   return grpc_metadata_batch_get_value(initial_metadata, header_name,
549                                        concatenated_value);
550 }
551 
HeadersMatch(const std::vector<HeaderMatcher> & header_matchers,grpc_metadata_batch * initial_metadata)552 bool HeadersMatch(const std::vector<HeaderMatcher>& header_matchers,
553                   grpc_metadata_batch* initial_metadata) {
554   for (const auto& header_matcher : header_matchers) {
555     std::string concatenated_value;
556     if (!header_matcher.Match(GetHeaderValue(
557             initial_metadata, header_matcher.name(), &concatenated_value))) {
558       return false;
559     }
560   }
561   return true;
562 }
563 
HeaderHashHelper(const XdsApi::Route::HashPolicy & policy,grpc_metadata_batch * initial_metadata)564 absl::optional<uint64_t> HeaderHashHelper(
565     const XdsApi::Route::HashPolicy& policy,
566     grpc_metadata_batch* initial_metadata) {
567   GPR_ASSERT(policy.type == XdsApi::Route::HashPolicy::HEADER);
568   std::string value_buffer;
569   absl::optional<absl::string_view> header_value =
570       GetHeaderValue(initial_metadata, policy.header_name, &value_buffer);
571   if (policy.regex != nullptr) {
572     // If GetHeaderValue() did not already store the value in
573     // value_buffer, copy it there now, so we can modify it.
574     if (header_value->data() != value_buffer.data()) {
575       value_buffer = std::string(*header_value);
576     }
577     RE2::GlobalReplace(&value_buffer, *policy.regex, policy.regex_substitution);
578     header_value = value_buffer;
579   }
580   return XXH64(header_value->data(), header_value->size(), 0);
581 }
582 
UnderFraction(const uint32_t fraction_per_million)583 bool UnderFraction(const uint32_t fraction_per_million) {
584   // Generate a random number in [0, 1000000).
585   const uint32_t random_number = rand() % 1000000;
586   return random_number < fraction_per_million;
587 }
588 
GetCallConfig(GetCallConfigArgs args)589 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
590     GetCallConfigArgs args) {
591   for (const auto& entry : route_table_) {
592     // Path matching.
593     if (!entry.route.matchers.path_matcher.Match(
594             StringViewFromSlice(*args.path))) {
595       continue;
596     }
597     // Header Matching.
598     if (!HeadersMatch(entry.route.matchers.header_matchers,
599                       args.initial_metadata)) {
600       continue;
601     }
602     // Match fraction check
603     if (entry.route.matchers.fraction_per_million.has_value() &&
604         !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
605       continue;
606     }
607     // Found a route match
608     absl::string_view cluster_name;
609     RefCountedPtr<ServiceConfig> method_config;
610     if (entry.route.weighted_clusters.empty()) {
611       cluster_name = entry.route.cluster_name;
612       method_config = entry.method_config;
613     } else {
614       const uint32_t key =
615           rand() %
616           entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
617               .range_end;
618       // Find the index in weighted clusters corresponding to key.
619       size_t mid = 0;
620       size_t start_index = 0;
621       size_t end_index = entry.weighted_cluster_state.size() - 1;
622       size_t index = 0;
623       while (end_index > start_index) {
624         mid = (start_index + end_index) / 2;
625         if (entry.weighted_cluster_state[mid].range_end > key) {
626           end_index = mid;
627         } else if (entry.weighted_cluster_state[mid].range_end < key) {
628           start_index = mid + 1;
629         } else {
630           index = mid + 1;
631           break;
632         }
633       }
634       if (index == 0) index = start_index;
635       GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key);
636       cluster_name = entry.weighted_cluster_state[index].cluster;
637       method_config = entry.weighted_cluster_state[index].method_config;
638     }
639     auto it = clusters_.find(cluster_name);
640     GPR_ASSERT(it != clusters_.end());
641     XdsResolver* resolver =
642         static_cast<XdsResolver*>(resolver_->Ref().release());
643     ClusterState* cluster_state = it->second->Ref().release();
644     // Generate a hash
645     absl::optional<uint64_t> hash;
646     for (const auto& hash_policy : entry.route.hash_policies) {
647       absl::optional<uint64_t> new_hash;
648       switch (hash_policy.type) {
649         case XdsApi::Route::HashPolicy::HEADER:
650           new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
651           break;
652         case XdsApi::Route::HashPolicy::CHANNEL_ID:
653           new_hash =
654               static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver));
655           break;
656         default:
657           GPR_ASSERT(0);
658       }
659       if (new_hash.has_value()) {
660         // Rotating the old value prevents duplicate hash rules from cancelling
661         // each other out and preserves all of the entropy
662         const uint64_t old_value =
663             hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0;
664         hash = old_value ^ new_hash.value();
665       }
666       // If the policy is a terminal policy and a hash has been generated,
667       // ignore the rest of the hash policies.
668       if (hash_policy.terminal && hash.has_value()) {
669         break;
670       }
671     }
672     if (!hash.has_value()) {
673       // If there is no hash, we just choose a random value as a default.
674       hash = rand();
675     }
676     CallConfig call_config;
677     if (method_config != nullptr) {
678       call_config.method_configs =
679           method_config->GetMethodParsedConfigVector(grpc_empty_slice());
680       call_config.service_config = std::move(method_config);
681     }
682     call_config.call_attributes[kXdsClusterAttribute] = it->first;
683     call_config.call_attributes[kRequestRingHashAttribute] =
684         absl::StrFormat("%" PRIu64, hash.value());
685     call_config.on_call_committed = [resolver, cluster_state]() {
686       cluster_state->Unref();
687       ExecCtx::Run(
688           // TODO(roth): This hop into the ExecCtx is being done to avoid
689           // entering the WorkSerializer while holding the client channel data
690           // plane mutex, since that can lead to deadlocks. However, we should
691           // not have to solve this problem in each individual ConfigSelector
692           // implementation. When we have time, we should fix the client channel
693           // code to avoid this by not invoking the
694           // CallConfig::on_call_committed callback until after it has released
695           // the data plane mutex.
696           DEBUG_LOCATION,
697           GRPC_CLOSURE_CREATE(
698               [](void* arg, grpc_error_handle /*error*/) {
699                 auto* resolver = static_cast<XdsResolver*>(arg);
700                 resolver->work_serializer_->Run(
701                     [resolver]() {
702                       resolver->MaybeRemoveUnusedClusters();
703                       resolver->Unref();
704                     },
705                     DEBUG_LOCATION);
706               },
707               resolver, nullptr),
708           GRPC_ERROR_NONE);
709     };
710     return call_config;
711   }
712   return CallConfig();
713 }
714 
715 //
716 // XdsResolver
717 //
718 
StartLocked()719 void XdsResolver::StartLocked() {
720   grpc_error_handle error = GRPC_ERROR_NONE;
721   xds_client_ = XdsClient::GetOrCreate(args_, &error);
722   if (error != GRPC_ERROR_NONE) {
723     gpr_log(GPR_ERROR,
724             "Failed to create xds client -- channel will remain in "
725             "TRANSIENT_FAILURE: %s",
726             grpc_error_std_string(error).c_str());
727     result_handler_->ReturnError(error);
728     return;
729   }
730   grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
731                                    interested_parties_);
732   channelz::ChannelNode* parent_channelz_node =
733       grpc_channel_args_find_pointer<channelz::ChannelNode>(
734           args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
735   if (parent_channelz_node != nullptr) {
736     xds_client_->AddChannelzLinkage(parent_channelz_node);
737   }
738   auto watcher = absl::make_unique<ListenerWatcher>(Ref());
739   listener_watcher_ = watcher.get();
740   xds_client_->WatchListenerData(server_name_, std::move(watcher));
741 }
742 
ShutdownLocked()743 void XdsResolver::ShutdownLocked() {
744   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
745     gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
746   }
747   if (xds_client_ != nullptr) {
748     if (listener_watcher_ != nullptr) {
749       xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
750                                            /*delay_unsubscription=*/false);
751     }
752     if (route_config_watcher_ != nullptr) {
753       xds_client_->CancelRouteConfigDataWatch(
754           server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
755     }
756     channelz::ChannelNode* parent_channelz_node =
757         grpc_channel_args_find_pointer<channelz::ChannelNode>(
758             args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
759     if (parent_channelz_node != nullptr) {
760       xds_client_->RemoveChannelzLinkage(parent_channelz_node);
761     }
762     grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
763                                      interested_parties_);
764     xds_client_.reset();
765   }
766 }
767 
OnListenerUpdate(XdsApi::LdsUpdate listener)768 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
769   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
770     gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
771   }
772   if (listener.http_connection_manager.route_config_name !=
773       route_config_name_) {
774     if (route_config_watcher_ != nullptr) {
775       xds_client_->CancelRouteConfigDataWatch(
776           route_config_name_, route_config_watcher_,
777           /*delay_unsubscription=*/
778           !listener.http_connection_manager.route_config_name.empty());
779       route_config_watcher_ = nullptr;
780     }
781     route_config_name_ =
782         std::move(listener.http_connection_manager.route_config_name);
783     if (!route_config_name_.empty()) {
784       current_virtual_host_.routes.clear();
785       auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
786       route_config_watcher_ = watcher.get();
787       xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
788     }
789   }
790   current_listener_ = std::move(listener);
791   if (route_config_name_.empty()) {
792     GPR_ASSERT(
793         current_listener_.http_connection_manager.rds_update.has_value());
794     OnRouteConfigUpdate(
795         std::move(*current_listener_.http_connection_manager.rds_update));
796   } else {
797     // HCM may contain newer filter config. We need to propagate the update as
798     // config selector to the channel
799     GenerateResult();
800   }
801 }
802 
OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update)803 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
804   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
805     gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
806   }
807   // Find the relevant VirtualHost from the RouteConfiguration.
808   XdsApi::RdsUpdate::VirtualHost* vhost =
809       rds_update.FindVirtualHostForDomain(server_name_);
810   if (vhost == nullptr) {
811     OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
812         absl::StrCat("could not find VirtualHost for ", server_name_,
813                      " in RouteConfiguration")
814             .c_str()));
815     return;
816   }
817   // Save the virtual host in the resolver.
818   current_virtual_host_ = std::move(*vhost);
819   // Send a new result to the channel.
820   GenerateResult();
821 }
822 
OnError(grpc_error_handle error)823 void XdsResolver::OnError(grpc_error_handle error) {
824   gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
825           this, grpc_error_std_string(error).c_str());
826   Result result;
827   grpc_arg new_arg = xds_client_->MakeChannelArg();
828   result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
829   result.service_config_error = error;
830   result_handler_->ReturnResult(std::move(result));
831 }
832 
OnResourceDoesNotExist()833 void XdsResolver::OnResourceDoesNotExist() {
834   gpr_log(GPR_ERROR,
835           "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
836           "update and returning empty service config",
837           this);
838   current_virtual_host_.routes.clear();
839   Result result;
840   result.service_config =
841       ServiceConfig::Create(args_, "{}", &result.service_config_error);
842   GPR_ASSERT(result.service_config != nullptr);
843   result.args = grpc_channel_args_copy(args_);
844   result_handler_->ReturnResult(std::move(result));
845 }
846 
CreateServiceConfig(RefCountedPtr<ServiceConfig> * service_config)847 grpc_error_handle XdsResolver::CreateServiceConfig(
848     RefCountedPtr<ServiceConfig>* service_config) {
849   std::vector<std::string> clusters;
850   for (const auto& cluster : cluster_state_map_) {
851     clusters.push_back(
852         absl::StrFormat("      \"%s\":{\n"
853                         "        \"childPolicy\":[ {\n"
854                         "          \"cds_experimental\":{\n"
855                         "            \"cluster\": \"%s\"\n"
856                         "          }\n"
857                         "        } ]\n"
858                         "       }",
859                         cluster.first, cluster.first));
860   }
861   std::vector<std::string> config_parts;
862   config_parts.push_back(
863       "{\n"
864       "  \"loadBalancingConfig\":[\n"
865       "    { \"xds_cluster_manager_experimental\":{\n"
866       "      \"children\":{\n");
867   config_parts.push_back(absl::StrJoin(clusters, ",\n"));
868   config_parts.push_back(
869       "    }\n"
870       "    } }\n"
871       "  ]\n"
872       "}");
873   std::string json = absl::StrJoin(config_parts, "");
874   grpc_error_handle error = GRPC_ERROR_NONE;
875   *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
876   return error;
877 }
878 
GenerateResult()879 void XdsResolver::GenerateResult() {
880   if (current_virtual_host_.routes.empty()) return;
881   // First create XdsConfigSelector, which may add new entries to the cluster
882   // state map, and then CreateServiceConfig for LB policies.
883   grpc_error_handle error = GRPC_ERROR_NONE;
884   auto config_selector = MakeRefCounted<XdsConfigSelector>(Ref(), &error);
885   if (error != GRPC_ERROR_NONE) {
886     OnError(error);
887     return;
888   }
889   Result result;
890   error = CreateServiceConfig(&result.service_config);
891   if (error != GRPC_ERROR_NONE) {
892     OnError(error);
893     return;
894   }
895   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
896     gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
897             result.service_config->json_string().c_str());
898   }
899   grpc_arg new_args[] = {
900       xds_client_->MakeChannelArg(),
901       config_selector->MakeChannelArg(),
902   };
903   result.args =
904       grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
905   result_handler_->ReturnResult(std::move(result));
906 }
907 
MaybeRemoveUnusedClusters()908 void XdsResolver::MaybeRemoveUnusedClusters() {
909   bool update_needed = false;
910   for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
911     RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
912     if (cluster_state != nullptr) {
913       ++it;
914     } else {
915       update_needed = true;
916       it = cluster_state_map_.erase(it);
917     }
918   }
919   if (update_needed && xds_client_ != nullptr) {
920     // Send a new result to the channel.
921     GenerateResult();
922   }
923 }
924 
925 //
926 // Factory
927 //
928 
929 class XdsResolverFactory : public ResolverFactory {
930  public:
IsValidUri(const URI & uri) const931   bool IsValidUri(const URI& uri) const override {
932     if (GPR_UNLIKELY(!uri.authority().empty())) {
933       gpr_log(GPR_ERROR, "URI authority not supported");
934       return false;
935     }
936     return true;
937   }
938 
CreateResolver(ResolverArgs args) const939   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
940     if (!IsValidUri(args.uri)) return nullptr;
941     return MakeOrphanable<XdsResolver>(std::move(args));
942   }
943 
scheme() const944   const char* scheme() const override { return "xds"; }
945 };
946 
947 }  // namespace
948 
949 }  // namespace grpc_core
950 
grpc_resolver_xds_init()951 void grpc_resolver_xds_init() {
952   grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
953       absl::make_unique<grpc_core::XdsResolverFactory>());
954 }
955 
grpc_resolver_xds_shutdown()956 void grpc_resolver_xds_shutdown() {}
957