• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/xds/xds_endpoint.h"
20 
21 #include <stdlib.h>
22 #include <string.h>
23 
24 #include <algorithm>
25 #include <limits>
26 #include <memory>
27 #include <set>
28 #include <vector>
29 
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_join.h"
34 #include "absl/types/optional.h"
35 #include "envoy/config/core/v3/address.upb.h"
36 #include "envoy/config/core/v3/base.upb.h"
37 #include "envoy/config/endpoint/v3/endpoint.upb.h"
38 #include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
39 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
40 #include "envoy/type/v3/percent.upb.h"
41 #include "google/protobuf/wrappers.upb.h"
42 #include "upb/text/encode.h"
43 
44 #include <grpc/support/log.h>
45 
46 #include "src/core/ext/xds/upb_utils.h"
47 #include "src/core/ext/xds/xds_health_status.h"
48 #include "src/core/ext/xds/xds_resource_type.h"
49 #include "src/core/lib/address_utils/parse_address.h"
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/debug/trace.h"
53 #include "src/core/lib/gpr/string.h"
54 #include "src/core/lib/gprpp/env.h"
55 #include "src/core/lib/gprpp/validation_errors.h"
56 #include "src/core/lib/iomgr/resolved_address.h"
57 
58 // IWYU pragma: no_include "absl/meta/type_traits.h"
59 
60 namespace grpc_core {
61 
62 namespace {
63 
64 // TODO(roth): Remove this once dualstack support is stable.
XdsDualstackEndpointsEnabled()65 bool XdsDualstackEndpointsEnabled() {
66   auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
67   if (!value.has_value()) return false;
68   bool parsed_value;
69   bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
70   return parse_succeeded && parsed_value;
71 }
72 
73 }  // namespace
74 
75 //
76 // XdsEndpointResource
77 //
78 
ToString() const79 std::string XdsEndpointResource::Priority::Locality::ToString() const {
80   std::vector<std::string> endpoint_strings;
81   for (const EndpointAddresses& endpoint : endpoints) {
82     endpoint_strings.emplace_back(endpoint.ToString());
83   }
84   return absl::StrCat("{name=", name->human_readable_string().as_string_view(),
85                       ", lb_weight=", lb_weight, ", endpoints=[",
86                       absl::StrJoin(endpoint_strings, ", "), "]}");
87 }
88 
operator ==(const Priority & other) const89 bool XdsEndpointResource::Priority::operator==(const Priority& other) const {
90   if (localities.size() != other.localities.size()) return false;
91   auto it1 = localities.begin();
92   auto it2 = other.localities.begin();
93   while (it1 != localities.end()) {
94     if (*it1->first != *it2->first) return false;
95     if (it1->second != it2->second) return false;
96     ++it1;
97     ++it2;
98   }
99   return true;
100 }
101 
ToString() const102 std::string XdsEndpointResource::Priority::ToString() const {
103   std::vector<std::string> locality_strings;
104   locality_strings.reserve(localities.size());
105   for (const auto& p : localities) {
106     locality_strings.emplace_back(p.second.ToString());
107   }
108   return absl::StrCat("[", absl::StrJoin(locality_strings, ", "), "]");
109 }
110 
ShouldDrop(const std::string ** category_name)111 bool XdsEndpointResource::DropConfig::ShouldDrop(
112     const std::string** category_name) {
113   for (size_t i = 0; i < drop_category_list_.size(); ++i) {
114     const auto& drop_category = drop_category_list_[i];
115     // Generate a random number in [0, 1000000).
116     const uint32_t random = [&]() {
117       MutexLock lock(&mu_);
118       return absl::Uniform<uint32_t>(bit_gen_, 0, 1000000);
119     }();
120     if (random < drop_category.parts_per_million) {
121       *category_name = &drop_category.name;
122       return true;
123     }
124   }
125   return false;
126 }
127 
ToString() const128 std::string XdsEndpointResource::DropConfig::ToString() const {
129   std::vector<std::string> category_strings;
130   for (const DropCategory& category : drop_category_list_) {
131     category_strings.emplace_back(
132         absl::StrCat(category.name, "=", category.parts_per_million));
133   }
134   return absl::StrCat("{[", absl::StrJoin(category_strings, ", "),
135                       "], drop_all=", drop_all_, "}");
136 }
137 
ToString() const138 std::string XdsEndpointResource::ToString() const {
139   std::vector<std::string> priority_strings;
140   for (size_t i = 0; i < priorities.size(); ++i) {
141     const Priority& priority = priorities[i];
142     priority_strings.emplace_back(
143         absl::StrCat("priority ", i, ": ", priority.ToString()));
144   }
145   return absl::StrCat(
146       "priorities=[", absl::StrJoin(priority_strings, ", "), "], drop_config=",
147       drop_config == nullptr ? "<null>" : drop_config->ToString());
148 }
149 
150 //
151 // XdsEndpointResourceType
152 //
153 
154 namespace {
155 
MaybeLogClusterLoadAssignment(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_ClusterLoadAssignment * cla)156 void MaybeLogClusterLoadAssignment(
157     const XdsResourceType::DecodeContext& context,
158     const envoy_config_endpoint_v3_ClusterLoadAssignment* cla) {
159   if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) &&
160       gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
161     const upb_MessageDef* msg_type =
162         envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(
163             context.symtab);
164     char buf[10240];
165     upb_TextEncode(reinterpret_cast<const upb_Message*>(cla), msg_type, nullptr,
166                    0, buf, sizeof(buf));
167     gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s",
168             context.client, buf);
169   }
170 }
171 
ParseCoreAddress(const envoy_config_core_v3_Address * address,ValidationErrors * errors)172 absl::optional<grpc_resolved_address> ParseCoreAddress(
173     const envoy_config_core_v3_Address* address, ValidationErrors* errors) {
174   if (address == nullptr) {
175     errors->AddError("field not present");
176     return absl::nullopt;
177   }
178   ValidationErrors::ScopedField field(errors, ".socket_address");
179   const envoy_config_core_v3_SocketAddress* socket_address =
180       envoy_config_core_v3_Address_socket_address(address);
181   if (socket_address == nullptr) {
182     errors->AddError("field not present");
183     return absl::nullopt;
184   }
185   std::string address_str = UpbStringToStdString(
186       envoy_config_core_v3_SocketAddress_address(socket_address));
187   uint32_t port;
188   {
189     ValidationErrors::ScopedField field(errors, ".port_value");
190     port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
191     if (GPR_UNLIKELY(port >> 16) != 0) {
192       errors->AddError("invalid port");
193       return absl::nullopt;
194     }
195   }
196   auto addr = StringToSockaddr(address_str, port);
197   if (!addr.ok()) {
198     errors->AddError(addr.status().message());
199     return absl::nullopt;
200   }
201   return *addr;
202 }
203 
EndpointAddressesParse(const envoy_config_endpoint_v3_LbEndpoint * lb_endpoint,ValidationErrors * errors)204 absl::optional<EndpointAddresses> EndpointAddressesParse(
205     const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
206     ValidationErrors* errors) {
207   // health_status
208   const int32_t health_status =
209       envoy_config_endpoint_v3_LbEndpoint_health_status(lb_endpoint);
210   auto status = XdsHealthStatus::FromUpb(health_status);
211   if (!status.has_value()) return absl::nullopt;
212   // load_balancing_weight
213   uint32_t weight = 1;
214   {
215     ValidationErrors::ScopedField field(errors, ".load_balancing_weight");
216     const google_protobuf_UInt32Value* load_balancing_weight =
217         envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
218     if (load_balancing_weight != nullptr) {
219       weight = google_protobuf_UInt32Value_value(load_balancing_weight);
220       if (weight == 0) {
221         errors->AddError("must be greater than 0");
222       }
223     }
224   }
225   // endpoint
226   std::vector<grpc_resolved_address> addresses;
227   {
228     ValidationErrors::ScopedField field(errors, ".endpoint");
229     const envoy_config_endpoint_v3_Endpoint* endpoint =
230         envoy_config_endpoint_v3_LbEndpoint_endpoint(lb_endpoint);
231     if (endpoint == nullptr) {
232       errors->AddError("field not present");
233       return absl::nullopt;
234     }
235     {
236       ValidationErrors::ScopedField field(errors, ".address");
237       auto address = ParseCoreAddress(
238           envoy_config_endpoint_v3_Endpoint_address(endpoint), errors);
239       if (address.has_value()) addresses.push_back(*address);
240     }
241     if (XdsDualstackEndpointsEnabled()) {
242       size_t size;
243       auto* additional_addresses =
244           envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint,
245                                                                  &size);
246       for (size_t i = 0; i < size; ++i) {
247         ValidationErrors::ScopedField field(
248             errors, absl::StrCat(".additional_addresses[", i, "].address"));
249         auto address = ParseCoreAddress(
250             envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address(
251                 additional_addresses[i]),
252             errors);
253         if (address.has_value()) addresses.push_back(*address);
254       }
255     }
256   }
257   if (addresses.empty()) return absl::nullopt;
258   // Convert to EndpointAddresses.
259   return EndpointAddresses(
260       addresses, ChannelArgs()
261                      .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
262                      .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
263 }
264 
265 struct ParsedLocality {
266   size_t priority;
267   XdsEndpointResource::Priority::Locality locality;
268 };
269 
270 struct ResolvedAddressLessThan {
operator ()grpc_core::__anone781888b0311::ResolvedAddressLessThan271   bool operator()(const grpc_resolved_address& a1,
272                   const grpc_resolved_address& a2) const {
273     if (a1.len != a2.len) return a1.len < a2.len;
274     return memcmp(a1.addr, a2.addr, a1.len) < 0;
275   }
276 };
277 using ResolvedAddressSet =
278     std::set<grpc_resolved_address, ResolvedAddressLessThan>;
279 
LocalityParse(const envoy_config_endpoint_v3_LocalityLbEndpoints * locality_lb_endpoints,ResolvedAddressSet * address_set,ValidationErrors * errors)280 absl::optional<ParsedLocality> LocalityParse(
281     const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints,
282     ResolvedAddressSet* address_set, ValidationErrors* errors) {
283   const size_t original_error_size = errors->size();
284   ParsedLocality parsed_locality;
285   // load_balancing_weight
286   // If LB weight is not specified or 0, it means this locality is assigned
287   // no load.
288   const google_protobuf_UInt32Value* lb_weight =
289       envoy_config_endpoint_v3_LocalityLbEndpoints_load_balancing_weight(
290           locality_lb_endpoints);
291   parsed_locality.locality.lb_weight =
292       lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
293   if (parsed_locality.locality.lb_weight == 0) return absl::nullopt;
294   // locality
295   const envoy_config_core_v3_Locality* locality =
296       envoy_config_endpoint_v3_LocalityLbEndpoints_locality(
297           locality_lb_endpoints);
298   if (locality == nullptr) {
299     ValidationErrors::ScopedField field(errors, ".locality");
300     errors->AddError("field not present");
301     return absl::nullopt;
302   }
303   // region
304   std::string region =
305       UpbStringToStdString(envoy_config_core_v3_Locality_region(locality));
306   // zone
307   std::string zone =
308       UpbStringToStdString(envoy_config_core_v3_Locality_zone(locality));
309   // sub_zone
310   std::string sub_zone =
311       UpbStringToStdString(envoy_config_core_v3_Locality_sub_zone(locality));
312   parsed_locality.locality.name = MakeRefCounted<XdsLocalityName>(
313       std::move(region), std::move(zone), std::move(sub_zone));
314   // lb_endpoints
315   size_t size;
316   const envoy_config_endpoint_v3_LbEndpoint* const* lb_endpoints =
317       envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(
318           locality_lb_endpoints, &size);
319   for (size_t i = 0; i < size; ++i) {
320     ValidationErrors::ScopedField field(errors,
321                                         absl::StrCat(".lb_endpoints[", i, "]"));
322     auto endpoint = EndpointAddressesParse(lb_endpoints[i], errors);
323     if (endpoint.has_value()) {
324       for (const auto& address : endpoint->addresses()) {
325         bool inserted = address_set->insert(address).second;
326         if (!inserted) {
327           errors->AddError(absl::StrCat(
328               "duplicate endpoint address \"",
329               grpc_sockaddr_to_uri(&address).value_or("<unknown>"), "\""));
330         }
331       }
332       parsed_locality.locality.endpoints.push_back(std::move(*endpoint));
333     }
334   }
335   // priority
336   parsed_locality.priority =
337       envoy_config_endpoint_v3_LocalityLbEndpoints_priority(
338           locality_lb_endpoints);
339   // Return result.
340   if (original_error_size != errors->size()) return absl::nullopt;
341   return parsed_locality;
342 }
343 
DropParseAndAppend(const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload * drop_overload,XdsEndpointResource::DropConfig * drop_config,ValidationErrors * errors)344 void DropParseAndAppend(
345     const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload*
346         drop_overload,
347     XdsEndpointResource::DropConfig* drop_config, ValidationErrors* errors) {
348   // category
349   std::string category = UpbStringToStdString(
350       envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category(
351           drop_overload));
352   if (category.empty()) {
353     ValidationErrors::ScopedField field(errors, ".category");
354     errors->AddError("empty drop category name");
355   }
356   // drop_percentage
357   uint32_t numerator;
358   {
359     ValidationErrors::ScopedField field(errors, ".drop_percentage");
360     const envoy_type_v3_FractionalPercent* drop_percentage =
361         envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
362             drop_overload);
363     if (drop_percentage == nullptr) {
364       errors->AddError("field not present");
365       return;
366     }
367     numerator = envoy_type_v3_FractionalPercent_numerator(drop_percentage);
368     {
369       ValidationErrors::ScopedField field(errors, ".denominator");
370       const int denominator =
371           envoy_type_v3_FractionalPercent_denominator(drop_percentage);
372       // Normalize to million.
373       switch (denominator) {
374         case envoy_type_v3_FractionalPercent_HUNDRED:
375           numerator *= 10000;
376           break;
377         case envoy_type_v3_FractionalPercent_TEN_THOUSAND:
378           numerator *= 100;
379           break;
380         case envoy_type_v3_FractionalPercent_MILLION:
381           break;
382         default:
383           errors->AddError("unknown denominator type");
384       }
385     }
386     // Cap numerator to 1000000.
387     numerator = std::min(numerator, 1000000u);
388   }
389   // Add category.
390   drop_config->AddCategory(std::move(category), numerator);
391 }
392 
EdsResourceParse(const XdsResourceType::DecodeContext &,const envoy_config_endpoint_v3_ClusterLoadAssignment * cluster_load_assignment)393 absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse(
394     const XdsResourceType::DecodeContext& /*context*/,
395     const envoy_config_endpoint_v3_ClusterLoadAssignment*
396         cluster_load_assignment) {
397   ValidationErrors errors;
398   auto eds_resource = std::make_shared<XdsEndpointResource>();
399   // endpoints
400   {
401     ValidationErrors::ScopedField field(&errors, "endpoints");
402     ResolvedAddressSet address_set;
403     size_t locality_size;
404     const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints =
405         envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(
406             cluster_load_assignment, &locality_size);
407     for (size_t i = 0; i < locality_size; ++i) {
408       ValidationErrors::ScopedField field(&errors, absl::StrCat("[", i, "]"));
409       auto parsed_locality = LocalityParse(endpoints[i], &address_set, &errors);
410       if (parsed_locality.has_value()) {
411         GPR_ASSERT(parsed_locality->locality.lb_weight != 0);
412         // Make sure prorities is big enough. Note that they might not
413         // arrive in priority order.
414         if (eds_resource->priorities.size() < parsed_locality->priority + 1) {
415           eds_resource->priorities.resize(parsed_locality->priority + 1);
416         }
417         auto& locality_map =
418             eds_resource->priorities[parsed_locality->priority].localities;
419         auto it = locality_map.find(parsed_locality->locality.name.get());
420         if (it != locality_map.end()) {
421           errors.AddError(absl::StrCat(
422               "duplicate locality ",
423               parsed_locality->locality.name->human_readable_string()
424                   .as_string_view(),
425               " found in priority ", parsed_locality->priority));
426         } else {
427           locality_map.emplace(parsed_locality->locality.name.get(),
428                                std::move(parsed_locality->locality));
429         }
430       }
431     }
432     for (size_t i = 0; i < eds_resource->priorities.size(); ++i) {
433       const auto& priority = eds_resource->priorities[i];
434       if (priority.localities.empty()) {
435         errors.AddError(absl::StrCat("priority ", i, " empty"));
436       } else {
437         // Check that the sum of the locality weights in this priority
438         // does not exceed the max value for a uint32.
439         uint64_t total_weight = 0;
440         for (const auto& p : priority.localities) {
441           total_weight += p.second.lb_weight;
442           if (total_weight > std::numeric_limits<uint32_t>::max()) {
443             errors.AddError(
444                 absl::StrCat("sum of locality weights for priority ", i,
445                              " exceeds uint32 max"));
446             break;
447           }
448         }
449       }
450     }
451   }
452   // policy
453   const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
454       cluster_load_assignment);
455   if (policy != nullptr) {
456     ValidationErrors::ScopedField field(&errors, "policy");
457     size_t drop_size;
458     const auto* const* drop_overload =
459         envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads(
460             policy, &drop_size);
461     if (drop_size > 0) {
462       eds_resource->drop_config =
463           MakeRefCounted<XdsEndpointResource::DropConfig>();
464     }
465     for (size_t i = 0; i < drop_size; ++i) {
466       ValidationErrors::ScopedField field(
467           &errors, absl::StrCat(".drop_overloads[", i, "]"));
468       DropParseAndAppend(drop_overload[i], eds_resource->drop_config.get(),
469                          &errors);
470     }
471   }
472   // Return result.
473   if (!errors.ok()) {
474     return errors.status(absl::StatusCode::kInvalidArgument,
475                          "errors parsing EDS resource");
476   }
477   return eds_resource;
478 }
479 
480 }  // namespace
481 
Decode(const XdsResourceType::DecodeContext & context,absl::string_view serialized_resource) const482 XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
483     const XdsResourceType::DecodeContext& context,
484     absl::string_view serialized_resource) const {
485   DecodeResult result;
486   // Parse serialized proto.
487   auto* resource = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
488       serialized_resource.data(), serialized_resource.size(), context.arena);
489   if (resource == nullptr) {
490     result.resource = absl::InvalidArgumentError(
491         "Can't parse ClusterLoadAssignment resource.");
492     return result;
493   }
494   MaybeLogClusterLoadAssignment(context, resource);
495   // Validate resource.
496   result.name = UpbStringToStdString(
497       envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(resource));
498   auto eds_resource = EdsResourceParse(context, resource);
499   if (!eds_resource.ok()) {
500     if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
501       gpr_log(GPR_ERROR, "[xds_client %p] invalid ClusterLoadAssignment %s: %s",
502               context.client, result.name->c_str(),
503               eds_resource.status().ToString().c_str());
504     }
505     result.resource = eds_resource.status();
506   } else {
507     if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
508       gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s",
509               context.client, result.name->c_str(),
510               (*eds_resource)->ToString().c_str());
511     }
512     result.resource = std::move(*eds_resource);
513   }
514   return result;
515 }
516 
517 }  // namespace grpc_core
518