1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <stddef.h>
18
19 #include <algorithm>
20 #include <array>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 #include <vector>
25
26 #include "absl/status/status.h"
27 #include "absl/status/statusor.h"
28 #include "absl/strings/str_join.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/strings/strip.h"
31 #include "absl/types/optional.h"
32 #include "absl/types/span.h"
33 #include "gmock/gmock.h"
34 #include "gtest/gtest.h"
35
36 #include <grpc/grpc.h>
37 #include <grpc/support/json.h>
38 #include <grpc/support/log.h>
39
40 #include "src/core/ext/filters/stateful_session/stateful_session_filter.h"
41 #include "src/core/ext/xds/xds_health_status.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/gprpp/debug_location.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/json/json.h"
46 #include "src/core/load_balancing/lb_policy.h"
47 #include "src/core/resolver/endpoint_addresses.h"
48 #include "src/core/resolver/xds/xds_dependency_manager.h"
49 #include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
50 #include "test/core/util/test_config.h"
51
52 namespace grpc_core {
53 namespace testing {
54 namespace {
55 class XdsOverrideHostTest : public LoadBalancingPolicyTest {
56 protected:
XdsOverrideHostTest()57 XdsOverrideHostTest()
58 : LoadBalancingPolicyTest("xds_override_host_experimental") {}
59
MakeXdsConfig(absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name")60 static RefCountedPtr<const XdsDependencyManager::XdsConfig> MakeXdsConfig(
61 absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
62 "HEALTHY"},
63 absl::optional<Duration> connection_idle_timeout = absl::nullopt,
64 std::string cluster_name = "cluster_name") {
65 auto cluster_resource = std::make_shared<XdsClusterResource>();
66 for (const absl::string_view host_status : override_host_statuses) {
67 cluster_resource->override_host_statuses.Add(
68 XdsHealthStatus::FromString(host_status).value());
69 }
70 if (connection_idle_timeout.has_value()) {
71 cluster_resource->connection_idle_timeout = *connection_idle_timeout;
72 }
73 auto xds_config = MakeRefCounted<XdsDependencyManager::XdsConfig>();
74 xds_config->clusters[std::move(cluster_name)].emplace(
75 std::move(cluster_resource), nullptr, "");
76 return xds_config;
77 }
78
UpdateXdsOverrideHostPolicy(absl::Span<const EndpointAddresses> endpoints,absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name",std::string child_policy="round_robin")79 absl::Status UpdateXdsOverrideHostPolicy(
80 absl::Span<const EndpointAddresses> endpoints,
81 absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
82 "HEALTHY"},
83 absl::optional<Duration> connection_idle_timeout = absl::nullopt,
84 std::string cluster_name = "cluster_name",
85 std::string child_policy = "round_robin") {
86 auto config = MakeConfig(Json::FromArray({Json::FromObject(
87 {{"xds_override_host_experimental",
88 Json::FromObject(
89 {{"clusterName", Json::FromString(cluster_name)},
90 {"childPolicy",
91 Json::FromArray({Json::FromObject(
92 {{child_policy, Json::FromObject({})}})})}})}})}));
93 auto xds_config = MakeXdsConfig(override_host_statuses,
94 connection_idle_timeout, cluster_name);
95 return ApplyUpdate(
96 BuildUpdate(endpoints, std::move(config),
97 ChannelArgs().SetObject(std::move(xds_config))),
98 lb_policy());
99 }
100
UpdateXdsOverrideHostPolicy(absl::Span<const absl::string_view> addresses,absl::Span<const absl::string_view> override_host_statuses={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt,std::string cluster_name="cluster_name",std::string child_policy="round_robin")101 absl::Status UpdateXdsOverrideHostPolicy(
102 absl::Span<const absl::string_view> addresses,
103 absl::Span<const absl::string_view> override_host_statuses = {"UNKNOWN",
104 "HEALTHY"},
105 absl::optional<Duration> connection_idle_timeout = absl::nullopt,
106 std::string cluster_name = "cluster_name",
107 std::string child_policy = "round_robin") {
108 return UpdateXdsOverrideHostPolicy(
109 MakeEndpointAddressesListFromAddressList(addresses),
110 override_host_statuses, connection_idle_timeout,
111 std::move(cluster_name), std::move(child_policy));
112 }
113
114 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>
ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,SourceLocation location=SourceLocation ())115 ExpectStartupWithRoundRobin(absl::Span<const absl::string_view> addresses,
116 SourceLocation location = SourceLocation()) {
117 EXPECT_EQ(UpdateXdsOverrideHostPolicy(addresses), absl::OkStatus())
118 << location.file() << ":" << location.line();
119 return ExpectRoundRobinStartup(addresses, location);
120 }
121
MakeAddressWithHealthStatus(absl::string_view address,XdsHealthStatus::HealthStatus status)122 EndpointAddresses MakeAddressWithHealthStatus(
123 absl::string_view address, XdsHealthStatus::HealthStatus status) {
124 return EndpointAddresses(
125 MakeAddress(address),
126 ChannelArgs().Set(GRPC_ARG_XDS_HEALTH_STATUS, status));
127 }
128
ApplyUpdateWithHealthStatuses(absl::Span<const std::pair<const absl::string_view,XdsHealthStatus::HealthStatus>> addresses_and_statuses,absl::Span<const absl::string_view> override_host_status={"UNKNOWN", "HEALTHY"},absl::optional<Duration> connection_idle_timeout=absl::nullopt)129 void ApplyUpdateWithHealthStatuses(
130 absl::Span<const std::pair<const absl::string_view,
131 XdsHealthStatus::HealthStatus>>
132 addresses_and_statuses,
133 absl::Span<const absl::string_view> override_host_status = {"UNKNOWN",
134 "HEALTHY"},
135 absl::optional<Duration> connection_idle_timeout = absl::nullopt) {
136 EndpointAddressesList endpoints;
137 for (auto address_and_status : addresses_and_statuses) {
138 endpoints.push_back(MakeAddressWithHealthStatus(
139 address_and_status.first, address_and_status.second));
140 }
141 EXPECT_EQ(UpdateXdsOverrideHostPolicy(endpoints, override_host_status,
142 connection_idle_timeout),
143 absl::OkStatus());
144 }
145
146 struct OverrideHostAttributeStorage {
147 // Need to store the string externally, since
148 // XdsOverrideHostAttribute only holds a string_view.
149 std::string address_list;
150 XdsOverrideHostAttribute attribute;
151
OverrideHostAttributeStoragegrpc_core::testing::__anon830dd4100111::XdsOverrideHostTest::OverrideHostAttributeStorage152 explicit OverrideHostAttributeStorage(std::string addresses)
153 : address_list(std::move(addresses)), attribute(address_list) {}
154 };
155
MakeOverrideHostAttribute(absl::Span<const absl::string_view> addresses)156 XdsOverrideHostAttribute* MakeOverrideHostAttribute(
157 absl::Span<const absl::string_view> addresses) {
158 std::vector<absl::string_view> address_list;
159 address_list.reserve(addresses.size());
160 for (absl::string_view address : addresses) {
161 address_list.emplace_back(absl::StripPrefix(address, "ipv4:"));
162 }
163 attribute_storage_.emplace_back(
164 std::make_unique<OverrideHostAttributeStorage>(
165 absl::StrJoin(address_list, ",")));
166 return &attribute_storage_.back()->attribute;
167 }
168
MakeOverrideHostAttribute(absl::string_view address)169 XdsOverrideHostAttribute* MakeOverrideHostAttribute(
170 absl::string_view address) {
171 const std::array<absl::string_view, 1> addresses = {address};
172 return MakeOverrideHostAttribute(addresses);
173 }
174
ExpectOverridePicks(LoadBalancingPolicy::SubchannelPicker * picker,XdsOverrideHostAttribute * attribute,absl::string_view expected,absl::Span<const absl::string_view> expected_address_list={},SourceLocation location=SourceLocation ())175 void ExpectOverridePicks(
176 LoadBalancingPolicy::SubchannelPicker* picker,
177 XdsOverrideHostAttribute* attribute, absl::string_view expected,
178 absl::Span<const absl::string_view> expected_address_list = {},
179 SourceLocation location = SourceLocation()) {
180 std::array<absl::string_view, 1> kArray = {expected};
181 if (expected_address_list.empty()) expected_address_list = kArray;
182 std::vector<absl::string_view> expected_addresses;
183 expected_addresses.reserve(expected_address_list.size());
184 for (absl::string_view address : expected_address_list) {
185 expected_addresses.push_back(absl::StripPrefix(address, "ipv4:"));
186 }
187 std::string expected_addresses_str = absl::StrJoin(expected_addresses, ",");
188 for (size_t i = 0; i < 3; ++i) {
189 EXPECT_EQ(ExpectPickComplete(picker, {attribute},
190 /*subchannel_call_tracker=*/nullptr,
191 /*picked_subchannel=*/nullptr, location),
192 expected)
193 << location.file() << ":" << location.line();
194 EXPECT_EQ(attribute->actual_address_list(), expected_addresses_str)
195 << " Actual: " << attribute->actual_address_list() << "\n"
196 << "Expected: " << expected_addresses_str << "\n"
197 << location.file() << ":" << location.line();
198 }
199 }
200
ExpectRoundRobinPicksWithAttribute(LoadBalancingPolicy::SubchannelPicker * picker,XdsOverrideHostAttribute * attribute,absl::Span<const absl::string_view> expected,SourceLocation location=SourceLocation ())201 void ExpectRoundRobinPicksWithAttribute(
202 LoadBalancingPolicy::SubchannelPicker* picker,
203 XdsOverrideHostAttribute* attribute,
204 absl::Span<const absl::string_view> expected,
205 SourceLocation location = SourceLocation()) {
206 std::vector<std::string> actual_picks;
207 for (size_t i = 0; i < expected.size(); ++i) {
208 auto address = ExpectPickComplete(
209 picker, {attribute}, /*subchannel_call_tracker=*/nullptr,
210 /*picked_subchannel=*/nullptr, location);
211 ASSERT_TRUE(address.has_value())
212 << location.file() << ":" << location.line();
213 EXPECT_THAT(*address, ::testing::AnyOfArray(expected))
214 << location.file() << ":" << location.line();
215 EXPECT_EQ(attribute->actual_address_list(),
216 absl::StripPrefix(*address, "ipv4:"))
217 << " Actual: " << attribute->actual_address_list() << "\n"
218 << "Expected: " << absl::StripPrefix(*address, "ipv4:") << "\n"
219 << location.file() << ":" << location.line();
220 actual_picks.push_back(std::move(*address));
221 }
222 EXPECT_TRUE(PicksAreRoundRobin(expected, actual_picks))
223 << location.file() << ":" << location.line();
224 }
225
226 std::vector<std::unique_ptr<OverrideHostAttributeStorage>> attribute_storage_;
227 };
228
// With no override attribute involved, the policy starts up and hands
// picking over to its round_robin child.
TEST_F(XdsOverrideHostTest, DelegatesToChild) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  ExpectStartupWithRoundRobin(kAddresses);
}
233
// An update that carries no LB policy config must be rejected with
// INVALID_ARGUMENT.
TEST_F(XdsOverrideHostTest, NoConfigReportsError) {
  const absl::Status update_status = ApplyUpdate(
      BuildUpdate({"ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442"}, nullptr),
      lb_policy());
  EXPECT_EQ(update_status,
            absl::InvalidArgumentError("Missing policy config"));
}
241
// Picks carrying an override attribute go to the address named by the
// attribute rather than following round-robin order.
TEST_F(XdsOverrideHostTest, OverrideHost) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Override pointing at the second address.
  auto* second_address_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), second_address_attribute, kAddresses[1]);
  // Override pointing at the first address.
  auto* first_address_attribute = MakeOverrideHostAttribute(kAddresses[0]);
  ExpectOverridePicks(picker.get(), first_address_attribute, kAddresses[0]);
}
252
// An override naming a host that is not among the known addresses is
// ignored: picks round-robin over all addresses.
TEST_F(XdsOverrideHostTest, SubchannelNotFound) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  auto* unknown_host_attribute = MakeOverrideHostAttribute("no such host");
  ExpectRoundRobinPicksWithAttribute(picker.get(), unknown_host_attribute,
                                     kAddresses);
}
261
// The override is honored while its address is present, falls back to
// round-robin when the address is removed from the update, and is honored
// again once the address returns.
TEST_F(XdsOverrideHostTest, SubchannelsComeAndGo) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Check that the host override works.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // The override address is removed.
  EXPECT_EQ(UpdateXdsOverrideHostPolicy({kAddresses[0], kAddresses[2]}),
            absl::OkStatus());
  picker =
      WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
  // Fail cleanly here rather than dereferencing a null picker below.
  ASSERT_NE(picker, nullptr);
  // Picks are returned in round-robin order, because the address
  // pointed to by the cookie is not present.
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
  // The override address comes back.
  EXPECT_EQ(UpdateXdsOverrideHostPolicy({kAddresses[1], kAddresses[2]}),
            absl::OkStatus());
  picker = WaitForRoundRobinListChange({kAddresses[0], kAddresses[2]},
                                       {kAddresses[1], kAddresses[2]});
  ASSERT_NE(picker, nullptr);
  // Make sure host override works.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
}
287
// Follows the overridden subchannel through IDLE, CONNECTING and
// TRANSIENT_FAILURE: override picks are queued in the first two states,
// and in TRANSIENT_FAILURE the override is not used.
TEST_F(XdsOverrideHostTest,
       OverrideIsQueuedInIdleOrConnectingAndFailedInTransientFailure) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Check that the host is overridden
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Subchannel for address 1 becomes disconnected.
  gpr_log(GPR_INFO, "### subchannel 1 reporting IDLE");
  auto subchannel = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel, nullptr);
  subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
  EXPECT_TRUE(subchannel->ConnectionRequested());
  gpr_log(GPR_INFO, "### expecting re-resolution request");
  ExpectReresolutionRequest();
  gpr_log(GPR_INFO,
          "### expecting RR picks to exclude the disconnected subchannel");
  picker =
      WaitForRoundRobinListChange(kAddresses, {kAddresses[0], kAddresses[2]});
  // Fail cleanly here rather than dereferencing a null picker below.
  ASSERT_NE(picker, nullptr);
  // Picks with the override will be queued.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // The subchannel starts trying to reconnect.
  gpr_log(GPR_INFO, "### subchannel 1 reporting CONNECTING");
  subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Picks with the override will still be queued.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // The connection attempt fails.
  gpr_log(GPR_INFO, "### subchannel 1 reporting TRANSIENT_FAILURE");
  subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE,
                                   absl::ResourceExhaustedError("Hmmmm"));
  gpr_log(GPR_INFO, "### expecting re-resolution request");
  ExpectReresolutionRequest();
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // The host override is not used.
  gpr_log(GPR_INFO, "### checking that host override is not used");
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
}
332
// An endpoint in DRAINING state is skipped by picks without an override
// but can still be selected by an override pick; once it is removed from
// the update, the override is no longer honored.
TEST_F(XdsOverrideHostTest, DrainingState) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do one override pick for endpoint 1, so that it will still be within
  // the idle threshold and will therefore be retained when it moves to
  // state DRAINING.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Now move endpoint 1 to state DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Fail cleanly here rather than dereferencing a null picker below.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Picks with an override are able to select the draining endpoint.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Send the LB policy an update that removes the draining endpoint.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  // Gone!
  ExpectRoundRobinPicksWithAttribute(picker.get(), address1_attribute,
                                     {kAddresses[0], kAddresses[2]});
}
365
// Follows a DRAINING endpoint's subchannel through IDLE, CONNECTING and
// back to READY: override picks are queued until it is READY again, and
// picks without an override never use the draining endpoint.
TEST_F(XdsOverrideHostTest, DrainingSubchannelIsConnecting) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Check that the host is overridden
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Send an update that marks the endpoints with different EDS health
  // states, but those states are present in override_host_status.
  // The picker should use the DRAINING host when a call's override
  // points to that host, but the host should not be used if there is no
  // override pointing to it.
  gpr_log(GPR_INFO, "### sending update with DRAINING host");
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  auto subchannel = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel, nullptr);
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Fail cleanly here rather than dereferencing a null picker below.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Now the connection to the draining host gets dropped.
  // The picker should queue picks where the override host is IDLE.
  // All picks without an override host should not use this host.
  gpr_log(GPR_INFO, "### closing connection to DRAINING host");
  subchannel->SetConnectivityState(GRPC_CHANNEL_IDLE);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectPickQueued(picker.get(), {address1_attribute});
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // The subchannel should have been asked to reconnect as a result of the
  // queued pick above.  It will therefore transition into state CONNECTING.
  // The pick behavior is the same as above: The picker should queue
  // picks where the override host is CONNECTING.  All picks without an
  // override host should not use this host.
  gpr_log(GPR_INFO, "### subchannel starts reconnecting");
  WaitForWorkSerializerToFlush();
  EXPECT_TRUE(subchannel->ConnectionRequested());
  ExpectQueueEmpty();
  subchannel->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectPickQueued(picker.get(), {address1_attribute});
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // The subchannel now becomes connected again.
  // Now picks with this override host can be completed again.
  // Picks without an override host still don't use the draining host.
  gpr_log(GPR_INFO, "### subchannel becomes reconnected");
  subchannel->SetConnectivityState(GRPC_CHANNEL_READY);
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
}
421
// An endpoint that moves from DRAINING back to HEALTHY rejoins the
// round-robin rotation, and override picks for it keep working throughout.
TEST_F(XdsOverrideHostTest, DrainingToHealthy) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do one override pick for endpoint 1, so that it will still be within
  // the idle threshold and will therefore be retained when it moves to
  // state DRAINING.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Move endpoint 1 to state DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Fail cleanly here rather than dereferencing a null picker below.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  // Report every endpoint as HEALTHY.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kHealthy},
                                 {kAddresses[1], XdsHealthStatus::kHealthy},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
  ExpectRoundRobinPicks(picker.get(), kAddresses);
}
449
// Keeps the endpoints' health statuses fixed (UNKNOWN, HEALTHY, DRAINING)
// and varies the set of statuses eligible for override, checking that an
// override is honored only when its endpoint's status is in that set.
TEST_F(XdsOverrideHostTest, OverrideHostStatus) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  // The same endpoint/status assignment is sent in every update below.
  const std::array<
      std::pair<const absl::string_view, XdsHealthStatus::HealthStatus>, 3>
      kEndpointStatuses = {{{kAddresses[0], XdsHealthStatus::kUnknown},
                            {kAddresses[1], XdsHealthStatus::kHealthy},
                            {kAddresses[2], XdsHealthStatus::kDraining}}};
  // Endpoints expected to serve picks that are not overridden.
  const std::array<absl::string_view, 2> kNonDraining = {kAddresses[0],
                                                         kAddresses[1]};
  auto* unknown_attribute = MakeOverrideHostAttribute(kAddresses[0]);
  auto* healthy_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  auto* draining_attribute = MakeOverrideHostAttribute(kAddresses[2]);
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Do one override pick for endpoint 2, so that it will still be within
  // the idle threshold and will therefore be retained when it moves to
  // state DRAINING.
  ExpectOverridePicks(picker.get(), draining_attribute, kAddresses[2]);
  // All three statuses eligible: every override is honored.
  ApplyUpdateWithHealthStatuses(kEndpointStatuses,
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  ExpectRoundRobinPicks(picker.get(), kNonDraining);
  ExpectOverridePicks(picker.get(), unknown_attribute, kAddresses[0]);
  ExpectOverridePicks(picker.get(), healthy_attribute, kAddresses[1]);
  ExpectOverridePicks(picker.get(), draining_attribute, kAddresses[2]);
  // UNKNOWN excluded: overrides for first endpoint are not honored.
  ApplyUpdateWithHealthStatuses(kEndpointStatuses, {"HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), kNonDraining);
  ExpectRoundRobinPicksWithAttribute(picker.get(), unknown_attribute,
                                     kNonDraining);
  ExpectOverridePicks(picker.get(), healthy_attribute, kAddresses[1]);
  ExpectOverridePicks(picker.get(), draining_attribute, kAddresses[2]);
  // HEALTHY excluded: overrides for second endpoint are not honored.
  ApplyUpdateWithHealthStatuses(kEndpointStatuses, {"UNKNOWN", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), kNonDraining);
  ExpectOverridePicks(picker.get(), unknown_attribute, kAddresses[0]);
  ExpectRoundRobinPicksWithAttribute(picker.get(), healthy_attribute,
                                     kNonDraining);
  ExpectOverridePicks(picker.get(), draining_attribute, kAddresses[2]);
  // DRAINING excluded: overrides for third endpoint are not honored.
  ApplyUpdateWithHealthStatuses(kEndpointStatuses, {"UNKNOWN", "HEALTHY"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), kNonDraining);
  ExpectOverridePicks(picker.get(), unknown_attribute, kAddresses[0]);
  ExpectOverridePicks(picker.get(), healthy_attribute, kAddresses[1]);
  ExpectRoundRobinPicksWithAttribute(picker.get(), draining_attribute,
                                     kNonDraining);
}
511
// Endpoints with two addresses each: an override naming an endpoint's
// addresses selects whichever of them the endpoint is currently connected
// to, and the attribute reports the connected address first.
TEST_F(XdsOverrideHostTest, MultipleAddressesPerEndpoint) {
  constexpr std::array<absl::string_view, 2> kEndpoint1Addresses = {
      "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"};
  constexpr std::array<absl::string_view, 2> kEndpoint2Addresses = {
      "ipv4:127.0.0.1:445", "ipv4:127.0.0.1:446"};
  constexpr std::array<absl::string_view, 2> kEndpoint3Addresses = {
      "ipv4:127.0.0.1:447", "ipv4:127.0.0.1:448"};
  const std::array<EndpointAddresses, 3> kEndpoints = {
      MakeEndpointAddresses(kEndpoint1Addresses),
      MakeEndpointAddresses(kEndpoint2Addresses),
      MakeEndpointAddresses(kEndpoint3Addresses)};
  // Round-robin lists expected at the successive stages of the test.
  const std::array<absl::string_view, 3> kAllOnFirstAddress = {
      kEndpoint1Addresses[0], kEndpoint2Addresses[0], kEndpoint3Addresses[0]};
  const std::array<absl::string_view, 2> kEndpoint1Disconnected = {
      kEndpoint2Addresses[0], kEndpoint3Addresses[0]};
  const std::array<absl::string_view, 3> kEndpoint1OnSecondAddress = {
      kEndpoint1Addresses[1], kEndpoint2Addresses[0], kEndpoint3Addresses[0]};
  // Address list expected on the attribute once endpoint 1 has switched.
  const std::array<absl::string_view, 2> kEndpoint1SecondAddressFirst = {
      kEndpoint1Addresses[1], kEndpoint1Addresses[0]};
  EXPECT_EQ(UpdateXdsOverrideHostPolicy(kEndpoints), absl::OkStatus());
  auto picker = ExpectRoundRobinStartup(kEndpoints);
  ASSERT_NE(picker, nullptr);
  // Check that the host is overridden.
  auto* endpoint1_attribute = MakeOverrideHostAttribute(kEndpoint1Addresses);
  ExpectOverridePicks(picker.get(), endpoint1_attribute, kEndpoint1Addresses[0],
                      kEndpoint1Addresses);
  auto* endpoint2_attribute = MakeOverrideHostAttribute(kEndpoint2Addresses);
  ExpectOverridePicks(picker.get(), endpoint2_attribute, kEndpoint2Addresses[0],
                      kEndpoint2Addresses);
  // Change endpoint 1 to connect to its second address.
  ExpectEndpointAddressChange(kEndpoint1Addresses, 0, 1, [&]() {
    WaitForRoundRobinListChange(kAllOnFirstAddress, kEndpoint1Disconnected);
  });
  // NOTE(review): the picker returned by this call is discarded, and the
  // override picks below still use the picker obtained at startup --
  // confirm this is intentional.
  WaitForRoundRobinListChange(kEndpoint1Disconnected,
                              kEndpoint1OnSecondAddress);
  // Now the cookie for endpoint 1 should cause us to use the second address.
  ExpectOverridePicks(picker.get(), endpoint1_attribute, kEndpoint1Addresses[1],
                      kEndpoint1SecondAddressFirst);
}
547
// Starts with an endpoint that is already DRAINING, so the child policy
// never creates a subchannel for it.  An override pick for that endpoint
// is queued, causes the subchannel to be created and connected, and
// completes once the subchannel is READY.
TEST_F(XdsOverrideHostTest, ChildPolicyNeverCreatedSubchannel) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  // The draining endpoint is not passed down to the child policy.
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  auto picker = ExpectRoundRobinStartup({kAddresses[0], kAddresses[2]});
  // Fail cleanly here rather than dereferencing a null picker below.
  ASSERT_NE(picker, nullptr);
  // Subchannels should exist for the non-draining endpoints only.
  auto* subchannel = FindSubchannel(kAddresses[0]);
  ASSERT_NE(subchannel, nullptr);
  EXPECT_GE(subchannel->NumWatchers(), 1);
  auto* subchannel2 = FindSubchannel(kAddresses[1]);
  EXPECT_EQ(subchannel2, nullptr);
  auto* subchannel3 = FindSubchannel(kAddresses[2]);
  ASSERT_NE(subchannel3, nullptr);
  EXPECT_GE(subchannel3->NumWatchers(), 1);
  // A pick with an override pointing to the draining endpoint should
  // queue the pick and trigger subchannel creation.
  auto* address1_attribute = MakeOverrideHostAttribute(kAddresses[1]);
  ExpectPickQueued(picker.get(), {address1_attribute});
  WaitForWorkSerializerToFlush();
  subchannel2 = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel2, nullptr);
  EXPECT_EQ(subchannel2->NumWatchers(), 1);
  // Subchannel creation will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Trying the pick again with the new picker will trigger a connection
  // attempt on the new subchannel.
  ExpectPickQueued(picker.get(), {address1_attribute});
  WaitForWorkSerializerToFlush();
  EXPECT_TRUE(subchannel2->ConnectionRequested());
  subchannel2->SetConnectivityState(GRPC_CHANNEL_CONNECTING);
  // Subchannel state change will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Trying the pick with override again should queue, because the
  // connection attempt is still pending.
  ExpectPickQueued(picker.get(), {address1_attribute});
  // Connection attempt succeeds.
  subchannel2->SetConnectivityState(GRPC_CHANNEL_READY);
  // Subchannel state change will trigger returning a new picker.
  // Picks without an override should continue to use only the
  // non-draining endpoints.
  picker = ExpectState(GRPC_CHANNEL_READY);
  ASSERT_NE(picker, nullptr);
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Now the pick with override should complete.
  ExpectOverridePicks(picker.get(), address1_attribute, kAddresses[1]);
}
605
// An endpoint that moves to DRAINING without ever having received an
// override pick is expected to end up with no watchers on its subchannel,
// while the other endpoints' subchannels stay watched.
TEST_F(XdsOverrideHostTest,
       ChildPolicyUnrefsSubchannelNotUsedWithinIdleThreshold) {
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  auto picker = ExpectStartupWithRoundRobin(kAddresses);
  ASSERT_NE(picker, nullptr);
  // Now move endpoint 1 to state DRAINING.
  ApplyUpdateWithHealthStatuses({{kAddresses[0], XdsHealthStatus::kUnknown},
                                 {kAddresses[1], XdsHealthStatus::kDraining},
                                 {kAddresses[2], XdsHealthStatus::kHealthy}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"});
  picker = ExpectState(GRPC_CHANNEL_READY);
  // Fail cleanly here rather than dereferencing a null picker below.
  ASSERT_NE(picker, nullptr);
  // Make sure subchannels get orphaned in the WorkSerializer.
  WaitForWorkSerializerToFlush();
  // Picks without an override will round-robin over the two endpoints
  // that are not in draining state.
  ExpectRoundRobinPicks(picker.get(), {kAddresses[0], kAddresses[2]});
  // Child policy should drop its ref to the draining endpoint, and
  // xds_override_host should not take ownership, since the entry never
  // had an override pick.
  auto* subchannel0 = FindSubchannel(kAddresses[0]);
  ASSERT_NE(subchannel0, nullptr);
  EXPECT_GE(subchannel0->NumWatchers(), 1);
  auto* subchannel1 = FindSubchannel(kAddresses[1]);
  ASSERT_NE(subchannel1, nullptr);
  EXPECT_EQ(subchannel1->NumWatchers(), 0);
  auto* subchannel2 = FindSubchannel(kAddresses[2]);
  ASSERT_NE(subchannel2, nullptr);
  EXPECT_GE(subchannel2->NumWatchers(), 1);
}
636
// Exercises the idle timer: records every delay passed to the event
// engine's run-after hook and checks both the recorded delays and the
// watcher counts on the draining subchannels as time advances.
TEST_F(XdsOverrideHostTest, IdleTimer) {
  using EventEngineDuration =
      grpc_event_engine::experimental::EventEngine::Duration;
  const auto kIdleTimeout = Duration::Minutes(1);
  // Capture each delay that gets scheduled so it can be asserted on.
  std::vector<EventEngineDuration> scheduled_delays;
  fuzzing_ee_->SetRunAfterDurationCallback(
      [&scheduled_delays](EventEngineDuration delay) {
        scheduled_delays.push_back(delay);
      });
  const std::array<absl::string_view, 3> kAddresses = {
      "ipv4:127.0.0.1:441", "ipv4:127.0.0.1:442", "ipv4:127.0.0.1:443"};
  const absl::string_view kStableAddr = kAddresses[0];
  const absl::string_view kDrainingAddr1 = kAddresses[1];
  const absl::string_view kDrainingAddr2 = kAddresses[2];
  gpr_log(GPR_INFO, "### sending initial update");
  EXPECT_EQ(UpdateXdsOverrideHostPolicy(kAddresses, {"UNKNOWN", "HEALTHY"},
                                        kIdleTimeout),
            absl::OkStatus());
  // The first update should arm the timer for the full idle timeout.
  EXPECT_THAT(scheduled_delays, ::testing::ElementsAre(kIdleTimeout));
  scheduled_delays.clear();
  auto lb_picker = ExpectRoundRobinStartup(kAddresses);
  ASSERT_NE(lb_picker, nullptr);
  // Give endpoints 1 and 2 an override pick each, so that they are within
  // the idle threshold and get retained once they become DRAINING.
  auto* override_attr1 = MakeOverrideHostAttribute(kDrainingAddr1);
  ExpectOverridePicks(lb_picker.get(), override_attr1, kDrainingAddr1);
  auto* override_attr2 = MakeOverrideHostAttribute(kDrainingAddr2);
  ExpectOverridePicks(lb_picker.get(), override_attr2, kDrainingAddr2);
  // Advance the clock 5 seconds, then mark endpoints 1 and 2 as DRAINING.
  gpr_log(GPR_INFO, "### moving endpoints 1 and 2 to state DRAINING");
  IncrementTimeBy(Duration::Seconds(5));
  ApplyUpdateWithHealthStatuses({{kStableAddr, XdsHealthStatus::kUnknown},
                                 {kDrainingAddr1, XdsHealthStatus::kDraining},
                                 {kDrainingAddr2, XdsHealthStatus::kDraining}},
                                {"UNKNOWN", "HEALTHY", "DRAINING"},
                                kIdleTimeout);
  // That update should re-arm the timer for the next expiration time.
  EXPECT_THAT(scheduled_delays,
              ::testing::ElementsAre(Duration::Seconds(55)));
  scheduled_delays.clear();
  lb_picker = ExpectState(GRPC_CHANNEL_READY);
  // Flush the WorkSerializer so that subchannel orphaning has happened.
  WaitForWorkSerializerToFlush();
  // Without an override, only the non-draining endpoint is picked.
  ExpectRoundRobinPicks(lb_picker.get(), {kStableAddr});
  // With an override, the draining endpoints can still be selected.
  ExpectOverridePicks(lb_picker.get(), override_attr1, kDrainingAddr1);
  ExpectOverridePicks(lb_picker.get(), override_attr2, kDrainingAddr2);
  // Each draining subchannel should now have exactly one watcher, i.e. it
  // is owned by the xds_override_host policy.
  auto* draining_sc1 = FindSubchannel(kDrainingAddr1);
  ASSERT_NE(draining_sc1, nullptr);
  EXPECT_EQ(draining_sc1->NumWatchers(), 1);
  auto* draining_sc2 = FindSubchannel(kDrainingAddr2);
  ASSERT_NE(draining_sc2, nullptr);
  EXPECT_EQ(draining_sc2->NumWatchers(), 1);
  // Fire the timer.  Both subchannels got an override pick after the timer
  // was scheduled, so neither is expected to be released.
  IncrementTimeBy(Duration::Seconds(55));
  EXPECT_EQ(draining_sc1->NumWatchers(), 1);
  EXPECT_EQ(draining_sc2->NumWatchers(), 1);
  // The timer should have been re-armed for 5 seconds.
  EXPECT_THAT(scheduled_delays, ::testing::ElementsAre(Duration::Seconds(5)));
  scheduled_delays.clear();
  // Refresh endpoint 1 with one more override pick.
  ExpectOverridePicks(lb_picker.get(), override_attr1, kDrainingAddr1);
  // Fire the timer again.  Endpoint 2 should now be released, while
  // endpoint 1 is kept.
  IncrementTimeBy(Duration::Seconds(5));
  EXPECT_EQ(draining_sc1->NumWatchers(), 1);
  EXPECT_EQ(draining_sc2->NumWatchers(), 0);
  // The next timer should be 55 seconds out, which is when endpoint 1
  // would become eligible to be released.
  EXPECT_THAT(scheduled_delays,
              ::testing::ElementsAre(Duration::Seconds(55)));
}
712
713 } // namespace
714 } // namespace testing
715 } // namespace grpc_core
716
main(int argc,char ** argv)717 int main(int argc, char** argv) {
718 ::testing::InitGoogleTest(&argc, argv);
719 grpc::testing::TestEnvironment env(&argc, argv);
720 return RUN_ALL_TESTS();
721 }
722