1 //
2 // Copyright 2023 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/load_balancing/ring_hash/ring_hash.h"
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/json.h>
21 #include <stdint.h>
22
23 #include <algorithm>
24 #include <array>
25 #include <memory>
26 #include <string>
27 #include <vector>
28
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/strings/strip.h"
33 #include "absl/types/optional.h"
34 #include "gtest/gtest.h"
35 #include "src/core/load_balancing/lb_policy.h"
36 #include "src/core/resolver/endpoint_addresses.h"
37 #include "src/core/util/json/json.h"
38 #include "src/core/util/ref_counted_ptr.h"
39 #include "src/core/util/xxhash_inline.h"
40 #include "test/core/load_balancing/lb_policy_test_lib.h"
41 #include "test/core/test_util/scoped_env_var.h"
42 #include "test/core/test_util/test_config.h"
43
44 namespace grpc_core {
45 namespace testing {
46 namespace {
47
48 // TODO(roth): I created this file when I fixed a bug and wrote only a
49 // very basic test and the test needed for that bug. When we have time,
50 // we need a lot more tests here to cover all of the policy's functionality.
51
52 class RingHashTest : public LoadBalancingPolicyTest {
53 protected:
RingHashTest()54 RingHashTest() : LoadBalancingPolicyTest("ring_hash_experimental") {}
55
MakeRingHashConfig(int min_ring_size=0,int max_ring_size=0,const std::string & request_hash_header="")56 static RefCountedPtr<LoadBalancingPolicy::Config> MakeRingHashConfig(
57 int min_ring_size = 0, int max_ring_size = 0,
58 const std::string& request_hash_header = "") {
59 Json::Object fields;
60 if (min_ring_size > 0) {
61 fields["minRingSize"] = Json::FromString(absl::StrCat(min_ring_size));
62 }
63 if (max_ring_size > 0) {
64 fields["maxRingSize"] = Json::FromString(absl::StrCat(max_ring_size));
65 }
66 if (!request_hash_header.empty()) {
67 fields["requestHashHeader"] = Json::FromString(request_hash_header);
68 }
69 return MakeConfig(Json::FromArray({Json::FromObject(
70 {{"ring_hash_experimental", Json::FromObject(fields)}})}));
71 }
72
MakeHashAttributeForString(absl::string_view key)73 RequestHashAttribute* MakeHashAttributeForString(absl::string_view key) {
74 std::string key_str = absl::StrCat(key, "_0");
75 uint64_t hash = XXH64(key_str.data(), key_str.size(), 0);
76 attribute_storage_.emplace_back(
77 std::make_unique<RequestHashAttribute>(hash));
78 return attribute_storage_.back().get();
79 }
80
MakeHashAttribute(absl::string_view address)81 RequestHashAttribute* MakeHashAttribute(absl::string_view address) {
82 return MakeHashAttributeForString(absl::StripPrefix(address, "ipv4:"));
83 }
84
85 std::vector<std::unique_ptr<RequestHashAttribute>> attribute_storage_;
86 };
87
TEST_F(RingHashTest,Basic)88 TEST_F(RingHashTest, Basic) {
89 const std::array<absl::string_view, 3> kAddresses = {
90 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
91 EXPECT_EQ(
92 ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()),
93 absl::OkStatus());
94 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
95 auto* address0_attribute = MakeHashAttribute(kAddresses[0]);
96 ExpectPickQueued(picker.get(), {address0_attribute});
97 WaitForWorkSerializerToFlush();
98 WaitForWorkSerializerToFlush();
99 auto* subchannel = FindSubchannel(kAddresses[0]);
100 ASSERT_NE(subchannel, nullptr);
101 EXPECT_TRUE(subchannel->ConnectionRequested());
102 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
103 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
104 ExpectPickQueued(picker.get(), {address0_attribute});
105 EXPECT_EQ(nullptr, FindSubchannel(kAddresses[1]));
106 EXPECT_EQ(nullptr, FindSubchannel(kAddresses[2]));
107 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
108 picker = ExpectState(GRPC_CHANNEL_READY);
109 auto address = ExpectPickComplete(picker.get(), {address0_attribute});
110 EXPECT_EQ(address, kAddresses[0]);
111 }
112
TEST_F(RingHashTest,SameAddressListedMultipleTimes)113 TEST_F(RingHashTest, SameAddressListedMultipleTimes) {
114 const std::array<absl::string_view, 3> kAddresses = {
115 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:441"};
116 EXPECT_EQ(
117 ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()),
118 absl::OkStatus());
119 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
120 auto* address0_attribute = MakeHashAttribute(kAddresses[0]);
121 ExpectPickQueued(picker.get(), {address0_attribute});
122 WaitForWorkSerializerToFlush();
123 WaitForWorkSerializerToFlush();
124 auto* subchannel = FindSubchannel(kAddresses[0]);
125 ASSERT_NE(subchannel, nullptr);
126 EXPECT_TRUE(subchannel->ConnectionRequested());
127 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
128 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
129 ExpectPickQueued(picker.get(), {address0_attribute});
130 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
131 picker = ExpectState(GRPC_CHANNEL_READY);
132 auto address = ExpectPickComplete(picker.get(), {address0_attribute});
133 EXPECT_EQ(address, kAddresses[0]);
134 }
135
TEST_F(RingHashTest,MultipleAddressesPerEndpoint)136 TEST_F(RingHashTest, MultipleAddressesPerEndpoint) {
137 constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
138 "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
139 constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
140 "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
141 const std::array<EndpointAddresses, 2> kEndpoints = {
142 MakeEndpointAddresses(kEndpoint1Addresses),
143 MakeEndpointAddresses(kEndpoint2Addresses)};
144 EXPECT_EQ(
145 ApplyUpdate(BuildUpdate(kEndpoints, MakeRingHashConfig()), lb_policy()),
146 absl::OkStatus());
147 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
148 // Normal connection to first address of the first endpoint.
149 auto* address0_attribute = MakeHashAttribute(kEndpoint1Addresses[0]);
150 ExpectPickQueued(picker.get(), {address0_attribute});
151 WaitForWorkSerializerToFlush();
152 WaitForWorkSerializerToFlush();
153 auto* subchannel = FindSubchannel(kEndpoint1Addresses[0]);
154 ASSERT_NE(subchannel, nullptr);
155 EXPECT_TRUE(subchannel->ConnectionRequested());
156 auto* subchannel2 = FindSubchannel(kEndpoint1Addresses[1]);
157 ASSERT_NE(subchannel2, nullptr);
158 EXPECT_FALSE(subchannel2->ConnectionRequested());
159 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
160 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
161 ExpectPickQueued(picker.get(), {address0_attribute});
162 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
163 picker = ExpectState(GRPC_CHANNEL_READY);
164 auto address = ExpectPickComplete(picker.get(), {address0_attribute});
165 EXPECT_EQ(address, kEndpoint1Addresses[0]);
166 // Now that connection fails.
167 subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
168 ExpectReresolutionRequest();
169 picker = ExpectState(GRPC_CHANNEL_IDLE);
170 EXPECT_FALSE(subchannel->ConnectionRequested());
171 EXPECT_FALSE(subchannel2->ConnectionRequested());
172 // The LB policy will try to reconnect when it gets another pick.
173 ExpectPickQueued(picker.get(), {address0_attribute});
174 WaitForWorkSerializerToFlush();
175 WaitForWorkSerializerToFlush();
176 EXPECT_TRUE(subchannel->ConnectionRequested());
177 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
178 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
179 ExpectPickQueued(picker.get(), {address0_attribute});
180 // The connection attempt fails.
181 subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
182 absl::UnavailableError("ugh"));
183 // The PF child policy will try to connect to the second address for the
184 // endpoint.
185 EXPECT_TRUE(subchannel2->ConnectionRequested());
186 subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
187 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
188 ExpectPickQueued(picker.get(), {address0_attribute});
189 subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
190 picker = ExpectState(GRPC_CHANNEL_READY);
191 address = ExpectPickComplete(picker.get(), {address0_attribute});
192 EXPECT_EQ(address, kEndpoint1Addresses[1]);
193 }
194
TEST_F(RingHashTest,EndpointHashKeys)195 TEST_F(RingHashTest, EndpointHashKeys) {
196 const std::array<absl::string_view, 3> kAddresses = {
197 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
198 const std::array<absl::string_view, 3> kHashKeys = {"foo", "bar", "baz"};
199 std::vector<EndpointAddresses> endpoints;
200 for (size_t i = 0; i < 3; ++i) {
201 endpoints.push_back(MakeEndpointAddresses(
202 {kAddresses[i]},
203 ChannelArgs().Set(GRPC_ARG_RING_HASH_ENDPOINT_HASH_KEY, kHashKeys[i])));
204 };
205 EXPECT_EQ(
206 ApplyUpdate(BuildUpdate(endpoints, MakeRingHashConfig()), lb_policy()),
207 absl::OkStatus());
208 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
209 auto* hash_attribute = MakeHashAttributeForString(kHashKeys[1]);
210 ExpectPickQueued(picker.get(), {hash_attribute});
211 WaitForWorkSerializerToFlush();
212 WaitForWorkSerializerToFlush();
213 auto* subchannel = FindSubchannel(kAddresses[1]);
214 ASSERT_NE(subchannel, nullptr);
215 EXPECT_TRUE(subchannel->ConnectionRequested());
216 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
217 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
218 ExpectPickQueued(picker.get(), {hash_attribute});
219 EXPECT_EQ(nullptr, FindSubchannel(kAddresses[0]));
220 EXPECT_EQ(nullptr, FindSubchannel(kAddresses[2]));
221 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
222 picker = ExpectState(GRPC_CHANNEL_READY);
223 auto address = ExpectPickComplete(picker.get(), {hash_attribute});
224 EXPECT_EQ(address, kAddresses[1]);
225 }
226
TEST_F(RingHashTest,PickFailsWithoutRequestHashAttribute)227 TEST_F(RingHashTest, PickFailsWithoutRequestHashAttribute) {
228 const std::array<absl::string_view, 3> kAddresses = {
229 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
230 EXPECT_EQ(
231 ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig()), lb_policy()),
232 absl::OkStatus());
233 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
234 ExpectPickFail(picker.get(), [&](const absl::Status& status) {
235 EXPECT_EQ(status, absl::InternalError("hash attribute not present"));
236 });
237 }
238
TEST_F(RingHashTest,RequestHashHeaderNotEnabled)239 TEST_F(RingHashTest, RequestHashHeaderNotEnabled) {
240 const std::array<absl::string_view, 3> kAddresses = {
241 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
242 EXPECT_EQ(
243 ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig(0, 0, "foo")),
244 lb_policy()),
245 absl::OkStatus());
246 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
247 ExpectPickFail(picker.get(), [&](const absl::Status& status) {
248 EXPECT_EQ(status, absl::InternalError("hash attribute not present"));
249 });
250 }
251
TEST_F(RingHashTest,RequestHashHeader)252 TEST_F(RingHashTest, RequestHashHeader) {
253 ScopedExperimentalEnvVar env(
254 "GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY");
255 const std::array<absl::string_view, 3> kAddresses = {
256 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
257 EXPECT_EQ(
258 ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig(0, 0, "foo")),
259 lb_policy()),
260 absl::OkStatus());
261 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
262 std::string hash_key =
263 absl::StrCat(absl::StripPrefix(kAddresses[0], "ipv4:"), "_0");
264 std::map<std::string, std::string> metadata = {{"foo", hash_key}};
265 ExpectPickQueued(picker.get(), /*call_attributes=*/{}, metadata);
266 WaitForWorkSerializerToFlush();
267 WaitForWorkSerializerToFlush();
268 auto* subchannel = FindSubchannel(kAddresses[0]);
269 ASSERT_NE(subchannel, nullptr);
270 EXPECT_TRUE(subchannel->ConnectionRequested());
271 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
272 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
273 ExpectPickQueued(picker.get(), {}, metadata);
274 EXPECT_EQ(nullptr, FindSubchannel(kAddresses[1]));
275 EXPECT_EQ(nullptr, FindSubchannel(kAddresses[2]));
276 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
277 picker = ExpectState(GRPC_CHANNEL_READY);
278 auto address = ExpectPickComplete(picker.get(), {}, metadata);
279 EXPECT_EQ(address, kAddresses[0]);
280 }
281
TEST_F(RingHashTest,RequestHashHeaderNotPresent)282 TEST_F(RingHashTest, RequestHashHeaderNotPresent) {
283 ScopedExperimentalEnvVar env(
284 "GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY");
285 const std::array<absl::string_view, 3> kAddresses = {
286 "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
287 EXPECT_EQ(
288 ApplyUpdate(BuildUpdate(kAddresses, MakeRingHashConfig(0, 0, "foo")),
289 lb_policy()),
290 absl::OkStatus());
291 auto picker = ExpectState(GRPC_CHANNEL_IDLE);
292 ExpectPickQueued(picker.get());
293 WaitForWorkSerializerToFlush();
294 WaitForWorkSerializerToFlush();
295 // It will randomly pick one.
296 size_t index = 0;
297 SubchannelState* subchannel = nullptr;
298 for (; index < kAddresses.size(); ++index) {
299 subchannel = FindSubchannel(kAddresses[index]);
300 if (subchannel != nullptr) {
301 LOG(INFO) << "Randomly picked subchannel index " << index;
302 break;
303 }
304 }
305 ASSERT_NE(subchannel, nullptr);
306 EXPECT_TRUE(subchannel->ConnectionRequested());
307 subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
308 picker = ExpectState(GRPC_CHANNEL_CONNECTING);
309 ExpectPickQueued(picker.get());
310 // No other subchannels should have been created yet.
311 for (size_t i = 0; i < kAddresses.size(); ++i) {
312 if (i != index) EXPECT_EQ(nullptr, FindSubchannel(kAddresses[i]));
313 }
314 subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
315 picker = ExpectState(GRPC_CHANNEL_READY);
316 auto address = ExpectPickComplete(picker.get());
317 EXPECT_EQ(address, kAddresses[index]);
318 }
319
320 } // namespace
321 } // namespace testing
322 } // namespace grpc_core
323
main(int argc,char ** argv)324 int main(int argc, char** argv) {
325 ::testing::InitGoogleTest(&argc, argv);
326 grpc::testing::TestEnvironment env(&argc, argv);
327 return RUN_ALL_TESTS();
328 }
329