• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/xds/grpc/xds_endpoint_parser.h"
18 
19 #include <grpc/support/port_platform.h>
20 #include <stdlib.h>
21 #include <string.h>
22 
23 #include <algorithm>
24 #include <limits>
25 #include <memory>
26 #include <set>
27 #include <vector>
28 
29 #include "absl/log/check.h"
30 #include "absl/log/log.h"
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_join.h"
35 #include "absl/types/optional.h"
36 #include "envoy/config/core/v3/address.upb.h"
37 #include "envoy/config/core/v3/base.upb.h"
38 #include "envoy/config/endpoint/v3/endpoint.upb.h"
39 #include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
40 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
41 #include "envoy/type/v3/percent.upb.h"
42 #include "google/protobuf/wrappers.upb.h"
43 #include "src/core/lib/address_utils/parse_address.h"
44 #include "src/core/lib/address_utils/sockaddr_utils.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/debug/trace.h"
47 #include "src/core/lib/iomgr/resolved_address.h"
48 #include "src/core/load_balancing/ring_hash/ring_hash.h"
49 #include "src/core/util/down_cast.h"
50 #include "src/core/util/env.h"
51 #include "src/core/util/json/json_args.h"
52 #include "src/core/util/json/json_object_loader.h"
53 #include "src/core/util/string.h"
54 #include "src/core/util/upb_utils.h"
55 #include "src/core/util/validation_errors.h"
56 #include "src/core/xds/grpc/xds_cluster_parser.h"
57 #include "src/core/xds/grpc/xds_common_types_parser.h"
58 #include "src/core/xds/grpc/xds_health_status.h"
59 #include "src/core/xds/grpc/xds_metadata_parser.h"
60 #include "src/core/xds/xds_client/xds_resource_type.h"
61 #include "upb/text/encode.h"
62 
63 // IWYU pragma: no_include "absl/meta/type_traits.h"
64 
65 namespace grpc_core {
66 
67 namespace {
68 
69 // TODO(roth): Remove this after 1.67 is released.
XdsDualstackEndpointsEnabled()70 bool XdsDualstackEndpointsEnabled() {
71   auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
72   if (!value.has_value()) return true;
73   bool parsed_value;
74   bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
75   return parse_succeeded && parsed_value;
76 }
77 
78 // TODO(roth): Flip the default to false once this proves stable, then
79 // remove it entirely at some point in the future.
XdsEndpointHashKeyBackwardCompatEnabled()80 bool XdsEndpointHashKeyBackwardCompatEnabled() {
81   auto value = GetEnv("GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT");
82   if (!value.has_value()) return true;
83   bool parsed_value;
84   bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
85   return parse_succeeded && parsed_value;
86 }
87 
MaybeLogClusterLoadAssignment(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_ClusterLoadAssignment * cla)88 void MaybeLogClusterLoadAssignment(
89     const XdsResourceType::DecodeContext& context,
90     const envoy_config_endpoint_v3_ClusterLoadAssignment* cla) {
91   if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) {
92     const upb_MessageDef* msg_type =
93         envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(
94             context.symtab);
95     char buf[10240];
96     upb_TextEncode(reinterpret_cast<const upb_Message*>(cla), msg_type, nullptr,
97                    0, buf, sizeof(buf));
98     VLOG(2) << "[xds_client " << context.client
99             << "] ClusterLoadAssignment: " << buf;
100   }
101 }
102 
GetProxyAddressFromMetadata(const XdsMetadataMap & metadata_map)103 std::string GetProxyAddressFromMetadata(const XdsMetadataMap& metadata_map) {
104   auto* proxy_address_entry = metadata_map.FindType<XdsAddressMetadataValue>(
105       "envoy.http11_proxy_transport_socket.proxy_address");
106   if (proxy_address_entry == nullptr) return "";
107   return proxy_address_entry->address();
108 }
109 
GetHashKeyFromMetadata(const XdsMetadataMap & metadata_map)110 std::string GetHashKeyFromMetadata(const XdsMetadataMap& metadata_map) {
111   auto* hash_key_entry =
112       metadata_map.FindType<XdsStructMetadataValue>("envoy.lb");
113   if (hash_key_entry == nullptr) return "";
114   ValidationErrors unused_errors;
115   return LoadJsonObjectField<std::string>(hash_key_entry->json().object(),
116                                           JsonArgs(), "hash_key",
117                                           &unused_errors)
118       .value_or("");
119 }
120 
EndpointAddressesParse(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_LbEndpoint * lb_endpoint,absl::string_view locality_proxy_address,ValidationErrors * errors)121 absl::optional<EndpointAddresses> EndpointAddressesParse(
122     const XdsResourceType::DecodeContext& context,
123     const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
124     absl::string_view locality_proxy_address, ValidationErrors* errors) {
125   // health_status
126   const int32_t health_status =
127       envoy_config_endpoint_v3_LbEndpoint_health_status(lb_endpoint);
128   auto status = XdsHealthStatus::FromUpb(health_status);
129   if (!status.has_value()) return absl::nullopt;
130   // load_balancing_weight
131   uint32_t weight;
132   {
133     ValidationErrors::ScopedField field(errors, ".load_balancing_weight");
134     weight = ParseUInt32Value(
135                  envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(
136                      lb_endpoint))
137                  .value_or(1);
138     if (weight == 0) {
139       errors->AddError("must be greater than 0");
140     }
141   }
142   // metadata
143   std::string proxy_address;
144   std::string hash_key;
145   if (XdsHttpConnectEnabled() || !XdsEndpointHashKeyBackwardCompatEnabled()) {
146     XdsMetadataMap metadata_map = ParseXdsMetadataMap(
147         context, envoy_config_endpoint_v3_LbEndpoint_metadata(lb_endpoint),
148         errors);
149     if (XdsHttpConnectEnabled()) {
150       proxy_address = GetProxyAddressFromMetadata(metadata_map);
151     }
152     if (!XdsEndpointHashKeyBackwardCompatEnabled()) {
153       hash_key = GetHashKeyFromMetadata(metadata_map);
154     }
155   }
156   // endpoint
157   std::vector<grpc_resolved_address> addresses;
158   absl::string_view hostname;
159   {
160     ValidationErrors::ScopedField field(errors, ".endpoint");
161     const envoy_config_endpoint_v3_Endpoint* endpoint =
162         envoy_config_endpoint_v3_LbEndpoint_endpoint(lb_endpoint);
163     if (endpoint == nullptr) {
164       errors->AddError("field not present");
165       return absl::nullopt;
166     }
167     {
168       ValidationErrors::ScopedField field(errors, ".address");
169       auto address = ParseXdsAddress(
170           envoy_config_endpoint_v3_Endpoint_address(endpoint), errors);
171       if (address.has_value()) addresses.push_back(*address);
172     }
173     if (XdsDualstackEndpointsEnabled()) {
174       size_t size;
175       auto* additional_addresses =
176           envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint,
177                                                                  &size);
178       for (size_t i = 0; i < size; ++i) {
179         ValidationErrors::ScopedField field(
180             errors, absl::StrCat(".additional_addresses[", i, "].address"));
181         auto address = ParseXdsAddress(
182             envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address(
183                 additional_addresses[i]),
184             errors);
185         if (address.has_value()) addresses.push_back(*address);
186       }
187     }
188     hostname =
189         UpbStringToAbsl(envoy_config_endpoint_v3_Endpoint_hostname(endpoint));
190   }
191   if (addresses.empty()) return absl::nullopt;
192   // Convert to EndpointAddresses.
193   auto args = ChannelArgs()
194                   .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
195                   .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status());
196   if (!hostname.empty()) {
197     args = args.Set(GRPC_ARG_ADDRESS_NAME, hostname);
198   }
199   if (!proxy_address.empty()) {
200     args = args.Set(GRPC_ARG_XDS_HTTP_PROXY, proxy_address);
201   } else if (!locality_proxy_address.empty()) {
202     args = args.Set(GRPC_ARG_XDS_HTTP_PROXY, locality_proxy_address);
203   }
204   if (!hash_key.empty()) {
205     args = args.Set(GRPC_ARG_RING_HASH_ENDPOINT_HASH_KEY, hash_key);
206   }
207   return EndpointAddresses(addresses, args);
208 }
209 
210 struct ParsedLocality {
211   size_t priority;
212   XdsEndpointResource::Priority::Locality locality;
213 };
214 
215 struct ResolvedAddressLessThan {
operator ()grpc_core::__anon046957960111::ResolvedAddressLessThan216   bool operator()(const grpc_resolved_address& a1,
217                   const grpc_resolved_address& a2) const {
218     if (a1.len != a2.len) return a1.len < a2.len;
219     return memcmp(a1.addr, a2.addr, a1.len) < 0;
220   }
221 };
222 using ResolvedAddressSet =
223     std::set<grpc_resolved_address, ResolvedAddressLessThan>;
224 
LocalityParse(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_LocalityLbEndpoints * locality_lb_endpoints,ResolvedAddressSet * address_set,ValidationErrors * errors)225 absl::optional<ParsedLocality> LocalityParse(
226     const XdsResourceType::DecodeContext& context,
227     const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints,
228     ResolvedAddressSet* address_set, ValidationErrors* errors) {
229   const size_t original_error_size = errors->size();
230   ParsedLocality parsed_locality;
231   // load_balancing_weight
232   // If LB weight is not specified or 0, it means this locality is assigned
233   // no load.
234   parsed_locality.locality.lb_weight =
235       ParseUInt32Value(
236           envoy_config_endpoint_v3_LocalityLbEndpoints_load_balancing_weight(
237               locality_lb_endpoints))
238           .value_or(0);
239   if (parsed_locality.locality.lb_weight == 0) return absl::nullopt;
240   // locality
241   const envoy_config_core_v3_Locality* locality =
242       envoy_config_endpoint_v3_LocalityLbEndpoints_locality(
243           locality_lb_endpoints);
244   if (locality == nullptr) {
245     ValidationErrors::ScopedField field(errors, ".locality");
246     errors->AddError("field not present");
247     return absl::nullopt;
248   }
249   // region
250   std::string region =
251       UpbStringToStdString(envoy_config_core_v3_Locality_region(locality));
252   // zone
253   std::string zone =
254       UpbStringToStdString(envoy_config_core_v3_Locality_zone(locality));
255   // sub_zone
256   std::string sub_zone =
257       UpbStringToStdString(envoy_config_core_v3_Locality_sub_zone(locality));
258   parsed_locality.locality.name = MakeRefCounted<XdsLocalityName>(
259       std::move(region), std::move(zone), std::move(sub_zone));
260   // metadata
261   std::string proxy_address;
262   if (XdsHttpConnectEnabled()) {
263     XdsMetadataMap metadata_map = ParseXdsMetadataMap(
264         context,
265         envoy_config_endpoint_v3_LocalityLbEndpoints_metadata(
266             locality_lb_endpoints),
267         errors);
268     proxy_address = GetProxyAddressFromMetadata(metadata_map);
269   }
270   // lb_endpoints
271   size_t size;
272   const envoy_config_endpoint_v3_LbEndpoint* const* lb_endpoints =
273       envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(
274           locality_lb_endpoints, &size);
275   for (size_t i = 0; i < size; ++i) {
276     ValidationErrors::ScopedField field(errors,
277                                         absl::StrCat(".lb_endpoints[", i, "]"));
278     auto endpoint =
279         EndpointAddressesParse(context, lb_endpoints[i], proxy_address, errors);
280     if (endpoint.has_value()) {
281       for (const auto& address : endpoint->addresses()) {
282         bool inserted = address_set->insert(address).second;
283         if (!inserted) {
284           errors->AddError(absl::StrCat(
285               "duplicate endpoint address \"",
286               grpc_sockaddr_to_uri(&address).value_or("<unknown>"), "\""));
287         }
288       }
289       parsed_locality.locality.endpoints.push_back(std::move(*endpoint));
290     }
291   }
292   // priority
293   parsed_locality.priority =
294       envoy_config_endpoint_v3_LocalityLbEndpoints_priority(
295           locality_lb_endpoints);
296   // Return result.
297   if (original_error_size != errors->size()) return absl::nullopt;
298   return parsed_locality;
299 }
300 
DropParseAndAppend(const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload * drop_overload,XdsEndpointResource::DropConfig * drop_config,ValidationErrors * errors)301 void DropParseAndAppend(
302     const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload*
303         drop_overload,
304     XdsEndpointResource::DropConfig* drop_config, ValidationErrors* errors) {
305   // category
306   std::string category = UpbStringToStdString(
307       envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category(
308           drop_overload));
309   if (category.empty()) {
310     ValidationErrors::ScopedField field(errors, ".category");
311     errors->AddError("empty drop category name");
312   }
313   // drop_percentage
314   uint32_t numerator;
315   {
316     ValidationErrors::ScopedField field(errors, ".drop_percentage");
317     const envoy_type_v3_FractionalPercent* drop_percentage =
318         envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
319             drop_overload);
320     if (drop_percentage == nullptr) {
321       errors->AddError("field not present");
322       return;
323     }
324     numerator = envoy_type_v3_FractionalPercent_numerator(drop_percentage);
325     {
326       ValidationErrors::ScopedField field(errors, ".denominator");
327       const int denominator =
328           envoy_type_v3_FractionalPercent_denominator(drop_percentage);
329       // Normalize to million.
330       switch (denominator) {
331         case envoy_type_v3_FractionalPercent_HUNDRED:
332           numerator *= 10000;
333           break;
334         case envoy_type_v3_FractionalPercent_TEN_THOUSAND:
335           numerator *= 100;
336           break;
337         case envoy_type_v3_FractionalPercent_MILLION:
338           break;
339         default:
340           errors->AddError("unknown denominator type");
341       }
342     }
343     // Cap numerator to 1000000.
344     numerator = std::min(numerator, 1000000u);
345   }
346   // Add category.
347   drop_config->AddCategory(std::move(category), numerator);
348 }
349 
EdsResourceParse(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_ClusterLoadAssignment * cluster_load_assignment)350 absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse(
351     const XdsResourceType::DecodeContext& context,
352     const envoy_config_endpoint_v3_ClusterLoadAssignment*
353         cluster_load_assignment) {
354   ValidationErrors errors;
355   auto eds_resource = std::make_shared<XdsEndpointResource>();
356   // endpoints
357   {
358     ValidationErrors::ScopedField field(&errors, "endpoints");
359     ResolvedAddressSet address_set;
360     size_t locality_size;
361     const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints =
362         envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(
363             cluster_load_assignment, &locality_size);
364     for (size_t i = 0; i < locality_size; ++i) {
365       ValidationErrors::ScopedField field(&errors, absl::StrCat("[", i, "]"));
366       auto parsed_locality =
367           LocalityParse(context, endpoints[i], &address_set, &errors);
368       if (parsed_locality.has_value()) {
369         CHECK_NE(parsed_locality->locality.lb_weight, 0u);
370         // Make sure prorities is big enough. Note that they might not
371         // arrive in priority order.
372         if (eds_resource->priorities.size() < parsed_locality->priority + 1) {
373           eds_resource->priorities.resize(parsed_locality->priority + 1);
374         }
375         auto& locality_map =
376             eds_resource->priorities[parsed_locality->priority].localities;
377         auto it = locality_map.find(parsed_locality->locality.name.get());
378         if (it != locality_map.end()) {
379           errors.AddError(absl::StrCat(
380               "duplicate locality ",
381               parsed_locality->locality.name->human_readable_string()
382                   .as_string_view(),
383               " found in priority ", parsed_locality->priority));
384         } else {
385           locality_map.emplace(parsed_locality->locality.name.get(),
386                                std::move(parsed_locality->locality));
387         }
388       }
389     }
390     for (size_t i = 0; i < eds_resource->priorities.size(); ++i) {
391       const auto& priority = eds_resource->priorities[i];
392       if (priority.localities.empty()) {
393         errors.AddError(absl::StrCat("priority ", i, " empty"));
394       } else {
395         // Check that the sum of the locality weights in this priority
396         // does not exceed the max value for a uint32.
397         uint64_t total_weight = 0;
398         for (const auto& p : priority.localities) {
399           total_weight += p.second.lb_weight;
400           if (total_weight > std::numeric_limits<uint32_t>::max()) {
401             errors.AddError(
402                 absl::StrCat("sum of locality weights for priority ", i,
403                              " exceeds uint32 max"));
404             break;
405           }
406         }
407       }
408     }
409   }
410   // policy
411   const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
412       cluster_load_assignment);
413   if (policy != nullptr) {
414     ValidationErrors::ScopedField field(&errors, "policy");
415     size_t drop_size;
416     const auto* const* drop_overload =
417         envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads(
418             policy, &drop_size);
419     if (drop_size > 0) {
420       eds_resource->drop_config =
421           MakeRefCounted<XdsEndpointResource::DropConfig>();
422     }
423     for (size_t i = 0; i < drop_size; ++i) {
424       ValidationErrors::ScopedField field(
425           &errors, absl::StrCat(".drop_overloads[", i, "]"));
426       DropParseAndAppend(drop_overload[i], eds_resource->drop_config.get(),
427                          &errors);
428     }
429   }
430   // Return result.
431   if (!errors.ok()) {
432     return errors.status(absl::StatusCode::kInvalidArgument,
433                          "errors parsing EDS resource");
434   }
435   return eds_resource;
436 }
437 
438 }  // namespace
439 
// Decodes a serialized ClusterLoadAssignment into a DecodeResult.
//
// If the bytes cannot be parsed as a ClusterLoadAssignment, the result
// carries an InvalidArgument status and no name.  Otherwise the result's
// name is set to the proto's cluster_name and its resource is either the
// validated XdsEndpointResource or the validation failure status.
XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
    const XdsResourceType::DecodeContext& context,
    absl::string_view serialized_resource) const {
  DecodeResult result;
  // Parse serialized proto.
  auto* cla = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
      serialized_resource.data(), serialized_resource.size(), context.arena);
  if (cla == nullptr) {
    result.resource = absl::InvalidArgumentError(
        "Can't parse ClusterLoadAssignment resource.");
    return result;
  }
  MaybeLogClusterLoadAssignment(context, cla);
  // The name is recorded before validation so that it is available even
  // when validation fails.
  result.name = UpbStringToStdString(
      envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(cla));
  // Validate resource.
  auto parsed = EdsResourceParse(context, cla);
  if (parsed.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer)) {
      LOG(INFO) << "[xds_client " << context.client
                << "] parsed ClusterLoadAssignment " << *result.name << ": "
                << (*parsed)->ToString();
    }
    result.resource = std::move(*parsed);
  } else {
    if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer)) {
      LOG(ERROR) << "[xds_client " << context.client
                 << "] invalid ClusterLoadAssignment " << *result.name << ": "
                 << parsed.status();
    }
    result.resource = parsed.status();
  }
  return result;
}
474 
475 }  // namespace grpc_core
476