1 // Copyright 2018 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 #include "src/core/xds/grpc/xds_cluster_parser.h"
17
18 #include <algorithm>
19 #include <memory>
20 #include <utility>
21
22 #include "absl/log/check.h"
23 #include "absl/log/log.h"
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/strings/strip.h"
28 #include "envoy/config/cluster/v3/circuit_breaker.upb.h"
29 #include "envoy/config/cluster/v3/cluster.upb.h"
30 #include "envoy/config/cluster/v3/cluster.upbdefs.h"
31 #include "envoy/config/cluster/v3/outlier_detection.upb.h"
32 #include "envoy/config/core/v3/address.upb.h"
33 #include "envoy/config/core/v3/base.upb.h"
34 #include "envoy/config/core/v3/config_source.upb.h"
35 #include "envoy/config/core/v3/extension.upb.h"
36 #include "envoy/config/core/v3/health_check.upb.h"
37 #include "envoy/config/core/v3/protocol.upb.h"
38 #include "envoy/config/endpoint/v3/endpoint.upb.h"
39 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
40 #include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h"
41 #include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.upb.h"
42 #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
43 #include "envoy/extensions/upstreams/http/v3/http_protocol_options.upb.h"
44 #include "google/protobuf/any.upb.h"
45 #include "google/protobuf/duration.upb.h"
46 #include "google/protobuf/struct.upb.h"
47 #include "google/protobuf/wrappers.upb.h"
48 #include "src/core/config/core_configuration.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/load_balancing/lb_policy_registry.h"
51 #include "src/core/util/env.h"
52 #include "src/core/util/host_port.h"
53 #include "src/core/util/time.h"
54 #include "src/core/util/upb_utils.h"
55 #include "src/core/util/validation_errors.h"
56 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
57 #include "src/core/xds/grpc/xds_common_types.h"
58 #include "src/core/xds/grpc/xds_common_types_parser.h"
59 #include "src/core/xds/grpc/xds_lb_policy_registry.h"
60 #include "src/core/xds/grpc/xds_metadata_parser.h"
61 #include "src/core/xds/xds_client/lrs_client.h"
62 #include "src/core/xds/xds_client/xds_backend_metric_propagation.h"
63 #include "upb/base/string_view.h"
64 #include "upb/text/encode.h"
65
66 namespace grpc_core {
67
68 // TODO(roth): Remove this once the feature passes interop tests.
XdsHttpConnectEnabled()69 bool XdsHttpConnectEnabled() {
70 auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT");
71 if (!value.has_value()) return false;
72 bool parsed_value;
73 bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
74 return parse_succeeded && parsed_value;
75 }
76
77 namespace {
78
79 constexpr absl::string_view kUpstreamTlsContextType =
80 "envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext";
81
82 constexpr absl::string_view kHttp11ProxyType =
83 "envoy.extensions.transport_sockets.http_11_proxy.v3"
84 ".Http11ProxyUpstreamTransport";
85
UpstreamTlsContextParse(const XdsResourceType::DecodeContext & context,const XdsExtension & extension,ValidationErrors * errors)86 CommonTlsContext UpstreamTlsContextParse(
87 const XdsResourceType::DecodeContext& context,
88 const XdsExtension& extension, ValidationErrors* errors) {
89 const absl::string_view* serialized_upstream_tls_context =
90 absl::get_if<absl::string_view>(&extension.value);
91 if (serialized_upstream_tls_context == nullptr) {
92 errors->AddError("can't decode UpstreamTlsContext");
93 return {};
94 }
95 const auto* upstream_tls_context =
96 envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_parse(
97 serialized_upstream_tls_context->data(),
98 serialized_upstream_tls_context->size(), context.arena);
99 if (upstream_tls_context == nullptr) {
100 errors->AddError("can't decode UpstreamTlsContext");
101 return {};
102 }
103 ValidationErrors::ScopedField field3(errors, ".common_tls_context");
104 const auto* common_tls_context_proto =
105 envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_common_tls_context(
106 upstream_tls_context);
107 CommonTlsContext common_tls_context;
108 if (common_tls_context_proto != nullptr) {
109 common_tls_context =
110 CommonTlsContextParse(context, common_tls_context_proto, errors);
111 }
112 if (absl::holds_alternative<absl::monostate>(
113 common_tls_context.certificate_validation_context.ca_certs)) {
114 errors->AddError("no CA certs configured");
115 }
116 return common_tls_context;
117 }
118
Http11ProxyUpstreamTransportParse(const XdsResourceType::DecodeContext & context,const XdsExtension & extension,ValidationErrors * errors)119 CommonTlsContext Http11ProxyUpstreamTransportParse(
120 const XdsResourceType::DecodeContext& context,
121 const XdsExtension& extension, ValidationErrors* errors) {
122 const absl::string_view* serialized =
123 absl::get_if<absl::string_view>(&extension.value);
124 if (serialized == nullptr) {
125 errors->AddError("can't decode Http11ProxyUpstreamTransport");
126 return {};
127 }
128 const auto* http_11_proxy =
129 envoy_extensions_transport_sockets_http_11_proxy_v3_Http11ProxyUpstreamTransport_parse(
130 serialized->data(), serialized->size(), context.arena);
131 if (http_11_proxy == nullptr) {
132 errors->AddError("can't decode Http11ProxyUpstreamTransport");
133 return {};
134 }
135 const auto* transport_socket =
136 envoy_extensions_transport_sockets_http_11_proxy_v3_Http11ProxyUpstreamTransport_transport_socket(
137 http_11_proxy);
138 if (transport_socket == nullptr) return {};
139 ValidationErrors::ScopedField field(errors, ".transport_socket.typed_config");
140 const auto* typed_config =
141 envoy_config_core_v3_TransportSocket_typed_config(transport_socket);
142 auto wrapped_extension = ExtractXdsExtension(context, typed_config, errors);
143 if (!wrapped_extension.has_value()) return {};
144 if (wrapped_extension->type != kUpstreamTlsContextType) {
145 ValidationErrors::ScopedField field(errors, ".type_url");
146 errors->AddError("unsupported transport socket type");
147 return {};
148 }
149 return UpstreamTlsContextParse(context, *wrapped_extension, errors);
150 }
151
EdsConfigParse(const envoy_config_cluster_v3_Cluster * cluster,ValidationErrors * errors)152 XdsClusterResource::Eds EdsConfigParse(
153 const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
154 XdsClusterResource::Eds eds;
155 ValidationErrors::ScopedField field(errors, ".eds_cluster_config");
156 const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config =
157 envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster);
158 if (eds_cluster_config == nullptr) {
159 errors->AddError("field not present");
160 } else {
161 // Validate ConfigSource.
162 {
163 ValidationErrors::ScopedField field(errors, ".eds_config");
164 const envoy_config_core_v3_ConfigSource* eds_config =
165 envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config(
166 eds_cluster_config);
167 if (eds_config == nullptr) {
168 errors->AddError("field not present");
169 } else {
170 if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config) &&
171 !envoy_config_core_v3_ConfigSource_has_self(eds_config)) {
172 errors->AddError("ConfigSource is not ads or self");
173 }
174 }
175 }
176 // Record EDS service_name (if any).
177 // This field is required if the CDS resource has an xdstp name.
178 eds.eds_service_name = UpbStringToStdString(
179 envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name(
180 eds_cluster_config));
181 if (eds.eds_service_name.empty()) {
182 absl::string_view cluster_name =
183 UpbStringToAbsl(envoy_config_cluster_v3_Cluster_name(cluster));
184 if (absl::StartsWith(cluster_name, "xdstp:")) {
185 ValidationErrors::ScopedField field(errors, ".service_name");
186 errors->AddError("must be set if Cluster resource has an xdstp name");
187 }
188 }
189 }
190 return eds;
191 }
192
LogicalDnsParse(const envoy_config_cluster_v3_Cluster * cluster,ValidationErrors * errors)193 XdsClusterResource::LogicalDns LogicalDnsParse(
194 const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
195 XdsClusterResource::LogicalDns logical_dns;
196 ValidationErrors::ScopedField field(errors, ".load_assignment");
197 const auto* load_assignment =
198 envoy_config_cluster_v3_Cluster_load_assignment(cluster);
199 if (load_assignment == nullptr) {
200 errors->AddError("field not present for LOGICAL_DNS cluster");
201 return logical_dns;
202 }
203 ValidationErrors::ScopedField field2(errors, ".endpoints");
204 size_t num_localities;
205 const auto* const* localities =
206 envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(load_assignment,
207 &num_localities);
208 if (num_localities != 1) {
209 errors->AddError(absl::StrCat(
210 "must contain exactly one locality for LOGICAL_DNS cluster, found ",
211 num_localities));
212 return logical_dns;
213 }
214 ValidationErrors::ScopedField field3(errors, "[0].lb_endpoints");
215 size_t num_endpoints;
216 const auto* const* endpoints =
217 envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(localities[0],
218 &num_endpoints);
219 if (num_endpoints != 1) {
220 errors->AddError(absl::StrCat(
221 "must contain exactly one endpoint for LOGICAL_DNS cluster, found ",
222 num_endpoints));
223 return logical_dns;
224 }
225 ValidationErrors::ScopedField field4(errors, "[0].endpoint");
226 const auto* endpoint =
227 envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]);
228 if (endpoint == nullptr) {
229 errors->AddError("field not present");
230 return logical_dns;
231 }
232 ValidationErrors::ScopedField field5(errors, ".address");
233 const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint);
234 if (address == nullptr) {
235 errors->AddError("field not present");
236 return logical_dns;
237 }
238 ValidationErrors::ScopedField field6(errors, ".socket_address");
239 const auto* socket_address =
240 envoy_config_core_v3_Address_socket_address(address);
241 if (socket_address == nullptr) {
242 errors->AddError("field not present");
243 return logical_dns;
244 }
245 if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address).size !=
246 0) {
247 ValidationErrors::ScopedField field(errors, ".resolver_name");
248 errors->AddError(
249 "LOGICAL_DNS clusters must NOT have a custom resolver name set");
250 }
251 absl::string_view address_str = UpbStringToAbsl(
252 envoy_config_core_v3_SocketAddress_address(socket_address));
253 if (address_str.empty()) {
254 ValidationErrors::ScopedField field(errors, ".address");
255 errors->AddError("field not present");
256 }
257 if (!envoy_config_core_v3_SocketAddress_has_port_value(socket_address)) {
258 ValidationErrors::ScopedField field(errors, ".port_value");
259 errors->AddError("field not present");
260 }
261 logical_dns.hostname = JoinHostPort(
262 address_str,
263 envoy_config_core_v3_SocketAddress_port_value(socket_address));
264 return logical_dns;
265 }
266
AggregateClusterParse(const XdsResourceType::DecodeContext & context,absl::string_view serialized_config,ValidationErrors * errors)267 XdsClusterResource::Aggregate AggregateClusterParse(
268 const XdsResourceType::DecodeContext& context,
269 absl::string_view serialized_config, ValidationErrors* errors) {
270 XdsClusterResource::Aggregate aggregate;
271 const auto* aggregate_cluster_config =
272 envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse(
273 serialized_config.data(), serialized_config.size(), context.arena);
274 if (aggregate_cluster_config == nullptr) {
275 errors->AddError("can't parse aggregate cluster config");
276 return aggregate;
277 }
278 size_t size;
279 const upb_StringView* clusters =
280 envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters(
281 aggregate_cluster_config, &size);
282 if (size == 0) {
283 ValidationErrors::ScopedField field(errors, ".clusters");
284 errors->AddError("must be non-empty");
285 }
286 for (size_t i = 0; i < size; ++i) {
287 aggregate.prioritized_cluster_names.emplace_back(
288 UpbStringToStdString(clusters[i]));
289 }
290 return aggregate;
291 }
292
ParseLbPolicyConfig(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster,XdsClusterResource * cds_update,ValidationErrors * errors)293 void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context,
294 const envoy_config_cluster_v3_Cluster* cluster,
295 XdsClusterResource* cds_update,
296 ValidationErrors* errors) {
297 // First, check the new load_balancing_policy field.
298 const auto* load_balancing_policy =
299 envoy_config_cluster_v3_Cluster_load_balancing_policy(cluster);
300 if (load_balancing_policy != nullptr) {
301 const auto& registry =
302 static_cast<const GrpcXdsBootstrap&>(context.client->bootstrap())
303 .lb_policy_registry();
304 ValidationErrors::ScopedField field(errors, ".load_balancing_policy");
305 const size_t original_error_count = errors->size();
306 cds_update->lb_policy_config = registry.ConvertXdsLbPolicyConfig(
307 context, load_balancing_policy, errors);
308 // If there were no conversion errors, validate that the converted config
309 // parses with the gRPC LB policy registry.
310 if (original_error_count == errors->size()) {
311 auto config = CoreConfiguration::Get()
312 .lb_policy_registry()
313 .ParseLoadBalancingConfig(
314 Json::FromArray(cds_update->lb_policy_config));
315 if (!config.ok()) errors->AddError(config.status().message());
316 }
317 return;
318 }
319 // Didn't find load_balancing_policy field, so fall back to the old
320 // lb_policy enum field.
321 if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
322 envoy_config_cluster_v3_Cluster_ROUND_ROBIN) {
323 cds_update->lb_policy_config = {
324 Json::FromObject({
325 {"xds_wrr_locality_experimental",
326 Json::FromObject({
327 {"childPolicy", Json::FromArray({
328 Json::FromObject({
329 {"round_robin", Json::FromObject({})},
330 }),
331 })},
332 })},
333 }),
334 };
335 } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
336 envoy_config_cluster_v3_Cluster_RING_HASH) {
337 // Record ring hash lb config
338 auto* ring_hash_config =
339 envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster);
340 uint64_t min_ring_size = 1024;
341 uint64_t max_ring_size = 8388608;
342 if (ring_hash_config != nullptr) {
343 ValidationErrors::ScopedField field(errors, ".ring_hash_lb_config");
344 auto value = ParseUInt64Value(
345 envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
346 ring_hash_config));
347 if (value.has_value()) {
348 ValidationErrors::ScopedField field(errors, ".maximum_ring_size");
349 max_ring_size = *value;
350 if (max_ring_size > 8388608 || max_ring_size == 0) {
351 errors->AddError("must be in the range of 1 to 8388608");
352 }
353 }
354 value = ParseUInt64Value(
355 envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
356 ring_hash_config));
357 if (value.has_value()) {
358 ValidationErrors::ScopedField field(errors, ".minimum_ring_size");
359 min_ring_size = *value;
360 if (min_ring_size > 8388608 || min_ring_size == 0) {
361 errors->AddError("must be in the range of 1 to 8388608");
362 }
363 if (min_ring_size > max_ring_size) {
364 errors->AddError("cannot be greater than maximum_ring_size");
365 }
366 }
367 if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
368 ring_hash_config) !=
369 envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
370 ValidationErrors::ScopedField field(errors, ".hash_function");
371 errors->AddError("invalid hash function");
372 }
373 }
374 cds_update->lb_policy_config = {
375 Json::FromObject({
376 {"ring_hash_experimental",
377 Json::FromObject({
378 {"minRingSize", Json::FromNumber(min_ring_size)},
379 {"maxRingSize", Json::FromNumber(max_ring_size)},
380 })},
381 }),
382 };
383 } else {
384 ValidationErrors::ScopedField field(errors, ".lb_policy");
385 errors->AddError("LB policy is not supported");
386 }
387 }
388
ParseUpstreamConfig(const XdsResourceType::DecodeContext & context,const envoy_config_core_v3_TypedExtensionConfig * upstream_config,XdsClusterResource * cds_update,ValidationErrors * errors)389 void ParseUpstreamConfig(
390 const XdsResourceType::DecodeContext& context,
391 const envoy_config_core_v3_TypedExtensionConfig* upstream_config,
392 XdsClusterResource* cds_update, ValidationErrors* errors) {
393 ValidationErrors::ScopedField field(errors, ".typed_config");
394 const auto* typed_config =
395 envoy_config_core_v3_TypedExtensionConfig_typed_config(upstream_config);
396 auto extension = ExtractXdsExtension(context, typed_config, errors);
397 if (!extension.has_value()) return;
398 if (extension->type !=
399 "envoy.extensions.upstreams.http.v3.HttpProtocolOptions") {
400 ValidationErrors::ScopedField field(errors, ".type_url");
401 errors->AddError("unsupported upstream config type");
402 return;
403 }
404 absl::string_view* serialized_http_protocol_options =
405 absl::get_if<absl::string_view>(&extension->value);
406 if (serialized_http_protocol_options == nullptr) {
407 errors->AddError("can't decode HttpProtocolOptions");
408 return;
409 }
410 const auto* http_protocol_options =
411 envoy_extensions_upstreams_http_v3_HttpProtocolOptions_parse(
412 serialized_http_protocol_options->data(),
413 serialized_http_protocol_options->size(), context.arena);
414 if (http_protocol_options == nullptr) {
415 errors->AddError("can't decode HttpProtocolOptions");
416 return;
417 }
418 ValidationErrors::ScopedField field2(errors, ".common_http_protocol_options");
419 const auto* common_http_protocol_options =
420 envoy_extensions_upstreams_http_v3_HttpProtocolOptions_common_http_protocol_options(
421 http_protocol_options);
422 if (common_http_protocol_options != nullptr) {
423 const auto* idle_timeout =
424 envoy_config_core_v3_HttpProtocolOptions_idle_timeout(
425 common_http_protocol_options);
426 if (idle_timeout != nullptr) {
427 ValidationErrors::ScopedField field(errors, ".idle_timeout");
428 cds_update->connection_idle_timeout = ParseDuration(idle_timeout, errors);
429 }
430 }
431 }
432
CdsResourceParse(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster)433 absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
434 const XdsResourceType::DecodeContext& context,
435 const envoy_config_cluster_v3_Cluster* cluster) {
436 auto cds_update = std::make_shared<XdsClusterResource>();
437 ValidationErrors errors;
438 // Check the cluster discovery type.
439 if (envoy_config_cluster_v3_Cluster_type(cluster) ==
440 envoy_config_cluster_v3_Cluster_EDS) {
441 cds_update->type = EdsConfigParse(cluster, &errors);
442 } else if (envoy_config_cluster_v3_Cluster_type(cluster) ==
443 envoy_config_cluster_v3_Cluster_LOGICAL_DNS) {
444 cds_update->type = LogicalDnsParse(cluster, &errors);
445 } else if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) {
446 ValidationErrors::ScopedField field(&errors, ".cluster_type");
447 const auto* custom_cluster_type =
448 envoy_config_cluster_v3_Cluster_cluster_type(cluster);
449 CHECK_NE(custom_cluster_type, nullptr);
450 ValidationErrors::ScopedField field2(&errors, ".typed_config");
451 const auto* typed_config =
452 envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config(
453 custom_cluster_type);
454 if (typed_config == nullptr) {
455 errors.AddError("field not present");
456 } else {
457 absl::string_view type_url = absl::StripPrefix(
458 UpbStringToAbsl(google_protobuf_Any_type_url(typed_config)),
459 "type.googleapis.com/");
460 if (type_url != "envoy.extensions.clusters.aggregate.v3.ClusterConfig") {
461 ValidationErrors::ScopedField field(&errors, ".type_url");
462 errors.AddError(
463 absl::StrCat("unknown cluster_type extension: ", type_url));
464 } else {
465 // Retrieve aggregate clusters.
466 ValidationErrors::ScopedField field(
467 &errors,
468 ".value[envoy.extensions.clusters.aggregate.v3.ClusterConfig]");
469 absl::string_view serialized_config =
470 UpbStringToAbsl(google_protobuf_Any_value(typed_config));
471 cds_update->type =
472 AggregateClusterParse(context, serialized_config, &errors);
473 }
474 }
475 } else {
476 ValidationErrors::ScopedField field(&errors, ".type");
477 errors.AddError("unknown discovery type");
478 }
479 // Check the LB policy.
480 ParseLbPolicyConfig(context, cluster, cds_update.get(), &errors);
481 // transport_socket
482 auto* transport_socket =
483 envoy_config_cluster_v3_Cluster_transport_socket(cluster);
484 if (transport_socket != nullptr) {
485 ValidationErrors::ScopedField field(&errors,
486 ".transport_socket.typed_config");
487 const auto* typed_config =
488 envoy_config_core_v3_TransportSocket_typed_config(transport_socket);
489 auto extension = ExtractXdsExtension(context, typed_config, &errors);
490 if (extension.has_value()) {
491 if (XdsHttpConnectEnabled() && extension->type == kHttp11ProxyType) {
492 cds_update->use_http_connect = true;
493 cds_update->common_tls_context =
494 Http11ProxyUpstreamTransportParse(context, *extension, &errors);
495 } else if (extension->type == kUpstreamTlsContextType) {
496 cds_update->common_tls_context =
497 UpstreamTlsContextParse(context, *extension, &errors);
498 } else {
499 ValidationErrors::ScopedField field(&errors, ".type_url");
500 errors.AddError("unsupported transport socket type");
501 }
502 }
503 }
504 // Record LRS server name (if any).
505 const envoy_config_core_v3_ConfigSource* lrs_server =
506 envoy_config_cluster_v3_Cluster_lrs_server(cluster);
507 if (lrs_server != nullptr) {
508 if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) {
509 ValidationErrors::ScopedField field(&errors, ".lrs_server");
510 errors.AddError("ConfigSource is not self");
511 }
512 cds_update->lrs_load_reporting_server = std::make_shared<GrpcXdsServer>(
513 static_cast<const GrpcXdsServer&>(context.server));
514 }
515 // Record LRS metric propagation.
516 auto propagation = MakeRefCounted<BackendMetricPropagation>();
517 if (XdsOrcaLrsPropagationChangesEnabled()) {
518 size_t size;
519 upb_StringView const* metrics =
520 envoy_config_cluster_v3_Cluster_lrs_report_endpoint_metrics(cluster,
521 &size);
522 for (size_t i = 0; i < size; ++i) {
523 absl::string_view metric_name = UpbStringToAbsl(metrics[i]);
524 if (metric_name == "cpu_utilization") {
525 propagation->propagation_bits |= propagation->kCpuUtilization;
526 } else if (metric_name == "mem_utilization") {
527 propagation->propagation_bits |= propagation->kMemUtilization;
528 } else if (metric_name == "application_utilization") {
529 propagation->propagation_bits |= propagation->kApplicationUtilization;
530 } else if (absl::ConsumePrefix(&metric_name, "named_metrics.")) {
531 if (metric_name == "*") {
532 propagation->propagation_bits |= propagation->kNamedMetricsAll;
533 } else {
534 propagation->named_metric_keys.emplace(metric_name);
535 }
536 }
537 }
538 }
539 cds_update->lrs_backend_metric_propagation = std::move(propagation);
540 // Protocol options.
541 auto* upstream_config =
542 envoy_config_cluster_v3_Cluster_upstream_config(cluster);
543 if (upstream_config != nullptr) {
544 ValidationErrors::ScopedField field(&errors, ".upstream_config");
545 ParseUpstreamConfig(context, upstream_config, cds_update.get(), &errors);
546 }
547 // The Cluster resource encodes the circuit breaking parameters in a list of
548 // Thresholds messages, where each message specifies the parameters for a
549 // particular RoutingPriority. we will look only at the first entry in the
550 // list for priority DEFAULT and default to 1024 if not found.
551 if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) {
552 const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers =
553 envoy_config_cluster_v3_Cluster_circuit_breakers(cluster);
554 size_t num_thresholds;
555 const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const*
556 thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds(
557 circuit_breakers, &num_thresholds);
558 for (size_t i = 0; i < num_thresholds; ++i) {
559 const auto* threshold = thresholds[i];
560 if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority(
561 threshold) == envoy_config_core_v3_DEFAULT) {
562 auto value = ParseUInt32Value(
563 envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests(
564 threshold));
565 if (value.has_value()) cds_update->max_concurrent_requests = *value;
566 break;
567 }
568 }
569 }
570 // Outlier detection config.
571 if (envoy_config_cluster_v3_Cluster_has_outlier_detection(cluster)) {
572 ValidationErrors::ScopedField field(&errors, ".outlier_detection");
573 OutlierDetectionConfig outlier_detection_update;
574 const envoy_config_cluster_v3_OutlierDetection* outlier_detection =
575 envoy_config_cluster_v3_Cluster_outlier_detection(cluster);
576 const google_protobuf_Duration* duration =
577 envoy_config_cluster_v3_OutlierDetection_interval(outlier_detection);
578 if (duration != nullptr) {
579 ValidationErrors::ScopedField field(&errors, ".interval");
580 outlier_detection_update.interval = ParseDuration(duration, &errors);
581 }
582 duration = envoy_config_cluster_v3_OutlierDetection_base_ejection_time(
583 outlier_detection);
584 if (duration != nullptr) {
585 ValidationErrors::ScopedField field(&errors, ".base_ejection_time");
586 outlier_detection_update.base_ejection_time =
587 ParseDuration(duration, &errors);
588 }
589 duration = envoy_config_cluster_v3_OutlierDetection_max_ejection_time(
590 outlier_detection);
591 if (duration != nullptr) {
592 ValidationErrors::ScopedField field(&errors, ".max_ejection_time");
593 outlier_detection_update.max_ejection_time =
594 ParseDuration(duration, &errors);
595 }
596 auto max_ejection_percent = ParseUInt32Value(
597 envoy_config_cluster_v3_OutlierDetection_max_ejection_percent(
598 outlier_detection));
599 if (max_ejection_percent.has_value()) {
600 outlier_detection_update.max_ejection_percent = *max_ejection_percent;
601 if (outlier_detection_update.max_ejection_percent > 100) {
602 ValidationErrors::ScopedField field(&errors, ".max_ejection_percent");
603 errors.AddError("value must be <= 100");
604 }
605 }
606 auto enforcement_percentage = ParseUInt32Value(
607 envoy_config_cluster_v3_OutlierDetection_enforcing_success_rate(
608 outlier_detection));
609 if (enforcement_percentage.has_value()) {
610 if (*enforcement_percentage > 100) {
611 ValidationErrors::ScopedField field(&errors, ".enforcing_success_rate");
612 errors.AddError("value must be <= 100");
613 }
614 if (*enforcement_percentage != 0) {
615 OutlierDetectionConfig::SuccessRateEjection success_rate_ejection;
616 success_rate_ejection.enforcement_percentage = *enforcement_percentage;
617 auto minimum_hosts = ParseUInt32Value(
618 envoy_config_cluster_v3_OutlierDetection_success_rate_minimum_hosts(
619 outlier_detection));
620 if (minimum_hosts.has_value()) {
621 success_rate_ejection.minimum_hosts = *minimum_hosts;
622 }
623 auto request_volume = ParseUInt32Value(
624 envoy_config_cluster_v3_OutlierDetection_success_rate_request_volume(
625 outlier_detection));
626 if (request_volume.has_value()) {
627 success_rate_ejection.request_volume = *request_volume;
628 }
629 auto stdev_factor = ParseUInt32Value(
630 envoy_config_cluster_v3_OutlierDetection_success_rate_stdev_factor(
631 outlier_detection));
632 if (stdev_factor.has_value()) {
633 success_rate_ejection.stdev_factor = *stdev_factor;
634 }
635 outlier_detection_update.success_rate_ejection = success_rate_ejection;
636 }
637 }
638 enforcement_percentage = ParseUInt32Value(
639 envoy_config_cluster_v3_OutlierDetection_enforcing_failure_percentage(
640 outlier_detection));
641 if (enforcement_percentage.has_value()) {
642 if (*enforcement_percentage > 100) {
643 ValidationErrors::ScopedField field(&errors,
644 ".enforcing_failure_percentage");
645 errors.AddError("value must be <= 100");
646 }
647 if (*enforcement_percentage != 0) {
648 OutlierDetectionConfig::FailurePercentageEjection
649 failure_percentage_ejection;
650 failure_percentage_ejection.enforcement_percentage =
651 *enforcement_percentage;
652 auto minimum_hosts = ParseUInt32Value(
653 envoy_config_cluster_v3_OutlierDetection_failure_percentage_minimum_hosts(
654 outlier_detection));
655 if (minimum_hosts.has_value()) {
656 failure_percentage_ejection.minimum_hosts = *minimum_hosts;
657 }
658 auto request_volume = ParseUInt32Value(
659 envoy_config_cluster_v3_OutlierDetection_failure_percentage_request_volume(
660 outlier_detection));
661 if (request_volume.has_value()) {
662 failure_percentage_ejection.request_volume = *request_volume;
663 }
664 auto threshold = ParseUInt32Value(
665 envoy_config_cluster_v3_OutlierDetection_failure_percentage_threshold(
666 outlier_detection));
667 if (threshold.has_value()) {
668 failure_percentage_ejection.threshold = *threshold;
669 if (*enforcement_percentage > 100) {
670 ValidationErrors::ScopedField field(
671 &errors, ".failure_percentage_threshold");
672 errors.AddError("value must be <= 100");
673 }
674 }
675 outlier_detection_update.failure_percentage_ejection =
676 failure_percentage_ejection;
677 }
678 }
679 cds_update->outlier_detection = outlier_detection_update;
680 }
681 // Validate override host status.
682 const auto* common_lb_config =
683 envoy_config_cluster_v3_Cluster_common_lb_config(cluster);
684 bool override_host_status_found = false;
685 if (common_lb_config != nullptr) {
686 ValidationErrors::ScopedField field(&errors, ".common_lb_config");
687 const auto* override_host_status =
688 envoy_config_cluster_v3_Cluster_CommonLbConfig_override_host_status(
689 common_lb_config);
690 if (override_host_status != nullptr) {
691 ValidationErrors::ScopedField field(&errors, ".override_host_status");
692 size_t size;
693 const int32_t* statuses = envoy_config_core_v3_HealthStatusSet_statuses(
694 override_host_status, &size);
695 for (size_t i = 0; i < size; ++i) {
696 auto status = XdsHealthStatus::FromUpb(statuses[i]);
697 if (status.has_value()) {
698 cds_update->override_host_statuses.Add(*status);
699 }
700 }
701 override_host_status_found = true;
702 }
703 }
704 // If the field is not set, we default to [UNKNOWN, HEALTHY].
705 if (!override_host_status_found) {
706 cds_update->override_host_statuses.Add(
707 XdsHealthStatus(XdsHealthStatus::kUnknown));
708 cds_update->override_host_statuses.Add(
709 XdsHealthStatus(XdsHealthStatus::kHealthy));
710 }
711 // Parse metadata.
712 {
713 ValidationErrors::ScopedField field(&errors, ".metadata");
714 cds_update->metadata = ParseXdsMetadataMap(
715 context, envoy_config_cluster_v3_Cluster_metadata(cluster), &errors);
716 }
717 // Return result.
718 if (!errors.ok()) {
719 return errors.status(absl::StatusCode::kInvalidArgument,
720 "errors validating Cluster resource");
721 }
722 return cds_update;
723 }
724
MaybeLogCluster(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster)725 void MaybeLogCluster(const XdsResourceType::DecodeContext& context,
726 const envoy_config_cluster_v3_Cluster* cluster) {
727 if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) {
728 const upb_MessageDef* msg_type =
729 envoy_config_cluster_v3_Cluster_getmsgdef(context.symtab);
730 char buf[10240];
731 upb_TextEncode(reinterpret_cast<const upb_Message*>(cluster), msg_type,
732 nullptr, 0, buf, sizeof(buf));
733 VLOG(2) << "[xds_client " << context.client << "] Cluster: " << buf;
734 }
735 }
736
737 } // namespace
738
// Deserializes and validates a Cluster resource.
//
// If the bytes do not parse as a Cluster proto, the result carries an
// INVALID_ARGUMENT status and no name.  Otherwise the result's name is set
// from the proto and its resource is either the validated
// XdsClusterResource or the validation failure status.  The outcome is
// logged when the context's trace flag is enabled.
XdsResourceType::DecodeResult XdsClusterResourceType::Decode(
    const XdsResourceType::DecodeContext& context,
    absl::string_view serialized_resource) const {
  DecodeResult result;
  // Parse serialized proto.
  auto* cluster_proto = envoy_config_cluster_v3_Cluster_parse(
      serialized_resource.data(), serialized_resource.size(), context.arena);
  if (cluster_proto == nullptr) {
    result.resource =
        absl::InvalidArgumentError("Can't parse Cluster resource.");
    return result;
  }
  MaybeLogCluster(context, cluster_proto);
  // The name is recorded before validation, so it is available even when
  // validation fails.
  result.name =
      UpbStringToStdString(envoy_config_cluster_v3_Cluster_name(cluster_proto));
  // Validate resource.
  auto parsed = CdsResourceParse(context, cluster_proto);
  if (parsed.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer)) {
      LOG(INFO) << "[xds_client " << context.client << "] parsed Cluster "
                << *result.name << ": " << (*parsed)->ToString();
    }
    result.resource = std::move(*parsed);
    return result;
  }
  if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer)) {
    LOG(ERROR) << "[xds_client " << context.client << "] invalid Cluster "
               << *result.name << ": " << parsed.status();
  }
  result.resource = parsed.status();
  return result;
}
771
772 } // namespace grpc_core
773