• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15 
16 #include <gmock/gmock.h>
17 #include <grpc/event_engine/endpoint_config.h>
18 #include <gtest/gtest.h>
19 
20 #include <string>
21 #include <vector>
22 
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/str_format.h"
27 #include "envoy/config/cluster/v3/cluster.pb.h"
28 #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
29 #include "src/core/client_channel/backup_poller.h"
30 #include "src/core/config/config_vars.h"
31 #include "src/core/lib/address_utils/sockaddr_utils.h"
32 #include "src/core/load_balancing/xds/xds_channel_args.h"
33 #include "src/core/resolver/fake/fake_resolver.h"
34 #include "src/core/util/env.h"
35 #include "test/core/test_util/resolve_localhost_ip46.h"
36 #include "test/core/test_util/scoped_env_var.h"
37 #include "test/cpp/end2end/connection_attempt_injector.h"
38 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
39 
40 namespace grpc {
41 namespace testing {
42 namespace {
43 
44 using ::envoy::config::core::v3::HealthStatus;
45 using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig;
46 
47 class RingHashTest : public XdsEnd2endTest {
48  protected:
SetUp()49   void SetUp() override {
50     logical_dns_cluster_resolver_response_generator_ =
51         grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
52     InitClient();
53     SetUpChannel();
54   }
55 
SetUpChannel(ChannelArguments * args=nullptr)56   void SetUpChannel(ChannelArguments* args = nullptr) {
57     ChannelArguments local_args;
58     if (args == nullptr) args = &local_args;
59     args->SetPointerWithVtable(
60         GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR,
61         logical_dns_cluster_resolver_response_generator_.get(),
62         &grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable);
63     ResetStub(/*failover_timeout_ms=*/0, args);
64   }
65 
CreateAddressListFromPortList(const std::vector<int> & ports)66   grpc_core::EndpointAddressesList CreateAddressListFromPortList(
67       const std::vector<int>& ports) {
68     grpc_core::EndpointAddressesList addresses;
69     for (int port : ports) {
70       absl::StatusOr<grpc_core::URI> lb_uri =
71           grpc_core::URI::Parse(grpc_core::LocalIpUri(port));
72       CHECK_OK(lb_uri);
73       grpc_resolved_address address;
74       CHECK(grpc_parse_uri(*lb_uri, &address));
75       addresses.emplace_back(address, grpc_core::ChannelArgs());
76     }
77     return addresses;
78   }
79 
CreateMetadataValueThatHashesToBackendPort(int port)80   std::string CreateMetadataValueThatHashesToBackendPort(int port) {
81     return absl::StrCat(grpc_core::LocalIp(), ":", port, "_0");
82   }
83 
CreateMetadataValueThatHashesToBackend(int index)84   std::string CreateMetadataValueThatHashesToBackend(int index) {
85     return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port());
86   }
87 
88   grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
89       logical_dns_cluster_resolver_response_generator_;
90 };
91 
92 // Run both with and without load reporting, just for test coverage.
93 INSTANTIATE_TEST_SUITE_P(
94     XdsTest, RingHashTest,
95     ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
96     &XdsTestType::Name);
97 
TEST_P(RingHashTest,AggregateClusterFallBackFromRingHashAtStartup)98 TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) {
99   CreateAndStartBackends(2);
100   const char* kNewCluster1Name = "new_cluster_1";
101   const char* kNewEdsService1Name = "new_eds_service_name_1";
102   const char* kNewCluster2Name = "new_cluster_2";
103   const char* kNewEdsService2Name = "new_eds_service_name_2";
104   // Populate new EDS resources.
105   EdsResourceArgs args1({
106       {"locality0", {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()}},
107   });
108   EdsResourceArgs args2({
109       {"locality0", CreateEndpointsForBackends()},
110   });
111   balancer_->ads_service()->SetEdsResource(
112       BuildEdsResource(args1, kNewEdsService1Name));
113   balancer_->ads_service()->SetEdsResource(
114       BuildEdsResource(args2, kNewEdsService2Name));
115   // Populate new CDS resources.
116   Cluster new_cluster1 = default_cluster_;
117   new_cluster1.set_name(kNewCluster1Name);
118   new_cluster1.mutable_eds_cluster_config()->set_service_name(
119       kNewEdsService1Name);
120   new_cluster1.set_lb_policy(Cluster::RING_HASH);
121   balancer_->ads_service()->SetCdsResource(new_cluster1);
122   Cluster new_cluster2 = default_cluster_;
123   new_cluster2.set_name(kNewCluster2Name);
124   new_cluster2.mutable_eds_cluster_config()->set_service_name(
125       kNewEdsService2Name);
126   new_cluster2.set_lb_policy(Cluster::RING_HASH);
127   balancer_->ads_service()->SetCdsResource(new_cluster2);
128   // Create Aggregate Cluster
129   auto cluster = default_cluster_;
130   auto* custom_cluster = cluster.mutable_cluster_type();
131   custom_cluster->set_name("envoy.clusters.aggregate");
132   ClusterConfig cluster_config;
133   cluster_config.add_clusters(kNewCluster1Name);
134   cluster_config.add_clusters(kNewCluster2Name);
135   custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
136   balancer_->ads_service()->SetCdsResource(cluster);
137   // Set up route with channel id hashing
138   auto new_route_config = default_route_config_;
139   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
140   auto* hash_policy = route->mutable_route()->add_hash_policy();
141   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
142   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
143                                    new_route_config);
144   // Verifying that we are using ring hash as only 1 endpoint is receiving all
145   // the traffic.
146   CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
147   bool found = false;
148   for (size_t i = 0; i < backends_.size(); ++i) {
149     if (backends_[i]->backend_service()->request_count() > 0) {
150       EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
151           << "backend " << i;
152       EXPECT_FALSE(found) << "backend " << i;
153       found = true;
154     }
155   }
156   EXPECT_TRUE(found);
157 }
158 
TEST_P(RingHashTest,AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup)159 TEST_P(RingHashTest,
160        AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
161   CreateAndStartBackends(1);
162   const char* kEdsClusterName = "eds_cluster";
163   const char* kLogicalDNSClusterName = "logical_dns_cluster";
164   // Populate EDS resource.
165   EdsResourceArgs args({
166       {"locality0",
167        {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()},
168        kDefaultLocalityWeight,
169        0},
170       {"locality1",
171        {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()},
172        kDefaultLocalityWeight,
173        1},
174   });
175   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
176   // Populate new CDS resources.
177   Cluster eds_cluster = default_cluster_;
178   eds_cluster.set_name(kEdsClusterName);
179   eds_cluster.set_lb_policy(Cluster::RING_HASH);
180   balancer_->ads_service()->SetCdsResource(eds_cluster);
181   // Populate LOGICAL_DNS cluster.
182   auto logical_dns_cluster = default_cluster_;
183   logical_dns_cluster.set_name(kLogicalDNSClusterName);
184   logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
185   auto* address = logical_dns_cluster.mutable_load_assignment()
186                       ->add_endpoints()
187                       ->add_lb_endpoints()
188                       ->mutable_endpoint()
189                       ->mutable_address()
190                       ->mutable_socket_address();
191   address->set_address(kServerName);
192   address->set_port_value(443);
193   balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
194   // Create Aggregate Cluster
195   auto cluster = default_cluster_;
196   auto* custom_cluster = cluster.mutable_cluster_type();
197   custom_cluster->set_name("envoy.clusters.aggregate");
198   ClusterConfig cluster_config;
199   cluster_config.add_clusters(kEdsClusterName);
200   cluster_config.add_clusters(kLogicalDNSClusterName);
201   custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
202   balancer_->ads_service()->SetCdsResource(cluster);
203   // Set up route with channel id hashing
204   auto new_route_config = default_route_config_;
205   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
206   auto* hash_policy = route->mutable_route()->add_hash_policy();
207   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
208   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
209                                    new_route_config);
210   // Set Logical DNS result
211   {
212     grpc_core::ExecCtx exec_ctx;
213     grpc_core::Resolver::Result result;
214     result.addresses = CreateAddressListFromPortList(GetBackendPorts());
215     logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
216         std::move(result));
217   }
218   // Inject connection delay to make this act more realistically.
219   ConnectionAttemptInjector injector;
220   injector.SetDelay(grpc_core::Duration::Milliseconds(500) *
221                     grpc_test_slowdown_factor());
222   // Send RPC.  Need the timeout to be long enough to account for the
223   // subchannel connection delays.
224   CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
225 }
226 
TEST_P(RingHashTest,AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRpcs)227 TEST_P(RingHashTest,
228        AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRpcs) {
229   CreateAndStartBackends(1);
230   const char* kEdsClusterName = "eds_cluster";
231   const char* kLogicalDNSClusterName = "logical_dns_cluster";
232   // Populate EDS resource.
233   EdsResourceArgs args({
234       {"locality0",
235        {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()},
236        kDefaultLocalityWeight,
237        0},
238       {"locality1",
239        {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()},
240        kDefaultLocalityWeight,
241        1},
242   });
243   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
244   // Populate new CDS resources.
245   Cluster eds_cluster = default_cluster_;
246   eds_cluster.set_name(kEdsClusterName);
247   eds_cluster.set_lb_policy(Cluster::RING_HASH);
248   balancer_->ads_service()->SetCdsResource(eds_cluster);
249   // Populate LOGICAL_DNS cluster.
250   auto logical_dns_cluster = default_cluster_;
251   logical_dns_cluster.set_name(kLogicalDNSClusterName);
252   logical_dns_cluster.set_type(Cluster::LOGICAL_DNS);
253   auto* address = logical_dns_cluster.mutable_load_assignment()
254                       ->add_endpoints()
255                       ->add_lb_endpoints()
256                       ->mutable_endpoint()
257                       ->mutable_address()
258                       ->mutable_socket_address();
259   address->set_address(kServerName);
260   address->set_port_value(443);
261   balancer_->ads_service()->SetCdsResource(logical_dns_cluster);
262   // Create Aggregate Cluster
263   auto cluster = default_cluster_;
264   auto* custom_cluster = cluster.mutable_cluster_type();
265   custom_cluster->set_name("envoy.clusters.aggregate");
266   ClusterConfig cluster_config;
267   cluster_config.add_clusters(kEdsClusterName);
268   cluster_config.add_clusters(kLogicalDNSClusterName);
269   custom_cluster->mutable_typed_config()->PackFrom(cluster_config);
270   balancer_->ads_service()->SetCdsResource(cluster);
271   // Set up route with channel id hashing
272   auto new_route_config = default_route_config_;
273   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
274   auto* hash_policy = route->mutable_route()->add_hash_policy();
275   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
276   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
277                                    new_route_config);
278   // Set Logical DNS result
279   {
280     grpc_core::ExecCtx exec_ctx;
281     grpc_core::Resolver::Result result;
282     result.addresses = CreateAddressListFromPortList(GetBackendPorts());
283     logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
284         std::move(result));
285   }
286   // Set up connection attempt injector.
287   ConnectionAttemptInjector injector;
288   auto hold = injector.AddHold(backends_[0]->port());
289   // Increase subchannel backoff time, so that subchannels stay in
290   // TRANSIENT_FAILURE for long enough to trigger potential problems.
291   ChannelArguments channel_args;
292   channel_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
293                       10000 * grpc_test_slowdown_factor());
294   SetUpChannel(&channel_args);
295   // Start an RPC in the background.
296   LongRunningRpc rpc;
297   rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(5000));
298   // Wait for connection attempt to the backend.
299   hold->Wait();
300   // Channel should report CONNECTING here, and any RPC should be queued.
301   EXPECT_EQ(channel_->GetState(false), GRPC_CHANNEL_CONNECTING);
302   // Start a second RPC at this point, which should be queued as well.
303   // This will fail if the priority policy fails to update the picker to
304   // point to the LOGICAL_DNS child; if it leaves it pointing to the EDS
305   // priority 1, then the RPC will fail, because all subchannels are in
306   // TRANSIENT_FAILURE.
307   // Note that sending only the first RPC does not catch this case,
308   // because if the priority policy fails to update the picker, then the
309   // pick for the first RPC will not be retried.
310   LongRunningRpc rpc2;
311   rpc2.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(5000));
312   // Allow the connection attempt to complete.
313   hold->Resume();
314   // Now the RPCs should complete successfully.
315   LOG(INFO) << "=== WAITING FOR FIRST RPC TO FINISH ===";
316   Status status = rpc.GetStatus();
317   LOG(INFO) << "=== FIRST RPC FINISHED ===";
318   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
319                            << " message=" << status.error_message();
320   LOG(INFO) << "=== WAITING FOR SECOND RPC TO FINISH ===";
321   status = rpc2.GetStatus();
322   LOG(INFO) << "=== SECOND RPC FINISHED ===";
323   EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
324                            << " message=" << status.error_message();
325 }
326 
327 // Tests that ring hash policy that hashes using channel id ensures all RPCs
328 // to go 1 particular backend.
TEST_P(RingHashTest,ChannelIdHashing)329 TEST_P(RingHashTest, ChannelIdHashing) {
330   CreateAndStartBackends(4);
331   auto cluster = default_cluster_;
332   cluster.set_lb_policy(Cluster::RING_HASH);
333   balancer_->ads_service()->SetCdsResource(cluster);
334   auto new_route_config = default_route_config_;
335   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
336   auto* hash_policy = route->mutable_route()->add_hash_policy();
337   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
338   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
339                                    new_route_config);
340   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
341   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
342   CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
343   bool found = false;
344   for (size_t i = 0; i < backends_.size(); ++i) {
345     if (backends_[i]->backend_service()->request_count() > 0) {
346       EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
347           << "backend " << i;
348       EXPECT_FALSE(found) << "backend " << i;
349       found = true;
350     }
351   }
352   EXPECT_TRUE(found);
353 }
354 
355 // Tests that ring hash policy that hashes using a header value can spread
356 // RPCs across all the backends.
TEST_P(RingHashTest,HeaderHashing)357 TEST_P(RingHashTest, HeaderHashing) {
358   CreateAndStartBackends(4);
359   auto cluster = default_cluster_;
360   cluster.set_lb_policy(Cluster::RING_HASH);
361   balancer_->ads_service()->SetCdsResource(cluster);
362   auto new_route_config = default_route_config_;
363   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
364   auto* hash_policy = route->mutable_route()->add_hash_policy();
365   hash_policy->mutable_header()->set_header_name("address_hash");
366   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
367                                    new_route_config);
368   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
369   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
370   // Note each type of RPC will contains a header value that will always be
371   // hashed to a specific backend as the header value matches the value used
372   // to create the entry in the ring.
373   std::vector<std::pair<std::string, std::string>> metadata = {
374       {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
375   std::vector<std::pair<std::string, std::string>> metadata1 = {
376       {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
377   std::vector<std::pair<std::string, std::string>> metadata2 = {
378       {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
379   std::vector<std::pair<std::string, std::string>> metadata3 = {
380       {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
381   const auto rpc_options =
382       RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
383   const auto rpc_options1 =
384       RpcOptions().set_metadata(std::move(metadata1)).set_timeout_ms(5000);
385   const auto rpc_options2 =
386       RpcOptions().set_metadata(std::move(metadata2)).set_timeout_ms(5000);
387   const auto rpc_options3 =
388       RpcOptions().set_metadata(std::move(metadata3)).set_timeout_ms(5000);
389   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
390                  WaitForBackendOptions(), rpc_options);
391   WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
392                  WaitForBackendOptions(), rpc_options1);
393   WaitForBackend(DEBUG_LOCATION, 2, /*check_status=*/nullptr,
394                  WaitForBackendOptions(), rpc_options2);
395   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
396                  WaitForBackendOptions(), rpc_options3);
397   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
398   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
399   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
400   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
401   for (size_t i = 0; i < backends_.size(); ++i) {
402     EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
403   }
404 }
405 
406 // Tests that ring hash policy that hashes using a header value and regex
407 // rewrite to aggregate RPCs to 1 backend.
TEST_P(RingHashTest,HeaderHashingWithRegexRewrite)408 TEST_P(RingHashTest, HeaderHashingWithRegexRewrite) {
409   CreateAndStartBackends(4);
410   auto cluster = default_cluster_;
411   cluster.set_lb_policy(Cluster::RING_HASH);
412   balancer_->ads_service()->SetCdsResource(cluster);
413   auto new_route_config = default_route_config_;
414   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
415   auto* hash_policy = route->mutable_route()->add_hash_policy();
416   hash_policy->mutable_header()->set_header_name("address_hash");
417   hash_policy->mutable_header()
418       ->mutable_regex_rewrite()
419       ->mutable_pattern()
420       ->set_regex("[0-9]+");
421   hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
422       "foo");
423   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
424                                    new_route_config);
425   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
426   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
427   std::vector<std::pair<std::string, std::string>> metadata = {
428       {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
429   std::vector<std::pair<std::string, std::string>> metadata1 = {
430       {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
431   std::vector<std::pair<std::string, std::string>> metadata2 = {
432       {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
433   std::vector<std::pair<std::string, std::string>> metadata3 = {
434       {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
435   const auto rpc_options =
436       RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
437   const auto rpc_options1 =
438       RpcOptions().set_metadata(std::move(metadata1)).set_timeout_ms(5000);
439   const auto rpc_options2 =
440       RpcOptions().set_metadata(std::move(metadata2)).set_timeout_ms(5000);
441   const auto rpc_options3 =
442       RpcOptions().set_metadata(std::move(metadata3)).set_timeout_ms(5000);
443   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
444   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
445   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
446   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
447   bool found = false;
448   for (size_t i = 0; i < backends_.size(); ++i) {
449     if (backends_[i]->backend_service()->request_count() > 0) {
450       EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400)
451           << "backend " << i;
452       EXPECT_FALSE(found) << "backend " << i;
453       found = true;
454     }
455   }
456   EXPECT_TRUE(found);
457 }
458 
TEST_P(RingHashTest,HashKeysInEds)459 TEST_P(RingHashTest, HashKeysInEds) {
460   grpc_core::testing::ScopedEnvVar env(
461       "GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT", "false");
462   CreateAndStartBackends(4);
463   auto cluster = default_cluster_;
464   cluster.set_lb_policy(Cluster::RING_HASH);
465   balancer_->ads_service()->SetCdsResource(cluster);
466   auto new_route_config = default_route_config_;
467   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
468   auto* hash_policy = route->mutable_route()->add_hash_policy();
469   hash_policy->mutable_header()->set_header_name("address_hash");
470   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
471                                    new_route_config);
472   EdsResourceArgs args(
473       {{"locality0",
474         {
475             CreateEndpoint(0,
476                            /*health_status=*/
477                            ::envoy::config::core::v3::HealthStatus::UNKNOWN,
478                            /*lb_weight=*/1, /*additional_backend_indexes=*/{},
479                            /*hostname=*/"",
480                            {{"envoy.lb", "{\"hash_key\":\"foo\"}"}}),
481             CreateEndpoint(1,
482                            /*health_status=*/
483                            ::envoy::config::core::v3::HealthStatus::UNKNOWN,
484                            /*lb_weight=*/1, /*additional_backend_indexes=*/{},
485                            /*hostname=*/"",
486                            {{"envoy.lb", "{\"hash_key\":\"bar\"}"}}),
487             CreateEndpoint(2,
488                            /*health_status=*/
489                            ::envoy::config::core::v3::HealthStatus::UNKNOWN,
490                            /*lb_weight=*/1, /*additional_backend_indexes=*/{},
491                            /*hostname=*/"",
492                            {{"envoy.lb", "{\"hash_key\":\"baz\"}"}}),
493             CreateEndpoint(3,
494                            /*health_status=*/
495                            ::envoy::config::core::v3::HealthStatus::UNKNOWN,
496                            /*lb_weight=*/1, /*additional_backend_indexes=*/{},
497                            /*hostname=*/"",
498                            {{"envoy.lb", "{\"hash_key\":\"quux\"}"}}),
499         }}});
500   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
501   // Note each type of RPC will contains a header value that will always be
502   // hashed to a specific backend as the header value matches the value used
503   // to create the entry in the ring.
504   std::vector<std::pair<std::string, std::string>> metadata = {
505       {"address_hash", "foo_0"}};
506   std::vector<std::pair<std::string, std::string>> metadata1 = {
507       {"address_hash", "bar_0"}};
508   std::vector<std::pair<std::string, std::string>> metadata2 = {
509       {"address_hash", "baz_0"}};
510   std::vector<std::pair<std::string, std::string>> metadata3 = {
511       {"address_hash", "quux_0"}};
512   const auto rpc_options =
513       RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
514   const auto rpc_options1 =
515       RpcOptions().set_metadata(std::move(metadata1)).set_timeout_ms(5000);
516   const auto rpc_options2 =
517       RpcOptions().set_metadata(std::move(metadata2)).set_timeout_ms(5000);
518   const auto rpc_options3 =
519       RpcOptions().set_metadata(std::move(metadata3)).set_timeout_ms(5000);
520   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
521                  WaitForBackendOptions(), rpc_options);
522   WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
523                  WaitForBackendOptions(), rpc_options1);
524   WaitForBackend(DEBUG_LOCATION, 2, /*check_status=*/nullptr,
525                  WaitForBackendOptions(), rpc_options2);
526   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
527                  WaitForBackendOptions(), rpc_options3);
528   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
529   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
530   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
531   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
532   for (size_t i = 0; i < backends_.size(); ++i) {
533     EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
534   }
535 }
536 
TEST_P(RingHashTest,HashKeysInEdsNotEnabled)537 TEST_P(RingHashTest, HashKeysInEdsNotEnabled) {
538   CreateAndStartBackends(4);
539   auto cluster = default_cluster_;
540   cluster.set_lb_policy(Cluster::RING_HASH);
541   balancer_->ads_service()->SetCdsResource(cluster);
542   auto new_route_config = default_route_config_;
543   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
544   auto* hash_policy = route->mutable_route()->add_hash_policy();
545   hash_policy->mutable_header()->set_header_name("address_hash");
546   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
547                                    new_route_config);
548   EdsResourceArgs args(
549       {{"locality0",
550         {
551             CreateEndpoint(0,
552                            /*health_status=*/
553                            ::envoy::config::core::v3::HealthStatus::UNKNOWN,
554                            /*lb_weight=*/1, /*additional_backend_indexes=*/{},
555                            /*hostname=*/"",
556                            {{"envoy.lb", "{\"hash_key\":\"foo\"}"}}),
557             CreateEndpoint(1,
558                            /*health_status=*/
559                            ::envoy::config::core::v3::HealthStatus::UNKNOWN,
560                            /*lb_weight=*/1, /*additional_backend_indexes=*/{},
561                            /*hostname=*/"",
562                            {{"envoy.lb", "{\"hash_key\":\"bar\"}"}}),
563             CreateEndpoint(2,
564                            /*health_status=*/
565                            ::envoy::config::core::v3::HealthStatus::UNKNOWN,
566                            /*lb_weight=*/1, /*additional_backend_indexes=*/{},
567                            /*hostname=*/"",
568                            {{"envoy.lb", "{\"hash_key\":\"baz\"}"}}),
569             CreateEndpoint(3,
570                            /*health_status=*/
571                            ::envoy::config::core::v3::HealthStatus::UNKNOWN,
572                            /*lb_weight=*/1, /*additional_backend_indexes=*/{},
573                            /*hostname=*/"",
574                            {{"envoy.lb", "{\"hash_key\":\"quux\"}"}}),
575         }}});
576   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
577   // Note each type of RPC will contains a header value that will always be
578   // hashed to a specific backend as the header value matches the value used
579   // to create the entry in the ring.
580   std::vector<std::pair<std::string, std::string>> metadata = {
581       {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
582   std::vector<std::pair<std::string, std::string>> metadata1 = {
583       {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
584   std::vector<std::pair<std::string, std::string>> metadata2 = {
585       {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
586   std::vector<std::pair<std::string, std::string>> metadata3 = {
587       {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
588   const auto rpc_options =
589       RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
590   const auto rpc_options1 =
591       RpcOptions().set_metadata(std::move(metadata1)).set_timeout_ms(5000);
592   const auto rpc_options2 =
593       RpcOptions().set_metadata(std::move(metadata2)).set_timeout_ms(5000);
594   const auto rpc_options3 =
595       RpcOptions().set_metadata(std::move(metadata3)).set_timeout_ms(5000);
596   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
597                  WaitForBackendOptions(), rpc_options);
598   WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
599                  WaitForBackendOptions(), rpc_options1);
600   WaitForBackend(DEBUG_LOCATION, 2, /*check_status=*/nullptr,
601                  WaitForBackendOptions(), rpc_options2);
602   WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
603                  WaitForBackendOptions(), rpc_options3);
604   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
605   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
606   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
607   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
608   for (size_t i = 0; i < backends_.size(); ++i) {
609     EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
610   }
611 }
612 
613 // Tests that ring hash policy that hashes using a random value.
TEST_P(RingHashTest,NoHashPolicy)614 TEST_P(RingHashTest, NoHashPolicy) {
615   CreateAndStartBackends(2);
616   const double kDistribution50Percent = 0.5;
617   const double kErrorTolerance = 0.05;
618   const uint32_t kRpcTimeoutMs = 10000;
619   const size_t kNumRpcs =
620       ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
621   auto cluster = default_cluster_;
622   // Increasing min ring size for random distribution.
623   cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
624       100000);
625   cluster.set_lb_policy(Cluster::RING_HASH);
626   balancer_->ads_service()->SetCdsResource(cluster);
627   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
628   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
629   // TODO(donnadionne): remove extended timeout after ring creation
630   // optimization.
631   WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
632                      WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
633                      RpcOptions().set_timeout_ms(kRpcTimeoutMs));
634   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
635   const int request_count_1 = backends_[0]->backend_service()->request_count();
636   const int request_count_2 = backends_[1]->backend_service()->request_count();
637   EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
638               ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
639   EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
640               ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
641 }
642 
643 // Tests that we observe endpoint weights.
TEST_P(RingHashTest,EndpointWeights)644 TEST_P(RingHashTest, EndpointWeights) {
645   CreateAndStartBackends(3);
646   const double kDistribution50Percent = 0.5;
647   const double kDistribution25Percent = 0.25;
648   const double kErrorTolerance = 0.05;
649   const uint32_t kRpcTimeoutMs = 10000;
650   const size_t kNumRpcs =
651       ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
652   auto cluster = default_cluster_;
653   // Increasing min ring size for random distribution.
654   cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
655       100000);
656   cluster.set_lb_policy(Cluster::RING_HASH);
657   balancer_->ads_service()->SetCdsResource(cluster);
658   // Endpoint 0 has weight 0, will be treated as weight 1.
659   // Endpoint 1 has weight 1.
660   // Endpoint 2 has weight 2.
661   EdsResourceArgs args(
662       {{"locality0",
663         {CreateEndpoint(0, ::envoy::config::core::v3::HealthStatus::UNKNOWN, 0),
664          CreateEndpoint(1, ::envoy::config::core::v3::HealthStatus::UNKNOWN, 1),
665          CreateEndpoint(2, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
666                         2)}}});
667   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
668   // TODO(donnadionne): remove extended timeout after ring creation
669   // optimization.
670   WaitForAllBackends(DEBUG_LOCATION, 0, 3, /*check_status=*/nullptr,
671                      WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
672                      RpcOptions().set_timeout_ms(kRpcTimeoutMs));
673   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
674   // Endpoint 2 should see 50% of traffic, and endpoints 0 and 1 should
675   // each see 25% of traffic.
676   const int request_count_0 = backends_[0]->backend_service()->request_count();
677   const int request_count_1 = backends_[1]->backend_service()->request_count();
678   const int request_count_2 = backends_[2]->backend_service()->request_count();
679   EXPECT_THAT(static_cast<double>(request_count_0) / kNumRpcs,
680               ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
681   EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
682               ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
683   EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
684               ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
685 }
686 
687 // Test that ring hash policy evaluation will continue past the terminal
688 // policy if no results are produced yet.
TEST_P(RingHashTest,ContinuesPastTerminalPolicyThatDoesNotProduceResult)689 TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {
690   CreateAndStartBackends(2);
691   auto cluster = default_cluster_;
692   cluster.set_lb_policy(Cluster::RING_HASH);
693   balancer_->ads_service()->SetCdsResource(cluster);
694   auto new_route_config = default_route_config_;
695   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
696   auto* hash_policy = route->mutable_route()->add_hash_policy();
697   hash_policy->mutable_header()->set_header_name("header_not_present");
698   hash_policy->set_terminal(true);
699   auto* hash_policy2 = route->mutable_route()->add_hash_policy();
700   hash_policy2->mutable_header()->set_header_name("address_hash");
701   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
702                                    new_route_config);
703   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
704   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
705   std::vector<std::pair<std::string, std::string>> metadata = {
706       {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
707   const auto rpc_options =
708       RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
709   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
710   EXPECT_EQ(backends_[0]->backend_service()->request_count(), 100);
711   EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
712 }
713 
714 // Test random hash is used when header hashing specified a header field that
715 // the RPC did not have.
TEST_P(RingHashTest,HashOnHeaderThatIsNotPresent)716 TEST_P(RingHashTest, HashOnHeaderThatIsNotPresent) {
717   CreateAndStartBackends(2);
718   const double kDistribution50Percent = 0.5;
719   const double kErrorTolerance = 0.05;
720   const uint32_t kRpcTimeoutMs = 10000;
721   const size_t kNumRpcs =
722       ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
723   auto cluster = default_cluster_;
724   // Increasing min ring size for random distribution.
725   cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
726       100000);
727   cluster.set_lb_policy(Cluster::RING_HASH);
728   balancer_->ads_service()->SetCdsResource(cluster);
729   auto new_route_config = default_route_config_;
730   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
731   auto* hash_policy = route->mutable_route()->add_hash_policy();
732   hash_policy->mutable_header()->set_header_name("header_not_present");
733   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
734                                    new_route_config);
735   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
736   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
737   std::vector<std::pair<std::string, std::string>> metadata = {
738       {"unmatched_header", absl::StrFormat("%" PRIu32, rand())},
739   };
740   const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
741   // TODO(donnadionne): remove extended timeout after ring creation
742   // optimization.
743   WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
744                      WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
745                      RpcOptions().set_timeout_ms(kRpcTimeoutMs));
746   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs, rpc_options);
747   const int request_count_1 = backends_[0]->backend_service()->request_count();
748   const int request_count_2 = backends_[1]->backend_service()->request_count();
749   EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
750               ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
751   EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
752               ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
753 }
754 
755 // Test random hash is used when only unsupported hash policies are
756 // configured.
TEST_P(RingHashTest,UnsupportedHashPolicyDefaultToRandomHashing)757 TEST_P(RingHashTest, UnsupportedHashPolicyDefaultToRandomHashing) {
758   CreateAndStartBackends(2);
759   const double kDistribution50Percent = 0.5;
760   const double kErrorTolerance = 0.05;
761   const uint32_t kRpcTimeoutMs = 10000;
762   const size_t kNumRpcs =
763       ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
764   auto cluster = default_cluster_;
765   // Increasing min ring size for random distribution.
766   cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
767       100000);
768   cluster.set_lb_policy(Cluster::RING_HASH);
769   balancer_->ads_service()->SetCdsResource(cluster);
770   auto new_route_config = default_route_config_;
771   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
772   auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
773   hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
774   auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
775   hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
776       true);
777   auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
778   hash_policy_unsupported_3->mutable_query_parameter()->set_name(
779       "query_parameter");
780   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
781                                    new_route_config);
782   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
783   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
784   // TODO(donnadionne): remove extended timeout after ring creation
785   // optimization.
786   WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
787                      WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
788                      RpcOptions().set_timeout_ms(kRpcTimeoutMs));
789   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
790   const int request_count_1 = backends_[0]->backend_service()->request_count();
791   const int request_count_2 = backends_[1]->backend_service()->request_count();
792   EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
793               ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
794   EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
795               ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
796 }
797 
798 // Tests that ring hash policy that hashes using a random value can spread
799 // RPCs across all the backends according to endpoint weight.
TEST_P(RingHashTest,RandomHashingDistributionAccordingToEndpointWeight)800 TEST_P(RingHashTest, RandomHashingDistributionAccordingToEndpointWeight) {
801   CreateAndStartBackends(2);
802   const size_t kWeight1 = 1;
803   const size_t kWeight2 = 2;
804   const size_t kWeightTotal = kWeight1 + kWeight2;
805   const double kWeight33Percent = static_cast<double>(kWeight1) / kWeightTotal;
806   const double kWeight66Percent = static_cast<double>(kWeight2) / kWeightTotal;
807   const double kErrorTolerance = 0.05;
808   const uint32_t kRpcTimeoutMs = 10000;
809   const size_t kNumRpcs =
810       ComputeIdealNumRpcs(kWeight33Percent, kErrorTolerance);
811   auto cluster = default_cluster_;
812   // Increasing min ring size for random distribution.
813   cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
814       100000);
815   cluster.set_lb_policy(Cluster::RING_HASH);
816   balancer_->ads_service()->SetCdsResource(cluster);
817   EdsResourceArgs args({{"locality0",
818                          {CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
819                           CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
820   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
821   // TODO(donnadionne): remove extended timeout after ring creation
822   // optimization.
823   WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
824                      WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
825                      RpcOptions().set_timeout_ms(kRpcTimeoutMs));
826   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
827   const int weight_33_request_count =
828       backends_[0]->backend_service()->request_count();
829   const int weight_66_request_count =
830       backends_[1]->backend_service()->request_count();
831   EXPECT_THAT(static_cast<double>(weight_33_request_count) / kNumRpcs,
832               ::testing::DoubleNear(kWeight33Percent, kErrorTolerance));
833   EXPECT_THAT(static_cast<double>(weight_66_request_count) / kNumRpcs,
834               ::testing::DoubleNear(kWeight66Percent, kErrorTolerance));
835 }
836 
837 // Tests that ring hash policy that hashes using a random value can spread
838 // RPCs across all the backends according to locality and endpoint weight.
TEST_P(RingHashTest,RandomHashingDistributionAccordingToLocalityAndEndpointWeight)839 TEST_P(RingHashTest,
840        RandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
841   CreateAndStartBackends(2);
842   const size_t kWeight1 = 1 * 1;
843   const size_t kWeight2 = 2 * 2;
844   const size_t kWeightTotal = kWeight1 + kWeight2;
845   const double kWeight20Percent = static_cast<double>(kWeight1) / kWeightTotal;
846   const double kWeight80Percent = static_cast<double>(kWeight2) / kWeightTotal;
847   const double kErrorTolerance = 0.05;
848   const uint32_t kRpcTimeoutMs = 10000;
849   const size_t kNumRpcs =
850       ComputeIdealNumRpcs(kWeight20Percent, kErrorTolerance);
851   auto cluster = default_cluster_;
852   // Increasing min ring size for random distribution.
853   cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
854       100000);
855   cluster.set_lb_policy(Cluster::RING_HASH);
856   balancer_->ads_service()->SetCdsResource(cluster);
857   EdsResourceArgs args(
858       {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
859        {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
860   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
861   // TODO(donnadionne): remove extended timeout after ring creation
862   // optimization.
863   WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
864                      WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
865                      RpcOptions().set_timeout_ms(kRpcTimeoutMs));
866   CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
867   const int weight_20_request_count =
868       backends_[0]->backend_service()->request_count();
869   const int weight_80_request_count =
870       backends_[1]->backend_service()->request_count();
871   EXPECT_THAT(static_cast<double>(weight_20_request_count) / kNumRpcs,
872               ::testing::DoubleNear(kWeight20Percent, kErrorTolerance));
873   EXPECT_THAT(static_cast<double>(weight_80_request_count) / kNumRpcs,
874               ::testing::DoubleNear(kWeight80Percent, kErrorTolerance));
875 }
876 
877 // Tests that ring hash policy that hashes using a fixed string ensures all
878 // RPCs to go 1 particular backend; and that subsequent hashing policies are
879 // ignored due to the setting of terminal.
TEST_P(RingHashTest,FixedHashingTerminalPolicy)880 TEST_P(RingHashTest, FixedHashingTerminalPolicy) {
881   CreateAndStartBackends(2);
882   auto cluster = default_cluster_;
883   cluster.set_lb_policy(Cluster::RING_HASH);
884   balancer_->ads_service()->SetCdsResource(cluster);
885   auto new_route_config = default_route_config_;
886   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
887   auto* hash_policy = route->mutable_route()->add_hash_policy();
888   hash_policy->mutable_header()->set_header_name("fixed_string");
889   hash_policy->set_terminal(true);
890   auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
891   hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string");
892   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
893                                    new_route_config);
894   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
895   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
896   std::vector<std::pair<std::string, std::string>> metadata = {
897       {"fixed_string", "fixed_value"},
898       {"random_string", absl::StrFormat("%" PRIu32, rand())},
899   };
900   const auto rpc_options =
901       RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
902   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
903   bool found = false;
904   for (size_t i = 0; i < backends_.size(); ++i) {
905     if (backends_[i]->backend_service()->request_count() > 0) {
906       EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
907           << "backend " << i;
908       EXPECT_FALSE(found) << "backend " << i;
909       found = true;
910     }
911   }
912   EXPECT_TRUE(found);
913 }
914 
915 // Test that the channel will go from idle to ready via connecting;
916 // (tho it is not possible to catch the connecting state before moving to
917 // ready)
TEST_P(RingHashTest,IdleToReady)918 TEST_P(RingHashTest, IdleToReady) {
919   CreateAndStartBackends(1);
920   auto cluster = default_cluster_;
921   cluster.set_lb_policy(Cluster::RING_HASH);
922   balancer_->ads_service()->SetCdsResource(cluster);
923   auto new_route_config = default_route_config_;
924   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
925   auto* hash_policy = route->mutable_route()->add_hash_policy();
926   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
927   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
928                                    new_route_config);
929   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
930   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
931   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
932   CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
933   EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
934 }
935 
936 // Test that the channel will transition to READY once it starts
937 // connecting even if there are no RPCs being sent to the picker.
TEST_P(RingHashTest,ContinuesConnectingWithoutPicks)938 TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
939   // Create EDS resource.
940   CreateAndStartBackends(1);
941   auto non_existent_endpoint = MakeNonExistentEndpoint();
942   EdsResourceArgs args(
943       {{"locality0", {non_existent_endpoint, CreateEndpoint(0)}}});
944   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
945   // Change CDS resource to use RING_HASH.
946   auto cluster = default_cluster_;
947   cluster.set_lb_policy(Cluster::RING_HASH);
948   balancer_->ads_service()->SetCdsResource(cluster);
949   // Add hash policy to RDS resource.
950   auto new_route_config = default_route_config_;
951   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
952   auto* hash_policy = route->mutable_route()->add_hash_policy();
953   hash_policy->mutable_header()->set_header_name("address_hash");
954   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
955                                    new_route_config);
956   // Start connection attempt injector and add a hold for the P0
957   // connection attempt.
958   ConnectionAttemptInjector injector;
959   auto hold = injector.AddHold(non_existent_endpoint.port);
960   // A long-running RPC, just used to send the RPC in another thread.
961   LongRunningRpc rpc;
962   std::vector<std::pair<std::string, std::string>> metadata = {
963       {"address_hash",
964        CreateMetadataValueThatHashesToBackendPort(non_existent_endpoint.port)}};
965   rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
966                                 std::move(metadata)));
967   // Wait for the RPC to trigger the P0 connection attempt, then cancel it,
968   // and then allow the connection attempt to complete.
969   hold->Wait();
970   rpc.CancelRpc();
971   EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
972   hold->Resume();
973   // Wait for channel to become connected without any pending RPC.
974   EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
975   // Make sure the backend did not get any requests.
976   EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
977 }
978 
979 // Tests that when we trigger internal connection attempts without
980 // picks, we do so for only one subchannel at a time.
TEST_P(RingHashTest,ContinuesConnectingWithoutPicksOneSubchannelAtATime)981 TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
982   // Create EDS resource.
983   CreateAndStartBackends(1);
984   auto non_existent_endpoint0 = MakeNonExistentEndpoint();
985   auto non_existent_endpoint1 = MakeNonExistentEndpoint();
986   auto non_existent_endpoint2 = MakeNonExistentEndpoint();
987   EdsResourceArgs args({{"locality0",
988                          {non_existent_endpoint0, non_existent_endpoint1,
989                           non_existent_endpoint2, CreateEndpoint(0)}}});
990   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
991   // Change CDS resource to use RING_HASH.
992   auto cluster = default_cluster_;
993   cluster.set_lb_policy(Cluster::RING_HASH);
994   balancer_->ads_service()->SetCdsResource(cluster);
995   // Add hash policy to RDS resource.
996   auto new_route_config = default_route_config_;
997   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
998   auto* hash_policy = route->mutable_route()->add_hash_policy();
999   hash_policy->mutable_header()->set_header_name("address_hash");
1000   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1001                                    new_route_config);
1002   // Start connection attempt injector.
1003   ConnectionAttemptInjector injector;
1004   auto hold_non_existent0 = injector.AddHold(non_existent_endpoint0.port);
1005   auto hold_non_existent1 = injector.AddHold(non_existent_endpoint1.port);
1006   auto hold_non_existent2 = injector.AddHold(non_existent_endpoint2.port);
1007   auto hold_good = injector.AddHold(backends_[0]->port());
1008   // A long-running RPC, just used to send the RPC in another thread.
1009   LongRunningRpc rpc;
1010   std::vector<std::pair<std::string, std::string>> metadata = {
1011       {"address_hash", CreateMetadataValueThatHashesToBackendPort(
1012                            non_existent_endpoint0.port)}};
1013   rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
1014                                 std::move(metadata)));
1015   // Wait for the RPC to trigger a connection attempt to the first address,
1016   // then cancel the RPC.  No other connection attempts should be started yet.
1017   hold_non_existent0->Wait();
1018   rpc.CancelRpc();
1019   EXPECT_FALSE(hold_non_existent1->IsStarted());
1020   EXPECT_FALSE(hold_non_existent2->IsStarted());
1021   EXPECT_FALSE(hold_good->IsStarted());
1022   // Allow the connection attempt to the first address to resume and wait
1023   // for the attempt for the second address.  No other connection
1024   // attempts should be started yet.
1025   auto hold_non_existent0_again = injector.AddHold(non_existent_endpoint0.port);
1026   hold_non_existent0->Resume();
1027   hold_non_existent1->Wait();
1028   EXPECT_FALSE(hold_non_existent0_again->IsStarted());
1029   EXPECT_FALSE(hold_non_existent2->IsStarted());
1030   EXPECT_FALSE(hold_good->IsStarted());
1031   // Allow the connection attempt to the second address to resume and wait
1032   // for the attempt for the third address.  No other connection
1033   // attempts should be started yet.
1034   auto hold_non_existent1_again = injector.AddHold(non_existent_endpoint1.port);
1035   hold_non_existent1->Resume();
1036   hold_non_existent2->Wait();
1037   EXPECT_FALSE(hold_non_existent0_again->IsStarted());
1038   EXPECT_FALSE(hold_non_existent1_again->IsStarted());
1039   EXPECT_FALSE(hold_good->IsStarted());
1040   // Allow the connection attempt to the third address to resume and wait
1041   // for the attempt for the final address.  No other connection
1042   // attempts should be started yet.
1043   auto hold_non_existent2_again = injector.AddHold(non_existent_endpoint2.port);
1044   hold_non_existent2->Resume();
1045   hold_good->Wait();
1046   EXPECT_FALSE(hold_non_existent0_again->IsStarted());
1047   EXPECT_FALSE(hold_non_existent1_again->IsStarted());
1048   EXPECT_FALSE(hold_non_existent2_again->IsStarted());
1049   // Allow the final attempt to resume.
1050   hold_good->Resume();
1051   // Wait for channel to become connected without any pending RPC.
1052   EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(10)));
1053   // No other connection attempts should have been started.
1054   EXPECT_FALSE(hold_non_existent0_again->IsStarted());
1055   EXPECT_FALSE(hold_non_existent1_again->IsStarted());
1056   EXPECT_FALSE(hold_non_existent2_again->IsStarted());
1057   // RPC should have been cancelled.
1058   EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
1059   // Make sure the backend did not get any requests.
1060   EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
1061 }
1062 
1063 // Test that when the first pick is down leading to a transient failure, we
1064 // will move on to the next ring hash entry.
TEST_P(RingHashTest,TransientFailureCheckNextOne)1065 TEST_P(RingHashTest, TransientFailureCheckNextOne) {
1066   CreateAndStartBackends(1);
1067   auto cluster = default_cluster_;
1068   cluster.set_lb_policy(Cluster::RING_HASH);
1069   balancer_->ads_service()->SetCdsResource(cluster);
1070   auto new_route_config = default_route_config_;
1071   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1072   auto* hash_policy = route->mutable_route()->add_hash_policy();
1073   hash_policy->mutable_header()->set_header_name("address_hash");
1074   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1075                                    new_route_config);
1076   std::vector<EdsResourceArgs::Endpoint> endpoints;
1077   const int unused_port = grpc_pick_unused_port_or_die();
1078   endpoints.emplace_back(unused_port);
1079   endpoints.emplace_back(backends_[0]->port());
1080   EdsResourceArgs args({{"locality0", std::move(endpoints)}});
1081   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1082   std::vector<std::pair<std::string, std::string>> metadata = {
1083       {"address_hash",
1084        CreateMetadataValueThatHashesToBackendPort(unused_port)}};
1085   const auto rpc_options =
1086       RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
1087   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1088                  WaitForBackendOptions(), rpc_options);
1089   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
1090 }
1091 
1092 // Test that when a backend goes down, we will move on to the next subchannel
1093 // (with a lower priority).  When the backend comes back up, traffic will move
1094 // back.
TEST_P(RingHashTest,SwitchToLowerPriorityAndThenBack)1095 TEST_P(RingHashTest, SwitchToLowerPriorityAndThenBack) {
1096   CreateAndStartBackends(2);
1097   auto cluster = default_cluster_;
1098   cluster.set_lb_policy(Cluster::RING_HASH);
1099   balancer_->ads_service()->SetCdsResource(cluster);
1100   auto new_route_config = default_route_config_;
1101   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1102   auto* hash_policy = route->mutable_route()->add_hash_policy();
1103   hash_policy->mutable_header()->set_header_name("address_hash");
1104   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1105                                    new_route_config);
1106   EdsResourceArgs args({
1107       {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
1108        0},
1109       {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
1110        1},
1111   });
1112   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1113   std::vector<std::pair<std::string, std::string>> metadata = {
1114       {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
1115   const auto rpc_options =
1116       RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
1117   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1118                  WaitForBackendOptions(), rpc_options);
1119   backends_[0]->StopListeningAndSendGoaways();
1120   WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
1121                  WaitForBackendOptions(), rpc_options);
1122   ShutdownBackend(0);
1123   StartBackend(0);
1124   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1125                  WaitForBackendOptions(), rpc_options);
1126   CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
1127   EXPECT_EQ(100, backends_[0]->backend_service()->request_count());
1128   EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
1129 }
1130 
1131 // Test that when all backends are down, we will keep reattempting.
TEST_P(RingHashTest,ReattemptWhenAllEndpointsUnreachable)1132 TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
1133   CreateAndStartBackends(1);
1134   const uint32_t kConnectionTimeoutMilliseconds = 5000;
1135   auto cluster = default_cluster_;
1136   cluster.set_lb_policy(Cluster::RING_HASH);
1137   balancer_->ads_service()->SetCdsResource(cluster);
1138   auto new_route_config = default_route_config_;
1139   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1140   auto* hash_policy = route->mutable_route()->add_hash_policy();
1141   hash_policy->mutable_header()->set_header_name("address_hash");
1142   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1143                                    new_route_config);
1144   EdsResourceArgs args(
1145       {{"locality0", {MakeNonExistentEndpoint(), CreateEndpoint(0)}}});
1146   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1147   std::vector<std::pair<std::string, std::string>> metadata = {
1148       {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
1149   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1150   ShutdownBackend(0);
1151   CheckRpcSendFailure(
1152       DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1153       MakeConnectionFailureRegex(
1154           "ring hash cannot find a connected endpoint; first failure: "),
1155       RpcOptions().set_metadata(std::move(metadata)));
1156   StartBackend(0);
1157   // Ensure we are actively connecting without any traffic.
1158   EXPECT_TRUE(channel_->WaitForConnected(
1159       grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
1160 }
1161 
1162 // Test that when all backends are down and then up, we may pick a TF backend
1163 // and we will then jump to ready backend.
TEST_P(RingHashTest,TransientFailureSkipToAvailableReady)1164 TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
1165   CreateBackends(2);
1166   const uint32_t kConnectionTimeoutMilliseconds = 5000;
1167   auto cluster = default_cluster_;
1168   cluster.set_lb_policy(Cluster::RING_HASH);
1169   balancer_->ads_service()->SetCdsResource(cluster);
1170   auto new_route_config = default_route_config_;
1171   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1172   auto* hash_policy = route->mutable_route()->add_hash_policy();
1173   hash_policy->mutable_header()->set_header_name("address_hash");
1174   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1175                                    new_route_config);
1176   // Make sure we include some unused ports to fill the ring.
1177   EdsResourceArgs args({
1178       {"locality0",
1179        {CreateEndpoint(0), CreateEndpoint(1), MakeNonExistentEndpoint(),
1180         MakeNonExistentEndpoint()}},
1181   });
1182   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1183   std::vector<std::pair<std::string, std::string>> metadata = {
1184       {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
1185   const auto rpc_options = RpcOptions()
1186                                .set_metadata(std::move(metadata))
1187                                .set_timeout_ms(kConnectionTimeoutMilliseconds);
1188   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1189   LOG(INFO) << "=== SENDING FIRST RPC ===";
1190   CheckRpcSendFailure(
1191       DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1192       MakeConnectionFailureRegex(
1193           "ring hash cannot find a connected endpoint; first failure: "),
1194       rpc_options);
1195   LOG(INFO) << "=== DONE WITH FIRST RPC ===";
1196   EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
1197   // Bring up backend 0.  The channel should become connected without
1198   // any picks, because in TF, we are always trying to connect to at
1199   // least one backend at all times.
1200   LOG(INFO) << "=== STARTING BACKEND 0 ===";
1201   StartBackend(0);
1202   LOG(INFO) << "=== WAITING FOR CHANNEL TO BECOME READY ===";
1203   EXPECT_TRUE(channel_->WaitForConnected(
1204       grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
1205   // RPCs should go to backend 0.
1206   LOG(INFO) << "=== WAITING FOR BACKEND 0 ===";
1207   WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
1208                  WaitForBackendOptions(), rpc_options);
1209   EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
1210   // Bring down backend 0 and bring up backend 1.
1211   // Note the RPC contains a header value that will always be hashed to
1212   // backend 0. So by purposely bringing down backend 0 and bringing up another
1213   // backend, this will ensure Picker's first choice of backend 0 will fail
1214   // and it will go through the remaining subchannels to find one in READY.
1215   // Since the the entries in the ring are pretty distributed and we have
1216   // unused ports to fill the ring, it is almost guaranteed that the Picker
1217   // will go through some non-READY entries and skip them as per design.
1218   LOG(INFO) << "=== SHUTTING DOWN BACKEND 0 ===";
1219   ShutdownBackend(0);
1220   LOG(INFO) << "=== WAITING FOR STATE CHANGE ===";
1221   EXPECT_TRUE(channel_->WaitForStateChange(
1222       GRPC_CHANNEL_READY,
1223       grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
1224   EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
1225   LOG(INFO) << "=== SENDING SECOND RPC ===";
1226   CheckRpcSendFailure(
1227       DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1228       MakeConnectionFailureRegex(
1229           "ring hash cannot find a connected endpoint; first failure: "),
1230       rpc_options);
1231   LOG(INFO) << "=== STARTING BACKEND 1 ===";
1232   StartBackend(1);
1233   LOG(INFO) << "=== WAITING FOR CHANNEL TO BECOME READY ===";
1234   EXPECT_TRUE(channel_->WaitForConnected(
1235       grpc_timeout_milliseconds_to_deadline(kConnectionTimeoutMilliseconds)));
1236   LOG(INFO) << "=== WAITING FOR BACKEND 1 ===";
1237   WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
1238                  WaitForBackendOptions(), rpc_options);
1239   LOG(INFO) << "=== DONE ===";
1240 }
1241 
1242 // This tests a bug seen in the wild where ring_hash started with no
1243 // endpoints and reported TRANSIENT_FAILURE, then got an update with
1244 // endpoints and reported IDLE, but the picker update was squelched, so
1245 // it failed to ever get reconnected.
TEST_P(RingHashTest,ReattemptWhenGoingFromTransientFailureToIdle)1246 TEST_P(RingHashTest, ReattemptWhenGoingFromTransientFailureToIdle) {
1247   CreateAndStartBackends(1);
1248   const uint32_t kConnectionTimeoutMilliseconds = 5000;
1249   auto cluster = default_cluster_;
1250   cluster.set_lb_policy(Cluster::RING_HASH);
1251   balancer_->ads_service()->SetCdsResource(cluster);
1252   auto new_route_config = default_route_config_;
1253   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1254                                    new_route_config);
1255   // Send empty EDS update.
1256   EdsResourceArgs args(
1257       {{"locality0", std::vector<EdsResourceArgs::Endpoint>()}});
1258   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1259   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1260   // Channel should fail RPCs and go into TRANSIENT_FAILURE.
1261   CheckRpcSendFailure(
1262       DEBUG_LOCATION, StatusCode::UNAVAILABLE,
1263       "empty address list: EDS resource eds_service_name: contains empty "
1264       "localities: \\[\\{region=\"xds_default_locality_region\", "
1265       "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]",
1266       RpcOptions().set_timeout_ms(kConnectionTimeoutMilliseconds));
1267   EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
1268   // Send EDS update with 1 backend.
1269   args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
1270   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1271   // A wait_for_ready RPC should succeed, and the channel should report READY.
1272   CheckRpcSendOk(DEBUG_LOCATION, 1,
1273                  RpcOptions()
1274                      .set_timeout_ms(kConnectionTimeoutMilliseconds)
1275                      .set_wait_for_ready(true));
1276   EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
1277 }
1278 
1279 // Test unsupported hash policy types are all ignored before a supported
1280 // policy.
TEST_P(RingHashTest,UnsupportedHashPolicyUntilChannelIdHashing)1281 TEST_P(RingHashTest, UnsupportedHashPolicyUntilChannelIdHashing) {
1282   CreateAndStartBackends(2);
1283   auto cluster = default_cluster_;
1284   cluster.set_lb_policy(Cluster::RING_HASH);
1285   balancer_->ads_service()->SetCdsResource(cluster);
1286   auto new_route_config = default_route_config_;
1287   auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
1288   auto* hash_policy_unsupported_1 = route->mutable_route()->add_hash_policy();
1289   hash_policy_unsupported_1->mutable_cookie()->set_name("cookie");
1290   auto* hash_policy_unsupported_2 = route->mutable_route()->add_hash_policy();
1291   hash_policy_unsupported_2->mutable_connection_properties()->set_source_ip(
1292       true);
1293   auto* hash_policy_unsupported_3 = route->mutable_route()->add_hash_policy();
1294   hash_policy_unsupported_3->mutable_query_parameter()->set_name(
1295       "query_parameter");
1296   auto* hash_policy = route->mutable_route()->add_hash_policy();
1297   hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
1298   SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
1299                                    new_route_config);
1300   EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
1301   balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
1302   CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
1303   bool found = false;
1304   for (size_t i = 0; i < backends_.size(); ++i) {
1305     if (backends_[i]->backend_service()->request_count() > 0) {
1306       EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
1307           << "backend " << i;
1308       EXPECT_FALSE(found) << "backend " << i;
1309       found = true;
1310     }
1311   }
1312   EXPECT_TRUE(found);
1313 }
1314 
1315 }  // namespace
1316 }  // namespace testing
1317 }  // namespace grpc
1318 
main(int argc,char ** argv)1319 int main(int argc, char** argv) {
1320   grpc::testing::TestEnvironment env(&argc, argv);
1321   ::testing::InitGoogleTest(&argc, argv);
1322   // Make the backup poller poll very frequently in order to pick up
1323   // updates from all the subchannels's FDs.
1324   grpc_core::ConfigVars::Overrides overrides;
1325   overrides.client_channel_backup_poll_interval_ms = 1;
1326   grpc_core::ConfigVars::SetOverrides(overrides);
1327 #if TARGET_OS_IPHONE
1328   // Workaround Apple CFStream bug
1329   grpc_core::SetEnv("grpc_cfstream", "0");
1330 #endif
1331   grpc_init();
1332   grpc::testing::ConnectionAttemptInjector::Init();
1333   const auto result = RUN_ALL_TESTS();
1334   grpc_shutdown();
1335   return result;
1336 }
1337