• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2023 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/load_balancing/ring_hash/ring_hash.h"
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/json.h>
21 #include <stdint.h>
22 
23 #include <algorithm>
24 #include <array>
25 #include <memory>
26 #include <string>
27 #include <vector>
28 
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/strings/strip.h"
33 #include "absl/types/optional.h"
34 #include "gtest/gtest.h"
35 #include "src/core/load_balancing/lb_policy.h"
36 #include "src/core/resolver/endpoint_addresses.h"
37 #include "src/core/util/json/json.h"
38 #include "src/core/util/ref_counted_ptr.h"
39 #include "src/core/util/xxhash_inline.h"
40 #include "test/core/load_balancing/lb_policy_test_lib.h"
41 #include "test/core/test_util/scoped_env_var.h"
42 #include "test/core/test_util/test_config.h"
43 
44 namespace grpc_core {
45 namespace testing {
46 namespace {
47 
48 // TODO(roth): I created this file when I fixed a bug and wrote only a
49 // very basic test and the test needed for that bug.  When we have time,
50 // we need a lot more tests here to cover all of the policy's functionality.
51 
52 class RingHashTest : public LoadBalancingPolicyTest {
53  protected:
RingHashTest()54   RingHashTest() : LoadBalancingPolicyTest("ring_hash_experimental") {}
55 
MakeRingHashConfig(int min_ring_size=0,int max_ring_size=0,const std::string & request_hash_header="")56   static RefCountedPtr<LoadBalancingPolicy::Config> MakeRingHashConfig(
57       int min_ring_size = 0, int max_ring_size = 0,
58       const std::string& request_hash_header = "") {
59     Json::Object fields;
60     if (min_ring_size > 0) {
61       fields["minRingSize"] = Json::FromString(absl::StrCat(min_ring_size));
62     }
63     if (max_ring_size > 0) {
64       fields["maxRingSize"] = Json::FromString(absl::StrCat(max_ring_size));
65     }
66     if (!request_hash_header.empty()) {
67       fields["requestHashHeader"] = Json::FromString(request_hash_header);
68     }
69     return MakeConfig(Json::FromArray({Json::FromObject(
70         {{"ring_hash_experimental", Json::FromObject(fields)}})}));
71   }
72 
MakeHashAttributeForString(absl::string_view key)73   RequestHashAttribute* MakeHashAttributeForString(absl::string_view key) {
74     std::string key_str = absl::StrCat(key, "_0");
75     uint64_t hash = XXH64(key_str.data(), key_str.size(), 0);
76     attribute_storage_.emplace_back(
77         std::make_unique<RequestHashAttribute>(hash));
78     return attribute_storage_.back().get();
79   }
80 
MakeHashAttribute(absl::string_view address)81   RequestHashAttribute* MakeHashAttribute(absl::string_view address) {
82     return MakeHashAttributeForString(absl::StripPrefix(address, "ipv4:"));
83   }
84 
85   std::vector<std::unique_ptr<RequestHashAttribute>> attribute_storage_;
86 };
87 
// Basic flow: a pick that hashes to the first address causes only that
// address's subchannel to be created and connected, and picks complete on it
// once it reports READY.
TEST_F(RingHashTest, Basic) {
  const std::array<absl::string_view, 3> kBackends = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  const auto update_status =
      ApplyUpdate(BuildUpdate(kBackends, MakeRingHashConfig()), lb_policy());
  EXPECT_EQ(update_status, absl::OkStatus());
  // The policy reports IDLE until a pick arrives.
  auto current_picker = ExpectState(GRPC_CHANNEL_IDLE);
  auto* first_backend_hash = MakeHashAttribute(kBackends[0]);
  // The pick is queued rather than completed.
  ExpectPickQueued(current_picker.get(), {first_backend_hash});
  // Flushed twice on purpose, exactly as many times as before.
  // NOTE(review): presumably the connection request needs two hops through
  // the work serializer -- confirm.
  WaitForWorkSerializerToFlush();
  WaitForWorkSerializerToFlush();
  auto* first_subchannel = FindSubchannel(kBackends[0]);
  ASSERT_NE(first_subchannel, nullptr);
  EXPECT_TRUE(first_subchannel->ConnectionRequested());
  // While the subchannel is CONNECTING, picks stay queued.
  first_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  current_picker = ExpectState(GRPC_CHANNEL_CONNECTING);
  ExpectPickQueued(current_picker.get(), {first_backend_hash});
  // Subchannels for the other two addresses must not exist yet.
  EXPECT_EQ(nullptr, FindSubchannel(kBackends[1]));
  EXPECT_EQ(nullptr, FindSubchannel(kBackends[2]));
  // Once READY, the pick completes on the first address.
  first_subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  current_picker = ExpectState(GRPC_CHANNEL_READY);
  auto picked_address =
      ExpectPickComplete(current_picker.get(), {first_backend_hash});
  EXPECT_EQ(picked_address, kBackends[0]);
}
112 
// An address list that repeats an address (port 441 appears twice) is still
// accepted, and picks for that address follow the usual
// IDLE -> CONNECTING -> READY flow.
TEST_F(RingHashTest, SameAddressListedMultipleTimes) {
  const std::array<absl::string_view, 3> kBackends = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:441"};
  const auto update_status =
      ApplyUpdate(BuildUpdate(kBackends, MakeRingHashConfig()), lb_policy());
  EXPECT_EQ(update_status, absl::OkStatus());
  auto current_picker = ExpectState(GRPC_CHANNEL_IDLE);
  auto* duplicated_backend_hash = MakeHashAttribute(kBackends[0]);
  // The first pick is queued.
  ExpectPickQueued(current_picker.get(), {duplicated_backend_hash});
  // Flushed twice on purpose, exactly as many times as before.
  WaitForWorkSerializerToFlush();
  WaitForWorkSerializerToFlush();
  auto* duplicated_subchannel = FindSubchannel(kBackends[0]);
  ASSERT_NE(duplicated_subchannel, nullptr);
  EXPECT_TRUE(duplicated_subchannel->ConnectionRequested());
  // Picks remain queued while the subchannel is CONNECTING.
  duplicated_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  current_picker = ExpectState(GRPC_CHANNEL_CONNECTING);
  ExpectPickQueued(current_picker.get(), {duplicated_backend_hash});
  // Once READY, the pick completes on the duplicated address.
  duplicated_subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  current_picker = ExpectState(GRPC_CHANNEL_READY);
  auto picked_address =
      ExpectPickComplete(current_picker.get(), {duplicated_backend_hash});
  EXPECT_EQ(picked_address, kBackends[0]);
}
135 
TEST_F(RingHashTest,MultipleAddressesPerEndpoint)136 TEST_F(RingHashTest, MultipleAddressesPerEndpoint) {
137   constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
138       "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
139   constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
140       "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
141   const std::array<EndpointAddresses, 2> kEndpoints = {
142       MakeEndpointAddresses(kEndpoint1Addresses),
143       MakeEndpointAddresses(kEndpoint2Addresses)};
144   EXPECT_EQ(
145       ApplyUpdate(BuildUpdate(kEndpoints, MakeRingHashConfig()), lb_policy()),
146       absl::OkStatus());
147   auto picker = ExpectState(GRPC_CHANNEL_IDLE);
148   // Normal connection to first address of the first endpoint.
149   auto* address0_attribute = MakeHashAttribute(kEndpoint1Addresses[0]);
150   ExpectPickQueued(picker.get(), {address0_attribute});
151   WaitForWorkSerializerToFlush();
152   WaitForWorkSerializerToFlush();
153   auto* subchannel = FindSubchannel(kEndpoint1Addresses[0]);
154   ASSERT_NE(subchannel, nullptr);
155   EXPECT_TRUE(subchannel->ConnectionRequested());
156   auto* subchannel2 = FindSubchannel(kEndpoint1Addresses[1]);
157   ASSERT_NE(subchannel2, nullptr);
158   EXPECT_FALSE(subchannel2->ConnectionRequested());
159   subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
160   picker = ExpectState(GRPC_CHANNEL_CONNECTING);
161   ExpectPickQueued(picker.get(), {address0_attribute});
162   subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
163   picker = ExpectState(GRPC_CHANNEL_READY);
164   auto address = ExpectPickComplete(picker.get(), {address0_attribute});
165   EXPECT_EQ(address, kEndpoint1Addresses[0]);
166   // Now that connection fails.
167   subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
168   ExpectReresolutionRequest();
169   picker = ExpectState(GRPC_CHANNEL_IDLE);
170   EXPECT_FALSE(subchannel->ConnectionRequested());
171   EXPECT_FALSE(subchannel2->ConnectionRequested());
172   // The LB policy will try to reconnect when it gets another pick.
173   ExpectPickQueued(picker.get(), {address0_attribute});
174   WaitForWorkSerializerToFlush();
175   WaitForWorkSerializerToFlush();
176   EXPECT_TRUE(subchannel->ConnectionRequested());
177   subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
178   picker = ExpectState(GRPC_CHANNEL_CONNECTING);
179   ExpectPickQueued(picker.get(), {address0_attribute});
180   // The connection attempt fails.
181   subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
182                                    absl::UnavailableError("ugh"));
183   // The PF child policy will try to connect to the second address for the
184   // endpoint.
185   EXPECT_TRUE(subchannel2->ConnectionRequested());
186   subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
187   picker = ExpectState(GRPC_CHANNEL_CONNECTING);
188   ExpectPickQueued(picker.get(), {address0_attribute});
189   subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
190   picker = ExpectState(GRPC_CHANNEL_READY);
191   address = ExpectPickComplete(picker.get(), {address0_attribute});
192   EXPECT_EQ(address, kEndpoint1Addresses[1]);
193 }
194 
// When endpoints carry an explicit GRPC_ARG_RING_HASH_ENDPOINT_HASH_KEY
// channel arg, a request hash computed from that key (rather than from the
// address) selects the endpoint: hashing "bar" must route to the second
// address and leave the other two addresses without subchannels.
TEST_F(RingHashTest, EndpointHashKeys) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  // kHashKeys[i] is the hash key attached to kAddresses[i]; both arrays are
  // declared with the same length.
  const std::array<absl::string_view, 3> kHashKeys = {"foo", "bar", "baz"};
  std::vector<EndpointAddresses> endpoints;
  // Bound the loop by the array size instead of a repeated literal so the
  // two cannot drift apart.
  for (size_t i = 0; i < kAddresses.size(); ++i) {
    endpoints.push_back(MakeEndpointAddresses(
        {kAddresses[i]},
        ChannelArgs().Set(GRPC_ARG_RING_HASH_ENDPOINT_HASH_KEY, kHashKeys[i])));
  }
  EXPECT_EQ(
      ApplyUpdate(BuildUpdate(endpoints, MakeRingHashConfig()), lb_policy()),
      absl::OkStatus());
  auto picker = ExpectState(GRPC_CHANNEL_IDLE);
  // Hash the second endpoint's key, not its address.
  auto* hash_attribute = MakeHashAttributeForString(kHashKeys[1]);
  ExpectPickQueued(picker.get(), {hash_attribute});
  // Flushed twice on purpose, exactly as many times as before.
  WaitForWorkSerializerToFlush();
  WaitForWorkSerializerToFlush();
  auto* subchannel = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel, nullptr);
  EXPECT_TRUE(subchannel->ConnectionRequested());
  subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(GRPC_CHANNEL_CONNECTING);
  ExpectPickQueued(picker.get(), {hash_attribute});
  // Only the endpoint selected by the hash key has a subchannel so far.
  EXPECT_EQ(nullptr, FindSubchannel(kAddresses[0]));
  EXPECT_EQ(nullptr, FindSubchannel(kAddresses[2]));
  subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  picker = ExpectState(GRPC_CHANNEL_READY);
  auto address = ExpectPickComplete(picker.get(), {hash_attribute});
  EXPECT_EQ(address, kAddresses[1]);
}
226 
// A pick that carries no request-hash call attribute must fail with an
// INTERNAL status.
TEST_F(RingHashTest, PickFailsWithoutRequestHashAttribute) {
  const std::array<absl::string_view, 3> kBackends = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  const auto update_status =
      ApplyUpdate(BuildUpdate(kBackends, MakeRingHashConfig()), lb_policy());
  EXPECT_EQ(update_status, absl::OkStatus());
  auto idle_picker = ExpectState(GRPC_CHANNEL_IDLE);
  ExpectPickFail(idle_picker.get(), [](const absl::Status& pick_status) {
    EXPECT_EQ(pick_status, absl::InternalError("hash attribute not present"));
  });
}
238 
// The config names a request hash header ("foo"), but this test does not set
// the experimental env var that the RequestHashHeader tests set.  A pick
// without a request-hash attribute therefore still fails with the same
// INTERNAL error as when no header is configured.
TEST_F(RingHashTest, RequestHashHeaderNotEnabled) {
  const std::array<absl::string_view, 3> kBackends = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  const auto update_status = ApplyUpdate(
      BuildUpdate(kBackends, MakeRingHashConfig(0, 0, "foo")), lb_policy());
  EXPECT_EQ(update_status, absl::OkStatus());
  auto idle_picker = ExpectState(GRPC_CHANNEL_IDLE);
  ExpectPickFail(idle_picker.get(), [](const absl::Status& pick_status) {
    EXPECT_EQ(pick_status, absl::InternalError("hash attribute not present"));
  });
}
251 
// With the request-hash-key experiment switched on and "foo" configured as
// the request hash header, the value of the "foo" metadata entry drives the
// pick: no call attribute is supplied, yet the pick lands on the address
// whose ring key was sent in the header.
TEST_F(RingHashTest, RequestHashHeader) {
  // NOTE(review): presumably sets the env var for the lifetime of this
  // object -- confirm against ScopedExperimentalEnvVar.
  ScopedExperimentalEnvVar enable_request_hash_key(
      "GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY");
  const std::array<absl::string_view, 3> kBackends = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  const auto update_status = ApplyUpdate(
      BuildUpdate(kBackends, MakeRingHashConfig(0, 0, "foo")), lb_policy());
  EXPECT_EQ(update_status, absl::OkStatus());
  auto current_picker = ExpectState(GRPC_CHANNEL_IDLE);
  // Header value: the first address without its "ipv4:" scheme, plus "_0".
  std::string header_value =
      absl::StrCat(absl::StripPrefix(kBackends[0], "ipv4:"), "_0");
  std::map<std::string, std::string> request_metadata = {
      {"foo", header_value}};
  ExpectPickQueued(current_picker.get(), /*call_attributes=*/{},
                   request_metadata);
  // Flushed twice on purpose, exactly as many times as before.
  WaitForWorkSerializerToFlush();
  WaitForWorkSerializerToFlush();
  auto* first_subchannel = FindSubchannel(kBackends[0]);
  ASSERT_NE(first_subchannel, nullptr);
  EXPECT_TRUE(first_subchannel->ConnectionRequested());
  first_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  current_picker = ExpectState(GRPC_CHANNEL_CONNECTING);
  ExpectPickQueued(current_picker.get(), {}, request_metadata);
  // Subchannels for the other two addresses must not exist yet.
  EXPECT_EQ(nullptr, FindSubchannel(kBackends[1]));
  EXPECT_EQ(nullptr, FindSubchannel(kBackends[2]));
  first_subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  current_picker = ExpectState(GRPC_CHANNEL_READY);
  auto picked_address =
      ExpectPickComplete(current_picker.get(), {}, request_metadata);
  EXPECT_EQ(picked_address, kBackends[0]);
}
281 
// The request hash header is configured and the experiment is on, but the
// pick carries neither that header nor a hash attribute.  The pick is then
// queued instead of failing, exactly one subchannel (whichever one it is)
// gets created and connected, and later picks complete on that address.
TEST_F(RingHashTest, RequestHashHeaderNotPresent) {
  // NOTE(review): presumably sets the env var for the lifetime of this
  // object -- confirm against ScopedExperimentalEnvVar.
  ScopedExperimentalEnvVar enable_request_hash_key(
      "GRPC_EXPERIMENTAL_RING_HASH_SET_REQUEST_HASH_KEY");
  const std::array<absl::string_view, 3> kBackends = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  const auto update_status = ApplyUpdate(
      BuildUpdate(kBackends, MakeRingHashConfig(0, 0, "foo")), lb_policy());
  EXPECT_EQ(update_status, absl::OkStatus());
  auto current_picker = ExpectState(GRPC_CHANNEL_IDLE);
  ExpectPickQueued(current_picker.get());
  // Flushed twice on purpose, exactly as many times as before.
  WaitForWorkSerializerToFlush();
  WaitForWorkSerializerToFlush();
  // The target is chosen at random, so scan for the first address that now
  // has a subchannel.
  size_t chosen = 0;
  SubchannelState* chosen_subchannel = nullptr;
  while (chosen < kBackends.size()) {
    chosen_subchannel = FindSubchannel(kBackends[chosen]);
    if (chosen_subchannel != nullptr) {
      LOG(INFO) << "Randomly picked subchannel index " << chosen;
      break;
    }
    ++chosen;
  }
  ASSERT_NE(chosen_subchannel, nullptr);
  EXPECT_TRUE(chosen_subchannel->ConnectionRequested());
  chosen_subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  current_picker = ExpectState(GRPC_CHANNEL_CONNECTING);
  ExpectPickQueued(current_picker.get());
  // None of the other addresses may have a subchannel at this point.
  for (size_t other = 0; other < kBackends.size(); ++other) {
    if (other == chosen) continue;
    EXPECT_EQ(nullptr, FindSubchannel(kBackends[other]));
  }
  chosen_subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  current_picker = ExpectState(GRPC_CHANNEL_READY);
  auto picked_address = ExpectPickComplete(current_picker.get());
  EXPECT_EQ(picked_address, kBackends[chosen]);
}
319 
320 }  // namespace
321 }  // namespace testing
322 }  // namespace grpc_core
323 
main(int argc,char ** argv)324 int main(int argc, char** argv) {
325   ::testing::InitGoogleTest(&argc, argv);
326   grpc::testing::TestEnvironment env(&argc, argv);
327   return RUN_ALL_TESTS();
328 }
329