1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 #include <gmock/gmock.h>
17 #include <gtest/gtest.h>
18
19 #include <numeric>
20 #include <string>
21 #include <vector>
22
23 #include "absl/log/log.h"
24 #include "absl/strings/match.h"
25 #include "absl/strings/str_cat.h"
26 #include "src/core/client_channel/backup_poller.h"
27 #include "src/core/config/config_vars.h"
28 #include "src/core/lib/address_utils/sockaddr_utils.h"
29 #include "src/core/lib/surface/call.h"
30 #include "src/core/telemetry/call_tracer.h"
31 #include "test/core/test_util/fake_stats_plugin.h"
32 #include "test/core/test_util/scoped_env_var.h"
33 #include "test/cpp/end2end/connection_attempt_injector.h"
34 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
35 #include "xds/data/orca/v3/orca_load_report.pb.h"
36
37 namespace grpc {
38 namespace testing {
39 namespace {
40
41 using ::envoy::config::cluster::v3::CircuitBreakers;
42 using ::envoy::config::core::v3::HealthStatus;
43 using ::envoy::config::core::v3::RoutingPriority;
44 using ::envoy::type::v3::FractionalPercent;
45
46 using ClientStats = LrsServiceImpl::ClientStats;
47 using OptionalLabelKey =
48 grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey;
49
50 constexpr char kLbDropType[] = "lb";
51 constexpr char kThrottleDropType[] = "throttle";
52 constexpr char kStatusMessageDropPrefix[] = "EDS-configured drop: ";
53
54 //
55 // CDS tests
56 //
57
58 using CdsTest = XdsEnd2endTest;
59
60 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest, ::testing::Values(XdsTestType()),
61 &XdsTestType::Name);
62
63 // Tests that CDS client should send an ACK upon correct CDS response.
TEST_P(CdsTest,Vanilla)64 TEST_P(CdsTest, Vanilla) {
65 (void)SendRpc();
66 auto response_state = balancer_->ads_service()->cds_response_state();
67 ASSERT_TRUE(response_state.has_value());
68 EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
69 }
70
71 // Testing just one example of an invalid resource here.
72 // Unit tests for XdsClusterResourceType have exhaustive tests for all
73 // of the invalid cases.
TEST_P(CdsTest,InvalidClusterResource)74 TEST_P(CdsTest, InvalidClusterResource) {
75 auto cluster = default_cluster_;
76 cluster.set_type(Cluster::STATIC);
77 balancer_->ads_service()->SetCdsResource(cluster);
78 const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
79 ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
80 EXPECT_EQ(response_state->error_message,
81 "xDS response validation errors: ["
82 "resource index 0: cluster_name: "
83 "INVALID_ARGUMENT: errors validating Cluster resource: ["
84 "field:type error:unknown discovery type]]");
85 }
86
87 // Tests that we don't trigger does-not-exist callbacks for a resource
88 // that was previously valid but is updated to be invalid.
TEST_P(CdsTest,InvalidClusterStillExistsIfPreviouslyCached)89 TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) {
90 CreateAndStartBackends(1);
91 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
92 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
93 // Check that everything works.
94 CheckRpcSendOk(DEBUG_LOCATION);
95 // Now send an update changing the Cluster to be invalid.
96 auto cluster = default_cluster_;
97 cluster.set_type(Cluster::STATIC);
98 balancer_->ads_service()->SetCdsResource(cluster);
99 const auto response_state =
100 WaitForCdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK);
101 ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
102 EXPECT_EQ(response_state->error_message,
103 "xDS response validation errors: ["
104 "resource index 0: cluster_name: "
105 "INVALID_ARGUMENT: errors validating Cluster resource: ["
106 "field:type error:unknown discovery type]]");
107 CheckRpcSendOk(DEBUG_LOCATION);
108 }
109
110 // Tests round robin is not impacted by the endpoint weight, and that the
111 // localities in a locality map are picked according to their weights.
TEST_P(CdsTest,EndpointWeightDoesNotImpactWeightedRoundRobin)112 TEST_P(CdsTest, EndpointWeightDoesNotImpactWeightedRoundRobin) {
113 CreateAndStartBackends(2);
114 const int kLocalityWeight0 = 2;
115 const int kLocalityWeight1 = 8;
116 const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
117 const double kLocalityWeightRate0 =
118 static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
119 const double kLocalityWeightRate1 =
120 static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
121 const double kErrorTolerance = 0.05;
122 const size_t kNumRpcs =
123 ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
124 // ADS response contains 2 localities, each of which contains 1 backend.
125 EdsResourceArgs args({
126 {"locality0",
127 {CreateEndpoint(0, HealthStatus::UNKNOWN, 8)},
128 kLocalityWeight0},
129 {"locality1",
130 {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)},
131 kLocalityWeight1},
132 });
133 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
134 // Wait for both backends to be ready.
135 WaitForAllBackends(DEBUG_LOCATION, 0, 2);
136 // Send kNumRpcs RPCs.
137 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
138 // The locality picking rates should be roughly equal to the expectation.
139 const double locality_picked_rate_0 =
140 static_cast<double>(backends_[0]->backend_service()->request_count()) /
141 kNumRpcs;
142 const double locality_picked_rate_1 =
143 static_cast<double>(backends_[1]->backend_service()->request_count()) /
144 kNumRpcs;
145 EXPECT_THAT(locality_picked_rate_0,
146 ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
147 EXPECT_THAT(locality_picked_rate_1,
148 ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
149 }
150
151 // In most of our tests, we use different names for different resource
152 // types, to make sure that there are no cut-and-paste errors in the code
153 // that cause us to look at data for the wrong resource type. So we add
154 // this test to make sure that the EDS resource name defaults to the
155 // cluster name if not specified in the CDS resource.
TEST_P(CdsTest,EdsServiceNameDefaultsToClusterName)156 TEST_P(CdsTest, EdsServiceNameDefaultsToClusterName) {
157 CreateAndStartBackends(1);
158 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
159 balancer_->ads_service()->SetEdsResource(
160 BuildEdsResource(args, kDefaultClusterName));
161 Cluster cluster = default_cluster_;
162 cluster.mutable_eds_cluster_config()->clear_service_name();
163 balancer_->ads_service()->SetCdsResource(cluster);
164 CheckRpcSendOk(DEBUG_LOCATION, /*times=*/1,
165 RpcOptions().set_timeout_ms(5000));
166 }
167
168 // Tests switching over from one cluster to another.
TEST_P(CdsTest,ChangeClusters)169 TEST_P(CdsTest, ChangeClusters) {
170 CreateAndStartBackends(2);
171 const char* kNewClusterName = "new_cluster_name";
172 const char* kNewEdsServiceName = "new_eds_service_name";
173 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
174 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
175 // We need to wait for all backends to come online.
176 WaitForAllBackends(DEBUG_LOCATION, 0, 1);
177 // Populate new EDS resource.
178 args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
179 balancer_->ads_service()->SetEdsResource(
180 BuildEdsResource(args, kNewEdsServiceName));
181 // Populate new CDS resource.
182 Cluster new_cluster = default_cluster_;
183 new_cluster.set_name(kNewClusterName);
184 new_cluster.mutable_eds_cluster_config()->set_service_name(
185 kNewEdsServiceName);
186 balancer_->ads_service()->SetCdsResource(new_cluster);
187 // Change RDS resource to point to new cluster.
188 RouteConfiguration new_route_config = default_route_config_;
189 new_route_config.mutable_virtual_hosts(0)
190 ->mutable_routes(0)
191 ->mutable_route()
192 ->set_cluster(kNewClusterName);
193 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
194 new_route_config);
195 // Wait for all new backends to be used.
196 WaitForAllBackends(DEBUG_LOCATION, 1, 2);
197 }
198
// Verifies the cluster's max_requests circuit-breaker threshold. Once
// kMaxConcurrentRequests RPCs are in flight, a new RPC fails with
// UNAVAILABLE ("circuit breaker drop"). Cancelling one in-flight RPC lets
// another one through.
TEST_P(CdsTest, CircuitBreaking) {
  CreateAndStartBackends(1);
  constexpr size_t kMaxConcurrentRequests = 10;
  // Populate new EDS resources.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Update CDS resource to set max concurrent requests.
  Cluster cluster = default_cluster_;
  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
  threshold->set_priority(RoutingPriority::DEFAULT);
  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Send exactly kMaxConcurrentRequests long-running RPCs.
  LongRunningRpc rpcs[kMaxConcurrentRequests];
  for (auto& rpc : rpcs) {
    rpc.StartRpc(stub_.get());
  }
  // Wait for all RPCs to be in flight at the backend.
  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
         kMaxConcurrentRequests) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  // Sending an RPC now should fail; the error message should tell us we hit
  // the max concurrent requests limit and got dropped.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "circuit breaker drop");
  // Cancel one RPC to allow another one through.
  rpcs[0].CancelRpc();
  // Sleep to let the cancellation be fully processed before trying the next
  // RPC. There may be a slight delay between the return of the CANCELLED
  // status and the update of the internal state that tracks the number of
  // concurrent active requests.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                               gpr_time_from_millis(1000, GPR_TIMESPAN)));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Clean up the RPCs that are still outstanding (rpcs[0] is already done).
  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].CancelRpc();
  }
}
241
// Verifies that the max_requests circuit-breaker limit is enforced across
// channels. RPCs are alternated between two channels, and the limit still
// trips once kMaxConcurrentRequests RPCs are in flight in total.
TEST_P(CdsTest, CircuitBreakingMultipleChannelsShareCallCounter) {
  CreateAndStartBackends(1);
  constexpr size_t kMaxConcurrentRequests = 10;
  // Populate new EDS resources.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Update CDS resource to set max concurrent requests.
  Cluster cluster = default_cluster_;
  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
  threshold->set_priority(RoutingPriority::DEFAULT);
  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
  balancer_->ads_service()->SetCdsResource(cluster);
  auto channel2 = CreateChannel();
  auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
  // Send exactly kMaxConcurrentRequests long-running RPCs, alternating
  // between the two channels.
  LongRunningRpc rpcs[kMaxConcurrentRequests];
  for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].StartRpc(i % 2 == 0 ? stub_.get() : stub2.get());
  }
  // Wait for all RPCs to be in flight at the backend.
  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
         kMaxConcurrentRequests) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  // Sending an RPC now should fail; the error message should tell us we hit
  // the max concurrent requests limit and got dropped.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "circuit breaker drop");
  // Cancel one RPC to allow another one through.
  rpcs[0].CancelRpc();
  // Sleep to let the cancellation be fully processed before trying the next
  // RPC. There may be a slight delay between the return of the CANCELLED
  // status and the update of the internal state that tracks the number of
  // concurrent active requests.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                               gpr_time_from_millis(1000, GPR_TIMESPAN)));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Clean up the RPCs that are still outstanding (rpcs[0] is already done).
  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].CancelRpc();
  }
}
287
// Verifies that after the balancer is restarted (hence the test name: the
// original ADS call fails), a subsequent Cluster change is still picked up
// by the client.
TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) {
  CreateAndStartBackends(2);
  const char* kNewEdsResourceName = "new_eds_resource_name";
  // Start with an EDS resource that points at backend 0 and confirm the
  // channel works.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bounce the balancer.
  balancer_->Shutdown();
  balancer_->Start();
  // Publish a second EDS resource (backend 1) under a new name...
  eds_args =
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(eds_args, kNewEdsResourceName));
  // ...and switch the Cluster over to it.
  auto updated_cluster = default_cluster_;
  updated_cluster.mutable_eds_cluster_config()->set_service_name(
      kNewEdsResourceName);
  balancer_->ads_service()->SetCdsResource(updated_cluster);
  // The client must observe the change.
  WaitForBackend(DEBUG_LOCATION, 1);
}
310
// Verifies two things about the client call tracer's optional labels.
// First, the service labels in the Cluster's
// "com.google.csm.telemetry_labels" filter metadata are recorded.
// Second, so is the locality of the backend that served the call.
TEST_P(CdsTest, MetricLabels) {
  // Injects a fake client call tracer factory. Keep this declaration at the
  // top of the test.
  grpc_core::FakeClientCallTracerFactory fake_client_call_tracer_factory;
  CreateAndStartBackends(2);
  // One backend in each of two localities.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends(0, 1)},
                            {"locality1", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Attach service labels to the CDS resource via filter metadata.
  auto cluster = default_cluster_;
  auto& metadata_by_filter =
      *cluster.mutable_metadata()->mutable_filter_metadata();
  auto& label_fields =
      *metadata_by_filter["com.google.csm.telemetry_labels"].mutable_fields();
  *label_fields["service_name"].mutable_string_value() = "myservice";
  *label_fields["service_namespace"].mutable_string_value() = "mynamespace";
  balancer_->ads_service()->SetCdsResource(cluster);
  // Recreate the channel with the fake tracer factory installed.
  ChannelArguments channel_args;
  channel_args.SetPointer(GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY,
                          &fake_client_call_tracer_factory);
  ResetStub(/*failover_timeout_ms=*/0, &channel_args);
  // Checks the optional labels of the most recent call attempt.
  const auto expect_labels_for_locality = [&](const char* sub_zone) {
    EXPECT_THAT(
        fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
            ->GetLastCallAttemptTracer()
            ->GetOptionalLabels(),
        ::testing::ElementsAre(
            ::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"),
            ::testing::Pair(OptionalLabelKey::kXdsServiceNamespace,
                            "mynamespace"),
            ::testing::Pair(OptionalLabelKey::kLocality,
                            LocalityNameString(sub_zone))));
  };
  // An RPC served by backend 0 is labeled with locality0.
  WaitForBackend(DEBUG_LOCATION, 0);
  expect_labels_for_locality("locality0");
  // An RPC served by backend 1 is labeled with locality1.
  WaitForBackend(DEBUG_LOCATION, 1);
  expect_labels_for_locality("locality1");
}
359
360 //
361 // CDS deletion tests
362 //
363
364 class CdsDeletionTest : public XdsEnd2endTest {
365 protected:
SetUp()366 void SetUp() override {} // Individual tests call InitClient().
367 };
368
369 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsDeletionTest,
370 ::testing::Values(XdsTestType()), &XdsTestType::Name);
371
372 // Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted.
TEST_P(CdsDeletionTest,ClusterDeleted)373 TEST_P(CdsDeletionTest, ClusterDeleted) {
374 InitClient();
375 CreateAndStartBackends(1);
376 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
377 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
378 // We need to wait for all backends to come online.
379 WaitForAllBackends(DEBUG_LOCATION);
380 // Unset CDS resource.
381 balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
382 // Wait for RPCs to start failing.
383 SendRpcsUntilFailure(
384 DEBUG_LOCATION, StatusCode::UNAVAILABLE,
385 absl::StrCat("CDS resource ", kDefaultClusterName,
386 ": does not exist \\(node ID:xds_end2end_test\\)"));
387 // Make sure we ACK'ed the update.
388 auto response_state = balancer_->ads_service()->cds_response_state();
389 ASSERT_TRUE(response_state.has_value());
390 EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
391 }
392
393 // Tests that we ignore Cluster deletions if configured to do so.
TEST_P(CdsDeletionTest,ClusterDeletionIgnored)394 TEST_P(CdsDeletionTest, ClusterDeletionIgnored) {
395 InitClient(MakeBootstrapBuilder().SetIgnoreResourceDeletion());
396 CreateAndStartBackends(2);
397 // Bring up client pointing to backend 0 and wait for it to connect.
398 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
399 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
400 WaitForAllBackends(DEBUG_LOCATION, 0, 1);
401 // Make sure we ACKed the CDS update.
402 auto response_state = balancer_->ads_service()->cds_response_state();
403 ASSERT_TRUE(response_state.has_value());
404 EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
405 // Unset CDS resource and wait for client to ACK the update.
406 balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
407 const auto deadline = absl::Now() + absl::Seconds(30);
408 while (true) {
409 ASSERT_LT(absl::Now(), deadline) << "timed out waiting for CDS ACK";
410 response_state = balancer_->ads_service()->cds_response_state();
411 if (response_state.has_value()) break;
412 }
413 EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
414 // Make sure we can still send RPCs.
415 CheckRpcSendOk(DEBUG_LOCATION);
416 // Now recreate the CDS resource pointing to a new EDS resource that
417 // specified backend 1, and make sure the client uses it.
418 const char* kNewEdsResourceName = "new_eds_resource_name";
419 auto cluster = default_cluster_;
420 cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
421 balancer_->ads_service()->SetCdsResource(cluster);
422 args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
423 balancer_->ads_service()->SetEdsResource(
424 BuildEdsResource(args, kNewEdsResourceName));
425 // Wait for client to start using backend 1.
426 WaitForAllBackends(DEBUG_LOCATION, 1, 2);
427 }
428
429 //
430 // EDS tests
431 //
432
433 using EdsTest = XdsEnd2endTest;
434
435 INSTANTIATE_TEST_SUITE_P(
436 XdsTest, EdsTest,
437 ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
438 &XdsTestType::Name);
439
440 // Tests that the balancer sends the correct response to the client, and the
441 // client sends RPCs to the backends using the default child policy.
TEST_P(EdsTest,Vanilla)442 TEST_P(EdsTest, Vanilla) {
443 CreateAndStartBackends(3);
444 const size_t kNumRpcsPerAddress = 100;
445 EdsResourceArgs args({
446 {"locality0", CreateEndpointsForBackends()},
447 });
448 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
449 // Make sure that trying to connect works without a call.
450 channel_->GetState(true /* try_to_connect */);
451 // We need to wait for all backends to come online.
452 WaitForAllBackends(DEBUG_LOCATION);
453 // Send kNumRpcsPerAddress RPCs per server.
454 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
455 // Each backend should have gotten 100 requests.
456 for (size_t i = 0; i < backends_.size(); ++i) {
457 EXPECT_EQ(kNumRpcsPerAddress,
458 backends_[i]->backend_service()->request_count());
459 }
460 // Check LB policy name for the channel.
461 EXPECT_EQ("xds_cluster_manager_experimental",
462 channel_->GetLoadBalancingPolicyName());
463 }
464
// Verifies endpoints that carry more than one address (enabled via the
// GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS env var). The client uses
// whichever address of an endpoint is reachable, and switches to another
// address when the one in use goes down.
TEST_P(EdsTest, MultipleAddressesPerEndpoint) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
  const size_t kNumRpcsPerAddress = 10;
  // Three backends, with backend 0 deliberately left stopped for now.
  CreateBackends(3);
  StartBackend(1);
  StartBackend(2);
  // Endpoint A has addresses {backend 0, backend 1}; endpoint B is backend 2.
  EdsResourceArgs eds_args({
      {"locality0",
       {CreateEndpoint(0, HealthStatus::UNKNOWN, 1, {1}), CreateEndpoint(2)}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Sends 2 * kNumRpcsPerAddress RPCs and checks that they were split evenly
  // between the two given backends.
  const auto check_even_split = [&](size_t first, size_t second) {
    CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * 2);
    EXPECT_EQ(kNumRpcsPerAddress,
              backends_[first]->backend_service()->request_count());
    EXPECT_EQ(kNumRpcsPerAddress,
              backends_[second]->backend_service()->request_count());
  };
  // Backend 0 is down, so endpoint A connects via backend 1; traffic
  // round-robins across backends 1 and 2.
  WaitForAllBackends(DEBUG_LOCATION, 1);  // Waits for backends 1 and 2.
  check_even_split(1, 2);
  // Swap which of endpoint A's addresses is reachable.
  StartBackend(0);
  ShutdownBackend(1);
  WaitForBackend(DEBUG_LOCATION, 0);
  // Traffic now round-robins across backends 0 and 2.
  check_even_split(0, 2);
}
500
// Verifies that an endpoint reported as DRAINING is not used: every RPC goes
// to the healthy backends, which receive equal shares.
TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
  CreateAndStartBackends(2);
  const size_t kNumRpcsPerAddress = 100;
  auto locality_endpoints = CreateEndpointsForBackends();
  // Append an unreachable endpoint marked DRAINING.
  locality_endpoints.push_back(MakeNonExistentEndpoint());
  locality_endpoints.back().health_status = HealthStatus::DRAINING;
  EdsResourceArgs eds_args({
      {"locality0", std::move(locality_endpoints), kDefaultLocalityWeight,
       kDefaultLocalityPriority},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Trigger a connection attempt without sending an RPC.
  channel_->GetState(/*try_to_connect=*/true);
  // Wait until every (healthy) backend is serving.
  WaitForAllBackends(DEBUG_LOCATION);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
  // Every healthy backend should have received exactly kNumRpcsPerAddress.
  for (const auto& backend : backends_) {
    EXPECT_EQ(kNumRpcsPerAddress, backend->backend_service()->request_count());
  }
}
524
525 // This tests the bug described in https://github.com/grpc/grpc/issues/32486.
TEST_P(EdsTest,LocalityBecomesEmptyWithDeactivatedChildStateUpdate)526 TEST_P(EdsTest, LocalityBecomesEmptyWithDeactivatedChildStateUpdate) {
527 CreateAndStartBackends(1);
528 // Initial EDS resource has one locality with no endpoints.
529 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
530 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
531 WaitForAllBackends(DEBUG_LOCATION);
532 // EDS update removes all endpoints from the locality.
533 EdsResourceArgs::Locality empty_locality("locality0", {});
534 args = EdsResourceArgs({std::move(empty_locality)});
535 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
536 // Wait for RPCs to start failing.
537 constexpr char kErrorMessage[] =
538 "no children in weighted_target policy: "
539 "EDS resource eds_service_name: contains empty localities: "
540 "\\[\\{region=\"xds_default_locality_region\", "
541 "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
542 SendRpcsUntilFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
543 // Shut down backend. This triggers a connectivity state update from the
544 // deactivated child of the weighted_target policy.
545 ShutdownAllBackends();
546 // Now restart the backend.
547 StartAllBackends();
548 // Re-add endpoint.
549 args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
550 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
551 // RPCs should eventually succeed.
552 WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
553 if (!result.status.ok()) {
554 EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
555 EXPECT_THAT(result.status.error_message(),
556 ::testing::MatchesRegex(absl::StrCat(
557 // The error message we see here depends on whether
558 // the client sees the EDS update before or after it
559 // sees the backend come back up.
560 MakeConnectionFailureRegex(
561 "connections to all backends failing; last error: "),
562 "|", kErrorMessage)));
563 }
564 });
565 }
566
// Verifies that an EDS resource with no localities makes RPCs fail with a
// descriptive UNAVAILABLE error. RPCs recover once a locality is added.
TEST_P(EdsTest, NoLocalities) {
  CreateAndStartBackends(1);
  // Start with an EDS resource that has no localities at all.
  EdsResourceArgs eds_args;
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: EDS resource eds_service_name: "
      "contains no localities";
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
  // Publish an EDS resource with one endpoint.
  eds_args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Until the update takes effect, any failure must still be the
  // no-localities error.
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(kErrorMessage));
  });
}
589
590 // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
591 // all the servers are unreachable.
TEST_P(EdsTest,AllServersUnreachableFailFast)592 TEST_P(EdsTest, AllServersUnreachableFailFast) {
593 // Set Rpc timeout to 5 seconds to ensure there is enough time
594 // for communication with the xDS server to take place upon test start up.
595 const uint32_t kRpcTimeoutMs = 5000;
596 const size_t kNumUnreachableServers = 5;
597 std::vector<EdsResourceArgs::Endpoint> endpoints;
598 for (size_t i = 0; i < kNumUnreachableServers; ++i) {
599 endpoints.emplace_back(MakeNonExistentEndpoint());
600 }
601 EdsResourceArgs args({{"locality0", std::move(endpoints)}});
602 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
603 // The error shouldn't be DEADLINE_EXCEEDED because timeout is set to 5
604 // seconds, and we should discover in that time that the target backend is
605 // down.
606 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
607 MakeConnectionFailureRegex(
608 "connections to all backends failing; last error: "),
609 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
610 }
611
612 // Tests that RPCs fail when the backends are down, and will succeed again
613 // after the backends are restarted.
TEST_P(EdsTest,BackendsRestart)614 TEST_P(EdsTest, BackendsRestart) {
615 CreateAndStartBackends(3);
616 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
617 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
618 WaitForAllBackends(DEBUG_LOCATION);
619 // Stop backends. RPCs should fail.
620 ShutdownAllBackends();
621 // Wait for channel to transition out of READY, so that we know it has
622 // noticed that all of the subchannels have failed. Note that it may
623 // be reporting either CONNECTING or TRANSIENT_FAILURE at this point.
624 EXPECT_TRUE(channel_->WaitForStateChange(
625 GRPC_CHANNEL_READY, grpc_timeout_seconds_to_deadline(5)));
626 EXPECT_THAT(channel_->GetState(false),
627 ::testing::AnyOf(::testing::Eq(GRPC_CHANNEL_TRANSIENT_FAILURE),
628 ::testing::Eq(GRPC_CHANNEL_CONNECTING)));
629 // RPCs should fail.
630 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
631 MakeConnectionFailureRegex(
632 "connections to all backends failing; last error: "));
633 // Restart all backends. RPCs should start succeeding again.
634 StartAllBackends();
635 CheckRpcSendOk(DEBUG_LOCATION, 1,
636 RpcOptions().set_timeout_ms(2000).set_wait_for_ready(true));
637 }
638
// Verifies that re-sending an identical EDS resource is ignored. If the
// duplicate update were not ignored, the round_robin policy would see an
// update and randomly reset its position in the address list.
TEST_P(EdsTest, IgnoresDuplicateUpdates) {
  CreateAndStartBackends(1);
  // Each iteration sends kRpcsPerIteration RPCs, with a duplicate EDS update
  // pushed halfway through.
  const size_t kNumIterations = 100;
  const size_t kRpcsPerIteration = 4;
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends()},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait for all backends to come online. NOTE(review): this presumably
  // resets the backends' request counters, as the exact-count checks in the
  // other tests here assume.
  WaitForAllBackends(DEBUG_LOCATION);
  for (size_t i = 0; i < kNumIterations; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, kRpcsPerIteration / 2);
    balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
    CheckRpcSendOk(DEBUG_LOCATION, kRpcsPerIteration / 2);
  }
  // Every backend, starting from index 0, should have received an equal
  // share of all the RPCs sent above.
  const size_t expected_rpcs_per_backend =
      kNumIterations * kRpcsPerIteration / backends_.size();
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(expected_rpcs_per_backend,
              backends_[i]->backend_service()->request_count());
  }
}
663
664 // Testing just one example of an invalid resource here.
665 // Unit tests for XdsEndpointResourceType have exhaustive tests for all
666 // of the invalid cases.
TEST_P(EdsTest,NacksInvalidResource)667 TEST_P(EdsTest, NacksInvalidResource) {
668 EdsResourceArgs args({
669 {"locality0", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 1},
670 });
671 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
672 const auto response_state = WaitForEdsNack(DEBUG_LOCATION);
673 ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
674 EXPECT_EQ(response_state->error_message,
675 "xDS response validation errors: ["
676 "resource index 0: eds_service_name: "
677 "INVALID_ARGUMENT: errors parsing EDS resource: ["
678 "field:endpoints error:priority 0 empty]]");
679 }
680
681 // Tests that if the balancer is down, the RPCs will still be sent to the
682 // backends according to the last balancer response, until a new balancer is
683 // reachable.
TEST_P(EdsTest, KeepUsingLastDataIfBalancerGoesDown) {
  CreateAndStartBackends(2);
  // Set up EDS resource pointing to backend 0.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Start the client and make sure it sees the backend.
  WaitForBackend(DEBUG_LOCATION, 0);
  // Stop the balancer, and verify that RPCs keep succeeding against the
  // last EDS data the client received until the deadline passes. The
  // do/while form guarantees at least one RPC is sent.
  balancer_->Shutdown();
  const auto deadline = grpc_timeout_seconds_to_deadline(5);
  do {
    CheckRpcSendOk(DEBUG_LOCATION);
  } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
  // Change the EDS resource to point to backend 1 and bring the balancer
  // back up.
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  balancer_->Start();
  // Wait for client to see backend 1.
  WaitForBackend(DEBUG_LOCATION, 1);
}
705
706 // Tests that the localities in a locality map are picked according to their
707 // weights.
TEST_P(EdsTest, LocalityWeights) {
  CreateAndStartBackends(2);
  // Two localities with one backend each, weighted 2:8. Each locality's
  // expected share of traffic is its weight over the total weight.
  const int kLocalityWeight0 = 2;
  const int kLocalityWeight1 = 8;
  const double kTotalWeight = kLocalityWeight0 + kLocalityWeight1;
  const double kExpectedRate0 = kLocalityWeight0 / kTotalWeight;
  const double kExpectedRate1 = kLocalityWeight1 / kTotalWeight;
  const double kErrorTolerance = 0.05;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kExpectedRate0, kErrorTolerance);
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
      {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
  })));
  // Both backends must be reachable before the measured batch starts.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Fraction of the measured batch that landed on the given backend.
  const auto picked_rate = [&](size_t backend_index) {
    return static_cast<double>(
               backends_[backend_index]->backend_service()->request_count()) /
           kNumRpcs;
  };
  EXPECT_THAT(picked_rate(0),
              ::testing::DoubleNear(kExpectedRate0, kErrorTolerance));
  EXPECT_THAT(picked_rate(1),
              ::testing::DoubleNear(kExpectedRate1, kErrorTolerance));
}
742
743 // Tests that we don't suffer from integer overflow in locality weights.
TEST_P(EdsTest, NoIntegerOverflowInLocalityWeights) {
  CreateAndStartBackends(2);
  // The two weights sum to exactly std::numeric_limits<uint32_t>::max().
  // The expected rates are computed from a 64-bit total.
  const uint32_t kLocalityWeight1 = std::numeric_limits<uint32_t>::max() / 3;
  const uint32_t kLocalityWeight0 =
      std::numeric_limits<uint32_t>::max() - kLocalityWeight1;
  const uint64_t kTotalLocalityWeight =
      uint64_t{kLocalityWeight0} + uint64_t{kLocalityWeight1};
  // Indexed by locality (and backend) number.
  const double kExpectedRates[] = {
      static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight,
      static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight,
  };
  const double kErrorTolerance = 0.05;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kExpectedRates[0], kErrorTolerance);
  // One backend per locality.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
      {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  for (size_t i = 0; i < 2; ++i) {
    const double seen_rate =
        static_cast<double>(backends_[i]->backend_service()->request_count()) /
        kNumRpcs;
    EXPECT_THAT(seen_rate,
                ::testing::DoubleNear(kExpectedRates[i], kErrorTolerance));
  }
}
781
TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
  CreateAndStartBackends(1);
  // Failure message (used as a regex) expected while locality0 is empty.
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: "
      "EDS resource eds_service_name: contains empty localities: "
      "\\[\\{region=\"xds_default_locality_region\", "
      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
  // Phase 1: the sole locality has no endpoints, so RPCs must fail.
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({EdsResourceArgs::Locality("locality0", {})})));
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
  // Phase 2: give the locality an endpoint. Until the client applies the
  // update, failures are tolerated as long as they match the phase-1 error.
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}})));
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(kErrorMessage));
  });
}
807
808 // Tests that we correctly handle a locality containing no endpoints.
TEST_P(EdsTest, LocalityContainingNoEndpoints) {
  CreateAndStartBackends(2);
  const size_t kNumRpcs = 5000;
  // locality0 holds every backend; locality1 is present but empty.
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends()},
      {"locality1", {}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Make sure every backend is reachable before measuring.
  WaitForAllBackends(DEBUG_LOCATION);
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  // Nothing may be routed to the empty locality. The test expects the
  // batch to be split exactly evenly across locality0's backends.
  const size_t expected_per_backend = kNumRpcs / backends_.size();
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(backends_[i]->backend_service()->request_count(),
              expected_per_backend);
  }
}
828
829 // Tests that the locality map can work properly even when it contains a large
830 // number of localities.
TEST_P(EdsTest, ManyLocalitiesStressTest) {
  const size_t kNumLocalities = 50;
  const uint32_t kRpcTimeoutMs = 5000;
  // Backends [0, kNumLocalities) each get their own locality in the first
  // update. Backend kNumLocalities is held back for the second update.
  CreateAndStartBackends(kNumLocalities + 1);
  EdsResourceArgs args;
  for (size_t i = 0; i < kNumLocalities; ++i) {
    args.locality_list.emplace_back(absl::StrCat("locality", i),
                                    CreateEndpointsForBackends(i, i + 1));
  }
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Every backend from the first update must see traffic. Counters are not
  // reset between backends, and each RPC uses a kRpcTimeoutMs timeout.
  WaitForAllBackends(DEBUG_LOCATION, 0, kNumLocalities,
                     /*check_status=*/nullptr,
                     WaitForBackendOptions().set_reset_counters(false),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  // Second update: a single locality containing only the held-back backend.
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(
                                            kNumLocalities,
                                            kNumLocalities + 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForBackend(DEBUG_LOCATION, kNumLocalities);
}
858
859 // Tests that the localities in a locality map are picked correctly after
860 // update (addition, modification, deletion).
TEST_P(EdsTest, LocalityMapUpdateChurn) {
  CreateAndStartBackends(4);
  const size_t kNumRpcs = 3000;
  // Relative tolerance: each observed picking rate must fall within
  // [expected * (1 - kErrorTolerance), expected * (1 + kErrorTolerance)].
  const double kErrorTolerance = 0.2;
  // Weights of localities 0, 1 and 2 in the first EDS update; locality i
  // contains backend i. These vectors feed both the EDS resources sent
  // below and the expected picking rates, so the two cannot drift apart.
  const std::vector<uint32_t> kLocalityWeights0 = {2, 3, 4};
  const double kTotalLocalityWeight0 =
      std::accumulate(kLocalityWeights0.begin(), kLocalityWeights0.end(), 0.0);
  std::vector<double> locality_weight_rate_0;
  locality_weight_rate_0.reserve(kLocalityWeights0.size());
  for (uint32_t weight : kLocalityWeights0) {
    locality_weight_rate_0.push_back(weight / kTotalLocalityWeight0);
  }
  // Weights of localities 1, 2 and 3 in the second EDS update: delete the
  // first locality, keep the second locality, change the third locality's
  // weight from 4 to 2, and add a new locality with weight 6.
  const std::vector<uint32_t> kLocalityWeights1 = {3, 2, 6};
  const double kTotalLocalityWeight1 =
      std::accumulate(kLocalityWeights1.begin(), kLocalityWeights1.end(), 0.0);
  // Indexed by locality/backend number, so slot 0 is unused.
  std::vector<double> locality_weight_rate_1 = {
      0 /* placeholder for locality 0 */};
  for (uint32_t weight : kLocalityWeights1) {
    locality_weight_rate_1.push_back(weight / kTotalLocalityWeight1);
  }
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeights0[0]},
      {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeights0[1]},
      {"locality2", CreateEndpointsForBackends(2, 3), kLocalityWeights0[2]},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait for the first 3 backends to be ready.
  WaitForAllBackends(DEBUG_LOCATION, 0, 3);
  LOG(INFO) << "========= BEFORE FIRST BATCH ==========";
  // Send kNumRpcs RPCs.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  LOG(INFO) << "========= DONE WITH FIRST BATCH ==========";
  // The picking rates of the first 3 backends should be roughly equal to the
  // expectation.
  std::vector<double> locality_picked_rates;
  for (size_t i = 0; i < 3; ++i) {
    locality_picked_rates.push_back(
        static_cast<double>(backends_[i]->backend_service()->request_count()) /
        kNumRpcs);
  }
  for (size_t i = 0; i < 3; ++i) {
    LOG(INFO) << "Locality " << i << " rate " << locality_picked_rates[i];
    EXPECT_THAT(
        locality_picked_rates[i],
        ::testing::AllOf(
            ::testing::Ge(locality_weight_rate_0[i] * (1 - kErrorTolerance)),
            ::testing::Le(locality_weight_rate_0[i] * (1 + kErrorTolerance))));
  }
  args = EdsResourceArgs({
      {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeights1[0]},
      {"locality2", CreateEndpointsForBackends(2, 3), kLocalityWeights1[1]},
      {"locality3", CreateEndpointsForBackends(3, 4), kLocalityWeights1[2]},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Backend 3 hasn't received any request.
  EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
  // Wait until the locality update has been processed, as signaled by backend
  // 3 receiving a request.
  WaitForAllBackends(DEBUG_LOCATION, 3, 4);
  LOG(INFO) << "========= BEFORE SECOND BATCH ==========";
  // Send kNumRpcs RPCs.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
  LOG(INFO) << "========= DONE WITH SECOND BATCH ==========";
  // Backend 0 no longer receives any request.
  EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
  // The picking rates of the last 3 backends should be roughly equal to the
  // expectation.
  locality_picked_rates = {0 /* placeholder for backend 0 */};
  for (size_t i = 1; i < 4; ++i) {
    locality_picked_rates.push_back(
        static_cast<double>(backends_[i]->backend_service()->request_count()) /
        kNumRpcs);
  }
  for (size_t i = 1; i < 4; ++i) {
    LOG(INFO) << "Locality " << i << " rate " << locality_picked_rates[i];
    EXPECT_THAT(
        locality_picked_rates[i],
        ::testing::AllOf(
            ::testing::Ge(locality_weight_rate_1[i] * (1 - kErrorTolerance)),
            ::testing::Le(locality_weight_rate_1[i] * (1 + kErrorTolerance))));
  }
}
946
947 // Tests that we don't fail RPCs when replacing all of the localities in
948 // a given priority.
TEST_P(EdsTest, ReplaceAllLocalitiesInPriority) {
  CreateAndStartBackends(2);
  // Before: locality0 -> backend 0.
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality0", CreateEndpointsForBackends(0, 1)}})));
  WaitForBackend(DEBUG_LOCATION, 0);
  // After: locality0 is gone and locality1 -> backend 1 takes its place.
  // Traffic reaching backend 1 shows the client applied the update, and no
  // RPC should fail while the switch happens.
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(
      EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}})));
  WaitForBackend(DEBUG_LOCATION, 1);
}
963
TEST_P(EdsTest, ConsistentWeightedTargetUpdates) {
  // Removes a locality and then re-adds it, using a newly added backend in
  // each step as the signal that the client has applied that update.
  CreateAndStartBackends(4);
  const auto push_eds = [&](const EdsResourceArgs& eds_args) {
    balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  };
  // Step 1: locality0 -> backend 1, locality1 -> backend 2.
  push_eds(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(1, 2)},
      {"locality1", CreateEndpointsForBackends(2, 3)},
  }));
  WaitForAllBackends(DEBUG_LOCATION, 1, 3);
  // Step 2: drop locality1 and add backend 0 to locality0 as the marker.
  push_eds(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 2)},
  }));
  WaitForBackend(DEBUG_LOCATION, 0);
  // Step 3: bring locality1 back, now also holding backend 3 as the marker.
  push_eds(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 2)},
      {"locality1", CreateEndpointsForBackends(2, 4)},
  }));
  WaitForBackend(DEBUG_LOCATION, 3);
}
991
992 // Tests that RPCs are dropped according to the drop config.
TEST_P(EdsTest, Drops) {
  CreateAndStartBackends(1);
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 200000;
  const double kErrorTolerance = 0.05;
  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
  // The expected combined rate models the throttle category as applying
  // only to RPCs that the lb category did not drop.
  const double kDropRateForLbAndThrottle =
      kDropRateForLb + ((1 - kDropRateForLb) * kDropRateForThrottle);
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
  // One locality plus two drop categories.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  args.drop_categories = {
      {kLbDropType, kDropPerMillionForLb},
      {kThrottleDropType, kDropPerMillionForThrottle},
  };
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Send kNumRpcs RPCs and count the drops.
  const size_t num_drops = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  EXPECT_THAT(static_cast<double>(num_drops) / kNumRpcs,
              ::testing::DoubleNear(kDropRateForLbAndThrottle,
                                    kErrorTolerance));
}
1018
1019 // Tests that drop config is converted correctly from per hundred.
TEST_P(EdsTest, DropPerHundred) {
  CreateAndStartBackends(1);
  // 10 per hundred, i.e. an expected lb drop rate of 0.1.
  const uint32_t kDropPerHundredForLb = 10;
  const double kDropRateForLb = kDropPerHundredForLb / 100.0;
  const double kErrorTolerance = 0.05;
  const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  args.drop_denominator = FractionalPercent::HUNDRED;
  args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  const size_t num_drops = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  const double observed_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
  EXPECT_THAT(observed_drop_rate,
              ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
}
1040
1041 // Tests that drop config is converted correctly from per ten thousand.
TEST_P(EdsTest, DropPerTenThousand) {
  CreateAndStartBackends(1);
  // 1000 per ten thousand, i.e. an expected lb drop rate of 0.1.
  const uint32_t kDropPerTenThousandForLb = 1000;
  const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
  const double kErrorTolerance = 0.05;
  const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  args.drop_denominator = FractionalPercent::TEN_THOUSAND;
  args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  const size_t num_drops = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  const double observed_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
  EXPECT_THAT(observed_drop_rate,
              ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
}
1062
1063 // Tests that drop is working correctly after update.
TEST_P(EdsTest, DropConfigUpdate) {
  CreateAndStartBackends(2);
  const double kErrorTolerance = 0.05;
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 200000;
  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
  // Expected combined rate once both categories are configured; throttle is
  // modeled as applying only to RPCs that lb did not drop.
  const double kDropRateForLbAndThrottle =
      kDropRateForLb + ((1 - kDropRateForLb) * kDropRateForThrottle);
  // Each batch is sized for the drop rate it is expected to observe.
  const size_t kNumRpcsLbOnly =
      ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
  const size_t kNumRpcsBoth =
      ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
  // Update 1: backend 0 only, with just the lb drop category.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  LOG(INFO) << "========= BEFORE FIRST BATCH ==========";
  const size_t first_batch_drops = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcsLbOnly, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  LOG(INFO) << "========= DONE WITH FIRST BATCH ==========";
  const double first_batch_rate =
      static_cast<double>(first_batch_drops) / kNumRpcsLbOnly;
  LOG(INFO) << "First batch drop rate " << first_batch_rate;
  EXPECT_THAT(first_batch_rate,
              ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
  // Update 2: both backends, with both drop categories.
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
                          {kThrottleDropType, kDropPerMillionForThrottle}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Backend 1 only appears in update 2, so traffic reaching it proves the
  // client has applied that update. Drops seen meanwhile are acceptable.
  WaitForBackend(DEBUG_LOCATION, 1, [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::StartsWith(kStatusMessageDropPrefix));
  });
  LOG(INFO) << "========= BEFORE SECOND BATCH ==========";
  const size_t second_batch_drops = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcsBoth, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  LOG(INFO) << "========= DONE WITH SECOND BATCH ==========";
  const double second_batch_rate =
      static_cast<double>(second_batch_drops) / kNumRpcsBoth;
  LOG(INFO) << "Second batch drop rate " << second_batch_rate;
  EXPECT_THAT(second_batch_rate,
              ::testing::DoubleNear(kDropRateForLbAndThrottle,
                                    kErrorTolerance));
}
1118
1119 // Tests that all the RPCs are dropped if any drop category drops 100%.
TEST_P(EdsTest, DropAll) {
  const size_t kNumRpcs = 1000;
  const uint32_t kDropPerMillionForLb = 100000;
  // 1000000 per million: this category on its own drops every RPC.
  const uint32_t kDropPerMillionForThrottle = 1000000;
  // The resource carries drop categories but no localities at all.
  EdsResourceArgs args;
  args.drop_categories = {
      {kLbDropType, kDropPerMillionForLb},
      {kThrottleDropType, kDropPerMillionForThrottle},
  };
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Every one of the kNumRpcs RPCs must come back as a drop.
  const size_t num_drops = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  EXPECT_EQ(num_drops, kNumRpcs);
}
1135
1136 class EdsAuthorityRewriteTest : public XdsEnd2endTest {
1137 protected:
SetUp()1138 void SetUp() override {} // Individual tests call InitClient().
1139 };
1140
1141 INSTANTIATE_TEST_SUITE_P(XdsTest, EdsAuthorityRewriteTest,
1142 ::testing::Values(XdsTestType()), &XdsTestType::Name);
1143
TEST_P(EdsAuthorityRewriteTest, AutoAuthorityRewrite) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
  constexpr char kAltAuthority1[] = "alt_authority1";
  constexpr char kAltAuthority2[] = "alt_authority2";
  // Note: We use InsecureCreds, since FakeCreds are too picky about
  // what authority gets sent.
  InitClient(MakeBootstrapBuilder().SetTrustedXdsServer(),
             /*lb_expected_authority=*/"",
             /*xds_resource_does_not_exist_timeout_ms=*/0,
             /*balancer_authority_override=*/"", /*args=*/nullptr,
             InsecureChannelCredentials());
  // Set auto_host_rewrite in the RouteConfig.
  RouteConfiguration new_route_config = default_route_config_;
  new_route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->mutable_auto_host_rewrite()
      ->set_value(true);
  SetRouteConfiguration(balancer_.get(), new_route_config);
  // Create 3 backends. Backend 0 does not have a hostname, but 1 and 2 do.
  CreateAndStartBackends(3, /*xds_enabled=*/false, InsecureServerCredentials());
  EdsResourceArgs args(
      {{"locality0",
        {
            CreateEndpoint(0),
            CreateEndpoint(1, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
                           /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                           /*hostname=*/kAltAuthority1),
            CreateEndpoint(2, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
                           /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                           /*hostname=*/kAltAuthority2),
        }}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForAllBackends(DEBUG_LOCATION);
  // Send one RPC for each backend, check the authority headers seen on
  // the servers, and make sure we see the expected ones.
  std::set<std::string> authorities_seen;
  for (size_t i = 0; i < backends_.size(); ++i) {
    EchoResponse response;
    Status status = SendRpc(
        RpcOptions().set_echo_host_from_authority_header(true), &response);
    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                             << " message=" << status.error_message();
    authorities_seen.insert(response.param().host());
  }
  // Compare as an unordered collection. The test cares which authorities
  // were seen, not how kServerName happens to sort relative to the
  // alternate authorities inside the std::set.
  EXPECT_THAT(authorities_seen,
              ::testing::UnorderedElementsAre(kAltAuthority1, kAltAuthority2,
                                              kServerName));
}
1194
TEST_P(EdsAuthorityRewriteTest, NoRewriteWithoutEnvVar) {
  // The xDS server is trusted, auto_host_rewrite is on, and EDS supplies a
  // hostname, but GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE is not set. The
  // server must therefore still see kServerName as the authority.
  constexpr char kAltAuthority[] = "alt_authority";
  InitClient(MakeBootstrapBuilder().SetTrustedXdsServer());
  RouteConfiguration route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->mutable_auto_host_rewrite()->set_value(true);
  SetRouteConfiguration(balancer_.get(), route_config);
  // A single backend, advertised in EDS with a hostname.
  CreateAndStartBackends(1);
  EdsResourceArgs args(
      {{"locality0",
        {CreateEndpoint(0, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
                        /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                        /*hostname=*/kAltAuthority)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // The backend echoes back the authority header it received.
  EchoResponse response;
  const Status status = SendRpc(
      RpcOptions().set_echo_host_from_authority_header(true), &response);
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  EXPECT_EQ(response.param().host(), kServerName);
}
1222
TEST_P(EdsAuthorityRewriteTest, NoRewriteIfServerNotTrustedInBootstrap) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
  constexpr char kAltAuthority[] = "alt_authority";
  // Default bootstrap, without SetTrustedXdsServer(): the rewrite must not
  // be honored even though the route and EDS both ask for it.
  InitClient();
  RouteConfiguration route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->mutable_auto_host_rewrite()
      ->set_value(true);
  SetRouteConfiguration(balancer_.get(), route_config);
  CreateAndStartBackends(1);
  const auto endpoint = CreateEndpoint(
      0, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
      /*lb_weight=*/1, /*additional_backend_indexes=*/{},
      /*hostname=*/kAltAuthority);
  EdsResourceArgs args({{"locality0", {endpoint}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Ask the backend to echo the authority header, then check it is the
  // default one.
  EchoResponse response;
  const Status status = SendRpc(
      RpcOptions().set_echo_host_from_authority_header(true), &response);
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  EXPECT_EQ(response.param().host(), kServerName);
}
1252
TEST_P(EdsAuthorityRewriteTest, NoRewriteIfNoHostnameInEds) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
  InitClient(MakeBootstrapBuilder().SetTrustedXdsServer());
  // Rewriting is fully enabled in the route...
  RouteConfiguration route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->mutable_auto_host_rewrite()->set_value(true);
  SetRouteConfiguration(balancer_.get(), route_config);
  // ...but the EDS endpoint has no hostname to rewrite to.
  CreateAndStartBackends(1);
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(EdsResourceArgs({{"locality0", {CreateEndpoint(0)}}})));
  EchoResponse response;
  const Status status = SendRpc(
      RpcOptions().set_echo_host_from_authority_header(true), &response);
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  // The echoed authority must be the unmodified default.
  EXPECT_EQ(response.param().host(), kServerName);
}
1277
TEST_P(EdsAuthorityRewriteTest, NoRewriteIfNotEnabledInRoute) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
  constexpr char kAltAuthority[] = "alt_authority";
  // The server is trusted and EDS has a hostname, but the default route
  // config (which is left untouched) does not enable auto_host_rewrite.
  InitClient(MakeBootstrapBuilder().SetTrustedXdsServer());
  CreateAndStartBackends(1);
  const auto endpoint = CreateEndpoint(
      0, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
      /*lb_weight=*/1, /*additional_backend_indexes=*/{},
      /*hostname=*/kAltAuthority);
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(EdsResourceArgs({{"locality0", {endpoint}}})));
  EchoResponse response;
  const Status status = SendRpc(
      RpcOptions().set_echo_host_from_authority_header(true), &response);
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  EXPECT_EQ(response.param().host(), kServerName);
}
1299
1300 //
1301 // EDS failover tests
1302 //
1303
1304 class FailoverTest : public XdsEnd2endTest {
1305 public:
SetUp()1306 void SetUp() override {
1307 XdsEnd2endTest::SetUp();
1308 ResetStub(/*failover_timeout_ms=*/500);
1309 }
1310 };
1311
1312 INSTANTIATE_TEST_SUITE_P(
1313 XdsTest, FailoverTest,
1314 ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
1315 &XdsTestType::Name);
1316
// Localities with the highest priority are used when multiple priorities exist.
TEST_P(FailoverTest, ChooseHighestPriority) {
  CreateAndStartBackends(4);
  // Locality i holds backend i. locality3 is the one at priority 0.
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(EdsResourceArgs({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       1},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       2},
      {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
       3},
      {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
       0},
  })));
  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
                 WaitForBackendOptions().set_reset_counters(false));
  // Counters were kept, so any pick of backends 0-2 would be visible here.
  for (size_t backend_index = 0; backend_index < 3; ++backend_index) {
    EXPECT_EQ(0U,
              backends_[backend_index]->backend_service()->request_count());
  }
}
1337
1338 // Does not choose priority with no endpoints.
TEST_P(FailoverTest, DoesNotUsePriorityWithNoEndpoints) {
  CreateAndStartBackends(3);
  // Priority 0 (locality3) has no endpoints, so traffic is expected to go
  // to priority 1 (locality0, backend 0).
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       1},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       2},
      {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
       3},
      {"locality3", {}, kDefaultLocalityWeight, 0},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions().set_reset_counters(false));
  // With counters preserved, the lower-priority backends must be untouched.
  for (size_t backend_index = 1; backend_index < 3; ++backend_index) {
    EXPECT_EQ(0U,
              backends_[backend_index]->backend_service()->request_count());
  }
}
1357
1358 // Does not choose locality with no endpoints.
TEST_P(FailoverTest, DoesNotUseLocalityWithNoEndpoints) {
  CreateAndStartBackends(1);
  // Both localities are at priority 0, but only locality1 has endpoints.
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(EdsResourceArgs({
      {"locality0", {}, kDefaultLocalityWeight, 0},
      {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 0},
  })));
  // Every backend (all of which live in locality1) must receive traffic.
  WaitForAllBackends(DEBUG_LOCATION);
}
1369
1370 // If the higher priority localities are not reachable, failover to the
1371 // highest priority among the rest.
TEST_P(FailoverTest, Failover) {
  CreateAndStartBackends(2);
  // Priorities 0 (locality3) and 1 (locality0) contain only unreachable
  // endpoints. The client is expected to land on priority 2 (backend 0)
  // and never send anything to priority 3 (backend 1).
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(EdsResourceArgs({
      {"locality0", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 1},
      {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       2},
      {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       3},
      {"locality3", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 0},
  })));
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions().set_reset_counters(false));
  EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
}
1387
1388 // Reports CONNECTING when failing over to a lower priority.
TEST_P(FailoverTest,ReportsConnectingDuringFailover)1389 TEST_P(FailoverTest, ReportsConnectingDuringFailover) {
1390 CreateAndStartBackends(1);
1391 // Priority 0 will be unreachable, so we'll use priority 1.
1392 EdsResourceArgs args({
1393 {"locality0", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 0},
1394 {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1},
1395 });
1396 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1397 ConnectionAttemptInjector injector;
1398 auto hold = injector.AddHold(backends_[0]->port());
1399 // Start an RPC in the background, which should cause the channel to
1400 // try to connect.
1401 LongRunningRpc rpc;
1402 rpc.StartRpc(stub_.get(), RpcOptions());
1403 // Wait for connection attempt to start to the backend.
1404 hold->Wait();
1405 // Channel state should be CONNECTING here, and any RPC should be
1406 // queued.
1407 EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
1408 // Allow the connection attempt to complete.
1409 hold->Resume();
1410 // Now the RPC should complete successfully.
1411 LOG(INFO) << "=== WAITING FOR RPC TO FINISH ===";
1412 Status status = rpc.GetStatus();
1413 LOG(INFO) << "=== RPC FINISHED ===";
1414 EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1415 << " message=" << status.error_message();
1416 }
1417
1418 // If a locality with higher priority than the current one becomes ready,
1419 // switch to it.
TEST_P(FailoverTest,SwitchBackToHigherPriority)1420 TEST_P(FailoverTest, SwitchBackToHigherPriority) {
1421 CreateAndStartBackends(4);
1422 const size_t kNumRpcs = 100;
1423 EdsResourceArgs args({
1424 {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1425 1},
1426 {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1427 2},
1428 {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1429 3},
1430 {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1431 0},
1432 });
1433 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1434 WaitForBackend(DEBUG_LOCATION, 3);
1435 backends_[3]->StopListeningAndSendGoaways();
1436 backends_[0]->StopListeningAndSendGoaways();
1437 WaitForBackend(DEBUG_LOCATION, 1);
1438 ShutdownBackend(0);
1439 StartBackend(0);
1440 WaitForBackend(DEBUG_LOCATION, 0);
1441 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
1442 EXPECT_EQ(kNumRpcs, backends_[0]->backend_service()->request_count());
1443 }
1444
1445 // The first update only contains unavailable priorities. The second update
1446 // contains available priorities.
TEST_P(FailoverTest,UpdateInitialUnavailable)1447 TEST_P(FailoverTest, UpdateInitialUnavailable) {
1448 CreateAndStartBackends(2);
1449 EdsResourceArgs args({
1450 {"locality0", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 0},
1451 {"locality1", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 1},
1452 });
1453 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1454 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1455 MakeConnectionFailureRegex(
1456 "connections to all backends failing; last error: "));
1457 args = EdsResourceArgs({
1458 {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1459 0},
1460 {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1461 1},
1462 });
1463 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1464 WaitForBackend(DEBUG_LOCATION, 0, [&](const RpcResult& result) {
1465 if (!result.status.ok()) {
1466 EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
1467 EXPECT_THAT(result.status.error_message(),
1468 ::testing::MatchesRegex(MakeConnectionFailureRegex(
1469 "connections to all backends failing; last error: ")));
1470 }
1471 });
1472 }
1473
1474 // Tests that after the localities' priorities are updated, we still choose
1475 // the highest READY priority with the updated localities.
TEST_P(FailoverTest,UpdatePriority)1476 TEST_P(FailoverTest, UpdatePriority) {
1477 CreateAndStartBackends(4);
1478 const size_t kNumRpcs = 100;
1479 EdsResourceArgs args({
1480 {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1481 1},
1482 {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1483 2},
1484 {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1485 3},
1486 {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1487 0},
1488 });
1489 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1490 WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1491 WaitForBackendOptions().set_reset_counters(false));
1492 EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
1493 EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
1494 EXPECT_EQ(0U, backends_[2]->backend_service()->request_count());
1495 args = EdsResourceArgs({
1496 {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1497 2},
1498 {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1499 0},
1500 {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1501 1},
1502 {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1503 3},
1504 });
1505 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1506 WaitForBackend(DEBUG_LOCATION, 1);
1507 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
1508 EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count());
1509 }
1510
1511 // Moves all localities in the current priority to a higher priority.
TEST_P(FailoverTest,MoveAllLocalitiesInCurrentPriorityToHigherPriority)1512 TEST_P(FailoverTest, MoveAllLocalitiesInCurrentPriorityToHigherPriority) {
1513 CreateAndStartBackends(3);
1514 auto non_existent_endpoint = MakeNonExistentEndpoint();
1515 // First update:
1516 // - Priority 0 is locality 0, containing an unreachable backend.
1517 // - Priority 1 is locality 1, containing backends 0 and 1.
1518 EdsResourceArgs args({
1519 {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1520 {"locality1", CreateEndpointsForBackends(0, 2), kDefaultLocalityWeight,
1521 1},
1522 });
1523 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1524 // When we get the first update, all backends in priority 0 are down,
1525 // so we will create priority 1. Backends 0 and 1 should have traffic,
1526 // but backend 2 should not.
1527 WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
1528 WaitForBackendOptions().set_reset_counters(false));
1529 EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1530 // Second update:
1531 // - Priority 0 contains both localities 0 and 1.
1532 // - Priority 1 is not present.
1533 // - We add backend 2 to locality 1, just so we have a way to know
1534 // when the update has been seen by the client.
1535 args = EdsResourceArgs({
1536 {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1537 {"locality1", CreateEndpointsForBackends(0, 3), kDefaultLocalityWeight,
1538 0},
1539 });
1540 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1541 // When backend 2 gets traffic, we know the second update has been seen.
1542 WaitForBackend(DEBUG_LOCATION, 2);
1543 }
1544
1545 // This tests a bug triggered by the xds_cluster_resolver policy reusing
1546 // a child name for the priority policy when that child name was still
1547 // present but deactivated.
TEST_P(FailoverTest,PriorityChildNameChurn)1548 TEST_P(FailoverTest, PriorityChildNameChurn) {
1549 CreateAndStartBackends(4);
1550 auto non_existent_endpoint = MakeNonExistentEndpoint();
1551 // Initial update:
1552 // - P0:locality0, child number 0 (unreachable)
1553 // - P1:locality1, child number 1
1554 // - P2:locality2, child number 2
1555 EdsResourceArgs args({
1556 {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1557 {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1558 1},
1559 {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1560 2},
1561 });
1562 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1563 WaitForBackend(DEBUG_LOCATION, 0);
1564 // Next update:
1565 // - P0:locality0, child number 0 (still unreachable)
1566 // - P1:locality2, child number 2 (moved from P2 to P1)
1567 // - P2:locality3, child number 3 (new child)
1568 // Child number 1 will be deactivated.
1569 args = EdsResourceArgs({
1570 {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1571 {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1572 1},
1573 {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1574 2},
1575 });
1576 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1577 WaitForBackend(DEBUG_LOCATION, 1);
1578 // Next update:
1579 // - P0:locality0, child number 0 (still unreachable)
1580 // - P1:locality4, child number 4 (new child number -- should not reuse #1)
1581 // - P2:locality3, child number 3
1582 // Child number 1 will be deactivated.
1583 args = EdsResourceArgs({
1584 {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1585 {"locality4", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1586 1},
1587 {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1588 2},
1589 });
1590 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1591 WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1592 WaitForBackendOptions().set_reset_counters(false));
1593 // P2 should not have gotten any traffic in this change.
1594 EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1595 }
1596
1597 //
1598 // EDS client load reporting tests
1599 //
1600
1601 using ClientLoadReportingTest = XdsEnd2endTest;
1602
1603 INSTANTIATE_TEST_SUITE_P(
1604 XdsTest, ClientLoadReportingTest,
1605 ::testing::Values(XdsTestType().set_enable_load_reporting()),
1606 &XdsTestType::Name);
1607
1608 MATCHER_P2(LoadMetricEq, num_requests_finished_with_metric, total_metric_value,
1609 "equals LoadMetric") {
1610 bool match = true;
1611 match &= ::testing::ExplainMatchResult(num_requests_finished_with_metric,
1612 arg.num_requests_finished_with_metric,
1613 result_listener);
1614 match &=
1615 ::testing::ExplainMatchResult(::testing::DoubleEq(total_metric_value),
1616 arg.total_metric_value, result_listener);
1617 return match;
1618 }
1619
1620 // Tests that the load report received at the balancer is correct.
TEST_P(ClientLoadReportingTest,Vanilla)1621 TEST_P(ClientLoadReportingTest, Vanilla) {
1622 CreateAndStartBackends(4);
1623 const size_t kNumRpcsPerAddress = 10;
1624 const size_t kNumFailuresPerAddress = 3;
1625 EdsResourceArgs args({
1626 {"locality0", CreateEndpointsForBackends(0, 2)},
1627 {"locality1", CreateEndpointsForBackends(2, 4)},
1628 });
1629 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1630 // Wait until all backends are ready.
1631 size_t num_warmup_rpcs =
1632 WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1633 WaitForBackendOptions().set_reset_counters(false));
1634 // Send kNumRpcsPerAddress RPCs per server with named metrics.
1635 xds::data::orca::v3::OrcaLoadReport backend_metrics;
1636 auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1637 named_metrics["foo"] = 1.0;
1638 named_metrics["bar"] = 2.0;
1639 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1640 RpcOptions().set_backend_metrics(backend_metrics));
1641 named_metrics["foo"] = 0.3;
1642 named_metrics["bar"] = 0.4;
1643 for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1644 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1645 RpcOptions().set_server_fail(true).set_backend_metrics(
1646 backend_metrics));
1647 }
1648 const size_t total_successful_rpcs_sent =
1649 (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1650 const size_t total_failed_rpcs_sent =
1651 kNumFailuresPerAddress * backends_.size();
1652 // Check that the backends got the right number of requests.
1653 size_t total_rpcs_sent = 0;
1654 for (const auto& backend : backends_) {
1655 total_rpcs_sent += backend->backend_service()->request_count();
1656 }
1657 EXPECT_EQ(total_rpcs_sent,
1658 total_successful_rpcs_sent + total_failed_rpcs_sent);
1659 // The load report received at the balancer should be correct.
1660 std::vector<ClientStats> load_report =
1661 balancer_->lrs_service()->WaitForLoadReport();
1662 ASSERT_EQ(load_report.size(), 1UL);
1663 ClientStats& client_stats = load_report.front();
1664 EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
1665 EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
1666 EXPECT_EQ(total_successful_rpcs_sent,
1667 client_stats.total_successful_requests());
1668 EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1669 EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
1670 EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
1671 EXPECT_EQ(0U, client_stats.total_dropped_requests());
1672 ASSERT_THAT(
1673 client_stats.locality_stats(),
1674 ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
1675 ::testing::Pair("locality1", ::testing::_)));
1676 size_t num_successful_rpcs = 0;
1677 size_t num_failed_rpcs = 0;
1678 std::map<std::string, ClientStats::LocalityStats::LoadMetric>
1679 named_metrics_total;
1680 for (const auto& p : client_stats.locality_stats()) {
1681 EXPECT_EQ(p.second.total_requests_in_progress, 0U);
1682 EXPECT_EQ(
1683 p.second.total_issued_requests,
1684 p.second.total_successful_requests + p.second.total_error_requests);
1685 num_successful_rpcs += p.second.total_successful_requests;
1686 num_failed_rpcs += p.second.total_error_requests;
1687 for (const auto& s : p.second.load_metrics) {
1688 named_metrics_total[s.first] += s.second;
1689 }
1690 }
1691 EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
1692 EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
1693 EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
1694 EXPECT_THAT(
1695 named_metrics_total,
1696 ::testing::UnorderedElementsAre(
1697 ::testing::Pair(
1698 "foo",
1699 LoadMetricEq(
1700 (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1701 backends_.size(),
1702 (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1703 (kNumFailuresPerAddress * backends_.size()) * 0.3)),
1704 ::testing::Pair(
1705 "bar",
1706 LoadMetricEq(
1707 (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1708 backends_.size(),
1709 (kNumRpcsPerAddress * backends_.size()) * 2.0 +
1710 (kNumFailuresPerAddress * backends_.size()) * 0.4))));
1711 // The LRS service got a single request, and sent a single response.
1712 EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1713 EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1714 }
1715
1716 // Tests ORCA to LRS propagation.
TEST_P(ClientLoadReportingTest,OrcaPropagation)1717 TEST_P(ClientLoadReportingTest, OrcaPropagation) {
1718 grpc_core::testing::ScopedExperimentalEnvVar env(
1719 "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
1720 CreateAndStartBackends(4);
1721 const size_t kNumRpcsPerAddress = 10;
1722 const size_t kNumFailuresPerAddress = 3;
1723 Cluster cluster = default_cluster_;
1724 cluster.add_lrs_report_endpoint_metrics("named_metrics.foo");
1725 cluster.add_lrs_report_endpoint_metrics("cpu_utilization");
1726 cluster.add_lrs_report_endpoint_metrics("mem_utilization");
1727 cluster.add_lrs_report_endpoint_metrics("application_utilization");
1728 cluster.add_lrs_report_endpoint_metrics("unknown_field");
1729 balancer_->ads_service()->SetCdsResource(cluster);
1730 EdsResourceArgs args({
1731 {"locality0", CreateEndpointsForBackends(0, 2)},
1732 {"locality1", CreateEndpointsForBackends(2, 4)},
1733 });
1734 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1735 // Wait until all backends are ready.
1736 size_t num_warmup_rpcs =
1737 WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1738 WaitForBackendOptions().set_reset_counters(false));
1739 // Send kNumRpcsPerAddress RPCs per server with named metrics.
1740 xds::data::orca::v3::OrcaLoadReport backend_metrics;
1741 backend_metrics.set_cpu_utilization(0.8);
1742 backend_metrics.set_mem_utilization(0.6);
1743 backend_metrics.set_application_utilization(0.4);
1744 auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1745 named_metrics["foo"] = 1.0;
1746 named_metrics["bar"] = 2.0; // Not propagated.
1747 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1748 RpcOptions().set_backend_metrics(backend_metrics));
1749 backend_metrics.set_cpu_utilization(0.4);
1750 backend_metrics.set_mem_utilization(0.3);
1751 backend_metrics.set_application_utilization(0.2);
1752 named_metrics["foo"] = 0.3;
1753 named_metrics["bar"] = 0.4; // Not propagated.
1754 for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1755 CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1756 RpcOptions().set_server_fail(true).set_backend_metrics(
1757 backend_metrics));
1758 }
1759 const size_t total_successful_rpcs_sent =
1760 (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1761 const size_t total_failed_rpcs_sent =
1762 kNumFailuresPerAddress * backends_.size();
1763 // Check that the backends got the right number of requests.
1764 size_t total_rpcs_sent = 0;
1765 for (const auto& backend : backends_) {
1766 total_rpcs_sent += backend->backend_service()->request_count();
1767 }
1768 EXPECT_EQ(total_rpcs_sent,
1769 total_successful_rpcs_sent + total_failed_rpcs_sent);
1770 // The load report received at the balancer should be correct.
1771 std::vector<ClientStats> load_report =
1772 balancer_->lrs_service()->WaitForLoadReport();
1773 ASSERT_EQ(load_report.size(), 1UL);
1774 ClientStats& client_stats = load_report.front();
1775 EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
1776 EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
1777 EXPECT_EQ(total_successful_rpcs_sent,
1778 client_stats.total_successful_requests());
1779 EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1780 EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
1781 EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
1782 EXPECT_EQ(0U, client_stats.total_dropped_requests());
1783 ASSERT_THAT(
1784 client_stats.locality_stats(),
1785 ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
1786 ::testing::Pair("locality1", ::testing::_)));
1787 size_t num_successful_rpcs = 0;
1788 size_t num_failed_rpcs = 0;
1789 ClientStats::LocalityStats::LoadMetric cpu_utilization;
1790 ClientStats::LocalityStats::LoadMetric mem_utilization;
1791 ClientStats::LocalityStats::LoadMetric application_utilization;
1792 std::map<std::string, ClientStats::LocalityStats::LoadMetric>
1793 named_metrics_total;
1794 for (const auto& p : client_stats.locality_stats()) {
1795 EXPECT_EQ(p.second.total_requests_in_progress, 0U);
1796 EXPECT_EQ(
1797 p.second.total_issued_requests,
1798 p.second.total_successful_requests + p.second.total_error_requests);
1799 num_successful_rpcs += p.second.total_successful_requests;
1800 num_failed_rpcs += p.second.total_error_requests;
1801 cpu_utilization += p.second.cpu_utilization;
1802 mem_utilization += p.second.mem_utilization;
1803 application_utilization += p.second.application_utilization;
1804 for (const auto& s : p.second.load_metrics) {
1805 named_metrics_total[s.first] += s.second;
1806 }
1807 }
1808 EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
1809 EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
1810 EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
1811 EXPECT_THAT(
1812 cpu_utilization,
1813 LoadMetricEq(
1814 (kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
1815 (kNumRpcsPerAddress * backends_.size()) * 0.8 +
1816 (kNumFailuresPerAddress * backends_.size()) * 0.4));
1817 EXPECT_THAT(
1818 mem_utilization,
1819 LoadMetricEq(
1820 (kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
1821 (kNumRpcsPerAddress * backends_.size()) * 0.6 +
1822 (kNumFailuresPerAddress * backends_.size()) * 0.3));
1823 EXPECT_THAT(
1824 application_utilization,
1825 LoadMetricEq(
1826 (kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
1827 (kNumRpcsPerAddress * backends_.size()) * 0.4 +
1828 (kNumFailuresPerAddress * backends_.size()) * 0.2));
1829 EXPECT_THAT(
1830 named_metrics_total,
1831 ::testing::UnorderedElementsAre(::testing::Pair(
1832 "named_metrics.foo",
1833 LoadMetricEq(
1834 (kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
1835 (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1836 (kNumFailuresPerAddress * backends_.size()) * 0.3))));
1837 // The LRS service got a single request, and sent a single response.
1838 EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1839 EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1840 }
1841
// With "named_metrics.*" in lrs_report_endpoint_metrics, every named ORCA
// metric is expected to appear in the LRS report under a "named_metrics."
// prefix.
TEST_P(ClientLoadReportingTest, OrcaPropagationNamedMetricsAll) {
  grpc_core::testing::ScopedExperimentalEnvVar orca_lrs_propagation_env(
      "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
  CreateAndStartBackends(4);
  const size_t kNumRpcsPerAddress = 10;
  const size_t kNumFailuresPerAddress = 3;
  // Propagate all named metrics.
  Cluster cluster = default_cluster_;
  cluster.add_lrs_report_endpoint_metrics("named_metrics.*");
  balancer_->ads_service()->SetCdsResource(cluster);
  EdsResourceArgs eds_args({
      {"locality0", CreateEndpointsForBackends(0, 2)},
      {"locality1", CreateEndpointsForBackends(2, 4)},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Wait until all backends are ready. Counters are not reset, so the warm-up
  // RPCs are included in the totals checked below.
  const size_t num_warmup_rpcs =
      WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
                         WaitForBackendOptions().set_reset_counters(false));
  const size_t num_backends = backends_.size();
  const size_t num_ok_rpcs = kNumRpcsPerAddress * num_backends;
  const size_t num_failing_rpcs = kNumFailuresPerAddress * num_backends;
  // Successful RPCs carry one set of ORCA values...
  xds::data::orca::v3::OrcaLoadReport backend_metrics;
  backend_metrics.set_cpu_utilization(0.8);
  backend_metrics.set_mem_utilization(0.6);
  backend_metrics.set_application_utilization(0.4);
  auto& named_metrics = *backend_metrics.mutable_named_metrics();
  named_metrics["foo"] = 1.0;
  named_metrics["bar"] = 2.0;
  CheckRpcSendOk(DEBUG_LOCATION, num_ok_rpcs,
                 RpcOptions().set_backend_metrics(backend_metrics));
  // ...and failing RPCs carry another.
  backend_metrics.set_cpu_utilization(0.4);
  backend_metrics.set_mem_utilization(0.3);
  backend_metrics.set_application_utilization(0.2);
  named_metrics["foo"] = 0.3;
  named_metrics["bar"] = 0.4;
  for (size_t sent = 0; sent < num_failing_rpcs; ++sent) {
    CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
                        RpcOptions().set_server_fail(true).set_backend_metrics(
                            backend_metrics));
  }
  const size_t total_successful_rpcs_sent = num_ok_rpcs + num_warmup_rpcs;
  const size_t total_failed_rpcs_sent = num_failing_rpcs;
  // Across all backends, every RPC should have been seen exactly once.
  const size_t total_rpcs_sent = std::accumulate(
      backends_.begin(), backends_.end(), size_t{0},
      [](size_t sum, const auto& backend) -> size_t {
        return sum + backend->backend_service()->request_count();
      });
  EXPECT_EQ(total_rpcs_sent,
            total_successful_rpcs_sent + total_failed_rpcs_sent);
  // The balancer should receive exactly one cluster's stats.
  std::vector<ClientStats> load_report =
      balancer_->lrs_service()->WaitForLoadReport();
  ASSERT_EQ(load_report.size(), 1UL);
  ClientStats& client_stats = load_report.front();
  EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
  EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
  EXPECT_EQ(total_successful_rpcs_sent,
            client_stats.total_successful_requests());
  EXPECT_EQ(0U, client_stats.total_requests_in_progress());
  EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
  EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
  EXPECT_EQ(0U, client_stats.total_dropped_requests());
  ASSERT_THAT(
      client_stats.locality_stats(),
      ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
                             ::testing::Pair("locality1", ::testing::_)));
  // Fold the per-locality stats together, checking each locality on the way.
  size_t num_successful_rpcs = 0;
  size_t num_failed_rpcs = 0;
  std::map<std::string, ClientStats::LocalityStats::LoadMetric>
      named_metrics_total;
  for (const auto& locality_entry : client_stats.locality_stats()) {
    const auto& per_locality = locality_entry.second;
    EXPECT_EQ(per_locality.total_requests_in_progress, 0U);
    EXPECT_EQ(per_locality.total_issued_requests,
              per_locality.total_successful_requests +
                  per_locality.total_error_requests);
    num_successful_rpcs += per_locality.total_successful_requests;
    num_failed_rpcs += per_locality.total_error_requests;
    for (const auto& metric : per_locality.load_metrics) {
      named_metrics_total[metric.first] += metric.second;
    }
  }
  EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
  EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
  EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
  // Every non-warm-up RPC reported both named metrics.
  const size_t num_rpcs_with_metrics = num_ok_rpcs + num_failing_rpcs;
  EXPECT_THAT(
      named_metrics_total,
      ::testing::UnorderedElementsAre(
          ::testing::Pair("named_metrics.foo",
                          LoadMetricEq(num_rpcs_with_metrics,
                                       num_ok_rpcs * 1.0 +
                                           num_failing_rpcs * 0.3)),
          ::testing::Pair("named_metrics.bar",
                          LoadMetricEq(num_rpcs_with_metrics,
                                       num_ok_rpcs * 2.0 +
                                           num_failing_rpcs * 0.4))));
  // The LRS stream carried a single request and a single response.
  EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
  EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
1947
// With the ORCA-to-LRS propagation experiment enabled but no
// lrs_report_endpoint_metrics configured on the cluster, none of the ORCA
// values the backends attach (utilizations or named metrics) should be
// propagated into the LRS report. Request counts must still be correct.
TEST_P(ClientLoadReportingTest, OrcaPropagationNotConfigured) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
  CreateAndStartBackends(4);
  const size_t kNumRpcsPerAddress = 10;
  const size_t kNumFailuresPerAddress = 3;
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 2)},
      {"locality1", CreateEndpointsForBackends(2, 4)},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait until all backends are ready.
  size_t num_warmup_rpcs =
      WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
                         WaitForBackendOptions().set_reset_counters(false));
  // Send kNumRpcsPerAddress RPCs per server with utilization and named
  // metrics attached.
  xds::data::orca::v3::OrcaLoadReport backend_metrics;
  backend_metrics.set_cpu_utilization(0.8);
  backend_metrics.set_mem_utilization(0.6);
  backend_metrics.set_application_utilization(0.4);
  auto& named_metrics = (*backend_metrics.mutable_named_metrics());
  named_metrics["foo"] = 1.0;
  named_metrics["bar"] = 2.0;
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
                 RpcOptions().set_backend_metrics(backend_metrics));
  backend_metrics.set_cpu_utilization(0.4);
  backend_metrics.set_mem_utilization(0.3);
  backend_metrics.set_application_utilization(0.2);
  named_metrics["foo"] = 0.3;
  named_metrics["bar"] = 0.4;
  for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
    CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
                        RpcOptions().set_server_fail(true).set_backend_metrics(
                            backend_metrics));
  }
  const size_t total_successful_rpcs_sent =
      (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
  const size_t total_failed_rpcs_sent =
      kNumFailuresPerAddress * backends_.size();
  // Check that the backends got the right number of requests.
  size_t total_rpcs_sent = 0;
  for (const auto& backend : backends_) {
    total_rpcs_sent += backend->backend_service()->request_count();
  }
  EXPECT_EQ(total_rpcs_sent,
            total_successful_rpcs_sent + total_failed_rpcs_sent);
  // The load report received at the balancer should be correct.
  std::vector<ClientStats> load_report =
      balancer_->lrs_service()->WaitForLoadReport();
  ASSERT_EQ(load_report.size(), 1UL);
  ClientStats& client_stats = load_report.front();
  EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
  EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
  EXPECT_EQ(total_successful_rpcs_sent,
            client_stats.total_successful_requests());
  EXPECT_EQ(0U, client_stats.total_requests_in_progress());
  EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
  EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
  EXPECT_EQ(0U, client_stats.total_dropped_requests());
  ASSERT_THAT(
      client_stats.locality_stats(),
      ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
                             ::testing::Pair("locality1", ::testing::_)));
  size_t num_successful_rpcs = 0;
  size_t num_failed_rpcs = 0;
  // Utilization metrics are accumulated too. The backends did send them, so
  // verifying they are absent is what shows propagation is really off.
  ClientStats::LocalityStats::LoadMetric cpu_utilization;
  ClientStats::LocalityStats::LoadMetric mem_utilization;
  ClientStats::LocalityStats::LoadMetric application_utilization;
  std::map<std::string, ClientStats::LocalityStats::LoadMetric>
      named_metrics_total;
  for (const auto& p : client_stats.locality_stats()) {
    EXPECT_EQ(p.second.total_requests_in_progress, 0U);
    EXPECT_EQ(
        p.second.total_issued_requests,
        p.second.total_successful_requests + p.second.total_error_requests);
    num_successful_rpcs += p.second.total_successful_requests;
    num_failed_rpcs += p.second.total_error_requests;
    cpu_utilization += p.second.cpu_utilization;
    mem_utilization += p.second.mem_utilization;
    application_utilization += p.second.application_utilization;
    for (const auto& s : p.second.load_metrics) {
      named_metrics_total[s.first] += s.second;
    }
  }
  EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
  EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
  EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
  // Nothing was configured for propagation, so no utilization values and no
  // named metrics should appear in the report.
  EXPECT_THAT(cpu_utilization, LoadMetricEq(0U, 0.0));
  EXPECT_THAT(mem_utilization, LoadMetricEq(0U, 0.0));
  EXPECT_THAT(application_utilization, LoadMetricEq(0U, 0.0));
  EXPECT_THAT(named_metrics_total, ::testing::UnorderedElementsAre());
  // The LRS service got a single request, and sent a single response.
  EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
  EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
