• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include <gmock/gmock.h>
17 #include <gtest/gtest.h>
18 
19 #include <numeric>
20 #include <string>
21 #include <vector>
22 
23 #include "absl/log/log.h"
24 #include "absl/strings/match.h"
25 #include "absl/strings/str_cat.h"
26 #include "src/core/client_channel/backup_poller.h"
27 #include "src/core/config/config_vars.h"
28 #include "src/core/lib/address_utils/sockaddr_utils.h"
29 #include "src/core/lib/surface/call.h"
30 #include "src/core/telemetry/call_tracer.h"
31 #include "test/core/test_util/fake_stats_plugin.h"
32 #include "test/core/test_util/scoped_env_var.h"
33 #include "test/cpp/end2end/connection_attempt_injector.h"
34 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
35 #include "xds/data/orca/v3/orca_load_report.pb.h"
36 
37 namespace grpc {
38 namespace testing {
39 namespace {
40 
41 using ::envoy::config::cluster::v3::CircuitBreakers;
42 using ::envoy::config::core::v3::HealthStatus;
43 using ::envoy::config::core::v3::RoutingPriority;
44 using ::envoy::type::v3::FractionalPercent;
45 
46 using ClientStats = LrsServiceImpl::ClientStats;
47 using OptionalLabelKey =
48     grpc_core::ClientCallTracer::CallAttemptTracer::OptionalLabelKey;
49 
50 constexpr char kLbDropType[] = "lb";
51 constexpr char kThrottleDropType[] = "throttle";
52 constexpr char kStatusMessageDropPrefix[] = "EDS-configured drop: ";
53 
54 //
55 // CDS tests
56 //
57 
58 using CdsTest = XdsEnd2endTest;
59 
60 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsTest, ::testing::Values(XdsTestType()),
61                          &XdsTestType::Name);
62 
63 // Tests that CDS client should send an ACK upon correct CDS response.
TEST_P(CdsTest,Vanilla)64 TEST_P(CdsTest, Vanilla) {
65   (void)SendRpc();
66   auto response_state = balancer_->ads_service()->cds_response_state();
67   ASSERT_TRUE(response_state.has_value());
68   EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
69 }
70 
71 // Testing just one example of an invalid resource here.
72 // Unit tests for XdsClusterResourceType have exhaustive tests for all
73 // of the invalid cases.
TEST_P(CdsTest,InvalidClusterResource)74 TEST_P(CdsTest, InvalidClusterResource) {
75   auto cluster = default_cluster_;
76   cluster.set_type(Cluster::STATIC);
77   balancer_->ads_service()->SetCdsResource(cluster);
78   const auto response_state = WaitForCdsNack(DEBUG_LOCATION);
79   ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
80   EXPECT_EQ(response_state->error_message,
81             "xDS response validation errors: ["
82             "resource index 0: cluster_name: "
83             "INVALID_ARGUMENT: errors validating Cluster resource: ["
84             "field:type error:unknown discovery type]]");
85 }
86 
87 // Tests that we don't trigger does-not-exist callbacks for a resource
88 // that was previously valid but is updated to be invalid.
TEST_P(CdsTest,InvalidClusterStillExistsIfPreviouslyCached)89 TEST_P(CdsTest, InvalidClusterStillExistsIfPreviouslyCached) {
90   CreateAndStartBackends(1);
91   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
92   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
93   // Check that everything works.
94   CheckRpcSendOk(DEBUG_LOCATION);
95   // Now send an update changing the Cluster to be invalid.
96   auto cluster = default_cluster_;
97   cluster.set_type(Cluster::STATIC);
98   balancer_->ads_service()->SetCdsResource(cluster);
99   const auto response_state =
100       WaitForCdsNack(DEBUG_LOCATION, RpcOptions(), StatusCode::OK);
101   ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
102   EXPECT_EQ(response_state->error_message,
103             "xDS response validation errors: ["
104             "resource index 0: cluster_name: "
105             "INVALID_ARGUMENT: errors validating Cluster resource: ["
106             "field:type error:unknown discovery type]]");
107   CheckRpcSendOk(DEBUG_LOCATION);
108 }
109 
110 // Tests round robin is not impacted by the endpoint weight, and that the
111 // localities in a locality map are picked according to their weights.
TEST_P(CdsTest,EndpointWeightDoesNotImpactWeightedRoundRobin)112 TEST_P(CdsTest, EndpointWeightDoesNotImpactWeightedRoundRobin) {
113   CreateAndStartBackends(2);
114   const int kLocalityWeight0 = 2;
115   const int kLocalityWeight1 = 8;
116   const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
117   const double kLocalityWeightRate0 =
118       static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
119   const double kLocalityWeightRate1 =
120       static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
121   const double kErrorTolerance = 0.05;
122   const size_t kNumRpcs =
123       ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
124   // ADS response contains 2 localities, each of which contains 1 backend.
125   EdsResourceArgs args({
126       {"locality0",
127        {CreateEndpoint(0, HealthStatus::UNKNOWN, 8)},
128        kLocalityWeight0},
129       {"locality1",
130        {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)},
131        kLocalityWeight1},
132   });
133   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
134   // Wait for both backends to be ready.
135   WaitForAllBackends(DEBUG_LOCATION, 0, 2);
136   // Send kNumRpcs RPCs.
137   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
138   // The locality picking rates should be roughly equal to the expectation.
139   const double locality_picked_rate_0 =
140       static_cast<double>(backends_[0]->backend_service()->request_count()) /
141       kNumRpcs;
142   const double locality_picked_rate_1 =
143       static_cast<double>(backends_[1]->backend_service()->request_count()) /
144       kNumRpcs;
145   EXPECT_THAT(locality_picked_rate_0,
146               ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
147   EXPECT_THAT(locality_picked_rate_1,
148               ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
149 }
150 
151 // In most of our tests, we use different names for different resource
152 // types, to make sure that there are no cut-and-paste errors in the code
153 // that cause us to look at data for the wrong resource type.  So we add
154 // this test to make sure that the EDS resource name defaults to the
155 // cluster name if not specified in the CDS resource.
TEST_P(CdsTest,EdsServiceNameDefaultsToClusterName)156 TEST_P(CdsTest, EdsServiceNameDefaultsToClusterName) {
157   CreateAndStartBackends(1);
158   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
159   balancer_->ads_service()->SetEdsResource(
160       BuildEdsResource(args, kDefaultClusterName));
161   Cluster cluster = default_cluster_;
162   cluster.mutable_eds_cluster_config()->clear_service_name();
163   balancer_->ads_service()->SetCdsResource(cluster);
164   CheckRpcSendOk(DEBUG_LOCATION, /*times=*/1,
165                  RpcOptions().set_timeout_ms(5000));
166 }
167 
168 // Tests switching over from one cluster to another.
TEST_P(CdsTest,ChangeClusters)169 TEST_P(CdsTest, ChangeClusters) {
170   CreateAndStartBackends(2);
171   const char* kNewClusterName = "new_cluster_name";
172   const char* kNewEdsServiceName = "new_eds_service_name";
173   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
174   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
175   // We need to wait for all backends to come online.
176   WaitForAllBackends(DEBUG_LOCATION, 0, 1);
177   // Populate new EDS resource.
178   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
179   balancer_->ads_service()->SetEdsResource(
180       BuildEdsResource(args, kNewEdsServiceName));
181   // Populate new CDS resource.
182   Cluster new_cluster = default_cluster_;
183   new_cluster.set_name(kNewClusterName);
184   new_cluster.mutable_eds_cluster_config()->set_service_name(
185       kNewEdsServiceName);
186   balancer_->ads_service()->SetCdsResource(new_cluster);
187   // Change RDS resource to point to new cluster.
188   RouteConfiguration new_route_config = default_route_config_;
189   new_route_config.mutable_virtual_hosts(0)
190       ->mutable_routes(0)
191       ->mutable_route()
192       ->set_cluster(kNewClusterName);
193   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
194                                    new_route_config);
195   // Wait for all new backends to be used.
196   WaitForAllBackends(DEBUG_LOCATION, 1, 2);
197 }
198 
// Tests that the max_requests circuit-breaker threshold from the CDS
// resource is enforced: once kMaxConcurrentRequests RPCs are in flight the
// next RPC fails with a circuit breaker drop, and cancelling one in-flight
// RPC lets a new one through.
TEST_P(CdsTest, CircuitBreaking) {
  CreateAndStartBackends(1);
  constexpr size_t kMaxConcurrentRequests = 10;
  // Populate new EDS resources.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Update CDS resource to set max concurrent request.
  Cluster cluster = default_cluster_;
  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
  threshold->set_priority(RoutingPriority::DEFAULT);
  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Send exactly max_concurrent_requests long RPCs.
  LongRunningRpc rpcs[kMaxConcurrentRequests];
  for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].StartRpc(stub_.get());
  }
  // Wait for all RPCs to be in flight, polling once per millisecond.
  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
         kMaxConcurrentRequests) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  // Sending a RPC now should fail, the error message should tell us
  // we hit the max concurrent requests limit and got dropped.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "circuit breaker drop");
  // Cancel one RPC to allow another one through.
  rpcs[0].CancelRpc();
  // Add a sleep here to ensure the RPC cancellation has completed correctly
  // before trying the next RPC. There may be a slight delay between return of
  // CANCELLED RPC status and update of internal state tracking the number of
  // concurrent active requests.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                               gpr_time_from_millis(1000, GPR_TIMESPAN)));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Clean up.  rpcs[0] was already cancelled above, so start at index 1.
  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].CancelRpc();
  }
}
241 
// Tests that the max_requests circuit-breaker limit is shared between two
// channels: long-running RPCs are alternated between the channels, and the
// limit trips once their combined count reaches kMaxConcurrentRequests.
TEST_P(CdsTest, CircuitBreakingMultipleChannelsShareCallCounter) {
  CreateAndStartBackends(1);
  constexpr size_t kMaxConcurrentRequests = 10;
  // Populate new EDS resources.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Update CDS resource to set max concurrent request.
  Cluster cluster = default_cluster_;
  auto* threshold = cluster.mutable_circuit_breakers()->add_thresholds();
  threshold->set_priority(RoutingPriority::DEFAULT);
  threshold->mutable_max_requests()->set_value(kMaxConcurrentRequests);
  balancer_->ads_service()->SetCdsResource(cluster);
  // Second channel and stub, in addition to the fixture's stub_.
  auto channel2 = CreateChannel();
  auto stub2 = grpc::testing::EchoTestService::NewStub(channel2);
  // Send exactly max_concurrent_requests long RPCs, alternating between
  // the two channels.
  LongRunningRpc rpcs[kMaxConcurrentRequests];
  for (size_t i = 0; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].StartRpc(i % 2 == 0 ? stub_.get() : stub2.get());
  }
  // Wait for all RPCs to be in flight, polling once per millisecond.
  while (backends_[0]->backend_service()->RpcsWaitingForClientCancel() <
         kMaxConcurrentRequests) {
    gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                                 gpr_time_from_micros(1 * 1000, GPR_TIMESPAN)));
  }
  // Sending a RPC now should fail, the error message should tell us
  // we hit the max concurrent requests limit and got dropped.
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      "circuit breaker drop");
  // Cancel one RPC to allow another one through.
  rpcs[0].CancelRpc();
  // Add a sleep here to ensure the RPC cancellation has completed correctly
  // before trying the next RPC. There may be a slight delay between return of
  // CANCELLED RPC status and update of internal state tracking the number of
  // concurrent active requests.
  gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                               gpr_time_from_millis(1000, GPR_TIMESPAN)));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Clean up.  rpcs[0] was already cancelled above, so start at index 1.
  for (size_t i = 1; i < kMaxConcurrentRequests; ++i) {
    rpcs[i].CancelRpc();
  }
}
287 
// Tests that a Cluster change published after the balancer has been shut
// down and restarted is still picked up by the client.
TEST_P(CdsTest, ClusterChangeAfterAdsCallFails) {
  CreateAndStartBackends(2);
  const char* kNewEdsResourceName = "new_eds_resource_name";
  // Initial configuration: backend 0 only.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  CheckRpcSendOk(DEBUG_LOCATION);
  // Bounce the balancer.
  balancer_->Shutdown();
  balancer_->Start();
  // Publish a new EDS resource containing backend 1 only...
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(args, kNewEdsResourceName));
  // ...and a Cluster that refers to it.
  auto updated_cluster = default_cluster_;
  updated_cluster.mutable_eds_cluster_config()->set_service_name(
      kNewEdsResourceName);
  balancer_->ads_service()->SetCdsResource(updated_cluster);
  // The client has seen the change once traffic reaches backend 1.
  WaitForBackend(DEBUG_LOCATION, 1);
}
310 
// Tests that the telemetry labels from the Cluster metadata, plus the
// locality that was picked, are recorded as optional labels on the call
// attempt tracer.
TEST_P(CdsTest, MetricLabels) {
  // Injects a fake client call tracer factory. Try keep this at top.
  grpc_core::FakeClientCallTracerFactory fake_client_call_tracer_factory;
  CreateAndStartBackends(2);
  // Two localities, one backend in each.
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)},
                        {"locality1", CreateEndpointsForBackends(1, 2)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Attach the service labels to the Cluster's filter metadata.
  auto cluster = default_cluster_;
  auto* labels = (*cluster.mutable_metadata()->mutable_filter_metadata())
                     ["com.google.csm.telemetry_labels"]
                         .mutable_fields();
  (*labels)["service_name"].set_string_value("myservice");
  (*labels)["service_namespace"].set_string_value("mynamespace");
  balancer_->ads_service()->SetCdsResource(cluster);
  // Recreate the stub with the fake tracer factory passed as a channel arg.
  ChannelArguments channel_args;
  channel_args.SetPointer(GRPC_ARG_INJECT_FAKE_CLIENT_CALL_TRACER_FACTORY,
                          &fake_client_call_tracer_factory);
  ResetStub(/*failover_timeout_ms=*/0, &channel_args);
  // Checks the optional labels on the most recent call attempt tracer.
  const auto expect_labels_for_locality = [&](const char* sub_zone) {
    EXPECT_THAT(
        fake_client_call_tracer_factory.GetLastFakeClientCallTracer()
            ->GetLastCallAttemptTracer()
            ->GetOptionalLabels(),
        ::testing::ElementsAre(
            ::testing::Pair(OptionalLabelKey::kXdsServiceName, "myservice"),
            ::testing::Pair(OptionalLabelKey::kXdsServiceNamespace,
                            "mynamespace"),
            ::testing::Pair(OptionalLabelKey::kLocality,
                            LocalityNameString(sub_zone))));
  };
  // Once an RPC has reached backend 0, the labels should name locality0.
  WaitForBackend(DEBUG_LOCATION, 0);
  expect_labels_for_locality("locality0");
  // Once an RPC has reached backend 1, the labels should name locality1.
  WaitForBackend(DEBUG_LOCATION, 1);
  expect_labels_for_locality("locality1");
}
359 
360 //
361 // CDS deletion tests
362 //
363 
364 class CdsDeletionTest : public XdsEnd2endTest {
365  protected:
SetUp()366   void SetUp() override {}  // Individual tests call InitClient().
367 };
368 
369 INSTANTIATE_TEST_SUITE_P(XdsTest, CdsDeletionTest,
370                          ::testing::Values(XdsTestType()), &XdsTestType::Name);
371 
372 // Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted.
TEST_P(CdsDeletionTest,ClusterDeleted)373 TEST_P(CdsDeletionTest, ClusterDeleted) {
374   InitClient();
375   CreateAndStartBackends(1);
376   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
377   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
378   // We need to wait for all backends to come online.
379   WaitForAllBackends(DEBUG_LOCATION);
380   // Unset CDS resource.
381   balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
382   // Wait for RPCs to start failing.
383   SendRpcsUntilFailure(
384       DEBUG_LOCATION, StatusCode::UNAVAILABLE,
385       absl::StrCat("CDS resource ", kDefaultClusterName,
386                    ": does not exist \\(node ID:xds_end2end_test\\)"));
387   // Make sure we ACK'ed the update.
388   auto response_state = balancer_->ads_service()->cds_response_state();
389   ASSERT_TRUE(response_state.has_value());
390   EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
391 }
392 
393 // Tests that we ignore Cluster deletions if configured to do so.
TEST_P(CdsDeletionTest,ClusterDeletionIgnored)394 TEST_P(CdsDeletionTest, ClusterDeletionIgnored) {
395   InitClient(MakeBootstrapBuilder().SetIgnoreResourceDeletion());
396   CreateAndStartBackends(2);
397   // Bring up client pointing to backend 0 and wait for it to connect.
398   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
399   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
400   WaitForAllBackends(DEBUG_LOCATION, 0, 1);
401   // Make sure we ACKed the CDS update.
402   auto response_state = balancer_->ads_service()->cds_response_state();
403   ASSERT_TRUE(response_state.has_value());
404   EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
405   // Unset CDS resource and wait for client to ACK the update.
406   balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName);
407   const auto deadline = absl::Now() + absl::Seconds(30);
408   while (true) {
409     ASSERT_LT(absl::Now(), deadline) << "timed out waiting for CDS ACK";
410     response_state = balancer_->ads_service()->cds_response_state();
411     if (response_state.has_value()) break;
412   }
413   EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED);
414   // Make sure we can still send RPCs.
415   CheckRpcSendOk(DEBUG_LOCATION);
416   // Now recreate the CDS resource pointing to a new EDS resource that
417   // specified backend 1, and make sure the client uses it.
418   const char* kNewEdsResourceName = "new_eds_resource_name";
419   auto cluster = default_cluster_;
420   cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName);
421   balancer_->ads_service()->SetCdsResource(cluster);
422   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
423   balancer_->ads_service()->SetEdsResource(
424       BuildEdsResource(args, kNewEdsResourceName));
425   // Wait for client to start using backend 1.
426   WaitForAllBackends(DEBUG_LOCATION, 1, 2);
427 }
428 
429 //
430 // EDS tests
431 //
432 
433 using EdsTest = XdsEnd2endTest;
434 
435 INSTANTIATE_TEST_SUITE_P(
436     XdsTest, EdsTest,
437     ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
438     &XdsTestType::Name);
439 
440 // Tests that the balancer sends the correct response to the client, and the
441 // client sends RPCs to the backends using the default child policy.
TEST_P(EdsTest,Vanilla)442 TEST_P(EdsTest, Vanilla) {
443   CreateAndStartBackends(3);
444   const size_t kNumRpcsPerAddress = 100;
445   EdsResourceArgs args({
446       {"locality0", CreateEndpointsForBackends()},
447   });
448   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
449   // Make sure that trying to connect works without a call.
450   channel_->GetState(true /* try_to_connect */);
451   // We need to wait for all backends to come online.
452   WaitForAllBackends(DEBUG_LOCATION);
453   // Send kNumRpcsPerAddress RPCs per server.
454   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size());
455   // Each backend should have gotten 100 requests.
456   for (size_t i = 0; i < backends_.size(); ++i) {
457     EXPECT_EQ(kNumRpcsPerAddress,
458               backends_[i]->backend_service()->request_count());
459   }
460   // Check LB policy name for the channel.
461   EXPECT_EQ("xds_cluster_manager_experimental",
462             channel_->GetLoadBalancingPolicyName());
463 }
464 
// Tests an endpoint that lists more than one address: traffic goes to
// whichever of the endpoint's backends is up.  Runs with
// GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS set for the test's duration.
TEST_P(EdsTest, MultipleAddressesPerEndpoint) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS");
  const size_t kNumRpcsPerAddress = 10;
  const size_t kNumRpcsPerRound = kNumRpcsPerAddress * 2;
  // Three backends exist, but backend 0 is not started yet.
  CreateBackends(3);
  StartBackend(1);
  StartBackend(2);
  // Endpoint A covers backends 0 and 1; endpoint B is backend 2.
  EdsResourceArgs args({
      {"locality0",
       {CreateEndpoint(0, HealthStatus::UNKNOWN, 1, {1}), CreateEndpoint(2)}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // With backend 0 offline, endpoint A should connect to backend 1, so
  // traffic should round-robin over backends 1 and 2.
  WaitForAllBackends(DEBUG_LOCATION, 1);  // Wait for backends 1 and 2.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerRound);
  EXPECT_EQ(kNumRpcsPerAddress,
            backends_[1]->backend_service()->request_count());
  EXPECT_EQ(kNumRpcsPerAddress,
            backends_[2]->backend_service()->request_count());
  // Swap which of endpoint A's backends is up.
  StartBackend(0);
  ShutdownBackend(1);
  // Wait until backend 0 is receiving traffic.
  WaitForBackend(DEBUG_LOCATION, 0);
  // Traffic should now round-robin over backends 0 and 2.
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerRound);
  EXPECT_EQ(kNumRpcsPerAddress,
            backends_[0]->backend_service()->request_count());
  EXPECT_EQ(kNumRpcsPerAddress,
            backends_[2]->backend_service()->request_count());
}
500 
// Tests that an endpoint marked DRAINING receives no traffic: all RPCs are
// split evenly over the two real backends.
TEST_P(EdsTest, IgnoresUnhealthyEndpoints) {
  CreateAndStartBackends(2);
  const size_t kNumRpcsPerAddress = 100;
  // The real backends plus one non-existent endpoint in DRAINING state.
  auto endpoints = CreateEndpointsForBackends();
  auto draining_endpoint = MakeNonExistentEndpoint();
  draining_endpoint.health_status = HealthStatus::DRAINING;
  endpoints.push_back(std::move(draining_endpoint));
  EdsResourceArgs args({
      {"locality0", std::move(endpoints), kDefaultLocalityWeight,
       kDefaultLocalityPriority},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Connecting must work without a call being made first.
  channel_->GetState(true /* try_to_connect */);
  // Get every backend serving.
  WaitForAllBackends(DEBUG_LOCATION);
  // Send kNumRpcsPerAddress RPCs for each backend.
  const size_t total_rpcs = kNumRpcsPerAddress * backends_.size();
  CheckRpcSendOk(DEBUG_LOCATION, total_rpcs);
  // Every backend should have received exactly kNumRpcsPerAddress of them.
  for (const auto& backend : backends_) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backend->backend_service()->request_count());
  }
}
524 
525 // This tests the bug described in https://github.com/grpc/grpc/issues/32486.
TEST_P(EdsTest,LocalityBecomesEmptyWithDeactivatedChildStateUpdate)526 TEST_P(EdsTest, LocalityBecomesEmptyWithDeactivatedChildStateUpdate) {
527   CreateAndStartBackends(1);
528   // Initial EDS resource has one locality with no endpoints.
529   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
530   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
531   WaitForAllBackends(DEBUG_LOCATION);
532   // EDS update removes all endpoints from the locality.
533   EdsResourceArgs::Locality empty_locality("locality0", {});
534   args = EdsResourceArgs({std::move(empty_locality)});
535   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
536   // Wait for RPCs to start failing.
537   constexpr char kErrorMessage[] =
538       "no children in weighted_target policy: "
539       "EDS resource eds_service_name: contains empty localities: "
540       "\\[\\{region=\"xds_default_locality_region\", "
541       "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
542   SendRpcsUntilFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
543   // Shut down backend.  This triggers a connectivity state update from the
544   // deactivated child of the weighted_target policy.
545   ShutdownAllBackends();
546   // Now restart the backend.
547   StartAllBackends();
548   // Re-add endpoint.
549   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
550   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
551   // RPCs should eventually succeed.
552   WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
553     if (!result.status.ok()) {
554       EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
555       EXPECT_THAT(result.status.error_message(),
556                   ::testing::MatchesRegex(absl::StrCat(
557                       // The error message we see here depends on whether
558                       // the client sees the EDS update before or after it
559                       // sees the backend come back up.
560                       MakeConnectionFailureRegex(
561                           "connections to all backends failing; last error: "),
562                       "|", kErrorMessage)));
563     }
564   });
565 }
566 
// Tests that RPCs fail while the EDS resource contains no localities, and
// succeed once a locality with an endpoint is published.
TEST_P(EdsTest, NoLocalities) {
  // Failure message expected while the resource has no localities.
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: EDS resource eds_service_name: "
      "contains no localities";
  CreateAndStartBackends(1);
  // Publish a default-constructed EDS resource, i.e. one with no localities.
  EdsResourceArgs args;
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
  // Now publish a resource that contains the backend.
  args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // RPCs should eventually succeed; failures seen while waiting must still
  // be the no-localities error.
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(kErrorMessage));
  });
}
589 
590 // Tests that RPCs will fail with UNAVAILABLE instead of DEADLINE_EXCEEDED if
591 // all the servers are unreachable.
TEST_P(EdsTest,AllServersUnreachableFailFast)592 TEST_P(EdsTest, AllServersUnreachableFailFast) {
593   // Set Rpc timeout to 5 seconds to ensure there is enough time
594   // for communication with the xDS server to take place upon test start up.
595   const uint32_t kRpcTimeoutMs = 5000;
596   const size_t kNumUnreachableServers = 5;
597   std::vector<EdsResourceArgs::Endpoint> endpoints;
598   for (size_t i = 0; i < kNumUnreachableServers; ++i) {
599     endpoints.emplace_back(MakeNonExistentEndpoint());
600   }
601   EdsResourceArgs args({{"locality0", std::move(endpoints)}});
602   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
603   // The error shouldn't be DEADLINE_EXCEEDED because timeout is set to 5
604   // seconds, and we should discover in that time that the target backend is
605   // down.
606   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
607                       MakeConnectionFailureRegex(
608                           "connections to all backends failing; last error: "),
609                       RpcOptions().set_timeout_ms(kRpcTimeoutMs));
610 }
611 
612 // Tests that RPCs fail when the backends are down, and will succeed again
613 // after the backends are restarted.
TEST_P(EdsTest,BackendsRestart)614 TEST_P(EdsTest, BackendsRestart) {
615   CreateAndStartBackends(3);
616   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
617   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
618   WaitForAllBackends(DEBUG_LOCATION);
619   // Stop backends.  RPCs should fail.
620   ShutdownAllBackends();
621   // Wait for channel to transition out of READY, so that we know it has
622   // noticed that all of the subchannels have failed.  Note that it may
623   // be reporting either CONNECTING or TRANSIENT_FAILURE at this point.
624   EXPECT_TRUE(channel_->WaitForStateChange(
625       GRPC_CHANNEL_READY, grpc_timeout_seconds_to_deadline(5)));
626   EXPECT_THAT(channel_->GetState(false),
627               ::testing::AnyOf(::testing::Eq(GRPC_CHANNEL_TRANSIENT_FAILURE),
628                                ::testing::Eq(GRPC_CHANNEL_CONNECTING)));
629   // RPCs should fail.
630   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
631                       MakeConnectionFailureRegex(
632                           "connections to all backends failing; last error: "));
633   // Restart all backends.  RPCs should start succeeding again.
634   StartAllBackends();
635   CheckRpcSendOk(DEBUG_LOCATION, 1,
636                  RpcOptions().set_timeout_ms(2000).set_wait_for_ready(true));
637 }
638 
// Tests that re-sending an EDS resource identical to the current one does
// not disturb the round_robin rotation.
//
// The test needs more than one backend: with a single backend a reset of
// the round_robin position cannot be observed.  The final check also has to
// start at index 0; starting at 1 with one backend asserted nothing.
TEST_P(EdsTest, IgnoresDuplicateUpdates) {
  const size_t kNumBackends = 4;
  const size_t kNumRpcsPerAddress = 100;
  // Half a rotation is sent before and half after each duplicate update.
  const size_t kNumRpcsPerHalfRotation = kNumBackends / 2;
  CreateAndStartBackends(kNumBackends);
  EdsResourceArgs args({
      {"locality0", CreateEndpointsForBackends()},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Wait for all backends to come online.
  // NOTE(review): the exact counts below assume the request counters are
  // zero after this wait, as EdsTest.Vanilla also assumes -- confirm.
  WaitForAllBackends(DEBUG_LOCATION);
  // Send kNumRpcsPerAddress RPCs per server, but send an EDS update in
  // between.  If the update is not ignored, this will cause the
  // round_robin policy to see an update, which will randomly reset its
  // position in the address list.
  for (size_t i = 0; i < kNumRpcsPerAddress; ++i) {
    CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerHalfRotation);
    balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
    CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerHalfRotation);
  }
  // Each iteration is one full rotation, so an undisturbed rotation gives
  // every backend exactly kNumRpcsPerAddress requests.
  for (size_t i = 0; i < backends_.size(); ++i) {
    EXPECT_EQ(kNumRpcsPerAddress,
              backends_[i]->backend_service()->request_count())
        << "backend " << i;
  }
}
663 
664 // Testing just one example of an invalid resource here.
665 // Unit tests for XdsEndpointResourceType have exhaustive tests for all
666 // of the invalid cases.
TEST_P(EdsTest,NacksInvalidResource)667 TEST_P(EdsTest, NacksInvalidResource) {
668   EdsResourceArgs args({
669       {"locality0", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 1},
670   });
671   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
672   const auto response_state = WaitForEdsNack(DEBUG_LOCATION);
673   ASSERT_TRUE(response_state.has_value()) << "timed out waiting for NACK";
674   EXPECT_EQ(response_state->error_message,
675             "xDS response validation errors: ["
676             "resource index 0: eds_service_name: "
677             "INVALID_ARGUMENT: errors parsing EDS resource: ["
678             "field:endpoints error:priority 0 empty]]");
679 }
680 
681 // Tests that if the balancer is down, the RPCs will still be sent to the
682 // backends according to the last balancer response, until a new balancer is
683 // reachable.
TEST_P(EdsTest,KeepUsingLastDataIfBalancerGoesDown)684 TEST_P(EdsTest, KeepUsingLastDataIfBalancerGoesDown) {
685   CreateAndStartBackends(2);
686   // Set up EDS resource pointing to backend 0.
687   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
688   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
689   // Start the client and make sure it sees the backend.
690   WaitForBackend(DEBUG_LOCATION, 0);
691   // Stop the balancer, and verify that RPCs continue to flow to backend 0.
692   balancer_->Shutdown();
693   auto deadline = grpc_timeout_seconds_to_deadline(5);
694   do {
695     CheckRpcSendOk(DEBUG_LOCATION);
696   } while (gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
697   // Check the EDS resource to point to backend 1 and bring the balancer
698   // back up.
699   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}});
700   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
701   balancer_->Start();
702   // Wait for client to see backend 1.
703   WaitForBackend(DEBUG_LOCATION, 1);
704 }
705 
706 // Tests that the localities in a locality map are picked according to their
707 // weights.
TEST_P(EdsTest,LocalityWeights)708 TEST_P(EdsTest, LocalityWeights) {
709   CreateAndStartBackends(2);
710   const int kLocalityWeight0 = 2;
711   const int kLocalityWeight1 = 8;
712   const int kTotalLocalityWeight = kLocalityWeight0 + kLocalityWeight1;
713   const double kLocalityWeightRate0 =
714       static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
715   const double kLocalityWeightRate1 =
716       static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
717   const double kErrorTolerance = 0.05;
718   const size_t kNumRpcs =
719       ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
720   // ADS response contains 2 localities, each of which contains 1 backend.
721   EdsResourceArgs args({
722       {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
723       {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
724   });
725   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
726   // Wait for both backends to be ready.
727   WaitForAllBackends(DEBUG_LOCATION, 0, 2);
728   // Send kNumRpcs RPCs.
729   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
730   // The locality picking rates should be roughly equal to the expectation.
731   const double locality_picked_rate_0 =
732       static_cast<double>(backends_[0]->backend_service()->request_count()) /
733       kNumRpcs;
734   const double locality_picked_rate_1 =
735       static_cast<double>(backends_[1]->backend_service()->request_count()) /
736       kNumRpcs;
737   EXPECT_THAT(locality_picked_rate_0,
738               ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
739   EXPECT_THAT(locality_picked_rate_1,
740               ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
741 }
742 
743 // Tests that we don't suffer from integer overflow in locality weights.
TEST_P(EdsTest,NoIntegerOverflowInLocalityWeights)744 TEST_P(EdsTest, NoIntegerOverflowInLocalityWeights) {
745   CreateAndStartBackends(2);
746   const uint32_t kLocalityWeight1 = std::numeric_limits<uint32_t>::max() / 3;
747   const uint32_t kLocalityWeight0 =
748       std::numeric_limits<uint32_t>::max() - kLocalityWeight1;
749   const uint64_t kTotalLocalityWeight =
750       static_cast<uint64_t>(kLocalityWeight0) +
751       static_cast<uint64_t>(kLocalityWeight1);
752   const double kLocalityWeightRate0 =
753       static_cast<double>(kLocalityWeight0) / kTotalLocalityWeight;
754   const double kLocalityWeightRate1 =
755       static_cast<double>(kLocalityWeight1) / kTotalLocalityWeight;
756   const double kErrorTolerance = 0.05;
757   const size_t kNumRpcs =
758       ComputeIdealNumRpcs(kLocalityWeightRate0, kErrorTolerance);
759   // ADS response contains 2 localities, each of which contains 1 backend.
760   EdsResourceArgs args({
761       {"locality0", CreateEndpointsForBackends(0, 1), kLocalityWeight0},
762       {"locality1", CreateEndpointsForBackends(1, 2), kLocalityWeight1},
763   });
764   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
765   // Wait for both backends to be ready.
766   WaitForAllBackends(DEBUG_LOCATION, 0, 2);
767   // Send kNumRpcs RPCs.
768   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
769   // The locality picking rates should be roughly equal to the expectation.
770   const double locality_picked_rate_0 =
771       static_cast<double>(backends_[0]->backend_service()->request_count()) /
772       kNumRpcs;
773   const double locality_picked_rate_1 =
774       static_cast<double>(backends_[1]->backend_service()->request_count()) /
775       kNumRpcs;
776   EXPECT_THAT(locality_picked_rate_0,
777               ::testing::DoubleNear(kLocalityWeightRate0, kErrorTolerance));
778   EXPECT_THAT(locality_picked_rate_1,
779               ::testing::DoubleNear(kLocalityWeightRate1, kErrorTolerance));
780 }
781 
// Verifies that an EDS resource whose only locality has no endpoints makes
// RPCs fail with a descriptive error, and that RPCs recover after an update
// that supplies an endpoint.
TEST_P(EdsTest, OneLocalityWithNoEndpoints) {
  CreateAndStartBackends(1);
  // Regex for the status message expected while the locality is empty.
  constexpr char kErrorMessage[] =
      "no children in weighted_target policy: "
      "EDS resource eds_service_name: contains empty localities: "
      "\\[\\{region=\"xds_default_locality_region\", "
      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]";
  // Start with a single locality that contains no endpoints.
  EdsResourceArgs::Locality locality_without_endpoints("locality0", {});
  EdsResourceArgs empty_args({std::move(locality_without_endpoints)});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(empty_args));
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, kErrorMessage);
  // Now publish a resource whose locality has an endpoint.
  EdsResourceArgs populated_args(
      {{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(populated_args));
  // RPCs should eventually succeed; any failure seen until then must be the
  // empty-locality error from above.
  const auto check_failure = [&](const RpcResult& result) {
    if (result.status.ok()) return;
    EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
    EXPECT_THAT(result.status.error_message(),
                ::testing::MatchesRegex(kErrorMessage));
  };
  WaitForAllBackends(DEBUG_LOCATION, 0, 1, check_failure);
}
807 
808 // Tests that we correctly handle a locality containing no endpoints.
TEST_P(EdsTest,LocalityContainingNoEndpoints)809 TEST_P(EdsTest, LocalityContainingNoEndpoints) {
810   CreateAndStartBackends(2);
811   const size_t kNumRpcs = 5000;
812   // EDS response contains 2 localities, one with no endpoints.
813   EdsResourceArgs args({
814       {"locality0", CreateEndpointsForBackends()},
815       {"locality1", {}},
816   });
817   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
818   // Wait for both backends to be ready.
819   WaitForAllBackends(DEBUG_LOCATION);
820   // Send kNumRpcs RPCs.
821   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
822   // All traffic should go to the reachable locality.
823   EXPECT_EQ(backends_[0]->backend_service()->request_count(),
824             kNumRpcs / backends_.size());
825   EXPECT_EQ(backends_[1]->backend_service()->request_count(),
826             kNumRpcs / backends_.size());
827 }
828 
829 // Tests that the locality map can work properly even when it contains a large
830 // number of localities.
TEST_P(EdsTest,ManyLocalitiesStressTest)831 TEST_P(EdsTest, ManyLocalitiesStressTest) {
832   const size_t kNumLocalities = 50;
833   CreateAndStartBackends(kNumLocalities + 1);
834   const uint32_t kRpcTimeoutMs = 5000;
835   // The first ADS response contains kNumLocalities localities, each of which
836   // contains its own backend.
837   EdsResourceArgs args;
838   for (size_t i = 0; i < kNumLocalities; ++i) {
839     std::string name = absl::StrCat("locality", i);
840     EdsResourceArgs::Locality locality(name,
841                                        CreateEndpointsForBackends(i, i + 1));
842     args.locality_list.emplace_back(std::move(locality));
843   }
844   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
845   // Wait until all backends are ready.
846   WaitForAllBackends(DEBUG_LOCATION, 0, kNumLocalities,
847                      /*check_status=*/nullptr,
848                      WaitForBackendOptions().set_reset_counters(false),
849                      RpcOptions().set_timeout_ms(kRpcTimeoutMs));
850   // The second ADS response contains 1 locality, which contains backend 50.
851   args =
852       EdsResourceArgs({{"locality0", CreateEndpointsForBackends(
853                                          kNumLocalities, kNumLocalities + 1)}});
854   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
855   // Wait until backend 50 is ready.
856   WaitForBackend(DEBUG_LOCATION, kNumLocalities);
857 }
858 
859 // Tests that the localities in a locality map are picked correctly after
860 // update (addition, modification, deletion).
TEST_P(EdsTest,LocalityMapUpdateChurn)861 TEST_P(EdsTest, LocalityMapUpdateChurn) {
862   CreateAndStartBackends(4);
863   const size_t kNumRpcs = 3000;
864   // The locality weight for the first 3 localities.
865   const std::vector<int> kLocalityWeights0 = {2, 3, 4};
866   const double kTotalLocalityWeight0 =
867       std::accumulate(kLocalityWeights0.begin(), kLocalityWeights0.end(), 0);
868   std::vector<double> locality_weight_rate_0;
869   locality_weight_rate_0.reserve(kLocalityWeights0.size());
870   for (int weight : kLocalityWeights0) {
871     locality_weight_rate_0.push_back(weight / kTotalLocalityWeight0);
872   }
873   // Delete the first locality, keep the second locality, change the third
874   // locality's weight from 4 to 2, and add a new locality with weight 6.
875   const std::vector<int> kLocalityWeights1 = {3, 2, 6};
876   const double kTotalLocalityWeight1 =
877       std::accumulate(kLocalityWeights1.begin(), kLocalityWeights1.end(), 0);
878   std::vector<double> locality_weight_rate_1 = {
879       0 /* placeholder for locality 0 */};
880   for (int weight : kLocalityWeights1) {
881     locality_weight_rate_1.push_back(weight / kTotalLocalityWeight1);
882   }
883   EdsResourceArgs args({
884       {"locality0", CreateEndpointsForBackends(0, 1), 2},
885       {"locality1", CreateEndpointsForBackends(1, 2), 3},
886       {"locality2", CreateEndpointsForBackends(2, 3), 4},
887   });
888   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
889   // Wait for the first 3 backends to be ready.
890   WaitForAllBackends(DEBUG_LOCATION, 0, 3);
891   LOG(INFO) << "========= BEFORE FIRST BATCH ==========";
892   // Send kNumRpcs RPCs.
893   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
894   LOG(INFO) << "========= DONE WITH FIRST BATCH ==========";
895   // The picking rates of the first 3 backends should be roughly equal to the
896   // expectation.
897   std::vector<double> locality_picked_rates;
898   for (size_t i = 0; i < 3; ++i) {
899     locality_picked_rates.push_back(
900         static_cast<double>(backends_[i]->backend_service()->request_count()) /
901         kNumRpcs);
902   }
903   const double kErrorTolerance = 0.2;
904   for (size_t i = 0; i < 3; ++i) {
905     LOG(INFO) << "Locality " << i << " rate " << locality_picked_rates[i];
906     EXPECT_THAT(
907         locality_picked_rates[i],
908         ::testing::AllOf(
909             ::testing::Ge(locality_weight_rate_0[i] * (1 - kErrorTolerance)),
910             ::testing::Le(locality_weight_rate_0[i] * (1 + kErrorTolerance))));
911   }
912   args = EdsResourceArgs({
913       {"locality1", CreateEndpointsForBackends(1, 2), 3},
914       {"locality2", CreateEndpointsForBackends(2, 3), 2},
915       {"locality3", CreateEndpointsForBackends(3, 4), 6},
916   });
917   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
918   // Backend 3 hasn't received any request.
919   EXPECT_EQ(0U, backends_[3]->backend_service()->request_count());
920   // Wait until the locality update has been processed, as signaled by backend
921   // 3 receiving a request.
922   WaitForAllBackends(DEBUG_LOCATION, 3, 4);
923   LOG(INFO) << "========= BEFORE SECOND BATCH ==========";
924   // Send kNumRpcs RPCs.
925   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
926   LOG(INFO) << "========= DONE WITH SECOND BATCH ==========";
927   // Backend 0 no longer receives any request.
928   EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
929   // The picking rates of the last 3 backends should be roughly equal to the
930   // expectation.
931   locality_picked_rates = {0 /* placeholder for backend 0 */};
932   for (size_t i = 1; i < 4; ++i) {
933     locality_picked_rates.push_back(
934         static_cast<double>(backends_[i]->backend_service()->request_count()) /
935         kNumRpcs);
936   }
937   for (size_t i = 1; i < 4; ++i) {
938     LOG(INFO) << "Locality " << i << " rate " << locality_picked_rates[i];
939     EXPECT_THAT(
940         locality_picked_rates[i],
941         ::testing::AllOf(
942             ::testing::Ge(locality_weight_rate_1[i] * (1 - kErrorTolerance)),
943             ::testing::Le(locality_weight_rate_1[i] * (1 + kErrorTolerance))));
944   }
945 }
946 
947 // Tests that we don't fail RPCs when replacing all of the localities in
948 // a given priority.
TEST_P(EdsTest,ReplaceAllLocalitiesInPriority)949 TEST_P(EdsTest, ReplaceAllLocalitiesInPriority) {
950   CreateAndStartBackends(2);
951   // Initial EDS update has backend 0.
952   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
953   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
954   // Wait for the first backend to be ready.
955   WaitForBackend(DEBUG_LOCATION, 0);
956   // Send EDS update that replaces the locality and switches to backend 1.
957   args = EdsResourceArgs({{"locality1", CreateEndpointsForBackends(1, 2)}});
958   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
959   // When the client sees the update, RPCs should start going to backend 1.
960   // No RPCs should fail during this change.
961   WaitForBackend(DEBUG_LOCATION, 1);
962 }
963 
// Exercises removing a locality and later re-adding it.  Each update also
// introduces a backend that has not been used before, so that its first
// request signals that the client has applied the update.
TEST_P(EdsTest, ConsistentWeightedTargetUpdates) {
  CreateAndStartBackends(4);
  // Step 1: two localities, holding backend 1 and backend 2 respectively.
  EdsResourceArgs two_localities({
      {"locality0", CreateEndpointsForBackends(1, 2)},
      {"locality1", CreateEndpointsForBackends(2, 3)},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(two_localities));
  WaitForAllBackends(DEBUG_LOCATION, 1, 3);
  // Step 2: locality1 is removed; backend 0 joins locality0 as the marker.
  EdsResourceArgs locality0_only({
      {"locality0", CreateEndpointsForBackends(0, 2)},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(locality0_only));
  WaitForBackend(DEBUG_LOCATION, 0);
  // Step 3: locality1 comes back; backend 3 joins it as the marker.
  EdsResourceArgs locality1_restored({
      {"locality0", CreateEndpointsForBackends(0, 2)},
      {"locality1", CreateEndpointsForBackends(2, 4)},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(locality1_restored));
  WaitForBackend(DEBUG_LOCATION, 3);
}
991 
992 // Tests that RPCs are dropped according to the drop config.
TEST_P(EdsTest,Drops)993 TEST_P(EdsTest, Drops) {
994   CreateAndStartBackends(1);
995   const uint32_t kDropPerMillionForLb = 100000;
996   const uint32_t kDropPerMillionForThrottle = 200000;
997   const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
998   const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
999   const double kDropRateForLbAndThrottle =
1000       kDropRateForLb + ((1 - kDropRateForLb) * kDropRateForThrottle);
1001   const double kErrorTolerance = 0.05;
1002   const size_t kNumRpcs =
1003       ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
1004   // The ADS response contains two drop categories.
1005   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1006   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1007                           {kThrottleDropType, kDropPerMillionForThrottle}};
1008   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1009   // Send kNumRpcs RPCs and count the drops.
1010   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1011       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1012       kStatusMessageDropPrefix);
1013   // The drop rate should be roughly equal to the expectation.
1014   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1015   EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
1016                                                     kErrorTolerance));
1017 }
1018 
1019 // Tests that drop config is converted correctly from per hundred.
TEST_P(EdsTest,DropPerHundred)1020 TEST_P(EdsTest, DropPerHundred) {
1021   CreateAndStartBackends(1);
1022   const uint32_t kDropPerHundredForLb = 10;
1023   const double kDropRateForLb = kDropPerHundredForLb / 100.0;
1024   const double kErrorTolerance = 0.05;
1025   const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
1026   // The ADS response contains one drop category.
1027   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1028   args.drop_categories = {{kLbDropType, kDropPerHundredForLb}};
1029   args.drop_denominator = FractionalPercent::HUNDRED;
1030   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1031   // Send kNumRpcs RPCs and count the drops.
1032   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1033       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1034       kStatusMessageDropPrefix);
1035   // The drop rate should be roughly equal to the expectation.
1036   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1037   EXPECT_THAT(seen_drop_rate,
1038               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1039 }
1040 
1041 // Tests that drop config is converted correctly from per ten thousand.
TEST_P(EdsTest,DropPerTenThousand)1042 TEST_P(EdsTest, DropPerTenThousand) {
1043   CreateAndStartBackends(1);
1044   const uint32_t kDropPerTenThousandForLb = 1000;
1045   const double kDropRateForLb = kDropPerTenThousandForLb / 10000.0;
1046   const double kErrorTolerance = 0.05;
1047   const size_t kNumRpcs = ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
1048   // The ADS response contains one drop category.
1049   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1050   args.drop_categories = {{kLbDropType, kDropPerTenThousandForLb}};
1051   args.drop_denominator = FractionalPercent::TEN_THOUSAND;
1052   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1053   // Send kNumRpcs RPCs and count the drops.
1054   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1055       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1056       kStatusMessageDropPrefix);
1057   // The drop rate should be roughly equal to the expectation.
1058   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
1059   EXPECT_THAT(seen_drop_rate,
1060               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1061 }
1062 
1063 // Tests that drop is working correctly after update.
TEST_P(EdsTest,DropConfigUpdate)1064 TEST_P(EdsTest, DropConfigUpdate) {
1065   CreateAndStartBackends(2);
1066   const uint32_t kDropPerMillionForLb = 100000;
1067   const uint32_t kDropPerMillionForThrottle = 200000;
1068   const double kErrorTolerance = 0.05;
1069   const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
1070   const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
1071   const double kDropRateForLbAndThrottle =
1072       kDropRateForLb + ((1 - kDropRateForLb) * kDropRateForThrottle);
1073   const size_t kNumRpcsLbOnly =
1074       ComputeIdealNumRpcs(kDropRateForLb, kErrorTolerance);
1075   const size_t kNumRpcsBoth =
1076       ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
1077   // The first EDS response contains backend 0 and one drop category.
1078   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}});
1079   args.drop_categories = {{kLbDropType, kDropPerMillionForLb}};
1080   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1081   // Send kNumRpcsLbOnly RPCs and count the drops.
1082   LOG(INFO) << "========= BEFORE FIRST BATCH ==========";
1083   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1084       DEBUG_LOCATION, kNumRpcsLbOnly, StatusCode::UNAVAILABLE,
1085       kStatusMessageDropPrefix);
1086   LOG(INFO) << "========= DONE WITH FIRST BATCH ==========";
1087   // The drop rate should be roughly equal to the expectation.
1088   double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsLbOnly;
1089   LOG(INFO) << "First batch drop rate " << seen_drop_rate;
1090   EXPECT_THAT(seen_drop_rate,
1091               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
1092   // The second EDS response contains both backends and two drop categories.
1093   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
1094   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1095                           {kThrottleDropType, kDropPerMillionForThrottle}};
1096   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1097   // Wait until backend 1 sees traffic, so that we know the client has
1098   // seen the update.
1099   WaitForBackend(DEBUG_LOCATION, 1, [&](const RpcResult& result) {
1100     if (!result.status.ok()) {
1101       EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
1102       EXPECT_THAT(result.status.error_message(),
1103                   ::testing::StartsWith(kStatusMessageDropPrefix));
1104     }
1105   });
1106   // Send kNumRpcsBoth RPCs and count the drops.
1107   LOG(INFO) << "========= BEFORE SECOND BATCH ==========";
1108   num_drops = SendRpcsAndCountFailuresWithMessage(DEBUG_LOCATION, kNumRpcsBoth,
1109                                                   StatusCode::UNAVAILABLE,
1110                                                   kStatusMessageDropPrefix);
1111   LOG(INFO) << "========= DONE WITH SECOND BATCH ==========";
1112   // The new drop rate should be roughly equal to the expectation.
1113   seen_drop_rate = static_cast<double>(num_drops) / kNumRpcsBoth;
1114   LOG(INFO) << "Second batch drop rate " << seen_drop_rate;
1115   EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
1116                                                     kErrorTolerance));
1117 }
1118 
1119 // Tests that all the RPCs are dropped if any drop category drops 100%.
TEST_P(EdsTest,DropAll)1120 TEST_P(EdsTest, DropAll) {
1121   const size_t kNumRpcs = 1000;
1122   const uint32_t kDropPerMillionForLb = 100000;
1123   const uint32_t kDropPerMillionForThrottle = 1000000;
1124   // The ADS response contains two drop categories.
1125   EdsResourceArgs args;
1126   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
1127                           {kThrottleDropType, kDropPerMillionForThrottle}};
1128   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1129   // Send kNumRpcs RPCs and all of them are dropped.
1130   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
1131       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
1132       kStatusMessageDropPrefix);
1133   EXPECT_EQ(num_drops, kNumRpcs);
1134 }
1135 
1136 class EdsAuthorityRewriteTest : public XdsEnd2endTest {
1137  protected:
SetUp()1138   void SetUp() override {}  // Individual tests call InitClient().
1139 };
1140 
1141 INSTANTIATE_TEST_SUITE_P(XdsTest, EdsAuthorityRewriteTest,
1142                          ::testing::Values(XdsTestType()), &XdsTestType::Name);
1143 
// Verifies that, with the experiment env var set, a trusted xDS server and
// auto_host_rewrite enabled on the route, the authority seen by a backend is
// the hostname from its EDS endpoint when one is configured.
TEST_P(EdsAuthorityRewriteTest, AutoAuthorityRewrite) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
  constexpr char kAltAuthority1[] = "alt_authority1";
  constexpr char kAltAuthority2[] = "alt_authority2";
  // InsecureCreds are used because FakeCreds are too picky about which
  // authority gets sent.
  InitClient(MakeBootstrapBuilder().SetTrustedXdsServer(),
             /*lb_expected_authority=*/"",
             /*xds_resource_does_not_exist_timeout_ms=*/0,
             /*balancer_authority_override=*/"", /*args=*/nullptr,
             InsecureChannelCredentials());
  // Turn on auto_host_rewrite for the first route of the first virtual host.
  RouteConfiguration route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->mutable_auto_host_rewrite()->set_value(true);
  SetRouteConfiguration(balancer_.get(), route_config);
  // Three backends: backend 0 gets no hostname, backends 1 and 2 each get
  // their own.
  CreateAndStartBackends(3, /*xds_enabled=*/false, InsecureServerCredentials());
  const auto endpoint_without_hostname = CreateEndpoint(0);
  const auto endpoint_with_authority1 =
      CreateEndpoint(1, HealthStatus::UNKNOWN,
                     /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                     /*hostname=*/kAltAuthority1);
  const auto endpoint_with_authority2 =
      CreateEndpoint(2, HealthStatus::UNKNOWN,
                     /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                     /*hostname=*/kAltAuthority2);
  EdsResourceArgs eds_args(
      {{"locality0",
        {endpoint_without_hostname, endpoint_with_authority1,
         endpoint_with_authority2}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  WaitForAllBackends(DEBUG_LOCATION);
  // Send as many RPCs as there are backends, collecting the authority that
  // each server reports back in its response.
  std::set<std::string> authorities_seen;
  for (size_t remaining = backends_.size(); remaining > 0; --remaining) {
    EchoResponse response;
    const Status status = SendRpc(
        RpcOptions().set_echo_host_from_authority_header(true), &response);
    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                             << " message=" << status.error_message();
    authorities_seen.insert(response.param().host());
  }
  // Both rewritten authorities and the original server name should have
  // been observed.
  EXPECT_THAT(
      authorities_seen,
      ::testing::ElementsAre(kAltAuthority1, kAltAuthority2, kServerName));
}
1194 
// Verifies that the authority is left untouched when the
// GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE env var is not set by the test,
// even though the server is trusted, the route enables auto_host_rewrite and
// the endpoint has a hostname.
TEST_P(EdsAuthorityRewriteTest, NoRewriteWithoutEnvVar) {
  constexpr char kAltAuthority[] = "alt_authority";
  InitClient(MakeBootstrapBuilder().SetTrustedXdsServer());
  // Turn on auto_host_rewrite for the first route of the first virtual host.
  RouteConfiguration route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->mutable_auto_host_rewrite()->set_value(true);
  SetRouteConfiguration(balancer_.get(), route_config);
  // One backend whose EDS endpoint carries a hostname.
  CreateAndStartBackends(1);
  const auto endpoint_with_hostname =
      CreateEndpoint(0, HealthStatus::UNKNOWN,
                     /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                     /*hostname=*/kAltAuthority);
  EdsResourceArgs eds_args({{"locality0", {endpoint_with_hostname}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // The server should still see the original authority.
  EchoResponse response;
  const Status status = SendRpc(
      RpcOptions().set_echo_host_from_authority_header(true), &response);
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  EXPECT_EQ(response.param().host(), kServerName);
}
1222 
// Verifies that the authority is left untouched when the client is
// initialized without marking the xDS server as trusted in the bootstrap,
// even though the env var is set, the route enables auto_host_rewrite and
// the endpoint has a hostname.
TEST_P(EdsAuthorityRewriteTest, NoRewriteIfServerNotTrustedInBootstrap) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
  constexpr char kAltAuthority[] = "alt_authority";
  // Default InitClient(): SetTrustedXdsServer() is not called here.
  InitClient();
  // Turn on auto_host_rewrite for the first route of the first virtual host.
  RouteConfiguration route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->mutable_auto_host_rewrite()->set_value(true);
  SetRouteConfiguration(balancer_.get(), route_config);
  // One backend whose EDS endpoint carries a hostname.
  CreateAndStartBackends(1);
  const auto endpoint_with_hostname =
      CreateEndpoint(0, HealthStatus::UNKNOWN,
                     /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                     /*hostname=*/kAltAuthority);
  EdsResourceArgs eds_args({{"locality0", {endpoint_with_hostname}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // The server should still see the original authority.
  EchoResponse response;
  const Status status = SendRpc(
      RpcOptions().set_echo_host_from_authority_header(true), &response);
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  EXPECT_EQ(response.param().host(), kServerName);
}
1252 
// Verifies that the authority is left untouched when the EDS endpoint has no
// hostname, even though the env var is set, the server is trusted and the
// route enables auto_host_rewrite.
TEST_P(EdsAuthorityRewriteTest, NoRewriteIfNoHostnameInEds) {
  grpc_core::testing::ScopedExperimentalEnvVar env(
      "GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
  InitClient(MakeBootstrapBuilder().SetTrustedXdsServer());
  // Turn on auto_host_rewrite for the first route of the first virtual host.
  RouteConfiguration route_config = default_route_config_;
  auto* route_action =
      route_config.mutable_virtual_hosts(0)->mutable_routes(0)->mutable_route();
  route_action->mutable_auto_host_rewrite()->set_value(true);
  SetRouteConfiguration(balancer_.get(), route_config);
  // One backend whose EDS endpoint is created without a hostname.
  CreateAndStartBackends(1);
  const auto endpoint_without_hostname = CreateEndpoint(0);
  EdsResourceArgs eds_args({{"locality0", {endpoint_without_hostname}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // The server should still see the original authority.
  EchoResponse response;
  const Status status = SendRpc(
      RpcOptions().set_echo_host_from_authority_header(true), &response);
  EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
                           << " message=" << status.error_message();
  EXPECT_EQ(response.param().host(), kServerName);
}
1277 
// The xDS server is trusted and the EDS endpoint carries a hostname, but the
// route configuration is left at its default (this test never sets
// auto_host_rewrite).  The backend must still see kServerName as the
// authority.
TEST_P(EdsAuthorityRewriteTest, NoRewriteIfNotEnabledInRoute) {
  grpc_core::testing::ScopedExperimentalEnvVar authority_rewrite_env(
      "GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE");
  constexpr char kEndpointHostname[] = "alt_authority";
  InitClient(MakeBootstrapBuilder().SetTrustedXdsServer());
  // Start a single backend and publish it through EDS with a hostname.
  CreateAndStartBackends(1);
  auto endpoint_with_hostname =
      CreateEndpoint(0, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
                     /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                     /*hostname=*/kEndpointHostname);
  EdsResourceArgs eds_args({{"locality0", {endpoint_with_hostname}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Ask the backend to echo the authority header it received, then verify
  // that it is the original server name.
  RpcOptions rpc_options;
  rpc_options.set_echo_host_from_authority_header(true);
  EchoResponse echo;
  const Status rpc_status = SendRpc(rpc_options, &echo);
  EXPECT_TRUE(rpc_status.ok()) << "code=" << rpc_status.error_code()
                               << " message=" << rpc_status.error_message();
  EXPECT_EQ(echo.param().host(), kServerName);
}
1299 
1300 //
1301 // EDS failover tests
1302 //
1303 
1304 class FailoverTest : public XdsEnd2endTest {
1305  public:
SetUp()1306   void SetUp() override {
1307     XdsEnd2endTest::SetUp();
1308     ResetStub(/*failover_timeout_ms=*/500);
1309   }
1310 };
1311 
1312 INSTANTIATE_TEST_SUITE_P(
1313     XdsTest, FailoverTest,
1314     ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
1315     &XdsTestType::Name);
1316 
1317 // Localities with the highest priority are used when multiple priority exist.
TEST_P(FailoverTest,ChooseHighestPriority)1318 TEST_P(FailoverTest, ChooseHighestPriority) {
1319   CreateAndStartBackends(4);
1320   EdsResourceArgs args({
1321       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1322        1},
1323       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1324        2},
1325       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1326        3},
1327       {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1328        0},
1329   });
1330   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1331   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1332                  WaitForBackendOptions().set_reset_counters(false));
1333   for (size_t i = 0; i < 3; ++i) {
1334     EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1335   }
1336 }
1337 
1338 // Does not choose priority with no endpoints.
TEST_P(FailoverTest,DoesNotUsePriorityWithNoEndpoints)1339 TEST_P(FailoverTest, DoesNotUsePriorityWithNoEndpoints) {
1340   CreateAndStartBackends(3);
1341   EdsResourceArgs args({
1342       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1343        1},
1344       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1345        2},
1346       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1347        3},
1348       {"locality3", {}, kDefaultLocalityWeight, 0},
1349   });
1350   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1351   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1352                  WaitForBackendOptions().set_reset_counters(false));
1353   for (size_t i = 1; i < 3; ++i) {
1354     EXPECT_EQ(0U, backends_[i]->backend_service()->request_count());
1355   }
1356 }
1357 
1358 // Does not choose locality with no endpoints.
TEST_P(FailoverTest,DoesNotUseLocalityWithNoEndpoints)1359 TEST_P(FailoverTest, DoesNotUseLocalityWithNoEndpoints) {
1360   CreateAndStartBackends(1);
1361   EdsResourceArgs args({
1362       {"locality0", {}, kDefaultLocalityWeight, 0},
1363       {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 0},
1364   });
1365   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1366   // Wait for all backends to be used.
1367   WaitForAllBackends(DEBUG_LOCATION);
1368 }
1369 
1370 // If the higher priority localities are not reachable, failover to the
1371 // highest priority among the rest.
TEST_P(FailoverTest,Failover)1372 TEST_P(FailoverTest, Failover) {
1373   CreateAndStartBackends(2);
1374   EdsResourceArgs args({
1375       {"locality0", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 1},
1376       {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1377        2},
1378       {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1379        3},
1380       {"locality3", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 0},
1381   });
1382   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1383   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1384                  WaitForBackendOptions().set_reset_counters(false));
1385   EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
1386 }
1387 
1388 // Reports CONNECTING when failing over to a lower priority.
TEST_P(FailoverTest,ReportsConnectingDuringFailover)1389 TEST_P(FailoverTest, ReportsConnectingDuringFailover) {
1390   CreateAndStartBackends(1);
1391   // Priority 0 will be unreachable, so we'll use priority 1.
1392   EdsResourceArgs args({
1393       {"locality0", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 0},
1394       {"locality1", CreateEndpointsForBackends(), kDefaultLocalityWeight, 1},
1395   });
1396   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1397   ConnectionAttemptInjector injector;
1398   auto hold = injector.AddHold(backends_[0]->port());
1399   // Start an RPC in the background, which should cause the channel to
1400   // try to connect.
1401   LongRunningRpc rpc;
1402   rpc.StartRpc(stub_.get(), RpcOptions());
1403   // Wait for connection attempt to start to the backend.
1404   hold->Wait();
1405   // Channel state should be CONNECTING here, and any RPC should be
1406   // queued.
1407   EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
1408   // Allow the connection attempt to complete.
1409   hold->Resume();
1410   // Now the RPC should complete successfully.
1411   LOG(INFO) << "=== WAITING FOR RPC TO FINISH ===";
1412   Status status = rpc.GetStatus();
1413   LOG(INFO) << "=== RPC FINISHED ===";
1414   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
1415                            << " message=" << status.error_message();
1416 }
1417 
1418 // If a locality with higher priority than the current one becomes ready,
1419 // switch to it.
TEST_P(FailoverTest,SwitchBackToHigherPriority)1420 TEST_P(FailoverTest, SwitchBackToHigherPriority) {
1421   CreateAndStartBackends(4);
1422   const size_t kNumRpcs = 100;
1423   EdsResourceArgs args({
1424       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1425        1},
1426       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1427        2},
1428       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1429        3},
1430       {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1431        0},
1432   });
1433   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1434   WaitForBackend(DEBUG_LOCATION, 3);
1435   backends_[3]->StopListeningAndSendGoaways();
1436   backends_[0]->StopListeningAndSendGoaways();
1437   WaitForBackend(DEBUG_LOCATION, 1);
1438   ShutdownBackend(0);
1439   StartBackend(0);
1440   WaitForBackend(DEBUG_LOCATION, 0);
1441   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
1442   EXPECT_EQ(kNumRpcs, backends_[0]->backend_service()->request_count());
1443 }
1444 
1445 // The first update only contains unavailable priorities. The second update
1446 // contains available priorities.
TEST_P(FailoverTest,UpdateInitialUnavailable)1447 TEST_P(FailoverTest, UpdateInitialUnavailable) {
1448   CreateAndStartBackends(2);
1449   EdsResourceArgs args({
1450       {"locality0", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 0},
1451       {"locality1", {MakeNonExistentEndpoint()}, kDefaultLocalityWeight, 1},
1452   });
1453   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1454   CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1455                       MakeConnectionFailureRegex(
1456                           "connections to all backends failing; last error: "));
1457   args = EdsResourceArgs({
1458       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1459        0},
1460       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1461        1},
1462   });
1463   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1464   WaitForBackend(DEBUG_LOCATION, 0, [&](const RpcResult& result) {
1465     if (!result.status.ok()) {
1466       EXPECT_EQ(result.status.error_code(), StatusCode::UNAVAILABLE);
1467       EXPECT_THAT(result.status.error_message(),
1468                   ::testing::MatchesRegex(MakeConnectionFailureRegex(
1469                       "connections to all backends failing; last error: ")));
1470     }
1471   });
1472 }
1473 
1474 // Tests that after the localities' priorities are updated, we still choose
1475 // the highest READY priority with the updated localities.
TEST_P(FailoverTest,UpdatePriority)1476 TEST_P(FailoverTest, UpdatePriority) {
1477   CreateAndStartBackends(4);
1478   const size_t kNumRpcs = 100;
1479   EdsResourceArgs args({
1480       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1481        1},
1482       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1483        2},
1484       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1485        3},
1486       {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1487        0},
1488   });
1489   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1490   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1491                  WaitForBackendOptions().set_reset_counters(false));
1492   EXPECT_EQ(0U, backends_[0]->backend_service()->request_count());
1493   EXPECT_EQ(0U, backends_[1]->backend_service()->request_count());
1494   EXPECT_EQ(0U, backends_[2]->backend_service()->request_count());
1495   args = EdsResourceArgs({
1496       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1497        2},
1498       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1499        0},
1500       {"locality2", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1501        1},
1502       {"locality3", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1503        3},
1504   });
1505   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1506   WaitForBackend(DEBUG_LOCATION, 1);
1507   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
1508   EXPECT_EQ(kNumRpcs, backends_[1]->backend_service()->request_count());
1509 }
1510 
1511 // Moves all localities in the current priority to a higher priority.
TEST_P(FailoverTest,MoveAllLocalitiesInCurrentPriorityToHigherPriority)1512 TEST_P(FailoverTest, MoveAllLocalitiesInCurrentPriorityToHigherPriority) {
1513   CreateAndStartBackends(3);
1514   auto non_existent_endpoint = MakeNonExistentEndpoint();
1515   // First update:
1516   // - Priority 0 is locality 0, containing an unreachable backend.
1517   // - Priority 1 is locality 1, containing backends 0 and 1.
1518   EdsResourceArgs args({
1519       {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1520       {"locality1", CreateEndpointsForBackends(0, 2), kDefaultLocalityWeight,
1521        1},
1522   });
1523   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1524   // When we get the first update, all backends in priority 0 are down,
1525   // so we will create priority 1.  Backends 0 and 1 should have traffic,
1526   // but backend 2 should not.
1527   WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
1528                      WaitForBackendOptions().set_reset_counters(false));
1529   EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1530   // Second update:
1531   // - Priority 0 contains both localities 0 and 1.
1532   // - Priority 1 is not present.
1533   // - We add backend 2 to locality 1, just so we have a way to know
1534   //   when the update has been seen by the client.
1535   args = EdsResourceArgs({
1536       {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1537       {"locality1", CreateEndpointsForBackends(0, 3), kDefaultLocalityWeight,
1538        0},
1539   });
1540   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1541   // When backend 2 gets traffic, we know the second update has been seen.
1542   WaitForBackend(DEBUG_LOCATION, 2);
1543 }
1544 
1545 // This tests a bug triggered by the xds_cluster_resolver policy reusing
1546 // a child name for the priority policy when that child name was still
1547 // present but deactivated.
TEST_P(FailoverTest,PriorityChildNameChurn)1548 TEST_P(FailoverTest, PriorityChildNameChurn) {
1549   CreateAndStartBackends(4);
1550   auto non_existent_endpoint = MakeNonExistentEndpoint();
1551   // Initial update:
1552   // - P0:locality0, child number 0 (unreachable)
1553   // - P1:locality1, child number 1
1554   // - P2:locality2, child number 2
1555   EdsResourceArgs args({
1556       {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1557       {"locality1", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1558        1},
1559       {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1560        2},
1561   });
1562   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1563   WaitForBackend(DEBUG_LOCATION, 0);
1564   // Next update:
1565   // - P0:locality0, child number 0 (still unreachable)
1566   // - P1:locality2, child number 2 (moved from P2 to P1)
1567   // - P2:locality3, child number 3 (new child)
1568   // Child number 1 will be deactivated.
1569   args = EdsResourceArgs({
1570       {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1571       {"locality2", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1572        1},
1573       {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1574        2},
1575   });
1576   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1577   WaitForBackend(DEBUG_LOCATION, 1);
1578   // Next update:
1579   // - P0:locality0, child number 0 (still unreachable)
1580   // - P1:locality4, child number 4 (new child number -- should not reuse #1)
1581   // - P2:locality3, child number 3
1582   // Child number 1 will be deactivated.
1583   args = EdsResourceArgs({
1584       {"locality0", {non_existent_endpoint}, kDefaultLocalityWeight, 0},
1585       {"locality4", CreateEndpointsForBackends(3, 4), kDefaultLocalityWeight,
1586        1},
1587       {"locality3", CreateEndpointsForBackends(2, 3), kDefaultLocalityWeight,
1588        2},
1589   });
1590   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1591   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
1592                  WaitForBackendOptions().set_reset_counters(false));
1593   // P2 should not have gotten any traffic in this change.
1594   EXPECT_EQ(0UL, backends_[2]->backend_service()->request_count());
1595 }
1596 
1597 //
1598 // EDS client load reporting tests
1599 //
1600 
1601 using ClientLoadReportingTest = XdsEnd2endTest;
1602 
1603 INSTANTIATE_TEST_SUITE_P(
1604     XdsTest, ClientLoadReportingTest,
1605     ::testing::Values(XdsTestType().set_enable_load_reporting()),
1606     &XdsTestType::Name);
1607 
1608 MATCHER_P2(LoadMetricEq, num_requests_finished_with_metric, total_metric_value,
1609            "equals LoadMetric") {
1610   bool match = true;
1611   match &= ::testing::ExplainMatchResult(num_requests_finished_with_metric,
1612                                          arg.num_requests_finished_with_metric,
1613                                          result_listener);
1614   match &=
1615       ::testing::ExplainMatchResult(::testing::DoubleEq(total_metric_value),
1616                                     arg.total_metric_value, result_listener);
1617   return match;
1618 }
1619 
1620 // Tests that the load report received at the balancer is correct.
TEST_P(ClientLoadReportingTest,Vanilla)1621 TEST_P(ClientLoadReportingTest, Vanilla) {
1622   CreateAndStartBackends(4);
1623   const size_t kNumRpcsPerAddress = 10;
1624   const size_t kNumFailuresPerAddress = 3;
1625   EdsResourceArgs args({
1626       {"locality0", CreateEndpointsForBackends(0, 2)},
1627       {"locality1", CreateEndpointsForBackends(2, 4)},
1628   });
1629   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1630   // Wait until all backends are ready.
1631   size_t num_warmup_rpcs =
1632       WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1633                          WaitForBackendOptions().set_reset_counters(false));
1634   // Send kNumRpcsPerAddress RPCs per server with named metrics.
1635   xds::data::orca::v3::OrcaLoadReport backend_metrics;
1636   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1637   named_metrics["foo"] = 1.0;
1638   named_metrics["bar"] = 2.0;
1639   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1640                  RpcOptions().set_backend_metrics(backend_metrics));
1641   named_metrics["foo"] = 0.3;
1642   named_metrics["bar"] = 0.4;
1643   for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1644     CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1645                         RpcOptions().set_server_fail(true).set_backend_metrics(
1646                             backend_metrics));
1647   }
1648   const size_t total_successful_rpcs_sent =
1649       (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1650   const size_t total_failed_rpcs_sent =
1651       kNumFailuresPerAddress * backends_.size();
1652   // Check that the backends got the right number of requests.
1653   size_t total_rpcs_sent = 0;
1654   for (const auto& backend : backends_) {
1655     total_rpcs_sent += backend->backend_service()->request_count();
1656   }
1657   EXPECT_EQ(total_rpcs_sent,
1658             total_successful_rpcs_sent + total_failed_rpcs_sent);
1659   // The load report received at the balancer should be correct.
1660   std::vector<ClientStats> load_report =
1661       balancer_->lrs_service()->WaitForLoadReport();
1662   ASSERT_EQ(load_report.size(), 1UL);
1663   ClientStats& client_stats = load_report.front();
1664   EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
1665   EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
1666   EXPECT_EQ(total_successful_rpcs_sent,
1667             client_stats.total_successful_requests());
1668   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1669   EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
1670   EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
1671   EXPECT_EQ(0U, client_stats.total_dropped_requests());
1672   ASSERT_THAT(
1673       client_stats.locality_stats(),
1674       ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
1675                              ::testing::Pair("locality1", ::testing::_)));
1676   size_t num_successful_rpcs = 0;
1677   size_t num_failed_rpcs = 0;
1678   std::map<std::string, ClientStats::LocalityStats::LoadMetric>
1679       named_metrics_total;
1680   for (const auto& p : client_stats.locality_stats()) {
1681     EXPECT_EQ(p.second.total_requests_in_progress, 0U);
1682     EXPECT_EQ(
1683         p.second.total_issued_requests,
1684         p.second.total_successful_requests + p.second.total_error_requests);
1685     num_successful_rpcs += p.second.total_successful_requests;
1686     num_failed_rpcs += p.second.total_error_requests;
1687     for (const auto& s : p.second.load_metrics) {
1688       named_metrics_total[s.first] += s.second;
1689     }
1690   }
1691   EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
1692   EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
1693   EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
1694   EXPECT_THAT(
1695       named_metrics_total,
1696       ::testing::UnorderedElementsAre(
1697           ::testing::Pair(
1698               "foo",
1699               LoadMetricEq(
1700                   (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1701                       backends_.size(),
1702                   (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1703                       (kNumFailuresPerAddress * backends_.size()) * 0.3)),
1704           ::testing::Pair(
1705               "bar",
1706               LoadMetricEq(
1707                   (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1708                       backends_.size(),
1709                   (kNumRpcsPerAddress * backends_.size()) * 2.0 +
1710                       (kNumFailuresPerAddress * backends_.size()) * 0.4))));
1711   // The LRS service got a single request, and sent a single response.
1712   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1713   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1714 }
1715 
1716 // Tests ORCA to LRS propagation.
TEST_P(ClientLoadReportingTest,OrcaPropagation)1717 TEST_P(ClientLoadReportingTest, OrcaPropagation) {
1718   grpc_core::testing::ScopedExperimentalEnvVar env(
1719       "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
1720   CreateAndStartBackends(4);
1721   const size_t kNumRpcsPerAddress = 10;
1722   const size_t kNumFailuresPerAddress = 3;
1723   Cluster cluster = default_cluster_;
1724   cluster.add_lrs_report_endpoint_metrics("named_metrics.foo");
1725   cluster.add_lrs_report_endpoint_metrics("cpu_utilization");
1726   cluster.add_lrs_report_endpoint_metrics("mem_utilization");
1727   cluster.add_lrs_report_endpoint_metrics("application_utilization");
1728   cluster.add_lrs_report_endpoint_metrics("unknown_field");
1729   balancer_->ads_service()->SetCdsResource(cluster);
1730   EdsResourceArgs args({
1731       {"locality0", CreateEndpointsForBackends(0, 2)},
1732       {"locality1", CreateEndpointsForBackends(2, 4)},
1733   });
1734   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1735   // Wait until all backends are ready.
1736   size_t num_warmup_rpcs =
1737       WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1738                          WaitForBackendOptions().set_reset_counters(false));
1739   // Send kNumRpcsPerAddress RPCs per server with named metrics.
1740   xds::data::orca::v3::OrcaLoadReport backend_metrics;
1741   backend_metrics.set_cpu_utilization(0.8);
1742   backend_metrics.set_mem_utilization(0.6);
1743   backend_metrics.set_application_utilization(0.4);
1744   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1745   named_metrics["foo"] = 1.0;
1746   named_metrics["bar"] = 2.0;  // Not propagated.
1747   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1748                  RpcOptions().set_backend_metrics(backend_metrics));
1749   backend_metrics.set_cpu_utilization(0.4);
1750   backend_metrics.set_mem_utilization(0.3);
1751   backend_metrics.set_application_utilization(0.2);
1752   named_metrics["foo"] = 0.3;
1753   named_metrics["bar"] = 0.4;  // Not propagated.
1754   for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1755     CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1756                         RpcOptions().set_server_fail(true).set_backend_metrics(
1757                             backend_metrics));
1758   }
1759   const size_t total_successful_rpcs_sent =
1760       (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1761   const size_t total_failed_rpcs_sent =
1762       kNumFailuresPerAddress * backends_.size();
1763   // Check that the backends got the right number of requests.
1764   size_t total_rpcs_sent = 0;
1765   for (const auto& backend : backends_) {
1766     total_rpcs_sent += backend->backend_service()->request_count();
1767   }
1768   EXPECT_EQ(total_rpcs_sent,
1769             total_successful_rpcs_sent + total_failed_rpcs_sent);
1770   // The load report received at the balancer should be correct.
1771   std::vector<ClientStats> load_report =
1772       balancer_->lrs_service()->WaitForLoadReport();
1773   ASSERT_EQ(load_report.size(), 1UL);
1774   ClientStats& client_stats = load_report.front();
1775   EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
1776   EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
1777   EXPECT_EQ(total_successful_rpcs_sent,
1778             client_stats.total_successful_requests());
1779   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1780   EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
1781   EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
1782   EXPECT_EQ(0U, client_stats.total_dropped_requests());
1783   ASSERT_THAT(
1784       client_stats.locality_stats(),
1785       ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
1786                              ::testing::Pair("locality1", ::testing::_)));
1787   size_t num_successful_rpcs = 0;
1788   size_t num_failed_rpcs = 0;
1789   ClientStats::LocalityStats::LoadMetric cpu_utilization;
1790   ClientStats::LocalityStats::LoadMetric mem_utilization;
1791   ClientStats::LocalityStats::LoadMetric application_utilization;
1792   std::map<std::string, ClientStats::LocalityStats::LoadMetric>
1793       named_metrics_total;
1794   for (const auto& p : client_stats.locality_stats()) {
1795     EXPECT_EQ(p.second.total_requests_in_progress, 0U);
1796     EXPECT_EQ(
1797         p.second.total_issued_requests,
1798         p.second.total_successful_requests + p.second.total_error_requests);
1799     num_successful_rpcs += p.second.total_successful_requests;
1800     num_failed_rpcs += p.second.total_error_requests;
1801     cpu_utilization += p.second.cpu_utilization;
1802     mem_utilization += p.second.mem_utilization;
1803     application_utilization += p.second.application_utilization;
1804     for (const auto& s : p.second.load_metrics) {
1805       named_metrics_total[s.first] += s.second;
1806     }
1807   }
1808   EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
1809   EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
1810   EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
1811   EXPECT_THAT(
1812       cpu_utilization,
1813       LoadMetricEq(
1814           (kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
1815           (kNumRpcsPerAddress * backends_.size()) * 0.8 +
1816               (kNumFailuresPerAddress * backends_.size()) * 0.4));
1817   EXPECT_THAT(
1818       mem_utilization,
1819       LoadMetricEq(
1820           (kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
1821           (kNumRpcsPerAddress * backends_.size()) * 0.6 +
1822               (kNumFailuresPerAddress * backends_.size()) * 0.3));
1823   EXPECT_THAT(
1824       application_utilization,
1825       LoadMetricEq(
1826           (kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
1827           (kNumRpcsPerAddress * backends_.size()) * 0.4 +
1828               (kNumFailuresPerAddress * backends_.size()) * 0.2));
1829   EXPECT_THAT(
1830       named_metrics_total,
1831       ::testing::UnorderedElementsAre(::testing::Pair(
1832           "named_metrics.foo",
1833           LoadMetricEq(
1834               (kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size(),
1835               (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1836                   (kNumFailuresPerAddress * backends_.size()) * 0.3))));
1837   // The LRS service got a single request, and sent a single response.
1838   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1839   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1840 }
1841 
TEST_P(ClientLoadReportingTest,OrcaPropagationNamedMetricsAll)1842 TEST_P(ClientLoadReportingTest, OrcaPropagationNamedMetricsAll) {
1843   grpc_core::testing::ScopedExperimentalEnvVar env(
1844       "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
1845   CreateAndStartBackends(4);
1846   const size_t kNumRpcsPerAddress = 10;
1847   const size_t kNumFailuresPerAddress = 3;
1848   Cluster cluster = default_cluster_;
1849   cluster.add_lrs_report_endpoint_metrics("named_metrics.*");
1850   balancer_->ads_service()->SetCdsResource(cluster);
1851   EdsResourceArgs args({
1852       {"locality0", CreateEndpointsForBackends(0, 2)},
1853       {"locality1", CreateEndpointsForBackends(2, 4)},
1854   });
1855   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1856   // Wait until all backends are ready.
1857   size_t num_warmup_rpcs =
1858       WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1859                          WaitForBackendOptions().set_reset_counters(false));
1860   // Send kNumRpcsPerAddress RPCs per server with named metrics.
1861   xds::data::orca::v3::OrcaLoadReport backend_metrics;
1862   backend_metrics.set_cpu_utilization(0.8);
1863   backend_metrics.set_mem_utilization(0.6);
1864   backend_metrics.set_application_utilization(0.4);
1865   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1866   named_metrics["foo"] = 1.0;
1867   named_metrics["bar"] = 2.0;
1868   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1869                  RpcOptions().set_backend_metrics(backend_metrics));
1870   backend_metrics.set_cpu_utilization(0.4);
1871   backend_metrics.set_mem_utilization(0.3);
1872   backend_metrics.set_application_utilization(0.2);
1873   named_metrics["foo"] = 0.3;
1874   named_metrics["bar"] = 0.4;
1875   for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1876     CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1877                         RpcOptions().set_server_fail(true).set_backend_metrics(
1878                             backend_metrics));
1879   }
1880   const size_t total_successful_rpcs_sent =
1881       (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1882   const size_t total_failed_rpcs_sent =
1883       kNumFailuresPerAddress * backends_.size();
1884   // Check that the backends got the right number of requests.
1885   size_t total_rpcs_sent = 0;
1886   for (const auto& backend : backends_) {
1887     total_rpcs_sent += backend->backend_service()->request_count();
1888   }
1889   EXPECT_EQ(total_rpcs_sent,
1890             total_successful_rpcs_sent + total_failed_rpcs_sent);
1891   // The load report received at the balancer should be correct.
1892   std::vector<ClientStats> load_report =
1893       balancer_->lrs_service()->WaitForLoadReport();
1894   ASSERT_EQ(load_report.size(), 1UL);
1895   ClientStats& client_stats = load_report.front();
1896   EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
1897   EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
1898   EXPECT_EQ(total_successful_rpcs_sent,
1899             client_stats.total_successful_requests());
1900   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
1901   EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
1902   EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
1903   EXPECT_EQ(0U, client_stats.total_dropped_requests());
1904   ASSERT_THAT(
1905       client_stats.locality_stats(),
1906       ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
1907                              ::testing::Pair("locality1", ::testing::_)));
1908   size_t num_successful_rpcs = 0;
1909   size_t num_failed_rpcs = 0;
1910   std::map<std::string, ClientStats::LocalityStats::LoadMetric>
1911       named_metrics_total;
1912   for (const auto& p : client_stats.locality_stats()) {
1913     EXPECT_EQ(p.second.total_requests_in_progress, 0U);
1914     EXPECT_EQ(
1915         p.second.total_issued_requests,
1916         p.second.total_successful_requests + p.second.total_error_requests);
1917     num_successful_rpcs += p.second.total_successful_requests;
1918     num_failed_rpcs += p.second.total_error_requests;
1919     for (const auto& s : p.second.load_metrics) {
1920       named_metrics_total[s.first] += s.second;
1921     }
1922   }
1923   EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
1924   EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
1925   EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
1926   EXPECT_THAT(
1927       named_metrics_total,
1928       ::testing::UnorderedElementsAre(
1929           ::testing::Pair(
1930               "named_metrics.foo",
1931               LoadMetricEq(
1932                   (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1933                       backends_.size(),
1934                   (kNumRpcsPerAddress * backends_.size()) * 1.0 +
1935                       (kNumFailuresPerAddress * backends_.size()) * 0.3)),
1936           ::testing::Pair(
1937               "named_metrics.bar",
1938               LoadMetricEq(
1939                   (kNumRpcsPerAddress + kNumFailuresPerAddress) *
1940                       backends_.size(),
1941                   (kNumRpcsPerAddress * backends_.size()) * 2.0 +
1942                       (kNumFailuresPerAddress * backends_.size()) * 0.4))));
1943   // The LRS service got a single request, and sent a single response.
1944   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
1945   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
1946 }
1947 
TEST_P(ClientLoadReportingTest,OrcaPropagationNotConfigured)1948 TEST_P(ClientLoadReportingTest, OrcaPropagationNotConfigured) {
1949   grpc_core::testing::ScopedExperimentalEnvVar env(
1950       "GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
1951   CreateAndStartBackends(4);
1952   const size_t kNumRpcsPerAddress = 10;
1953   const size_t kNumFailuresPerAddress = 3;
1954   EdsResourceArgs args({
1955       {"locality0", CreateEndpointsForBackends(0, 2)},
1956       {"locality1", CreateEndpointsForBackends(2, 4)},
1957   });
1958   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1959   // Wait until all backends are ready.
1960   size_t num_warmup_rpcs =
1961       WaitForAllBackends(DEBUG_LOCATION, 0, 4, /*check_status=*/nullptr,
1962                          WaitForBackendOptions().set_reset_counters(false));
1963   // Send kNumRpcsPerAddress RPCs per server with named metrics.
1964   xds::data::orca::v3::OrcaLoadReport backend_metrics;
1965   backend_metrics.set_cpu_utilization(0.8);
1966   backend_metrics.set_mem_utilization(0.6);
1967   backend_metrics.set_application_utilization(0.4);
1968   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
1969   named_metrics["foo"] = 1.0;
1970   named_metrics["bar"] = 2.0;
1971   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
1972                  RpcOptions().set_backend_metrics(backend_metrics));
1973   backend_metrics.set_cpu_utilization(0.4);
1974   backend_metrics.set_mem_utilization(0.3);
1975   backend_metrics.set_application_utilization(0.2);
1976   named_metrics["foo"] = 0.3;
1977   named_metrics["bar"] = 0.4;
1978   for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
1979     CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
1980                         RpcOptions().set_server_fail(true).set_backend_metrics(
1981                             backend_metrics));
1982   }
1983   const size_t total_successful_rpcs_sent =
1984       (kNumRpcsPerAddress * backends_.size()) + num_warmup_rpcs;
1985   const size_t total_failed_rpcs_sent =
1986       kNumFailuresPerAddress * backends_.size();
1987   // Check that the backends got the right number of requests.
1988   size_t total_rpcs_sent = 0;
1989   for (const auto& backend : backends_) {
1990     total_rpcs_sent += backend->backend_service()->request_count();
1991   }
1992   EXPECT_EQ(total_rpcs_sent,
1993             total_successful_rpcs_sent + total_failed_rpcs_sent);
1994   // The load report received at the balancer should be correct.
1995   std::vector<ClientStats> load_report =
1996       balancer_->lrs_service()->WaitForLoadReport();
1997   ASSERT_EQ(load_report.size(), 1UL);
1998   ClientStats& client_stats = load_report.front();
1999   EXPECT_EQ(client_stats.cluster_name(), kDefaultClusterName);
2000   EXPECT_EQ(client_stats.eds_service_name(), kDefaultEdsServiceName);
2001   EXPECT_EQ(total_successful_rpcs_sent,
2002             client_stats.total_successful_requests());
2003   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
2004   EXPECT_EQ(total_rpcs_sent, client_stats.total_issued_requests());
2005   EXPECT_EQ(total_failed_rpcs_sent, client_stats.total_error_requests());
2006   EXPECT_EQ(0U, client_stats.total_dropped_requests());
2007   ASSERT_THAT(
2008       client_stats.locality_stats(),
2009       ::testing::ElementsAre(::testing::Pair("locality0", ::testing::_),
2010                              ::testing::Pair("locality1", ::testing::_)));
2011   size_t num_successful_rpcs = 0;
2012   size_t num_failed_rpcs = 0;
2013   std::map<std::string, ClientStats::LocalityStats::LoadMetric>
2014       named_metrics_total;
2015   for (const auto& p : client_stats.locality_stats()) {
2016     EXPECT_EQ(p.second.total_requests_in_progress, 0U);
2017     EXPECT_EQ(
2018         p.second.total_issued_requests,
2019         p.second.total_successful_requests + p.second.total_error_requests);
2020     num_successful_rpcs += p.second.total_successful_requests;
2021     num_failed_rpcs += p.second.total_error_requests;
2022     for (const auto& s : p.second.load_metrics) {
2023       named_metrics_total[s.first] += s.second;
2024     }
2025   }
2026   EXPECT_EQ(num_successful_rpcs, total_successful_rpcs_sent);
2027   EXPECT_EQ(num_failed_rpcs, total_failed_rpcs_sent);
2028   EXPECT_EQ(num_successful_rpcs + num_failed_rpcs, total_rpcs_sent);
2029   EXPECT_THAT(named_metrics_total, ::testing::UnorderedElementsAre());
2030   // The LRS service got a single request, and sent a single response.
2031   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
2032   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
2033 }
2034 
2035 // Tests send_all_clusters.
TEST_P(ClientLoadReportingTest,SendAllClusters)2036 TEST_P(ClientLoadReportingTest, SendAllClusters) {
2037   CreateAndStartBackends(2);
2038   balancer_->lrs_service()->set_send_all_clusters(true);
2039   const size_t kNumRpcsPerAddress = 10;
2040   const size_t kNumFailuresPerAddress = 3;
2041   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
2042   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
2043   // Wait until all backends are ready.
2044   size_t num_warmup_rpcs = WaitForAllBackends(DEBUG_LOCATION);
2045   // Send kNumRpcsPerAddress RPCs per server.
2046   xds::data::orca::v3::OrcaLoadReport backend_metrics;
2047   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
2048   named_metrics["foo"] = 1.0;
2049   named_metrics["bar"] = 2.0;
2050   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcsPerAddress * backends_.size(),
2051                  RpcOptions().set_backend_metrics(backend_metrics));
2052   named_metrics["foo"] = 0.3;
2053   named_metrics["bar"] = 0.4;
2054   for (size_t i = 0; i < kNumFailuresPerAddress * backends_.size(); ++i) {
2055     CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::FAILED_PRECONDITION, "",
2056                         RpcOptions().set_server_fail(true).set_backend_metrics(
2057                             backend_metrics));
2058   }
2059   // Check that each backend got the right number of requests.
2060   for (size_t i = 0; i < backends_.size(); ++i) {
2061     EXPECT_EQ(kNumRpcsPerAddress + kNumFailuresPerAddress,
2062               backends_[i]->backend_service()->request_count());
2063   }
2064   // The load report received at the balancer should be correct.
2065   std::vector<ClientStats> load_report =
2066       balancer_->lrs_service()->WaitForLoadReport();
2067   ASSERT_EQ(load_report.size(), 1UL);
2068   ClientStats& client_stats = load_report.front();
2069   EXPECT_EQ(kNumRpcsPerAddress * backends_.size() + num_warmup_rpcs,
2070             client_stats.total_successful_requests());
2071   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
2072   EXPECT_EQ((kNumRpcsPerAddress + kNumFailuresPerAddress) * backends_.size() +
2073                 num_warmup_rpcs,
2074             client_stats.total_issued_requests());
2075   EXPECT_EQ(kNumFailuresPerAddress * backends_.size(),
2076             client_stats.total_error_requests());
2077   EXPECT_EQ(0U, client_stats.total_dropped_requests());
2078   EXPECT_THAT(
2079       client_stats.locality_stats(),
2080       ::testing::ElementsAre(::testing::Pair(
2081           "locality0",
2082           ::testing::Field(
2083               &ClientStats::LocalityStats::load_metrics,
2084               ::testing::UnorderedElementsAre(
2085                   ::testing::Pair(
2086                       "foo",
2087                       LoadMetricEq(
2088                           (kNumRpcsPerAddress + kNumFailuresPerAddress) *
2089                               backends_.size(),
2090                           (kNumRpcsPerAddress * backends_.size()) * 1.0 +
2091                               (kNumFailuresPerAddress * backends_.size()) *
2092                                   0.3)),
2093                   ::testing::Pair(
2094                       "bar",
2095                       LoadMetricEq(
2096                           (kNumRpcsPerAddress + kNumFailuresPerAddress) *
2097                               backends_.size(),
2098                           (kNumRpcsPerAddress * backends_.size()) * 2.0 +
2099                               (kNumFailuresPerAddress * backends_.size()) *
2100                                   0.4)))))));
2101   // The LRS service got a single request, and sent a single response.
2102   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
2103   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
2104 }
2105 
2106 // Tests that we don't include stats for clusters that are not requested
2107 // by the LRS server.
TEST_P(ClientLoadReportingTest,HonorsClustersRequestedByLrsServer)2108 TEST_P(ClientLoadReportingTest, HonorsClustersRequestedByLrsServer) {
2109   CreateAndStartBackends(1);
2110   balancer_->lrs_service()->set_cluster_names({"bogus"});
2111   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
2112   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
2113   // Wait until all backends are ready.
2114   WaitForAllBackends(DEBUG_LOCATION);
2115   // The load report received at the balancer should be correct.
2116   std::vector<ClientStats> load_report =
2117       balancer_->lrs_service()->WaitForLoadReport();
2118   ASSERT_EQ(load_report.size(), 0UL);
2119   // The LRS service got a single request, and sent a single response.
2120   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
2121   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
2122 }
2123 
2124 // Tests that if the balancer restarts, the client load report contains the
2125 // stats before and after the restart correctly.
TEST_P(ClientLoadReportingTest,BalancerRestart)2126 TEST_P(ClientLoadReportingTest, BalancerRestart) {
2127   CreateAndStartBackends(4);
2128   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 2)}});
2129   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
2130   // Wait until all backends returned by the balancer are ready.
2131   size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
2132   std::vector<ClientStats> load_report =
2133       balancer_->lrs_service()->WaitForLoadReport();
2134   ASSERT_EQ(load_report.size(), 1UL);
2135   ClientStats client_stats = std::move(load_report.front());
2136   EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
2137   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
2138   EXPECT_EQ(0U, client_stats.total_error_requests());
2139   EXPECT_EQ(0U, client_stats.total_dropped_requests());
2140   EXPECT_THAT(client_stats.locality_stats(),
2141               ::testing::ElementsAre(::testing::Pair(
2142                   "locality0",
2143                   ::testing::Field(&ClientStats::LocalityStats::load_metrics,
2144                                    ::testing::IsEmpty()))));
2145   // Shut down the balancer.
2146   balancer_->Shutdown();
2147   // We should continue using the last EDS response we received from the
2148   // balancer before it was shut down.
2149   // Note: We need to use WaitForAllBackends() here instead of just
2150   // CheckRpcSendOk(kNumBackendsFirstPass), because when the balancer
2151   // shuts down, the XdsClient will generate an error to the
2152   // ListenerWatcher, which will cause the xds resolver to send a
2153   // no-op update to the LB policy.  When this update gets down to the
2154   // round_robin child policy for the locality, it will generate a new
2155   // subchannel list, which resets the start index randomly.  So we need
2156   // to be a little more permissive here to avoid spurious failures.
2157   ResetBackendCounters();
2158   num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
2159   // Now restart the balancer, this time pointing to the new backends.
2160   balancer_->Start();
2161   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(2, 4)}});
2162   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
2163   // Wait for queries to start going to one of the new backends.
2164   // This tells us that we're now using the new serverlist.
2165   num_rpcs += WaitForAllBackends(DEBUG_LOCATION, 2, 4);
2166   // Send one RPC per backend.
2167   xds::data::orca::v3::OrcaLoadReport backend_metrics;
2168   auto& named_metrics = (*backend_metrics.mutable_named_metrics());
2169   named_metrics["foo"] = 1.0;
2170   named_metrics["bar"] = 2.0;
2171   CheckRpcSendOk(DEBUG_LOCATION, 2,
2172                  RpcOptions().set_backend_metrics(backend_metrics));
2173   num_rpcs += 2;
2174   // Check client stats.
2175   load_report = balancer_->lrs_service()->WaitForLoadReport();
2176   ASSERT_EQ(load_report.size(), 1UL);
2177   client_stats = std::move(load_report.front());
2178   EXPECT_EQ(num_rpcs, client_stats.total_successful_requests());
2179   EXPECT_EQ(0U, client_stats.total_requests_in_progress());
2180   EXPECT_EQ(0U, client_stats.total_error_requests());
2181   EXPECT_EQ(0U, client_stats.total_dropped_requests());
2182   EXPECT_THAT(client_stats.locality_stats(),
2183               ::testing::ElementsAre(::testing::Pair(
2184                   "locality0",
2185                   ::testing::Field(
2186                       &ClientStats::LocalityStats::load_metrics,
2187                       ::testing::UnorderedElementsAre(
2188                           ::testing::Pair("foo", LoadMetricEq(2, 2.0)),
2189                           ::testing::Pair("bar", LoadMetricEq(2, 4.0)))))));
2190 }
2191 
2192 // Tests load reporting when switching over from one cluster to another.
TEST_P(ClientLoadReportingTest,ChangeClusters)2193 TEST_P(ClientLoadReportingTest, ChangeClusters) {
2194   CreateAndStartBackends(4);
2195   const char* kNewClusterName = "new_cluster_name";
2196   const char* kNewEdsServiceName = "new_eds_service_name";
2197   balancer_->lrs_service()->set_cluster_names(
2198       {kDefaultClusterName, kNewClusterName});
2199   // cluster kDefaultClusterName -> locality0 -> backends 0 and 1
2200   EdsResourceArgs args({
2201       {"locality0", CreateEndpointsForBackends(0, 2)},
2202   });
2203   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
2204   // cluster kNewClusterName -> locality1 -> backends 2 and 3
2205   EdsResourceArgs args2({
2206       {"locality1", CreateEndpointsForBackends(2, 4)},
2207   });
2208   balancer_->ads_service()->SetEdsResource(
2209       BuildEdsResource(args2, kNewEdsServiceName));
2210   // CDS resource for kNewClusterName.
2211   Cluster new_cluster = default_cluster_;
2212   new_cluster.set_name(kNewClusterName);
2213   new_cluster.mutable_eds_cluster_config()->set_service_name(
2214       kNewEdsServiceName);
2215   balancer_->ads_service()->SetCdsResource(new_cluster);
2216   // Wait for all backends to come online.
2217   size_t num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 0, 2);
2218   // The load report received at the balancer should be correct.
2219   std::vector<ClientStats> load_report =
2220       balancer_->lrs_service()->WaitForLoadReport();
2221   EXPECT_THAT(
2222       load_report,
2223       ::testing::ElementsAre(::testing::AllOf(
2224           ::testing::Property(&ClientStats::cluster_name, kDefaultClusterName),
2225           ::testing::Property(&ClientStats::eds_service_name,
2226                               kDefaultEdsServiceName),
2227           ::testing::Property(
2228               &ClientStats::locality_stats,
2229               ::testing::ElementsAre(::testing::Pair(
2230                   "locality0",
2231                   ::testing::AllOf(
2232                       ::testing::Field(&ClientStats::LocalityStats::
2233                                            total_successful_requests,
2234                                        num_rpcs),
2235                       ::testing::Field(&ClientStats::LocalityStats::
2236                                            total_requests_in_progress,
2237                                        0UL),
2238                       ::testing::Field(
2239                           &ClientStats::LocalityStats::total_error_requests,
2240                           0UL),
2241                       ::testing::Field(
2242                           &ClientStats::LocalityStats::total_issued_requests,
2243                           num_rpcs),
2244                       ::testing::Field(
2245                           &ClientStats::LocalityStats::load_metrics,
2246                           ::testing::IsEmpty()))))),
2247           ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
2248   // Change RDS resource to point to new cluster.
2249   RouteConfiguration new_route_config = default_route_config_;
2250   new_route_config.mutable_virtual_hosts(0)
2251       ->mutable_routes(0)
2252       ->mutable_route()
2253       ->set_cluster(kNewClusterName);
2254   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
2255                                    new_route_config);
2256   // Wait for all new backends to be used.
2257   num_rpcs = WaitForAllBackends(DEBUG_LOCATION, 2, 4);
2258   // The load report received at the balancer should be correct.
2259   load_report = balancer_->lrs_service()->WaitForLoadReport();
2260   EXPECT_THAT(
2261       load_report,
2262       ::testing::ElementsAre(
2263           ::testing::AllOf(
2264               ::testing::Property(&ClientStats::cluster_name,
2265                                   kDefaultClusterName),
2266               ::testing::Property(&ClientStats::eds_service_name,
2267                                   kDefaultEdsServiceName),
2268               ::testing::Property(
2269                   &ClientStats::locality_stats,
2270                   ::testing::ElementsAre(::testing::Pair(
2271                       "locality0",
2272                       ::testing::AllOf(
2273                           ::testing::Field(&ClientStats::LocalityStats::
2274                                                total_successful_requests,
2275                                            ::testing::Lt(num_rpcs)),
2276                           ::testing::Field(&ClientStats::LocalityStats::
2277                                                total_requests_in_progress,
2278                                            0UL),
2279                           ::testing::Field(
2280                               &ClientStats::LocalityStats::total_error_requests,
2281                               0UL),
2282                           ::testing::Field(&ClientStats::LocalityStats::
2283                                                total_issued_requests,
2284                                            ::testing::Le(num_rpcs)),
2285                           ::testing::Field(
2286                               &ClientStats::LocalityStats::load_metrics,
2287                               ::testing::IsEmpty()))))),
2288               ::testing::Property(&ClientStats::total_dropped_requests, 0UL)),
2289           ::testing::AllOf(
2290               ::testing::Property(&ClientStats::cluster_name, kNewClusterName),
2291               ::testing::Property(&ClientStats::eds_service_name,
2292                                   kNewEdsServiceName),
2293               ::testing::Property(
2294                   &ClientStats::locality_stats,
2295                   ::testing::ElementsAre(::testing::Pair(
2296                       "locality1",
2297                       ::testing::AllOf(
2298                           ::testing::Field(&ClientStats::LocalityStats::
2299                                                total_successful_requests,
2300                                            ::testing::Le(num_rpcs)),
2301                           ::testing::Field(&ClientStats::LocalityStats::
2302                                                total_requests_in_progress,
2303                                            0UL),
2304                           ::testing::Field(
2305                               &ClientStats::LocalityStats::total_error_requests,
2306                               0UL),
2307                           ::testing::Field(&ClientStats::LocalityStats::
2308                                                total_issued_requests,
2309                                            ::testing::Le(num_rpcs)),
2310                           ::testing::Field(
2311                               &ClientStats::LocalityStats::load_metrics,
2312                               ::testing::IsEmpty()))))),
2313               ::testing::Property(&ClientStats::total_dropped_requests, 0UL))));
2314   size_t total_ok = 0;
2315   for (const ClientStats& client_stats : load_report) {
2316     total_ok += client_stats.total_successful_requests();
2317   }
2318   EXPECT_EQ(total_ok, num_rpcs);
2319   // The LRS service got a single request, and sent a single response.
2320   EXPECT_EQ(1U, balancer_->lrs_service()->request_count());
2321   EXPECT_EQ(1U, balancer_->lrs_service()->response_count());
2322 }
2323 
2324 // Tests that the drop stats are correctly reported by client load reporting.
TEST_P(ClientLoadReportingTest,DropStats)2325 TEST_P(ClientLoadReportingTest, DropStats) {
2326   CreateAndStartBackends(1);
2327   const uint32_t kDropPerMillionForLb = 100000;
2328   const uint32_t kDropPerMillionForThrottle = 200000;
2329   const double kErrorTolerance = 0.05;
2330   const double kDropRateForLb = kDropPerMillionForLb / 1000000.0;
2331   const double kDropRateForThrottle = kDropPerMillionForThrottle / 1000000.0;
2332   const double kDropRateForLbAndThrottle =
2333       kDropRateForLb + ((1 - kDropRateForLb) * kDropRateForThrottle);
2334   const size_t kNumRpcs =
2335       ComputeIdealNumRpcs(kDropRateForLbAndThrottle, kErrorTolerance);
2336   // The ADS response contains two drop categories.
2337   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
2338   args.drop_categories = {{kLbDropType, kDropPerMillionForLb},
2339                           {kThrottleDropType, kDropPerMillionForThrottle}};
2340   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
2341   // Send kNumRpcs RPCs and count the drops.
2342   size_t num_drops = SendRpcsAndCountFailuresWithMessage(
2343       DEBUG_LOCATION, kNumRpcs, StatusCode::UNAVAILABLE,
2344       kStatusMessageDropPrefix);
2345   // The drop rate should be roughly equal to the expectation.
2346   const double seen_drop_rate = static_cast<double>(num_drops) / kNumRpcs;
2347   EXPECT_THAT(seen_drop_rate, ::testing::DoubleNear(kDropRateForLbAndThrottle,
2348                                                     kErrorTolerance));
2349   // Check client stats.
2350   ClientStats client_stats;
2351   do {
2352     std::vector<ClientStats> load_reports =
2353         balancer_->lrs_service()->WaitForLoadReport();
2354     for (const auto& load_report : load_reports) {
2355       client_stats += load_report;
2356     }
2357   } while (client_stats.total_issued_requests() +
2358                client_stats.total_dropped_requests() <
2359            kNumRpcs);
2360   EXPECT_EQ(num_drops, client_stats.total_dropped_requests());
2361   EXPECT_THAT(static_cast<double>(client_stats.dropped_requests(kLbDropType)) /
2362                   kNumRpcs,
2363               ::testing::DoubleNear(kDropRateForLb, kErrorTolerance));
2364   EXPECT_THAT(
2365       static_cast<double>(client_stats.dropped_requests(kThrottleDropType)) /
2366           (kNumRpcs * (1 - kDropRateForLb)),
2367       ::testing::DoubleNear(kDropRateForThrottle, kErrorTolerance));
2368 }
2369 
2370 }  // namespace
2371 }  // namespace testing
2372 }  // namespace grpc
2373 
main(int argc,char ** argv)2374 int main(int argc, char** argv) {
2375   grpc::testing::TestEnvironment env(&argc, argv);
2376   ::testing::InitGoogleTest(&argc, argv);
2377   // Make the backup poller poll very frequently in order to pick up
2378   // updates from all the subchannels's FDs.
2379   grpc_core::ConfigVars::Overrides overrides;
2380   overrides.client_channel_backup_poll_interval_ms = 1;
2381   grpc_core::ConfigVars::SetOverrides(overrides);
2382 #if TARGET_OS_IPHONE
2383   // Workaround Apple CFStream bug
2384   grpc_core::SetEnv("grpc_cfstream", "0");
2385 #endif
2386   grpc_core::RegisterFakeStatsPlugin();
2387   grpc_init();
2388   grpc::testing::ConnectionAttemptInjector::Init();
2389   const auto result = RUN_ALL_TESTS();
2390   grpc_shutdown();
2391   return result;
2392 }
2393