1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
24 #include "re2/re2.h"
25 #define XXH_INLINE_ALL
26 #include "xxhash.h"
27
28 #include "src/core/ext/filters/client_channel/config_selector.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h"
30 #include "src/core/ext/filters/client_channel/resolver_registry.h"
31 #include "src/core/ext/xds/xds_channel_args.h"
32 #include "src/core/ext/xds/xds_client.h"
33 #include "src/core/ext/xds/xds_http_filters.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/iomgr/closure.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/surface/lame_client.h"
38 #include "src/core/lib/transport/timeout_encoding.h"
39
40 namespace grpc_core {
41
// Trace flag registered under the name "xds_resolver" and constructed with
// `false`.  It gates the GRPC_TRACE_FLAG_ENABLED() logging in this file.
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");

// Key under which XdsConfigSelector::GetCallConfig() stores the name of the
// selected cluster in CallConfig::call_attributes.
const char* kXdsClusterAttribute = "xds_cluster_name";
45
46 namespace {
47
48 //
49 // XdsResolver
50 //
51
52 class XdsResolver : public Resolver {
53 public:
XdsResolver(ResolverArgs args)54 explicit XdsResolver(ResolverArgs args)
55 : work_serializer_(std::move(args.work_serializer)),
56 result_handler_(std::move(args.result_handler)),
57 server_name_(absl::StripPrefix(args.uri.path(), "/")),
58 args_(grpc_channel_args_copy(args.args)),
59 interested_parties_(args.pollset_set) {
60 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
61 gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
62 server_name_.c_str());
63 }
64 }
65
~XdsResolver()66 ~XdsResolver() override {
67 grpc_channel_args_destroy(args_);
68 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
69 gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
70 }
71 }
72
73 void StartLocked() override;
74
75 void ShutdownLocked() override;
76
ResetBackoffLocked()77 void ResetBackoffLocked() override {
78 if (xds_client_ != nullptr) xds_client_->ResetBackoff();
79 }
80
81 private:
82 class Notifier {
83 public:
84 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
85 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
86 Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error_handle error);
87 explicit Notifier(RefCountedPtr<XdsResolver> resolver);
88
89 private:
90 enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
91
92 static void RunInExecCtx(void* arg, grpc_error_handle error);
93 void RunInWorkSerializer(grpc_error_handle error);
94
95 RefCountedPtr<XdsResolver> resolver_;
96 grpc_closure closure_;
97 XdsApi::LdsUpdate update_;
98 Type type_;
99 };
100
101 class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
102 public:
ListenerWatcher(RefCountedPtr<XdsResolver> resolver)103 explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
104 : resolver_(std::move(resolver)) {}
OnListenerChanged(XdsApi::LdsUpdate listener)105 void OnListenerChanged(XdsApi::LdsUpdate listener) override {
106 new Notifier(resolver_, std::move(listener));
107 }
OnError(grpc_error_handle error)108 void OnError(grpc_error_handle error) override {
109 new Notifier(resolver_, error);
110 }
OnResourceDoesNotExist()111 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
112
113 private:
114 RefCountedPtr<XdsResolver> resolver_;
115 };
116
117 class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
118 public:
RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)119 explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
120 : resolver_(std::move(resolver)) {}
OnRouteConfigChanged(XdsApi::RdsUpdate route_config)121 void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
122 new Notifier(resolver_, std::move(route_config));
123 }
OnError(grpc_error_handle error)124 void OnError(grpc_error_handle error) override {
125 new Notifier(resolver_, error);
126 }
OnResourceDoesNotExist()127 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
128
129 private:
130 RefCountedPtr<XdsResolver> resolver_;
131 };
132
133 class ClusterState
134 : public RefCounted<ClusterState, PolymorphicRefCount, kUnrefNoDelete> {
135 public:
136 using ClusterStateMap =
137 std::map<std::string, std::unique_ptr<ClusterState>>;
138
ClusterState(const std::string & cluster_name,ClusterStateMap * cluster_state_map)139 ClusterState(const std::string& cluster_name,
140 ClusterStateMap* cluster_state_map)
141 : it_(cluster_state_map
142 ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
143 .first) {}
cluster() const144 const std::string& cluster() const { return it_->first; }
145
146 private:
147 ClusterStateMap::iterator it_;
148 };
149
150 class XdsConfigSelector : public ConfigSelector {
151 public:
152 XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
153 grpc_error_handle* error);
154 ~XdsConfigSelector() override;
155
name() const156 const char* name() const override { return "XdsConfigSelector"; }
157
Equals(const ConfigSelector * other) const158 bool Equals(const ConfigSelector* other) const override {
159 const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
160 // Don't need to compare resolver_, since that will always be the same.
161 return route_table_ == other_xds->route_table_ &&
162 clusters_ == other_xds->clusters_;
163 }
164
165 CallConfig GetCallConfig(GetCallConfigArgs args) override;
166
GetFilters()167 std::vector<const grpc_channel_filter*> GetFilters() override {
168 return filters_;
169 }
170
171 grpc_channel_args* ModifyChannelArgs(grpc_channel_args* args) override;
172
173 private:
174 struct Route {
175 struct ClusterWeightState {
176 uint32_t range_end;
177 absl::string_view cluster;
178 RefCountedPtr<ServiceConfig> method_config;
179
180 bool operator==(const ClusterWeightState& other) const;
181 };
182
183 XdsApi::Route route;
184 RefCountedPtr<ServiceConfig> method_config;
185 absl::InlinedVector<ClusterWeightState, 2> weighted_cluster_state;
186
187 bool operator==(const Route& other) const;
188 };
189 using RouteTable = std::vector<Route>;
190
191 void MaybeAddCluster(const std::string& name);
192 grpc_error_handle CreateMethodConfig(
193 const XdsApi::Route& route,
194 const XdsApi::Route::ClusterWeight* cluster_weight,
195 RefCountedPtr<ServiceConfig>* method_config);
196
197 RefCountedPtr<XdsResolver> resolver_;
198 RouteTable route_table_;
199 std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
200 std::vector<const grpc_channel_filter*> filters_;
201 grpc_error_handle filter_error_ = GRPC_ERROR_NONE;
202 };
203
204 void OnListenerUpdate(XdsApi::LdsUpdate listener);
205 void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
206 void OnError(grpc_error_handle error);
207 void OnResourceDoesNotExist();
208
209 grpc_error_handle CreateServiceConfig(
210 RefCountedPtr<ServiceConfig>* service_config);
211 void GenerateResult();
212 void MaybeRemoveUnusedClusters();
213
214 std::shared_ptr<WorkSerializer> work_serializer_;
215 std::unique_ptr<ResultHandler> result_handler_;
216 std::string server_name_;
217 const grpc_channel_args* args_;
218 grpc_pollset_set* interested_parties_;
219
220 RefCountedPtr<XdsClient> xds_client_;
221
222 XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
223 // This will not contain the RouteConfiguration, even if it comes with the
224 // LDS response; instead, the relevant VirtualHost from the
225 // RouteConfiguration will be saved in current_virtual_host_.
226 XdsApi::LdsUpdate current_listener_;
227
228 std::string route_config_name_;
229 XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
230 XdsApi::RdsUpdate::VirtualHost current_virtual_host_;
231
232 ClusterState::ClusterStateMap cluster_state_map_;
233 };
234
235 //
236 // XdsResolver::Notifier
237 //
238
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::LdsUpdate update)239 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
240 XdsApi::LdsUpdate update)
241 : resolver_(std::move(resolver)),
242 update_(std::move(update)),
243 type_(kLdsUpdate) {
244 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
245 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
246 }
247
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::RdsUpdate update)248 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
249 XdsApi::RdsUpdate update)
250 : resolver_(std::move(resolver)), type_(kRdsUpdate) {
251 update_.http_connection_manager.rds_update = std::move(update);
252 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
253 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
254 }
255
Notifier(RefCountedPtr<XdsResolver> resolver,grpc_error_handle error)256 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
257 grpc_error_handle error)
258 : resolver_(std::move(resolver)), type_(kError) {
259 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
260 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
261 }
262
Notifier(RefCountedPtr<XdsResolver> resolver)263 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
264 : resolver_(std::move(resolver)), type_(kDoesNotExist) {
265 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
266 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
267 }
268
RunInExecCtx(void * arg,grpc_error_handle error)269 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error_handle error) {
270 Notifier* self = static_cast<Notifier*>(arg);
271 GRPC_ERROR_REF(error);
272 self->resolver_->work_serializer_->Run(
273 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
274 }
275
RunInWorkSerializer(grpc_error_handle error)276 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error_handle error) {
277 if (resolver_->xds_client_ == nullptr) {
278 GRPC_ERROR_UNREF(error);
279 delete this;
280 return;
281 }
282 switch (type_) {
283 case kLdsUpdate:
284 resolver_->OnListenerUpdate(std::move(update_));
285 break;
286 case kRdsUpdate:
287 resolver_->OnRouteConfigUpdate(
288 std::move(*update_.http_connection_manager.rds_update));
289 break;
290 case kError:
291 resolver_->OnError(error);
292 break;
293 case kDoesNotExist:
294 resolver_->OnResourceDoesNotExist();
295 break;
296 };
297 delete this;
298 }
299
300 //
301 // XdsResolver::XdsConfigSelector::Route
302 //
303
MethodConfigsEqual(const ServiceConfig * sc1,const ServiceConfig * sc2)304 bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) {
305 if (sc1 == nullptr) return sc2 == nullptr;
306 if (sc2 == nullptr) return false;
307 return sc1->json_string() == sc2->json_string();
308 }
309
operator ==(const ClusterWeightState & other) const310 bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==(
311 const ClusterWeightState& other) const {
312 return range_end == other.range_end && cluster == other.cluster &&
313 MethodConfigsEqual(method_config.get(), other.method_config.get());
314 }
315
operator ==(const Route & other) const316 bool XdsResolver::XdsConfigSelector::Route::operator==(
317 const Route& other) const {
318 return route == other.route &&
319 weighted_cluster_state == other.weighted_cluster_state &&
320 MethodConfigsEqual(method_config.get(), other.method_config.get());
321 }
322
323 //
324 // XdsResolver::XdsConfigSelector
325 //
326
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,grpc_error_handle * error)327 XdsResolver::XdsConfigSelector::XdsConfigSelector(
328 RefCountedPtr<XdsResolver> resolver, grpc_error_handle* error)
329 : resolver_(std::move(resolver)) {
330 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
331 gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
332 resolver_.get(), this);
333 }
334 // 1. Construct the route table
335 // 2 Update resolver's cluster state map
336 // 3. Construct cluster list to hold on to entries in the cluster state
337 // map.
338 // Reserve the necessary entries up-front to avoid reallocation as we add
339 // elements. This is necessary because the string_view in the entry's
340 // weighted_cluster_state field points to the memory in the route field, so
341 // moving the entry in a reallocation will cause the string_view to point to
342 // invalid data.
343 route_table_.reserve(resolver_->current_virtual_host_.routes.size());
344 for (auto& route : resolver_->current_virtual_host_.routes) {
345 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
346 gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
347 resolver_.get(), this, route.ToString().c_str());
348 }
349 route_table_.emplace_back();
350 auto& route_entry = route_table_.back();
351 route_entry.route = route;
352 // If the route doesn't specify a timeout, set its timeout to the global
353 // one.
354 if (!route.max_stream_duration.has_value()) {
355 route_entry.route.max_stream_duration =
356 resolver_->current_listener_.http_connection_manager
357 .http_max_stream_duration;
358 }
359 if (route.weighted_clusters.empty()) {
360 *error = CreateMethodConfig(route_entry.route, nullptr,
361 &route_entry.method_config);
362 MaybeAddCluster(route.cluster_name);
363 } else {
364 uint32_t end = 0;
365 for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
366 Route::ClusterWeightState cluster_weight_state;
367 *error = CreateMethodConfig(route_entry.route, &weighted_cluster,
368 &cluster_weight_state.method_config);
369 if (*error != GRPC_ERROR_NONE) return;
370 end += weighted_cluster.weight;
371 cluster_weight_state.range_end = end;
372 cluster_weight_state.cluster = weighted_cluster.name;
373 route_entry.weighted_cluster_state.push_back(
374 std::move(cluster_weight_state));
375 MaybeAddCluster(weighted_cluster.name);
376 }
377 }
378 }
379 // Populate filter list.
380 bool found_router = false;
381 for (const auto& http_filter :
382 resolver_->current_listener_.http_connection_manager.http_filters) {
383 // Stop at the router filter. It's a no-op for us, and we ignore
384 // anything that may come after it, for compatibility with Envoy.
385 if (http_filter.config.config_proto_type_name ==
386 kXdsHttpRouterFilterConfigName) {
387 found_router = true;
388 break;
389 }
390 // Find filter. This is guaranteed to succeed, because it's checked
391 // at config validation time in the XdsApi code.
392 const XdsHttpFilterImpl* filter_impl =
393 XdsHttpFilterRegistry::GetFilterForType(
394 http_filter.config.config_proto_type_name);
395 GPR_ASSERT(filter_impl != nullptr);
396 // Add C-core filter to list.
397 filters_.push_back(filter_impl->channel_filter());
398 }
399 // For compatibility with Envoy, if the router filter is not
400 // configured, we fail all RPCs.
401 if (!found_router) {
402 filter_error_ =
403 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
404 "no xDS HTTP router filter configured"),
405 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
406 filters_.push_back(&grpc_lame_filter);
407 }
408 }
409
~XdsConfigSelector()410 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
411 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
412 gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
413 resolver_.get(), this);
414 }
415 clusters_.clear();
416 resolver_->MaybeRemoveUnusedClusters();
417 GRPC_ERROR_UNREF(filter_error_);
418 }
419
FindFilterConfigOverride(const std::string & instance_name,const XdsApi::RdsUpdate::VirtualHost & vhost,const XdsApi::Route & route,const XdsApi::Route::ClusterWeight * cluster_weight)420 const XdsHttpFilterImpl::FilterConfig* FindFilterConfigOverride(
421 const std::string& instance_name,
422 const XdsApi::RdsUpdate::VirtualHost& vhost, const XdsApi::Route& route,
423 const XdsApi::Route::ClusterWeight* cluster_weight) {
424 // Check ClusterWeight, if any.
425 if (cluster_weight != nullptr) {
426 auto it = cluster_weight->typed_per_filter_config.find(instance_name);
427 if (it != cluster_weight->typed_per_filter_config.end()) return &it->second;
428 }
429 // Check Route.
430 auto it = route.typed_per_filter_config.find(instance_name);
431 if (it != route.typed_per_filter_config.end()) return &it->second;
432 // Check VirtualHost.
433 it = vhost.typed_per_filter_config.find(instance_name);
434 if (it != vhost.typed_per_filter_config.end()) return &it->second;
435 // Not found.
436 return nullptr;
437 }
438
CreateMethodConfig(const XdsApi::Route & route,const XdsApi::Route::ClusterWeight * cluster_weight,RefCountedPtr<ServiceConfig> * method_config)439 grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig(
440 const XdsApi::Route& route,
441 const XdsApi::Route::ClusterWeight* cluster_weight,
442 RefCountedPtr<ServiceConfig>* method_config) {
443 std::vector<std::string> fields;
444 // Set timeout.
445 if (route.max_stream_duration.has_value() &&
446 (route.max_stream_duration->seconds != 0 ||
447 route.max_stream_duration->nanos != 0)) {
448 fields.emplace_back(absl::StrFormat(" \"timeout\": \"%d.%09ds\"",
449 route.max_stream_duration->seconds,
450 route.max_stream_duration->nanos));
451 }
452 // Handle xDS HTTP filters.
453 std::map<std::string, std::vector<std::string>> per_filter_configs;
454 grpc_channel_args* args = grpc_channel_args_copy(resolver_->args_);
455 for (const auto& http_filter :
456 resolver_->current_listener_.http_connection_manager.http_filters) {
457 // Stop at the router filter. It's a no-op for us, and we ignore
458 // anything that may come after it, for compatibility with Envoy.
459 if (http_filter.config.config_proto_type_name ==
460 kXdsHttpRouterFilterConfigName) {
461 break;
462 }
463 // Find filter. This is guaranteed to succeed, because it's checked
464 // at config validation time in the XdsApi code.
465 const XdsHttpFilterImpl* filter_impl =
466 XdsHttpFilterRegistry::GetFilterForType(
467 http_filter.config.config_proto_type_name);
468 GPR_ASSERT(filter_impl != nullptr);
469 // Allow filter to add channel args that may affect service config
470 // parsing.
471 args = filter_impl->ModifyChannelArgs(args);
472 // Find config override, if any.
473 const XdsHttpFilterImpl::FilterConfig* config_override =
474 FindFilterConfigOverride(http_filter.name,
475 resolver_->current_virtual_host_, route,
476 cluster_weight);
477 // Generate service config for filter.
478 auto method_config_field =
479 filter_impl->GenerateServiceConfig(http_filter.config, config_override);
480 if (!method_config_field.ok()) {
481 return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
482 absl::StrCat("failed to generate method config for HTTP filter ",
483 http_filter.name, ": ",
484 method_config_field.status().ToString())
485 .c_str());
486 }
487 per_filter_configs[method_config_field->service_config_field_name]
488 .push_back(method_config_field->element);
489 }
490 for (const auto& p : per_filter_configs) {
491 fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n",
492 absl::StrJoin(p.second, ",\n"),
493 "\n ]"));
494 }
495 // Construct service config.
496 grpc_error_handle error = GRPC_ERROR_NONE;
497 if (!fields.empty()) {
498 std::string json = absl::StrCat(
499 "{\n"
500 " \"methodConfig\": [ {\n"
501 " \"name\": [\n"
502 " {}\n"
503 " ],\n"
504 " ",
505 absl::StrJoin(fields, ",\n"),
506 "\n } ]\n"
507 "}");
508 *method_config = ServiceConfig::Create(args, json.c_str(), &error);
509 }
510 grpc_channel_args_destroy(args);
511 return error;
512 }
513
ModifyChannelArgs(grpc_channel_args * args)514 grpc_channel_args* XdsResolver::XdsConfigSelector::ModifyChannelArgs(
515 grpc_channel_args* args) {
516 if (filter_error_ == GRPC_ERROR_NONE) return args;
517 grpc_arg error_arg = MakeLameClientErrorArg(filter_error_);
518 grpc_channel_args* new_args =
519 grpc_channel_args_copy_and_add(args, &error_arg, 1);
520 grpc_channel_args_destroy(args);
521 return new_args;
522 }
523
MaybeAddCluster(const std::string & name)524 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
525 if (clusters_.find(name) == clusters_.end()) {
526 auto it = resolver_->cluster_state_map_.find(name);
527 if (it == resolver_->cluster_state_map_.end()) {
528 auto new_cluster_state =
529 MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
530 clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
531 } else {
532 clusters_[it->second->cluster()] = it->second->Ref();
533 }
534 }
535 }
536
GetHeaderValue(grpc_metadata_batch * initial_metadata,absl::string_view header_name,std::string * concatenated_value)537 absl::optional<absl::string_view> GetHeaderValue(
538 grpc_metadata_batch* initial_metadata, absl::string_view header_name,
539 std::string* concatenated_value) {
540 // Note: If we ever allow binary headers here, we still need to
541 // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
542 // they are not visible to the LB policy in grpc-go.
543 if (absl::EndsWith(header_name, "-bin")) {
544 return absl::nullopt;
545 } else if (header_name == "content-type") {
546 return "application/grpc";
547 }
548 return grpc_metadata_batch_get_value(initial_metadata, header_name,
549 concatenated_value);
550 }
551
HeadersMatch(const std::vector<HeaderMatcher> & header_matchers,grpc_metadata_batch * initial_metadata)552 bool HeadersMatch(const std::vector<HeaderMatcher>& header_matchers,
553 grpc_metadata_batch* initial_metadata) {
554 for (const auto& header_matcher : header_matchers) {
555 std::string concatenated_value;
556 if (!header_matcher.Match(GetHeaderValue(
557 initial_metadata, header_matcher.name(), &concatenated_value))) {
558 return false;
559 }
560 }
561 return true;
562 }
563
HeaderHashHelper(const XdsApi::Route::HashPolicy & policy,grpc_metadata_batch * initial_metadata)564 absl::optional<uint64_t> HeaderHashHelper(
565 const XdsApi::Route::HashPolicy& policy,
566 grpc_metadata_batch* initial_metadata) {
567 GPR_ASSERT(policy.type == XdsApi::Route::HashPolicy::HEADER);
568 std::string value_buffer;
569 absl::optional<absl::string_view> header_value =
570 GetHeaderValue(initial_metadata, policy.header_name, &value_buffer);
571 if (policy.regex != nullptr) {
572 // If GetHeaderValue() did not already store the value in
573 // value_buffer, copy it there now, so we can modify it.
574 if (header_value->data() != value_buffer.data()) {
575 value_buffer = std::string(*header_value);
576 }
577 RE2::GlobalReplace(&value_buffer, *policy.regex, policy.regex_substitution);
578 header_value = value_buffer;
579 }
580 return XXH64(header_value->data(), header_value->size(), 0);
581 }
582
// Returns true when a value drawn from rand() and reduced into
// [0, 1000000) is strictly below `fraction_per_million`.
bool UnderFraction(const uint32_t fraction_per_million) {
  const uint32_t draw = rand() % 1000000;
  if (draw < fraction_per_million) return true;
  return false;
}
588
GetCallConfig(GetCallConfigArgs args)589 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
590 GetCallConfigArgs args) {
591 for (const auto& entry : route_table_) {
592 // Path matching.
593 if (!entry.route.matchers.path_matcher.Match(
594 StringViewFromSlice(*args.path))) {
595 continue;
596 }
597 // Header Matching.
598 if (!HeadersMatch(entry.route.matchers.header_matchers,
599 args.initial_metadata)) {
600 continue;
601 }
602 // Match fraction check
603 if (entry.route.matchers.fraction_per_million.has_value() &&
604 !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
605 continue;
606 }
607 // Found a route match
608 absl::string_view cluster_name;
609 RefCountedPtr<ServiceConfig> method_config;
610 if (entry.route.weighted_clusters.empty()) {
611 cluster_name = entry.route.cluster_name;
612 method_config = entry.method_config;
613 } else {
614 const uint32_t key =
615 rand() %
616 entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
617 .range_end;
618 // Find the index in weighted clusters corresponding to key.
619 size_t mid = 0;
620 size_t start_index = 0;
621 size_t end_index = entry.weighted_cluster_state.size() - 1;
622 size_t index = 0;
623 while (end_index > start_index) {
624 mid = (start_index + end_index) / 2;
625 if (entry.weighted_cluster_state[mid].range_end > key) {
626 end_index = mid;
627 } else if (entry.weighted_cluster_state[mid].range_end < key) {
628 start_index = mid + 1;
629 } else {
630 index = mid + 1;
631 break;
632 }
633 }
634 if (index == 0) index = start_index;
635 GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key);
636 cluster_name = entry.weighted_cluster_state[index].cluster;
637 method_config = entry.weighted_cluster_state[index].method_config;
638 }
639 auto it = clusters_.find(cluster_name);
640 GPR_ASSERT(it != clusters_.end());
641 XdsResolver* resolver =
642 static_cast<XdsResolver*>(resolver_->Ref().release());
643 ClusterState* cluster_state = it->second->Ref().release();
644 // Generate a hash
645 absl::optional<uint64_t> hash;
646 for (const auto& hash_policy : entry.route.hash_policies) {
647 absl::optional<uint64_t> new_hash;
648 switch (hash_policy.type) {
649 case XdsApi::Route::HashPolicy::HEADER:
650 new_hash = HeaderHashHelper(hash_policy, args.initial_metadata);
651 break;
652 case XdsApi::Route::HashPolicy::CHANNEL_ID:
653 new_hash =
654 static_cast<uint64_t>(reinterpret_cast<uintptr_t>(resolver));
655 break;
656 default:
657 GPR_ASSERT(0);
658 }
659 if (new_hash.has_value()) {
660 // Rotating the old value prevents duplicate hash rules from cancelling
661 // each other out and preserves all of the entropy
662 const uint64_t old_value =
663 hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0;
664 hash = old_value ^ new_hash.value();
665 }
666 // If the policy is a terminal policy and a hash has been generated,
667 // ignore the rest of the hash policies.
668 if (hash_policy.terminal && hash.has_value()) {
669 break;
670 }
671 }
672 if (!hash.has_value()) {
673 // If there is no hash, we just choose a random value as a default.
674 hash = rand();
675 }
676 CallConfig call_config;
677 if (method_config != nullptr) {
678 call_config.method_configs =
679 method_config->GetMethodParsedConfigVector(grpc_empty_slice());
680 call_config.service_config = std::move(method_config);
681 }
682 call_config.call_attributes[kXdsClusterAttribute] = it->first;
683 call_config.call_attributes[kRequestRingHashAttribute] =
684 absl::StrFormat("%" PRIu64, hash.value());
685 call_config.on_call_committed = [resolver, cluster_state]() {
686 cluster_state->Unref();
687 ExecCtx::Run(
688 // TODO(roth): This hop into the ExecCtx is being done to avoid
689 // entering the WorkSerializer while holding the client channel data
690 // plane mutex, since that can lead to deadlocks. However, we should
691 // not have to solve this problem in each individual ConfigSelector
692 // implementation. When we have time, we should fix the client channel
693 // code to avoid this by not invoking the
694 // CallConfig::on_call_committed callback until after it has released
695 // the data plane mutex.
696 DEBUG_LOCATION,
697 GRPC_CLOSURE_CREATE(
698 [](void* arg, grpc_error_handle /*error*/) {
699 auto* resolver = static_cast<XdsResolver*>(arg);
700 resolver->work_serializer_->Run(
701 [resolver]() {
702 resolver->MaybeRemoveUnusedClusters();
703 resolver->Unref();
704 },
705 DEBUG_LOCATION);
706 },
707 resolver, nullptr),
708 GRPC_ERROR_NONE);
709 };
710 return call_config;
711 }
712 return CallConfig();
713 }
714
715 //
716 // XdsResolver
717 //
718
StartLocked()719 void XdsResolver::StartLocked() {
720 grpc_error_handle error = GRPC_ERROR_NONE;
721 xds_client_ = XdsClient::GetOrCreate(args_, &error);
722 if (error != GRPC_ERROR_NONE) {
723 gpr_log(GPR_ERROR,
724 "Failed to create xds client -- channel will remain in "
725 "TRANSIENT_FAILURE: %s",
726 grpc_error_std_string(error).c_str());
727 result_handler_->ReturnError(error);
728 return;
729 }
730 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
731 interested_parties_);
732 channelz::ChannelNode* parent_channelz_node =
733 grpc_channel_args_find_pointer<channelz::ChannelNode>(
734 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
735 if (parent_channelz_node != nullptr) {
736 xds_client_->AddChannelzLinkage(parent_channelz_node);
737 }
738 auto watcher = absl::make_unique<ListenerWatcher>(Ref());
739 listener_watcher_ = watcher.get();
740 xds_client_->WatchListenerData(server_name_, std::move(watcher));
741 }
742
ShutdownLocked()743 void XdsResolver::ShutdownLocked() {
744 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
745 gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
746 }
747 if (xds_client_ != nullptr) {
748 if (listener_watcher_ != nullptr) {
749 xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
750 /*delay_unsubscription=*/false);
751 }
752 if (route_config_watcher_ != nullptr) {
753 xds_client_->CancelRouteConfigDataWatch(
754 server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
755 }
756 channelz::ChannelNode* parent_channelz_node =
757 grpc_channel_args_find_pointer<channelz::ChannelNode>(
758 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
759 if (parent_channelz_node != nullptr) {
760 xds_client_->RemoveChannelzLinkage(parent_channelz_node);
761 }
762 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
763 interested_parties_);
764 xds_client_.reset();
765 }
766 }
767
OnListenerUpdate(XdsApi::LdsUpdate listener)768 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
769 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
770 gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
771 }
772 if (listener.http_connection_manager.route_config_name !=
773 route_config_name_) {
774 if (route_config_watcher_ != nullptr) {
775 xds_client_->CancelRouteConfigDataWatch(
776 route_config_name_, route_config_watcher_,
777 /*delay_unsubscription=*/
778 !listener.http_connection_manager.route_config_name.empty());
779 route_config_watcher_ = nullptr;
780 }
781 route_config_name_ =
782 std::move(listener.http_connection_manager.route_config_name);
783 if (!route_config_name_.empty()) {
784 current_virtual_host_.routes.clear();
785 auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
786 route_config_watcher_ = watcher.get();
787 xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
788 }
789 }
790 current_listener_ = std::move(listener);
791 if (route_config_name_.empty()) {
792 GPR_ASSERT(
793 current_listener_.http_connection_manager.rds_update.has_value());
794 OnRouteConfigUpdate(
795 std::move(*current_listener_.http_connection_manager.rds_update));
796 } else {
797 // HCM may contain newer filter config. We need to propagate the update as
798 // config selector to the channel
799 GenerateResult();
800 }
801 }
802
OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update)803 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
804 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
805 gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
806 }
807 // Find the relevant VirtualHost from the RouteConfiguration.
808 XdsApi::RdsUpdate::VirtualHost* vhost =
809 rds_update.FindVirtualHostForDomain(server_name_);
810 if (vhost == nullptr) {
811 OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
812 absl::StrCat("could not find VirtualHost for ", server_name_,
813 " in RouteConfiguration")
814 .c_str()));
815 return;
816 }
817 // Save the virtual host in the resolver.
818 current_virtual_host_ = std::move(*vhost);
819 // Send a new result to the channel.
820 GenerateResult();
821 }
822
OnError(grpc_error_handle error)823 void XdsResolver::OnError(grpc_error_handle error) {
824 gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
825 this, grpc_error_std_string(error).c_str());
826 Result result;
827 grpc_arg new_arg = xds_client_->MakeChannelArg();
828 result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
829 result.service_config_error = error;
830 result_handler_->ReturnResult(std::move(result));
831 }
832
OnResourceDoesNotExist()833 void XdsResolver::OnResourceDoesNotExist() {
834 gpr_log(GPR_ERROR,
835 "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
836 "update and returning empty service config",
837 this);
838 current_virtual_host_.routes.clear();
839 Result result;
840 result.service_config =
841 ServiceConfig::Create(args_, "{}", &result.service_config_error);
842 GPR_ASSERT(result.service_config != nullptr);
843 result.args = grpc_channel_args_copy(args_);
844 result_handler_->ReturnResult(std::move(result));
845 }
846
CreateServiceConfig(RefCountedPtr<ServiceConfig> * service_config)847 grpc_error_handle XdsResolver::CreateServiceConfig(
848 RefCountedPtr<ServiceConfig>* service_config) {
849 std::vector<std::string> clusters;
850 for (const auto& cluster : cluster_state_map_) {
851 clusters.push_back(
852 absl::StrFormat(" \"%s\":{\n"
853 " \"childPolicy\":[ {\n"
854 " \"cds_experimental\":{\n"
855 " \"cluster\": \"%s\"\n"
856 " }\n"
857 " } ]\n"
858 " }",
859 cluster.first, cluster.first));
860 }
861 std::vector<std::string> config_parts;
862 config_parts.push_back(
863 "{\n"
864 " \"loadBalancingConfig\":[\n"
865 " { \"xds_cluster_manager_experimental\":{\n"
866 " \"children\":{\n");
867 config_parts.push_back(absl::StrJoin(clusters, ",\n"));
868 config_parts.push_back(
869 " }\n"
870 " } }\n"
871 " ]\n"
872 "}");
873 std::string json = absl::StrJoin(config_parts, "");
874 grpc_error_handle error = GRPC_ERROR_NONE;
875 *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
876 return error;
877 }
878
GenerateResult()879 void XdsResolver::GenerateResult() {
880 if (current_virtual_host_.routes.empty()) return;
881 // First create XdsConfigSelector, which may add new entries to the cluster
882 // state map, and then CreateServiceConfig for LB policies.
883 grpc_error_handle error = GRPC_ERROR_NONE;
884 auto config_selector = MakeRefCounted<XdsConfigSelector>(Ref(), &error);
885 if (error != GRPC_ERROR_NONE) {
886 OnError(error);
887 return;
888 }
889 Result result;
890 error = CreateServiceConfig(&result.service_config);
891 if (error != GRPC_ERROR_NONE) {
892 OnError(error);
893 return;
894 }
895 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
896 gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
897 result.service_config->json_string().c_str());
898 }
899 grpc_arg new_args[] = {
900 xds_client_->MakeChannelArg(),
901 config_selector->MakeChannelArg(),
902 };
903 result.args =
904 grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args));
905 result_handler_->ReturnResult(std::move(result));
906 }
907
MaybeRemoveUnusedClusters()908 void XdsResolver::MaybeRemoveUnusedClusters() {
909 bool update_needed = false;
910 for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
911 RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
912 if (cluster_state != nullptr) {
913 ++it;
914 } else {
915 update_needed = true;
916 it = cluster_state_map_.erase(it);
917 }
918 }
919 if (update_needed && xds_client_ != nullptr) {
920 // Send a new result to the channel.
921 GenerateResult();
922 }
923 }
924
925 //
926 // Factory
927 //
928
929 class XdsResolverFactory : public ResolverFactory {
930 public:
IsValidUri(const URI & uri) const931 bool IsValidUri(const URI& uri) const override {
932 if (GPR_UNLIKELY(!uri.authority().empty())) {
933 gpr_log(GPR_ERROR, "URI authority not supported");
934 return false;
935 }
936 return true;
937 }
938
CreateResolver(ResolverArgs args) const939 OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
940 if (!IsValidUri(args.uri)) return nullptr;
941 return MakeOrphanable<XdsResolver>(std::move(args));
942 }
943
scheme() const944 const char* scheme() const override { return "xds"; }
945 };
946
947 } // namespace
948
949 } // namespace grpc_core
950
grpc_resolver_xds_init()951 void grpc_resolver_xds_init() {
952 grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
953 absl::make_unique<grpc_core::XdsResolverFactory>());
954 }
955
// Shutdown hook for the xds resolver. Intentionally a no-op: this function
// performs no cleanup.
void grpc_resolver_xds_shutdown() {
}
957