2034
2035 // Tests send_all_clusters.
TEST_P(ClientLoadReportingTest, SendAllClusters) {
  // With send_all_clusters enabled on the LRS server, the client must still
  // report the cluster it is using, including the ORCA named metrics that the
  // backends attach to both successful and failed RPCs.
  CreateAndStartBackends(2);
  balancer_->lrs_service()->set_send_all_clusters(true);
  const size_t kNumRpcsPerAddress = 10;
  const size_t kNumFailuresPerAddress = 3;
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Block until every backend is serving. These warm-up RPCs also show up in
  // the load report as successful requests.
  const size_t warmup_rpc_count = WaitForAllBackends(DEBUG_LOCATION);
  const size_t num_backends = backends_.size();
  const size_t num_ok_rpcs = kNumRpcsPerAddress * num_backends;
  const size_t num_failed_rpcs = kNumFailuresPerAddress * num_backends;
  // Successful RPCs report foo=1.0 and bar=2.0.
  xds::data::orca::v3::OrcaLoadReport orca_report;
  auto& metrics = *orca_report.mutable_named_metrics();
  metrics["foo"] = 1.0;
  metrics["bar"] = 2.0;
  CheckRpcSendOk(DEBUG_LOCATION, num_ok_rpcs,
                 RpcOptions().set_backend_metrics(orca_report));
  // Server-failed RPCs report foo=0.3 and bar=0.4.
  metrics["foo"] = 0.3;
  metrics["bar"] = 0.4;
  for (size_t n = 0; n < num_failed_rpcs; ++n) {
    CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
                        RpcOptions().set_server_fail(true).set_backend_metrics(
                            orca_report));
  }
  // Every backend should have received an equal share of the traffic.
  for (const auto& backend : backends_) {
    EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress,
              backend->backend_service()->request_count());
  }
  // Verify the aggregate counters in the report seen by the balancer.
  std::vector<ClientStats> reports =
      balancer_->lrs_service()->WaitForLoadReport();
  ASSERT_EQ(reports.size(), 1UL);
  ClientStats& stats = reports.front();
  EXPECT_EQ(num_ok_rpcs + warmup_rpc_count, stats.total_successful_requests());
  EXPECT_EQ(0U, stats.total_requests_in_progress());
  EXPECT_EQ(num_ok_rpcs + num_failed_rpcs + warmup_rpc_count,
            stats.total_issued_requests());
  EXPECT_EQ(num_failed_rpcs, stats.total_error_requests());
  EXPECT_EQ(0U, stats.total_dropped_requests());
  // Each named metric is reported by every RPC sent with metrics attached;
  // its total is the sum of the per-RPC values above.
  const size_t metric_rpc_count = num_ok_rpcs + num_failed_rpcs;
  const double expected_foo_total = num_ok_rpcs * 1.0 + num_failed_rpcs * 0.3;
  const double expected_bar_total = num_ok_rpcs * 2.0 + num_failed_rpcs * 0.4;
  EXPECT_THAT(
      stats.locality_stats(),
      ::testing::ElementsAre(::testing::Pair(
          "locality0",
          ::testing::Field(
              &ClientStats::LocalityStats::load_metrics,
              ::testing::UnorderedElementsAre(
                  ::testing::Pair("foo", LoadMetricEq(metric_rpc_count,
                                                      expected_foo_total)),
                  ::testing::Pair("bar", LoadMetricEq(metric_rpc_count,
                                                      expected_bar_total)))))));
  // Exactly one LRS request was received and one response was sent.
  EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
  EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
