1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/xds/xds_endpoint.h"
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <algorithm>
25 #include <limits>
26 #include <memory>
27 #include <set>
28 #include <vector>
29
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/str_cat.h"
33 #include "absl/strings/str_join.h"
34 #include "absl/types/optional.h"
35 #include "envoy/config/core/v3/address.upb.h"
36 #include "envoy/config/core/v3/base.upb.h"
37 #include "envoy/config/endpoint/v3/endpoint.upb.h"
38 #include "envoy/config/endpoint/v3/endpoint.upbdefs.h"
39 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
40 #include "envoy/type/v3/percent.upb.h"
41 #include "google/protobuf/wrappers.upb.h"
42 #include "upb/text/encode.h"
43
44 #include <grpc/support/log.h>
45
46 #include "src/core/ext/xds/upb_utils.h"
47 #include "src/core/ext/xds/xds_health_status.h"
48 #include "src/core/ext/xds/xds_resource_type.h"
49 #include "src/core/lib/address_utils/parse_address.h"
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/debug/trace.h"
53 #include "src/core/lib/gpr/string.h"
54 #include "src/core/lib/gprpp/env.h"
55 #include "src/core/lib/gprpp/validation_errors.h"
56 #include "src/core/lib/iomgr/resolved_address.h"
57
58 // IWYU pragma: no_include "absl/meta/type_traits.h"
59
60 namespace grpc_core {
61
62 namespace {
63
64 // TODO(roth): Remove this once dualstack support is stable.
XdsDualstackEndpointsEnabled()65 bool XdsDualstackEndpointsEnabled() {
66 auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
67 if (!value.has_value()) return false;
68 bool parsed_value;
69 bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
70 return parse_succeeded && parsed_value;
71 }
72
73 } // namespace
74
75 //
76 // XdsEndpointResource
77 //
78
ToString() const79 std::string XdsEndpointResource::Priority::Locality::ToString() const {
80 std::vector<std::string> endpoint_strings;
81 for (const EndpointAddresses& endpoint : endpoints) {
82 endpoint_strings.emplace_back(endpoint.ToString());
83 }
84 return absl::StrCat("{name=", name->human_readable_string().as_string_view(),
85 ", lb_weight=", lb_weight, ", endpoints=[",
86 absl::StrJoin(endpoint_strings, ", "), "]}");
87 }
88
operator ==(const Priority & other) const89 bool XdsEndpointResource::Priority::operator==(const Priority& other) const {
90 if (localities.size() != other.localities.size()) return false;
91 auto it1 = localities.begin();
92 auto it2 = other.localities.begin();
93 while (it1 != localities.end()) {
94 if (*it1->first != *it2->first) return false;
95 if (it1->second != it2->second) return false;
96 ++it1;
97 ++it2;
98 }
99 return true;
100 }
101
ToString() const102 std::string XdsEndpointResource::Priority::ToString() const {
103 std::vector<std::string> locality_strings;
104 locality_strings.reserve(localities.size());
105 for (const auto& p : localities) {
106 locality_strings.emplace_back(p.second.ToString());
107 }
108 return absl::StrCat("[", absl::StrJoin(locality_strings, ", "), "]");
109 }
110
ShouldDrop(const std::string ** category_name)111 bool XdsEndpointResource::DropConfig::ShouldDrop(
112 const std::string** category_name) {
113 for (size_t i = 0; i < drop_category_list_.size(); ++i) {
114 const auto& drop_category = drop_category_list_[i];
115 // Generate a random number in [0, 1000000).
116 const uint32_t random = [&]() {
117 MutexLock lock(&mu_);
118 return absl::Uniform<uint32_t>(bit_gen_, 0, 1000000);
119 }();
120 if (random < drop_category.parts_per_million) {
121 *category_name = &drop_category.name;
122 return true;
123 }
124 }
125 return false;
126 }
127
ToString() const128 std::string XdsEndpointResource::DropConfig::ToString() const {
129 std::vector<std::string> category_strings;
130 for (const DropCategory& category : drop_category_list_) {
131 category_strings.emplace_back(
132 absl::StrCat(category.name, "=", category.parts_per_million));
133 }
134 return absl::StrCat("{[", absl::StrJoin(category_strings, ", "),
135 "], drop_all=", drop_all_, "}");
136 }
137
ToString() const138 std::string XdsEndpointResource::ToString() const {
139 std::vector<std::string> priority_strings;
140 for (size_t i = 0; i < priorities.size(); ++i) {
141 const Priority& priority = priorities[i];
142 priority_strings.emplace_back(
143 absl::StrCat("priority ", i, ": ", priority.ToString()));
144 }
145 return absl::StrCat(
146 "priorities=[", absl::StrJoin(priority_strings, ", "), "], drop_config=",
147 drop_config == nullptr ? "<null>" : drop_config->ToString());
148 }
149
150 //
151 // XdsEndpointResourceType
152 //
153
154 namespace {
155
MaybeLogClusterLoadAssignment(const XdsResourceType::DecodeContext & context,const envoy_config_endpoint_v3_ClusterLoadAssignment * cla)156 void MaybeLogClusterLoadAssignment(
157 const XdsResourceType::DecodeContext& context,
158 const envoy_config_endpoint_v3_ClusterLoadAssignment* cla) {
159 if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) &&
160 gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
161 const upb_MessageDef* msg_type =
162 envoy_config_endpoint_v3_ClusterLoadAssignment_getmsgdef(
163 context.symtab);
164 char buf[10240];
165 upb_TextEncode(reinterpret_cast<const upb_Message*>(cla), msg_type, nullptr,
166 0, buf, sizeof(buf));
167 gpr_log(GPR_DEBUG, "[xds_client %p] ClusterLoadAssignment: %s",
168 context.client, buf);
169 }
170 }
171
ParseCoreAddress(const envoy_config_core_v3_Address * address,ValidationErrors * errors)172 absl::optional<grpc_resolved_address> ParseCoreAddress(
173 const envoy_config_core_v3_Address* address, ValidationErrors* errors) {
174 if (address == nullptr) {
175 errors->AddError("field not present");
176 return absl::nullopt;
177 }
178 ValidationErrors::ScopedField field(errors, ".socket_address");
179 const envoy_config_core_v3_SocketAddress* socket_address =
180 envoy_config_core_v3_Address_socket_address(address);
181 if (socket_address == nullptr) {
182 errors->AddError("field not present");
183 return absl::nullopt;
184 }
185 std::string address_str = UpbStringToStdString(
186 envoy_config_core_v3_SocketAddress_address(socket_address));
187 uint32_t port;
188 {
189 ValidationErrors::ScopedField field(errors, ".port_value");
190 port = envoy_config_core_v3_SocketAddress_port_value(socket_address);
191 if (GPR_UNLIKELY(port >> 16) != 0) {
192 errors->AddError("invalid port");
193 return absl::nullopt;
194 }
195 }
196 auto addr = StringToSockaddr(address_str, port);
197 if (!addr.ok()) {
198 errors->AddError(addr.status().message());
199 return absl::nullopt;
200 }
201 return *addr;
202 }
203
EndpointAddressesParse(const envoy_config_endpoint_v3_LbEndpoint * lb_endpoint,ValidationErrors * errors)204 absl::optional<EndpointAddresses> EndpointAddressesParse(
205 const envoy_config_endpoint_v3_LbEndpoint* lb_endpoint,
206 ValidationErrors* errors) {
207 // health_status
208 const int32_t health_status =
209 envoy_config_endpoint_v3_LbEndpoint_health_status(lb_endpoint);
210 auto status = XdsHealthStatus::FromUpb(health_status);
211 if (!status.has_value()) return absl::nullopt;
212 // load_balancing_weight
213 uint32_t weight = 1;
214 {
215 ValidationErrors::ScopedField field(errors, ".load_balancing_weight");
216 const google_protobuf_UInt32Value* load_balancing_weight =
217 envoy_config_endpoint_v3_LbEndpoint_load_balancing_weight(lb_endpoint);
218 if (load_balancing_weight != nullptr) {
219 weight = google_protobuf_UInt32Value_value(load_balancing_weight);
220 if (weight == 0) {
221 errors->AddError("must be greater than 0");
222 }
223 }
224 }
225 // endpoint
226 std::vector<grpc_resolved_address> addresses;
227 {
228 ValidationErrors::ScopedField field(errors, ".endpoint");
229 const envoy_config_endpoint_v3_Endpoint* endpoint =
230 envoy_config_endpoint_v3_LbEndpoint_endpoint(lb_endpoint);
231 if (endpoint == nullptr) {
232 errors->AddError("field not present");
233 return absl::nullopt;
234 }
235 {
236 ValidationErrors::ScopedField field(errors, ".address");
237 auto address = ParseCoreAddress(
238 envoy_config_endpoint_v3_Endpoint_address(endpoint), errors);
239 if (address.has_value()) addresses.push_back(*address);
240 }
241 if (XdsDualstackEndpointsEnabled()) {
242 size_t size;
243 auto* additional_addresses =
244 envoy_config_endpoint_v3_Endpoint_additional_addresses(endpoint,
245 &size);
246 for (size_t i = 0; i < size; ++i) {
247 ValidationErrors::ScopedField field(
248 errors, absl::StrCat(".additional_addresses[", i, "].address"));
249 auto address = ParseCoreAddress(
250 envoy_config_endpoint_v3_Endpoint_AdditionalAddress_address(
251 additional_addresses[i]),
252 errors);
253 if (address.has_value()) addresses.push_back(*address);
254 }
255 }
256 }
257 if (addresses.empty()) return absl::nullopt;
258 // Convert to EndpointAddresses.
259 return EndpointAddresses(
260 addresses, ChannelArgs()
261 .Set(GRPC_ARG_ADDRESS_WEIGHT, weight)
262 .Set(GRPC_ARG_XDS_HEALTH_STATUS, status->status()));
263 }
264
265 struct ParsedLocality {
266 size_t priority;
267 XdsEndpointResource::Priority::Locality locality;
268 };
269
270 struct ResolvedAddressLessThan {
operator ()grpc_core::__anone781888b0311::ResolvedAddressLessThan271 bool operator()(const grpc_resolved_address& a1,
272 const grpc_resolved_address& a2) const {
273 if (a1.len != a2.len) return a1.len < a2.len;
274 return memcmp(a1.addr, a2.addr, a1.len) < 0;
275 }
276 };
277 using ResolvedAddressSet =
278 std::set<grpc_resolved_address, ResolvedAddressLessThan>;
279
LocalityParse(const envoy_config_endpoint_v3_LocalityLbEndpoints * locality_lb_endpoints,ResolvedAddressSet * address_set,ValidationErrors * errors)280 absl::optional<ParsedLocality> LocalityParse(
281 const envoy_config_endpoint_v3_LocalityLbEndpoints* locality_lb_endpoints,
282 ResolvedAddressSet* address_set, ValidationErrors* errors) {
283 const size_t original_error_size = errors->size();
284 ParsedLocality parsed_locality;
285 // load_balancing_weight
286 // If LB weight is not specified or 0, it means this locality is assigned
287 // no load.
288 const google_protobuf_UInt32Value* lb_weight =
289 envoy_config_endpoint_v3_LocalityLbEndpoints_load_balancing_weight(
290 locality_lb_endpoints);
291 parsed_locality.locality.lb_weight =
292 lb_weight != nullptr ? google_protobuf_UInt32Value_value(lb_weight) : 0;
293 if (parsed_locality.locality.lb_weight == 0) return absl::nullopt;
294 // locality
295 const envoy_config_core_v3_Locality* locality =
296 envoy_config_endpoint_v3_LocalityLbEndpoints_locality(
297 locality_lb_endpoints);
298 if (locality == nullptr) {
299 ValidationErrors::ScopedField field(errors, ".locality");
300 errors->AddError("field not present");
301 return absl::nullopt;
302 }
303 // region
304 std::string region =
305 UpbStringToStdString(envoy_config_core_v3_Locality_region(locality));
306 // zone
307 std::string zone =
308 UpbStringToStdString(envoy_config_core_v3_Locality_zone(locality));
309 // sub_zone
310 std::string sub_zone =
311 UpbStringToStdString(envoy_config_core_v3_Locality_sub_zone(locality));
312 parsed_locality.locality.name = MakeRefCounted<XdsLocalityName>(
313 std::move(region), std::move(zone), std::move(sub_zone));
314 // lb_endpoints
315 size_t size;
316 const envoy_config_endpoint_v3_LbEndpoint* const* lb_endpoints =
317 envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(
318 locality_lb_endpoints, &size);
319 for (size_t i = 0; i < size; ++i) {
320 ValidationErrors::ScopedField field(errors,
321 absl::StrCat(".lb_endpoints[", i, "]"));
322 auto endpoint = EndpointAddressesParse(lb_endpoints[i], errors);
323 if (endpoint.has_value()) {
324 for (const auto& address : endpoint->addresses()) {
325 bool inserted = address_set->insert(address).second;
326 if (!inserted) {
327 errors->AddError(absl::StrCat(
328 "duplicate endpoint address \"",
329 grpc_sockaddr_to_uri(&address).value_or("<unknown>"), "\""));
330 }
331 }
332 parsed_locality.locality.endpoints.push_back(std::move(*endpoint));
333 }
334 }
335 // priority
336 parsed_locality.priority =
337 envoy_config_endpoint_v3_LocalityLbEndpoints_priority(
338 locality_lb_endpoints);
339 // Return result.
340 if (original_error_size != errors->size()) return absl::nullopt;
341 return parsed_locality;
342 }
343
DropParseAndAppend(const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload * drop_overload,XdsEndpointResource::DropConfig * drop_config,ValidationErrors * errors)344 void DropParseAndAppend(
345 const envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload*
346 drop_overload,
347 XdsEndpointResource::DropConfig* drop_config, ValidationErrors* errors) {
348 // category
349 std::string category = UpbStringToStdString(
350 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_category(
351 drop_overload));
352 if (category.empty()) {
353 ValidationErrors::ScopedField field(errors, ".category");
354 errors->AddError("empty drop category name");
355 }
356 // drop_percentage
357 uint32_t numerator;
358 {
359 ValidationErrors::ScopedField field(errors, ".drop_percentage");
360 const envoy_type_v3_FractionalPercent* drop_percentage =
361 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_DropOverload_drop_percentage(
362 drop_overload);
363 if (drop_percentage == nullptr) {
364 errors->AddError("field not present");
365 return;
366 }
367 numerator = envoy_type_v3_FractionalPercent_numerator(drop_percentage);
368 {
369 ValidationErrors::ScopedField field(errors, ".denominator");
370 const int denominator =
371 envoy_type_v3_FractionalPercent_denominator(drop_percentage);
372 // Normalize to million.
373 switch (denominator) {
374 case envoy_type_v3_FractionalPercent_HUNDRED:
375 numerator *= 10000;
376 break;
377 case envoy_type_v3_FractionalPercent_TEN_THOUSAND:
378 numerator *= 100;
379 break;
380 case envoy_type_v3_FractionalPercent_MILLION:
381 break;
382 default:
383 errors->AddError("unknown denominator type");
384 }
385 }
386 // Cap numerator to 1000000.
387 numerator = std::min(numerator, 1000000u);
388 }
389 // Add category.
390 drop_config->AddCategory(std::move(category), numerator);
391 }
392
EdsResourceParse(const XdsResourceType::DecodeContext &,const envoy_config_endpoint_v3_ClusterLoadAssignment * cluster_load_assignment)393 absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> EdsResourceParse(
394 const XdsResourceType::DecodeContext& /*context*/,
395 const envoy_config_endpoint_v3_ClusterLoadAssignment*
396 cluster_load_assignment) {
397 ValidationErrors errors;
398 auto eds_resource = std::make_shared<XdsEndpointResource>();
399 // endpoints
400 {
401 ValidationErrors::ScopedField field(&errors, "endpoints");
402 ResolvedAddressSet address_set;
403 size_t locality_size;
404 const envoy_config_endpoint_v3_LocalityLbEndpoints* const* endpoints =
405 envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(
406 cluster_load_assignment, &locality_size);
407 for (size_t i = 0; i < locality_size; ++i) {
408 ValidationErrors::ScopedField field(&errors, absl::StrCat("[", i, "]"));
409 auto parsed_locality = LocalityParse(endpoints[i], &address_set, &errors);
410 if (parsed_locality.has_value()) {
411 GPR_ASSERT(parsed_locality->locality.lb_weight != 0);
412 // Make sure prorities is big enough. Note that they might not
413 // arrive in priority order.
414 if (eds_resource->priorities.size() < parsed_locality->priority + 1) {
415 eds_resource->priorities.resize(parsed_locality->priority + 1);
416 }
417 auto& locality_map =
418 eds_resource->priorities[parsed_locality->priority].localities;
419 auto it = locality_map.find(parsed_locality->locality.name.get());
420 if (it != locality_map.end()) {
421 errors.AddError(absl::StrCat(
422 "duplicate locality ",
423 parsed_locality->locality.name->human_readable_string()
424 .as_string_view(),
425 " found in priority ", parsed_locality->priority));
426 } else {
427 locality_map.emplace(parsed_locality->locality.name.get(),
428 std::move(parsed_locality->locality));
429 }
430 }
431 }
432 for (size_t i = 0; i < eds_resource->priorities.size(); ++i) {
433 const auto& priority = eds_resource->priorities[i];
434 if (priority.localities.empty()) {
435 errors.AddError(absl::StrCat("priority ", i, " empty"));
436 } else {
437 // Check that the sum of the locality weights in this priority
438 // does not exceed the max value for a uint32.
439 uint64_t total_weight = 0;
440 for (const auto& p : priority.localities) {
441 total_weight += p.second.lb_weight;
442 if (total_weight > std::numeric_limits<uint32_t>::max()) {
443 errors.AddError(
444 absl::StrCat("sum of locality weights for priority ", i,
445 " exceeds uint32 max"));
446 break;
447 }
448 }
449 }
450 }
451 }
452 // policy
453 const auto* policy = envoy_config_endpoint_v3_ClusterLoadAssignment_policy(
454 cluster_load_assignment);
455 if (policy != nullptr) {
456 ValidationErrors::ScopedField field(&errors, "policy");
457 size_t drop_size;
458 const auto* const* drop_overload =
459 envoy_config_endpoint_v3_ClusterLoadAssignment_Policy_drop_overloads(
460 policy, &drop_size);
461 if (drop_size > 0) {
462 eds_resource->drop_config =
463 MakeRefCounted<XdsEndpointResource::DropConfig>();
464 }
465 for (size_t i = 0; i < drop_size; ++i) {
466 ValidationErrors::ScopedField field(
467 &errors, absl::StrCat(".drop_overloads[", i, "]"));
468 DropParseAndAppend(drop_overload[i], eds_resource->drop_config.get(),
469 &errors);
470 }
471 }
472 // Return result.
473 if (!errors.ok()) {
474 return errors.status(absl::StatusCode::kInvalidArgument,
475 "errors parsing EDS resource");
476 }
477 return eds_resource;
478 }
479
480 } // namespace
481
// Decodes a serialized ClusterLoadAssignment.
//
// If the bytes cannot be parsed, the result carries an INVALID_ARGUMENT
// status and no name.  Otherwise the result's name is the resource's
// cluster_name and its resource is either the validated XdsEndpointResource
// or the validation error status.  When the context's tracer is enabled, the
// outcome is also logged.
XdsResourceType::DecodeResult XdsEndpointResourceType::Decode(
    const XdsResourceType::DecodeContext& context,
    absl::string_view serialized_resource) const {
  DecodeResult result;
  // Parse serialized proto.
  auto* resource = envoy_config_endpoint_v3_ClusterLoadAssignment_parse(
      serialized_resource.data(), serialized_resource.size(), context.arena);
  if (resource == nullptr) {
    result.resource = absl::InvalidArgumentError(
        "Can't parse ClusterLoadAssignment resource.");
    return result;
  }
  MaybeLogClusterLoadAssignment(context, resource);
  // Validate resource.
  result.name = UpbStringToStdString(
      envoy_config_endpoint_v3_ClusterLoadAssignment_cluster_name(resource));
  auto eds_resource = EdsResourceParse(context, resource);
  // Validation failed: report the status.
  if (!eds_resource.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
      gpr_log(GPR_ERROR, "[xds_client %p] invalid ClusterLoadAssignment %s: %s",
              context.client, result.name->c_str(),
              eds_resource.status().ToString().c_str());
    }
    result.resource = eds_resource.status();
    return result;
  }
  // Validation succeeded: hand the parsed resource to the caller.
  if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
    gpr_log(GPR_INFO, "[xds_client %p] parsed ClusterLoadAssignment %s: %s",
            context.client, result.name->c_str(),
            (*eds_resource)->ToString().c_str());
  }
  result.resource = std::move(*eds_resource);
  return result;
}
516
517 } // namespace grpc_core
518