1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/grpc.h>
18 #include <grpc/support/json.h>
19 #include <stddef.h>
20
21 #include <algorithm>
22 #include <array>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/strings/strip.h"
34 #include "absl/types/optional.h"
35 #include "absl/types/span.h"
36 #include "gmock/gmock.h"
37 #include "gtest/gtest.h"
38 #include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/load_balancing/lb_policy.h"
41 #include "src/core/resolver/endpoint_addresses.h"
42 #include "src/core/resolver/xds/xds_config.h"
43 #include "src/core/util/json/json.h"
44 #include "src/core/util/ref_counted_ptr.h"
45 #include "src/core/xds/grpc/xds_health_status.h"
46 #include "test/core/load_balancing/lb_policy_test_lib.h"
47 #include "test/core/test_util/test_config.h"
48
49 namespace grpc_core {
50 namespace testing {
51 namespace {
52 class XdsOverrideHostTest : public LoadBalancingPolicyTest {
53 protected:
XdsOverrideHostTest()54 XdsOverrideHostTest()
55 : LoadBalancingPolicyTest("xds_override_host_experimental") {}
56
MakeXdsConfig(absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name")57 static RefCountedPtr<const XdsConfig> MakeXdsConfig(
58 absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
59 "HEALTHY"},
60 absl::optional<Duration> connection_idle_timeout = absl::nullopt,
61 std::string cluster_name = "cluster_name") {
62 auto cluster_resource = std::make_shared<XdsClusterResource>();
63 for (const absl::string_view host_status : override_host_statuses) {
64 cluster_resource->override_host_statuses.Add(
65 XdsHealthStatus::FromString(host_status).value());
66 }
67 if (connection_idle_timeout.has_value()) {
68 cluster_resource->connection_idle_timeout = *connection_idle_timeout;
69 }
70 auto xds_config = MakeRefCounted<XdsConfig>();
71 xds_config->clusters[std::move(cluster_name)].emplace(
72 std::move(cluster_resource), nullptr, "");
73 return xds_config;
74 }
75
UpdateXdsOverrideHostPolicy(absl::Span<const EndpointAddresses> endpoints,absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name",std::string child_policy="round_robin")76 absl::Status UpdateXdsOverrideHostPolicy(
77 absl::Span<const EndpointAddresses> endpoints,
78 absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
79 "HEALTHY"},
80 absl::optional<Duration> connection_idle_timeout = absl::nullopt,
81 std::string cluster_name = "cluster_name",
82 std::string child_policy = "round_robin") {
83 auto config = MakeConfig(Json::FromArray({Json::FromObject(
84 {{"xds_override_host_experimental",
85 Json::FromObject(
86 {{"clusterName", Json::FromString(cluster_name)},
87 {"childPolicy",
88 Json::FromArray({Json::FromObject(
89 {{child_policy, Json::FromObject({})}})})}})}})}));
90 auto xds_config = MakeXdsConfig(override_host_statuses,
91 connection_idle_timeout, cluster_name);
92 return ApplyUpdate(
93 BuildUpdate(endpoints, std::move(config),
94 ChannelArgs().SetObject(std::move(xds_config))),
95 lb_policy());
96 }
97
UpdateXdsOverrideHostPolicy(absl::Span<const absl::string_view> addresses,absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name",std::string child_policy="round_robin")98 absl::Status UpdateXdsOverrideHostPolicy(
99 absl::Span<const absl::string_view> addresses,
100 absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
101 "HEALTHY"},
102 absl::optional<Duration> connection_idle_timeout = absl::nullopt,
103 std::string cluster_name = "cluster_name",
104 std::string child_policy = "round_robin") {
105 return UpdateXdsOverrideHostPolicy(
106 MakeEndpointAddressesListFromAddressList(addresses),
107 override_host_statuses, connection_idle_timeout,
108 std::move(cluster_name), std::move(child_policy));
109 }
110
111 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,SourceLocation location=SourceLocation ())112 ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,
113 SourceLocation location = SourceLocation()) {
114 EXPECT_EQ(UpdateXdsOverrideHostPolicy(addresses), absl::OkStatus())
115 << location.file() << ":" << location.line();
116 return ExpectRoundRobinStartup(addresses, location);
117 }
118
MakeAddressWithHealthStatus(absl::string_view address,XdsHealthStatus::HealthStatus status)119 EndpointAddresses MakeAddressWithHealthStatus(
120 absl::string_view address, XdsHealthStatus::HealthStatus status) {
121 return EndpointAddresses(
122 MakeAddress(address),
123 ChannelArgs().Set(GRPC_ARG_XDS_HEALTH_STATUS, status));
124 }
125
ApplyUpdateWithHealthStatuses(absl::Span<const std::pair<const absl::string_view,XdsHealthStatus::HealthStatus>> addresses_and_statuses,absl::Span<const absl::string_view> override_host_status={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt)126 void ApplyUpdateWithHealthStatuses(
127 absl::Span<const std::pair<const absl::string_view,
128 XdsHealthStatus::HealthStatus>>
129 addresses_and_statuses,
130 absl::Span<const absl::string_view> override_host_status = {"UNKNOWN",
131 "HEALTHY"},
132 absl::optional<Duration> connection_idle_timeout = absl::nullopt) {
133 EndpointAddressesList endpoints;
134 for (auto address_and_status : addresses_and_statuses) {
135 endpoints.push_back(MakeAddressWithHealthStatus(
136 address_and_status.first, address_and_status.second));
137 }
138 EXPECT_EQ(UpdateXdsOverrideHostPolicy(endpoints, override_host_status,
139 connection_idle_timeout),
140 absl::OkStatus());
141 }
142
143 struct OverrideHostAttributeStorage {
144 // Need to store the string externally, since
145 // XdsOverrideHostAttribute only holds a string_view.
146 std::string address_list;
147 XdsOverrideHostAttribute attribute;
148
OverrideHostAttributeStoragegrpc_core::testing::__anon71c171110111::XdsOverrideHostTest::OverrideHostAttributeStorage149 explicit OverrideHostAttributeStorage(std::string addresses)
150 : address_list(std::move(addresses)), attribute(address_list) {}
151 };
152
MakeOverrideHostAttribute(absl::Span<const absl::string_view> addresses)153 XdsOverrideHostAttribute* MakeOverrideHostAttribute(
154 absl::Span<const absl::string_view> addresses) {
155 std::vector<absl::string_view> address_list;
156 address_list.reserve(addresses.size());
157 for (absl::string_view address : addresses) {
158 address_list.emplace_back(absl::StripPrefix(address, "ipv4:"));
159 }
160 attribute_storage_.emplace_back(
161 std::make_unique<OverrideHostAttributeStorage>(
162 absl::StrJoin(address_list, ",")));
163 return &attribute_storage_.back()->attribute;
164 }
165
MakeOverrideHostAttribute(absl::string_view address)166 XdsOverrideHostAttribute* MakeOverrideHostAttribute(
167 absl::string_view address) {
168 const std::array<absl::string_view, 1> addresses = {address};
169 return MakeOverrideHostAttribute(addresses);
170 }
171
ExpectOverridePicks(LoadBalancingPolicy::SubchannelPicker * picker,XdsOverrideHostAttribute * attribute,absl::string_view expected,absl::Span<const absl::string_view> expected_address_list={},SourceLocation location=SourceLocation ())172 void ExpectOverridePicks(
173 LoadBalancingPolicy::SubchannelPicker* picker,
174 XdsOverrideHostAttribute* attribute, absl::string_view expected,
175 absl::Span<const absl::string_view> expected_address_list = {},
176 SourceLocation location = SourceLocation()) {
177 std::array<absl::string_view, 1> kArray = {expected};
178 if (expected_address_list.empty()) expected_address_list = kArray;
179 std::vector<absl::string_view> expected_addresses;
180 expected_addresses.reserve(expected_address_list.size());
181 for (absl::string_view address : expected_address_list) {
182 expected_addresses.push_back(absl::StripPrefix(address, "ipv4:"));
183 }
184 std::string expected_addresses_str = absl::StrJoin(expected_addresses, ",");
185 for (size_t i = 0; i < 3; ++i) {
186 EXPECT_EQ(ExpectPickComplete(picker, {attribute}, /*metadata=*/{},
187 /*subchannel_call_tracker=*/nullptr,
188 /*picked_subchannel=*/nullptr, location),
189 expected)
190 << location.file() << ":" << location.line();
191 EXPECT_EQ(attribute->actual_address_list(), expected_addresses_str)
192 << " Actual: " << attribute->actual_address_list() << "\n"
193 << "Expected: " << expected_addresses_str << "\n"
194 << location.file() << ":" << location.line();
195 }
196 }
197
ExpectRoundRobinPicksWithAttribute(LoadBalancingPolicy::SubchannelPicker * picker,XdsOverrideHostAttribute * attribute,absl::Span<const absl::string_view> expected,SourceLocation location=SourceLocation ())198 void ExpectRoundRobinPicksWithAttribute(
199 LoadBalancingPolicy::SubchannelPicker* picker,
200 XdsOverrideHostAttribute* attribute,
201 absl::Span<const absl::string_view> expected,
202 SourceLocation location = SourceLocation()) {
203 std::vector<std::string> actual_picks;
204 for (size_t i = 0; i < expected.size(); ++i) {
205 auto address =
206 ExpectPickComplete(picker, {attribute}, /*metadata=*/{},
207 /*subchannel_call_tracker=*/nullptr,
208 /*picked_subchannel=*/nullptr, location);
209 ASSERT_TRUE(address.has_value())
210 << location.file() << ":" << location.line();
211 EXPECT_THAT(*address, ::testing::AnyOfArray(expected))
212 << location.file() << ":" << location.line();
213 EXPECT_EQ(attribute->actual_address_list(),
214 absl::StripPrefix(*address, "ipv4:"))
215 << " Actual: " << attribute->actual_address_list() << "\n"
216 << "Expected: " << absl::StripPrefix(*address, "ipv4:") << "\n"
217 << location.file() << ":" << location.line();
218 actual_picks.push_back(std::move(*address));
219 }
220 EXPECT_TRUE(PicksAreRoundRobin(expected, actual_picks))
221 << location.file() << ":" << location.line();
222 }
223
224 std::vector<std::unique_ptr<OverrideHostAttributeStorage>> attribute_storage_;
225 };
226
// With a default config and no override attributes, the policy is expected
// to go through the normal round-robin startup sequence for all addresses.
TEST_F(XdsOverrideHostTest, DelegatesToChild) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  ExpectStartupWithRoundRobin(kAddresses);
}
231
// An update that carries addresses but a null LB config must be rejected
// with an InvalidArgument status.
TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}, nullptr),
      lb_policy());
  EXPECT_EQ(update_status,
            absl::InvalidArgumentError("Missing policy config"));
}
239
// Picks carrying an override attribute for a known address must always
// land on that address.
TEST_F(XdsOverrideHostTest, OverrideHost) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Override pointing at the second address.
  auto* second_address_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), second_address_attribute, kAddresses[1]);
  // Override pointing at the first address.
  auto* first_address_attribute = MakeOverrideHostAttribute(kAddresses[0]);
  ExpectOverridePicks(picker.get(), first_address_attribute, kAddresses[0]);
}
250
// An override that names an address the policy does not know about is
// ignored: picks are distributed round-robin over the known addresses.
TEST_F(XdsOverrideHostTest, SubchannelNotFound) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  auto* unknown_host_attribute = MakeOverrideHostAttribute("no such host");
  ExpectRoundRobinPicksWithAttribute(picker.get(), unknown_host_attribute,
                                     kAddresses);
}
259
// The override is honored while its address is present, ignored once the
// address is removed from the update, and honored again when the address
// comes back.
TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Check that the host override works.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // The override address is removed.
  EXPECT_EQ(UpdateXdsOverrideHostPolicy({kAddresses[0], kAddresses[2]}),
            absl::OkStatus());
  picker =
      WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
  // Stop here if no picker was produced, rather than dereferencing null.
  ASSERT_NE(picker, nullptr);
  // Picks are returned in round-robin order, because the address
  // pointed to by the cookie is not present.
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
  // The override address comes back.
  EXPECT_EQ(UpdateXdsOverrideHostPolicy({kAddresses[1], kAddresses[2]}),
            absl::OkStatus());
  picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]},
                                       {kAddresses[1], kAddresses[2]});
  ASSERT_NE(picker, nullptr);
  // Make sure host override works.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
}
285
// Walks the override subchannel through IDLE, CONNECTING and
// TRANSIENT_FAILURE.  Override picks are queued in the first two states
// and fall back to round-robin in the last one.
TEST_F(XdsOverrideHostTest,
       OverrideIsQueuedInIdleOrConnectingAndFailedInTransientFailure) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Check that the host is overridden
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Subchannel for address 1 becomes disconnected.
  LOG(INFO) << "### subchannel 1 reporting IDLE";
  auto subchannel = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel, nullptr);
  subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
  EXPECT_TRUE(subchannel->ConnectionRequested());
  LOG(INFO) << "### expecting re-resolution request";
  ExpectReresolutionRequest();
  LOG(INFO) << "### expecting RR picks to exclude the disconnected subchannel";
  picker =
      WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
  // Stop here if no picker was produced, rather than dereferencing null.
  ASSERT_NE(picker, nullptr);
  // Picks with the override will be queued.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // The subchannel starts trying to reconnect.
  LOG(INFO) << "### subchannel 1 reporting CONNECTING";
  subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Picks with the override will still be queued.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // The connection attempt fails.
  LOG(INFO) << "### subchannel 1 reporting TRANSIENT_FAILURE";
  subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                                   absl::ResourceExhaustedError("Hmmmm"));
  LOG(INFO) << "### expecting re-resolution request";
  ExpectReresolutionRequest();
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // The host override is not used.
  LOG(INFO) << "### checking that host override is not used";
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
}
329
// An endpoint in DRAINING state is skipped by normal picks but can still
// be selected via an override, until an update removes it entirely.
TEST_F(XdsOverrideHostTest, DrainingState) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do one override pick for endpoint 1, so that it will still be within
  // the idle threshold and will therefore be retained when it moves to
  // state DRAINING.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Now move endpoint 1 to state DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Stop here if no picker was produced, rather than dereferencing null.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Picks with an override are able to select the draining endpoint.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Send the LB policy an update that removes the draining endpoint.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  // Gone!
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
}
362
// Follows a DRAINING endpoint's subchannel through IDLE, CONNECTING and
// back to READY.  Override picks are queued while it is not connected and
// complete once it is; picks without an override never use it.
TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Check that the host is overridden
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Send an update that marks the endpoints with different EDS health
  // states, but those states are present in override_host_status.
  // The picker should use the DRAINING host when a call's override
  // points to that host, but the host should not be used if there is no
  // override pointing to it.
  LOG(INFO) << "### sending update with DRAINING host";
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  auto subchannel = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel, nullptr);
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Stop here if no picker was produced, rather than dereferencing null.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Now the connection to the draining host gets dropped.
  // The picker should queue picks where the override host is IDLE.
  // All picks without an override host should not use this host.
  LOG(INFO) << "### closing connection to DRAINING host";
  subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectPickQueued(picker.get(), {address1_attribute});
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // The subchannel should have been asked to reconnect as a result of the
  // queued pick above.  It will therefore transition into state CONNECTING.
  // The pick behavior is the same as above: The picker should queue
  // picks where the override host is CONNECTING.  All picks without an
  // override host should not use this host.
  LOG(INFO) << "### subchannel starts reconnecting";
  WaitForWorkSerializerToFlush();
  EXPECT_TRUE(subchannel->ConnectionRequested());
  ExpectQueueEmpty();
  subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectPickQueued(picker.get(), {address1_attribute});
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // The subchannel now becomes connected again.
  // Now picks with this override host can be completed again.
  // Picks without an override host still don't use the draining host.
  LOG(INFO) << "### subchannel becomes reconnected";
  subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
}
418
// An endpoint that moves from DRAINING back to HEALTHY rejoins the
// round-robin rotation, and overrides pointing at it keep working
// throughout.
TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do one override pick for endpoint 1, so that it will still be within
  // the idle threshold and will therefore be retained when it moves to
  // state DRAINING.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Move endpoint 1 to state DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Stop here if no picker was produced, rather than dereferencing null.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Every endpoint, including the previously draining one, is now HEALTHY.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kHealthy},
                                 {kAddresses[1], XdsHealthStatus::kHealthy},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ExpectRoundRobinPicks(picker.get(), kAddresses);
}
446
// With endpoints fixed at UNKNOWN / HEALTHY / DRAINING, varies the set of
// override host statuses and checks that an override is honored only when
// its endpoint's status is in that set.
TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto* unknown_attribute = MakeOverrideHostAttribute(kAddresses[0]);
  auto* healthy_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  auto* draining_attribute = MakeOverrideHostAttribute(kAddresses[2]);
  // Every update in this test uses the same endpoint health statuses; only
  // the set of statuses eligible for override changes.
  auto send_update =
      [&](absl::Span<const absl::string_view> override_host_statuses) {
        ApplyUpdateWithHealthStatuses(
            {{kAddresses[0], XdsHealthStatus::kUnknown},
             {kAddresses[1], XdsHealthStatus::kHealthy},
             {kAddresses[2], XdsHealthStatus::kDraining}},
            override_host_statuses);
      };
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do one override pick for endpoint 2, so that it will still be within
  // the idle threshold and will therefore be retained when it moves to
  // state DRAINING.
  ExpectOverridePicks(picker.get(), draining_attribute, kAddresses[2]);
  // All three statuses allowed: every override is honored.
  send_update({"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
  ExpectOverridePicks(picker.get(), unknown_attribute, kAddresses[0]);
  ExpectOverridePicks(picker.get(), healthy_attribute, kAddresses[1]);
  ExpectOverridePicks(picker.get(), draining_attribute, kAddresses[2]);
  // UNKNOWN excluded: overrides for first endpoint are not honored.
  send_update({"HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
  ExpectRoundRobinPicksWithAttribute(picker.get(), unknown_attribute,
                                     {kAddresses[0], kAddresses[1]});
  ExpectOverridePicks(picker.get(), healthy_attribute, kAddresses[1]);
  ExpectOverridePicks(picker.get(), draining_attribute, kAddresses[2]);
  // HEALTHY excluded: overrides for second endpoint are not honored.
  send_update({"UNKNOWN", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
  ExpectOverridePicks(picker.get(), unknown_attribute, kAddresses[0]);
  ExpectRoundRobinPicksWithAttribute(picker.get(), healthy_attribute,
                                     {kAddresses[0], kAddresses[1]});
  ExpectOverridePicks(picker.get(), draining_attribute, kAddresses[2]);
  // DRAINING excluded: overrides for third endpoint are not honored.
  send_update({"UNKNOWN", "HEALTHY"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[1]});
  ExpectOverridePicks(picker.get(), unknown_attribute, kAddresses[0]);
  ExpectOverridePicks(picker.get(), healthy_attribute, kAddresses[1]);
  ExpectRoundRobinPicksWithAttribute(picker.get(), draining_attribute,
                                     {kAddresses[0], kAddresses[1]});
}
508
// Endpoints with two addresses each: an override naming an endpoint's
// address list selects whichever of that endpoint's addresses is currently
// in use, and the reported address list puts that address first.
TEST_F(XdsOverrideHostTest, MultipleAddressesPerEndpoint) {
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
      "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  const std::array<EndpointAddresses, 3> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses),
      MakeEndpointAddresses(kEndpoint3Addresses)};
  EXPECT_EQ(UpdateXdsOverrideHostPolicy(kEndpoints), absl::OkStatus());
  auto picker = ExpectRoundRobinStartup(kEndpoints);
  ASSERT_NE(picker, nullptr);
  // Overrides for endpoints 1 and 2 resolve to each endpoint's first
  // address.
  auto* first_endpoint_attribute =
      MakeOverrideHostAttribute(kEndpoint1Addresses);
  ExpectOverridePicks(picker.get(), first_endpoint_attribute,
                      kEndpoint1Addresses[0], kEndpoint1Addresses);
  auto* second_endpoint_attribute =
      MakeOverrideHostAttribute(kEndpoint2Addresses);
  ExpectOverridePicks(picker.get(), second_endpoint_attribute,
                      kEndpoint2Addresses[0], kEndpoint2Addresses);
  // Switch endpoint 1 over to its second address.  While the switch is in
  // progress, endpoint 1 drops out of the round-robin list.
  ExpectEndpointAddressChange(kEndpoint1Addresses, 0, 1, [&]() {
    WaitForRoundRobinListChange(
        {kEndpoint1Addresses[0], kEndpoint2Addresses[0],
         kEndpoint3Addresses[0]},
        {kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
  });
  // Endpoint 1 then reappears under its second address.
  WaitForRoundRobinListChange(
      {kEndpoint2Addresses[0], kEndpoint3Addresses[0]},
      {kEndpoint1Addresses[1], kEndpoint2Addresses[0], kEndpoint3Addresses[0]});
  // The cookie for endpoint 1 should now lead to the second address.
  // Note that this deliberately reuses the picker obtained at startup.
  ExpectOverridePicks(picker.get(), first_endpoint_attribute,
                      kEndpoint1Addresses[1],
                      {kEndpoint1Addresses[1], kEndpoint1Addresses[0]});
}
544
// Starts with an endpoint already in DRAINING state, so no subchannel is
// created for it up front.  An override pick for that endpoint must
// trigger subchannel creation and connection, and complete once the
// subchannel is READY.
TEST_F(XdsOverrideHostTest, ChildPolicyNeverCreatedSubchannel) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  // The draining endpoint is not passed down to the child policy.
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  auto picker = ExpectRoundRobinStartup({kAddresses[0], kAddresses[2]});
  // Stop here if no picker was produced, rather than dereferencing null.
  ASSERT_NE(picker, nullptr);
  // Subchannels should exist for the non-draining endpoints only.
  auto* subchannel = FindSubchannel(kAddresses[0]);
  ASSERT_NE(subchannel, nullptr);
  EXPECT_GE(subchannel->NumWatchers(), 1);
  auto* subchannel2 = FindSubchannel(kAddresses[1]);
  EXPECT_EQ(subchannel2, nullptr);
  auto* subchannel3 = FindSubchannel(kAddresses[2]);
  ASSERT_NE(subchannel3, nullptr);
  EXPECT_GE(subchannel3->NumWatchers(), 1);
  // A pick with an override pointing to the draining endpoint should
  // queue the pick and trigger subchannel creation.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectPickQueued(picker.get(), {address1_attribute});
  WaitForWorkSerializerToFlush();
  subchannel2 = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel2, nullptr);
  EXPECT_EQ(subchannel2->NumWatchers(), 1);
  // Subchannel creation will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Trying the pick again with the new picker will trigger a connection
  // attempt on the new subchannel.
  ExpectPickQueued(picker.get(), {address1_attribute});
  WaitForWorkSerializerToFlush();
  EXPECT_TRUE(subchannel2->ConnectionRequested());
  subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // Subchannel state change will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Trying the pick with override again should queue, because the
  // connection attempt is still pending.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // Connection attempt succeeds.
  subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
  // Subchannel state change will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Now the pick with override should complete.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
}
602
// When an endpoint that never received an override pick moves to
// DRAINING, nothing should keep watching its subchannel afterwards.
TEST_F(XdsOverrideHostTest,
       ChildPolicyUnrefsSubchannelNotUsedWithinIdleThreshold) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Now move endpoint 1 to state DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Stop here if no picker was produced, rather than dereferencing null.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Child policy should drop its ref to the draining endpoint, and
  // xds_override_host should not take ownership, since the entry never
  // had an override pick.
  auto* subchannel0 = FindSubchannel(kAddresses[0]);
  ASSERT_NE(subchannel0, nullptr);
  EXPECT_GE(subchannel0->NumWatchers(), 1);
  auto* subchannel1 = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel1, nullptr);
  EXPECT_EQ(subchannel1->NumWatchers(), 0);
  auto* subchannel2 = FindSubchannel(kAddresses[2]);
  ASSERT_NE(subchannel2, nullptr);
  EXPECT_GE(subchannel2->NumWatchers(), 1);
}
633
// Exercises the idle-timeout handling with a one-minute connection idle
// timeout.  The test records every duration reported through
// SetRunAfterDurationCallback() and checks, after each step, both the
// recorded timer durations and the watcher counts on the draining
// subchannels.
TEST_F(XdsOverrideHostTest, IdleTimer) {
  using EventEngineDuration =
      grpc_event_engine::experimental::EventEngine::Duration;
  // Durations handed to the callback, in the order they were reported.
  std::vector<EventEngineDuration> recorded_delays;
  fuzzing_ee_->SetRunAfterDurationCallback(
      [&recorded_delays](EventEngineDuration delay) {
        recorded_delays.push_back(delay);
      });
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  const Duration kIdleTimeout = Duration::Minutes(1);
  LOG(INFO) << "### sending initial update";
  EXPECT_EQ(UpdateXdsOverrideHostPolicy(kAddresses, {"UNKNOWN", "HEALTHY"},
                                        kIdleTimeout),
            absl::OkStatus());
  // The first update is expected to arm exactly one timer, set to the
  // full idle timeout.
  EXPECT_THAT(recorded_delays, ::testing::ElementsAre(kIdleTimeout));
  recorded_delays.clear();
  auto current_picker = ExpectRoundRobinStartup(kAddresses);
  ASSERT_NE(current_picker, nullptr);
  // Give endpoints 1 and 2 an override pick each.  That keeps them inside
  // the idle threshold, so they should be retained once they become
  // DRAINING.
  auto* override_attr1 = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(current_picker.get(), override_attr1, kAddresses[1]);
  auto* override_attr2 = MakeOverrideHostAttribute(kAddresses[2]);
  ExpectOverridePicks(current_picker.get(), override_attr2, kAddresses[2]);
  // Let 5 seconds pass, then switch endpoints 1 and 2 to DRAINING.
  LOG(INFO) << "### moving endpoints 1 and 2 to state DRAINING";
  IncrementTimeBy(Duration::Seconds(5));
  ApplyUpdateWithHealthStatuses(
      {{kAddresses[0], XdsHealthStatus::kUnknown},
       {kAddresses[1], XdsHealthStatus::kDraining},
       {kAddresses[2], XdsHealthStatus::kDraining}},
      {"UNKNOWN", "HEALTHY", "DRAINING"}, kIdleTimeout);
  // Applying the update is expected to re-arm the timer for the time left
  // until the next expiration.
  EXPECT_THAT(recorded_delays,
              ::testing::ElementsAre(Duration::Seconds(55)));
  recorded_delays.clear();
  current_picker = ExpectState(GRPC_CHANNEL_READY);
  // Flush the WorkSerializer so that subchannel orphaning has happened
  // before the checks below.
  WaitForWorkSerializerToFlush();
  // Only the non-draining endpoint serves picks that carry no override.
  ExpectRoundRobinPicks(current_picker.get(), {kAddresses[0]});
  // Override picks can still reach the draining endpoints.
  ExpectOverridePicks(current_picker.get(), override_attr1, kAddresses[1]);
  ExpectOverridePicks(current_picker.get(), override_attr2, kAddresses[2]);
  // Each draining subchannel should now have exactly one watcher, i.e. be
  // held by the xds_override_host policy.
  auto* draining_subchannel1 = FindSubchannel(kAddresses[1]);
  ASSERT_NE(draining_subchannel1, nullptr);
  EXPECT_EQ(draining_subchannel1->NumWatchers(), 1);
  auto* draining_subchannel2 = FindSubchannel(kAddresses[2]);
  ASSERT_NE(draining_subchannel2, nullptr);
  EXPECT_EQ(draining_subchannel2->NumWatchers(), 1);
  // Fire the timer.  Both endpoints got an override pick after the timer
  // was scheduled, so both subchannels are expected to be kept.
  IncrementTimeBy(Duration::Seconds(55));
  EXPECT_EQ(draining_subchannel1->NumWatchers(), 1);
  EXPECT_EQ(draining_subchannel2->NumWatchers(), 1);
  // The timer is expected to be re-armed for 5 seconds.
  EXPECT_THAT(recorded_delays, ::testing::ElementsAre(Duration::Seconds(5)));
  recorded_delays.clear();
  // Refresh endpoint 1 with one more override pick.
  ExpectOverridePicks(current_picker.get(), override_attr1, kAddresses[1]);
  // Fire the timer a second time: endpoint 2 should be released while
  // endpoint 1 stays.
  IncrementTimeBy(Duration::Seconds(5));
  EXPECT_EQ(draining_subchannel1->NumWatchers(), 1);
  EXPECT_EQ(draining_subchannel2->NumWatchers(), 0);
  // The next timer is expected to be 55 seconds out, the time remaining
  // until endpoint 1 would be released.
  EXPECT_THAT(recorded_delays,
              ::testing::ElementsAre(Duration::Seconds(55)));
}
709
710 } // namespace
711 } // namespace testing
712 } // namespace grpc_core
713
main(int argc,char ** argv)714 int main(int argc, char** argv) {
715 ::testing::InitGoogleTest(&argc, argv);
716 grpc::testing::TestEnvironment env(&argc, argv);
717 return RUN_ALL_TESTS();
718 }
719