1 // Copyright 2017 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 //
15
16 #include <gmock/gmock.h>
17 #include <grpc/event_engine/endpoint_config.h>
18 #include <gtest/gtest.h>
19
20 #include <string>
21 #include <vector>
22
23 #include "absl/log/check.h"
24 #include "absl/log/log.h"
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/str_format.h"
27 #include "envoy/config/cluster/v3/cluster.pb.h"
28 #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
29 #include "src/core/client_channel/backup_poller.h"
30 #include "src/core/config/config_vars.h"
31 #include "src/core/lib/address_utils/sockaddr_utils.h"
32 #include "src/core/load_balancing/xds/xds_channel_args.h"
33 #include "src/core/resolver/fake/fake_resolver.h"
34 #include "src/core/util/env.h"
35 #include "test/core/test_util/resolve_localhost_ip46.h"
36 #include "test/core/test_util/scoped_env_var.h"
37 #include "test/cpp/end2end/connection_attempt_injector.h"
38 #include "test/cpp/end2end/xds/xds_end2end_test_lib.h"
39
40 namespace grpc {
41 namespace testing {
42 namespace {
43
44 using ::envoy::config::core::v3::HealthStatus;
45 using ::envoy::extensions::clusters::aggregate::v3::ClusterConfig;
46
47 class RingHashTest : public XdsEnd2endTest {
48 protected:
SetUp()49 void SetUp() override {
50 logical_dns_cluster_resolver_response_generator_ =
51 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
52 InitClient();
53 SetUpChannel();
54 }
55
SetUpChannel(ChannelArguments * args=nullptr)56 void SetUpChannel(ChannelArguments* args = nullptr) {
57 ChannelArguments local_args;
58 if (args == nullptr) args = &local_args;
59 args->SetPointerWithVtable(
60 GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR,
61 logical_dns_cluster_resolver_response_generator_.get(),
62 &grpc_core::FakeResolverResponseGenerator::kChannelArgPointerVtable);
63 ResetStub(/*failover_timeout_ms=*/0, args);
64 }
65
CreateAddressListFromPortList(const std::vector<int> & ports)66 grpc_core::EndpointAddressesList CreateAddressListFromPortList(
67 const std::vector<int>& ports) {
68 grpc_core::EndpointAddressesList addresses;
69 for (int port : ports) {
70 absl::StatusOr<grpc_core::URI> lb_uri =
71 grpc_core::URI::Parse(grpc_core::LocalIpUri(port));
72 CHECK_OK(lb_uri);
73 grpc_resolved_address address;
74 CHECK(grpc_parse_uri(*lb_uri, &address));
75 addresses.emplace_back(address, grpc_core::ChannelArgs());
76 }
77 return addresses;
78 }
79
CreateMetadataValueThatHashesToBackendPort(int port)80 std::string CreateMetadataValueThatHashesToBackendPort(int port) {
81 return absl::StrCat(grpc_core::LocalIp(), ":", port, "_0");
82 }
83
CreateMetadataValueThatHashesToBackend(int index)84 std::string CreateMetadataValueThatHashesToBackend(int index) {
85 return CreateMetadataValueThatHashesToBackendPort(backends_[index]->port());
86 }
87
88 grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
89 logical_dns_cluster_resolver_response_generator_;
90 };
91
// Instantiate the suite twice -- once with the default XdsTestType and once
// with load reporting enabled -- purely for additional test coverage.  Test
// instances are named via XdsTestType::Name.
INSTANTIATE_TEST_SUITE_P(
    XdsTest, RingHashTest,
    ::testing::Values(XdsTestType(), XdsTestType().set_enable_load_reporting()),
    &XdsTestType::Name);
97
// Aggregate cluster with two RING_HASH children: the first child's EDS
// resource contains only non-existent endpoints, the second contains the real
// backends.  With channel-id hashing configured, all 100 RPCs must succeed
// and must all land on exactly one backend.
TEST_P(RingHashTest, AggregateClusterFallBackFromRingHashAtStartup) {
  CreateAndStartBackends(2);
  static constexpr char kNewCluster1Name[] = "new_cluster_1";
  static constexpr char kNewEdsService1Name[] = "new_eds_service_name_1";
  static constexpr char kNewCluster2Name[] = "new_cluster_2";
  static constexpr char kNewEdsService2Name[] = "new_eds_service_name_2";
  // EDS resources: unreachable endpoints for child 1, real backends for
  // child 2.
  EdsResourceArgs unreachable_eds_args({
      {"locality0", {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()}},
  });
  EdsResourceArgs backend_eds_args({
      {"locality0", CreateEndpointsForBackends()},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(unreachable_eds_args, kNewEdsService1Name));
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(backend_eds_args, kNewEdsService2Name));
  // CDS resources: one RING_HASH cluster per EDS service.
  const auto publish_ring_hash_cluster = [this](const char* cluster_name,
                                                const char* eds_service_name) {
    Cluster child = default_cluster_;
    child.set_name(cluster_name);
    child.mutable_eds_cluster_config()->set_service_name(eds_service_name);
    child.set_lb_policy(Cluster::RING_HASH);
    balancer_->ads_service()->SetCdsResource(child);
  };
  publish_ring_hash_cluster(kNewCluster1Name, kNewEdsService1Name);
  publish_ring_hash_cluster(kNewCluster2Name, kNewEdsService2Name);
  // Aggregate cluster listing both children in priority order.
  ClusterConfig aggregate_config;
  aggregate_config.add_clusters(kNewCluster1Name);
  aggregate_config.add_clusters(kNewCluster2Name);
  Cluster aggregate_cluster = default_cluster_;
  auto* aggregate_type = aggregate_cluster.mutable_cluster_type();
  aggregate_type->set_name("envoy.clusters.aggregate");
  aggregate_type->mutable_typed_config()->PackFrom(aggregate_config);
  balancer_->ads_service()->SetCdsResource(aggregate_cluster);
  // Route that hashes on the channel id.
  auto route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->add_hash_policy()
      ->mutable_filter_state()
      ->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Ring hash is in effect if a single endpoint receives all of the traffic.
  CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
  bool saw_traffic = false;
  for (size_t idx = 0; idx < backends_.size(); ++idx) {
    const auto hits = backends_[idx]->backend_service()->request_count();
    if (hits <= 0) continue;
    EXPECT_EQ(hits, 100) << "backend " << idx;
    EXPECT_FALSE(saw_traffic) << "backend " << idx;
    saw_traffic = true;
  }
  EXPECT_TRUE(saw_traffic);
}
158
// Aggregate cluster whose first child is a RING_HASH EDS cluster containing
// only non-existent endpoints (in two priorities) and whose second child is
// a LOGICAL_DNS cluster that resolves to the real backend.  With a 500ms
// (scaled) connection delay injected, a single RPC must still succeed.
TEST_P(RingHashTest,
       AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup) {
  CreateAndStartBackends(1);
  static constexpr char kEdsClusterName[] = "eds_cluster";
  static constexpr char kLogicalDNSClusterName[] = "logical_dns_cluster";
  // EDS resource: two priorities, every endpoint unreachable.
  EdsResourceArgs unreachable_eds_args({
      {"locality0",
       {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()},
       kDefaultLocalityWeight,
       0},
      {"locality1",
       {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()},
       kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(unreachable_eds_args));
  // RING_HASH EDS child cluster.
  Cluster ring_hash_child = default_cluster_;
  ring_hash_child.set_name(kEdsClusterName);
  ring_hash_child.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_child);
  // LOGICAL_DNS child cluster pointing at kServerName:443.
  Cluster dns_child = default_cluster_;
  dns_child.set_name(kLogicalDNSClusterName);
  dns_child.set_type(Cluster::LOGICAL_DNS);
  auto* dns_lb_endpoint =
      dns_child.mutable_load_assignment()->add_endpoints()->add_lb_endpoints();
  auto* dns_socket_address = dns_lb_endpoint->mutable_endpoint()
                                 ->mutable_address()
                                 ->mutable_socket_address();
  dns_socket_address->set_address(kServerName);
  dns_socket_address->set_port_value(443);
  balancer_->ads_service()->SetCdsResource(dns_child);
  // Aggregate cluster: EDS child first, LOGICAL_DNS child second.
  ClusterConfig aggregate_config;
  aggregate_config.add_clusters(kEdsClusterName);
  aggregate_config.add_clusters(kLogicalDNSClusterName);
  Cluster aggregate_cluster = default_cluster_;
  auto* aggregate_type = aggregate_cluster.mutable_cluster_type();
  aggregate_type->set_name("envoy.clusters.aggregate");
  aggregate_type->mutable_typed_config()->PackFrom(aggregate_config);
  balancer_->ads_service()->SetCdsResource(aggregate_cluster);
  // Route that hashes on the channel id.
  auto route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->add_hash_policy()
      ->mutable_filter_state()
      ->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Feed the LOGICAL_DNS cluster a resolution result listing the backend.
  {
    grpc_core::ExecCtx exec_ctx;
    grpc_core::Resolver::Result dns_result;
    dns_result.addresses = CreateAddressListFromPortList(GetBackendPorts());
    logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
        std::move(dns_result));
  }
  // Inject connection delay to make this act more realistically.
  ConnectionAttemptInjector injector;
  const auto connect_delay =
      grpc_core::Duration::Milliseconds(500) * grpc_test_slowdown_factor();
  injector.SetDelay(connect_delay);
  // The RPC timeout has to be long enough to absorb the subchannel
  // connection delays.
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
}
226
// Same xDS configuration as the LOGICAL_DNS fallback test above (RING_HASH
// EDS child with only unreachable endpoints, LOGICAL_DNS child resolving to
// the real backend), but here the connection attempt to the backend is held
// while two RPCs are started.  Both RPCs must complete successfully once the
// hold is released, i.e. neither may be failed while the channel is still
// connecting to the LOGICAL_DNS child.
TEST_P(RingHashTest,
       AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRpcs) {
  CreateAndStartBackends(1);
  static constexpr char kEdsClusterName[] = "eds_cluster";
  static constexpr char kLogicalDNSClusterName[] = "logical_dns_cluster";
  // EDS resource: two priorities, every endpoint unreachable.
  EdsResourceArgs unreachable_eds_args({
      {"locality0",
       {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()},
       kDefaultLocalityWeight,
       0},
      {"locality1",
       {MakeNonExistentEndpoint(), MakeNonExistentEndpoint()},
       kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(
      BuildEdsResource(unreachable_eds_args));
  // RING_HASH EDS child cluster.
  Cluster ring_hash_child = default_cluster_;
  ring_hash_child.set_name(kEdsClusterName);
  ring_hash_child.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_child);
  // LOGICAL_DNS child cluster pointing at kServerName:443.
  Cluster dns_child = default_cluster_;
  dns_child.set_name(kLogicalDNSClusterName);
  dns_child.set_type(Cluster::LOGICAL_DNS);
  auto* dns_lb_endpoint =
      dns_child.mutable_load_assignment()->add_endpoints()->add_lb_endpoints();
  auto* dns_socket_address = dns_lb_endpoint->mutable_endpoint()
                                 ->mutable_address()
                                 ->mutable_socket_address();
  dns_socket_address->set_address(kServerName);
  dns_socket_address->set_port_value(443);
  balancer_->ads_service()->SetCdsResource(dns_child);
  // Aggregate cluster: EDS child first, LOGICAL_DNS child second.
  ClusterConfig aggregate_config;
  aggregate_config.add_clusters(kEdsClusterName);
  aggregate_config.add_clusters(kLogicalDNSClusterName);
  Cluster aggregate_cluster = default_cluster_;
  auto* aggregate_type = aggregate_cluster.mutable_cluster_type();
  aggregate_type->set_name("envoy.clusters.aggregate");
  aggregate_type->mutable_typed_config()->PackFrom(aggregate_config);
  balancer_->ads_service()->SetCdsResource(aggregate_cluster);
  // Route that hashes on the channel id.
  auto route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->add_hash_policy()
      ->mutable_filter_state()
      ->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Feed the LOGICAL_DNS cluster a resolution result listing the backend.
  {
    grpc_core::ExecCtx exec_ctx;
    grpc_core::Resolver::Result dns_result;
    dns_result.addresses = CreateAddressListFromPortList(GetBackendPorts());
    logical_dns_cluster_resolver_response_generator_->SetResponseSynchronously(
        std::move(dns_result));
  }
  // Hold the connection attempt to the real backend until told otherwise.
  ConnectionAttemptInjector injector;
  auto backend_hold = injector.AddHold(backends_[0]->port());
  // Increase subchannel backoff time, so that subchannels stay in
  // TRANSIENT_FAILURE for long enough to trigger potential problems.
  ChannelArguments long_backoff_args;
  long_backoff_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
                           10000 * grpc_test_slowdown_factor());
  SetUpChannel(&long_backoff_args);
  // Kick off the first RPC in the background.
  LongRunningRpc first_rpc;
  first_rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(5000));
  // Block until the channel tries to connect to the backend.
  backend_hold->Wait();
  // Channel should report CONNECTING here, and any RPC should be queued.
  EXPECT_EQ(channel_->GetState(/*try_to_connect=*/false),
            GRPC_CHANNEL_CONNECTING);
  // Start a second RPC at this point, which should be queued as well.
  // This will fail if the priority policy fails to update the picker to
  // point to the LOGICAL_DNS child; if it leaves it pointing to the EDS
  // priority 1, then the RPC will fail, because all subchannels are in
  // TRANSIENT_FAILURE.
  // Note that sending only the first RPC does not catch this case,
  // because if the priority policy fails to update the picker, then the
  // pick for the first RPC will not be retried.
  LongRunningRpc second_rpc;
  second_rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(5000));
  // Let the held connection attempt go through.
  backend_hold->Resume();
  // Both RPCs should now finish with OK status.
  LOG(INFO) << "=== WAITING FOR FIRST RPC TO FINISH ===";
  const Status first_status = first_rpc.GetStatus();
  LOG(INFO) << "=== FIRST RPC FINISHED ===";
  EXPECT_TRUE(first_status.ok())
      << "code=" << first_status.error_code()
      << " message=" << first_status.error_message();
  LOG(INFO) << "=== WAITING FOR SECOND RPC TO FINISH ===";
  const Status second_status = second_rpc.GetStatus();
  LOG(INFO) << "=== SECOND RPC FINISHED ===";
  EXPECT_TRUE(second_status.ok())
      << "code=" << second_status.error_code()
      << " message=" << second_status.error_message();
}
326
327 // Tests that ring hash policy that hashes using channel id ensures all RPCs
328 // to go 1 particular backend.
TEST_P(RingHashTest,ChannelIdHashing)329 TEST_P(RingHashTest, ChannelIdHashing) {
330 CreateAndStartBackends(4);
331 auto cluster = default_cluster_;
332 cluster.set_lb_policy(Cluster::RING_HASH);
333 balancer_->ads_service()->SetCdsResource(cluster);
334 auto new_route_config = default_route_config_;
335 auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
336 auto* hash_policy = route->mutable_route()->add_hash_policy();
337 hash_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
338 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
339 new_route_config);
340 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
341 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
342 CheckRpcSendOk(DEBUG_LOCATION, 100, RpcOptions().set_timeout_ms(5000));
343 bool found = false;
344 for (size_t i = 0; i < backends_.size(); ++i) {
345 if (backends_[i]->backend_service()->request_count() > 0) {
346 EXPECT_EQ(backends_[i]->backend_service()->request_count(), 100)
347 << "backend " << i;
348 EXPECT_FALSE(found) << "backend " << i;
349 found = true;
350 }
351 }
352 EXPECT_TRUE(found);
353 }
354
355 // Tests that ring hash policy that hashes using a header value can spread
356 // RPCs across all the backends.
TEST_P(RingHashTest,HeaderHashing)357 TEST_P(RingHashTest, HeaderHashing) {
358 CreateAndStartBackends(4);
359 auto cluster = default_cluster_;
360 cluster.set_lb_policy(Cluster::RING_HASH);
361 balancer_->ads_service()->SetCdsResource(cluster);
362 auto new_route_config = default_route_config_;
363 auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
364 auto* hash_policy = route->mutable_route()->add_hash_policy();
365 hash_policy->mutable_header()->set_header_name("address_hash");
366 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
367 new_route_config);
368 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
369 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
370 // Note each type of RPC will contains a header value that will always be
371 // hashed to a specific backend as the header value matches the value used
372 // to create the entry in the ring.
373 std::vector<std::pair<std::string, std::string>> metadata = {
374 {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
375 std::vector<std::pair<std::string, std::string>> metadata1 = {
376 {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
377 std::vector<std::pair<std::string, std::string>> metadata2 = {
378 {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
379 std::vector<std::pair<std::string, std::string>> metadata3 = {
380 {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
381 const auto rpc_options =
382 RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
383 const auto rpc_options1 =
384 RpcOptions().set_metadata(std::move(metadata1)).set_timeout_ms(5000);
385 const auto rpc_options2 =
386 RpcOptions().set_metadata(std::move(metadata2)).set_timeout_ms(5000);
387 const auto rpc_options3 =
388 RpcOptions().set_metadata(std::move(metadata3)).set_timeout_ms(5000);
389 WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
390 WaitForBackendOptions(), rpc_options);
391 WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
392 WaitForBackendOptions(), rpc_options1);
393 WaitForBackend(DEBUG_LOCATION, 2, /*check_status=*/nullptr,
394 WaitForBackendOptions(), rpc_options2);
395 WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
396 WaitForBackendOptions(), rpc_options3);
397 CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
398 CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
399 CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
400 CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
401 for (size_t i = 0; i < backends_.size(); ++i) {
402 EXPECT_EQ(100, backends_[i]->backend_service()->request_count());
403 }
404 }
405
406 // Tests that ring hash policy that hashes using a header value and regex
407 // rewrite to aggregate RPCs to 1 backend.
TEST_P(RingHashTest,HeaderHashingWithRegexRewrite)408 TEST_P(RingHashTest, HeaderHashingWithRegexRewrite) {
409 CreateAndStartBackends(4);
410 auto cluster = default_cluster_;
411 cluster.set_lb_policy(Cluster::RING_HASH);
412 balancer_->ads_service()->SetCdsResource(cluster);
413 auto new_route_config = default_route_config_;
414 auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
415 auto* hash_policy = route->mutable_route()->add_hash_policy();
416 hash_policy->mutable_header()->set_header_name("address_hash");
417 hash_policy->mutable_header()
418 ->mutable_regex_rewrite()
419 ->mutable_pattern()
420 ->set_regex("[0-9]+");
421 hash_policy->mutable_header()->mutable_regex_rewrite()->set_substitution(
422 "foo");
423 SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
424 new_route_config);
425 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
426 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
427 std::vector<std::pair<std::string, std::string>> metadata = {
428 {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
429 std::vector<std::pair<std::string, std::string>> metadata1 = {
430 {"address_hash", CreateMetadataValueThatHashesToBackend(1)}};
431 std::vector<std::pair<std::string, std::string>> metadata2 = {
432 {"address_hash", CreateMetadataValueThatHashesToBackend(2)}};
433 std::vector<std::pair<std::string, std::string>> metadata3 = {
434 {"address_hash", CreateMetadataValueThatHashesToBackend(3)}};
435 const auto rpc_options =
436 RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000);
437 const auto rpc_options1 =
438 RpcOptions().set_metadata(std::move(metadata1)).set_timeout_ms(5000);
439 const auto rpc_options2 =
440 RpcOptions().set_metadata(std::move(metadata2)).set_timeout_ms(5000);
441 const auto rpc_options3 =
442 RpcOptions().set_metadata(std::move(metadata3)).set_timeout_ms(5000);
443 CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
444 CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options1);
445 CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options2);
446 CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options3);
447 bool found = false;
448 for (size_t i = 0; i < backends_.size(); ++i) {
449 if (backends_[i]->backend_service()->request_count() > 0) {
450 EXPECT_EQ(backends_[i]->backend_service()->request_count(), 400)
451 << "backend " << i;
452 EXPECT_FALSE(found) << "backend " << i;
453 found = true;
454 }
455 }
456 EXPECT_TRUE(found);
457 }
458
// With GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT set to "false", gives each
// endpoint an "envoy.lb" hash_key in its EDS metadata and checks that a
// header value of "<hash_key>_0" routes to the endpoint carrying that key:
// each of the four backends must receive exactly its own 100 RPCs.
TEST_P(RingHashTest, HashKeysInEds) {
  grpc_core::testing::ScopedEnvVar env(
      "GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT", "false");
  CreateAndStartBackends(4);
  // RING_HASH cluster.
  Cluster ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Route that hashes on the "address_hash" request header.
  auto route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->add_hash_policy()
      ->mutable_header()
      ->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Endpoint for backend `backend_index` whose "envoy.lb" metadata entry is
  // the given JSON string; all other endpoint attributes are fixed.
  const auto endpoint_with_envoy_lb = [this](size_t backend_index,
                                             const char* envoy_lb_json) {
    return CreateEndpoint(backend_index,
                          /*health_status=*/
                          ::envoy::config::core::v3::HealthStatus::UNKNOWN,
                          /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                          /*hostname=*/"", {{"envoy.lb", envoy_lb_json}});
  };
  EdsResourceArgs eds_args(
      {{"locality0",
        {
            endpoint_with_envoy_lb(0, "{\"hash_key\":\"foo\"}"),
            endpoint_with_envoy_lb(1, "{\"hash_key\":\"bar\"}"),
            endpoint_with_envoy_lb(2, "{\"hash_key\":\"baz\"}"),
            endpoint_with_envoy_lb(3, "{\"hash_key\":\"quux\"}"),
        }}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Each set of RPC options carries a header value that always hashes to one
  // specific backend, because the header value matches the value used to
  // create that backend's entry in the ring.
  const auto options_with_hash_header = [](const char* header_value) {
    std::vector<std::pair<std::string, std::string>> headers = {
        {"address_hash", header_value}};
    return RpcOptions().set_metadata(std::move(headers)).set_timeout_ms(5000);
  };
  const auto foo_options = options_with_hash_header("foo_0");
  const auto bar_options = options_with_hash_header("bar_0");
  const auto baz_options = options_with_hash_header("baz_0");
  const auto quux_options = options_with_hash_header("quux_0");
  // Wait until each backend is reachable through its own header value.
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), foo_options);
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), bar_options);
  WaitForBackend(DEBUG_LOCATION, 2, /*check_status=*/nullptr,
                 WaitForBackendOptions(), baz_options);
  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
                 WaitForBackendOptions(), quux_options);
  // 100 RPCs per header value; each backend must see exactly its own 100.
  CheckRpcSendOk(DEBUG_LOCATION, 100, foo_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, bar_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, baz_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, quux_options);
  for (const auto& backend : backends_) {
    EXPECT_EQ(100, backend->backend_service()->request_count());
  }
}
536
// Same EDS resource as HashKeysInEds (every endpoint carries an "envoy.lb"
// hash_key), but without overriding
// GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT.  The RPCs use the
// address-based header values, and each backend is still expected to receive
// exactly its own 100 RPCs.
TEST_P(RingHashTest, HashKeysInEdsNotEnabled) {
  CreateAndStartBackends(4);
  // RING_HASH cluster.
  Cluster ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Route that hashes on the "address_hash" request header.
  auto route_config = default_route_config_;
  route_config.mutable_virtual_hosts(0)
      ->mutable_routes(0)
      ->mutable_route()
      ->add_hash_policy()
      ->mutable_header()
      ->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Endpoint for backend `backend_index` whose "envoy.lb" metadata entry is
  // the given JSON string; all other endpoint attributes are fixed.
  const auto endpoint_with_envoy_lb = [this](size_t backend_index,
                                             const char* envoy_lb_json) {
    return CreateEndpoint(backend_index,
                          /*health_status=*/
                          ::envoy::config::core::v3::HealthStatus::UNKNOWN,
                          /*lb_weight=*/1, /*additional_backend_indexes=*/{},
                          /*hostname=*/"", {{"envoy.lb", envoy_lb_json}});
  };
  EdsResourceArgs eds_args(
      {{"locality0",
        {
            endpoint_with_envoy_lb(0, "{\"hash_key\":\"foo\"}"),
            endpoint_with_envoy_lb(1, "{\"hash_key\":\"bar\"}"),
            endpoint_with_envoy_lb(2, "{\"hash_key\":\"baz\"}"),
            endpoint_with_envoy_lb(3, "{\"hash_key\":\"quux\"}"),
        }}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Each set of RPC options carries a header value that always hashes to one
  // specific backend, because the header value matches the value used to
  // create that backend's entry in the ring.
  const auto options_targeting_backend = [this](int backend_index) {
    std::vector<std::pair<std::string, std::string>> headers = {
        {"address_hash", CreateMetadataValueThatHashesToBackend(backend_index)}};
    return RpcOptions().set_metadata(std::move(headers)).set_timeout_ms(5000);
  };
  const auto backend0_options = options_targeting_backend(0);
  const auto backend1_options = options_targeting_backend(1);
  const auto backend2_options = options_targeting_backend(2);
  const auto backend3_options = options_targeting_backend(3);
  // Wait until each backend is reachable through its own header value.
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), backend0_options);
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), backend1_options);
  WaitForBackend(DEBUG_LOCATION, 2, /*check_status=*/nullptr,
                 WaitForBackendOptions(), backend2_options);
  WaitForBackend(DEBUG_LOCATION, 3, /*check_status=*/nullptr,
                 WaitForBackendOptions(), backend3_options);
  // 100 RPCs per header value; each backend must see exactly its own 100.
  CheckRpcSendOk(DEBUG_LOCATION, 100, backend0_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, backend1_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, backend2_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, backend3_options);
  for (const auto& backend : backends_) {
    EXPECT_EQ(100, backend->backend_service()->request_count());
  }
}
612
613 // Tests that ring hash policy that hashes using a random value.
TEST_P(RingHashTest,NoHashPolicy)614 TEST_P(RingHashTest, NoHashPolicy) {
615 CreateAndStartBackends(2);
616 const double kDistribution50Percent = 0.5;
617 const double kErrorTolerance = 0.05;
618 const uint32_t kRpcTimeoutMs = 10000;
619 const size_t kNumRpcs =
620 ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
621 auto cluster = default_cluster_;
622 // Increasing min ring size for random distribution.
623 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
624 100000);
625 cluster.set_lb_policy(Cluster::RING_HASH);
626 balancer_->ads_service()->SetCdsResource(cluster);
627 EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
628 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
629 // TODO(donnadionne): remove extended timeout after ring creation
630 // optimization.
631 WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
632 WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
633 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
634 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
635 const int request_count_1 = backends_[0]->backend_service()->request_count();
636 const int request_count_2 = backends_[1]->backend_service()->request_count();
637 EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
638 ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
639 EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
640 ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
641 }
642
643 // Tests that we observe endpoint weights.
TEST_P(RingHashTest,EndpointWeights)644 TEST_P(RingHashTest, EndpointWeights) {
645 CreateAndStartBackends(3);
646 const double kDistribution50Percent = 0.5;
647 const double kDistribution25Percent = 0.25;
648 const double kErrorTolerance = 0.05;
649 const uint32_t kRpcTimeoutMs = 10000;
650 const size_t kNumRpcs =
651 ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
652 auto cluster = default_cluster_;
653 // Increasing min ring size for random distribution.
654 cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
655 100000);
656 cluster.set_lb_policy(Cluster::RING_HASH);
657 balancer_->ads_service()->SetCdsResource(cluster);
658 // Endpoint 0 has weight 0, will be treated as weight 1.
659 // Endpoint 1 has weight 1.
660 // Endpoint 2 has weight 2.
661 EdsResourceArgs args(
662 {{"locality0",
663 {CreateEndpoint(0, ::envoy::config::core::v3::HealthStatus::UNKNOWN, 0),
664 CreateEndpoint(1, ::envoy::config::core::v3::HealthStatus::UNKNOWN, 1),
665 CreateEndpoint(2, ::envoy::config::core::v3::HealthStatus::UNKNOWN,
666 2)}}});
667 balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
668 // TODO(donnadionne): remove extended timeout after ring creation
669 // optimization.
670 WaitForAllBackends(DEBUG_LOCATION, 0, 3, /*check_status=*/nullptr,
671 WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
672 RpcOptions().set_timeout_ms(kRpcTimeoutMs));
673 CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs);
674 // Endpoint 2 should see 50% of traffic, and endpoints 0 and 1 should
675 // each see 25% of traffic.
676 const int request_count_0 = backends_[0]->backend_service()->request_count();
677 const int request_count_1 = backends_[1]->backend_service()->request_count();
678 const int request_count_2 = backends_[2]->backend_service()->request_count();
679 EXPECT_THAT(static_cast<double>(request_count_0) / kNumRpcs,
680 ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
681 EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
682 ::testing::DoubleNear(kDistribution25Percent, kErrorTolerance));
683 EXPECT_THAT(static_cast<double>(request_count_2) / kNumRpcs,
684 ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
685 }
686
687 // Test that ring hash policy evaluation will continue past the terminal
688 // policy if no results are produced yet.
TEST_P(RingHashTest, ContinuesPastTerminalPolicyThatDoesNotProduceResult) {
  constexpr int kRpcCount = 100;
  CreateAndStartBackends(2);
  // Switch the default cluster over to the RING_HASH LB policy.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Install two header hash policies on the first route:
  //  1. a terminal policy keyed on a header the RPCs below never send;
  //  2. a policy keyed on "address_hash", which the RPCs below do send.
  auto route_config = default_route_config_;
  auto* route_action = route_config.mutable_virtual_hosts(0)
                           ->mutable_routes(0)
                           ->mutable_route();
  auto* absent_header_policy = route_action->add_hash_policy();
  absent_header_policy->mutable_header()->set_header_name(
      "header_not_present");
  absent_header_policy->set_terminal(true);
  auto* address_policy = route_action->add_hash_policy();
  address_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // Every RPC carries the "address_hash" value produced by
  // CreateMetadataValueThatHashesToBackend(0).
  std::vector<std::pair<std::string, std::string>> hash_metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(hash_metadata)).set_timeout_ms(5000);
  CheckRpcSendOk(DEBUG_LOCATION, kRpcCount, rpc_options);
  // All traffic must have landed on backend 0 and none on backend 1.
  EXPECT_EQ(backends_[0]->backend_service()->request_count(), kRpcCount);
  EXPECT_EQ(backends_[1]->backend_service()->request_count(), 0);
}
713
714 // Test random hash is used when header hashing specified a header field that
715 // the RPC did not have.
TEST_P(RingHashTest, HashOnHeaderThatIsNotPresent) {
  CreateAndStartBackends(2);
  const double kDistribution50Percent = 0.5;
  const double kErrorTolerance = 0.05;
  const uint32_t kRpcTimeoutMs = 10000;
  const size_t kNumRpcs =
      ComputeIdealNumRpcs(kDistribution50Percent, kErrorTolerance);
  auto cluster = default_cluster_;
  // Increasing min ring size for random distribution.
  cluster.mutable_ring_hash_lb_config()->mutable_minimum_ring_size()->set_value(
      100000);
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // The only hash policy names a header that no RPC in this test carries.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("header_not_present");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // The RPCs send a different header ("unmatched_header") instead.
  // absl::StrCat() formats rand()'s int directly, so no unsigned 32-bit
  // format specifier is applied to a signed value.
  std::vector<std::pair<std::string, std::string>> metadata = {
      {"unmatched_header", absl::StrCat(rand())},
  };
  const auto rpc_options = RpcOptions().set_metadata(std::move(metadata));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kRpcTimeoutMs),
                     RpcOptions().set_timeout_ms(kRpcTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kNumRpcs, rpc_options);
  // Each of the two backends should see roughly half of the RPCs.
  // The counter names match the backend indices they are read from.
  const int request_count_0 = backends_[0]->backend_service()->request_count();
  const int request_count_1 = backends_[1]->backend_service()->request_count();
  EXPECT_THAT(static_cast<double>(request_count_0) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
  EXPECT_THAT(static_cast<double>(request_count_1) / kNumRpcs,
              ::testing::DoubleNear(kDistribution50Percent, kErrorTolerance));
}
754
755 // Test random hash is used when only unsupported hash policies are
756 // configured.
TEST_P(RingHashTest, UnsupportedHashPolicyDefaultToRandomHashing) {
  CreateAndStartBackends(2);
  constexpr double kExpectedShare = 0.5;
  constexpr double kTolerance = 0.05;
  constexpr uint32_t kTimeoutMs = 10000;
  const size_t kRpcTotal = ComputeIdealNumRpcs(kExpectedShare, kTolerance);
  auto ring_hash_cluster = default_cluster_;
  // Use a large minimum ring size for the random-distribution check.
  ring_hash_cluster.mutable_ring_hash_lb_config()
      ->mutable_minimum_ring_size()
      ->set_value(100000);
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Configure three hash policies, none of which is a supported type:
  // cookie, connection properties (source IP) and query parameter.
  auto route_config = default_route_config_;
  auto* route_action = route_config.mutable_virtual_hosts(0)
                           ->mutable_routes(0)
                           ->mutable_route();
  route_action->add_hash_policy()->mutable_cookie()->set_name("cookie");
  route_action->add_hash_policy()
      ->mutable_connection_properties()
      ->set_source_ip(true);
  route_action->add_hash_policy()->mutable_query_parameter()->set_name(
      "query_parameter");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kTimeoutMs),
                     RpcOptions().set_timeout_ms(kTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kRpcTotal);
  // Both backends should have received about half of the RPCs.
  const int backend0_rpcs = backends_[0]->backend_service()->request_count();
  const int backend1_rpcs = backends_[1]->backend_service()->request_count();
  const double backend0_share = static_cast<double>(backend0_rpcs) / kRpcTotal;
  const double backend1_share = static_cast<double>(backend1_rpcs) / kRpcTotal;
  EXPECT_THAT(backend0_share,
              ::testing::DoubleNear(kExpectedShare, kTolerance));
  EXPECT_THAT(backend1_share,
              ::testing::DoubleNear(kExpectedShare, kTolerance));
}
797
798 // Tests that ring hash policy that hashes using a random value can spread
799 // RPCs across all the backends according to endpoint weight.
TEST_P(RingHashTest, RandomHashingDistributionAccordingToEndpointWeight) {
  CreateAndStartBackends(2);
  // Endpoint weights 1 and 2 give expected traffic shares of 1/3 and 2/3.
  constexpr size_t kLightWeight = 1;
  constexpr size_t kHeavyWeight = 2;
  constexpr size_t kWeightSum = kLightWeight + kHeavyWeight;
  const double kLightShare = static_cast<double>(kLightWeight) / kWeightSum;
  const double kHeavyShare = static_cast<double>(kHeavyWeight) / kWeightSum;
  constexpr double kTolerance = 0.05;
  constexpr uint32_t kTimeoutMs = 10000;
  const size_t kRpcTotal = ComputeIdealNumRpcs(kLightShare, kTolerance);
  auto ring_hash_cluster = default_cluster_;
  // Use a large minimum ring size for the random-distribution check.
  ring_hash_cluster.mutable_ring_hash_lb_config()
      ->mutable_minimum_ring_size()
      ->set_value(100000);
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Backend 0 is created with weight 1 and backend 1 with weight 2.
  EdsResourceArgs eds_args({{"locality0",
                             {CreateEndpoint(0, HealthStatus::UNKNOWN, 1),
                              CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kTimeoutMs),
                     RpcOptions().set_timeout_ms(kTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kRpcTotal);
  // Compare each backend's observed share with its weight-derived share.
  const int light_backend_rpcs =
      backends_[0]->backend_service()->request_count();
  const int heavy_backend_rpcs =
      backends_[1]->backend_service()->request_count();
  const double light_observed =
      static_cast<double>(light_backend_rpcs) / kRpcTotal;
  const double heavy_observed =
      static_cast<double>(heavy_backend_rpcs) / kRpcTotal;
  EXPECT_THAT(light_observed, ::testing::DoubleNear(kLightShare, kTolerance));
  EXPECT_THAT(heavy_observed, ::testing::DoubleNear(kHeavyShare, kTolerance));
}
836
837 // Tests that ring hash policy that hashes using a random value can spread
838 // RPCs across all the backends according to locality and endpoint weight.
TEST_P(RingHashTest,
       RandomHashingDistributionAccordingToLocalityAndEndpointWeight) {
  CreateAndStartBackends(2);
  // Effective weights are the products 1 * 1 and 2 * 2, which give
  // expected traffic shares of 1/5 and 4/5.
  constexpr size_t kLightWeight = 1 * 1;
  constexpr size_t kHeavyWeight = 2 * 2;
  constexpr size_t kWeightSum = kLightWeight + kHeavyWeight;
  const double kLightShare = static_cast<double>(kLightWeight) / kWeightSum;
  const double kHeavyShare = static_cast<double>(kHeavyWeight) / kWeightSum;
  constexpr double kTolerance = 0.05;
  constexpr uint32_t kTimeoutMs = 10000;
  const size_t kRpcTotal = ComputeIdealNumRpcs(kLightShare, kTolerance);
  auto ring_hash_cluster = default_cluster_;
  // Use a large minimum ring size for the random-distribution check.
  ring_hash_cluster.mutable_ring_hash_lb_config()
      ->mutable_minimum_ring_size()
      ->set_value(100000);
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // locality0: backend 0 with endpoint weight 1, trailing locality value 1.
  // locality1: backend 1 with endpoint weight 2, trailing locality value 2.
  EdsResourceArgs eds_args(
      {{"locality0", {CreateEndpoint(0, HealthStatus::UNKNOWN, 1)}, 1},
       {"locality1", {CreateEndpoint(1, HealthStatus::UNKNOWN, 2)}, 2}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // TODO(donnadionne): remove extended timeout after ring creation
  // optimization.
  WaitForAllBackends(DEBUG_LOCATION, 0, 2, /*check_status=*/nullptr,
                     WaitForBackendOptions().set_timeout_ms(kTimeoutMs),
                     RpcOptions().set_timeout_ms(kTimeoutMs));
  CheckRpcSendOk(DEBUG_LOCATION, kRpcTotal);
  // Compare each backend's observed share with its weight-derived share.
  const int light_backend_rpcs =
      backends_[0]->backend_service()->request_count();
  const int heavy_backend_rpcs =
      backends_[1]->backend_service()->request_count();
  const double light_observed =
      static_cast<double>(light_backend_rpcs) / kRpcTotal;
  const double heavy_observed =
      static_cast<double>(heavy_backend_rpcs) / kRpcTotal;
  EXPECT_THAT(light_observed, ::testing::DoubleNear(kLightShare, kTolerance));
  EXPECT_THAT(heavy_observed, ::testing::DoubleNear(kHeavyShare, kTolerance));
}
876
// Tests that a ring hash policy that hashes on a fixed string sends all
// RPCs to one particular backend, and that subsequent hash policies are
// ignored because the first policy is marked terminal.
TEST_P(RingHashTest, FixedHashingTerminalPolicy) {
  constexpr int kNumRpcs = 100;
  CreateAndStartBackends(2);
  auto cluster = default_cluster_;
  cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(cluster);
  // First policy: terminal, keyed on the "fixed_string" header.
  // Second policy: keyed on "random_string"; it must never be consulted
  // because the first policy is terminal and produces a hash.
  auto new_route_config = default_route_config_;
  auto* route = new_route_config.mutable_virtual_hosts(0)->mutable_routes(0);
  auto* hash_policy = route->mutable_route()->add_hash_policy();
  hash_policy->mutable_header()->set_header_name("fixed_string");
  hash_policy->set_terminal(true);
  auto* hash_policy_to_be_ignored = route->mutable_route()->add_hash_policy();
  hash_policy_to_be_ignored->mutable_header()->set_header_name("random_string");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   new_route_config);
  EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(args));
  // Send each RPC with the same "fixed_string" value but a distinct
  // "random_string" value.  If the second policy were (incorrectly) mixed
  // into the hash, the RPCs could spread across backends and the checks
  // below would fail.  A single value reused for all RPCs would yield a
  // constant hash either way and could not detect that regression.
  for (int rpc_index = 0; rpc_index < kNumRpcs; ++rpc_index) {
    std::vector<std::pair<std::string, std::string>> metadata = {
        {"fixed_string", "fixed_value"},
        {"random_string", absl::StrCat("random_value_", rpc_index)},
    };
    CheckRpcSendOk(
        DEBUG_LOCATION, 1,
        RpcOptions().set_metadata(std::move(metadata)).set_timeout_ms(5000));
  }
  // Exactly one backend must have received traffic, and it must have
  // received all of it.
  bool found = false;
  for (size_t i = 0; i < backends_.size(); ++i) {
    if (backends_[i]->backend_service()->request_count() > 0) {
      EXPECT_EQ(backends_[i]->backend_service()->request_count(), kNumRpcs)
          << "backend " << i;
      EXPECT_FALSE(found) << "backend " << i;
      found = true;
    }
  }
  EXPECT_TRUE(found);
}
914
// Test that the channel will go from idle to ready via connecting
// (though it is not possible to catch the connecting state before moving
// to ready).
TEST_P(RingHashTest, IdleToReady) {
  CreateAndStartBackends(1);
  // Switch the default cluster over to the RING_HASH LB policy.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Hash on the "io.grpc.channel_id" filter-state key.
  auto route_config = default_route_config_;
  auto* channel_id_policy = route_config.mutable_virtual_hosts(0)
                                ->mutable_routes(0)
                                ->mutable_route()
                                ->add_hash_policy();
  channel_id_policy->mutable_filter_state()->set_key("io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // The channel reports IDLE before the RPC and READY once the single RPC
  // has completed successfully.
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_timeout_ms(5000));
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}
935
936 // Test that the channel will transition to READY once it starts
937 // connecting even if there are no RPCs being sent to the picker.
TEST_P(RingHashTest, ContinuesConnectingWithoutPicks) {
  // EDS resource: a non-existent endpoint followed by the one real backend.
  CreateAndStartBackends(1);
  auto bad_endpoint = MakeNonExistentEndpoint();
  EdsResourceArgs eds_args(
      {{"locality0", {bad_endpoint, CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // CDS resource: use RING_HASH.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // RDS resource: hash on the "address_hash" header.
  auto route_config = default_route_config_;
  auto* address_policy = route_config.mutable_virtual_hosts(0)
                             ->mutable_routes(0)
                             ->mutable_route()
                             ->add_hash_policy();
  address_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Start the connection attempt injector and hold the connection attempt
  // made to the non-existent endpoint's port.
  ConnectionAttemptInjector injector;
  auto bad_endpoint_hold = injector.AddHold(bad_endpoint.port);
  // A long-running RPC, used only so the RPC is sent from another thread.
  LongRunningRpc rpc;
  std::vector<std::pair<std::string, std::string>> hash_metadata = {
      {"address_hash",
       CreateMetadataValueThatHashesToBackendPort(bad_endpoint.port)}};
  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
                                std::move(hash_metadata)));
  // Once the RPC has triggered the held connection attempt, cancel the RPC
  // and only then let the connection attempt proceed.
  bad_endpoint_hold->Wait();
  rpc.CancelRpc();
  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
  bad_endpoint_hold->Resume();
  // With no RPC pending, the channel should still become connected.
  EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(5)));
  // The backend must not have served any request.
  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
978
979 // Tests that when we trigger internal connection attempts without
980 // picks, we do so for only one subchannel at a time.
TEST_P(RingHashTest, ContinuesConnectingWithoutPicksOneSubchannelAtATime) {
  // EDS resource: three non-existent endpoints followed by the real backend.
  CreateAndStartBackends(1);
  auto bad_endpoint0 = MakeNonExistentEndpoint();
  auto bad_endpoint1 = MakeNonExistentEndpoint();
  auto bad_endpoint2 = MakeNonExistentEndpoint();
  EdsResourceArgs eds_args(
      {{"locality0",
        {bad_endpoint0, bad_endpoint1, bad_endpoint2, CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // CDS resource: use RING_HASH.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // RDS resource: hash on the "address_hash" header.
  auto route_config = default_route_config_;
  auto* address_policy = route_config.mutable_virtual_hosts(0)
                             ->mutable_routes(0)
                             ->mutable_route()
                             ->add_hash_policy();
  address_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Start the connection attempt injector with one hold per address.
  ConnectionAttemptInjector injector;
  auto bad0_hold = injector.AddHold(bad_endpoint0.port);
  auto bad1_hold = injector.AddHold(bad_endpoint1.port);
  auto bad2_hold = injector.AddHold(bad_endpoint2.port);
  auto backend_hold = injector.AddHold(backends_[0]->port());
  // A long-running RPC, used only so the RPC is sent from another thread.
  LongRunningRpc rpc;
  std::vector<std::pair<std::string, std::string>> hash_metadata = {
      {"address_hash",
       CreateMetadataValueThatHashesToBackendPort(bad_endpoint0.port)}};
  rpc.StartRpc(stub_.get(), RpcOptions().set_timeout_ms(0).set_metadata(
                                std::move(hash_metadata)));
  // Step 1: the RPC triggers an attempt to the first address; cancel the
  // RPC.  Nothing else may have started connecting yet.
  bad0_hold->Wait();
  rpc.CancelRpc();
  EXPECT_FALSE(bad1_hold->IsStarted());
  EXPECT_FALSE(bad2_hold->IsStarted());
  EXPECT_FALSE(backend_hold->IsStarted());
  // Step 2: release the first attempt and wait for the attempt to the
  // second address.  A fresh hold on the first address detects any retry.
  auto bad0_retry_hold = injector.AddHold(bad_endpoint0.port);
  bad0_hold->Resume();
  bad1_hold->Wait();
  EXPECT_FALSE(bad0_retry_hold->IsStarted());
  EXPECT_FALSE(bad2_hold->IsStarted());
  EXPECT_FALSE(backend_hold->IsStarted());
  // Step 3: release the second attempt and wait for the attempt to the
  // third address.  Again, no other attempt may be in flight.
  auto bad1_retry_hold = injector.AddHold(bad_endpoint1.port);
  bad1_hold->Resume();
  bad2_hold->Wait();
  EXPECT_FALSE(bad0_retry_hold->IsStarted());
  EXPECT_FALSE(bad1_retry_hold->IsStarted());
  EXPECT_FALSE(backend_hold->IsStarted());
  // Step 4: release the third attempt and wait for the attempt to the
  // real backend.  Again, no other attempt may be in flight.
  auto bad2_retry_hold = injector.AddHold(bad_endpoint2.port);
  bad2_hold->Resume();
  backend_hold->Wait();
  EXPECT_FALSE(bad0_retry_hold->IsStarted());
  EXPECT_FALSE(bad1_retry_hold->IsStarted());
  EXPECT_FALSE(bad2_retry_hold->IsStarted());
  // Step 5: let the final attempt proceed.
  backend_hold->Resume();
  // With no RPC pending, the channel should still become connected.
  EXPECT_TRUE(channel_->WaitForConnected(grpc_timeout_seconds_to_deadline(10)));
  // None of the retry holds may have been hit.
  EXPECT_FALSE(bad0_retry_hold->IsStarted());
  EXPECT_FALSE(bad1_retry_hold->IsStarted());
  EXPECT_FALSE(bad2_retry_hold->IsStarted());
  // The RPC must have finished as cancelled.
  EXPECT_EQ(StatusCode::CANCELLED, rpc.GetStatus().error_code());
  // The backend must not have served any request.
  EXPECT_EQ(0UL, backends_[0]->backend_service()->request_count());
}
1062
1063 // Test that when the first pick is down leading to a transient failure, we
1064 // will move on to the next ring hash entry.
TEST_P(RingHashTest, TransientFailureCheckNextOne) {
  CreateAndStartBackends(1);
  // Switch the default cluster over to the RING_HASH LB policy.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Hash on the "address_hash" header.
  auto route_config = default_route_config_;
  auto* address_policy = route_config.mutable_virtual_hosts(0)
                             ->mutable_routes(0)
                             ->mutable_route()
                             ->add_hash_policy();
  address_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Endpoint list: an unused port first, then the real backend.
  std::vector<EdsResourceArgs::Endpoint> endpoint_list;
  const int dead_port = grpc_pick_unused_port_or_die();
  endpoint_list.emplace_back(dead_port);
  endpoint_list.emplace_back(backends_[0]->port());
  EdsResourceArgs eds_args({{"locality0", std::move(endpoint_list)}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // The RPCs carry a hash value generated for the unused port, yet they
  // are expected to succeed and to reach backend 0.
  std::vector<std::pair<std::string, std::string>> hash_metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackendPort(dead_port)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(hash_metadata)).set_timeout_ms(5000);
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  CheckRpcSendOk(DEBUG_LOCATION, 100, rpc_options);
}
1091
1092 // Test that when a backend goes down, we will move on to the next subchannel
1093 // (with a lower priority). When the backend comes back up, traffic will move
1094 // back.
TEST_P(RingHashTest, SwitchToLowerPriorityAndThenBack) {
  constexpr int kRpcCount = 100;
  CreateAndStartBackends(2);
  // Switch the default cluster over to the RING_HASH LB policy.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Hash on the "address_hash" header.
  auto route_config = default_route_config_;
  auto* address_policy = route_config.mutable_virtual_hosts(0)
                             ->mutable_routes(0)
                             ->mutable_route()
                             ->add_hash_policy();
  address_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Backend 0 goes into locality0 and backend 1 into locality1; the last
  // field of each entry is 0 and 1 respectively.
  EdsResourceArgs eds_args({
      {"locality0", CreateEndpointsForBackends(0, 1), kDefaultLocalityWeight,
       0},
      {"locality1", CreateEndpointsForBackends(1, 2), kDefaultLocalityWeight,
       1},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  std::vector<std::pair<std::string, std::string>> hash_metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options =
      RpcOptions().set_metadata(std::move(hash_metadata)).set_timeout_ms(5000);
  // Phase 1: traffic reaches backend 0.
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  // Phase 2: backend 0 stops listening and sends GOAWAYs; traffic is
  // expected to move to backend 1.
  backends_[0]->StopListeningAndSendGoaways();
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  // Phase 3: restart backend 0; traffic is expected to move back to it.
  ShutdownBackend(0);
  StartBackend(0);
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  CheckRpcSendOk(DEBUG_LOCATION, kRpcCount, rpc_options);
  EXPECT_EQ(kRpcCount, backends_[0]->backend_service()->request_count());
  EXPECT_EQ(0, backends_[1]->backend_service()->request_count());
}
1130
1131 // Test that when all backends are down, we will keep reattempting.
TEST_P(RingHashTest, ReattemptWhenAllEndpointsUnreachable) {
  CreateAndStartBackends(1);
  constexpr uint32_t kConnectTimeoutMs = 5000;
  // Switch the default cluster over to the RING_HASH LB policy.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Hash on the "address_hash" header.
  auto route_config = default_route_config_;
  auto* address_policy = route_config.mutable_virtual_hosts(0)
                             ->mutable_routes(0)
                             ->mutable_route()
                             ->add_hash_policy();
  address_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Endpoint list: a non-existent endpoint plus the real backend.
  EdsResourceArgs eds_args(
      {{"locality0", {MakeNonExistentEndpoint(), CreateEndpoint(0)}}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  std::vector<std::pair<std::string, std::string>> hash_metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  // With the only real backend shut down, the RPC must fail as UNAVAILABLE.
  ShutdownBackend(0);
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      MakeConnectionFailureRegex(
          "ring hash cannot find a connected endpoint; first failure: "),
      RpcOptions().set_metadata(std::move(hash_metadata)));
  StartBackend(0);
  // No further RPC is sent; the channel must still become connected.
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectTimeoutMs)));
}
1161
1162 // Test that when all backends are down and then up, we may pick a TF backend
1163 // and we will then jump to ready backend.
TEST_P(RingHashTest, TransientFailureSkipToAvailableReady) {
  // Backends are created here but only started later in the test.
  CreateBackends(2);
  constexpr uint32_t kConnectTimeoutMs = 5000;
  constexpr char kNoConnectedEndpointPrefix[] =
      "ring hash cannot find a connected endpoint; first failure: ";
  // Switch the default cluster over to the RING_HASH LB policy.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Hash on the "address_hash" header.
  auto route_config = default_route_config_;
  auto* address_policy = route_config.mutable_virtual_hosts(0)
                             ->mutable_routes(0)
                             ->mutable_route()
                             ->add_hash_policy();
  address_policy->mutable_header()->set_header_name("address_hash");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // Include some non-existent endpoints so they also occupy ring entries.
  EdsResourceArgs eds_args({
      {"locality0",
       {CreateEndpoint(0), CreateEndpoint(1), MakeNonExistentEndpoint(),
        MakeNonExistentEndpoint()}},
  });
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  std::vector<std::pair<std::string, std::string>> hash_metadata = {
      {"address_hash", CreateMetadataValueThatHashesToBackend(0)}};
  const auto rpc_options = RpcOptions()
                               .set_metadata(std::move(hash_metadata))
                               .set_timeout_ms(kConnectTimeoutMs);
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  // No backend is running yet: the first RPC fails and the channel ends up
  // in TRANSIENT_FAILURE.
  LOG(INFO) << "=== SENDING FIRST RPC ===";
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      MakeConnectionFailureRegex(kNoConnectedEndpointPrefix),
                      rpc_options);
  LOG(INFO) << "=== DONE WITH FIRST RPC ===";
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  // Start backend 0.  The channel should become connected without any
  // picks, because while in TRANSIENT_FAILURE the policy keeps trying to
  // connect to at least one backend at all times.
  LOG(INFO) << "=== STARTING BACKEND 0 ===";
  StartBackend(0);
  LOG(INFO) << "=== WAITING FOR CHANNEL TO BECOME READY ===";
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectTimeoutMs)));
  // RPCs should now reach backend 0.
  LOG(INFO) << "=== WAITING FOR BACKEND 0 ===";
  WaitForBackend(DEBUG_LOCATION, 0, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
  // Take backend 0 down and bring backend 1 up.
  // The RPC header value always hashes to backend 0, so with backend 0
  // down the picker's first choice fails and it has to walk the remaining
  // subchannels to find one that is READY.  Since the entries in the ring
  // are well distributed and the non-existent endpoints also occupy ring
  // entries, the picker is almost guaranteed to pass over some non-READY
  // entries and skip them, as designed.
  LOG(INFO) << "=== SHUTTING DOWN BACKEND 0 ===";
  ShutdownBackend(0);
  LOG(INFO) << "=== WAITING FOR STATE CHANGE ===";
  EXPECT_TRUE(channel_->WaitForStateChange(
      GRPC_CHANNEL_READY,
      grpc_timeout_milliseconds_to_deadline(kConnectTimeoutMs)));
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  LOG(INFO) << "=== SENDING SECOND RPC ===";
  CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE,
                      MakeConnectionFailureRegex(kNoConnectedEndpointPrefix),
                      rpc_options);
  LOG(INFO) << "=== STARTING BACKEND 1 ===";
  StartBackend(1);
  LOG(INFO) << "=== WAITING FOR CHANNEL TO BECOME READY ===";
  EXPECT_TRUE(channel_->WaitForConnected(
      grpc_timeout_milliseconds_to_deadline(kConnectTimeoutMs)));
  LOG(INFO) << "=== WAITING FOR BACKEND 1 ===";
  WaitForBackend(DEBUG_LOCATION, 1, /*check_status=*/nullptr,
                 WaitForBackendOptions(), rpc_options);
  LOG(INFO) << "=== DONE ===";
}
1241
1242 // This tests a bug seen in the wild where ring_hash started with no
1243 // endpoints and reported TRANSIENT_FAILURE, then got an update with
1244 // endpoints and reported IDLE, but the picker update was squelched, so
1245 // it failed to ever get reconnected.
TEST_P(RingHashTest, ReattemptWhenGoingFromTransientFailureToIdle) {
  CreateAndStartBackends(1);
  constexpr uint32_t kConnectTimeoutMs = 5000;
  // Switch the default cluster over to the RING_HASH LB policy.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // The route config is used as-is; no hash policy is added in this test.
  auto route_config = default_route_config_;
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  // First EDS update: a locality that contains no endpoints.
  EdsResourceArgs eds_args(
      {{"locality0", std::vector<EdsResourceArgs::Endpoint>()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
  // The RPC must fail, leaving the channel in TRANSIENT_FAILURE.
  CheckRpcSendFailure(
      DEBUG_LOCATION, StatusCode::UNAVAILABLE,
      "empty address list: EDS resource eds_service_name: contains empty "
      "localities: \\[\\{region=\"xds_default_locality_region\", "
      "zone=\"xds_default_locality_zone\", sub_zone=\"locality0\"\\}\\]",
      RpcOptions().set_timeout_ms(kConnectTimeoutMs));
  EXPECT_EQ(GRPC_CHANNEL_TRANSIENT_FAILURE, channel_->GetState(false));
  // Second EDS update: the locality now contains the one backend.
  eds_args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  // A wait_for_ready RPC should now succeed and leave the channel READY.
  const auto wait_for_ready_options = RpcOptions()
                                          .set_timeout_ms(kConnectTimeoutMs)
                                          .set_wait_for_ready(true);
  CheckRpcSendOk(DEBUG_LOCATION, 1, wait_for_ready_options);
  EXPECT_EQ(GRPC_CHANNEL_READY, channel_->GetState(false));
}
1278
1279 // Test unsupported hash policy types are all ignored before a supported
1280 // policy.
TEST_P(RingHashTest, UnsupportedHashPolicyUntilChannelIdHashing) {
  constexpr int kRpcCount = 100;
  CreateAndStartBackends(2);
  // Switch the default cluster over to the RING_HASH LB policy.
  auto ring_hash_cluster = default_cluster_;
  ring_hash_cluster.set_lb_policy(Cluster::RING_HASH);
  balancer_->ads_service()->SetCdsResource(ring_hash_cluster);
  // Three unsupported policy types (cookie, connection properties, query
  // parameter) come first, followed by a supported channel-id policy.
  auto route_config = default_route_config_;
  auto* route_action = route_config.mutable_virtual_hosts(0)
                           ->mutable_routes(0)
                           ->mutable_route();
  route_action->add_hash_policy()->mutable_cookie()->set_name("cookie");
  route_action->add_hash_policy()
      ->mutable_connection_properties()
      ->set_source_ip(true);
  route_action->add_hash_policy()->mutable_query_parameter()->set_name(
      "query_parameter");
  route_action->add_hash_policy()->mutable_filter_state()->set_key(
      "io.grpc.channel_id");
  SetListenerAndRouteConfiguration(balancer_.get(), default_listener_,
                                   route_config);
  EdsResourceArgs eds_args({{"locality0", CreateEndpointsForBackends()}});
  balancer_->ads_service()->SetEdsResource(BuildEdsResource(eds_args));
  CheckRpcSendOk(DEBUG_LOCATION, kRpcCount, RpcOptions().set_timeout_ms(5000));
  // Exactly one backend must have received traffic, and it must have
  // received all of it.
  bool saw_traffic = false;
  for (size_t idx = 0; idx < backends_.size(); ++idx) {
    if (backends_[idx]->backend_service()->request_count() == 0) continue;
    EXPECT_EQ(backends_[idx]->backend_service()->request_count(), kRpcCount)
        << "backend " << idx;
    EXPECT_FALSE(saw_traffic) << "backend " << idx;
    saw_traffic = true;
  }
  EXPECT_TRUE(saw_traffic);
}
1314
1315 } // namespace
1316 } // namespace testing
1317 } // namespace grpc
1318
main(int argc,char ** argv)1319 int main(int argc, char** argv) {
1320 grpc::testing::TestEnvironment env(&argc, argv);
1321 ::testing::InitGoogleTest(&argc, argv);
1322 // Make the backup poller poll very frequently in order to pick up
1323 // updates from all the subchannels's FDs.
1324 grpc_core::ConfigVars::Overrides overrides;
1325 overrides.client_channel_backup_poll_interval_ms = 1;
1326 grpc_core::ConfigVars::SetOverrides(overrides);
1327 #if TARGET_OS_IPHONE
1328 // Workaround Apple CFStream bug
1329 grpc_core::SetEnv("grpc_cfstream", "0");
1330 #endif
1331 grpc_init();
1332 grpc::testing::ConnectionAttemptInjector::Init();
1333 const auto result = RUN_ALL_TESTS();
1334 grpc_shutdown();
1335 return result;
1336 }
1337