1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/xds/xds_cluster.h"
20
21 #include <stddef.h>
22
23 #include <algorithm>
24 #include <memory>
25 #include <utility>
26
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/strings/match.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/strip.h"
33 #include "absl/types/variant.h"
34 #include "envoy/config/cluster/v3/circuit_breaker.upb.h"
35 #include "envoy/config/cluster/v3/cluster.upb.h"
36 #include "envoy/config/cluster/v3/cluster.upbdefs.h"
37 #include "envoy/config/cluster/v3/outlier_detection.upb.h"
38 #include "envoy/config/core/v3/address.upb.h"
39 #include "envoy/config/core/v3/base.upb.h"
40 #include "envoy/config/core/v3/config_source.upb.h"
41 #include "envoy/config/core/v3/extension.upb.h"
42 #include "envoy/config/core/v3/health_check.upb.h"
43 #include "envoy/config/core/v3/protocol.upb.h"
44 #include "envoy/config/endpoint/v3/endpoint.upb.h"
45 #include "envoy/config/endpoint/v3/endpoint_components.upb.h"
46 #include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h"
47 #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
48 #include "envoy/extensions/upstreams/http/v3/http_protocol_options.upb.h"
49 #include "google/protobuf/any.upb.h"
50 #include "google/protobuf/duration.upb.h"
51 #include "google/protobuf/struct.upb.h"
52 #include "google/protobuf/wrappers.upb.h"
53 #include "upb/base/string_view.h"
54 #include "upb/text/encode.h"
55
56 #include <grpc/support/json.h>
57 #include <grpc/support/log.h>
58
59 #include "src/core/ext/xds/upb_utils.h"
60 #include "src/core/ext/xds/xds_client.h"
61 #include "src/core/ext/xds/xds_common_types.h"
62 #include "src/core/ext/xds/xds_lb_policy_registry.h"
63 #include "src/core/ext/xds/xds_resource_type.h"
64 #include "src/core/lib/config/core_configuration.h"
65 #include "src/core/lib/debug/trace.h"
66 #include "src/core/lib/gprpp/host_port.h"
67 #include "src/core/lib/gprpp/match.h"
68 #include "src/core/lib/gprpp/ref_counted_ptr.h"
69 #include "src/core/lib/gprpp/time.h"
70 #include "src/core/lib/gprpp/validation_errors.h"
71 #include "src/core/lib/json/json_writer.h"
72 #include "src/core/lib/matchers/matchers.h"
73 #include "src/core/load_balancing/lb_policy_registry.h"
74
75 namespace grpc_core {
76
77 //
78 // XdsClusterResource
79 //
80
ToString() const81 std::string XdsClusterResource::ToString() const {
82 std::vector<std::string> contents;
83 Match(
84 type,
85 [&](const Eds& eds) {
86 contents.push_back("type=EDS");
87 if (!eds.eds_service_name.empty()) {
88 contents.push_back(
89 absl::StrCat("eds_service_name=", eds.eds_service_name));
90 }
91 },
92 [&](const LogicalDns& logical_dns) {
93 contents.push_back("type=LOGICAL_DNS");
94 contents.push_back(absl::StrCat("dns_hostname=", logical_dns.hostname));
95 },
96 [&](const Aggregate& aggregate) {
97 contents.push_back("type=AGGREGATE");
98 contents.push_back(absl::StrCat(
99 "prioritized_cluster_names=[",
100 absl::StrJoin(aggregate.prioritized_cluster_names, ", "), "]"));
101 });
102 contents.push_back(absl::StrCat("lb_policy_config=",
103 JsonDump(Json::FromArray(lb_policy_config))));
104 if (lrs_load_reporting_server.has_value()) {
105 contents.push_back(absl::StrCat("lrs_load_reporting_server_name=",
106 lrs_load_reporting_server->server_uri()));
107 }
108 if (!common_tls_context.Empty()) {
109 contents.push_back(
110 absl::StrCat("common_tls_context=", common_tls_context.ToString()));
111 }
112 if (connection_idle_timeout != Duration::Zero()) {
113 contents.push_back(absl::StrCat("connection_idle_timeout=",
114 connection_idle_timeout.ToString()));
115 }
116 contents.push_back(
117 absl::StrCat("max_concurrent_requests=", max_concurrent_requests));
118 contents.push_back(absl::StrCat("override_host_statuses=",
119 override_host_statuses.ToString()));
120 return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
121 }
122
123 //
124 // XdsClusterResourceType
125 //
126
127 namespace {
128
UpstreamTlsContextParse(const XdsResourceType::DecodeContext & context,const envoy_config_core_v3_TransportSocket * transport_socket,ValidationErrors * errors)129 CommonTlsContext UpstreamTlsContextParse(
130 const XdsResourceType::DecodeContext& context,
131 const envoy_config_core_v3_TransportSocket* transport_socket,
132 ValidationErrors* errors) {
133 ValidationErrors::ScopedField field(errors, ".typed_config");
134 const auto* typed_config =
135 envoy_config_core_v3_TransportSocket_typed_config(transport_socket);
136 auto extension = ExtractXdsExtension(context, typed_config, errors);
137 if (!extension.has_value()) return {};
138 if (extension->type !=
139 "envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext") {
140 ValidationErrors::ScopedField field(errors, ".type_url");
141 errors->AddError("unsupported transport socket type");
142 return {};
143 }
144 absl::string_view* serialized_upstream_tls_context =
145 absl::get_if<absl::string_view>(&extension->value);
146 if (serialized_upstream_tls_context == nullptr) {
147 errors->AddError("can't decode UpstreamTlsContext");
148 return {};
149 }
150 const auto* upstream_tls_context =
151 envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_parse(
152 serialized_upstream_tls_context->data(),
153 serialized_upstream_tls_context->size(), context.arena);
154 if (upstream_tls_context == nullptr) {
155 errors->AddError("can't decode UpstreamTlsContext");
156 return {};
157 }
158 ValidationErrors::ScopedField field3(errors, ".common_tls_context");
159 const auto* common_tls_context_proto =
160 envoy_extensions_transport_sockets_tls_v3_UpstreamTlsContext_common_tls_context(
161 upstream_tls_context);
162 CommonTlsContext common_tls_context;
163 if (common_tls_context_proto != nullptr) {
164 common_tls_context =
165 CommonTlsContext::Parse(context, common_tls_context_proto, errors);
166 }
167 if (common_tls_context.certificate_validation_context
168 .ca_certificate_provider_instance.instance_name.empty()) {
169 errors->AddError("no CA certificate provider instance configured");
170 }
171 return common_tls_context;
172 }
173
EdsConfigParse(const envoy_config_cluster_v3_Cluster * cluster,ValidationErrors * errors)174 XdsClusterResource::Eds EdsConfigParse(
175 const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
176 XdsClusterResource::Eds eds;
177 ValidationErrors::ScopedField field(errors, ".eds_cluster_config");
178 const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config =
179 envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster);
180 if (eds_cluster_config == nullptr) {
181 errors->AddError("field not present");
182 } else {
183 // Validate ConfigSource.
184 {
185 ValidationErrors::ScopedField field(errors, ".eds_config");
186 const envoy_config_core_v3_ConfigSource* eds_config =
187 envoy_config_cluster_v3_Cluster_EdsClusterConfig_eds_config(
188 eds_cluster_config);
189 if (eds_config == nullptr) {
190 errors->AddError("field not present");
191 } else {
192 if (!envoy_config_core_v3_ConfigSource_has_ads(eds_config) &&
193 !envoy_config_core_v3_ConfigSource_has_self(eds_config)) {
194 errors->AddError("ConfigSource is not ads or self");
195 }
196 }
197 }
198 // Record EDS service_name (if any).
199 // This field is required if the CDS resource has an xdstp name.
200 eds.eds_service_name = UpbStringToStdString(
201 envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name(
202 eds_cluster_config));
203 if (eds.eds_service_name.empty()) {
204 absl::string_view cluster_name =
205 UpbStringToAbsl(envoy_config_cluster_v3_Cluster_name(cluster));
206 if (absl::StartsWith(cluster_name, "xdstp:")) {
207 ValidationErrors::ScopedField field(errors, ".service_name");
208 errors->AddError("must be set if Cluster resource has an xdstp name");
209 }
210 }
211 }
212 return eds;
213 }
214
LogicalDnsParse(const envoy_config_cluster_v3_Cluster * cluster,ValidationErrors * errors)215 XdsClusterResource::LogicalDns LogicalDnsParse(
216 const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
217 XdsClusterResource::LogicalDns logical_dns;
218 ValidationErrors::ScopedField field(errors, ".load_assignment");
219 const auto* load_assignment =
220 envoy_config_cluster_v3_Cluster_load_assignment(cluster);
221 if (load_assignment == nullptr) {
222 errors->AddError("field not present for LOGICAL_DNS cluster");
223 return logical_dns;
224 }
225 ValidationErrors::ScopedField field2(errors, ".endpoints");
226 size_t num_localities;
227 const auto* const* localities =
228 envoy_config_endpoint_v3_ClusterLoadAssignment_endpoints(load_assignment,
229 &num_localities);
230 if (num_localities != 1) {
231 errors->AddError(absl::StrCat(
232 "must contain exactly one locality for LOGICAL_DNS cluster, found ",
233 num_localities));
234 return logical_dns;
235 }
236 ValidationErrors::ScopedField field3(errors, "[0].lb_endpoints");
237 size_t num_endpoints;
238 const auto* const* endpoints =
239 envoy_config_endpoint_v3_LocalityLbEndpoints_lb_endpoints(localities[0],
240 &num_endpoints);
241 if (num_endpoints != 1) {
242 errors->AddError(absl::StrCat(
243 "must contain exactly one endpoint for LOGICAL_DNS cluster, found ",
244 num_endpoints));
245 return logical_dns;
246 }
247 ValidationErrors::ScopedField field4(errors, "[0].endpoint");
248 const auto* endpoint =
249 envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]);
250 if (endpoint == nullptr) {
251 errors->AddError("field not present");
252 return logical_dns;
253 }
254 ValidationErrors::ScopedField field5(errors, ".address");
255 const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint);
256 if (address == nullptr) {
257 errors->AddError("field not present");
258 return logical_dns;
259 }
260 ValidationErrors::ScopedField field6(errors, ".socket_address");
261 const auto* socket_address =
262 envoy_config_core_v3_Address_socket_address(address);
263 if (socket_address == nullptr) {
264 errors->AddError("field not present");
265 return logical_dns;
266 }
267 if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address).size !=
268 0) {
269 ValidationErrors::ScopedField field(errors, ".resolver_name");
270 errors->AddError(
271 "LOGICAL_DNS clusters must NOT have a custom resolver name set");
272 }
273 absl::string_view address_str = UpbStringToAbsl(
274 envoy_config_core_v3_SocketAddress_address(socket_address));
275 if (address_str.empty()) {
276 ValidationErrors::ScopedField field(errors, ".address");
277 errors->AddError("field not present");
278 }
279 if (!envoy_config_core_v3_SocketAddress_has_port_value(socket_address)) {
280 ValidationErrors::ScopedField field(errors, ".port_value");
281 errors->AddError("field not present");
282 }
283 logical_dns.hostname = JoinHostPort(
284 address_str,
285 envoy_config_core_v3_SocketAddress_port_value(socket_address));
286 return logical_dns;
287 }
288
AggregateClusterParse(const XdsResourceType::DecodeContext & context,absl::string_view serialized_config,ValidationErrors * errors)289 XdsClusterResource::Aggregate AggregateClusterParse(
290 const XdsResourceType::DecodeContext& context,
291 absl::string_view serialized_config, ValidationErrors* errors) {
292 XdsClusterResource::Aggregate aggregate;
293 const auto* aggregate_cluster_config =
294 envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse(
295 serialized_config.data(), serialized_config.size(), context.arena);
296 if (aggregate_cluster_config == nullptr) {
297 errors->AddError("can't parse aggregate cluster config");
298 return aggregate;
299 }
300 size_t size;
301 const upb_StringView* clusters =
302 envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters(
303 aggregate_cluster_config, &size);
304 if (size == 0) {
305 ValidationErrors::ScopedField field(errors, ".clusters");
306 errors->AddError("must be non-empty");
307 }
308 for (size_t i = 0; i < size; ++i) {
309 aggregate.prioritized_cluster_names.emplace_back(
310 UpbStringToStdString(clusters[i]));
311 }
312 return aggregate;
313 }
314
ParseLbPolicyConfig(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster,XdsClusterResource * cds_update,ValidationErrors * errors)315 void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context,
316 const envoy_config_cluster_v3_Cluster* cluster,
317 XdsClusterResource* cds_update,
318 ValidationErrors* errors) {
319 // First, check the new load_balancing_policy field.
320 const auto* load_balancing_policy =
321 envoy_config_cluster_v3_Cluster_load_balancing_policy(cluster);
322 if (load_balancing_policy != nullptr) {
323 const auto& registry =
324 static_cast<const GrpcXdsBootstrap&>(context.client->bootstrap())
325 .lb_policy_registry();
326 ValidationErrors::ScopedField field(errors, ".load_balancing_policy");
327 const size_t original_error_count = errors->size();
328 cds_update->lb_policy_config = registry.ConvertXdsLbPolicyConfig(
329 context, load_balancing_policy, errors);
330 // If there were no conversion errors, validate that the converted config
331 // parses with the gRPC LB policy registry.
332 if (original_error_count == errors->size()) {
333 auto config = CoreConfiguration::Get()
334 .lb_policy_registry()
335 .ParseLoadBalancingConfig(
336 Json::FromArray(cds_update->lb_policy_config));
337 if (!config.ok()) errors->AddError(config.status().message());
338 }
339 return;
340 }
341 // Didn't find load_balancing_policy field, so fall back to the old
342 // lb_policy enum field.
343 if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
344 envoy_config_cluster_v3_Cluster_ROUND_ROBIN) {
345 cds_update->lb_policy_config = {
346 Json::FromObject({
347 {"xds_wrr_locality_experimental",
348 Json::FromObject({
349 {"childPolicy", Json::FromArray({
350 Json::FromObject({
351 {"round_robin", Json::FromObject({})},
352 }),
353 })},
354 })},
355 }),
356 };
357 } else if (envoy_config_cluster_v3_Cluster_lb_policy(cluster) ==
358 envoy_config_cluster_v3_Cluster_RING_HASH) {
359 // Record ring hash lb config
360 auto* ring_hash_config =
361 envoy_config_cluster_v3_Cluster_ring_hash_lb_config(cluster);
362 uint64_t min_ring_size = 1024;
363 uint64_t max_ring_size = 8388608;
364 if (ring_hash_config != nullptr) {
365 ValidationErrors::ScopedField field(errors, ".ring_hash_lb_config");
366 const google_protobuf_UInt64Value* uint64_value =
367 envoy_config_cluster_v3_Cluster_RingHashLbConfig_maximum_ring_size(
368 ring_hash_config);
369 if (uint64_value != nullptr) {
370 ValidationErrors::ScopedField field(errors, ".maximum_ring_size");
371 max_ring_size = google_protobuf_UInt64Value_value(uint64_value);
372 if (max_ring_size > 8388608 || max_ring_size == 0) {
373 errors->AddError("must be in the range of 1 to 8388608");
374 }
375 }
376 uint64_value =
377 envoy_config_cluster_v3_Cluster_RingHashLbConfig_minimum_ring_size(
378 ring_hash_config);
379 if (uint64_value != nullptr) {
380 ValidationErrors::ScopedField field(errors, ".minimum_ring_size");
381 min_ring_size = google_protobuf_UInt64Value_value(uint64_value);
382 if (min_ring_size > 8388608 || min_ring_size == 0) {
383 errors->AddError("must be in the range of 1 to 8388608");
384 }
385 if (min_ring_size > max_ring_size) {
386 errors->AddError("cannot be greater than maximum_ring_size");
387 }
388 }
389 if (envoy_config_cluster_v3_Cluster_RingHashLbConfig_hash_function(
390 ring_hash_config) !=
391 envoy_config_cluster_v3_Cluster_RingHashLbConfig_XX_HASH) {
392 ValidationErrors::ScopedField field(errors, ".hash_function");
393 errors->AddError("invalid hash function");
394 }
395 }
396 cds_update->lb_policy_config = {
397 Json::FromObject({
398 {"ring_hash_experimental",
399 Json::FromObject({
400 {"minRingSize", Json::FromNumber(min_ring_size)},
401 {"maxRingSize", Json::FromNumber(max_ring_size)},
402 })},
403 }),
404 };
405 } else {
406 ValidationErrors::ScopedField field(errors, ".lb_policy");
407 errors->AddError("LB policy is not supported");
408 }
409 }
410
ParseUpstreamConfig(const XdsResourceType::DecodeContext & context,const envoy_config_core_v3_TypedExtensionConfig * upstream_config,XdsClusterResource * cds_update,ValidationErrors * errors)411 void ParseUpstreamConfig(
412 const XdsResourceType::DecodeContext& context,
413 const envoy_config_core_v3_TypedExtensionConfig* upstream_config,
414 XdsClusterResource* cds_update, ValidationErrors* errors) {
415 ValidationErrors::ScopedField field(errors, ".typed_config");
416 const auto* typed_config =
417 envoy_config_core_v3_TypedExtensionConfig_typed_config(upstream_config);
418 auto extension = ExtractXdsExtension(context, typed_config, errors);
419 if (!extension.has_value()) return;
420 if (extension->type !=
421 "envoy.extensions.upstreams.http.v3.HttpProtocolOptions") {
422 ValidationErrors::ScopedField field(errors, ".type_url");
423 errors->AddError("unsupported upstream config type");
424 return;
425 }
426 absl::string_view* serialized_http_protocol_options =
427 absl::get_if<absl::string_view>(&extension->value);
428 if (serialized_http_protocol_options == nullptr) {
429 errors->AddError("can't decode HttpProtocolOptions");
430 return;
431 }
432 const auto* http_protocol_options =
433 envoy_extensions_upstreams_http_v3_HttpProtocolOptions_parse(
434 serialized_http_protocol_options->data(),
435 serialized_http_protocol_options->size(), context.arena);
436 if (http_protocol_options == nullptr) {
437 errors->AddError("can't decode HttpProtocolOptions");
438 return;
439 }
440 ValidationErrors::ScopedField field2(errors, ".common_http_protocol_options");
441 const auto* common_http_protocol_options =
442 envoy_extensions_upstreams_http_v3_HttpProtocolOptions_common_http_protocol_options(
443 http_protocol_options);
444 if (common_http_protocol_options != nullptr) {
445 const auto* idle_timeout =
446 envoy_config_core_v3_HttpProtocolOptions_idle_timeout(
447 common_http_protocol_options);
448 if (idle_timeout != nullptr) {
449 ValidationErrors::ScopedField field(errors, ".idle_timeout");
450 cds_update->connection_idle_timeout = ParseDuration(idle_timeout, errors);
451 }
452 }
453 }
454
CdsResourceParse(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster)455 absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
456 const XdsResourceType::DecodeContext& context,
457 const envoy_config_cluster_v3_Cluster* cluster) {
458 auto cds_update = std::make_shared<XdsClusterResource>();
459 ValidationErrors errors;
460 // Check the cluster discovery type.
461 if (envoy_config_cluster_v3_Cluster_type(cluster) ==
462 envoy_config_cluster_v3_Cluster_EDS) {
463 cds_update->type = EdsConfigParse(cluster, &errors);
464 } else if (envoy_config_cluster_v3_Cluster_type(cluster) ==
465 envoy_config_cluster_v3_Cluster_LOGICAL_DNS) {
466 cds_update->type = LogicalDnsParse(cluster, &errors);
467 } else if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) {
468 ValidationErrors::ScopedField field(&errors, ".cluster_type");
469 const auto* custom_cluster_type =
470 envoy_config_cluster_v3_Cluster_cluster_type(cluster);
471 GPR_ASSERT(custom_cluster_type != nullptr);
472 ValidationErrors::ScopedField field2(&errors, ".typed_config");
473 const auto* typed_config =
474 envoy_config_cluster_v3_Cluster_CustomClusterType_typed_config(
475 custom_cluster_type);
476 if (typed_config == nullptr) {
477 errors.AddError("field not present");
478 } else {
479 absl::string_view type_url = absl::StripPrefix(
480 UpbStringToAbsl(google_protobuf_Any_type_url(typed_config)),
481 "type.googleapis.com/");
482 if (type_url != "envoy.extensions.clusters.aggregate.v3.ClusterConfig") {
483 ValidationErrors::ScopedField field(&errors, ".type_url");
484 errors.AddError(
485 absl::StrCat("unknown cluster_type extension: ", type_url));
486 } else {
487 // Retrieve aggregate clusters.
488 ValidationErrors::ScopedField field(
489 &errors,
490 ".value[envoy.extensions.clusters.aggregate.v3.ClusterConfig]");
491 absl::string_view serialized_config =
492 UpbStringToAbsl(google_protobuf_Any_value(typed_config));
493 cds_update->type =
494 AggregateClusterParse(context, serialized_config, &errors);
495 }
496 }
497 } else {
498 ValidationErrors::ScopedField field(&errors, ".type");
499 errors.AddError("unknown discovery type");
500 }
501 // Check the LB policy.
502 ParseLbPolicyConfig(context, cluster, cds_update.get(), &errors);
503 // transport_socket
504 auto* transport_socket =
505 envoy_config_cluster_v3_Cluster_transport_socket(cluster);
506 if (transport_socket != nullptr) {
507 ValidationErrors::ScopedField field(&errors, ".transport_socket");
508 cds_update->common_tls_context =
509 UpstreamTlsContextParse(context, transport_socket, &errors);
510 }
511 // Record LRS server name (if any).
512 const envoy_config_core_v3_ConfigSource* lrs_server =
513 envoy_config_cluster_v3_Cluster_lrs_server(cluster);
514 if (lrs_server != nullptr) {
515 if (!envoy_config_core_v3_ConfigSource_has_self(lrs_server)) {
516 ValidationErrors::ScopedField field(&errors, ".lrs_server");
517 errors.AddError("ConfigSource is not self");
518 }
519 cds_update->lrs_load_reporting_server.emplace(
520 static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(context.server));
521 }
522 // Protocol options.
523 auto* upstream_config =
524 envoy_config_cluster_v3_Cluster_upstream_config(cluster);
525 if (upstream_config != nullptr) {
526 ValidationErrors::ScopedField field(&errors, ".upstream_config");
527 ParseUpstreamConfig(context, upstream_config, cds_update.get(), &errors);
528 }
529 // The Cluster resource encodes the circuit breaking parameters in a list of
530 // Thresholds messages, where each message specifies the parameters for a
531 // particular RoutingPriority. we will look only at the first entry in the
532 // list for priority DEFAULT and default to 1024 if not found.
533 if (envoy_config_cluster_v3_Cluster_has_circuit_breakers(cluster)) {
534 const envoy_config_cluster_v3_CircuitBreakers* circuit_breakers =
535 envoy_config_cluster_v3_Cluster_circuit_breakers(cluster);
536 size_t num_thresholds;
537 const envoy_config_cluster_v3_CircuitBreakers_Thresholds* const*
538 thresholds = envoy_config_cluster_v3_CircuitBreakers_thresholds(
539 circuit_breakers, &num_thresholds);
540 for (size_t i = 0; i < num_thresholds; ++i) {
541 const auto* threshold = thresholds[i];
542 if (envoy_config_cluster_v3_CircuitBreakers_Thresholds_priority(
543 threshold) == envoy_config_core_v3_DEFAULT) {
544 const google_protobuf_UInt32Value* max_requests =
545 envoy_config_cluster_v3_CircuitBreakers_Thresholds_max_requests(
546 threshold);
547 if (max_requests != nullptr) {
548 cds_update->max_concurrent_requests =
549 google_protobuf_UInt32Value_value(max_requests);
550 }
551 break;
552 }
553 }
554 }
555 // Outlier detection config.
556 if (envoy_config_cluster_v3_Cluster_has_outlier_detection(cluster)) {
557 ValidationErrors::ScopedField field(&errors, ".outlier_detection");
558 OutlierDetectionConfig outlier_detection_update;
559 const envoy_config_cluster_v3_OutlierDetection* outlier_detection =
560 envoy_config_cluster_v3_Cluster_outlier_detection(cluster);
561 const google_protobuf_Duration* duration =
562 envoy_config_cluster_v3_OutlierDetection_interval(outlier_detection);
563 if (duration != nullptr) {
564 ValidationErrors::ScopedField field(&errors, ".interval");
565 outlier_detection_update.interval = ParseDuration(duration, &errors);
566 }
567 duration = envoy_config_cluster_v3_OutlierDetection_base_ejection_time(
568 outlier_detection);
569 if (duration != nullptr) {
570 ValidationErrors::ScopedField field(&errors, ".base_ejection_time");
571 outlier_detection_update.base_ejection_time =
572 ParseDuration(duration, &errors);
573 }
574 duration = envoy_config_cluster_v3_OutlierDetection_max_ejection_time(
575 outlier_detection);
576 if (duration != nullptr) {
577 ValidationErrors::ScopedField field(&errors, ".max_ejection_time");
578 outlier_detection_update.max_ejection_time =
579 ParseDuration(duration, &errors);
580 }
581 const google_protobuf_UInt32Value* max_ejection_percent =
582 envoy_config_cluster_v3_OutlierDetection_max_ejection_percent(
583 outlier_detection);
584 if (max_ejection_percent != nullptr) {
585 outlier_detection_update.max_ejection_percent =
586 google_protobuf_UInt32Value_value(max_ejection_percent);
587 if (outlier_detection_update.max_ejection_percent > 100) {
588 ValidationErrors::ScopedField field(&errors, ".max_ejection_percent");
589 errors.AddError("value must be <= 100");
590 }
591 }
592 const google_protobuf_UInt32Value* enforcing_success_rate =
593 envoy_config_cluster_v3_OutlierDetection_enforcing_success_rate(
594 outlier_detection);
595 if (enforcing_success_rate != nullptr) {
596 uint32_t enforcement_percentage =
597 google_protobuf_UInt32Value_value(enforcing_success_rate);
598 if (enforcement_percentage > 100) {
599 ValidationErrors::ScopedField field(&errors, ".enforcing_success_rate");
600 errors.AddError("value must be <= 100");
601 }
602 if (enforcement_percentage != 0) {
603 OutlierDetectionConfig::SuccessRateEjection success_rate_ejection;
604 success_rate_ejection.enforcement_percentage = enforcement_percentage;
605 const google_protobuf_UInt32Value* minimum_hosts =
606 envoy_config_cluster_v3_OutlierDetection_success_rate_minimum_hosts(
607 outlier_detection);
608 if (minimum_hosts != nullptr) {
609 success_rate_ejection.minimum_hosts =
610 google_protobuf_UInt32Value_value(minimum_hosts);
611 }
612 const google_protobuf_UInt32Value* request_volume =
613 envoy_config_cluster_v3_OutlierDetection_success_rate_request_volume(
614 outlier_detection);
615 if (request_volume != nullptr) {
616 success_rate_ejection.request_volume =
617 google_protobuf_UInt32Value_value(request_volume);
618 }
619 const google_protobuf_UInt32Value* stdev_factor =
620 envoy_config_cluster_v3_OutlierDetection_success_rate_stdev_factor(
621 outlier_detection);
622 if (stdev_factor != nullptr) {
623 success_rate_ejection.stdev_factor =
624 google_protobuf_UInt32Value_value(stdev_factor);
625 }
626 outlier_detection_update.success_rate_ejection = success_rate_ejection;
627 }
628 }
629 const google_protobuf_UInt32Value* enforcing_failure_percentage =
630 envoy_config_cluster_v3_OutlierDetection_enforcing_failure_percentage(
631 outlier_detection);
632 if (enforcing_failure_percentage != nullptr) {
633 uint32_t enforcement_percentage =
634 google_protobuf_UInt32Value_value(enforcing_failure_percentage);
635 if (enforcement_percentage > 100) {
636 ValidationErrors::ScopedField field(&errors,
637 ".enforcing_failure_percentage");
638 errors.AddError("value must be <= 100");
639 }
640 if (enforcement_percentage != 0) {
641 OutlierDetectionConfig::FailurePercentageEjection
642 failure_percentage_ejection;
643 failure_percentage_ejection.enforcement_percentage =
644 enforcement_percentage;
645 const google_protobuf_UInt32Value* minimum_hosts =
646 envoy_config_cluster_v3_OutlierDetection_failure_percentage_minimum_hosts(
647 outlier_detection);
648 if (minimum_hosts != nullptr) {
649 failure_percentage_ejection.minimum_hosts =
650 google_protobuf_UInt32Value_value(minimum_hosts);
651 }
652 const google_protobuf_UInt32Value* request_volume =
653 envoy_config_cluster_v3_OutlierDetection_failure_percentage_request_volume(
654 outlier_detection);
655 if (request_volume != nullptr) {
656 failure_percentage_ejection.request_volume =
657 google_protobuf_UInt32Value_value(request_volume);
658 }
659 const google_protobuf_UInt32Value* threshold =
660 envoy_config_cluster_v3_OutlierDetection_failure_percentage_threshold(
661 outlier_detection);
662 if (threshold != nullptr) {
663 failure_percentage_ejection.threshold =
664 google_protobuf_UInt32Value_value(threshold);
665 if (enforcement_percentage > 100) {
666 ValidationErrors::ScopedField field(
667 &errors, ".failure_percentage_threshold");
668 errors.AddError("value must be <= 100");
669 }
670 }
671 outlier_detection_update.failure_percentage_ejection =
672 failure_percentage_ejection;
673 }
674 }
675 cds_update->outlier_detection = outlier_detection_update;
676 }
677 // Validate override host status.
678 const auto* common_lb_config =
679 envoy_config_cluster_v3_Cluster_common_lb_config(cluster);
680 bool override_host_status_found = false;
681 if (common_lb_config != nullptr) {
682 ValidationErrors::ScopedField field(&errors, ".common_lb_config");
683 const auto* override_host_status =
684 envoy_config_cluster_v3_Cluster_CommonLbConfig_override_host_status(
685 common_lb_config);
686 if (override_host_status != nullptr) {
687 ValidationErrors::ScopedField field(&errors, ".override_host_status");
688 size_t size;
689 const int32_t* statuses = envoy_config_core_v3_HealthStatusSet_statuses(
690 override_host_status, &size);
691 for (size_t i = 0; i < size; ++i) {
692 auto status = XdsHealthStatus::FromUpb(statuses[i]);
693 if (status.has_value()) {
694 cds_update->override_host_statuses.Add(*status);
695 }
696 }
697 override_host_status_found = true;
698 }
699 }
700 // If the field is not set, we default to [UNKNOWN, HEALTHY].
701 if (!override_host_status_found) {
702 cds_update->override_host_statuses.Add(
703 XdsHealthStatus(XdsHealthStatus::kUnknown));
704 cds_update->override_host_statuses.Add(
705 XdsHealthStatus(XdsHealthStatus::kHealthy));
706 }
707 // Record telemetry labels (if any).
708 const envoy_config_core_v3_Metadata* metadata =
709 envoy_config_cluster_v3_Cluster_metadata(cluster);
710 if (metadata != nullptr) {
711 google_protobuf_Struct* telemetry_labels_struct;
712 if (envoy_config_core_v3_Metadata_filter_metadata_get(
713 metadata,
714 StdStringToUpbString(
715 absl::string_view("com.google.csm.telemetry_labels")),
716 &telemetry_labels_struct)) {
717 size_t iter = kUpb_Map_Begin;
718 const google_protobuf_Struct_FieldsEntry* fields_entry;
719 while ((fields_entry = google_protobuf_Struct_fields_next(
720 telemetry_labels_struct, &iter)) != nullptr) {
721 // Adds any entry whose value is a string to telemetry_labels.
722 const google_protobuf_Value* value =
723 google_protobuf_Struct_FieldsEntry_value(fields_entry);
724 if (google_protobuf_Value_has_string_value(value)) {
725 if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key(
726 fields_entry)) == "service_name") {
727 cds_update->service_telemetry_label = RefCountedStringValue(
728 UpbStringToAbsl(google_protobuf_Value_string_value(value)));
729 } else if (UpbStringToAbsl(google_protobuf_Struct_FieldsEntry_key(
730 fields_entry)) == "service_namespace") {
731 cds_update->namespace_telemetry_label = RefCountedStringValue(
732 UpbStringToAbsl(google_protobuf_Value_string_value(value)));
733 }
734 }
735 }
736 }
737 }
738 // Return result.
739 if (!errors.ok()) {
740 return errors.status(absl::StatusCode::kInvalidArgument,
741 "errors validating Cluster resource");
742 }
743 return cds_update;
744 }
745
MaybeLogCluster(const XdsResourceType::DecodeContext & context,const envoy_config_cluster_v3_Cluster * cluster)746 void MaybeLogCluster(const XdsResourceType::DecodeContext& context,
747 const envoy_config_cluster_v3_Cluster* cluster) {
748 if (GRPC_TRACE_FLAG_ENABLED(*context.tracer) &&
749 gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
750 const upb_MessageDef* msg_type =
751 envoy_config_cluster_v3_Cluster_getmsgdef(context.symtab);
752 char buf[10240];
753 upb_TextEncode(reinterpret_cast<const upb_Message*>(cluster), msg_type,
754 nullptr, 0, buf, sizeof(buf));
755 gpr_log(GPR_DEBUG, "[xds_client %p] Cluster: %s", context.client, buf);
756 }
757 }
758
759 } // namespace
760
Decode(const XdsResourceType::DecodeContext & context,absl::string_view serialized_resource) const761 XdsResourceType::DecodeResult XdsClusterResourceType::Decode(
762 const XdsResourceType::DecodeContext& context,
763 absl::string_view serialized_resource) const {
764 DecodeResult result;
765 // Parse serialized proto.
766 auto* resource = envoy_config_cluster_v3_Cluster_parse(
767 serialized_resource.data(), serialized_resource.size(), context.arena);
768 if (resource == nullptr) {
769 result.resource =
770 absl::InvalidArgumentError("Can't parse Cluster resource.");
771 return result;
772 }
773 MaybeLogCluster(context, resource);
774 // Validate resource.
775 result.name =
776 UpbStringToStdString(envoy_config_cluster_v3_Cluster_name(resource));
777 auto cds_resource = CdsResourceParse(context, resource);
778 if (!cds_resource.ok()) {
779 if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
780 gpr_log(GPR_ERROR, "[xds_client %p] invalid Cluster %s: %s",
781 context.client, result.name->c_str(),
782 cds_resource.status().ToString().c_str());
783 }
784 result.resource = cds_resource.status();
785 } else {
786 if (GRPC_TRACE_FLAG_ENABLED(*context.tracer)) {
787 gpr_log(GPR_INFO, "[xds_client %p] parsed Cluster %s: %s", context.client,
788 result.name->c_str(), (*cds_resource)->ToString().c_str());
789 }
790 result.resource = std::move(*cds_resource);
791 }
792 return result;
793 }
794
795 } // namespace grpc_core
796