• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/xds/xds_cluster.h"
20 
21 #include <stddef.h>
22 
23 #include <algorithm>
24 #include <memory>
25 #include <utility>
26 
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/strings/match.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/strip.h"
33 #include "absl/types/variant.h"
34 #include "envoy/config/cluster/v3/circuit_breaker.upb.h"
35 #include "envoy/config/cluster/v3/cluster.upb.h"
36 #include "envoy/config/cluster/v3/cluster.upbdefs.h"
37 #include "envoy/config/cluster/v3/outlier_detection.upb.h"
38 #include "envoy/config/core/v3/address.upb.h"
39 #include "envoy/config/core/v3/base.upb.h"
40 #include "envoy/config/core/v3/config_source.upb.h"
41 #include "envoy/config/core/v3/extension.upb.h"
42 #include "envoy/config/core/v3/health_check.upb.h"
43 #include "envoy/config/core/v3/protocol.upb.h"
44 #include "envoy/config/endpoint/v3/endpoint.upb.h"
45 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
46 #include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h"
47 #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
48 #include "envoy/extensions/upstreams/http/v3/http_protocol_options.upb.h"
49 #include "google/protobuf/any.upb.h"
50 #include "google/protobuf/duration.upb.h"
51 #include "google/protobuf/struct.upb.h"
52 #include "google/protobuf/wrappers.upb.h"
53 #include "upb/base/string_view.h"
54 #include "upb/text/encode.h"
55 
56 #include <grpc/support/json.h>
57 #include <grpc/support/log.h>
58 
59 #include "src/core/ext/xds/upb_utils.h"
60 #include "src/core/ext/xds/xds_client.h"
61 #include "src/core/ext/xds/xds_common_types.h"
62 #include "src/core/ext/xds/xds_lb_policy_registry.h"
63 #include "src/core/ext/xds/xds_resource_type.h"
64 #include "src/core/lib/config/core_configuration.h"
65 #include "src/core/lib/debug/trace.h"
66 #include "src/core/lib/gprpp/host_port.h"
67 #include "src/core/lib/gprpp/match.h"
68 #include "src/core/lib/gprpp/ref_counted_ptr.h"
69 #include "src/core/lib/gprpp/time.h"
70 #include "src/core/lib/gprpp/validation_errors.h"
71 #include "src/core/lib/json/json_writer.h"
72 #include "src/core/lib/matchers/matchers.h"
73 #include "src/core/load_balancing/lb_policy_registry.h"
74 
75 namespace grpc_core {
76 
77 //
78 // XdsClusterResource
79 //
80 
ToString() const81 std::string XdsClusterResource::ToString() const {
82   std::vector<std::string> contents;
83   Match(
84       type,
85       [&](const Eds& eds) {
86         contents.push_back("type=EDS");
87         if (!eds.eds_service_name.empty()) {
88           contents.push_back(
89               absl::StrCat("eds_service_name=", eds.eds_service_name));
90         }
91       },
92       [&](const LogicalDns& logical_dns) {
93         contents.push_back("type=LOGICAL_DNS");
94         contents.push_back(absl::StrCat("dns_hostname=", logical_dns.hostname));
95       },
96       [&](const Aggregate& aggregate) {
97         contents.push_back("type=AGGREGATE");
98         contents.push_back(absl::StrCat(
99             "prioritized_cluster_names=[",
100             absl::StrJoin(aggregate.prioritized_cluster_names, ", "), "]"));
101       });
102   contents.push_back(absl::StrCat("lb_policy_config=",
103                                   JsonDump(Json::FromArray(lb_policy_config))));
104   if (lrs_load_reporting_server.has_value()) {
105     contents.push_back(absl::StrCat("lrs_load_reporting_server_name=",
106                                     lrs_load_reporting_server->server_uri()));
107   }
108   if (!common_tls_context.Empty()) {
109     contents.push_back(
110         absl::StrCat("common_tls_context=", common_tls_context.ToString()));
111   }
112   if (connection_idle_timeout != Duration::Zero()) {
113     contents.push_back(absl::StrCat("connection_idle_timeout=",
114                                     connection_idle_timeout.ToString()));
115   }
116   contents.push_back(
117       absl::StrCat("max_concurrent_requests=", max_concurrent_requests));
118   contents.push_back(absl::StrCat("override_host_statuses=",
119                                   override_host_statuses.ToString()));
120   return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
121 }
122 
123 //
124 // XdsClusterResourceType
125 //
126 
127 namespace {
128 
UpstreamTlsContextParse(const XdsResourceType::DecodeContext & context,const envoy_config_core_v3_TransportSocket * transport_socket,ValidationErrors * errors)129 CommonTlsContext UpstreamTlsContextParse(
130     const XdsResourceType::DecodeContext& context,
131     const envoy_config_core_v3_TransportSocket* transport_socket,
132     ValidationErrors* errors) {
133   ValidationErrors::ScopedField field(errors, ".typed_config");
134   const auto* typed_config =
135       envoy_config_core_v3_TransportSocket_typed_config(transport_socket);
136   auto extension = ExtractXdsExtension(context, typed_config, errors);
137   if (!extension.has_value()) return {};
138   if (extension->type !=
139       "envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext") {
140     ValidationErrors::ScopedField field(errors, ".type_url");
141     errors->AddError("unsupported transport socket type");
142     return {};
143   }
144   absl::string_view* serialized_upstream_tls_context =
145       absl::get_if<absl::string_view>(&extension->value);
146   if (serialized_upstream_tls_context == nullptr) {
147     errors->AddError("can't decode UpstreamTlsContext");
148     return {};
149   }
150   const auto* upstream_tls_context =
151       envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_parse(
152           serialized_upstream_tls_context->data(),
153           serialized_upstream_tls_context->size(), context.arena);
154   if (upstream_tls_context == nullptr) {
155     errors->AddError("can't decode UpstreamTlsContext");
156     return {};
157   }
158   ValidationErrors::ScopedField field3(errors, ".common_tls_context");
159   const auto* common_tls_context_proto =
160       envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_common_tls_context(
161           upstream_tls_context);
162   CommonTlsContext common_tls_context;
163   if (common_tls_context_proto != nullptr) {
164     common_tls_context =
165         CommonTlsContext::Parse(context, common_tls_context_proto, errors);
166   }
167   if (common_tls_context.certificate_validation_context
168           .ca_certificate_provider_instance.instance_name.empty()) {
169     errors->AddError("no CA certificate provider instance configured");
170   }
171   return common_tls_context;
172 }
173 
EdsConfigParse(const envoy_config_cluster_v3_Cluster * cluster,ValidationErrors * errors)174 XdsClusterResource::Eds EdsConfigParse(
175     const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
176   XdsClusterResource::Eds eds;
177   ValidationErrors::ScopedField field(errors, ".eds_cluster_config");
178   const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config =
179       envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster);
180   if (eds_cluster_config == nullptr) {
181     errors->AddError("field not present");
182   } else {
183     // Validate ConfigSource.
184     {
185       ValidationErrors::ScopedField field(errors, ".eds_config");
186       const envoy_config_core_v3_ConfigSource* eds_config =
187           envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config(
188               eds_cluster_config);
189       if (eds_config == nullptr) {
190         errors->AddError("field not present");
191       } else {
192         if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config) &&
193             !envoy_config_core_v3_ConfigSource_has_self(eds_config)) {
194           errors->AddError("ConfigSource is not ads or self");
195         }
196       }
197     }
198     // Record EDS service_name (if any).
199     // This field is required if the CDS resource has an xdstp name.
200     eds.eds_service_name = UpbStringToStdString(
201         envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name(
202             eds_cluster_config));
203     if (eds.eds_service_name.empty()) {
204       absl::string_view cluster_name =
205           UpbStringToAbsl(envoy_config_cluster_v3_Cluster_name(cluster));
206       if (absl::StartsWith(cluster_name, "xdstp:")) {
207         ValidationErrors::ScopedField field(errors, ".service_name");
208         errors->AddError("must be set if Cluster resource has an xdstp name");
209       }
210     }
211   }
212   return eds;
213 }
214 
LogicalDnsParse(const envoy_config_cluster_v3_Cluster * cluster,ValidationErrors * errors)215 XdsClusterResource::LogicalDns LogicalDnsParse(
216     const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
217   XdsClusterResource::LogicalDns logical_dns;
218   ValidationErrors::ScopedField field(errors, ".load_assignment");
219   const auto* load_assignment =
220       envoy_config_cluster_v3_Cluster_load_assignment(cluster);
221   if (load_assignment == nullptr) {
222     errors->AddError("field not present for LOGICAL_DNS cluster");
223     return logical_dns;
224   }
225   ValidationErrors::ScopedField field2(errors, ".endpoints");
226   size_t num_localities;
227   const auto* const* localities =
228       envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(load_assignment,
229                                                                &num_localities);
230   if (num_localities != 1) {
231     errors->AddError(absl::StrCat(
232         "must contain exactly one locality for LOGICAL_DNS cluster, found ",
233         num_localities));
234     return logical_dns;
235   }
236   ValidationErrors::ScopedField field3(errors, "[0].lb_endpoints");
237   size_t num_endpoints;
238   const auto* const* endpoints =
239       envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(localities[0],
240                                                                 &num_endpoints);
241   if (num_endpoints != 1) {
242     errors->AddError(absl::StrCat(
243         "must contain exactly one endpoint for LOGICAL_DNS cluster, found ",
244         num_endpoints));
245     return logical_dns;
246   }
247   ValidationErrors::ScopedField field4(errors, "[0].endpoint");
248   const auto* endpoint =
249       envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]);
250   if (endpoint == nullptr) {
251     errors->AddError("field not present");
252     return logical_dns;
253   }
254   ValidationErrors::ScopedField field5(errors, ".address");
255   const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint);
256   if (address == nullptr) {
257     errors->AddError("field not present");
258     return logical_dns;
259   }
260   ValidationErrors::ScopedField field6(errors, ".socket_address");
261   const auto* socket_address =
262       envoy_config_core_v3_Address_socket_address(address);
263   if (socket_address == nullptr) {
264     errors->AddError("field not present");
265     return logical_dns;
266   }
267   if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address).size !=
268       0) {
269     ValidationErrors::ScopedField field(errors, ".resolver_name");
270     errors->AddError(
271         "LOGICAL_DNS clusters must NOT have a custom resolver name set");
272   }
273   absl::string_view address_str = UpbStringToAbsl(
274       envoy_config_core_v3_SocketAddress_address(socket_address));
275   if (address_str.empty()) {
276     ValidationErrors::ScopedField field(errors, ".address");
277     errors->AddError("field not present");
278   }
279   if (!envoy_config_core_v3_SocketAddress_has_port_value(socket_address)) {
280     ValidationErrors::ScopedField field(errors, ".port_value");
281     errors->AddError("field not present");
282   }
283   logical_dns.hostname = JoinHostPort(
284       address_str,
285       envoy_config_core_v3_SocketAddress_port_value(socket_address));
286   return logical_dns;
287 }
288 
AggregateClusterParse(const XdsResourceType::DecodeContext & context,absl::string_view serialized_config,ValidationErrors * errors)289 XdsClusterResource::Aggregate AggregateClusterParse(
290     const XdsResourceType::DecodeContext& context,
291     absl::string_view serialized_config, ValidationErrors* errors) {
292   XdsClusterResource::Aggregate aggregate;
293   const auto* aggregate_cluster_config =
294       envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse(
295           serialized_config.data(), serialized_config.size(), context.arena);
296   if (aggregate_cluster_config == nullptr) {
297     errors->AddError("can't parse aggregate cluster config");
298     return aggregate;
299   }
300   size_t size;
301   const upb_StringView* clusters =
302       envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters(
303           aggregate_cluster_config, &size);
304   if (size == 0) {
305     ValidationErrors::ScopedField field(errors, ".clusters");
306     errors->AddError("must be non-empty");
307   }
308   for (size_t i = 0; i < size; ++i) {
309     aggregate.prioritized_cluster_names.emplace_back(
310         UpbStringToStdString(clusters[i]));
311   }
312   return aggregate;
313 }
314 
ParseLbPolicyConfig(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster,XdsClusterResource * cds_update,ValidationErrors * errors)315 void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context,
316                          const envoy_config_cluster_v3_Cluster* cluster,
317                          XdsClusterResource* cds_update,
318                          ValidationErrors* errors) {
319   // First, check the new load_balancing_policy field.
320   const auto* load_balancing_policy =
321       envoy_config_cluster_v3_Cluster_load_balancing_policy(cluster);
322   if (load_balancing_policy != nullptr) {
323     const auto& registry =
324         static_cast<const GrpcXdsBootstrap&>(context.client->bootstrap())
325             .lb_policy_registry();
326     ValidationErrors::ScopedField field(errors, ".load_balancing_policy");
327     const size_t original_error_count = errors->size();
328     cds_update->lb_policy_config = registry.ConvertXdsLbPolicyConfig(
329         context, load_balancing_policy, errors);
330     // If there were no conversion errors, validate that the converted config
331     // parses with the gRPC LB policy registry.
332     if (original_error_count == errors->size()) {
333       auto config = CoreConfiguration::Get()
334                         .lb_policy_registry()
335                         .ParseLoadBalancingConfig(
336                             Json::FromArray(cds_update->lb_policy_config));
337       if (!config.ok()) errors->AddError(config.status().message());
338     }
339     return;
340   }
341   // Didn't find load_balancing_policy field, so fall back to the old
342   // lb_policy enum field.
343   if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
344       envoy_config_cluster_v3_Cluster_ROUND_ROBIN) {
345     cds_update->lb_policy_config = {
346         Json::FromObject({
347             {"xds_wrr_locality_experimental",
348              Json::FromObject({
349                  {"childPolicy", Json::FromArray({
350                                      Json::FromObject({
351                                          {"round_robin", Json::FromObject({})},
352                                      }),
353                                  })},
354              })},
355         }),
356     };
357   } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
358              envoy_config_cluster_v3_Cluster_RING_HASH) {
359     // Record ring hash lb config
360     auto* ring_hash_config =
361         envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster);
362     uint64_t min_ring_size = 1024;
363     uint64_t max_ring_size = 8388608;
364     if (ring_hash_config != nullptr) {
365       ValidationErrors::ScopedField field(errors, ".ring_hash_lb_config");
366       const google_protobuf_UInt64Value* uint64_value =
367           envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
368               ring_hash_config);
369       if (uint64_value != nullptr) {
370         ValidationErrors::ScopedField field(errors, ".maximum_ring_size");
371         max_ring_size = google_protobuf_UInt64Value_value(uint64_value);
372         if (max_ring_size > 8388608 || max_ring_size == 0) {
373           errors->AddError("must be in the range of 1 to 8388608");
374         }
375       }
376       uint64_value =
377           envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
378               ring_hash_config);
379       if (uint64_value != nullptr) {
380         ValidationErrors::ScopedField field(errors, ".minimum_ring_size");
381         min_ring_size = google_protobuf_UInt64Value_value(uint64_value);
382         if (min_ring_size > 8388608 || min_ring_size == 0) {
383           errors->AddError("must be in the range of 1 to 8388608");
384         }
385         if (min_ring_size > max_ring_size) {
386           errors->AddError("cannot be greater than maximum_ring_size");
387         }
388       }
389       if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
390               ring_hash_config) !=
391           envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
392         ValidationErrors::ScopedField field(errors, ".hash_function");
393         errors->AddError("invalid hash function");
394       }
395     }
396     cds_update->lb_policy_config = {
397         Json::FromObject({
398             {"ring_hash_experimental",
399              Json::FromObject({
400                  {"minRingSize", Json::FromNumber(min_ring_size)},
401                  {"maxRingSize", Json::FromNumber(max_ring_size)},
402              })},
403         }),
404     };
405   } else {
406     ValidationErrors::ScopedField field(errors, ".lb_policy");
407     errors->AddError("LB policy is not supported");
408   }
409 }
410 
ParseUpstreamConfig(const XdsResourceType::DecodeContext & context,const envoy_config_core_v3_TypedExtensionConfig * upstream_config,XdsClusterResource * cds_update,ValidationErrors * errors)411 void ParseUpstreamConfig(
412     const XdsResourceType::DecodeContext& context,
413     const envoy_config_core_v3_TypedExtensionConfig* upstream_config,
414     XdsClusterResource* cds_update, ValidationErrors* errors) {
415   ValidationErrors::ScopedField field(errors, ".typed_config");
416   const auto* typed_config =
417       envoy_config_core_v3_TypedExtensionConfig_typed_config(upstream_config);
418   auto extension = ExtractXdsExtension(context, typed_config, errors);
419   if (!extension.has_value()) return;
420   if (extension->type !=
421       "envoy.extensions.upstreams.http.v3.HttpProtocolOptions") {
422     ValidationErrors::ScopedField field(errors, ".type_url");
423     errors->AddError("unsupported upstream config type");
424     return;
425   }
426   absl::string_view* serialized_http_protocol_options =
427       absl::get_if<absl::string_view>(&extension->value);
428   if (serialized_http_protocol_options == nullptr) {
429     errors->AddError("can't decode HttpProtocolOptions");
430     return;
431   }
432   const auto* http_protocol_options =
433       envoy_extensions_upstreams_http_v3_HttpProtocolOptions_parse(
434           serialized_http_protocol_options->data(),
435           serialized_http_protocol_options->size(), context.arena);
436   if (http_protocol_options == nullptr) {
437     errors->AddError("can't decode HttpProtocolOptions");
438     return;
439   }
440   ValidationErrors::ScopedField field2(errors, ".common_http_protocol_options");
441   const auto* common_http_protocol_options =
442       envoy_extensions_upstreams_http_v3_HttpProtocolOptions_common_http_protocol_options(
443           http_protocol_options);
444   if (common_http_protocol_options != nullptr) {
445     const auto* idle_timeout =
446         envoy_config_core_v3_HttpProtocolOptions_idle_timeout(
447             common_http_protocol_options);
448     if (idle_timeout != nullptr) {
449       ValidationErrors::ScopedField field(errors, ".idle_timeout");
450       cds_update->connection_idle_timeout = ParseDuration(idle_timeout, errors);
451     }
452   }
453 }
454 
CdsResourceParse(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster)455 absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
456     const XdsResourceType::DecodeContext& context,
457     const envoy_config_cluster_v3_Cluster* cluster) {
458   auto cds_update = std::make_shared<XdsClusterResource>();
459   ValidationErrors errors;
460   // Check the cluster discovery type.
461   if (envoy_config_cluster_v3_Cluster_type(cluster) ==
462       envoy_config_cluster_v3_Cluster_EDS) {
463     cds_update->type = EdsConfigParse(cluster, &errors);
464   } else if (envoy_config_cluster_v3_Cluster_type(cluster) ==
465              envoy_config_cluster_v3_Cluster_LOGICAL_DNS) {
466     cds_update->type = LogicalDnsParse(cluster, &errors);
467   } else if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) {
468     ValidationErrors::ScopedField field(&errors, ".cluster_type");
469     const auto* custom_cluster_type =
470         envoy_config_cluster_v3_Cluster_cluster_type(cluster);
471     GPR_ASSERT(custom_cluster_type != nullptr);
472     ValidationErrors::ScopedField field2(&errors, ".typed_config");
473     const auto* typed_config =
474         envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config(
475             custom_cluster_type);
476     if (typed_config == nullptr) {
477       errors.AddError("field not present");
478     } else {
479       absl::string_view type_url = absl::StripPrefix(
480           UpbStringToAbsl(google_protobuf_Any_type_url(typed_config)),
481           "type.googleapis.com/");
482       if (type_url != "envoy.extensions.clusters.aggregate.v3.ClusterConfig") {
483         ValidationErrors::ScopedField field(&errors, ".type_url");
484         errors.AddError(
485             absl::StrCat("unknown cluster_type extension: ", type_url));
486       } else {
487         // Retrieve aggregate clusters.
488         ValidationErrors::ScopedField field(
489             &errors,
490             ".value[envoy.extensions.clusters.aggregate.v3.ClusterConfig]");
491         absl::string_view serialized_config =
492             UpbStringToAbsl(google_protobuf_Any_value(typed_config));
493         cds_update->type =
494             AggregateClusterParse(context, serialized_config, &errors);
495       }
496     }
497   } else {
498     ValidationErrors::ScopedField field(&errors, ".type");
499     errors.AddError("unknown discovery type");
500   }
501   // Check the LB policy.
502   ParseLbPolicyConfig(context, cluster, cds_update.get(), &errors);
503   // transport_socket
504   auto* transport_socket =
505       envoy_config_cluster_v3_Cluster_transport_socket(cluster);
506   if (transport_socket != nullptr) {
507     ValidationErrors::ScopedField field(&errors, ".transport_socket");
508     cds_update->common_tls_context =
509         UpstreamTlsContextParse(context, transport_socket, &errors);
510   }
511   // Record LRS server name (if any).
512   const envoy_config_core_v3_ConfigSource* lrs_server =
513       envoy_config_cluster_v3_Cluster_lrs_server(cluster);
514   if (lrs_server != nullptr) {
515     if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) {
516       ValidationErrors::ScopedField field(&errors, ".lrs_server");
517       errors.AddError("ConfigSource is not self");
518     }
519     cds_update->lrs_load_reporting_server.emplace(
520         static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(context.server));
521   }
522   // Protocol options.
523   auto* upstream_config =
524       envoy_config_cluster_v3_Cluster_upstream_config(cluster);
525   if (upstream_config != nullptr) {
526     ValidationErrors::ScopedField field(&errors, ".upstream_config");
527     ParseUpstreamConfig(context, upstream_config, cds_update.get(), &errors);
528   }
529   // The Cluster resource encodes the circuit breaking parameters in a list of
530   // Thresholds messages, where each message specifies the parameters for a
531   // particular RoutingPriority. we will look only at the first entry in the
532   // list for priority DEFAULT and default to 1024 if not found.
533   if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) {
534     const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers =
535         envoy_config_cluster_v3_Cluster_circuit_breakers(cluster);
536     size_t num_thresholds;
537     const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const*
538         thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds(
539             circuit_breakers, &num_thresholds);
540     for (size_t i = 0; i < num_thresholds; ++i) {
541       const auto* threshold = thresholds[i];
542       if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority(
543               threshold) == envoy_config_core_v3_DEFAULT) {
544         const google_protobuf_UInt32Value* max_requests =
545             envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests(
546                 threshold);
547         if (max_requests != nullptr) {
548           cds_update->max_concurrent_requests =
549               google_protobuf_UInt32Value_value(max_requests);
550         }
551         break;
552       }
553     }
554   }
555   // Outlier detection config.
556   if (envoy_config_cluster_v3_Cluster_has_outlier_detection(cluster)) {
557     ValidationErrors::ScopedField field(&errors, ".outlier_detection");
558     OutlierDetectionConfig outlier_detection_update;
559     const envoy_config_cluster_v3_OutlierDetection* outlier_detection =
560         envoy_config_cluster_v3_Cluster_outlier_detection(cluster);
561     const google_protobuf_Duration* duration =
562         envoy_config_cluster_v3_OutlierDetection_interval(outlier_detection);
563     if (duration != nullptr) {
564       ValidationErrors::ScopedField field(&errors, ".interval");
565       outlier_detection_update.interval = ParseDuration(duration, &errors);
566     }
567     duration = envoy_config_cluster_v3_OutlierDetection_base_ejection_time(
568         outlier_detection);
569     if (duration != nullptr) {
570       ValidationErrors::ScopedField field(&errors, ".base_ejection_time");
571       outlier_detection_update.base_ejection_time =
572           ParseDuration(duration, &errors);
573     }
574     duration = envoy_config_cluster_v3_OutlierDetection_max_ejection_time(
575         outlier_detection);
576     if (duration != nullptr) {
577       ValidationErrors::ScopedField field(&errors, ".max_ejection_time");
578       outlier_detection_update.max_ejection_time =
579           ParseDuration(duration, &errors);
580     }
581     const google_protobuf_UInt32Value* max_ejection_percent =
582         envoy_config_cluster_v3_OutlierDetection_max_ejection_percent(
583             outlier_detection);
584     if (max_ejection_percent != nullptr) {
585       outlier_detection_update.max_ejection_percent =
586           google_protobuf_UInt32Value_value(max_ejection_percent);
587       if (outlier_detection_update.max_ejection_percent > 100) {
588         ValidationErrors::ScopedField field(&errors, ".max_ejection_percent");
589         errors.AddError("value must be <= 100");
590       }
591     }
592     const google_protobuf_UInt32Value* enforcing_success_rate =
593         envoy_config_cluster_v3_OutlierDetection_enforcing_success_rate(
594             outlier_detection);
595     if (enforcing_success_rate != nullptr) {
596       uint32_t enforcement_percentage =
597           google_protobuf_UInt32Value_value(enforcing_success_rate);
598       if (enforcement_percentage > 100) {
599         ValidationErrors::ScopedField field(&errors, ".enforcing_success_rate");
600         errors.AddError("value must be <= 100");
601       }
602       if (enforcement_percentage != 0) {
603         OutlierDetectionConfig::SuccessRateEjection success_rate_ejection;
604         success_rate_ejection.enforcement_percentage = enforcement_percentage;
605         const google_protobuf_UInt32Value* minimum_hosts =
606             envoy_config_cluster_v3_OutlierDetection_success_rate_minimum_hosts(
607                 outlier_detection);
608         if (minimum_hosts != nullptr) {
609           success_rate_ejection.minimum_hosts =
610               google_protobuf_UInt32Value_value(minimum_hosts);
611         }
612         const google_protobuf_UInt32Value* request_volume =
613             envoy_config_cluster_v3_OutlierDetection_success_rate_request_volume(
614                 outlier_detection);
615         if (request_volume != nullptr) {
616           success_rate_ejection.request_volume =
617               google_protobuf_UInt32Value_value(request_volume);
618         }
619         const google_protobuf_UInt32Value* stdev_factor =
620             envoy_config_cluster_v3_OutlierDetection_success_rate_stdev_factor(
621                 outlier_detection);
622         if (stdev_factor != nullptr) {
623           success_rate_ejection.stdev_factor =
624               google_protobuf_UInt32Value_value(stdev_factor);
625         }
626         outlier_detection_update.success_rate_ejection = success_rate_ejection;
627       }
628     }
629     const google_protobuf_UInt32Value* enforcing_failure_percentage =
630         envoy_config_cluster_v3_OutlierDetection_enforcing_failure_percentage(
631             outlier_detection);
632     if (enforcing_failure_percentage != nullptr) {
633       uint32_t enforcement_percentage =
634           google_protobuf_UInt32Value_value(enforcing_failure_percentage);
635       if (enforcement_percentage > 100) {
636         ValidationErrors::ScopedField field(&errors,
637                                             ".enforcing_failure_percentage");
638         errors.AddError("value must be <= 100");
639       }
640       if (enforcement_percentage != 0) {
641         OutlierDetectionConfig::FailurePercentageEjection
642             failure_percentage_ejection;
643         failure_percentage_ejection.enforcement_percentage =
644             enforcement_percentage;
645         const google_protobuf_UInt32Value* minimum_hosts =
646             envoy_config_cluster_v3_OutlierDetection_failure_percentage_minimum_hosts(
647                 outlier_detection);
648         if (minimum_hosts != nullptr) {
649           failure_percentage_ejection.minimum_hosts =
650               google_protobuf_UInt32Value_value(minimum_hosts);
651         }
652         const google_protobuf_UInt32Value* request_volume =
653             envoy_config_cluster_v3_OutlierDetection_failure_percentage_request_volume(
654                 outlier_detection);
655         if (request_volume != nullptr) {
656           failure_percentage_ejection.request_volume =
657               google_protobuf_UInt32Value_value(request_volume);
658         }
659         const google_protobuf_UInt32Value* threshold =
660             envoy_config_cluster_v3_OutlierDetection_failure_percentage_threshold(
661                 outlier_detection);
662         if (threshold != nullptr) {
663           failure_percentage_ejection.threshold =
664               google_protobuf_UInt32Value_value(threshold);
665           if (enforcement_percentage > 100) {
666             ValidationErrors::ScopedField field(
667                 &errors, ".failure_percentage_threshold");
668             errors.AddError("value must be <= 100");
669           }
670         }
671         outlier_detection_update.failure_percentage_ejection =
672             failure_percentage_ejection;
673       }
674     }
675     cds_update->outlier_detection = outlier_detection_update;
676   }
677   // Validate override host status.
678   const auto* common_lb_config =
679       envoy_config_cluster_v3_Cluster_common_lb_config(cluster);
680   bool override_host_status_found = false;
681   if (common_lb_config != nullptr) {
682     ValidationErrors::ScopedField field(&errors, ".common_lb_config");
683     const auto* override_host_status =
684         envoy_config_cluster_v3_Cluster_CommonLbConfig_override_host_status(
685             common_lb_config);
686     if (override_host_status != nullptr) {
687       ValidationErrors::ScopedField field(&errors, ".override_host_status");
688       size_t size;
689       const int32_t* statuses = envoy_config_core_v3_HealthStatusSet_statuses(
690           override_host_status, &size);
691       for (size_t i = 0; i < size; ++i) {
692         auto status = XdsHealthStatus::FromUpb(statuses[i]);
693         if (status.has_value()) {
694           cds_update->override_host_statuses.Add(*status);
695         }
696       }
697       override_host_status_found = true;
698     }
699   }
700   // If the field is not set, we default to [UNKNOWN, HEALTHY].
701   if (!override_host_status_found) {
702     cds_update->override_host_statuses.Add(
703         XdsHealthStatus(XdsHealthStatus::kUnknown));
704     cds_update->override_host_statuses.Add(
705         XdsHealthStatus(XdsHealthStatus::kHealthy));
706   }
707   // Record telemetry labels (if any).
708   const envoy_config_core_v3_Metadata* metadata =
709       envoy_config_cluster_v3_Cluster_metadata(cluster);
710   if (metadata != nullptr) {
711     google_protobuf_Struct* telemetry_labels_struct;
712     if (envoy_config_core_v3_Metadata_filter_metadata_get(
713             metadata,
714             StdStringToUpbString(
715                 absl::string_view("com.google.csm.telemetry_labels")),
716             &telemetry_labels_struct)) {
717       size_t iter = kUpb_Map_Begin;
718       const google_protobuf_Struct_FieldsEntry* fields_entry;
719       while ((fields_entry = google_protobuf_Struct_fields_next(
720                   telemetry_labels_struct, &iter)) != nullptr) {
721         // Adds any entry whose value is a string to telemetry_labels.
722         const google_protobuf_Value* value =
723             google_protobuf_Struct_FieldsEntry_value(fields_entry);
724         if (google_protobuf_Value_has_string_value(value)) {
725           if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key(
726                   fields_entry)) == "service_name") {
727             cds_update->service_telemetry_label = RefCountedStringValue(
728                 UpbStringToAbsl(google_protobuf_Value_string_value(value)));
729           } else if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key(
730                          fields_entry)) == "service_namespace") {
731             cds_update->namespace_telemetry_label = RefCountedStringValue(
732                 UpbStringToAbsl(google_protobuf_Value_string_value(value)));
733           }
734         }
735       }
736     }
737   }
738   // Return result.
739   if (!errors.ok()) {
740     return errors.status(absl::StatusCode::kInvalidArgument,
741                          "errors validating Cluster resource");
742   }
743   return cds_update;
744 }
745 
MaybeLogCluster(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster)746 void MaybeLogCluster(const XdsResourceType::DecodeContext& context,
747                      const envoy_config_cluster_v3_Cluster* cluster) {
748   if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) &&
749       gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
750     const upb_MessageDef* msg_type =
751         envoy_config_cluster_v3_Cluster_getmsgdef(context.symtab);
752     char buf[10240];
753     upb_TextEncode(reinterpret_cast<const upb_Message*>(cluster), msg_type,
754                    nullptr, 0, buf, sizeof(buf));
755     gpr_log(GPR_DEBUG, "[xds_client %p] Cluster: %s", context.client, buf);
756   }
757 }
758 
759 }  // namespace
760 
Decode(const XdsResourceType::DecodeContext & context,absl::string_view serialized_resource) const761 XdsResourceType::DecodeResult XdsClusterResourceType::Decode(
762     const XdsResourceType::DecodeContext& context,
763     absl::string_view serialized_resource) const {
764   DecodeResult result;
765   // Parse serialized proto.
766   auto* resource = envoy_config_cluster_v3_Cluster_parse(
767       serialized_resource.data(), serialized_resource.size(), context.arena);
768   if (resource == nullptr) {
769     result.resource =
770         absl::InvalidArgumentError("Can't parse Cluster resource.");
771     return result;
772   }
773   MaybeLogCluster(context, resource);
774   // Validate resource.
775   result.name =
776       UpbStringToStdString(envoy_config_cluster_v3_Cluster_name(resource));
777   auto cds_resource = CdsResourceParse(context, resource);
778   if (!cds_resource.ok()) {
779     if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
780       gpr_log(GPR_ERROR, "[xds_client %p] invalid Cluster %s: %s",
781               context.client, result.name->c_str(),
782               cds_resource.status().ToString().c_str());
783     }
784     result.resource = cds_resource.status();
785   } else {
786     if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
787       gpr_log(GPR_INFO, "[xds_client %p] parsed Cluster %s: %s", context.client,
788               result.name->c_str(), (*cds_resource)->ToString().c_str());
789     }
790     result.resource = std::move(*cds_resource);
791   }
792   return result;
793 }
794 
795 }  // namespace grpc_core
796