• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include "src/core/xds/grpc/xds_cluster_parser.h"
17 
18 #include <algorithm>
19 #include <memory>
20 #include <utility>
21 
22 #include "absl/log/check.h"
23 #include "absl/log/log.h"
24 #include "absl/status/status.h"
25 #include "absl/status/statusor.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/strings/strip.h"
28 #include "envoy/config/cluster/v3/circuit_breaker.upb.h"
29 #include "envoy/config/cluster/v3/cluster.upb.h"
30 #include "envoy/config/cluster/v3/cluster.upbdefs.h"
31 #include "envoy/config/cluster/v3/outlier_detection.upb.h"
32 #include "envoy/config/core/v3/address.upb.h"
33 #include "envoy/config/core/v3/base.upb.h"
34 #include "envoy/config/core/v3/config_source.upb.h"
35 #include "envoy/config/core/v3/extension.upb.h"
36 #include "envoy/config/core/v3/health_check.upb.h"
37 #include "envoy/config/core/v3/protocol.upb.h"
38 #include "envoy/config/endpoint/v3/endpoint.upb.h"
39 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
40 #include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h"
41 #include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.upb.h"
42 #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
43 #include "envoy/extensions/upstreams/http/v3/http_protocol_options.upb.h"
44 #include "google/protobuf/any.upb.h"
45 #include "google/protobuf/duration.upb.h"
46 #include "google/protobuf/struct.upb.h"
47 #include "google/protobuf/wrappers.upb.h"
48 #include "src/core/config/core_configuration.h"
49 #include "src/core/lib/debug/trace.h"
50 #include "src/core/load_balancing/lb_policy_registry.h"
51 #include "src/core/util/env.h"
52 #include "src/core/util/host_port.h"
53 #include "src/core/util/time.h"
54 #include "src/core/util/upb_utils.h"
55 #include "src/core/util/validation_errors.h"
56 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
57 #include "src/core/xds/grpc/xds_common_types.h"
58 #include "src/core/xds/grpc/xds_common_types_parser.h"
59 #include "src/core/xds/grpc/xds_lb_policy_registry.h"
60 #include "src/core/xds/grpc/xds_metadata_parser.h"
61 #include "src/core/xds/xds_client/lrs_client.h"
62 #include "src/core/xds/xds_client/xds_backend_metric_propagation.h"
63 #include "upb/base/string_view.h"
64 #include "upb/text/encode.h"
65 
66 namespace grpc_core {
67 
68 // TODO(roth): Remove this once the feature passes interop tests.
XdsHttpConnectEnabled()69 bool XdsHttpConnectEnabled() {
70   auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT");
71   if (!value.has_value()) return false;
72   bool parsed_value;
73   bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
74   return parse_succeeded && parsed_value;
75 }
76 
77 namespace {
78 
79 constexpr absl::string_view kUpstreamTlsContextType =
80     "envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext";
81 
82 constexpr absl::string_view kHttp11ProxyType =
83     "envoy.extensions.transport_sockets.http_11_proxy.v3"
84     ".Http11ProxyUpstreamTransport";
85 
UpstreamTlsContextParse(const XdsResourceType::DecodeContext & context,const XdsExtension & extension,ValidationErrors * errors)86 CommonTlsContext UpstreamTlsContextParse(
87     const XdsResourceType::DecodeContext& context,
88     const XdsExtension& extension, ValidationErrors* errors) {
89   const absl::string_view* serialized_upstream_tls_context =
90       absl::get_if<absl::string_view>(&extension.value);
91   if (serialized_upstream_tls_context == nullptr) {
92     errors->AddError("can't decode UpstreamTlsContext");
93     return {};
94   }
95   const auto* upstream_tls_context =
96       envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_parse(
97           serialized_upstream_tls_context->data(),
98           serialized_upstream_tls_context->size(), context.arena);
99   if (upstream_tls_context == nullptr) {
100     errors->AddError("can't decode UpstreamTlsContext");
101     return {};
102   }
103   ValidationErrors::ScopedField field3(errors, ".common_tls_context");
104   const auto* common_tls_context_proto =
105       envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_common_tls_context(
106           upstream_tls_context);
107   CommonTlsContext common_tls_context;
108   if (common_tls_context_proto != nullptr) {
109     common_tls_context =
110         CommonTlsContextParse(context, common_tls_context_proto, errors);
111   }
112   if (absl::holds_alternative<absl::monostate>(
113           common_tls_context.certificate_validation_context.ca_certs)) {
114     errors->AddError("no CA certs configured");
115   }
116   return common_tls_context;
117 }
118 
Http11ProxyUpstreamTransportParse(const XdsResourceType::DecodeContext & context,const XdsExtension & extension,ValidationErrors * errors)119 CommonTlsContext Http11ProxyUpstreamTransportParse(
120     const XdsResourceType::DecodeContext& context,
121     const XdsExtension& extension, ValidationErrors* errors) {
122   const absl::string_view* serialized =
123       absl::get_if<absl::string_view>(&extension.value);
124   if (serialized == nullptr) {
125     errors->AddError("can't decode Http11ProxyUpstreamTransport");
126     return {};
127   }
128   const auto* http_11_proxy =
129       envoy_extensions_transport_sockets_http_11_proxy_v3_Http11ProxyUpstreamTransport_parse(
130           serialized->data(), serialized->size(), context.arena);
131   if (http_11_proxy == nullptr) {
132     errors->AddError("can't decode Http11ProxyUpstreamTransport");
133     return {};
134   }
135   const auto* transport_socket =
136       envoy_extensions_transport_sockets_http_11_proxy_v3_Http11ProxyUpstreamTransport_transport_socket(
137           http_11_proxy);
138   if (transport_socket == nullptr) return {};
139   ValidationErrors::ScopedField field(errors, ".transport_socket.typed_config");
140   const auto* typed_config =
141       envoy_config_core_v3_TransportSocket_typed_config(transport_socket);
142   auto wrapped_extension = ExtractXdsExtension(context, typed_config, errors);
143   if (!wrapped_extension.has_value()) return {};
144   if (wrapped_extension->type != kUpstreamTlsContextType) {
145     ValidationErrors::ScopedField field(errors, ".type_url");
146     errors->AddError("unsupported transport socket type");
147     return {};
148   }
149   return UpstreamTlsContextParse(context, *wrapped_extension, errors);
150 }
151 
EdsConfigParse(const envoy_config_cluster_v3_Cluster * cluster,ValidationErrors * errors)152 XdsClusterResource::Eds EdsConfigParse(
153     const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
154   XdsClusterResource::Eds eds;
155   ValidationErrors::ScopedField field(errors, ".eds_cluster_config");
156   const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config =
157       envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster);
158   if (eds_cluster_config == nullptr) {
159     errors->AddError("field not present");
160   } else {
161     // Validate ConfigSource.
162     {
163       ValidationErrors::ScopedField field(errors, ".eds_config");
164       const envoy_config_core_v3_ConfigSource* eds_config =
165           envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config(
166               eds_cluster_config);
167       if (eds_config == nullptr) {
168         errors->AddError("field not present");
169       } else {
170         if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config) &&
171             !envoy_config_core_v3_ConfigSource_has_self(eds_config)) {
172           errors->AddError("ConfigSource is not ads or self");
173         }
174       }
175     }
176     // Record EDS service_name (if any).
177     // This field is required if the CDS resource has an xdstp name.
178     eds.eds_service_name = UpbStringToStdString(
179         envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name(
180             eds_cluster_config));
181     if (eds.eds_service_name.empty()) {
182       absl::string_view cluster_name =
183           UpbStringToAbsl(envoy_config_cluster_v3_Cluster_name(cluster));
184       if (absl::StartsWith(cluster_name, "xdstp:")) {
185         ValidationErrors::ScopedField field(errors, ".service_name");
186         errors->AddError("must be set if Cluster resource has an xdstp name");
187       }
188     }
189   }
190   return eds;
191 }
192 
LogicalDnsParse(const envoy_config_cluster_v3_Cluster * cluster,ValidationErrors * errors)193 XdsClusterResource::LogicalDns LogicalDnsParse(
194     const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
195   XdsClusterResource::LogicalDns logical_dns;
196   ValidationErrors::ScopedField field(errors, ".load_assignment");
197   const auto* load_assignment =
198       envoy_config_cluster_v3_Cluster_load_assignment(cluster);
199   if (load_assignment == nullptr) {
200     errors->AddError("field not present for LOGICAL_DNS cluster");
201     return logical_dns;
202   }
203   ValidationErrors::ScopedField field2(errors, ".endpoints");
204   size_t num_localities;
205   const auto* const* localities =
206       envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(load_assignment,
207                                                                &num_localities);
208   if (num_localities != 1) {
209     errors->AddError(absl::StrCat(
210         "must contain exactly one locality for LOGICAL_DNS cluster, found ",
211         num_localities));
212     return logical_dns;
213   }
214   ValidationErrors::ScopedField field3(errors, "[0].lb_endpoints");
215   size_t num_endpoints;
216   const auto* const* endpoints =
217       envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(localities[0],
218                                                                 &num_endpoints);
219   if (num_endpoints != 1) {
220     errors->AddError(absl::StrCat(
221         "must contain exactly one endpoint for LOGICAL_DNS cluster, found ",
222         num_endpoints));
223     return logical_dns;
224   }
225   ValidationErrors::ScopedField field4(errors, "[0].endpoint");
226   const auto* endpoint =
227       envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]);
228   if (endpoint == nullptr) {
229     errors->AddError("field not present");
230     return logical_dns;
231   }
232   ValidationErrors::ScopedField field5(errors, ".address");
233   const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint);
234   if (address == nullptr) {
235     errors->AddError("field not present");
236     return logical_dns;
237   }
238   ValidationErrors::ScopedField field6(errors, ".socket_address");
239   const auto* socket_address =
240       envoy_config_core_v3_Address_socket_address(address);
241   if (socket_address == nullptr) {
242     errors->AddError("field not present");
243     return logical_dns;
244   }
245   if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address).size !=
246       0) {
247     ValidationErrors::ScopedField field(errors, ".resolver_name");
248     errors->AddError(
249         "LOGICAL_DNS clusters must NOT have a custom resolver name set");
250   }
251   absl::string_view address_str = UpbStringToAbsl(
252       envoy_config_core_v3_SocketAddress_address(socket_address));
253   if (address_str.empty()) {
254     ValidationErrors::ScopedField field(errors, ".address");
255     errors->AddError("field not present");
256   }
257   if (!envoy_config_core_v3_SocketAddress_has_port_value(socket_address)) {
258     ValidationErrors::ScopedField field(errors, ".port_value");
259     errors->AddError("field not present");
260   }
261   logical_dns.hostname = JoinHostPort(
262       address_str,
263       envoy_config_core_v3_SocketAddress_port_value(socket_address));
264   return logical_dns;
265 }
266 
AggregateClusterParse(const XdsResourceType::DecodeContext & context,absl::string_view serialized_config,ValidationErrors * errors)267 XdsClusterResource::Aggregate AggregateClusterParse(
268     const XdsResourceType::DecodeContext& context,
269     absl::string_view serialized_config, ValidationErrors* errors) {
270   XdsClusterResource::Aggregate aggregate;
271   const auto* aggregate_cluster_config =
272       envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse(
273           serialized_config.data(), serialized_config.size(), context.arena);
274   if (aggregate_cluster_config == nullptr) {
275     errors->AddError("can't parse aggregate cluster config");
276     return aggregate;
277   }
278   size_t size;
279   const upb_StringView* clusters =
280       envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters(
281           aggregate_cluster_config, &size);
282   if (size == 0) {
283     ValidationErrors::ScopedField field(errors, ".clusters");
284     errors->AddError("must be non-empty");
285   }
286   for (size_t i = 0; i < size; ++i) {
287     aggregate.prioritized_cluster_names.emplace_back(
288         UpbStringToStdString(clusters[i]));
289   }
290   return aggregate;
291 }
292 
ParseLbPolicyConfig(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster,XdsClusterResource * cds_update,ValidationErrors * errors)293 void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context,
294                          const envoy_config_cluster_v3_Cluster* cluster,
295                          XdsClusterResource* cds_update,
296                          ValidationErrors* errors) {
297   // First, check the new load_balancing_policy field.
298   const auto* load_balancing_policy =
299       envoy_config_cluster_v3_Cluster_load_balancing_policy(cluster);
300   if (load_balancing_policy != nullptr) {
301     const auto& registry =
302         static_cast<const GrpcXdsBootstrap&>(context.client->bootstrap())
303             .lb_policy_registry();
304     ValidationErrors::ScopedField field(errors, ".load_balancing_policy");
305     const size_t original_error_count = errors->size();
306     cds_update->lb_policy_config = registry.ConvertXdsLbPolicyConfig(
307         context, load_balancing_policy, errors);
308     // If there were no conversion errors, validate that the converted config
309     // parses with the gRPC LB policy registry.
310     if (original_error_count == errors->size()) {
311       auto config = CoreConfiguration::Get()
312                         .lb_policy_registry()
313                         .ParseLoadBalancingConfig(
314                             Json::FromArray(cds_update->lb_policy_config));
315       if (!config.ok()) errors->AddError(config.status().message());
316     }
317     return;
318   }
319   // Didn't find load_balancing_policy field, so fall back to the old
320   // lb_policy enum field.
321   if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
322       envoy_config_cluster_v3_Cluster_ROUND_ROBIN) {
323     cds_update->lb_policy_config = {
324         Json::FromObject({
325             {"xds_wrr_locality_experimental",
326              Json::FromObject({
327                  {"childPolicy", Json::FromArray({
328                                      Json::FromObject({
329                                          {"round_robin", Json::FromObject({})},
330                                      }),
331                                  })},
332              })},
333         }),
334     };
335   } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
336              envoy_config_cluster_v3_Cluster_RING_HASH) {
337     // Record ring hash lb config
338     auto* ring_hash_config =
339         envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster);
340     uint64_t min_ring_size = 1024;
341     uint64_t max_ring_size = 8388608;
342     if (ring_hash_config != nullptr) {
343       ValidationErrors::ScopedField field(errors, ".ring_hash_lb_config");
344       auto value = ParseUInt64Value(
345           envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
346               ring_hash_config));
347       if (value.has_value()) {
348         ValidationErrors::ScopedField field(errors, ".maximum_ring_size");
349         max_ring_size = *value;
350         if (max_ring_size > 8388608 || max_ring_size == 0) {
351           errors->AddError("must be in the range of 1 to 8388608");
352         }
353       }
354       value = ParseUInt64Value(
355           envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
356               ring_hash_config));
357       if (value.has_value()) {
358         ValidationErrors::ScopedField field(errors, ".minimum_ring_size");
359         min_ring_size = *value;
360         if (min_ring_size > 8388608 || min_ring_size == 0) {
361           errors->AddError("must be in the range of 1 to 8388608");
362         }
363         if (min_ring_size > max_ring_size) {
364           errors->AddError("cannot be greater than maximum_ring_size");
365         }
366       }
367       if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
368               ring_hash_config) !=
369           envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
370         ValidationErrors::ScopedField field(errors, ".hash_function");
371         errors->AddError("invalid hash function");
372       }
373     }
374     cds_update->lb_policy_config = {
375         Json::FromObject({
376             {"ring_hash_experimental",
377              Json::FromObject({
378                  {"minRingSize", Json::FromNumber(min_ring_size)},
379                  {"maxRingSize", Json::FromNumber(max_ring_size)},
380              })},
381         }),
382     };
383   } else {
384     ValidationErrors::ScopedField field(errors, ".lb_policy");
385     errors->AddError("LB policy is not supported");
386   }
387 }
388 
ParseUpstreamConfig(const XdsResourceType::DecodeContext & context,const envoy_config_core_v3_TypedExtensionConfig * upstream_config,XdsClusterResource * cds_update,ValidationErrors * errors)389 void ParseUpstreamConfig(
390     const XdsResourceType::DecodeContext& context,
391     const envoy_config_core_v3_TypedExtensionConfig* upstream_config,
392     XdsClusterResource* cds_update, ValidationErrors* errors) {
393   ValidationErrors::ScopedField field(errors, ".typed_config");
394   const auto* typed_config =
395       envoy_config_core_v3_TypedExtensionConfig_typed_config(upstream_config);
396   auto extension = ExtractXdsExtension(context, typed_config, errors);
397   if (!extension.has_value()) return;
398   if (extension->type !=
399       "envoy.extensions.upstreams.http.v3.HttpProtocolOptions") {
400     ValidationErrors::ScopedField field(errors, ".type_url");
401     errors->AddError("unsupported upstream config type");
402     return;
403   }
404   absl::string_view* serialized_http_protocol_options =
405       absl::get_if<absl::string_view>(&extension->value);
406   if (serialized_http_protocol_options == nullptr) {
407     errors->AddError("can't decode HttpProtocolOptions");
408     return;
409   }
410   const auto* http_protocol_options =
411       envoy_extensions_upstreams_http_v3_HttpProtocolOptions_parse(
412           serialized_http_protocol_options->data(),
413           serialized_http_protocol_options->size(), context.arena);
414   if (http_protocol_options == nullptr) {
415     errors->AddError("can't decode HttpProtocolOptions");
416     return;
417   }
418   ValidationErrors::ScopedField field2(errors, ".common_http_protocol_options");
419   const auto* common_http_protocol_options =
420       envoy_extensions_upstreams_http_v3_HttpProtocolOptions_common_http_protocol_options(
421           http_protocol_options);
422   if (common_http_protocol_options != nullptr) {
423     const auto* idle_timeout =
424         envoy_config_core_v3_HttpProtocolOptions_idle_timeout(
425             common_http_protocol_options);
426     if (idle_timeout != nullptr) {
427       ValidationErrors::ScopedField field(errors, ".idle_timeout");
428       cds_update->connection_idle_timeout = ParseDuration(idle_timeout, errors);
429     }
430   }
431 }
432 
CdsResourceParse(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster)433 absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
434     const XdsResourceType::DecodeContext& context,
435     const envoy_config_cluster_v3_Cluster* cluster) {
436   auto cds_update = std::make_shared<XdsClusterResource>();
437   ValidationErrors errors;
438   // Check the cluster discovery type.
439   if (envoy_config_cluster_v3_Cluster_type(cluster) ==
440       envoy_config_cluster_v3_Cluster_EDS) {
441     cds_update->type = EdsConfigParse(cluster, &errors);
442   } else if (envoy_config_cluster_v3_Cluster_type(cluster) ==
443              envoy_config_cluster_v3_Cluster_LOGICAL_DNS) {
444     cds_update->type = LogicalDnsParse(cluster, &errors);
445   } else if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) {
446     ValidationErrors::ScopedField field(&errors, ".cluster_type");
447     const auto* custom_cluster_type =
448         envoy_config_cluster_v3_Cluster_cluster_type(cluster);
449     CHECK_NE(custom_cluster_type, nullptr);
450     ValidationErrors::ScopedField field2(&errors, ".typed_config");
451     const auto* typed_config =
452         envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config(
453             custom_cluster_type);
454     if (typed_config == nullptr) {
455       errors.AddError("field not present");
456     } else {
457       absl::string_view type_url = absl::StripPrefix(
458           UpbStringToAbsl(google_protobuf_Any_type_url(typed_config)),
459           "type.googleapis.com/");
460       if (type_url != "envoy.extensions.clusters.aggregate.v3.ClusterConfig") {
461         ValidationErrors::ScopedField field(&errors, ".type_url");
462         errors.AddError(
463             absl::StrCat("unknown cluster_type extension: ", type_url));
464       } else {
465         // Retrieve aggregate clusters.
466         ValidationErrors::ScopedField field(
467             &errors,
468             ".value[envoy.extensions.clusters.aggregate.v3.ClusterConfig]");
469         absl::string_view serialized_config =
470             UpbStringToAbsl(google_protobuf_Any_value(typed_config));
471         cds_update->type =
472             AggregateClusterParse(context, serialized_config, &errors);
473       }
474     }
475   } else {
476     ValidationErrors::ScopedField field(&errors, ".type");
477     errors.AddError("unknown discovery type");
478   }
479   // Check the LB policy.
480   ParseLbPolicyConfig(context, cluster, cds_update.get(), &errors);
481   // transport_socket
482   auto* transport_socket =
483       envoy_config_cluster_v3_Cluster_transport_socket(cluster);
484   if (transport_socket != nullptr) {
485     ValidationErrors::ScopedField field(&errors,
486                                         ".transport_socket.typed_config");
487     const auto* typed_config =
488         envoy_config_core_v3_TransportSocket_typed_config(transport_socket);
489     auto extension = ExtractXdsExtension(context, typed_config, &errors);
490     if (extension.has_value()) {
491       if (XdsHttpConnectEnabled() && extension->type == kHttp11ProxyType) {
492         cds_update->use_http_connect = true;
493         cds_update->common_tls_context =
494             Http11ProxyUpstreamTransportParse(context, *extension, &errors);
495       } else if (extension->type == kUpstreamTlsContextType) {
496         cds_update->common_tls_context =
497             UpstreamTlsContextParse(context, *extension, &errors);
498       } else {
499         ValidationErrors::ScopedField field(&errors, ".type_url");
500         errors.AddError("unsupported transport socket type");
501       }
502     }
503   }
504   // Record LRS server name (if any).
505   const envoy_config_core_v3_ConfigSource* lrs_server =
506       envoy_config_cluster_v3_Cluster_lrs_server(cluster);
507   if (lrs_server != nullptr) {
508     if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) {
509       ValidationErrors::ScopedField field(&errors, ".lrs_server");
510       errors.AddError("ConfigSource is not self");
511     }
512     cds_update->lrs_load_reporting_server = std::make_shared<GrpcXdsServer>(
513         static_cast<const GrpcXdsServer&>(context.server));
514   }
515   // Record LRS metric propagation.
516   auto propagation = MakeRefCounted<BackendMetricPropagation>();
517   if (XdsOrcaLrsPropagationChangesEnabled()) {
518     size_t size;
519     upb_StringView const* metrics =
520         envoy_config_cluster_v3_Cluster_lrs_report_endpoint_metrics(cluster,
521                                                                     &size);
522     for (size_t i = 0; i < size; ++i) {
523       absl::string_view metric_name = UpbStringToAbsl(metrics[i]);
524       if (metric_name == "cpu_utilization") {
525         propagation->propagation_bits |= propagation->kCpuUtilization;
526       } else if (metric_name == "mem_utilization") {
527         propagation->propagation_bits |= propagation->kMemUtilization;
528       } else if (metric_name == "application_utilization") {
529         propagation->propagation_bits |= propagation->kApplicationUtilization;
530       } else if (absl::ConsumePrefix(&metric_name, "named_metrics.")) {
531         if (metric_name == "*") {
532           propagation->propagation_bits |= propagation->kNamedMetricsAll;
533         } else {
534           propagation->named_metric_keys.emplace(metric_name);
535         }
536       }
537     }
538   }
539   cds_update->lrs_backend_metric_propagation = std::move(propagation);
540   // Protocol options.
541   auto* upstream_config =
542       envoy_config_cluster_v3_Cluster_upstream_config(cluster);
543   if (upstream_config != nullptr) {
544     ValidationErrors::ScopedField field(&errors, ".upstream_config");
545     ParseUpstreamConfig(context, upstream_config, cds_update.get(), &errors);
546   }
547   // The Cluster resource encodes the circuit breaking parameters in a list of
548   // Thresholds messages, where each message specifies the parameters for a
549   // particular RoutingPriority. we will look only at the first entry in the
550   // list for priority DEFAULT and default to 1024 if not found.
551   if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) {
552     const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers =
553         envoy_config_cluster_v3_Cluster_circuit_breakers(cluster);
554     size_t num_thresholds;
555     const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const*
556         thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds(
557             circuit_breakers, &num_thresholds);
558     for (size_t i = 0; i < num_thresholds; ++i) {
559       const auto* threshold = thresholds[i];
560       if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority(
561               threshold) == envoy_config_core_v3_DEFAULT) {
562         auto value = ParseUInt32Value(
563             envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests(
564                 threshold));
565         if (value.has_value()) cds_update->max_concurrent_requests = *value;
566         break;
567       }
568     }
569   }
570   // Outlier detection config.
571   if (envoy_config_cluster_v3_Cluster_has_outlier_detection(cluster)) {
572     ValidationErrors::ScopedField field(&errors, ".outlier_detection");
573     OutlierDetectionConfig outlier_detection_update;
574     const envoy_config_cluster_v3_OutlierDetection* outlier_detection =
575         envoy_config_cluster_v3_Cluster_outlier_detection(cluster);
576     const google_protobuf_Duration* duration =
577         envoy_config_cluster_v3_OutlierDetection_interval(outlier_detection);
578     if (duration != nullptr) {
579       ValidationErrors::ScopedField field(&errors, ".interval");
580       outlier_detection_update.interval = ParseDuration(duration, &errors);
581     }
582     duration = envoy_config_cluster_v3_OutlierDetection_base_ejection_time(
583         outlier_detection);
584     if (duration != nullptr) {
585       ValidationErrors::ScopedField field(&errors, ".base_ejection_time");
586       outlier_detection_update.base_ejection_time =
587           ParseDuration(duration, &errors);
588     }
589     duration = envoy_config_cluster_v3_OutlierDetection_max_ejection_time(
590         outlier_detection);
591     if (duration != nullptr) {
592       ValidationErrors::ScopedField field(&errors, ".max_ejection_time");
593       outlier_detection_update.max_ejection_time =
594           ParseDuration(duration, &errors);
595     }
596     auto max_ejection_percent = ParseUInt32Value(
597         envoy_config_cluster_v3_OutlierDetection_max_ejection_percent(
598             outlier_detection));
599     if (max_ejection_percent.has_value()) {
600       outlier_detection_update.max_ejection_percent = *max_ejection_percent;
601       if (outlier_detection_update.max_ejection_percent > 100) {
602         ValidationErrors::ScopedField field(&errors, ".max_ejection_percent");
603         errors.AddError("value must be <= 100");
604       }
605     }
606     auto enforcement_percentage = ParseUInt32Value(
607         envoy_config_cluster_v3_OutlierDetection_enforcing_success_rate(
608             outlier_detection));
609     if (enforcement_percentage.has_value()) {
610       if (*enforcement_percentage > 100) {
611         ValidationErrors::ScopedField field(&errors, ".enforcing_success_rate");
612         errors.AddError("value must be <= 100");
613       }
614       if (*enforcement_percentage != 0) {
615         OutlierDetectionConfig::SuccessRateEjection success_rate_ejection;
616         success_rate_ejection.enforcement_percentage = *enforcement_percentage;
617         auto minimum_hosts = ParseUInt32Value(
618             envoy_config_cluster_v3_OutlierDetection_success_rate_minimum_hosts(
619                 outlier_detection));
620         if (minimum_hosts.has_value()) {
621           success_rate_ejection.minimum_hosts = *minimum_hosts;
622         }
623         auto request_volume = ParseUInt32Value(
624             envoy_config_cluster_v3_OutlierDetection_success_rate_request_volume(
625                 outlier_detection));
626         if (request_volume.has_value()) {
627           success_rate_ejection.request_volume = *request_volume;
628         }
629         auto stdev_factor = ParseUInt32Value(
630             envoy_config_cluster_v3_OutlierDetection_success_rate_stdev_factor(
631                 outlier_detection));
632         if (stdev_factor.has_value()) {
633           success_rate_ejection.stdev_factor = *stdev_factor;
634         }
635         outlier_detection_update.success_rate_ejection = success_rate_ejection;
636       }
637     }
638     enforcement_percentage = ParseUInt32Value(
639         envoy_config_cluster_v3_OutlierDetection_enforcing_failure_percentage(
640             outlier_detection));
641     if (enforcement_percentage.has_value()) {
642       if (*enforcement_percentage > 100) {
643         ValidationErrors::ScopedField field(&errors,
644                                             ".enforcing_failure_percentage");
645         errors.AddError("value must be <= 100");
646       }
647       if (*enforcement_percentage != 0) {
648         OutlierDetectionConfig::FailurePercentageEjection
649             failure_percentage_ejection;
650         failure_percentage_ejection.enforcement_percentage =
651             *enforcement_percentage;
652         auto minimum_hosts = ParseUInt32Value(
653             envoy_config_cluster_v3_OutlierDetection_failure_percentage_minimum_hosts(
654                 outlier_detection));
655         if (minimum_hosts.has_value()) {
656           failure_percentage_ejection.minimum_hosts = *minimum_hosts;
657         }
658         auto request_volume = ParseUInt32Value(
659             envoy_config_cluster_v3_OutlierDetection_failure_percentage_request_volume(
660                 outlier_detection));
661         if (request_volume.has_value()) {
662           failure_percentage_ejection.request_volume = *request_volume;
663         }
664         auto threshold = ParseUInt32Value(
665             envoy_config_cluster_v3_OutlierDetection_failure_percentage_threshold(
666                 outlier_detection));
667         if (threshold.has_value()) {
668           failure_percentage_ejection.threshold = *threshold;
669           if (*enforcement_percentage > 100) {
670             ValidationErrors::ScopedField field(
671                 &errors, ".failure_percentage_threshold");
672             errors.AddError("value must be <= 100");
673           }
674         }
675         outlier_detection_update.failure_percentage_ejection =
676             failure_percentage_ejection;
677       }
678     }
679     cds_update->outlier_detection = outlier_detection_update;
680   }
681   // Validate override host status.
682   const auto* common_lb_config =
683       envoy_config_cluster_v3_Cluster_common_lb_config(cluster);
684   bool override_host_status_found = false;
685   if (common_lb_config != nullptr) {
686     ValidationErrors::ScopedField field(&errors, ".common_lb_config");
687     const auto* override_host_status =
688         envoy_config_cluster_v3_Cluster_CommonLbConfig_override_host_status(
689             common_lb_config);
690     if (override_host_status != nullptr) {
691       ValidationErrors::ScopedField field(&errors, ".override_host_status");
692       size_t size;
693       const int32_t* statuses = envoy_config_core_v3_HealthStatusSet_statuses(
694           override_host_status, &size);
695       for (size_t i = 0; i < size; ++i) {
696         auto status = XdsHealthStatus::FromUpb(statuses[i]);
697         if (status.has_value()) {
698           cds_update->override_host_statuses.Add(*status);
699         }
700       }
701       override_host_status_found = true;
702     }
703   }
704   // If the field is not set, we default to [UNKNOWN, HEALTHY].
705   if (!override_host_status_found) {
706     cds_update->override_host_statuses.Add(
707         XdsHealthStatus(XdsHealthStatus::kUnknown));
708     cds_update->override_host_statuses.Add(
709         XdsHealthStatus(XdsHealthStatus::kHealthy));
710   }
711   // Parse metadata.
712   {
713     ValidationErrors::ScopedField field(&errors, ".metadata");
714     cds_update->metadata = ParseXdsMetadataMap(
715         context, envoy_config_cluster_v3_Cluster_metadata(cluster), &errors);
716   }
717   // Return result.
718   if (!errors.ok()) {
719     return errors.status(absl::StatusCode::kInvalidArgument,
720                          "errors validating Cluster resource");
721   }
722   return cds_update;
723 }
724 
MaybeLogCluster(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster)725 void MaybeLogCluster(const XdsResourceType::DecodeContext& context,
726                      const envoy_config_cluster_v3_Cluster* cluster) {
727   if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer) && ABSL_VLOG_IS_ON(2)) {
728     const upb_MessageDef* msg_type =
729         envoy_config_cluster_v3_Cluster_getmsgdef(context.symtab);
730     char buf[10240];
731     upb_TextEncode(reinterpret_cast<const upb_Message*>(cluster), msg_type,
732                    nullptr, 0, buf, sizeof(buf));
733     VLOG(2) << "[xds_client " << context.client << "] Cluster: " << buf;
734   }
735 }
736 
737 }  // namespace
738 
Decode(const XdsResourceType::DecodeContext & context,absl::string_view serialized_resource) const739 XdsResourceType::DecodeResult XdsClusterResourceType::Decode(
740     const XdsResourceType::DecodeContext& context,
741     absl::string_view serialized_resource) const {
742   DecodeResult result;
743   // Parse serialized proto.
744   auto* resource = envoy_config_cluster_v3_Cluster_parse(
745       serialized_resource.data(), serialized_resource.size(), context.arena);
746   if (resource == nullptr) {
747     result.resource =
748         absl::InvalidArgumentError("Can't parse Cluster resource.");
749     return result;
750   }
751   MaybeLogCluster(context, resource);
752   // Validate resource.
753   result.name =
754       UpbStringToStdString(envoy_config_cluster_v3_Cluster_name(resource));
755   auto cds_resource = CdsResourceParse(context, resource);
756   if (!cds_resource.ok()) {
757     if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer)) {
758       LOG(ERROR) << "[xds_client " << context.client << "] invalid Cluster "
759                  << *result.name << ": " << cds_resource.status();
760     }
761     result.resource = cds_resource.status();
762   } else {
763     if (GRPC_TRACE_FLAG_ENABLED_OBJ(*context.tracer)) {
764       LOG(INFO) << "[xds_client " << context.client << "] parsed Cluster "
765                 << *result.name << ": " << (*cds_resource)->ToString();
766     }
767     result.resource = std::move(*cds_resource);
768   }
769   return result;
770 }
771 
772 }  // namespace grpc_core
773