2105
2106 // Tests that we don't include stats for clusters that are not requested
2107 // by the LRS server.
TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) {
  CreateAndStartBackends(1);
  // The LRS server only asks for a cluster that the client never uses.
  balancer_->lrs_service()->set_cluster_names({"bogus"});
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Generate traffic by waiting until the backend is reachable.
  WaitForAllBackends(DEBUG_LOCATION);
  // The only cluster that saw traffic was not requested, so the report the
  // balancer receives must not contain any per-cluster stats.
  const std::vector<ClientStats> reports =
      balancer_->lrs_service()->WaitForLoadReport();
  ASSERT_EQ(reports.size(), 0UL);
  // Exactly one LRS request was received and one response was sent.
  auto* lrs = balancer_->lrs_service();
  EXPECT_EQ(1U, lrs->request_count());
  EXPECT_EQ(1U, lrs->response_count());
}
2123
2124 // Tests that if the balancer restarts, the client load report contains the
2125 // stats before and after the restart correctly.
TEST_P(ClientLoadReportingTest, BalancerRestart) {
  CreateAndStartBackends(4);
  // Number of RPCs sent with ORCA named metrics after the restart (one per
  // backend in the second EDS update). This drives both the RPC count and
  // the expected metric totals below, so they cannot drift apart.
  const size_t kNumRpcsWithMetrics = 2;
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait until all backends returned by the balancer are ready.
  size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  std::vector<ClientStats> load_report =
      balancer_->lrs_service()->WaitForLoadReport();
  ASSERT_EQ(load_report.size(), 1UL);
  ClientStats client_stats = std::move(load_report.front());
  EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
  EXPECT_EQ(0U, client_stats.total_requests_in_progress());
  EXPECT_EQ(0U, client_stats.total_error_requests());
  EXPECT_EQ(0U, client_stats.total_dropped_requests());
  EXPECT_THAT(client_stats.locality_stats(),
              ::testing::ElementsAre(::testing::Pair(
                  "locality0",
                  ::testing::Field(&ClientStats::LocalityStats::load_metrics,
                                   ::testing::IsEmpty()))));
  // Shut down the balancer.
  balancer_->Shutdown();
  // We should continue using the last EDS response we received from the
  // balancer before it was shut down.
  // Note: We need to use WaitForAllBackends() here instead of just sending a
  // fixed number of RPCs with CheckRpcSendOk(), because when the balancer
  // shuts down, the XdsClient will generate an error to the
  // ListenerWatcher, which will cause the xds resolver to send a
  // no-op update to the LB policy. When this update gets down to the
  // round_robin child policy for the locality, it will generate a new
  // subchannel list, which resets the start index randomly. So we need
  // to be a little more permissive here to avoid spurious failures.
  ResetBackendCounters();
  // The first load report already covered every RPC counted so far, so the
  // count restarts here for the second report.
  num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  // Now restart the balancer, this time pointing to the new backends.
  balancer_->Start();
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait for queries to start going to the new backends.
  // This tells us that we're now using the new serverlist.
  num_rpcs += WaitForAllBackends(DEBUG_LOCATION, 2, 4);
  // Send one RPC per backend, each reporting foo=1.0 and bar=2.0.
  xds::data::orca::v3::OrcaLoadReport backend_metrics;
  auto& named_metrics = (*backend_metrics.mutable_named_metrics());
  named_metrics["foo"] = 1.0;
  named_metrics["bar"] = 2.0;
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsWithMetrics,
                 RpcOptions().set_backend_metrics(backend_metrics));
  num_rpcs += kNumRpcsWithMetrics;
  // Check client stats. Only the RPCs sent just above carried named metrics,
  // so each metric total is its per-RPC value times kNumRpcsWithMetrics.
  load_report = balancer_->lrs_service()->WaitForLoadReport();
  ASSERT_EQ(load_report.size(), 1UL);
  client_stats = std::move(load_report.front());
  EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
  EXPECT_EQ(0U, client_stats.total_requests_in_progress());
  EXPECT_EQ(0U, client_stats.total_error_requests());
  EXPECT_EQ(0U, client_stats.total_dropped_requests());
  EXPECT_THAT(client_stats.locality_stats(),
              ::testing::ElementsAre(::testing::Pair(
                  "locality0",
                  ::testing::Field(
                      &ClientStats::LocalityStats::load_metrics,
                      ::testing::UnorderedElementsAre(
                          ::testing::Pair(
                              "foo", LoadMetricEq(kNumRpcsWithMetrics,
                                                  kNumRpcsWithMetrics * 1.0)),
                          ::testing::Pair(
                              "bar",
                              LoadMetricEq(kNumRpcsWithMetrics,
                                           kNumRpcsWithMetrics * 2.0)))))));
}
2191
2192 // Tests load reporting when switching over from one cluster to another.
TEST_P(ClientLoadReportingTest, ChangeClusters) {
  CreateAndStartBackends(4);
  const char* kNewClusterName = "new_cluster_name";
  const char* kNewEdsServiceName = "new_eds_service_name";
  // The LRS server requests stats for both the old and the new cluster.
  balancer_->lrs_service()->set_cluster_names(
      {kDefaultClusterName, kNewClusterName});
  // cluster kDefaultClusterName -> locality0 -> backends 0 and 1
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends(0, 2)},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // cluster kNewClusterName -> locality1 -> backends 2 and 3
  EdsResourceArgs args2({
      {"locality1", CreateEndpointsForBackends(2, 4)},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args2, kNewEdsServiceName));
  // CDS resource for kNewClusterName.
  Cluster new_cluster = default_cluster_;
  new_cluster.set_name(kNewClusterName);
  new_cluster.mutable_eds_cluster_config()->set_service_name(
      kNewEdsServiceName);
  balancer_->ads_service()->SetCdsResource(new_cluster);
  // Wait for all backends to come online.
  size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
  // The load report received at the balancer should be correct: so far only
  // the default cluster has seen traffic.
  std::vector<ClientStats> load_report =
      balancer_->lrs_service()->WaitForLoadReport();
  EXPECT_THAT(
      load_report,
      ::testing::ElementsAre(::testing::AllOf(
          ::testing::Property(&ClientStats::cluster_name, kDefaultClusterName),
          ::testing::Property(&ClientStats::eds_service_name,
                              kDefaultEdsServiceName),
          ::testing::Property(
              &ClientStats::locality_stats,
              ::testing::ElementsAre(::testing::Pair(
                  "locality0",
                  ::testing::AllOf(
                      ::testing::Field(&ClientStats::LocalityStats::
                                           total_successful_requests,
                                       num_rpcs),
                      ::testing::Field(&ClientStats::LocalityStats::
                                           total_requests_in_progress,
                                       0UL),
                      ::testing::Field(
                          &ClientStats::LocalityStats::total_error_requests,
                          0UL),
                      ::testing::Field(
                          &ClientStats::LocalityStats::total_issued_requests,
                          num_rpcs),
                      ::testing::Field(
                          &ClientStats::LocalityStats::load_metrics,
                          ::testing::IsEmpty()))))),
          ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
  // Change RDS resource to point to new cluster.
  RouteConfiguration new_route_config = default_route_config_;
  new_route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->set_cluster(kNewClusterName);
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  // Wait for all new backends to be used. RPCs sent during this wait may go
  // to either cluster, depending on when the route change takes effect.
  num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 2, 4);
  // The load report received at the balancer should be correct.
  // Some of the RPCs just sent reached the new cluster, so the old cluster
  // saw strictly fewer than num_rpcs. With no errors and nothing in
  // progress, issued == successful, so both counters get the same strict
  // bound.
  load_report = balancer_->lrs_service()->WaitForLoadReport();
  EXPECT_THAT(
      load_report,
      ::testing::ElementsAre(
          ::testing::AllOf(
              ::testing::Property(&ClientStats::cluster_name,
                                  kDefaultClusterName),
              ::testing::Property(&ClientStats::eds_service_name,
                                  kDefaultEdsServiceName),
              ::testing::Property(
                  &ClientStats::locality_stats,
                  ::testing::ElementsAre(::testing::Pair(
                      "locality0",
                      ::testing::AllOf(
                          ::testing::Field(&ClientStats::LocalityStats::
                                               total_successful_requests,
                                           ::testing::Lt(num_rpcs)),
                          ::testing::Field(&ClientStats::LocalityStats::
                                               total_requests_in_progress,
                                           0UL),
                          ::testing::Field(
                              &ClientStats::LocalityStats::total_error_requests,
                              0UL),
                          ::testing::Field(&ClientStats::LocalityStats::
                                               total_issued_requests,
                                           ::testing::Lt(num_rpcs)),
                          ::testing::Field(
                              &ClientStats::LocalityStats::load_metrics,
                              ::testing::IsEmpty()))))),
              ::testing::Property(&ClientStats::total_dropped_requests, 0UL)),
          ::testing::AllOf(
              ::testing::Property(&ClientStats::cluster_name, kNewClusterName),
              ::testing::Property(&ClientStats::eds_service_name,
                                  kNewEdsServiceName),
              ::testing::Property(
                  &ClientStats::locality_stats,
                  ::testing::ElementsAre(::testing::Pair(
                      "locality1",
                      ::testing::AllOf(
                          ::testing::Field(&ClientStats::LocalityStats::
                                               total_successful_requests,
                                           ::testing::Le(num_rpcs)),
                          ::testing::Field(&ClientStats::LocalityStats::
                                               total_requests_in_progress,
                                           0UL),
                          ::testing::Field(
                              &ClientStats::LocalityStats::total_error_requests,
                              0UL),
                          ::testing::Field(&ClientStats::LocalityStats::
                                               total_issued_requests,
                                           ::testing::Le(num_rpcs)),
                          ::testing::Field(
                              &ClientStats::LocalityStats::load_metrics,
                              ::testing::IsEmpty()))))),
              ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
  // Together, the two clusters must account for every RPC sent since the
  // previous report.
  const size_t total_ok = std::accumulate(
      load_report.begin(), load_report.end(), size_t{0},
      [](size_t sum, const ClientStats& client_stats) -> size_t {
        return sum + client_stats.total_successful_requests();
      });
  EXPECT_EQ(total_ok, num_rpcs);
  // The LRS service got a single request, and sent a single response.
  EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
  EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
}
2323
2324 // Tests that the drop stats are correctly reported by client load reporting.
TEST_P(ClientLoadReportingTest, DropStats) {
  CreateAndStartBackends(1);
  // Configured drop rates, in parts per million.
  const uint32_t kDropPerMillionForLb = 100000;
  const uint32_t kDropPerMillionForThrottle = 200000;
  const double kErrorTolerance = 0.05;
  const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
  const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
  // The throttle category applies only to RPCs not already dropped by the
  // lb category, hence p_lb + (1 - p_lb) * p_throttle.
  const double kDropRateForLbAndThrottle =
      kDropRateForLb + ((1 - kDropRateForLb) * kDropRateForThrottle);
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
  // Publish an EDS resource carrying both drop categories.
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  eds_args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
                              {kThrottleDropType, kDropPerMillionForThrottle}};
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Dropped RPCs fail with UNAVAILABLE and a message that starts with
  // kStatusMessageDropPrefix; count them.
  const size_t dropped_rpc_count = SendRpcsAndCountFailuresWithMessage(
      DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
      kStatusMessageDropPrefix);
  // The observed overall drop rate should be close to the expected one.
  const double observed_drop_rate =
      static_cast<double>(dropped_rpc_count) / kNumRpcs;
  EXPECT_THAT(observed_drop_rate, ::testing::DoubleNear(
                                      kDropRateForLbAndThrottle,
                                      kErrorTolerance));
  // Merge load reports until every RPC sent above is accounted for as
  // either issued or dropped.
  ClientStats merged_stats;
  for (;;) {
    for (const auto& report : balancer_->lrs_service()->WaitForLoadReport()) {
      merged_stats += report;
    }
    if (merged_stats.total_issued_requests() +
            merged_stats.total_dropped_requests() >=
        kNumRpcs) {
      break;
    }
  }
  EXPECT_EQ(dropped_rpc_count, merged_stats.total_dropped_requests());
  const double observed_lb_drop_rate =
      static_cast<double>(merged_stats.dropped_requests(kLbDropType)) /
      kNumRpcs;
  EXPECT_THAT(observed_lb_drop_rate,
              ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
  // Throttle drops are measured against the RPCs that got past the lb
  // category.
  const double observed_throttle_drop_rate =
      static_cast<double>(merged_stats.dropped_requests(kThrottleDropType)) /
      (kNumRpcs * (1 - kDropRateForLb));
  EXPECT_THAT(observed_throttle_drop_rate,
              ::testing::DoubleNear(kDropRateForThrottle, kErrorTolerance));
}
2369
2370 } // namespace
2371 } // namespace testing
2372 } // namespace grpc
2373
main(int argc,char ** argv)2374 int main(int argc, char** argv) {
2375 grpc::testing::TestEnvironment env(&argc, argv);
2376 ::testing::InitGoogleTest(&argc, argv);
2377 // Make the backup poller poll very frequently in order to pick up
2378 // updates from all the subchannels's FDs.
2379 grpc_core::ConfigVars::Overrides overrides;
2380 overrides.client_channel_backup_poll_interval_ms = 1;
2381 grpc_core::ConfigVars::SetOverrides(overrides);
2382 #if TARGET_OS_IPHONE
2383 // Workaround Apple CFStream bug
2384 grpc_core::SetEnv("grpc_cfstream", "0");
2385 #endif
2386 grpc_core::RegisterFakeStatsPlugin();
2387 grpc_init();
2388 grpc::testing::ConnectionAttemptInjector::Init();
2389 const auto result = RUN_ALL_TESTS();
2390 grpc_shutdown();
2391 return result;
2392 }
2393