1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/xds/grpc/xds_endpoint_parser.h"
18
19 #include <grpc/support/port_platform.h>
20 #include <stdlib.h>
21 #include <string.h>
22
23 #include <algorithm>
24 #include <limits>
25 #include <memory>
26 #include <set>
27 #include <vector>
28
29 #include "absl/log/check.h"
30 #include "absl/log/log.h"
31 #include "absl/status/status.h"
32 #include "absl/status/statusor.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_join.h"
35 #include "absl/types/optional.h"
36 #include "envoy/config/core/v3/address.upb.h"
37 #include "envoy/config/core/v3/base.upb.h"
38 #include "envoy/config/endpoint/v3/endpoint.upb.h"
39 #include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
40 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
41 #include "envoy/type/v3/percent.upb.h"
42 #include "google/protobuf/wrappers.upb.h"
43 #include "src/core/lib/address_utils/parse_address.h"
44 #include "src/core/lib/address_utils/sockaddr_utils.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/debug/trace.h"
47 #include "src/core/lib/iomgr/resolved_address.h"
48 #include "src/core/load_balancing/ring_hash/ring_hash.h"
49 #include "src/core/util/down_cast.h"
50 #include "src/core/util/env.h"
51 #include "src/core/util/json/json_args.h"
52 #include "src/core/util/json/json_object_loader.h"
53 #include "src/core/util/string.h"
54 #include "src/core/util/upb_utils.h"
55 #include "src/core/util/validation_errors.h"
56 #include "src/core/xds/grpc/xds_cluster_parser.h"
57 #include "src/core/xds/grpc/xds_common_types_parser.h"
58 #include "src/core/xds/grpc/xds_health_status.h"
59 #include "src/core/xds/grpc/xds_metadata_parser.h"
60 #include "src/core/xds/xds_client/xds_resource_type.h"
61 #include "upb/text/encode.h"
62
63 // IWYU pragma: no_include "absl/meta/type_traits.h"
64
65 namespace grpc_core {
66
67 namespace {
68
69 // TODO(roth): Remove this after 1.67 is released.
XdsDualstackEndpointsEnabled()70 bool XdsDualstackEndpointsEnabled() {
71 auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
72 if (!value.has_value()) return true;
73 bool parsed_value;
74 bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
75 return parse_succeeded && parsed_value;
76 }
77
78 // TODO(roth): Flip the default to false once this proves stable, then
79 // remove it entirely at some point in the future.
XdsEndpointHashKeyBackwardCompatEnabled()80 bool XdsEndpointHashKeyBackwardCompatEnabled() {
81 auto value = GetEnv("GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT");
82 if (!value.has_value()) return true;
83 bool parsed_value;
84 bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
85 return parse_succeeded && parsed_value;
86 }
87
MaybeLogClusterLoadAssignment(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_ClusterLoadAssignment * cla)88 void MaybeLogClusterLoadAssignment(
89 const XdsResourceType::DecodeContext& context,
90 const envoy_config_endpoint_v3_ClusterLoadAssignment* cla) {
91 if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) {
92 const upb_MessageDef* msg_type =
93 envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(
94 context.symtab);
95 char buf[10240];
96 upb_TextEncode(reinterpret_cast<const upb_Message*>(cla), msg_type, nullptr,
97 0, buf, sizeof(buf));
98 VLOG(2) << "[xds_client " << context.client
99 << "] ClusterLoadAssignment: " << buf;
100 }
101 }
102
GetProxyAddressFromMetadata(const XdsMetadataMap & metadata_map)103 std::string GetProxyAddressFromMetadata(const XdsMetadataMap& metadata_map) {
104 auto* proxy_address_entry = metadata_map.FindType<XdsAddressMetadataValue>(
105 "envoy.http11_proxy_transport_socket.proxy_address");
106 if (proxy_address_entry == nullptr) return "";
107 return proxy_address_entry->address();
108 }
109
GetHashKeyFromMetadata(const XdsMetadataMap & metadata_map)110 std::string GetHashKeyFromMetadata(const XdsMetadataMap& metadata_map) {
111 auto* hash_key_entry =
112 metadata_map.FindType<XdsStructMetadataValue>("envoy.lb");
113 if (hash_key_entry == nullptr) return "";
114 ValidationErrors unused_errors;
115 return LoadJsonObjectField<std::string>(hash_key_entry->json().object(),
116 JsonArgs(), "hash_key",
117 &unused_errors)
118 .value_or("");
119 }
120
EndpointAddressesParse(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_LbEndpoint * lb_endpoint,absl::string_view locality_proxy_address,ValidationErrors * errors)121 absl::optional<EndpointAddresses> EndpointAddressesParse(
122 const XdsResourceType::DecodeContext& context,
123 const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
124 absl::string_view locality_proxy_address, ValidationErrors* errors) {
125 // health_status
126 const int32_t health_status =
127 envoy_config_endpoint_v3_LbEndpoint_health_status(lb_endpoint);
128 auto status = XdsHealthStatus::FromUpb(health_status);
129 if (!status.has_value()) return absl::nullopt;
130 // load_balancing_weight
131 uint32_t weight;
132 {
133 ValidationErrors::ScopedField field(errors, ".load_balancing_weight");
134 weight = ParseUInt32Value(
135 envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(
136 lb_endpoint))
137 .value_or(1);
138 if (weight == 0) {
139 errors->AddError("must be greater than 0");
140 }
141 }
142 // metadata
143 std::string proxy_address;
144 std::string hash_key;
145 if (XdsHttpConnectEnabled() || !XdsEndpointHashKeyBackwardCompatEnabled()) {
146 XdsMetadataMap metadata_map = ParseXdsMetadataMap(
147 context, envoy_config_endpoint_v3_LbEndpoint_metadata(lb_endpoint),
148 errors);
149 if (XdsHttpConnectEnabled()) {
150 proxy_address = GetProxyAddressFromMetadata(metadata_map);
151 }
152 if (!XdsEndpointHashKeyBackwardCompatEnabled()) {
153 hash_key = GetHashKeyFromMetadata(metadata_map);
154 }
155 }
156 // endpoint
157 std::vector<grpc_resolved_address> addresses;
158 absl::string_view hostname;
159 {
160 ValidationErrors::ScopedField field(errors, ".endpoint");
161 const envoy_config_endpoint_v3_Endpoint* endpoint =
162 envoy_config_endpoint_v3_LbEndpoint_endpoint(lb_endpoint);
163 if (endpoint == nullptr) {
164 errors->AddError("field not present");
165 return absl::nullopt;
166 }
167 {
168 ValidationErrors::ScopedField field(errors, ".address");
169 auto address = ParseXdsAddress(
170 envoy_config_endpoint_v3_Endpoint_address(endpoint), errors);
171 if (address.has_value()) addresses.push_back(*address);
172 }
173 if (XdsDualstackEndpointsEnabled()) {
174 size_t size;
175 auto* additional_addresses =
176 envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint,
177 &size);
178 for (size_t i = 0; i < size; ++i) {
179 ValidationErrors::ScopedField field(
180 errors, absl::StrCat(".additional_addresses[", i, "].address"));
181 auto address = ParseXdsAddress(
182 envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address(
183 additional_addresses[i]),
184 errors);
185 if (address.has_value()) addresses.push_back(*address);
186 }
187 }
188 hostname =
189 UpbStringToAbsl(envoy_config_endpoint_v3_Endpoint_hostname(endpoint));
190 }
191 if (addresses.empty()) return absl::nullopt;
192 // Convert to EndpointAddresses.
193 auto args = ChannelArgs()
194 .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
195 .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status());
196 if (!hostname.empty()) {
197 args = args.Set(GRPC_ARG_ADDRESS_NAME, hostname);
198 }
199 if (!proxy_address.empty()) {
200 args = args.Set(GRPC_ARG_XDS_HTTP_PROXY, proxy_address);
201 } else if (!locality_proxy_address.empty()) {
202 args = args.Set(GRPC_ARG_XDS_HTTP_PROXY, locality_proxy_address);
203 }
204 if (!hash_key.empty()) {
205 args = args.Set(GRPC_ARG_RING_HASH_ENDPOINT_HASH_KEY, hash_key);
206 }
207 return EndpointAddresses(addresses, args);
208 }
209
210 struct ParsedLocality {
211 size_t priority;
212 XdsEndpointResource::Priority::Locality locality;
213 };
214
215 struct ResolvedAddressLessThan {
operator ()grpc_core::__anon046957960111::ResolvedAddressLessThan216 bool operator()(const grpc_resolved_address& a1,
217 const grpc_resolved_address& a2) const {
218 if (a1.len != a2.len) return a1.len < a2.len;
219 return memcmp(a1.addr, a2.addr, a1.len) < 0;
220 }
221 };
222 using ResolvedAddressSet =
223 std::set<grpc_resolved_address, ResolvedAddressLessThan>;
224
LocalityParse(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_LocalityLbEndpoints * locality_lb_endpoints,ResolvedAddressSet * address_set,ValidationErrors * errors)225 absl::optional<ParsedLocality> LocalityParse(
226 const XdsResourceType::DecodeContext& context,
227 const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints,
228 ResolvedAddressSet* address_set, ValidationErrors* errors) {
229 const size_t original_error_size = errors->size();
230 ParsedLocality parsed_locality;
231 // load_balancing_weight
232 // If LB weight is not specified or 0, it means this locality is assigned
233 // no load.
234 parsed_locality.locality.lb_weight =
235 ParseUInt32Value(
236 envoy_config_endpoint_v3_LocalityLbEndpoints_load_balancing_weight(
237 locality_lb_endpoints))
238 .value_or(0);
239 if (parsed_locality.locality.lb_weight == 0) return absl::nullopt;
240 // locality
241 const envoy_config_core_v3_Locality* locality =
242 envoy_config_endpoint_v3_LocalityLbEndpoints_locality(
243 locality_lb_endpoints);
244 if (locality == nullptr) {
245 ValidationErrors::ScopedField field(errors, ".locality");
246 errors->AddError("field not present");
247 return absl::nullopt;
248 }
249 // region
250 std::string region =
251 UpbStringToStdString(envoy_config_core_v3_Locality_region(locality));
252 // zone
253 std::string zone =
254 UpbStringToStdString(envoy_config_core_v3_Locality_zone(locality));
255 // sub_zone
256 std::string sub_zone =
257 UpbStringToStdString(envoy_config_core_v3_Locality_sub_zone(locality));
258 parsed_locality.locality.name = MakeRefCounted<XdsLocalityName>(
259 std::move(region), std::move(zone), std::move(sub_zone));
260 // metadata
261 std::string proxy_address;
262 if (XdsHttpConnectEnabled()) {
263 XdsMetadataMap metadata_map = ParseXdsMetadataMap(
264 context,
265 envoy_config_endpoint_v3_LocalityLbEndpoints_metadata(
266 locality_lb_endpoints),
267 errors);
268 proxy_address = GetProxyAddressFromMetadata(metadata_map);
269 }
270 // lb_endpoints
271 size_t size;
272 const envoy_config_endpoint_v3_LbEndpoint* const* lb_endpoints =
273 envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(
274 locality_lb_endpoints, &size);
275 for (size_t i = 0; i < size; ++i) {
276 ValidationErrors::ScopedField field(errors,
277 absl::StrCat(".lb_endpoints[", i, "]"));
278 auto endpoint =
279 EndpointAddressesParse(context, lb_endpoints[i], proxy_address, errors);
280 if (endpoint.has_value()) {
281 for (const auto& address : endpoint->addresses()) {
282 bool inserted = address_set->insert(address).second;
283 if (!inserted) {
284 errors->AddError(absl::StrCat(
285 "duplicate endpoint address \"",
286 grpc_sockaddr_to_uri(&address).value_or("<unknown>"), "\""));
287 }
288 }
289 parsed_locality.locality.endpoints.push_back(std::move(*endpoint));
290 }
291 }
292 // priority
293 parsed_locality.priority =
294 envoy_config_endpoint_v3_LocalityLbEndpoints_priority(
295 locality_lb_endpoints);
296 // Return result.
297 if (original_error_size != errors->size()) return absl::nullopt;
298 return parsed_locality;
299 }
300
DropParseAndAppend(const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload * drop_overload,XdsEndpointResource::DropConfig * drop_config,ValidationErrors * errors)301 void DropParseAndAppend(
302 const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload*
303 drop_overload,
304 XdsEndpointResource::DropConfig* drop_config, ValidationErrors* errors) {
305 // category
306 std::string category = UpbStringToStdString(
307 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category(
308 drop_overload));
309 if (category.empty()) {
310 ValidationErrors::ScopedField field(errors, ".category");
311 errors->AddError("empty drop category name");
312 }
313 // drop_percentage
314 uint32_t numerator;
315 {
316 ValidationErrors::ScopedField field(errors, ".drop_percentage");
317 const envoy_type_v3_FractionalPercent* drop_percentage =
318 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
319 drop_overload);
320 if (drop_percentage == nullptr) {
321 errors->AddError("field not present");
322 return;
323 }
324 numerator = envoy_type_v3_FractionalPercent_numerator(drop_percentage);
325 {
326 ValidationErrors::ScopedField field(errors, ".denominator");
327 const int denominator =
328 envoy_type_v3_FractionalPercent_denominator(drop_percentage);
329 // Normalize to million.
330 switch (denominator) {
331 case envoy_type_v3_FractionalPercent_HUNDRED:
332 numerator *= 10000;
333 break;
334 case envoy_type_v3_FractionalPercent_TEN_THOUSAND:
335 numerator *= 100;
336 break;
337 case envoy_type_v3_FractionalPercent_MILLION:
338 break;
339 default:
340 errors->AddError("unknown denominator type");
341 }
342 }
343 // Cap numerator to 1000000.
344 numerator = std::min(numerator, 1000000u);
345 }
346 // Add category.
347 drop_config->AddCategory(std::move(category), numerator);
348 }
349
EdsResourceParse(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_ClusterLoadAssignment * cluster_load_assignment)350 absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse(
351 const XdsResourceType::DecodeContext& context,
352 const envoy_config_endpoint_v3_ClusterLoadAssignment*
353 cluster_load_assignment) {
354 ValidationErrors errors;
355 auto eds_resource = std::make_shared<XdsEndpointResource>();
356 // endpoints
357 {
358 ValidationErrors::ScopedField field(&errors, "endpoints");
359 ResolvedAddressSet address_set;
360 size_t locality_size;
361 const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints =
362 envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(
363 cluster_load_assignment, &locality_size);
364 for (size_t i = 0; i < locality_size; ++i) {
365 ValidationErrors::ScopedField field(&errors, absl::StrCat("[", i, "]"));
366 auto parsed_locality =
367 LocalityParse(context, endpoints[i], &address_set, &errors);
368 if (parsed_locality.has_value()) {
369 CHECK_NE(parsed_locality->locality.lb_weight, 0u);
370 // Make sure prorities is big enough. Note that they might not
371 // arrive in priority order.
372 if (eds_resource->priorities.size() < parsed_locality->priority + 1) {
373 eds_resource->priorities.resize(parsed_locality->priority + 1);
374 }
375 auto& locality_map =
376 eds_resource->priorities[parsed_locality->priority].localities;
377 auto it = locality_map.find(parsed_locality->locality.name.get());
378 if (it != locality_map.end()) {
379 errors.AddError(absl::StrCat(
380 "duplicate locality ",
381 parsed_locality->locality.name->human_readable_string()
382 .as_string_view(),
383 " found in priority ", parsed_locality->priority));
384 } else {
385 locality_map.emplace(parsed_locality->locality.name.get(),
386 std::move(parsed_locality->locality));
387 }
388 }
389 }
390 for (size_t i = 0; i < eds_resource->priorities.size(); ++i) {
391 const auto& priority = eds_resource->priorities[i];
392 if (priority.localities.empty()) {
393 errors.AddError(absl::StrCat("priority ", i, " empty"));
394 } else {
395 // Check that the sum of the locality weights in this priority
396 // does not exceed the max value for a uint32.
397 uint64_t total_weight = 0;
398 for (const auto& p : priority.localities) {
399 total_weight += p.second.lb_weight;
400 if (total_weight > std::numeric_limits<uint32_t>::max()) {
401 errors.AddError(
402 absl::StrCat("sum of locality weights for priority ", i,
403 " exceeds uint32 max"));
404 break;
405 }
406 }
407 }
408 }
409 }
410 // policy
411 const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
412 cluster_load_assignment);
413 if (policy != nullptr) {
414 ValidationErrors::ScopedField field(&errors, "policy");
415 size_t drop_size;
416 const auto* const* drop_overload =
417 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads(
418 policy, &drop_size);
419 if (drop_size > 0) {
420 eds_resource->drop_config =
421 MakeRefCounted<XdsEndpointResource::DropConfig>();
422 }
423 for (size_t i = 0; i < drop_size; ++i) {
424 ValidationErrors::ScopedField field(
425 &errors, absl::StrCat(".drop_overloads[", i, "]"));
426 DropParseAndAppend(drop_overload[i], eds_resource->drop_config.get(),
427 &errors);
428 }
429 }
430 // Return result.
431 if (!errors.ok()) {
432 return errors.status(absl::StatusCode::kInvalidArgument,
433 "errors parsing EDS resource");
434 }
435 return eds_resource;
436 }
437
438 } // namespace
439
// Decodes a serialized ClusterLoadAssignment into a DecodeResult.
//
// If the bytes cannot be parsed as a proto, the result carries an
// InvalidArgument status and no name.  Otherwise the result's name is the
// proto's cluster_name, and its resource is either the parsed EDS resource
// or the validation failure status.
XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
    const XdsResourceType::DecodeContext& context,
    absl::string_view serialized_resource) const {
  DecodeResult result;
  // Parse serialized proto.
  auto* cla = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
      serialized_resource.data(), serialized_resource.size(), context.arena);
  if (cla == nullptr) {
    result.resource = absl::InvalidArgumentError(
        "Can't parse ClusterLoadAssignment resource.");
    return result;
  }
  MaybeLogClusterLoadAssignment(context, cla);
  // The name is recorded before validation, so it is set even when
  // validation fails.
  result.name = UpbStringToStdString(
      envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(cla));
  // Validate resource.
  auto parsed = EdsResourceParse(context, cla);
  if (parsed.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer)) {
      LOG(INFO) << "[xds_client " << context.client
                << "] parsed ClusterLoadAssignment " << *result.name << ": "
                << (*parsed)->ToString();
    }
    result.resource = std::move(*parsed);
    return result;
  }
  if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer)) {
    LOG(ERROR) << "[xds_client " << context.client
               << "] invalid ClusterLoadAssignment " << *result.name << ": "
               << parsed.status();
  }
  result.resource = parsed.status();
  return result;
}
474
475 } // namespace grpc_core
476